/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.testframe.testsuites;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.commons.math3.util.Precision;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.testframe.environment.ClusterControllable;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
import org.apache.flink.connector.testframe.utils.MetricQuerier;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.opentest4j.TestAbortedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={ConnectorTestingExtension.class, TestLoggerExtension.class, TestCaseInvocationContextProvider.class})
@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
@Experimental
public abstract class SourceTestSuiteBase<T> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceTestSuiteBase.class);

    /**
     * Checks a bounded source that reads from a single split.
     *
     * <p>Records for split 0 are generated and written through the external context, a job with
     * source parallelism 1 is submitted, the collected output is compared against the written
     * records under the given semantic, and the job is then awaited until it reports
     * {@link JobStatus#FINISHED}.
     *
     * @param testEnv environment that creates the execution environment
     * @param externalContext context providing the source, the test data and the connector jars
     * @param semantic checkpointing mode used for the source settings and the result comparison
     * @throws Exception if job submission, result validation or the status wait fails
     */
    @TestTemplate
    @DisplayName(value="Test source with single split")
    public void testSourceSingleSplit(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception {
        // Settings for the source under test and for the environment it runs in.
        final TestingSourceSettings sourceSettings = TestingSourceSettings.builder()
                .setBoundedness(Boundedness.BOUNDED)
                .setCheckpointingMode(semantic)
                .build();
        final TestEnvironmentSettings envSettings = TestEnvironmentSettings.builder()
                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
                .build();
        final Source<T, ?, ?> source = this.tryCreateSource(externalContext, sourceSettings);

        // Only split 0 receives data in this scenario.
        final List<T> writtenRecords = this.generateAndWriteTestData(0, externalContext, sourceSettings);

        // Build the pipeline: tested source -> collect sink.
        final StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(envSettings);
        final DataStreamSource<T> stream = execEnv
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Tested Source")
                .setParallelism(1);
        final CollectIteratorBuilder<T> iteratorBuilder = this.addCollectSink(stream);
        final JobClient jobClient = this.submitJob(execEnv, "Source Single Split Test");

        try (CollectResultIterator<T> results = iteratorBuilder.build(jobClient)) {
            LOG.info("Checking test results");
            this.checkResultWithSemantic(results, Collections.singletonList(writtenRecords), semantic, null);
        }

        // A bounded job is expected to finish after all records were read.
        CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED));
    }

    /**
     * Checks a bounded source that reads from several splits in parallel.
     *
     * <p>Four splits are populated with generated records, the source runs with a parallelism
     * equal to the number of splits, and the collected output is compared against the records of
     * all splits under the given semantic.
     *
     * @param testEnv environment that creates the execution environment
     * @param externalContext context providing the source, the test data and the connector jars
     * @param semantic checkpointing mode used for the source settings and the result comparison
     * @throws Exception if job submission or result validation fails
     */
    @TestTemplate
    @DisplayName(value="Test source with multiple splits")
    public void testMultipleSplits(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception {
        final TestingSourceSettings sourceSettings = TestingSourceSettings.builder()
                .setBoundedness(Boundedness.BOUNDED)
                .setCheckpointingMode(semantic)
                .build();
        final TestEnvironmentSettings envOptions = TestEnvironmentSettings.builder()
                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
                .build();
        final Source<T, ?, ?> source = this.tryCreateSource(externalContext, sourceSettings);

        // Write one batch of records per split and remember them as the expected output.
        final int splitNumber = 4;
        final List<List<T>> recordsPerSplit = new ArrayList<>();
        for (int splitIndex = 0; splitIndex < splitNumber; ++splitIndex) {
            recordsPerSplit.add(this.generateAndWriteTestData(splitIndex, externalContext, sourceSettings));
        }

        // One source subtask per split.
        final StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(envOptions);
        final DataStreamSource<T> stream = execEnv
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Tested Source")
                .setParallelism(splitNumber);
        final CollectIteratorBuilder<T> iteratorBuilder = this.addCollectSink(stream);
        final JobClient jobClient = this.submitJob(execEnv, "Source Multiple Split Test");

        try (CollectResultIterator<T> results = iteratorBuilder.build(jobClient)) {
            LOG.info("Checking test results");
            this.checkResultWithSemantic(results, recordsPerSplit, semantic, null);
        }
    }

    /**
     * Restarts the source from a savepoint while keeping the parallelism unchanged
     * (4 splits, parallelism 4 before and after the restart).
     *
     * @param testEnv environment that creates the execution environments
     * @param externalContext context providing the source, the test data and the connector jars
     * @param semantic checkpointing mode used for the source settings and the result comparison
     * @throws Exception if any phase of the savepoint/restart scenario fails
     */
    @TestTemplate
    @DisplayName(value="Test source restarting from a savepoint")
    public void testSavepoint(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception {
        final int splitNumber = 4;
        final int parallelismBeforeRestart = 4;
        final int parallelismAfterRestart = 4;
        this.restartFromSavepoint(testEnv, externalContext, semantic, splitNumber, parallelismBeforeRestart, parallelismAfterRestart);
    }

    /**
     * Restarts the source from a savepoint with a higher parallelism
     * (4 splits, parallelism 2 before and 4 after the restart).
     *
     * @param testEnv environment that creates the execution environments
     * @param externalContext context providing the source, the test data and the connector jars
     * @param semantic checkpointing mode used for the source settings and the result comparison
     * @throws Exception if any phase of the savepoint/restart scenario fails
     */
    @TestTemplate
    @DisplayName(value="Test source restarting with a higher parallelism")
    public void testScaleUp(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception {
        final int splitNumber = 4;
        final int parallelismBeforeRestart = 2;
        final int parallelismAfterRestart = 4;
        this.restartFromSavepoint(testEnv, externalContext, semantic, splitNumber, parallelismBeforeRestart, parallelismAfterRestart);
    }

    /**
     * Restarts the source from a savepoint with a lower parallelism
     * (4 splits, parallelism 4 before and 2 after the restart).
     *
     * @param testEnv environment that creates the execution environments
     * @param externalContext context providing the source, the test data and the connector jars
     * @param semantic checkpointing mode used for the source settings and the result comparison
     * @throws Exception if any phase of the savepoint/restart scenario fails
     */
    @TestTemplate
    @DisplayName(value="Test source restarting with a lower parallelism")
    public void testScaleDown(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception {
        final int splitNumber = 4;
        final int parallelismBeforeRestart = 4;
        final int parallelismAfterRestart = 2;
        this.restartFromSavepoint(testEnv, externalContext, semantic, splitNumber, parallelismBeforeRestart, parallelismAfterRestart);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs an unbounded job, stops it with a savepoint and restarts it from that savepoint with a
     * possibly different source parallelism, validating the collected records in both phases.
     *
     * <p>Phase one writes one batch per split, runs the source with {@code beforeParallelism} and
     * checks the collected records. The job is then stopped with a canonical savepoint. Phase two
     * writes a second batch through the same writers, restores a new job from the savepoint with
     * {@code afterParallelism}, re-points the same result iterator at the new job and checks the
     * second batch.
     *
     * <p>The result iterator is closed on every exit path. If the first check fails with any
     * throwable (assertion failures are {@link Error}s, not {@link Exception}s), the first job is
     * killed before the failure is rethrown; a failure of that cleanup is attached as a suppressed
     * exception so the original cause is not lost.
     *
     * @param testEnv environment that creates the execution environments and provides the
     *     checkpoint URI used for the savepoint
     * @param externalContext context providing the source, the writers and the test data
     * @param semantic checkpointing mode used for the source settings and the result comparison
     * @param splitNumber number of splits (and writers) to populate
     * @param beforeParallelism source parallelism of the job that takes the savepoint
     * @param afterParallelism source parallelism of the job restored from the savepoint
     * @throws Exception if job submission, the savepoint, result validation or cleanup fails
     */
    private void restartFromSavepoint(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic, int splitNumber, int beforeParallelism, int afterParallelism) throws Exception {
        TestingSourceSettings sourceSettings = TestingSourceSettings.builder().setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED).setCheckpointingMode(semantic).build();
        TestEnvironmentSettings envOptions = TestEnvironmentSettings.builder().setConnectorJarPaths(externalContext.getConnectorJarPaths()).build();

        // One writer per split; the same writers are reused to append data after the restart.
        List<ExternalSystemSplitDataWriter<T>> writers = new ArrayList<>();
        List<List<T>> testRecordCollections = new ArrayList<>();
        for (int i = 0; i < splitNumber; ++i) {
            ExternalSystemSplitDataWriter<T> writer = externalContext.createSourceSplitDataWriter(sourceSettings);
            writers.add(writer);
            testRecordCollections.add(this.generateTestDataForWriter(externalContext, sourceSettings, i, writer));
        }

        // First job: frequent checkpoints, no automatic restarts.
        StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(envOptions);
        execEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        execEnv.enableCheckpointing(50L);
        execEnv.setRestartStrategy(RestartStrategies.noRestart());
        DataStreamSource<T> source = execEnv.fromSource(this.tryCreateSource(externalContext, sourceSettings), WatermarkStrategy.noWatermarks(), "Tested Source").setParallelism(beforeParallelism);
        CollectIteratorBuilder<T> iteratorBuilder = this.addCollectSink(source);
        JobClient jobClient = execEnv.executeAsync("Restart Test");

        CollectResultIterator<T> iterator = null;
        try {
            try {
                iterator = iteratorBuilder.build(jobClient);
                this.checkResultWithSemantic(iterator, testRecordCollections, semantic, this.getTestDataSize(testRecordCollections));
            }
            catch (Throwable failure) {
                // Throwable rather than Exception: a failed assertion surfaces as an Error and
                // must still trigger the job cleanup.
                try {
                    this.killJob(jobClient);
                }
                catch (Exception cleanupFailure) {
                    failure.addSuppressed(cleanupFailure);
                }
                throw failure;
            }

            // Stop the first job with a savepoint and wait until it has finished.
            String savepointPath = jobClient.stopWithSavepoint(true, testEnv.getCheckpointUri(), SavepointFormatType.CANONICAL).get(30L, TimeUnit.SECONDS);
            CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED));

            // Second batch of records, written while no job is running.
            List<List<T>> newTestRecordCollections = new ArrayList<>();
            for (int i = 0; i < splitNumber; ++i) {
                newTestRecordCollections.add(this.generateTestDataForWriter(externalContext, sourceSettings, i, writers.get(i)));
            }

            // Second job: restored from the savepoint with the new parallelism.
            TestEnvironmentSettings restartEnvOptions = TestEnvironmentSettings.builder().setConnectorJarPaths(externalContext.getConnectorJarPaths()).setSavepointRestorePath(savepointPath).build();
            StreamExecutionEnvironment restartEnv = testEnv.createExecutionEnvironment(restartEnvOptions);
            restartEnv.enableCheckpointing(500L);
            restartEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            DataStreamSource<T> restartSource = restartEnv.fromSource(this.tryCreateSource(externalContext, sourceSettings), WatermarkStrategy.noWatermarks(), "Tested Source").setParallelism(afterParallelism);
            // The builder returned here is not needed: the existing iterator is re-pointed below.
            this.addCollectSink(restartSource);
            JobClient restartJobClient = restartEnv.executeAsync("Restart Test");
            CommonTestUtils.waitForJobStatus(restartJobClient, Collections.singletonList(JobStatus.RUNNING));
            try {
                iterator.setJobClient(restartJobClient);
                this.checkResultWithSemantic(iterator, newTestRecordCollections, semantic, this.getTestDataSize(newTestRecordCollections));
            }
            finally {
                this.killJob(restartJobClient);
            }
        }
        finally {
            // Runs even when killJob above throws or a step between the two phases fails.
            if (iterator != null) {
                iterator.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that the source reports the expected {@code numRecordsIn} metric.
     *
     * <p>Four splits are populated, an unbounded job with a uniquely named source and a discarding
     * sink is started, and the test then polls until all tasks are running and until the
     * aggregated {@code numRecordsIn} of the source equals the number of written records.
     *
     * <p>A single {@link RestClient} is created for the job-details polling and closed when the
     * polling ends, instead of allocating a new, never-closed client on every poll.
     *
     * @param testEnv environment that creates the execution environment and exposes the REST
     *     endpoint
     * @param externalContext context providing the source, the test data and the connector jars
     * @param semantic checkpointing mode used for the source settings
     * @throws Exception if job submission, polling or cleanup fails
     */
    @TestTemplate
    @DisplayName(value="Test source metrics")
    public void testSourceMetrics(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception {
        TestingSourceSettings sourceSettings = TestingSourceSettings.builder().setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED).setCheckpointingMode(semantic).build();
        TestEnvironmentSettings envOptions = TestEnvironmentSettings.builder().setConnectorJarPaths(externalContext.getConnectorJarPaths()).build();

        final int splitNumber = 4;
        final List<List<T>> testRecordCollections = new ArrayList<>();
        for (int i = 0; i < splitNumber; ++i) {
            testRecordCollections.add(this.generateAndWriteTestData(i, externalContext, sourceSettings));
        }
        // The written data never changes afterwards, so the expected count is computed once.
        final int expectedRecordCount = this.getTestDataSize(testRecordCollections);

        // The source name is derived from the test data and later used to look up the metric.
        final String sourceName = "metricTestSource" + testRecordCollections.hashCode();
        StreamExecutionEnvironment env = testEnv.createExecutionEnvironment(envOptions);
        DataStreamSource<T> dataStreamSource = env.fromSource(this.tryCreateSource(externalContext, sourceSettings), WatermarkStrategy.noWatermarks(), sourceName).setParallelism(splitNumber);
        dataStreamSource.addSink(new DiscardingSink<>());
        final JobClient jobClient = env.executeAsync("Metrics Test");

        final MetricQuerier queryRestClient = new MetricQuerier(new Configuration());
        final ExecutorService executorService = Executors.newCachedThreadPool();
        try (RestClient restClient = new RestClient(new Configuration(), executorService)) {
            CommonTestUtils.waitForAllTaskRunning(() -> MetricQuerier.getJobDetails(restClient, testEnv.getRestEndpoint(), jobClient.getJobID()));
            CommonTestUtils.waitUntilCondition(() -> {
                try {
                    return this.checkSourceMetrics(queryRestClient, testEnv, jobClient.getJobID(), sourceName, expectedRecordCount);
                }
                catch (Exception e) {
                    // Deliberately best-effort: a failed query counts as "not ready yet" and the
                    // condition is polled again.
                    LOG.debug("Querying source metrics failed, retrying", e);
                    return false;
                }
            });
        }
        finally {
            executorService.shutdown();
            this.killJob(jobClient);
        }
    }

    /**
     * Checks a bounded source that runs with more parallel readers than there are splits.
     *
     * <p>Four splits are populated while the source runs with a parallelism of five, so the
     * parallelism exceeds the number of splits by one. The collected output is compared against
     * the written records and the job is awaited until it reports {@link JobStatus#FINISHED}.
     *
     * @param testEnv environment that creates the execution environment
     * @param externalContext context providing the source, the test data and the connector jars
     * @param semantic checkpointing mode used for the source settings and the result comparison
     * @throws Exception if job submission, result validation or the status wait fails
     */
    @TestTemplate
    @DisplayName(value="Test source with at least one idle parallelism")
    public void testIdleReader(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic) throws Exception {
        final TestingSourceSettings sourceSettings = TestingSourceSettings.builder()
                .setBoundedness(Boundedness.BOUNDED)
                .setCheckpointingMode(semantic)
                .build();
        final TestEnvironmentSettings envOptions = TestEnvironmentSettings.builder()
                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
                .build();
        final Source<T, ?, ?> source = this.tryCreateSource(externalContext, sourceSettings);

        // Populate every split and keep the records as the expected output.
        final int splitNumber = 4;
        final List<List<T>> recordsPerSplit = new ArrayList<>();
        for (int splitIndex = 0; splitIndex < splitNumber; ++splitIndex) {
            recordsPerSplit.add(this.generateAndWriteTestData(splitIndex, externalContext, sourceSettings));
        }

        // One more reader than splits.
        final int readerParallelism = splitNumber + 1;
        final StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(envOptions);
        final DataStreamSource<T> stream = execEnv
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Tested Source")
                .setParallelism(readerParallelism);
        final CollectIteratorBuilder<T> iteratorBuilder = this.addCollectSink(stream);
        final JobClient jobClient = this.submitJob(execEnv, "Idle Reader Test");

        try (CollectResultIterator<T> results = iteratorBuilder.build(jobClient)) {
            LOG.info("Checking test results");
            this.checkResultWithSemantic(results, recordsPerSplit, semantic, null);
        }

        CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED));
    }

    /**
     * Checks that the source keeps delivering records after a TaskManager failover.
     *
     * <p>A first batch is written to split 0 and validated, a TaskManager failover is triggered
     * through the cluster controller, the job is awaited until it is RUNNING again, and a second
     * batch written through the same writer is validated. Finally the job is cancelled.
     *
     * <p>The result iterator is managed by try-with-resources so that it is closed even when one
     * of the checks or the failover itself fails.
     *
     * @param testEnv environment that creates the execution environment
     * @param externalContext context providing the source, the writer and the test data
     * @param controller cluster controller used to trigger the TaskManager failover
     * @param semantic checkpointing mode used for the source settings and the result comparison
     * @throws Exception if job submission, the failover, result validation or cleanup fails
     */
    @TestTemplate
    @DisplayName(value="Test TaskManager failure")
    public void testTaskManagerFailure(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, ClusterControllable controller, CheckpointingMode semantic) throws Exception {
        TestingSourceSettings sourceSettings = TestingSourceSettings.builder().setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED).setCheckpointingMode(semantic).build();
        TestEnvironmentSettings envOptions = TestEnvironmentSettings.builder().setConnectorJarPaths(externalContext.getConnectorJarPaths()).build();
        Source<T, ?, ?> source = this.tryCreateSource(externalContext, sourceSettings);

        // First batch, written before the job starts.
        int splitIndex = 0;
        List<T> testRecordsBeforeFailure = externalContext.generateTestData(sourceSettings, splitIndex, ThreadLocalRandom.current().nextLong());
        ExternalSystemSplitDataWriter<T> externalSystemSplitDataWriter = externalContext.createSourceSplitDataWriter(sourceSettings);
        LOG.info("Writing {} records for split {} to external system", testRecordsBeforeFailure.size(), splitIndex);
        externalSystemSplitDataWriter.writeRecords(testRecordsBeforeFailure);

        StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(envOptions);
        execEnv.enableCheckpointing(50L);
        DataStreamSource<T> stream = execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Tested Source").setParallelism(1);
        CollectIteratorBuilder<T> iteratorBuilder = this.addCollectSink(stream);
        JobClient jobClient = this.submitJob(execEnv, "TaskManager Failover Test");

        try (CollectResultIterator<T> iterator = iteratorBuilder.build(jobClient)) {
            LOG.info("Checking records before killing TaskManagers");
            this.checkResultWithSemantic(iterator, Collections.singletonList(testRecordsBeforeFailure), semantic, testRecordsBeforeFailure.size());

            LOG.info("Trigger TaskManager failover");
            controller.triggerTaskManagerFailover(jobClient, () -> {});
            LOG.info("Waiting for job recovering from failure");
            CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.RUNNING));

            // Second batch, appended to the same split after the job has recovered.
            List<T> testRecordsAfterFailure = externalContext.generateTestData(sourceSettings, splitIndex, ThreadLocalRandom.current().nextLong());
            LOG.info("Writing {} records for split {} to external system", testRecordsAfterFailure.size(), splitIndex);
            externalSystemSplitDataWriter.writeRecords(testRecordsAfterFailure);
            LOG.info("Checking records after job failover");
            this.checkResultWithSemantic(iterator, Collections.singletonList(testRecordsAfterFailure), semantic, testRecordsAfterFailure.size());

            // Terminate the job and wait for CANCELED before the iterator is closed.
            this.killJob(jobClient);
        }
    }

    /**
     * Generates test records for one split and writes them to the external system through a
     * newly created split data writer.
     *
     * @param splitIndex index of the split the records are generated for
     * @param externalContext context that generates the data and creates the writer
     * @param testingSourceSettings settings passed to data generation and writer creation
     * @return the records that were written
     */
    protected List<T> generateAndWriteTestData(int splitIndex, DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings testingSourceSettings) {
        // A random seed is drawn for every invocation.
        final long seed = ThreadLocalRandom.current().nextLong();
        final List<T> generated = externalContext.generateTestData(testingSourceSettings, splitIndex, seed);
        LOG.info("Writing {} records for split {} to external system", generated.size(), splitIndex);
        final ExternalSystemSplitDataWriter<T> splitWriter = externalContext.createSourceSplitDataWriter(testingSourceSettings);
        splitWriter.writeRecords(generated);
        return generated;
    }

    /**
     * Creates the source under test from the external context.
     *
     * <p>An {@link UnsupportedOperationException} raised by the context is translated into a
     * {@link TestAbortedException} that carries the original exception as its cause.
     *
     * @param externalContext context that creates the source
     * @param sourceOptions settings the source has to satisfy
     * @return the created source
     * @throws TestAbortedException if the context signals that it cannot create such a source
     */
    protected Source<T, ?, ?> tryCreateSource(DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings sourceOptions) {
        final Source<T, ?, ?> createdSource;
        try {
            createdSource = externalContext.createSource(sourceOptions);
        }
        catch (UnsupportedOperationException unsupported) {
            throw new TestAbortedException("Cannot create source satisfying given options", unsupported);
        }
        return createdSource;
    }

    /**
     * Logs the submission and starts the job asynchronously.
     *
     * @param env execution environment holding the job graph to run
     * @param jobName name given to the submitted job
     * @return client of the submitted job
     * @throws Exception if {@code executeAsync} fails
     */
    protected JobClient submitJob(StreamExecutionEnvironment env, String jobName) throws Exception {
        LOG.info("Submitting Flink job to test environment");
        final JobClient submittedJob = env.executeAsync(jobName);
        return submittedJob;
    }

    /**
     * Attaches a collect sink to the given stream and returns a builder for the matching result
     * iterator.
     *
     * <p>The sink is registered under a random accumulator name ({@code "dataStreamCollect_"}
     * followed by a UUID); the same operator, serializer and accumulator name are handed to the
     * returned builder.
     *
     * @param stream stream whose records should be collected
     * @return builder that creates a result iterator once a job client is available
     */
    protected CollectIteratorBuilder<T> addCollectSink(DataStream<T> stream) {
        final StreamExecutionEnvironment environment = stream.getExecutionEnvironment();
        final TypeSerializer<T> serializer = stream.getType().createSerializer(stream.getExecutionConfig());
        final String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();

        final CollectSinkOperatorFactory<T> factory = new CollectSinkOperatorFactory<>(serializer, accumulatorName);
        @SuppressWarnings("unchecked") // the factory was created for element type T just above
        final CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();

        final CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
        sink.name("Data stream collect sink");
        environment.addOperator(sink.getTransformation());

        return new CollectIteratorBuilder<>(operator, serializer, accumulatorName, environment.getCheckpointConfig());
    }

    /**
     * Generates test records for one split and writes them through the supplied writer.
     *
     * @param externalContext context that generates the data
     * @param sourceSettings settings passed to data generation
     * @param splitIndex index of the split the records are generated for
     * @param writer writer that receives the generated records
     * @return the records that were written
     */
    protected List<T> generateTestDataForWriter(DataStreamSourceExternalContext<T> externalContext, TestingSourceSettings sourceSettings, int splitIndex, ExternalSystemSplitDataWriter<T> writer) {
        // A random seed is drawn for every invocation.
        final long seed = ThreadLocalRandom.current().nextLong();
        final List<T> generated = externalContext.generateTestData(sourceSettings, splitIndex, seed);
        LOG.debug("Writing {} records to external system", generated.size());
        writer.writeRecords(generated);
        return generated;
    }

    /**
     * Counts the records contained in all given lists.
     *
     * @param collections record lists, typically one per split
     * @return sum of the sizes of all lists
     */
    protected int getTestDataSize(List<List<T>> collections) {
        return collections.stream().mapToInt(List::size).sum();
    }

    /**
     * Compares the records delivered by the result iterator with the expected test data under the
     * given semantic.
     *
     * <p>Without a limit the comparison runs directly on the calling thread. With a limit the
     * comparison is restricted to that number of records, executed via
     * {@link CompletableFuture#runAsync(Runnable)}, and the resulting future is asserted with
     * {@code eventuallySucceeds()}.
     *
     * @param resultIterator iterator over the records produced by the job
     * @param testData expected records, one list per split
     * @param semantic checkpointing mode that governs how records are matched
     * @param limit number of records to check, or {@code null} to check without a limit
     */
    protected void checkResultWithSemantic(CloseableIterator<T> resultIterator, List<List<T>> testData, CheckpointingMode semantic, Integer limit) {
        if (limit == null) {
            CollectIteratorAssertions.assertThat(resultIterator).matchesRecordsFromSource(testData, semantic);
            return;
        }
        final CompletableFuture<Void> pendingCheck = CompletableFuture.runAsync(
                () -> CollectIteratorAssertions.assertThat(resultIterator).withNumRecordsLimit(limit).matchesRecordsFromSource(testData, semantic));
        FlinkAssertions.assertThatFuture(pendingCheck).eventuallySucceeds();
    }

    /**
     * Queries the aggregated {@code numRecordsIn} metric of the named source through the REST
     * endpoint and compares it with the expected record count using {@code Precision.equals}.
     *
     * @param queryRestClient querier used to fetch the aggregated metric
     * @param testEnv environment exposing the REST endpoint
     * @param jobId id of the job to inspect
     * @param sourceName name of the source operator whose metric is read
     * @param allRecordSize expected number of records
     * @return {@code true} if the reported metric equals the expected count
     * @throws Exception if the metric query fails
     */
    private boolean checkSourceMetrics(MetricQuerier queryRestClient, TestEnvironment testEnv, JobID jobId, String sourceName, long allRecordSize) throws Exception {
        final Double reportedRecordsIn = queryRestClient.getAggregatedMetricsByRestAPI(testEnv.getRestEndpoint(), jobId, sourceName, "numRecordsIn", null);
        final double expected = (double) allRecordSize;
        // Unboxing: a null metric value raises a NullPointerException here.
        final double actual = reportedRecordsIn;
        return Precision.equals(expected, actual);
    }

    /**
     * Terminates the job and waits until it reports {@link JobStatus#CANCELED}.
     *
     * @param jobClient client of the job to terminate
     * @throws Exception if terminating the job or waiting for the status fails
     */
    private void killJob(JobClient jobClient) throws Exception {
        final List<JobStatus> expectedStatus = Collections.singletonList(JobStatus.CANCELED);
        CommonTestUtils.terminateJob(jobClient);
        CommonTestUtils.waitForJobStatus(jobClient, expectedStatus);
    }

    /**
     * Holds everything needed to create a {@link CollectResultIterator} except the job client,
     * which only becomes available after the job has been submitted.
     *
     * @param <T> type of the collected records
     */
    protected static class CollectIteratorBuilder<T> {
        /** Collect sink operator whose operator-id future is passed to the iterator. */
        private final CollectSinkOperator<T> operator;
        /** Serializer for the collected records. */
        private final TypeSerializer<T> serializer;
        /** Accumulator name the collect sink was registered with. */
        private final String accumulatorName;
        /** Checkpoint configuration of the environment the sink was added to. */
        private final CheckpointConfig checkpointConfig;

        /**
         * Stores the components used later by {@link #build(JobClient)}.
         *
         * @param operator collect sink operator
         * @param serializer serializer for the collected records
         * @param accumulatorName accumulator name of the collect sink
         * @param checkpointConfig checkpoint configuration handed to the iterator
         */
        protected CollectIteratorBuilder(CollectSinkOperator<T> operator, TypeSerializer<T> serializer, String accumulatorName, CheckpointConfig checkpointConfig) {
            this.checkpointConfig = checkpointConfig;
            this.accumulatorName = accumulatorName;
            this.serializer = serializer;
            this.operator = operator;
        }

        /**
         * Creates a new result iterator and binds it to the given job.
         *
         * @param jobClient client of the job whose results should be collected
         * @return iterator with the job client already set
         */
        protected CollectResultIterator<T> build(JobClient jobClient) {
            final CollectResultIterator<T> boundIterator = new CollectResultIterator<>(
                    this.operator.getOperatorIdFuture(),
                    this.serializer,
                    this.accumulatorName,
                    this.checkpointConfig);
            boundIterator.setJobClient(jobClient);
            return boundIterator;
        }
    }
}

