/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.signal;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.pipeline.signal.SignalPayload;
import io.debezium.pipeline.signal.SignalRecord;
import io.debezium.pipeline.signal.actions.SignalAction;
import io.debezium.pipeline.signal.channels.SignalChannelReader;
import io.debezium.pipeline.signal.channels.SourceSignalChannel;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.util.Threads;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SignalProcessor<P extends Partition, O extends OffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SignalProcessor.class);
    public static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(90L);
    public static final int SEMAPHORE_WAIT_TIME = 10;
    private final Map<String, SignalAction<P>> signalActions = new HashMap<String, SignalAction<P>>();
    private final CommonConnectorConfig connectorConfig;
    private final List<SignalChannelReader> enabledChannelReaders;
    private final List<SignalChannelReader> signalChannelReaders;
    private final ScheduledExecutorService signalProcessorExecutor;
    private final DocumentReader documentReader;
    private Offsets<P, O> previousOffsets;
    private final Semaphore semaphore = new Semaphore(1);

    public SignalProcessor(Class<? extends SourceConnector> connector, CommonConnectorConfig config, Map<String, SignalAction<P>> signalActions, List<SignalChannelReader> signalChannelReaders, DocumentReader documentReader, Offsets<P, O> previousOffsets) {
        this.connectorConfig = config;
        this.signalChannelReaders = signalChannelReaders;
        this.documentReader = documentReader;
        this.previousOffsets = previousOffsets;
        this.signalProcessorExecutor = Threads.newSingleThreadScheduledExecutor(connector, config.getLogicalName(), SignalProcessor.class.getSimpleName(), false);
        this.enabledChannelReaders = this.getEnabledChannelReaders();
        this.enabledChannelReaders.forEach(signalChannelReader -> signalChannelReader.init(this.connectorConfig));
        this.signalActions.putAll(signalActions);
    }

    private Predicate<SignalChannelReader> isEnabled() {
        return reader -> this.connectorConfig.getEnabledChannels().contains(reader.name());
    }

    private List<SignalChannelReader> getEnabledChannelReaders() {
        return this.signalChannelReaders.stream().filter(this.isEnabled()).collect(Collectors.toList());
    }

    public void setContext(O offset) {
        this.previousOffsets = Offsets.of(Collections.singletonMap(this.previousOffsets.getTheOnlyPartition(), offset));
    }

    public void start() {
        LOGGER.info("SignalProcessor started. Scheduling it every {}ms", (Object)this.connectorConfig.getSignalPollInterval().toMillis());
        this.signalProcessorExecutor.scheduleAtFixedRate(this::process, 0L, this.connectorConfig.getSignalPollInterval().toMillis(), TimeUnit.MILLISECONDS);
    }

    public void stop() throws InterruptedException {
        this.signalProcessorExecutor.submit(() -> this.enabledChannelReaders.forEach(SignalChannelReader::close));
        this.signalProcessorExecutor.shutdown();
        boolean isShutdown = this.signalProcessorExecutor.awaitTermination(SHUTDOWN_WAIT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        if (!isShutdown) {
            LOGGER.warn("SignalProcessor didn't stop in the expected time, shutting down executor now");
            Thread.interrupted();
            this.signalProcessorExecutor.shutdownNow();
            this.signalProcessorExecutor.awaitTermination(SHUTDOWN_WAIT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        }
        LOGGER.info("SignalProcessor stopped");
    }

    public void registerSignalAction(String id, SignalAction<P> signal) {
        LOGGER.debug("Registering signal '{}' using class '{}'", (Object)id, (Object)signal.getClass().getName());
        this.signalActions.put(id, signal);
    }

    public void process() {
        this.executeWithSemaphore(() -> {
            LOGGER.trace("SignalProcessor processing");
            this.enabledChannelReaders.stream().map(SignalChannelReader::read).flatMap(Collection::stream).forEach(this::processSignal);
        });
    }

    public void processSourceSignal() {
        this.executeWithSemaphore(() -> {
            LOGGER.trace("Processing source signals");
            this.enabledChannelReaders.stream().filter(SignalProcessor.isSignal(SourceSignalChannel.class)).map(SignalChannelReader::read).flatMap(Collection::stream).forEach(this::processSignal);
        });
    }

    private void executeWithSemaphore(Runnable operation) {
        boolean acquired = false;
        try {
            acquired = this.semaphore.tryAcquire(10L, TimeUnit.SECONDS);
            operation.run();
        }
        catch (InterruptedException e) {
            LOGGER.error("Not able to acquire semaphore after {}s", (Object)10);
            throw new DebeziumException("Not able to acquire semaphore during signaling processing", (Throwable)e);
        }
        finally {
            if (acquired) {
                this.semaphore.release();
            }
        }
    }

    private void processSignal(SignalRecord signalRecord) {
        LOGGER.debug("Signal Processor offset context {}", this.previousOffsets.getOffsets());
        LOGGER.debug("Received signal id = '{}', type = '{}', data = '{}'", new Object[]{signalRecord.getId(), signalRecord.getType(), signalRecord.getData()});
        SignalAction<P> action = this.signalActions.get(signalRecord.getType());
        if (action == null) {
            LOGGER.warn("Signal '{}' has been received but the type '{}' is not recognized", (Object)signalRecord.getId(), (Object)signalRecord.getType());
            return;
        }
        try {
            Document jsonData = signalRecord.getData() == null || signalRecord.getData().isEmpty() ? Document.create() : this.documentReader.read(signalRecord.getData());
            action.arrived(new SignalPayload<P>(this.previousOffsets.getTheOnlyPartition(), signalRecord.getId(), signalRecord.getType(), jsonData, (OffsetContext)this.previousOffsets.getTheOnlyOffset(), signalRecord.getAdditionalData()));
        }
        catch (IOException e) {
            LOGGER.warn("Signal '{}' has been received but the data '{}' cannot be parsed", new Object[]{signalRecord.getId(), signalRecord.getData(), e});
        }
        catch (InterruptedException e) {
            LOGGER.warn("Action {} has been interrupted. The signal {} may not have been processed.", (Object)signalRecord.getType(), (Object)signalRecord);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            LOGGER.warn("Action {} failed. The signal {} may not have been processed.", new Object[]{signalRecord.getType(), signalRecord, e});
        }
    }

    public <T extends SignalChannelReader> T getSignalChannel(Class<T> channel) {
        return (T)((SignalChannelReader)channel.cast(this.signalChannelReaders.stream().filter(SignalProcessor.isSignal(channel)).findFirst().get()));
    }

    private static <T extends SignalChannelReader> Predicate<SignalChannelReader> isSignal(Class<T> channelClass) {
        return channel -> channel.getClass().equals(channelClass);
    }
}

