package com.atlassian.jira.plugins.dvcs.service;

import com.atlassian.jira.plugins.dvcs.activeobjects.v3.MessageMapping;
import com.atlassian.jira.plugins.dvcs.activeobjects.v3.MessageQueueItemMapping;
import com.atlassian.jira.plugins.dvcs.dao.MessageDao;
import com.atlassian.jira.plugins.dvcs.dao.MessageQueueItemDao;
import com.atlassian.jira.plugins.dvcs.dao.SyncAuditLogDao;
import com.atlassian.jira.plugins.dvcs.event.CarefulEventService;
import com.atlassian.jira.plugins.dvcs.exception.SourceControlException;
import com.atlassian.jira.plugins.dvcs.model.DiscardReason;
import com.atlassian.jira.plugins.dvcs.model.Message;
import com.atlassian.jira.plugins.dvcs.model.MessageId;
import com.atlassian.jira.plugins.dvcs.model.MessageQueueItem;
import com.atlassian.jira.plugins.dvcs.model.MessageState;
import com.atlassian.jira.plugins.dvcs.model.Progress;
import com.atlassian.jira.plugins.dvcs.model.Repository;
import com.atlassian.jira.plugins.dvcs.service.message.HasProgress;
import com.atlassian.jira.plugins.dvcs.service.message.MessageAddress;
import com.atlassian.jira.plugins.dvcs.service.message.MessageAddressService;
import com.atlassian.jira.plugins.dvcs.service.message.MessageConsumer;
import com.atlassian.jira.plugins.dvcs.service.message.MessagePayloadSerializer;
import com.atlassian.jira.plugins.dvcs.service.message.MessagingService;
import com.atlassian.jira.plugins.dvcs.spi.bitbucket.clientlibrary.request.HttpClientProvider;
import com.atlassian.jira.plugins.dvcs.sync.SynchronizationFlag;
import com.atlassian.jira.plugins.dvcs.sync.Synchronizer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/atlassian/jira/plugins/dvcs/service/MessagingServiceImpl.class */
public class MessagingServiceImpl implements MessagingService {

    /** Prefix of the message tag that ties a message to the repository being synchronized. */
    @VisibleForTesting
    static final String SYNCHRONIZATION_REPO_TAG_PREFIX = "synchronization-repository-";

    /** Prefix of the message tag that carries the synchronization audit log id. */
    private static final String SYNCHRONIZATION_AUDIT_TAG_PREFIX = "audit-id-";

    private static final Logger log = LoggerFactory.getLogger(MessagingServiceImpl.class);

    /** Consumers grouped by the id of the address they listen on; filled by {@link #init()}. */
    private final ConcurrentMap<String, List<MessageConsumer<?>>> addressToMessageConsumer = new ConcurrentHashMap<>();

    /** Synchronization tags that are currently paused; messages published with one of them start as SLEEPING. */
    private final Set<String> pausedTags = new CopyOnWriteArraySet<>();

    /** Payload serializers keyed by the payload type they report; filled by {@link #init()}. */
    private final Map<Class<?>, MessagePayloadSerializer<?>> payloadTypeToPayloadSerializer = new ConcurrentHashMap<>();

    /** Consumer registered for each queue name (first registration wins); filled by {@link #init()}. */
    private final ConcurrentMap<String, MessageConsumer<?>> queueToMessageConsumer = new ConcurrentHashMap<>();

    // Injected collaborators. All fields below are populated by the container via @Resource.

    // Not referenced by any method visible in this class.
    @Resource
    protected ChangesetService changesetService;

    // Consumer array iterated by restartConsumers() to collect the addresses to notify.
    @Resource
    private MessageConsumer<?>[] consumers;

    // Used by endProgress() to dispatch events for a repository once its progress ends without errors.
    @Resource
    private CarefulEventService eventService;

    // Used by endProgress() to close idle HTTP connections when it finishes.
    @Resource
    private HttpClientProvider httpClientProvider;

    // Used by endProgress() to update connect linker values for the repository's organization.
    @Resource
    private LinkerService linkerService;

    // Backs get(Class, String).
    @Resource
    private MessageAddressService messageAddressService;

    // Consumer array iterated by init() to build the queue and address lookup maps.
    // NOTE(review): same declared type as 'consumers' above — presumably the same beans; confirm.
    @Resource
    private MessageConsumer<?>[] messageConsumers;

    // Persistence of messages and their tags.
    @Resource
    private MessageDao messageDao;

    // Notified with an address id whenever messages for that address may be ready to process.
    @Resource
    private MessageExecutor messageExecutor;

    // Persistence of per-queue message items and their state.
    @Resource(name = "MessageQueueItemDaoQueryDsl")
    private MessageQueueItemDao messageQueueItemDao;

    // Serializer array iterated by init() to build the payload-type lookup map.
    @Resource
    private MessagePayloadSerializer<?>[] payloadSerializers;

    // Used by getRepositoryFromMessage() to load a repository by id.
    @Resource
    private RepositoryService repositoryService;

    // Synchronization audit log: pause/resume/finish/setException are called on it from this class.
    @Resource
    private SyncAuditLogDao syncAudit;

    // Used by endProgress() to start a follow-up sync when run-again flags are set.
    @Resource
    private Synchronizer synchronizer;

    /**
     * Tells whether the given progress reports an error.
     *
     * @param progress progress to inspect, may be {@code null}
     * @return {@code true} only when {@code progress} is non-null and its error is non-null
     */
    private static boolean hasErrors(@Nullable Progress progress) {
        if (progress == null) {
            return false;
        }
        return progress.getError() != null;
    }

    /**
     * Builds the lookup tables from the injected consumers and payload serializers.
     *
     * <p>For every consumer: registers it under its queue name (an earlier registration for the same
     * queue is kept) and appends it to the list of consumers for its address id. For every serializer:
     * registers it under the payload type it reports (a later serializer replaces an earlier one).
     */
    @PostConstruct
    public void init() {
        for (MessageConsumer<?> consumer : this.messageConsumers) {
            this.queueToMessageConsumer.putIfAbsent(consumer.getQueue(), consumer);
            // computeIfAbsent always hands back the list that is actually stored in the map, so the
            // consumer can never be added to a detached list (which an ignored putIfAbsent result allows).
            this.addressToMessageConsumer
                    .computeIfAbsent(consumer.getAddress().getId(), addressId -> new CopyOnWriteArrayList<>())
                    .add(consumer);
        }
        for (MessagePayloadSerializer<?> serializer : this.payloadSerializers) {
            this.payloadTypeToPayloadSerializer.put(serializer.getPayloadType(), serializer);
        }
    }

    /**
     * Marks every queue item that is still in the RUNNING state as failed.
     *
     * <p>Each such item is reloaded by id (an {@link IllegalStateException} is thrown if it has vanished)
     * and passed to {@code fail} together with the consumer registered for its queue.
     */
    private void initRunningToFail() {
        log.debug("Setting messages in running state to fail");
        this.messageQueueItemDao.getByState(MessageState.RUNNING, queueItemId -> {
            MessageQueueItem interruptedItem = this.messageQueueItemDao
                    .getQueueItemById(queueItemId.intValue())
                    .orElseThrow(IllegalStateException::new);
            fail(
                    this.queueToMessageConsumer.get(interruptedItem.getQueue()),
                    interruptedItem.getMessage(),
                    new RuntimeException("Synchronization has been interrupted (probably plugin un/re/install)."));
        });
    }

    /**
     * Notifies the message executor once for every distinct address id that has a consumer.
     */
    private void restartConsumers() {
        log.debug("Restarting message consumers");
        // Typed set: a raw HashSet would make forEach take a raw Consumer, which the
        // notify(String) method reference cannot satisfy.
        Set<String> addressIds = new HashSet<>();
        for (MessageConsumer<?> consumer : this.consumers) {
            addressIds.add(consumer.getAddress().getId());
        }
        addressIds.forEach(this.messageExecutor::notify);
    }

    /**
     * Publishes a message with priority {@code 0}.
     *
     * @param messageAddress address the message is published to
     * @param p              payload of the message
     * @param strArr         tags attached to the message
     */
    @Override
    public <P extends HasProgress> void publish(MessageAddress<P> messageAddress, P p, String... strArr) {
        final int defaultPriority = 0;
        publish(messageAddress, p, defaultPriority, strArr);
    }

    /**
     * Serializes the payload, persists the message with one queue item per consumer of the address,
     * and notifies the executor for that address.
     *
     * <p>If any of the tags is currently paused, the queue items are created as SLEEPING instead of
     * PENDING.
     *
     * @param messageAddress address the message is published to
     * @param p              payload of the message; its runtime class selects the serializer
     * @param i              priority stored on the message
     * @param strArr         tags attached to the message
     * @throws NullPointerException if no serializer is registered for the payload's class
     */
    @Override
    public <P extends HasProgress> void publish(MessageAddress<P> messageAddress, P p, int i, String... strArr) {
        MessageState initialState = Arrays.stream(strArr).anyMatch(this.pausedTags::contains)
                ? MessageState.SLEEPING
                : MessageState.PENDING;

        // The registry is keyed by payload type, so the serializer found for p's class handles P.
        @SuppressWarnings("unchecked")
        MessagePayloadSerializer<P> serializer =
                (MessagePayloadSerializer<P>) this.payloadTypeToPayloadSerializer.get(p.getClass());
        // Same exception type as the implicit NPE, but it names the missing payload type.
        Preconditions.checkNotNull(serializer, "No payload serializer registered for payload type %s", p.getClass().getName());

        Message<P> message = new Message<>();
        message.setAddress(messageAddress);
        message.setPayload(serializer.serialize(p));
        message.setPayloadType(messageAddress.getPayloadType());
        message.setTags(strArr);
        message.setPriority(i);

        createMessage(message, initialState, strArr);
        this.messageExecutor.notify(messageAddress.getId());
    }

    /**
     * Persists the message and creates one queue item, in the given state, for every consumer
     * registered on the message's address.
     *
     * @param message      message to persist
     * @param messageState initial state of each created queue item
     * @param strArr       tags passed to the message DAO together with the message
     */
    protected <P extends HasProgress> void createMessage(Message<P> message, MessageState messageState, String... strArr) {
        Message<?> persisted = this.messageDao.create(toMessageMap(message), strArr);
        for (MessageConsumer<?> consumer : this.addressToMessageConsumer.get(message.getAddress().getId())) {
            Map<String, Object> queueItem =
                    messageQueueItemToMap(persisted.getId().intValue(), consumer.getQueue(), messageState, null);
            this.messageQueueItemDao.create(queueItem);
        }
    }

    /**
     * Pauses synchronization of the given repository.
     *
     * <p>Registers the repository's synchronization tag as paused, switches every queue item of the
     * messages carrying that tag to SLEEPING (items currently RUNNING are left alone), and finally
     * pauses every synchronization audit entry referenced by those messages.
     *
     * @param repository repository whose synchronization is paused
     */
    @Override
    public void pause(Repository repository) {
        String syncTag = getTagForSynchronization(repository);
        this.pausedTags.add(syncTag);

        // Insertion-ordered so audit entries are paused in the order their messages were visited.
        Set<Integer> auditIds = new LinkedHashSet<>();
        this.messageDao.getByTag(syncTag, messageId -> {
            Arrays.stream(this.messageQueueItemDao.getByMessageId(messageId))
                    .filter(queueItem -> !MessageState.RUNNING.name().equals(queueItem.getState()))
                    .forEach(queueItem -> {
                        queueItem.setState(MessageState.SLEEPING.name());
                        this.messageQueueItemDao.save(queueItem);
                    });
            int auditId = getSynchronizationAuditIdFromTags(this.messageDao.getTags(messageId));
            if (auditId != 0) {
                auditIds.add(auditId);
            }
        });
        auditIds.forEach(this.syncAudit::pause);
    }

    /**
     * Deserializes the payload of a message using the serializer registered for the message's
     * payload type.
     *
     * @param message message whose payload is deserialized
     * @return the deserialized payload
     */
    @Override
    public <P extends HasProgress> P deserializePayload(Message<P> message) {
        MessagePayloadSerializer<?> serializer = this.payloadTypeToPayloadSerializer.get(message.getPayloadType());
        return (P) serializer.deserialize(message);
    }

    /**
     * Resumes synchronization of the given repository.
     *
     * <p>Removes the repository's synchronization tag from the paused set, moves every SLEEPING queue
     * item carrying that tag back to PENDING, then notifies the executor for each affected address and
     * resumes each referenced synchronization audit entry.
     *
     * @param repository repository whose synchronization is resumed
     * @throws IllegalStateException if a queue item reported by the DAO can no longer be loaded by id
     */
    @Override
    public void resume(Repository repository) {
        String syncTag = getTagForSynchronization(repository);
        this.pausedTags.remove(syncTag);

        Set<String> addressIds = new HashSet<>();
        Set<Integer> auditIds = new HashSet<>();
        this.messageQueueItemDao.getByTagAndState(syncTag, MessageState.SLEEPING, queueItemId -> {
            MessageQueueItem queueItem = this.messageQueueItemDao
                    .getQueueItemById(queueItemId.intValue())
                    .orElseThrow(IllegalStateException::new);
            queueItem.setState(MessageState.PENDING.name());
            this.messageQueueItemDao.save(queueItem);
            addressIds.add(queueItem.getMessage().getAddress().getId());
            int auditId = getSynchronizationAuditIdFromTags(queueItem.getMessage().getTags());
            if (auditId != 0) {
                auditIds.add(auditId);
            }
        });
        addressIds.forEach(this.messageExecutor::notify);
        auditIds.forEach(this.syncAudit::resume);
    }

    /**
     * Re-queues every queue item with the given tag that is waiting for a retry.
     *
     * <p>Each item's message gets its audit tag replaced by the tag for audit id {@code i}, the item is
     * set back to PENDING, and the executor is notified once per affected address.
     *
     * @param str tag selecting the queue items to retry
     * @param i   synchronization audit id the retried messages are re-tagged with
     * @throws IllegalStateException if a queue item reported by the DAO can no longer be loaded by id
     */
    @Override
    public void retry(String str, int i) {
        Set<String> addressIds = new HashSet<>();
        this.messageQueueItemDao.getByTagAndState(str, MessageState.WAITING_FOR_RETRY, queueItemId -> {
            MessageQueueItem queueItem = this.messageQueueItemDao
                    .getQueueItemById(queueItemId.intValue())
                    .orElseThrow(IllegalStateException::new);
            updateSyncAuditId(i, queueItem);
            addressIds.add(queueItem.getMessage().getAddress().getId());
            queueItem.setState(MessageState.PENDING.name());
            this.messageQueueItemDao.save(queueItem);
        });
        addressIds.forEach(this.messageExecutor::notify);
    }

    /**
     * Replaces the audit tag(s) of the queue item's message with the tag for the given audit id.
     *
     * @param i                new synchronization audit id
     * @param messageQueueItem queue item whose message is re-tagged
     */
    private void updateSyncAuditId(int i, MessageQueueItem messageQueueItem) {
        String newAuditTag = getTagForAuditSynchronization(i);
        // Stream the tags as strings (not Object[]) so the prefix test is well-typed;
        // StringUtils.startsWith also tolerates a null tag element.
        Arrays.stream(messageQueueItem.getMessage().getTags())
                .filter(tag -> StringUtils.startsWith(tag, SYNCHRONIZATION_AUDIT_TAG_PREFIX))
                .forEach(tag -> this.messageDao.deleteTag(messageQueueItem.getMessage(), tag));
        this.messageDao.createTag(messageQueueItem.getMessage(), newAuditTag);
    }

    /**
     * Cancels synchronization of the given repository by deleting every message tagged with the
     * repository's synchronization tag, together with all of its queue items.
     *
     * @param repository repository whose queued messages are removed
     */
    @Override
    public void cancel(Repository repository) {
        this.messageDao.getByTag(getTagForSynchronization(repository), messageId -> {
            // Queue items go first, then the message they belong to.
            Arrays.stream(this.messageQueueItemDao.getByMessageId(messageId))
                    .forEach(this.messageQueueItemDao::delete);
            this.messageDao.delete(this.messageDao.getById(messageId.getId()));
        });
    }

    /**
     * Marks the queue item of the given message in the consumer's queue as RUNNING.
     * Does nothing when no such queue item exists.
     *
     * @param messageConsumer consumer whose queue holds the item
     * @param message         message being processed
     */
    @Override
    public <P extends HasProgress> void running(MessageConsumer<P> messageConsumer, Message<P> message) {
        Optional<MessageQueueItem> found =
                this.messageQueueItemDao.getByQueueAndMessage(messageConsumer.getQueue(), message.getId().intValue());
        if (!found.isPresent()) {
            return;
        }
        MessageQueueItem queueItem = found.get();
        queueItem.setState(MessageState.RUNNING.name());
        this.messageQueueItemDao.save(queueItem);
    }

    /**
     * Records successful processing: deletes the message's queue item in the consumer's queue (if any)
     * and, once no queue items remain for the message, deletes the message itself.
     *
     * @param messageConsumer consumer that processed the message
     * @param message         message that was processed
     */
    @Override
    public <P extends HasProgress> void ok(MessageConsumer<P> messageConsumer, Message<P> message) {
        this.messageQueueItemDao
                .getByQueueAndMessage(messageConsumer.getQueue(), message.getId().intValue())
                .ifPresent(this.messageQueueItemDao::delete);

        boolean noQueueItemsLeft = this.messageQueueItemDao.getByMessageId(new MessageId(message)).length == 0;
        if (noQueueItemsLeft) {
            this.messageDao.delete(message);
        }
    }

    /**
     * Records a processing failure: increments the retry count of the message's queue item, sets it to
     * WAITING_FOR_RETRY, and stores the throwable on the synchronization audit entry derived from the
     * message tags. Does nothing when the queue item does not exist.
     *
     * @param messageConsumer consumer that failed to process the message
     * @param message         message that failed
     * @param th              cause of the failure
     */
    @Override
    public <P extends HasProgress> void fail(MessageConsumer<P> messageConsumer, Message<P> message, Throwable th) {
        Optional<MessageQueueItem> found =
                this.messageQueueItemDao.getByQueueAndMessage(messageConsumer.getQueue(), message.getId().intValue());
        if (!found.isPresent()) {
            return;
        }
        MessageQueueItem queueItem = found.get();
        queueItem.setRetryCount(queueItem.getRetryCount() + 1);
        queueItem.setState(MessageState.WAITING_FOR_RETRY.name());
        this.messageQueueItemDao.save(queueItem);

        int auditId = getSynchronizationAuditIdFromTags(message.getTags());
        this.syncAudit.setException(auditId, th, false);
    }

    /**
     * Marks the message's queue item in the consumer's queue as DISCARDED and stores the reason's
     * name as its state info. Does nothing when the queue item does not exist.
     *
     * @param messageConsumer consumer whose queue holds the item
     * @param message         message being discarded
     * @param discardReason   why the message is discarded
     */
    @Override
    public <P extends HasProgress> void discard(MessageConsumer<P> messageConsumer, Message<P> message, DiscardReason discardReason) {
        Optional<MessageQueueItem> found =
                this.messageQueueItemDao.getByQueueAndMessage(messageConsumer.getQueue(), message.getId().intValue());
        if (!found.isPresent()) {
            return;
        }
        MessageQueueItem queueItem = found.get();
        queueItem.setState(MessageState.DISCARDED.name());
        queueItem.setStateInfo(discardReason.name());
        this.messageQueueItemDao.save(queueItem);
    }

    /**
     * Asks the queue item DAO for the next item to process in the consumer's queue.
     *
     * @param messageConsumer consumer whose queue is polled
     * @param str             passed through unchanged as the DAO's second argument
     * @return whatever the DAO returns for that queue
     */
    @Override
    public <P extends HasProgress> Message<P> getNextMessageForConsuming(MessageConsumer<P> messageConsumer, String str) {
        final String queue = messageConsumer.getQueue();
        return this.messageQueueItemDao.getNextItemForProcessing(queue, str);
    }

    /**
     * Returns the message DAO's count of messages for consuming for the given tag.
     *
     * @param str tag to count messages for
     * @return the count reported by the DAO
     */
    @Override
    public int getQueuedCount(String str) {
        final int queued = this.messageDao.getMessagesForConsumingCount(str);
        return queued;
    }

    /**
     * Looks up a message address via the {@code MessageAddressService}.
     *
     * @param cls payload type of the address
     * @param str second lookup argument, passed through unchanged
     * @return the address returned by the service
     */
    @Override
    public <P extends HasProgress> MessageAddress<P> get(Class<P> cls, String str) {
        final MessageAddress<P> address = this.messageAddressService.get(cls, str);
        return address;
    }

    /**
     * Builds the synchronization tag of a repository: the repository tag prefix followed by the
     * repository id.
     *
     * @param repository repository to build the tag for
     * @return the synchronization tag
     */
    @Override
    public String getTagForSynchronization(Repository repository) {
        final String repositoryTag = SYNCHRONIZATION_REPO_TAG_PREFIX + repository.getId();
        return repositoryTag;
    }

    /**
     * Builds the audit tag for a synchronization audit id: the audit tag prefix followed by the id.
     *
     * @param i synchronization audit id
     * @return the audit tag
     */
    @Override
    public String getTagForAuditSynchronization(int i) {
        return SYNCHRONIZATION_AUDIT_TAG_PREFIX.concat(Integer.toString(i));
    }

    /**
     * Extracts the synchronization audit id from the first tag that starts with the audit tag prefix
     * and has a parseable integer after it.
     *
     * <p>Tags are matched by prefix (not by substring), in line with how audit tags are built by
     * {@link #getTagForAuditSynchronization(int)} and matched elsewhere in this class. A tag with the
     * prefix but a non-numeric remainder is logged as an error and skipped.
     *
     * @param strArr tags to search; {@code null} is treated like an empty array
     * @return the audit id, or {@code 0} when no usable audit tag is present
     */
    @Override
    public int getSynchronizationAuditIdFromTags(String[] strArr) {
        if (strArr != null) {
            for (String tag : strArr) {
                if (!StringUtils.startsWith(tag, SYNCHRONIZATION_AUDIT_TAG_PREFIX)) {
                    continue;
                }
                String idPart = tag.substring(SYNCHRONIZATION_AUDIT_TAG_PREFIX.length());
                if (StringUtils.isBlank(idPart)) {
                    continue;
                }
                try {
                    return Integer.parseInt(idPart);
                } catch (NumberFormatException e) {
                    log.error("Synchronization audit id tag has invalid format. Tag was: '{}'", tag);
                }
            }
        }
        // The stack trace of a throwaway exception is logged so the caller with the missing tag can be found.
        log.warn("No synchronization audit id tag found in {}; {}", Arrays.toString(strArr), ExceptionUtils.getStackTrace(new Exception("Dummy exception")));
        return 0;
    }

    /**
     * Resolves the repository a message belongs to from its repository synchronization tag.
     *
     * <p>The first tag with the repository prefix and a parseable id wins; a tag whose id cannot be
     * parsed is logged and skipped.
     *
     * @param message message whose tags are inspected
     * @return the result of the repository service lookup, or {@code null} when no tag yields an id
     */
    @Override
    public <P extends HasProgress> Repository getRepositoryFromMessage(Message<P> message) {
        final int prefixLength = SYNCHRONIZATION_REPO_TAG_PREFIX.length();
        for (String tag : message.getTags()) {
            if (!StringUtils.startsWith(tag, SYNCHRONIZATION_REPO_TAG_PREFIX)) {
                continue;
            }
            try {
                int repositoryId = Integer.parseInt(tag.substring(prefixLength));
                return this.repositoryService.get(repositoryId);
            } catch (NumberFormatException e) {
                log.warn("Get repo ID from message: " + e.getMessage());
            }
        }
        log.warn("Can't get repository ID from tags for message with ID {}", message.getId());
        return null;
    }

    /**
     * Converts a message into the column map handed to the message DAO: address id, priority,
     * canonical name of the payload type, and the serialized payload.
     *
     * @param message message to convert
     * @return a new mutable map keyed by the {@code MessageMapping} column constants
     */
    private <P extends HasProgress> Map<String, Object> toMessageMap(Message<P> message) {
        Map<String, Object> columns = new HashMap<>();
        columns.put(MessageMapping.ADDRESS, message.getAddress().getId());
        columns.put(MessageMapping.PRIORITY, message.getPriority());
        columns.put(MessageMapping.PAYLOAD_TYPE, message.getPayloadType().getCanonicalName());
        columns.put(MessageMapping.PAYLOAD, message.getPayload());
        return columns;
    }

    /**
     * Builds the column map for a new queue item with a retry count of zero.
     *
     * @param i            id of the message the queue item belongs to
     * @param str          name of the queue
     * @param messageState initial state; its enum name is stored
     * @param str2         state info, may be {@code null}
     * @return a new mutable map of column name to value
     */
    private Map<String, Object> messageQueueItemToMap(int i, String str, MessageState messageState, String str2) {
        // No null check on the message id: it is a primitive int, so a boxed copy can never be null.
        Map<String, Object> columns = new HashMap<>();
        columns.put("MESSAGE_ID", i);
        columns.put(MessageQueueItemMapping.QUEUE, str);
        columns.put(MessageQueueItemMapping.STATE, messageState.name());
        columns.put(MessageQueueItemMapping.STATE_INFO, str2);
        columns.put("RETRIES_COUNT", 0);
        return columns;
    }

    /**
     * Ends the synchronization progress of a repository if nothing is queued for it any more, and
     * then closes the synchronization audit entry.
     *
     * <p>When the progress could be ended, the repository's tag is removed from the paused set. If
     * {@code i} is positive, the audit entry is finished with the figures taken from {@code progress},
     * or with the current time and empty figures when {@code progress} is {@code null}.
     *
     * @param repository      repository whose synchronization may be finished
     * @param progress        progress of the synchronization, may be {@code null}
     * @param messageConsumer not used by this implementation
     * @param i               synchronization audit id; values {@code <= 0} skip the audit update
     */
    @Override
    public <P extends HasProgress> void tryEndProgress(Repository repository, Progress progress, MessageConsumer<P> messageConsumer, int i) {
        if (!endProgress(repository, progress)) {
            return;
        }
        this.pausedTags.remove(getTagForSynchronization(repository));
        if (i <= 0) {
            return;
        }

        final Date finishedAt;
        final Date firstMessageAt;
        final int requestCount;
        final int flightMillis;
        if (progress != null) {
            finishedAt = new Date(progress.getFinishTime().longValue());
            firstMessageAt = progress.getFirstMessageTime();
            requestCount = progress.getNumRequests();
            flightMillis = progress.getFlightTimeMs();
        } else {
            finishedAt = new Date();
            firstMessageAt = null;
            requestCount = 0;
            flightMillis = 0;
        }
        this.syncAudit.finish(i, firstMessageAt, requestCount, flightMillis, finishedAt);
    }

    /**
     * Finishes the progress of a repository synchronization when no messages are queued for it.
     *
     * <p>If the progress is not finished yet it is finished here; when it carries run-again flags they
     * are cleared from the progress and a follow-up soft sync is started with them. Events are
     * dispatched only if the progress had no error. Linker values are updated and idle HTTP
     * connections are closed in every case where the queue was empty, including on failure.
     *
     * @param repository repository whose synchronization is ending
     * @param progress   progress to finish, may be {@code null}
     * @return {@code false} if messages are still queued for the repository, {@code true} otherwise
     */
    private boolean endProgress(Repository repository, Progress progress) {
        if (getQueuedCount(getTagForSynchronization(repository)) != 0) {
            return false;
        }
        try {
            // Captured before finish()/doSync() run, so the decision reflects the state on entry.
            boolean hadErrors = hasErrors(progress);
            if (progress != null && !progress.isFinished()) {
                progress.finish();
                EnumSet<SynchronizationFlag> runAgainFlags = progress.getRunAgainFlags();
                if (runAgainFlags != null) {
                    progress.setRunAgainFlags(null);
                    runAgainFlags.add(SynchronizationFlag.SOFT_SYNC);
                    try {
                        this.synchronizer.doSync(repository, runAgainFlags);
                    } catch (SourceControlException.SynchronizationDisabled ignored) {
                        // Best effort: a disabled synchronization just means no follow-up run.
                        log.debug("Synchronization is disabled; follow-up sync of repository {} skipped", repository.getId());
                    }
                }
            }
            if (!hadErrors) {
                this.eventService.dispatchEvents(repository);
            }
            this.linkerService.updateConnectLinkerValues(repository.getOrganizationId());
            return true;
        } finally {
            this.httpClientProvider.closeIdleConnections();
        }
    }

    /**
     * Start-up hook: first fails every queue item left in the RUNNING state, then notifies the
     * executor for every consumer address.
     */
    @Override
    public void onStart() {
        this.initRunningToFail();
        this.restartConsumers();
    }
}
