/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.aws2.ddbstream;

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.aws2.ddbstream.Ddb2StreamConsumerHealthCheck;
import org.apache.camel.component.aws2.ddbstream.Ddb2StreamEndpoint;
import org.apache.camel.component.aws2.ddbstream.ShardIteratorHandler;
import org.apache.camel.health.HealthCheck;
import org.apache.camel.health.HealthCheckHelper;
import org.apache.camel.health.WritableHealthCheckRepository;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class Ddb2StreamConsumer
extends ScheduledBatchPollingConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(Ddb2StreamConsumer.class);
    private final ShardIteratorHandler shardIteratorHandler;
    private final Map<String, String> lastSeenSequenceNumbers = new HashMap<String, String>();
    private WritableHealthCheckRepository healthCheckRepository;
    private Ddb2StreamConsumerHealthCheck consumerHealthCheck;

    public Ddb2StreamConsumer(Ddb2StreamEndpoint endpoint, Processor processor) {
        this(endpoint, processor, new ShardIteratorHandler(endpoint));
    }

    Ddb2StreamConsumer(Ddb2StreamEndpoint endpoint, Processor processor, ShardIteratorHandler shardIteratorHandler) {
        super((Endpoint)endpoint, processor);
        this.shardIteratorHandler = shardIteratorHandler;
    }

    protected int poll() throws Exception {
        int processedExchangeCount = 0;
        Map<String, String> shardIterators = this.shardIteratorHandler.getShardIterators();
        for (Map.Entry<String, String> shardIteratorEntry : shardIterators.entrySet()) {
            GetRecordsResponse result;
            int limitPerRecordsRequest = Math.max(1, this.getEndpoint().getConfiguration().getMaxResultsPerRequest() / shardIterators.size());
            String shardId = shardIteratorEntry.getKey();
            String shardIterator = shardIteratorEntry.getValue();
            try {
                GetRecordsRequest req = (GetRecordsRequest)GetRecordsRequest.builder().shardIterator(shardIterator).limit(Integer.valueOf(limitPerRecordsRequest)).build();
                result = this.getEndpoint().getClient().getRecords(req);
            }
            catch (ExpiredIteratorException e) {
                String lastSeenSequenceNumber = this.lastSeenSequenceNumbers.get(shardId);
                LOG.warn("Expired Shard Iterator, attempting to resume from {}", (Object)lastSeenSequenceNumber, (Object)e);
                GetRecordsRequest req = (GetRecordsRequest)GetRecordsRequest.builder().shardIterator(this.shardIteratorHandler.requestFreshShardIterator(shardId, lastSeenSequenceNumber)).limit(Integer.valueOf(limitPerRecordsRequest)).build();
                result = this.getEndpoint().getClient().getRecords(req);
            }
            List records = result.records();
            ArrayDeque<Exchange> exchanges = new ArrayDeque<Exchange>();
            for (Record record : records) {
                exchanges.add(this.createExchange(record));
            }
            processedExchangeCount += this.processBatch(CastUtils.cast(exchanges));
            this.shardIteratorHandler.updateShardIterator(shardId, result.nextShardIterator());
            if (records.isEmpty()) continue;
            this.lastSeenSequenceNumbers.put(shardId, ((Record)records.get(records.size() - 1)).dynamodb().sequenceNumber());
        }
        return processedExchangeCount;
    }

    public int processBatch(Queue<Object> exchanges) throws Exception {
        int processedExchanges = 0;
        while (!exchanges.isEmpty()) {
            Exchange exchange = (Exchange)ObjectHelper.cast(Exchange.class, (Object)exchanges.poll());
            AsyncCallback cb = this.defaultConsumerCallback(exchange, true);
            this.getAsyncProcessor().process(exchange, cb);
            ++processedExchanges;
        }
        return processedExchanges;
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.healthCheckRepository = (WritableHealthCheckRepository)HealthCheckHelper.getHealthCheckRepository((CamelContext)this.getEndpoint().getCamelContext(), (String)"components", WritableHealthCheckRepository.class);
        if (this.healthCheckRepository != null) {
            this.consumerHealthCheck = new Ddb2StreamConsumerHealthCheck(this, this.getRouteId());
            this.healthCheckRepository.addHealthCheck((HealthCheck)this.consumerHealthCheck);
        }
    }

    protected Exchange createExchange(Record record) {
        Exchange ex = this.createExchange(true);
        ex.getIn().setBody((Object)record, Record.class);
        return ex;
    }

    public Ddb2StreamEndpoint getEndpoint() {
        return (Ddb2StreamEndpoint)super.getEndpoint();
    }
}

