package com.atlassian.jira.plugins.dvcs.event;

import com.atlassian.event.api.EventPublisher;
import com.atlassian.jira.plugins.dvcs.model.Repository;
import com.atlassian.plugin.spring.scanner.annotation.imports.ComponentImport;
import com.atlassian.util.concurrent.ThreadFactories;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.Immutable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * {@link EventService} implementation that persists sync events through a {@link SyncEventDao}
 * and later publishes them on the {@link EventPublisher}.
 *
 * <p>Requests made through {@link #dispatchEvents(Repository)} are queued on an internal
 * executor, whereas {@link #dispatchEvents(int)} performs the dispatch on the calling thread.
 */
@Component
public class EventServiceImpl implements EventService {
    private static final Logger logger = LoggerFactory.getLogger(EventServiceImpl.class);

    /** Maximum time, in seconds, to wait for the dispatcher to terminate in {@link #destroy()}. */
    private static final int DESTROY_TIMEOUT_SECS = 10;

    private final EventLimiterFactory eventLimiterFactory;
    private final EventPublisher eventPublisher;
    private final SyncEventDao syncEventDao;
    private final ThreadPoolExecutor eventDispatcher;

    /**
     * Snapshot of the repository data needed to dispatch events: the repository id and a
     * human-readable description used in log messages.
     */
    @Immutable
    public static class DispatchRequest {
        private final int repoId;
        private final String repoToString;

        /**
         * Creates a request from the id and {@code toString()} of the given repository.
         *
         * @param repository the repository whose events should be dispatched
         */
        public DispatchRequest(@Nonnull Repository repository) {
            this(repository.getId(), repository.toString());
        }

        private DispatchRequest(int repoId, String repoToString) {
            this.repoId = repoId;
            this.repoToString = repoToString;
        }

        /**
         * @return the id of the repository this request refers to
         */
        public int repoId() {
            return this.repoId;
        }

        /**
         * @return the repository description wrapped as {@code Repository[...]}
         */
        @Override
        public String toString() {
            return String.format("Repository[%s]", this.repoToString);
        }
    }

    /**
     * Creates the service with a dispatcher built by {@link #createEventDispatcher()}.
     *
     * @param eventPublisher publisher that receives the dispatched events
     * @param eventLimiterFactory factory for the limiter consulted on every dispatch run
     * @param syncEventDao storage for pending events
     */
    @Autowired
    public EventServiceImpl(@ComponentImport EventPublisher eventPublisher, EventLimiterFactory eventLimiterFactory, SyncEventDao syncEventDao) {
        this(eventPublisher, syncEventDao, eventLimiterFactory, createEventDispatcher());
    }

    /**
     * Creates the service with an explicitly supplied executor.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @VisibleForTesting
    EventServiceImpl(@Nonnull EventPublisher eventPublisher, @Nonnull SyncEventDao syncEventDao, @Nonnull EventLimiterFactory eventLimiterFactory, @Nonnull ThreadPoolExecutor threadPoolExecutor) {
        this.eventPublisher = Preconditions.checkNotNull(eventPublisher);
        this.syncEventDao = Preconditions.checkNotNull(syncEventDao);
        this.eventLimiterFactory = Preconditions.checkNotNull(eventLimiterFactory);
        this.eventDispatcher = Preconditions.checkNotNull(threadPoolExecutor);
    }

    /**
     * Builds the default dispatcher via {@code ThreadPoolUtil.newSingleThreadExecutor}, using
     * daemon threads named {@code DVCSConnector.EventService}.
     */
    private static ThreadPoolExecutor createEventDispatcher() {
        return ThreadPoolUtil.newSingleThreadExecutor(
                ThreadFactories.named("DVCSConnector.EventService")
                        .type(ThreadFactories.Type.DAEMON)
                        .build());
    }

    /**
     * Saves the given event through the DAO so that it can be dispatched later.
     */
    @Override
    public void storeEvent(ContextAwareSyncEvent contextAwareSyncEvent) {
        this.syncEventDao.save(contextAwareSyncEvent);
    }

    /**
     * Queues a dispatch of the stored events of the given repository on the internal executor.
     */
    @Override
    public void dispatchEvents(Repository repository) {
        dispatch(new DispatchRequest(repository));
    }

    /**
     * Dispatches the stored events of the repository with the given id on the calling thread.
     * Unlike {@link #dispatchEvents(Repository)}, this overload does not go through the executor.
     */
    @Override
    public void dispatchEvents(int i) {
        doDispatchEvents(new DispatchRequest(i, "repository with id " + i));
    }

    /**
     * Submits the request to the executor. A {@link RuntimeException} raised by the dispatch is
     * logged and then rethrown wrapped in a new {@link RuntimeException}; the returned future is
     * not retained, so the log entry is the only place the failure is reported from here.
     */
    private void dispatch(DispatchRequest dispatchRequest) {
        this.eventDispatcher.submit(() -> {
            try {
                doDispatchEvents(dispatchRequest);
                return null;
            } catch (RuntimeException e) {
                logger.error("Error dispatching events for: " + dispatchRequest, e);
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Shuts down the internal executor when the component is destroyed.
     */
    @PreDestroy
    public void destroy() {
        destroyEventDispatcher();
    }

    /**
     * Walks the stored events of the requested repository, publishing each one unless the
     * limiter reports that the limit is exceeded. Every visited event is deleted from storage
     * afterwards, whether it was published, dropped, or failed. If any events were dropped, a
     * single {@link LimitExceededEvent} carrying the dropped count is published at the end.
     */
    private void doDispatchEvents(DispatchRequest dispatchRequest) {
        final EventLimiter limiter = this.eventLimiterFactory.create();
        final AtomicInteger dispatchedCount = new AtomicInteger(0);

        this.syncEventDao.foreachByRepoId(dispatchRequest.repoId(), contextAwareSyncEvent -> {
            if (Thread.interrupted()) {
                logger.error("Thread interrupted after dispatching {} events for: {}", dispatchedCount.get(), dispatchRequest);
                // Thread.interrupted() cleared the flag; restore it for whoever checks next.
                Thread.currentThread().interrupt();
                return;
            }
            try {
                SyncEvent syncEvent = contextAwareSyncEvent.getSyncEvent();
                if (limiter.isLimitExceeded(syncEvent, contextAwareSyncEvent.scheduledSync())) {
                    logger.debug("Limit exceeded, dropping event for repository {}: {}", dispatchRequest, syncEvent);
                } else {
                    logger.debug("Publishing event for repository {}: {}", dispatchRequest, syncEvent);
                    this.eventPublisher.publish(syncEvent);
                    dispatchedCount.incrementAndGet();
                }
            } finally {
                // Delete exactly once on every path, so a failing delete is not retried.
                this.syncEventDao.delete(contextAwareSyncEvent);
            }
        });

        int limitExceededCount = limiter.getLimitExceededCount();
        if (limitExceededCount > 0) {
            logger.info("Event limit exceeded for {}. Dropped {} subsequent events.", dispatchRequest, limitExceededCount);
            this.eventPublisher.publish(new LimitExceededEvent(limitExceededCount));
        }
    }

    /**
     * Deletes all stored events of the given repository without publishing them.
     */
    @Override
    public void discardEvents(Repository repository) {
        // Delete first: the deletion must happen regardless of whether debug logging is enabled.
        long deletedCount = this.syncEventDao.deleteAll(repository.getId());
        logger.debug("Deleted {} events from repo: {}", deletedCount, repository);
    }

    /**
     * Stops the executor from accepting new work, drops requests still waiting in its queue and
     * waits up to {@link #DESTROY_TIMEOUT_SECS} seconds for it to terminate.
     */
    private void destroyEventDispatcher() {
        // NOTE(review): shutdown() does not interrupt a task that is already running, so an
        // in-flight dispatch is only waited for, not cancelled - confirm this is intended.
        this.eventDispatcher.shutdown();
        this.eventDispatcher.getQueue().clear();
        try {
            if (!this.eventDispatcher.awaitTermination(DESTROY_TIMEOUT_SECS, TimeUnit.SECONDS)) {
                logger.error("ExecutorService did not shut down within {}s", DESTROY_TIMEOUT_SECS);
            }
        } catch (InterruptedException e) {
            logger.error("Interrupted while waiting for ExecutorService to shut down.");
            Thread.currentThread().interrupt();
        }
    }
}
