/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer;

import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.oracle.AbstractStreamingAdapter;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogFile;
import io.debezium.connector.oracle.logminer.LogFileCollector;
import io.debezium.connector.oracle.logminer.LogMinerSessionContext;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.SqlUtils;
import io.debezium.document.Document;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.HexConverter;
import io.debezium.util.Strings;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base streaming adapter of the LogMiner package.
 *
 * <p>Besides exposing the history comparator and the streaming metrics, this class resolves
 * the offset at which a snapshot is taken: it reads the current SCN from {@code V$DATABASE}
 * and, depending on the configured transaction snapshot boundary mode, collects the
 * transactions that were already in progress at that SCN from the transaction view and
 * optionally from the transaction logs.
 */
public abstract class AbstractLogMinerStreamingAdapter
        extends AbstractStreamingAdapter<LogMinerStreamingChangeEventSourceMetrics> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLogMinerStreamingAdapter.class);

    /**
     * Creates the adapter.
     *
     * @param connectorConfig the connector configuration, handed to the parent adapter
     */
    public AbstractLogMinerStreamingAdapter(OracleConnectorConfig connectorConfig) {
        super(connectorConfig);
    }

    /**
     * Returns a comparator that orders schema history positions by the SCN resolved from
     * each position document.
     *
     * @return a comparator for which {@code recorded} is at-or-before {@code desired} when
     *         its resolved SCN is less than or equal to the resolved SCN of {@code desired}
     */
    @Override
    public HistoryRecordComparator getHistoryRecordComparator() {
        return new HistoryRecordComparator() {

            @Override
            protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
                // Qualified on purpose so that the adapter's resolveScn is always the one used.
                final Scn recordedScn = AbstractLogMinerStreamingAdapter.this.resolveScn(recorded);
                final Scn desiredScn = AbstractLogMinerStreamingAdapter.this.resolveScn(desired);
                return recordedScn.compareTo(desiredScn) < 1;
            }
        };
    }

    /**
     * Creates the LogMiner streaming metrics instance from the supplied collaborators.
     *
     * @param taskContext the task context
     * @param changeEventQueueMetrics the change event queue metrics
     * @param metadataProvider the event metadata provider
     * @param connectorConfig the connector configuration
     * @return a new {@link LogMinerStreamingChangeEventSourceMetrics}
     */
    @Override
    public LogMinerStreamingChangeEventSourceMetrics getStreamingMetrics(OracleTaskContext taskContext,
                                                                         ChangeEventQueueMetrics changeEventQueueMetrics,
                                                                         EventMetadataProvider metadataProvider,
                                                                         OracleConnectorConfig connectorConfig) {
        return new LogMinerStreamingChangeEventSourceMetrics(taskContext, changeEventQueueMetrics, metadataProvider, connectorConfig);
    }

    /**
     * Determines the offset to use for a snapshot.
     *
     * <p>The current SCN is resolved on the supplied connection (together with the pending
     * transactions of the transaction view unless the boundary mode is {@code SKIP}). The
     * offset itself is then built using a second, separately opened connection that has
     * auto-commit disabled and, when a PDB name is configured, is reset to the CDB.
     *
     * @param ctx the snapshot context
     * @param connectorConfig the connector configuration
     * @param connection the connection used to resolve the current SCN
     * @return the snapshot offset context
     * @throws SQLException if a database operation fails
     * @throws DebeziumException if no current SCN could be resolved
     */
    @Override
    public OracleOffsetContext determineSnapshotOffset(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> ctx,
                                                       OracleConnectorConfig connectorConfig,
                                                       OracleConnection connection)
            throws SQLException {
        final Scn latestTableDdlScn = this.getLatestTableDdlScn(ctx, connection).orElse(null);
        final String tableName = getTransactionTableName(connectorConfig);
        // Insertion-ordered so the transactions are later logged in the order they were found.
        final Map<String, Scn> pendingTransactions = new LinkedHashMap<>();

        final Optional<Scn> resolvedScn = this.isPendingTransactionSkip(connectorConfig)
                ? this.getCurrentScn(latestTableDdlScn, connection)
                : this.getPendingTransactions(latestTableDdlScn, connection, pendingTransactions, tableName);
        final Scn currentScn = resolvedScn.orElseThrow(() -> new DebeziumException("Failed to resolve current SCN"));

        try (OracleConnection conn = new OracleConnection(connection.config(), false)) {
            conn.setAutoCommit(false);
            if (!Strings.isNullOrEmpty(connectorConfig.getPdbName())) {
                conn.resetSessionToCdb();
            }
            return this.determineSnapshotOffset(connectorConfig, conn, currentScn, pendingTransactions, tableName);
        }
    }

    /**
     * Returns the SCN stored in the given offset context.
     *
     * @param offsetContext the offset context
     * @return the value of {@code offsetContext.getScn()}
     */
    @Override
    public Scn getOffsetScn(OracleOffsetContext offsetContext) {
        return offsetContext.getScn();
    }

    /**
     * Reads {@code CURRENT_SCN} from {@code V$DATABASE}, re-reading it for as long as the
     * inherited {@code areSameTimestamp} check reports {@code true} for the latest table DDL
     * SCN and the value just read.
     *
     * @param latestTableDdlScn the latest table DDL SCN, may be {@code null}
     * @param connection the connection to query
     * @return the SCN that was read; {@link Scn#NULL} is used when the query yields no row
     * @throws SQLException if the query fails
     */
    private Optional<Scn> getCurrentScn(Scn latestTableDdlScn, OracleConnection connection) throws SQLException {
        final String query = "SELECT CURRENT_SCN FROM V$DATABASE";
        Scn currentScn;
        do {
            currentScn = connection.queryAndMap(query, rs -> rs.next() ? Scn.valueOf(rs.getString(1)) : Scn.NULL);
        } while (this.areSameTimestamp(latestTableDdlScn, currentScn, connection));
        return Optional.ofNullable(currentScn);
    }

    /**
     * Reads the current SCN and, in the same query, the transactions of the transaction view
     * whose start SCN is lower than that SCN.
     *
     * <p>The query is repeated, discarding everything collected so far, for as long as the
     * inherited {@code areSameTimestamp} check reports {@code true} for the latest table DDL
     * SCN and the SCN just read.
     *
     * @param latestTableDdlScn the latest table DDL SCN, may be {@code null}
     * @param connection the connection to query
     * @param transactions receives the transaction id to start SCN mappings; cleared before each attempt
     * @param transactionTableName the name of the transaction view to join against
     * @return the current SCN, or an empty optional when the query returned no row
     * @throws SQLException if the query fails; the failure is logged before being rethrown
     */
    private Optional<Scn> getPendingTransactions(Scn latestTableDdlScn, OracleConnection connection,
                                                 Map<String, Scn> transactions, String transactionTableName)
            throws SQLException {
        final String query = "SELECT d.CURRENT_SCN, t.XID, t.START_SCN FROM V$DATABASE d LEFT OUTER JOIN " + transactionTableName + " t ON t.START_SCN < d.CURRENT_SCN ";
        Scn currentScn = null;
        do {
            currentScn = null;
            transactions.clear();
            try (Statement s = connection.connection().createStatement();
                    ResultSet rs = s.executeQuery(query)) {
                while (rs.next()) {
                    // Only the first row's CURRENT_SCN is used.
                    if (currentScn == null) {
                        currentScn = Scn.valueOf(rs.getString(1));
                    }
                    // The outer join yields a row without transaction columns when nothing is pending.
                    final String pendingTxStartScn = rs.getString(3);
                    if (Strings.isNullOrEmpty(pendingTxStartScn)) {
                        continue;
                    }
                    final String transactionId = HexConverter.convertToHexString(rs.getBytes(2));
                    final Scn transactionStartScn = Scn.valueOf(pendingTxStartScn);
                    if (transactionStartScn.compareTo(Scn.ONE) > 0) {
                        transactions.put(transactionId, transactionStartScn);
                    }
                    else {
                        LOGGER.warn("Unable to determine the start SCN, transaction {} will not be included", transactionId);
                    }
                }
            }
            catch (SQLException e) {
                LOGGER.warn("Could not query the {} view: {}", transactionTableName, e.getMessage(), e);
                throw e;
            }
        } while (this.areSameTimestamp(latestTableDdlScn, currentScn, connection));

        for (Map.Entry<String, Scn> transaction : transactions.entrySet()) {
            LOGGER.trace("\tPending Transaction '{}' started at SCN {}", transaction.getKey(), transaction.getValue());
        }
        return Optional.ofNullable(currentScn);
    }

    /**
     * Builds the snapshot offset for the given SCN, consulting the transaction logs for
     * further pending transactions unless the boundary mode is {@code SKIP} or
     * {@code TRANSACTION_VIEW_ONLY}.
     *
     * @param connectorConfig the connector configuration
     * @param connection the connection used when the transaction logs are consulted
     * @param currentScn the SCN used both as offset SCN and as snapshot SCN
     * @param pendingTransactions the pending transactions found so far; may be extended by this method
     * @param transactionTableName the name of the transaction view, used for logging only
     * @return the snapshot offset context
     * @throws SQLException if a database operation fails
     */
    private OracleOffsetContext determineSnapshotOffset(OracleConnectorConfig connectorConfig, OracleConnection connection,
                                                        Scn currentScn, Map<String, Scn> pendingTransactions,
                                                        String transactionTableName)
            throws SQLException {
        final boolean skipPending = this.isPendingTransactionSkip(connectorConfig);
        if (skipPending) {
            LOGGER.info("\tNo in-progress transactions will be captured.");
        }
        else if (this.isPendingTransactionViewOnly(connectorConfig)) {
            LOGGER.info("\tSkipping transaction logs for resolving snapshot offset, only using {}.", transactionTableName);
        }
        else {
            LOGGER.info("\tConsulting {} and transaction logs for resolving snapshot offset.", transactionTableName);
            this.getPendingTransactionsFromLogs(connection, currentScn, pendingTransactions);
        }

        if (!pendingTransactions.isEmpty()) {
            for (Map.Entry<String, Scn> entry : pendingTransactions.entrySet()) {
                LOGGER.info("\tFound in-progress transaction {}, starting at SCN {}", entry.getKey(), entry.getValue());
            }
        }
        else if (!skipPending) {
            LOGGER.info("\tFound no in-progress transactions.");
        }

        return OracleOffsetContext.create()
                .logicalName(connectorConfig)
                .scn(currentScn)
                .snapshotScn(currentScn)
                .snapshotPendingTransactions(pendingTransactions)
                .transactionContext(new TransactionContext())
                .incrementalSnapshotContext(new SignalBasedIncrementalSnapshotContext<>())
                .build();
    }

    /**
     * Runs the {@link SqlUtils#oldestFirstChangeQuery} statement built from the configured
     * archive log retention and archive destination name.
     *
     * @param config the connector configuration
     * @param connection the connection to query
     * @return the SCN in the first column of the first row, or {@link Scn#NULL} when there
     *         is no row or the value is null or empty
     * @throws SQLException if the query fails
     */
    protected Scn getOldestScnAvailableInLogs(OracleConnectorConfig config, OracleConnection connection) throws SQLException {
        final Duration archiveLogRetention = config.getArchiveLogRetention();
        final String archiveLogDestinationName = config.getArchiveDestinationNameResolver().getDestinationName(connection);
        final String query = SqlUtils.oldestFirstChangeQuery(archiveLogRetention, archiveLogDestinationName);
        return connection.queryAndMap(query, rs -> {
            if (rs.next()) {
                final String value = rs.getString(1);
                if (!Strings.isNullOrEmpty(value)) {
                    return Scn.valueOf(value);
                }
            }
            return Scn.NULL;
        });
    }

    /**
     * Collects the logs reported by a {@link LogFileCollector} for the given SCN, sorted by
     * ascending sequence.
     *
     * @param config the connector configuration
     * @param sinceScn the SCN passed to the collector
     * @param connection the connection used by the collector
     * @return the collected logs ordered by sequence
     * @throws SQLException if collecting the logs fails
     */
    protected List<LogFile> getOrderedLogsFromScn(OracleConnectorConfig config, Scn sinceScn, OracleConnection connection) throws SQLException {
        final LogFileCollector collector = new LogFileCollector(config, connection);
        return collector.getLogs(sinceScn).stream()
                .sorted(Comparator.comparing(LogFile::getSequence))
                .collect(Collectors.toList());
    }

    /**
     * Searches the transaction logs for transactions that started at or before
     * {@code currentScn} and have a row with {@code OPERATION_CODE=7} at or after it, and
     * adds those not yet known to {@code pendingTransactions}.
     *
     * <p>Nothing is done when no logs are available. Otherwise a LogMiner session is started
     * over the logs chosen by {@link #getMostRecentLogFilesForSearch(List)}.
     *
     * @param connection the connection used for the LogMiner session and the query
     * @param currentScn the SCN the search is anchored at
     * @param pendingTransactions receives newly discovered transaction id to start SCN
     *        mappings; existing entries are left untouched
     * @throws SQLException if determining the oldest SCN or the log list fails
     * @throws DebeziumException if anything fails while the LogMiner session is in use
     */
    protected void getPendingTransactionsFromLogs(OracleConnection connection, Scn currentScn, Map<String, Scn> pendingTransactions) throws SQLException {
        final Scn oldestScn = this.getOldestScnAvailableInLogs(this.connectorConfig, connection);
        final List<LogFile> logFiles = this.getOrderedLogsFromScn(this.connectorConfig, oldestScn, connection);
        if (logFiles.isEmpty()) {
            return;
        }

        try (LogMinerSessionContext context = new LogMinerSessionContext(connection, false,
                OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG, this.connectorConfig.getLogMiningPathToDictionary())) {
            context.addLogFiles(this.getMostRecentLogFilesForSearch(logFiles));
            context.startSession(Scn.NULL, Scn.NULL, false);
            LOGGER.info("\tQuerying transaction logs, please wait...");

            final String query = "SELECT START_SCN, XID FROM V$LOGMNR_CONTENTS WHERE OPERATION_CODE=7 AND SCN >= " + currentScn + " AND START_SCN <= " + currentScn;
            connection.query(query, rs -> {
                while (rs.next()) {
                    final String transactionId = HexConverter.convertToHexString(rs.getBytes("XID"));
                    final String startScnStr = rs.getString("START_SCN");
                    if (Strings.isNullOrBlank(startScnStr)) {
                        continue;
                    }
                    final Scn startScn = Scn.valueOf(startScnStr);
                    // Entries already present (e.g. from the transaction view) take precedence.
                    if (pendingTransactions.containsKey(transactionId)) {
                        continue;
                    }
                    LOGGER.info("\tTransaction '{}' started at SCN '{}'", transactionId, startScn);
                    pendingTransactions.put(transactionId, startScn);
                }
            });
        }
        catch (Exception e) {
            throw new DebeziumException("Failed to resolve snapshot offset", e);
        }
    }

    /**
     * Selects, per thread, the logs to search for pending transactions.
     *
     * <p>For each thread, the first log in {@code allLogFiles} flagged as current is taken,
     * together with the log of the same thread that has the highest sequence lower than the
     * current log's sequence, if there is one. Threads without a current log contribute nothing.
     *
     * @param allLogFiles all candidate logs
     * @return the selected logs, grouped by thread
     */
    protected List<LogFile> getMostRecentLogFilesForSearch(List<LogFile> allLogFiles) {
        final Map<Integer, List<LogFile>> recentLogsPerThread = new HashMap<>();
        for (LogFile logFile : allLogFiles) {
            if (recentLogsPerThread.containsKey(logFile.getThread()) || !logFile.isCurrent()) {
                continue;
            }
            final List<LogFile> threadLogs = new ArrayList<>();
            threadLogs.add(logFile);
            allLogFiles.stream()
                    .filter(f -> logFile.getThread() == f.getThread() && logFile.getSequence().compareTo(f.getSequence()) > 0)
                    .max(Comparator.comparing(LogFile::getSequence))
                    .ifPresent(threadLogs::add);
            recentLogsPerThread.put(logFile.getThread(), threadLogs);
        }

        final List<LogFile> logs = new ArrayList<>();
        for (List<LogFile> threadLogs : recentLogsPerThread.values()) {
            logs.addAll(threadLogs);
        }
        return logs;
    }

    /**
     * Tells whether the configured transaction snapshot boundary mode is {@code SKIP}.
     *
     * @param config the connector configuration
     * @return {@code true} if the mode is {@code SKIP}
     */
    private boolean isPendingTransactionSkip(OracleConnectorConfig config) {
        return config.getLogMiningTransactionSnapshotBoundaryMode() == OracleConnectorConfig.TransactionSnapshotBoundaryMode.SKIP;
    }

    /**
     * Tells whether the configured transaction snapshot boundary mode is
     * {@code TRANSACTION_VIEW_ONLY}.
     *
     * @param config the connector configuration
     * @return {@code true} if the mode is {@code TRANSACTION_VIEW_ONLY}
     */
    public boolean isPendingTransactionViewOnly(OracleConnectorConfig config) {
        return config.getLogMiningTransactionSnapshotBoundaryMode() == OracleConnectorConfig.TransactionSnapshotBoundaryMode.TRANSACTION_VIEW_ONLY;
    }

    /**
     * Picks the transaction view to query.
     *
     * @param config the connector configuration
     * @return {@code GV$TRANSACTION} when RAC nodes are configured, {@code V$TRANSACTION} otherwise
     */
    private static String getTransactionTableName(OracleConnectorConfig config) {
        if (config.getRacNodes() == null || config.getRacNodes().isEmpty()) {
            return "V$TRANSACTION";
        }
        return "GV$TRANSACTION";
    }
}

