/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.OptionalLong;
import java.util.Queue;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class SourceTaskTerminationTest {
    private static OneShotLatch ready;
    private static MultiShotLatch runLoopStart;
    private static MultiShotLatch runLoopEnd;

    SourceTaskTerminationTest() {
    }

    @BeforeEach
    void initialize() {
        ready = new OneShotLatch();
        runLoopStart = new MultiShotLatch();
        runLoopEnd = new MultiShotLatch();
    }

    @Test
    void testStopWithSavepointWithMaxWatermark() throws Exception {
        this.stopWithSavepointStreamTaskTestHelper(true);
    }

    @Test
    void testStopWithSavepointWithoutMaxWatermark() throws Exception {
        this.stopWithSavepointStreamTaskTestHelper(false);
    }

    private void stopWithSavepointStreamTaskTestHelper(boolean shouldTerminate) throws Exception {
        long syncSavepointId = 34L;
        try (StreamTaskMailboxTestHarness<Long> srcTaskTestHarness = this.getSourceStreamTaskTestHarness();){
            StreamTask<Long, ?> srcTask = srcTaskTestHarness.getStreamTask();
            srcTaskTestHarness.processAll();
            this.emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 1L);
            this.emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 2L);
            srcTaskTestHarness.processUntil(srcTask.triggerCheckpointAsync(new CheckpointMetaData(31L, 900L), CheckpointOptions.forCheckpointWithDefaultLocation())::isDone);
            this.verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 31L);
            this.emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 3L);
            srcTaskTestHarness.processUntil(srcTask.triggerCheckpointAsync(new CheckpointMetaData(34L, 900L), new CheckpointOptions((SnapshotType)(shouldTerminate ? SavepointType.terminate((SavepointFormatType)SavepointFormatType.CANONICAL) : SavepointType.suspend((SavepointFormatType)SavepointFormatType.CANONICAL)), CheckpointStorageLocationReference.getDefault()))::isDone);
            if (shouldTerminate) {
                this.verifyWatermark(srcTaskTestHarness.getOutput(), Watermark.MAX_WATERMARK);
            }
            this.verifyEvent(srcTaskTestHarness.getOutput(), (AbstractEvent)new EndOfData(shouldTerminate ? StopMode.DRAIN : StopMode.NO_DRAIN));
            this.verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 34L);
            this.waitForSynchronousSavepointIdToBeSet(srcTask);
            Assertions.assertThat((OptionalLong)srcTask.getSynchronousSavepointId()).isPresent();
            srcTaskTestHarness.processUntil(srcTask.notifyCheckpointCompleteAsync(34L)::isDone);
            srcTaskTestHarness.waitForTaskCompletion();
        }
    }

    private StreamTaskMailboxTestHarness<Long> getSourceStreamTaskTestHarness() throws Exception {
        StreamTaskMailboxTestHarness<Long> testHarness = new StreamTaskMailboxTestHarnessBuilder(SourceStreamTask::new, BasicTypeInfo.LONG_TYPE_INFO).setCollectNetworkEvents().modifyExecutionConfig(config -> config.setLatencyTrackingInterval(-1L)).setupOutputForSingletonOperatorChain((StreamOperator<?>)new StreamSource((SourceFunction)new LockStepSourceWithOneWmPerElement())).build();
        return testHarness;
    }

    private void waitForSynchronousSavepointIdToBeSet(StreamTask streamTaskUnderTest) throws InterruptedException {
        while (!streamTaskUnderTest.getSynchronousSavepointId().isPresent()) {
            Thread.sleep(10L);
        }
    }

    private void emitAndVerifyWatermarkAndElement(StreamTaskMailboxTestHarness<Long> srcTaskTestHarness, long expectedElement) throws Exception {
        runLoopStart.trigger();
        runLoopEnd.await();
        srcTaskTestHarness.processAll();
        this.verifyWatermark(srcTaskTestHarness.getOutput(), new Watermark(expectedElement));
        this.verifyNextElement(srcTaskTestHarness.getOutput(), expectedElement);
    }

    private void verifyNextElement(Queue<Object> output, long expectedElement) {
        Object next = output.remove();
        ((ObjectAssert)Assertions.assertThat((Object)next).as("next element is not an event", new Object[0])).isInstanceOf(StreamRecord.class);
        ((AbstractLongAssert)Assertions.assertThat((Long)((Long)((StreamRecord)next).getValue())).as("wrong event", new Object[0])).isEqualTo(expectedElement);
    }

    private void verifyWatermark(Queue<Object> output, Watermark expectedWatermark) {
        Object next = output.remove();
        ((ObjectAssert)Assertions.assertThat((Object)next).as("next element is not an event", new Object[0])).isInstanceOf(Watermark.class);
        ((ObjectAssert)Assertions.assertThat((Object)next).as("wrong watermark", new Object[0])).isEqualTo((Object)expectedWatermark);
    }

    private void verifyEvent(Queue<Object> output, AbstractEvent expectedEvent) {
        Object next = output.remove();
        ((ObjectAssert)Assertions.assertThat((Object)next).isInstanceOf(expectedEvent.getClass())).isEqualTo((Object)expectedEvent);
    }

    private void verifyCheckpointBarrier(Queue<Object> output, long checkpointId) {
        Object next = output.remove();
        ((ObjectAssert)Assertions.assertThat((Object)next).as("next element is not a checkpoint barrier", new Object[0])).isInstanceOf(CheckpointBarrier.class);
        ((AbstractLongAssert)Assertions.assertThat((long)((CheckpointBarrier)next).getId()).as("wrong checkpoint id", new Object[0])).isEqualTo(checkpointId);
    }

    private static class LockStepSourceWithOneWmPerElement
    implements SourceFunction<Long> {
        private volatile boolean isRunning;

        private LockStepSourceWithOneWmPerElement() {
        }

        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            long element = 1L;
            this.isRunning = true;
            ready.trigger();
            while (this.isRunning) {
                runLoopStart.await();
                if (this.isRunning) {
                    ctx.emitWatermark(new Watermark(element));
                    ctx.collect((Object)element++);
                }
                runLoopEnd.trigger();
            }
        }

        public void cancel() {
            this.isRunning = false;
            runLoopStart.trigger();
        }
    }
}

