/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.aws2.kinesis;

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint;
import org.apache.camel.component.aws2.kinesis.KinesisConnection;
import org.apache.camel.support.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;

public class Kinesis2Producer
extends DefaultProducer {
    private static final int MAX_BATCH_SIZE = 500;
    private KinesisConnection connection;

    public Kinesis2Producer(Kinesis2Endpoint endpoint) {
        super((Endpoint)endpoint);
    }

    public KinesisConnection getConnection() {
        return this.connection;
    }

    public void setConnection(KinesisConnection connection) {
        this.connection = connection;
    }

    public Kinesis2Endpoint getEndpoint() {
        return (Kinesis2Endpoint)super.getEndpoint();
    }

    protected void doStart() throws Exception {
        super.doStart();
        ObjectHelper.notNull((Object)this.connection, (String)"connection", (Object)((Object)this));
    }

    public void process(Exchange exchange) throws Exception {
        Object body = exchange.getIn().getBody();
        if (body instanceof Iterable) {
            this.sendBatchRecords(exchange);
        } else {
            this.sendSingleRecord(exchange);
        }
    }

    private void sendBatchRecords(Exchange exchange) {
        Object partitionKey = exchange.getIn().getHeader("CamelAwsKinesisPartitionKey");
        this.ensurePartitionKeyNotNull(partitionKey);
        List<List<PutRecordsRequestEntry>> requestBatchList = this.createRequestBatchList(exchange, partitionKey);
        for (List<PutRecordsRequestEntry> requestBatch : requestBatchList) {
            PutRecordsRequest putRecordsRequest = (PutRecordsRequest)PutRecordsRequest.builder().streamName(this.getEndpoint().getConfiguration().getStreamName()).records(requestBatch).build();
            PutRecordsResponse putRecordsResponse = this.connection.getClient(this.getEndpoint()).putRecords(putRecordsRequest);
            if (putRecordsResponse.failedRecordCount() <= 0) continue;
            throw new RuntimeException("Failed to send records " + putRecordsResponse.failedRecordCount() + " of " + putRecordsResponse.records().size());
        }
    }

    private List<List<PutRecordsRequestEntry>> createRequestBatchList(Exchange exchange, Object partitionKey) {
        ArrayList<List<PutRecordsRequestEntry>> requestBatchList = new ArrayList<List<PutRecordsRequestEntry>>();
        ArrayList<PutRecordsRequestEntry> requestBatch = new ArrayList<PutRecordsRequestEntry>(500);
        for (Object record : (Iterable)exchange.getIn().getBody(Iterable.class)) {
            SdkBytes sdkBytes;
            if (record instanceof byte[]) {
                byte[] bytes = (byte[])record;
                sdkBytes = SdkBytes.fromByteArray((byte[])bytes);
            } else if (record instanceof ByteBuffer) {
                ByteBuffer bf = (ByteBuffer)record;
                sdkBytes = SdkBytes.fromByteBuffer((ByteBuffer)bf);
            } else if (record instanceof InputStream) {
                InputStream is = (InputStream)record;
                sdkBytes = SdkBytes.fromInputStream((InputStream)is);
            } else if (record instanceof String) {
                String str = (String)record;
                sdkBytes = SdkBytes.fromUtf8String((String)str);
            } else {
                throw new IllegalArgumentException("Record type not supported. Must be byte[], ByteBuffer, InputStream or UTF-8 String");
            }
            PutRecordsRequestEntry putRecordsRequestEntry = (PutRecordsRequestEntry)PutRecordsRequestEntry.builder().data(sdkBytes).partitionKey(partitionKey.toString()).build();
            requestBatch.add(putRecordsRequestEntry);
            if (requestBatch.size() != 500) continue;
            requestBatchList.add(requestBatch);
            requestBatch = new ArrayList(500);
        }
        if (!requestBatch.isEmpty()) {
            requestBatchList.add(requestBatch);
        }
        return requestBatchList;
    }

    private void sendSingleRecord(Exchange exchange) {
        PutRecordRequest request = this.createRequest(exchange);
        PutRecordResponse putRecordResult = this.connection.getClient(this.getEndpoint()).putRecord(request);
        Message message = exchange.getMessage();
        message.setHeader("CamelAwsKinesisSequenceNumber", (Object)putRecordResult.sequenceNumber());
        message.setHeader("CamelAwsKinesisShardId", (Object)putRecordResult.shardId());
    }

    private PutRecordRequest createRequest(Exchange exchange) {
        byte[] body = (byte[])exchange.getIn().getBody(byte[].class);
        Object partitionKey = exchange.getIn().getHeader("CamelAwsKinesisPartitionKey");
        Object sequenceNumber = exchange.getIn().getHeader("CamelAwsKinesisSequenceNumber");
        PutRecordRequest.Builder putRecordRequest = PutRecordRequest.builder();
        putRecordRequest.data(SdkBytes.fromByteArray((byte[])body));
        putRecordRequest.streamName(this.getEndpoint().getConfiguration().getStreamName());
        this.ensurePartitionKeyNotNull(partitionKey);
        putRecordRequest.partitionKey(partitionKey.toString());
        if (sequenceNumber != null) {
            putRecordRequest.sequenceNumberForOrdering(sequenceNumber.toString());
        }
        return (PutRecordRequest)putRecordRequest.build();
    }

    private void ensurePartitionKeyNotNull(Object partitionKey) {
        if (partitionKey == null) {
            throw new IllegalArgumentException("Partition key must be specified");
        }
    }
}

