/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.test.infra.aws2.clients;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.camel.test.infra.common.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamResponse;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;

public final class KinesisUtils {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisUtils.class);

    private KinesisUtils() {
    }

    private static void doCreateStream(KinesisClient kinesisClient, String streamName) {
        CreateStreamRequest request = (CreateStreamRequest)CreateStreamRequest.builder().streamName(streamName).shardCount(Integer.valueOf(1)).build();
        try {
            CreateStreamResponse response = kinesisClient.createStream(request);
            if (response.sdkHttpResponse().isSuccessful()) {
                LOG.info("Stream created successfully");
            } else {
                Assertions.fail((String)"Failed to create the stream");
            }
        }
        catch (KinesisException e) {
            LOG.error("Unable to create stream: {}", (Object)e.getMessage(), (Object)e);
            Assertions.fail((String)"Unable to create stream");
        }
    }

    public static void createStream(KinesisClient kinesisClient, String streamName) {
        try {
            LOG.info("Checking whether the stream exists already");
            int status = KinesisUtils.getStreamStatus(kinesisClient, streamName);
            LOG.info("Kinesis stream check result: {}", (Object)status);
        }
        catch (KinesisException e) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("The stream does not exist, auto creating it: {}", (Object)e.getMessage(), (Object)e);
            } else {
                LOG.info("The stream does not exist, auto creating it: {}", (Object)e.getMessage());
            }
            KinesisUtils.doCreateStream(kinesisClient, streamName);
            TestUtils.waitFor(() -> {
                try {
                    GetRecordsRequest getRecordsRequest = KinesisUtils.getGetRecordsRequest(kinesisClient, streamName);
                    GetRecordsResponse response = kinesisClient.getRecords(getRecordsRequest);
                    List recordList = response.records();
                    LOG.debug("Checking for stream creation by reading {} records: SUCCESS!", (Object)recordList.size());
                    return true;
                }
                catch (Exception exc) {
                    LOG.debug("Checking for stream creation by reading records: FAILURE, retrying..");
                    return false;
                }
            });
        }
        catch (SdkClientException e) {
            LOG.info("SDK Error when getting the stream: {}", (Object)e.getMessage());
        }
    }

    private static int getStreamStatus(KinesisClient kinesisClient, String streamName) {
        DescribeStreamRequest request = (DescribeStreamRequest)DescribeStreamRequest.builder().streamName(streamName).build();
        DescribeStreamResponse response = kinesisClient.describeStream(request);
        return response.sdkHttpResponse().statusCode();
    }

    public static void doDeleteStream(KinesisClient kinesisClient, String streamName) {
        DeleteStreamRequest request = (DeleteStreamRequest)DeleteStreamRequest.builder().streamName(streamName).build();
        DeleteStreamResponse response = kinesisClient.deleteStream(request);
        if (response.sdkHttpResponse().isSuccessful()) {
            LOG.info("Stream deleted successfully");
        } else {
            Assertions.fail((String)"Failed to delete the stream");
        }
    }

    public static void deleteStream(KinesisClient kinesisClient, String streamName) {
        try {
            LOG.info("Checking whether the stream exists already");
            DescribeStreamRequest request = (DescribeStreamRequest)DescribeStreamRequest.builder().streamName(streamName).build();
            DescribeStreamResponse response = kinesisClient.describeStream(request);
            if (response.sdkHttpResponse().isSuccessful()) {
                LOG.info("Kinesis stream check result");
                KinesisUtils.doDeleteStream(kinesisClient, streamName);
            }
        }
        catch (ResourceNotFoundException e) {
            LOG.info("The stream does not exist, skipping deletion");
        }
        catch (ResourceInUseException e) {
            LOG.info("The stream exist but cannot be deleted because it's in use");
            KinesisUtils.doDeleteStream(kinesisClient, streamName);
        }
    }

    public static List<PutRecordsResponse> putRecords(KinesisClient kinesisClient, String streamName, int count) {
        return KinesisUtils.putRecords(kinesisClient, streamName, count, null);
    }

    public static List<PutRecordsResponse> putRecords(KinesisClient kinesisClient, String streamName, int count, Consumer<PutRecordsRequest.Builder> customizer) {
        ArrayList<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<PutRecordsRequestEntry>();
        LOG.debug("Adding data to the Kinesis stream");
        for (int i = 0; i < count; ++i) {
            String partition = String.format("partitionKey-%d", i);
            PutRecordsRequestEntry putRecordsRequestEntry = (PutRecordsRequestEntry)PutRecordsRequestEntry.builder().data(SdkBytes.fromByteArray((byte[])String.valueOf(i).getBytes())).partitionKey(partition).build();
            LOG.debug("Added data {} (as bytes) to partition {}", (Object)i, (Object)partition);
            putRecordsRequestEntryList.add(putRecordsRequestEntry);
        }
        LOG.debug("Done creating the data records");
        PutRecordsRequest.Builder requestBuilder = PutRecordsRequest.builder();
        requestBuilder.streamName(streamName).records(putRecordsRequestEntryList);
        if (customizer != null) {
            customizer.accept(requestBuilder);
        }
        PutRecordsRequest putRecordsRequest = (PutRecordsRequest)requestBuilder.build();
        ArrayList<PutRecordsResponse> replies = new ArrayList<PutRecordsResponse>(count);
        int retries = 5;
        while (true) {
            try {
                replies.add(kinesisClient.putRecords(putRecordsRequest));
            }
            catch (AwsServiceException e) {
                LOG.trace("Failed to put the records: {}. Retrying in 2 seconds ...", (Object)e.getMessage());
                if (--retries == 0) {
                    LOG.error("Failed to put the records: {}", (Object)e.getMessage(), (Object)e);
                    throw e;
                }
                try {
                    Thread.sleep(TimeUnit.SECONDS.toMillis(2L));
                    continue;
                }
                catch (InterruptedException ex) {
                    break;
                }
                if (retries > 0) continue;
            }
            break;
        }
        return replies;
    }

    private static boolean hasShards(KinesisClient kinesisClient, DescribeStreamRequest describeStreamRequest) {
        DescribeStreamResponse streamRes = kinesisClient.describeStream(describeStreamRequest);
        return !streamRes.streamDescription().shards().isEmpty();
    }

    private static List<Shard> getAllShards(KinesisClient kinesisClient, DescribeStreamRequest describeStreamRequest) {
        DescribeStreamResponse streamRes;
        ArrayList<Shard> shards = new ArrayList<Shard>();
        do {
            streamRes = kinesisClient.describeStream(describeStreamRequest);
            shards.addAll(streamRes.streamDescription().shards());
        } while (streamRes.streamDescription().hasMoreShards().booleanValue());
        return shards;
    }

    public static GetRecordsRequest getGetRecordsRequest(KinesisClient kinesisClient, String streamName) {
        DescribeStreamRequest describeStreamRequest = (DescribeStreamRequest)DescribeStreamRequest.builder().streamName(streamName).build();
        TestUtils.waitFor(() -> KinesisUtils.hasShards(kinesisClient, describeStreamRequest));
        List<Shard> shards = KinesisUtils.getAllShards(kinesisClient, describeStreamRequest);
        GetShardIteratorRequest iteratorRequest = (GetShardIteratorRequest)GetShardIteratorRequest.builder().streamName(streamName).shardId(shards.get(0).shardId()).shardIteratorType("TRIM_HORIZON").build();
        GetShardIteratorResponse iteratorResponse = kinesisClient.getShardIterator(iteratorRequest);
        return (GetRecordsRequest)GetRecordsRequest.builder().shardIterator(iteratorResponse.shardIterator()).build();
    }
}

