/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.common;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.common.DebeziumHeaderProducerProvider;
import io.debezium.connector.common.DebeziumTaskState;
import io.debezium.connector.common.OffsetReader;
import io.debezium.converters.custom.CustomConverterServiceProvider;
import io.debezium.data.Envelope;
import io.debezium.function.LogPositionValidator;
import io.debezium.openlineage.ConnectorContext;
import io.debezium.openlineage.DebeziumOpenLineageEmitter;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.notification.channels.NotificationChannel;
import io.debezium.pipeline.signal.channels.SignalChannelReader;
import io.debezium.pipeline.signal.channels.process.SignalChannelWriter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.processors.PostProcessorRegistryServiceProvider;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotLockProvider;
import io.debezium.snapshot.SnapshotQueryProvider;
import io.debezium.snapshot.SnapshotterServiceProvider;
import io.debezium.spi.snapshot.Snapshotter;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import io.debezium.util.Loggings;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseSourceTask<P extends Partition, O extends OffsetContext>
extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseSourceTask.class);

    /** Initial period of the exponential back-off used to throttle the statistics line written by {@code logStatistics}. */
    private static final Duration INITIAL_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.SECONDS.toMillis(5L));

    /** Upper bound of the exponential back-off used to throttle the statistics line written by {@code logStatistics}. */
    private static final Duration MAX_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.HOURS.toMillis(1L));

    /** Task configuration; assigned in {@code start(Map)} from the connector properties and {@code null} before that. */
    private Configuration config;

    /** Signal channel readers, populated lazily on the first call to {@code getAvailableSignalChannels()}. */
    private List<SignalChannelReader> signalChannels;

    /** Current lifecycle state of the task; written through {@code setTaskState}. */
    private final AtomicReference<DebeziumTaskState> state = new AtomicReference<>(DebeziumTaskState.INITIAL);

    /** Held while the task is started, restarted or stopped, and while {@code lastOffsets} is read or modified. */
    private final ReentrantLock stateLock = new ReentrantLock();

    /** Delay to observe before a restart attempt; set when a retriable failure occurs, reset to {@code null} after a successful (re)start. */
    private volatile ElapsedTimeStrategy restartDelay;

    /** Coordinator returned by {@code start(Configuration)}; set back to {@code null} once it has been stopped. */
    protected ChangeEventSourceCoordinator<P, O> coordinator;

    /** Most recent source offset per source partition, recorded by {@code commitRecord} and drained by {@code performCommit}. */
    private final Map<Map<String, ?>, Map<String, ?>> lastOffsets = new HashMap<>();

    /** Value of {@code CommonConnectorConfig.RETRIABLE_RESTART_WAIT}, read in {@code start(Map)}. */
    private Duration retriableRestartWait;

    /** Decides when {@code logStatistics} is allowed to write its next INFO summary line. */
    private final ElapsedTimeStrategy pollOutputDelay;

    private final Clock clock = Clock.system();

    /** Instant at which the last statistics summary was written (or the task was constructed). */
    private Instant previousOutputInstant;

    /** Number of records accumulated since the last statistics summary was written. */
    private int previousOutputBatchSize;

    /** Set by {@code commit()}; consumed by the next {@code poll()}, which then calls {@code performCommit()}. */
    protected final AtomicBoolean shouldPerformCommit = new AtomicBoolean(false);

    private final ServiceLoader<SignalChannelReader> availableSignalChannels = ServiceLoader.load(SignalChannelReader.class);

    /** Notification channels discovered through {@link ServiceLoader} when the task is constructed. */
    private final List<NotificationChannel> notificationChannels;

    /** {@code true} once a previously stored offset has been reported at INFO level; later findings are logged at DEBUG. */
    private boolean offsetLoadedInPast = false;

    /**
     * Walks the previously stored offsets and checks, partition by partition, that the connector can
     * continue from them with the given schema and snapshotter.
     *
     * <p>For each partition the checks are applied in this order:
     * <ol>
     * <li>No stored offset: fails if the snapshotter asks for a snapshot on schema error; otherwise,
     * for a historized schema, verifies the settings of existing history storage or initializes new
     * storage. The method then returns without looking at further partitions.</li>
     * <li>An initial snapshot was running: fails unless the snapshotter allows a data or schema
     * snapshot; otherwise moves on to the next partition.</li>
     * <li>Historized schema whose history does not exist: initializes storage and returns if the
     * snapshotter asks for a snapshot on schema error, fails otherwise.</li>
     * <li>Log position check enabled, position not available and streaming requested: resets the
     * offset and returns if the snapshotter asks for a snapshot on data error, fails otherwise.</li>
     * </ol>
     *
     * @param config connector configuration, consulted for the log position check switch
     * @param logPositionValidator validator handed to {@link #isLogPositionAvailable}; may be {@code null}
     * @param previousOffsets offsets loaded for this task; an offset may be reset by this method
     * @param schema database schema of the connector
     * @param snapshotter snapshotter that decides how missing state is handled
     * @throws DebeziumException if the stored state cannot be reconciled with the snapshotter's settings
     */
    protected void validateSchemaHistory(CommonConnectorConfig config, LogPositionValidator logPositionValidator, Offsets<P, O> previousOffsets, DatabaseSchema schema, Snapshotter snapshotter) {
        for (Map.Entry<P, O> entry : previousOffsets) {
            final P partition = entry.getKey();
            final O offset = entry.getValue();

            // 1. Nothing stored for this partition yet.
            if (offset == null) {
                if (snapshotter.shouldSnapshotOnSchemaError()) {
                    throw new DebeziumException("Could not find existing redo log information while attempting schema only recovery snapshot");
                }
                LOGGER.info("Connector started with no previous offset for partition '{}'", partition);
                if (schema.isHistorized()) {
                    final HistorizedDatabaseSchema historized = (HistorizedDatabaseSchema) schema;
                    if (historized.getSchemaHistory().storageExists()) {
                        LOGGER.info("Database schema history storage was found. Connector will use the pre-existing storage. Checking settings for the same.");
                        historized.getSchemaHistory().checkStorageSettings();
                    }
                    else {
                        historized.initializeStorage();
                    }
                }
                return;
            }

            // 2. The previous run stopped in the middle of the initial snapshot.
            if (offset.isInitialSnapshotRunning()) {
                final boolean snapshotAllowed = snapshotter.shouldSnapshotData(true, true) || snapshotter.shouldSnapshotSchema(true, true);
                if (!snapshotAllowed) {
                    throw new DebeziumException("The connector previously stopped while taking a snapshot, but now the connector is configured to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.");
                }
                continue;
            }

            // 3. An offset exists but the schema history that should accompany it does not.
            if (schema.isHistorized() && !((HistorizedDatabaseSchema) schema).getSchemaHistory().exists()) {
                LOGGER.warn("Database schema history was not found but was expected");
                if (!snapshotter.shouldSnapshotOnSchemaError()) {
                    throw new DebeziumException("The db history topic is missing. You may attempt to recover it by reconfiguring the connector to recovery.");
                }
                LOGGER.info("The db-history topic is missing but we are in {} snapshot mode. Attempting to snapshot the current schema and then begin reading the redo log from the last recorded offset.", snapshotter.name());
                if (schema.isHistorized()) {
                    ((HistorizedDatabaseSchema) schema).initializeStorage();
                }
                return;
            }

            // 4. Verify the recorded log position; each condition is evaluated only if the previous one did not skip.
            if (!config.isLogPositionCheckEnabled()) {
                continue;
            }
            if (this.isLogPositionAvailable(logPositionValidator, partition, offset, config)) {
                continue;
            }
            if (!snapshotter.shouldStream()) {
                continue;
            }

            LOGGER.warn("Last recorded offset is no longer available on the server.");
            if (!snapshotter.shouldSnapshotOnDataError()) {
                throw new DebeziumException("The connector is trying to read change stream starting at " + offset + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot mode when needed.");
            }
            LOGGER.info("The last recorded offset is no longer available but we are in {} snapshot mode. Attempting to snapshot data to fill the gap.", snapshotter.name());
            previousOffsets.resetOffset(previousOffsets.getTheOnlyPartition());
            return;
        }
    }

    /**
     * Asks the given validator whether the log position stored in {@code offsetContext} can still be read.
     *
     * @param logPositionValidator validator to delegate to; when {@code null} a warning is logged and the
     *        position is reported as available
     * @param partition partition the offset belongs to
     * @param offsetContext offset whose position is checked
     * @param config connector configuration passed through to the validator
     * @return the validator's verdict, or {@code true} when no validator is supplied
     */
    public boolean isLogPositionAvailable(LogPositionValidator logPositionValidator, Partition partition, OffsetContext offsetContext, CommonConnectorConfig config) {
        if (logPositionValidator != null) {
            return logPositionValidator.validate(partition, offsetContext, config);
        }
        LOGGER.warn("Current JDBC connection implementation is not providing a log position validator implementation. The check will always be 'true'");
        return true;
    }

    protected BaseSourceTask() {
        this.pollOutputDelay = ElapsedTimeStrategy.exponential(this.clock, INITIAL_POLL_PERIOD_IN_MILLIS, MAX_POLL_PERIOD_IN_MILLIS);
        this.previousOutputInstant = this.clock.currentTimeAsInstant();
        this.notificationChannels = StreamSupport.stream(ServiceLoader.load(NotificationChannel.class).spliterator(), false).collect(Collectors.toList());
    }

    /**
     * Starts the task: builds and validates the configuration, logs it with sensitive options masked,
     * and delegates to {@link #start(Configuration)} to obtain the coordinator.
     *
     * <p>A {@link RetriableException} raised by the delegate does not fail the task; it is put into the
     * {@code RESTARTING} state so that the start is attempted again during polling.
     *
     * @param props connector properties handed over by the runtime
     * @throws ConnectException if the task context is missing or the configuration is invalid
     */
    public final void start(Map<String, String> props) {
        if (this.context == null) {
            throw new ConnectException("Unexpected null context");
        }

        final String taskName = getClass().getSimpleName();

        this.stateLock.lock();
        try {
            setTaskState(DebeziumTaskState.INITIAL);
            this.config = Configuration.from(props);

            DebeziumOpenLineageEmitter.init(props, connectorName());
            DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.connectorContext(props, connectorName()), DebeziumTaskState.INITIAL);

            this.retriableRestartWait = this.config.getDuration(CommonConnectorConfig.RETRIABLE_RESTART_WAIT, ChronoUnit.MILLIS);
            this.restartDelay = null;

            final boolean valid = this.config.validateAndRecord(getAllConfigurationFields(), LOGGER::error);
            if (!valid) {
                throw new ConnectException("Error configuring an instance of " + taskName + "; check the logs for details");
            }

            if (LOGGER.isInfoEnabled()) {
                final StringBuilder dump = new StringBuilder("Starting ").append(taskName).append(" with configuration:");
                withMaskedSensitiveOptions(this.config)
                        .forEach((key, value) -> dump.append("\n   ").append(key).append(" = ").append(value));
                dump.append("\n");
                LOGGER.info(dump.toString());
            }

            try {
                this.coordinator = start(this.config);
                setTaskState(DebeziumTaskState.RUNNING);
                DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.connectorContext(props, connectorName()), DebeziumTaskState.RUNNING);
            }
            catch (RetriableException retriable) {
                // Not fatal: remember the delay and let poll() retry the start.
                LOGGER.warn("Failed to start connector, will re-attempt during polling.", retriable);
                this.restartDelay = ElapsedTimeStrategy.constant(Clock.system(), this.retriableRestartWait);
                setTaskState(DebeziumTaskState.RESTARTING);
            }
        }
        finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Returns the signal channel readers found through {@link ServiceLoader}. The providers are
     * instantiated on the first call and the resulting list is reused afterwards.
     *
     * @return the list of signal channel readers
     */
    public List<SignalChannelReader> getAvailableSignalChannels() {
        if (this.signalChannels != null) {
            return this.signalChannels;
        }
        this.signalChannels = this.availableSignalChannels.stream()
                .map(ServiceLoader.Provider::get)
                .collect(Collectors.toList());
        return this.signalChannels;
    }

    /**
     * Looks for a signal channel that can also be written to.
     *
     * @return the first channel from {@link #getAvailableSignalChannels()} that is a
     *         {@link SignalChannelWriter}, or an empty optional if there is none
     */
    public Optional<? extends SignalChannelWriter> getAvailableSignalChannelWriter() {
        for (SignalChannelReader channel : getAvailableSignalChannels()) {
            if (channel instanceof SignalChannelWriter) {
                return Optional.of((SignalChannelWriter) channel);
            }
        }
        return Optional.empty();
    }

    /**
     * Produces the view of the configuration that is written to the log at start-up. This
     * implementation delegates to {@code Configuration.withMaskedPasswords()}.
     *
     * @param config the configuration about to be logged
     * @return the configuration as it should appear in the log
     */
    protected Configuration withMaskedSensitiveOptions(Configuration config) {
        final Configuration masked = config.withMaskedPasswords();
        return masked;
    }

    /**
     * Connector-specific start routine. It is invoked by {@code start(Map)} and again by
     * {@code startIfNeededAndPossible()} when a restart is attempted; the returned coordinator is
     * stored in {@code coordinator}. A {@link RetriableException} thrown during the initial start
     * puts the task into the {@code RESTARTING} state instead of failing it.
     *
     * @param var1 the task configuration built from the connector properties
     * @return the coordinator for this task
     */
    protected abstract ChangeEventSourceCoordinator<P, O> start(Configuration var1);

    /**
     * Name of the connector; this class passes it to {@code DebeziumOpenLineageEmitter} when
     * initializing the emitter and when building the connector context for emitted events.
     *
     * @return the connector name
     */
    protected abstract String connectorName();

    /**
     * Fetches the next batch of records.
     *
     * <p>If the task is not running (and could not be restarted just now) an empty list is returned.
     * Otherwise a commit requested through {@link #commit()} is performed first, the batch is obtained
     * from {@link #doPoll()}, statistics are logged and the error handler's retry counter is reset
     * when the batch contains change data.
     *
     * @return the polled records, or an empty list while the task is not running
     * @throws InterruptedException if the polling thread is interrupted
     * @throws RetriableException rethrown after the task has been stopped and scheduled for a restart
     */
    public final List<SourceRecord> poll() throws InterruptedException {
        try {
            if (startIfNeededAndPossible()) {
                if (this.shouldPerformCommit.getAndSet(false)) {
                    performCommit();
                }
                final List<SourceRecord> batch = doPoll();
                logStatistics(batch);
                resetErrorHandlerRetriesIfNeeded(batch);
                return batch;
            }
            return Collections.emptyList();
        }
        catch (RetriableException retriable) {
            // Tear the task down and mark it for restart before handing the error to the caller.
            stop(true);
            throw retriable;
        }
    }

    /**
     * Logs throughput information for a polled batch.
     *
     * <p>Nothing happens when {@code records} is {@code null} or empty, or when INFO logging is
     * disabled. With DEBUG enabled the number of records per topic is logged for every batch. The
     * INFO summary (records sent since the previous summary, elapsed time, and the partition and
     * offset of the batch's last record) is written only when {@code pollOutputDelay} has elapsed;
     * until then batch sizes are accumulated.
     *
     * @param records the batch returned by the current poll; may be {@code null}
     */
    protected void logStatistics(List<SourceRecord> records) {
        if (records == null || !LOGGER.isInfoEnabled()) {
            return;
        }
        final int batchSize = records.size();
        if (batchSize == 0) {
            return;
        }

        if (LOGGER.isDebugEnabled()) {
            // Typed map: with a raw map the Integer::sum remapping function does not type-check.
            final Map<String, Integer> topicCounts = new LinkedHashMap<>();
            for (SourceRecord record : records) {
                topicCounts.merge(record.topic(), 1, Integer::sum);
            }
            for (Map.Entry<String, Integer> topicCount : topicCounts.entrySet()) {
                LOGGER.debug("Sending {} records to topic {}", topicCount.getValue(), topicCount.getKey());
            }
        }

        final SourceRecord lastRecord = records.get(batchSize - 1);
        this.previousOutputBatchSize += batchSize;
        if (this.pollOutputDelay.hasElapsed()) {
            final Instant currentTime = this.clock.currentTime();
            LOGGER.info("{} records sent during previous {}, last recorded offset of {} partition is {}",
                    this.previousOutputBatchSize,
                    Strings.duration(Duration.between(this.previousOutputInstant, currentTime).toMillis()),
                    lastRecord.sourcePartition(),
                    lastRecord.sourceOffset());
            this.previousOutputInstant = currentTime;
            this.previousOutputBatchSize = 0;
        }
    }

    /**
     * Remembers {@code lastOffset} as the most recent offset for {@code partition}, replacing any
     * earlier value, so that {@code performCommit()} can hand it to the coordinator later.
     *
     * @param partition source partition of the committed record
     * @param lastOffset source offset of the committed record
     */
    private void updateLastOffset(Map<String, ?> partition, Map<String, ?> lastOffset) {
        this.stateLock.lock();
        try {
            this.lastOffsets.put(partition, lastOffset);
        }
        finally {
            // Always release, even if put() throws (e.g. from a key's hashCode()).
            this.stateLock.unlock();
        }
    }

    /**
     * Resets the retry counter of the coordinator's error handler once change data is flowing again.
     * The counter is only touched when the batch contains at least one change data message, a
     * coordinator is present and the handler has recorded at least one retry.
     *
     * @param records the batch returned by the current poll; may be {@code null}
     */
    protected void resetErrorHandlerRetriesIfNeeded(List<SourceRecord> records) {
        if (!containsChangeDataMessages(records)) {
            return;
        }
        if (this.coordinator == null) {
            return;
        }
        if (this.coordinator.getErrorHandler().getRetries() > 0) {
            this.coordinator.getErrorHandler().resetRetries();
        }
    }

    /**
     * Tells whether the batch holds at least one record whose value schema is an {@link Envelope} schema.
     *
     * @param records the batch to inspect; may be {@code null}
     * @return {@code true} if such a record exists, {@code false} for a {@code null} or empty batch
     *         or when no record qualifies
     */
    protected boolean containsChangeDataMessages(List<SourceRecord> records) {
        if (records == null || records.isEmpty()) {
            return false;
        }
        return records.stream()
                .anyMatch(candidate -> candidate.valueSchema() != null && Envelope.isEnvelopeSchema(candidate.valueSchema()));
    }

    /**
     * Connector-specific polling. Called by {@code poll()} only after the task has been confirmed
     * (or brought back) to the running state; the returned list is used for statistics logging and
     * then returned to the caller of {@code poll()}. A {@link RetriableException} thrown from here
     * makes {@code poll()} stop the task and schedule a restart.
     *
     * @return the records to hand to the runtime
     * @throws InterruptedException if the polling thread is interrupted
     */
    protected abstract List<SourceRecord> doPoll() throws InterruptedException;

    /**
     * Supplies the connector's error handler, if one exists. While the task is in the
     * {@code RESTARTING} state, this class uses the handler's producer throwable to enrich the
     * emitted OpenLineage event; with an empty optional the event is emitted without a throwable.
     *
     * @return the error handler, or an empty optional
     */
    protected abstract Optional<ErrorHandler> getErrorHandler();

    /**
     * Makes sure the task is running before a poll.
     *
     * <p>A running task is reported as such immediately. A task in the {@code RESTARTING} state
     * emits a {@code RESTARTING} OpenLineage event and is then either restarted (when the restart
     * delay has elapsed) or the calling thread pauses for the configured wait. The state lock is held
     * for the whole call, including that pause.
     *
     * @return {@code true} if the task is running when the method returns, {@code false} otherwise
     * @throws InterruptedException if the thread is interrupted while pausing
     */
    private boolean startIfNeededAndPossible() throws InterruptedException {
        this.stateLock.lock();
        try {
            final DebeziumTaskState current = getTaskState();
            if (current == DebeziumTaskState.RUNNING) {
                return true;
            }
            if (current != DebeziumTaskState.RESTARTING) {
                return false;
            }

            // Report the pending restart, attaching the producer failure when an error handler is available.
            final Optional<ErrorHandler> errorHandler = getErrorHandler();
            final ConnectorContext lineageContext = DebeziumOpenLineageEmitter.connectorContext(this.config.asMap(), connectorName());
            if (errorHandler.isPresent()) {
                DebeziumOpenLineageEmitter.emit(lineageContext, DebeziumTaskState.RESTARTING, errorHandler.get().getProducerThrowable());
            }
            else {
                DebeziumOpenLineageEmitter.emit(lineageContext, DebeziumTaskState.RESTARTING);
            }

            if (!this.restartDelay.hasElapsed()) {
                LOGGER.info("Awaiting end of restart backoff period after a retriable error");
                Metronome.parker(this.retriableRestartWait, Clock.SYSTEM).pause();
                return false;
            }

            LOGGER.info("Attempting to restart task.");
            this.coordinator = start(this.config);
            LOGGER.info("Successfully restarted task");
            this.restartDelay = null;
            setTaskState(DebeziumTaskState.RUNNING);
            return true;
        }
        finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Stops the task for good: commits the offsets recorded so far, stops the coordinator and the
     * connector-specific resources, and releases the OpenLineage emitter state.
     *
     * <p>The method tolerates being called after a {@code start(Map)} that failed before the
     * configuration was assigned; in that case the OpenLineage calls are skipped.
     */
    public final void stop() {
        this.performCommit();
        this.stop(false);
        // config is still null if start(Map) never got as far as building it.
        if (this.config != null) {
            DebeziumOpenLineageEmitter.cleanup(DebeziumOpenLineageEmitter.connectorContext(this.config.asMap(), this.connectorName()));
        }
    }

    /**
     * Stops the coordinator and the connector-specific resources while holding the state lock.
     *
     * @param restart {@code true} to move to the {@code RESTARTING} state and arm the restart delay
     *        (used after a retriable error); {@code false} to move to {@code STOPPED} and emit the
     *        corresponding OpenLineage event
     * @throws ConnectException if the thread is interrupted while the coordinator is being stopped;
     *         the thread's interrupt status is restored and the interruption is attached as cause
     */
    private void stop(boolean restart) {
        this.stateLock.lock();
        try {
            if (restart) {
                LOGGER.warn("Going to restart connector after {} sec. after a retriable exception", this.retriableRestartWait.getSeconds());
            }
            else {
                LOGGER.info("Stopping down connector");
            }

            try {
                if (this.coordinator != null) {
                    this.coordinator.stop();
                    this.coordinator = null;
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status so that callers further up can still observe it.
                Thread.currentThread().interrupt();
                LOGGER.error("Interrupted while stopping coordinator", e);
                throw new ConnectException("Interrupted while stopping coordinator, failing the task", e);
            }

            this.doStop();

            if (restart) {
                this.setTaskState(DebeziumTaskState.RESTARTING);
                if (this.restartDelay == null) {
                    this.restartDelay = ElapsedTimeStrategy.constant(Clock.system(), this.retriableRestartWait);
                }
            }
            else {
                this.setTaskState(DebeziumTaskState.STOPPED);
                // config is still null if start(Map) never got as far as building it.
                if (this.config != null) {
                    DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.connectorContext(this.config.asMap(), this.connectorName()), DebeziumTaskState.STOPPED);
                }
            }
        }
        finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Connector-specific shutdown. Called from {@code stop(boolean)} after the coordinator (if any)
     * has been stopped, both when the task is stopped for good and when it is stopped in order to
     * be restarted after a retriable error.
     */
    protected abstract void doStop();

    /**
     * Records the source offset of a record for a later commit. Records without a source offset
     * are ignored.
     *
     * @param record the record reported by the runtime
     * @param metadata producer metadata for the record; not used by this implementation
     * @throws InterruptedException declared by the signature; not thrown by this implementation
     */
    public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
        LOGGER.trace("Committing record {}", Loggings.maybeRedactSensitiveData(record));
        final Map<String, ?> offset = record.sourceOffset();
        if (offset == null) {
            return;
        }
        updateLastOffset(record.sourcePartition(), offset);
    }

    /**
     * Requests a commit. The commit itself is not performed here; the flag raised by this method
     * is picked up by the next {@link #poll()}, which then calls {@link #performCommit()}.
     *
     * @throws InterruptedException declared by the signature; not thrown by this implementation
     */
    public void commit() throws InterruptedException {
        shouldPerformCommit.set(true);
    }

    /**
     * Hands the offsets recorded by {@code commitRecord} to the coordinator and forgets each one
     * once it has been passed on.
     *
     * <p>The state lock is acquired without waiting. If it is held elsewhere (the task is being
     * started, restarted or stopped) nothing is committed and an INFO message is logged; the
     * recorded offsets stay in place. Nothing is committed either while no coordinator is present.
     */
    public void performCommit() {
        if (!this.stateLock.tryLock()) {
            LOGGER.info("Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart");
            return;
        }
        try {
            if (this.coordinator == null) {
                return;
            }
            final Iterator<Map.Entry<Map<String, ?>, Map<String, ?>>> pending = this.lastOffsets.entrySet().iterator();
            while (pending.hasNext()) {
                final Map.Entry<Map<String, ?>, Map<String, ?>> entry = pending.next();
                final Map<String, ?> partition = entry.getKey();
                final Map<String, ?> lastOffset = entry.getValue();
                // Argument order follows the placeholders: offset first, then partition.
                LOGGER.debug("Committing offset '{}' for partition '{}'", lastOffset, partition);
                this.coordinator.commitOffset(partition, lastOffset);
                // Drop the entry only after the coordinator accepted it.
                pending.remove();
            }
        }
        finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Supplies the configuration fields that {@code start(Map)} validates before the task is
     * started; a validation failure aborts the start with a {@link ConnectException}.
     *
     * @return the fields to validate
     */
    protected abstract Iterable<Field> getAllConfigurationFields();

    /**
     * Loads the offsets stored for the partitions of this task.
     *
     * <p>The first stored offset that is ever found by this task instance is logged at INFO level;
     * offsets found afterwards (including on later calls) are logged at DEBUG. If no partition has
     * a stored offset, that fact is logged at INFO level.
     *
     * @param provider supplies the partitions handled by this task
     * @param loader turns stored offset data into offset contexts
     * @return the offsets for all partitions from {@code provider}, as returned by the offset reader
     */
    protected Offsets<P, O> getPreviousOffsets(Partition.Provider<P> provider, OffsetContext.Loader<O> loader) {
        final Set<P> partitions = provider.getPartitions();
        final OffsetReader reader = new OffsetReader(this.context.offsetStorageReader(), loader);
        final Map offsets = reader.offsets(partitions);

        boolean anyFound = false;
        for (P partition : partitions) {
            final OffsetContext offset = (OffsetContext) offsets.get(partition);
            if (offset != null) {
                anyFound = true;
                if (!this.offsetLoadedInPast) {
                    LOGGER.info("Found previous partition offset {}: {}", partition, offset.getOffset());
                    this.offsetLoadedInPast = true;
                }
                else {
                    LOGGER.debug("Found previous partition offset {}: {}", partition, offset.getOffset());
                }
            }
        }

        if (!anyFound) {
            LOGGER.info("No previous offsets found");
        }
        return Offsets.of(offsets);
    }

    /**
     * Replaces the task state and logs the transition at DEBUG level.
     *
     * @param newState the state to switch to
     */
    private void setTaskState(DebeziumTaskState newState) {
        final DebeziumTaskState previous = state.getAndSet(newState);
        LOGGER.debug("Setting task state to '{}', previous state was '{}'", newState, previous);
    }

    /**
     * Reads the current task state while holding the state lock, so the call waits for a start,
     * restart or stop that is in progress on another thread.
     *
     * @return the current task state
     */
    public DebeziumTaskState getTaskState() {
        stateLock.lock();
        try {
            return state.get();
        }
        finally {
            stateLock.unlock();
        }
    }

    /**
     * Returns the notification channels that were loaded when this task was constructed. The
     * internal list itself is returned, not a copy.
     *
     * @return the notification channels
     */
    public List<NotificationChannel> getNotificationChannels() {
        return notificationChannels;
    }

    /**
     * Registers the service providers shared by all connectors with the given registry: post
     * processor registry, snapshot lock, snapshot query, snapshotter, header producer and custom
     * converter providers, in that order.
     *
     * @param serviceRegistry the registry to register the providers with
     */
    protected void registerServiceProviders(ServiceRegistry serviceRegistry) {
        serviceRegistry.registerServiceProvider(new PostProcessorRegistryServiceProvider());
        serviceRegistry.registerServiceProvider(new SnapshotLockProvider());
        serviceRegistry.registerServiceProvider(new SnapshotQueryProvider());
        serviceRegistry.registerServiceProvider(new SnapshotterServiceProvider());
        serviceRegistry.registerServiceProvider(new DebeziumHeaderProducerProvider());
        serviceRegistry.registerServiceProvider(new CustomConverterServiceProvider());
    }
}

