/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb.sink;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoNamespace;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.BulkWriteOptions;
import io.debezium.DebeziumException;
import io.debezium.connector.mongodb.sink.MongoDbSinkConnectorConfig;
import io.debezium.connector.mongodb.sink.MongoDbSinkConnectorTask;
import io.debezium.connector.mongodb.sink.MongoProcessedSinkRecordData;
import io.debezium.connector.mongodb.sink.MongoSinkRecordProcessor;
import io.debezium.dlq.ErrorReporter;
import io.debezium.metadata.CollectionId;
import io.debezium.sink.DebeziumSinkRecord;
import io.debezium.sink.spi.ChangeEventSink;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonDocument;

final class MongoDbChangeEventSink
implements ChangeEventSink,
AutoCloseable {
    private final MongoDbSinkConnectorConfig sinkConfig;
    private final MongoClient mongoClient;
    private final ErrorReporter errorReporter;

    MongoDbChangeEventSink(MongoDbSinkConnectorConfig sinkConfig, MongoClient mongoClient, ErrorReporter errorReporter) {
        this.sinkConfig = sinkConfig;
        this.mongoClient = mongoClient;
        this.errorReporter = errorReporter;
    }

    @Override
    public void close() {
        MongoClient autoCloseable = this.mongoClient;
        if (autoCloseable != null) {
            autoCloseable.close();
        }
    }

    public Optional<CollectionId> getCollectionId(String collectionName) {
        return Optional.of(new CollectionId(collectionName));
    }

    public void execute(Collection<SinkRecord> records) {
        try {
            this.trackLatestRecordTimestampOffset(records);
            if (records.isEmpty()) {
                MongoDbSinkConnectorTask.LOGGER.debug("No sink records to process for current poll operation");
            } else {
                List<List<MongoProcessedSinkRecordData>> batches = MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(records, this.sinkConfig, this.errorReporter);
                for (List<MongoProcessedSinkRecordData> batch : batches) {
                    this.bulkWriteBatch(batch);
                }
            }
        }
        catch (Exception e) {
            throw new DebeziumException((Throwable)e);
        }
    }

    private void trackLatestRecordTimestampOffset(Collection<SinkRecord> records) {
        OptionalLong latestRecord = records.stream().filter(v -> v.timestamp() != null).mapToLong(ConnectRecord::timestamp).max();
    }

    private void bulkWriteBatch(List<MongoProcessedSinkRecordData> batch) {
        if (batch.isEmpty()) {
            return;
        }
        MongoNamespace namespace = batch.get(0).getNamespace();
        List writeModels = batch.stream().map(MongoProcessedSinkRecordData::getWriteModel).collect(Collectors.toList());
        boolean bulkWriteOrdered = true;
        try {
            MongoDbSinkConnectorTask.LOGGER.debug("Bulk writing {} document(s) into collection [{}] via an {} bulk write", new Object[]{writeModels.size(), namespace.getFullName(), bulkWriteOrdered ? "ordered" : "unordered"});
            BulkWriteResult result = this.mongoClient.getDatabase(namespace.getDatabaseName()).getCollection(namespace.getCollectionName(), BsonDocument.class).bulkWrite(writeModels, new BulkWriteOptions().ordered(bulkWriteOrdered));
            MongoDbSinkConnectorTask.LOGGER.debug("Mongodb bulk write result: {}", (Object)result);
        }
        catch (RuntimeException e) {
            this.handleTolerableWriteException(batch.stream().map(MongoProcessedSinkRecordData::getSinkRecord).collect(Collectors.toList()), bulkWriteOrdered, e, true, true);
        }
    }

    private void handleTolerableWriteException(List<DebeziumSinkRecord> batch, boolean ordered, RuntimeException e, boolean logErrors, boolean tolerateErrors) {
        if (e instanceof MongoBulkWriteException) {
            throw new DataException((Throwable)e);
        }
        if (logErrors) {
            MongoDbChangeEventSink.log(batch, e);
        }
        if (!tolerateErrors) {
            throw new DataException((Throwable)e);
        }
        batch.forEach(record -> this.errorReporter.report(record, (Exception)e));
    }

    private static void log(Collection<DebeziumSinkRecord> records, RuntimeException e) {
        MongoDbSinkConnectorTask.LOGGER.error("Failed to put into the sink the following records: {}", records, (Object)e);
    }
}

