/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestBoundedOneInputStreamOperator;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Test;

public class OneInputStreamTaskTest
extends TestLogger {
    // List-state descriptor registered under the name "test" whose Integer elements are
    // serialized with IntSerializer. It is not referenced by the test methods in this part of
    // the file; presumably it is used by the helper operators declared further down — confirm.
    private static final ListStateDescriptor<Integer> TEST_DESCRIPTOR = new ListStateDescriptor("test", (TypeSerializer)new IntSerializer());

    /**
     * Pushes two timestamped records through a single {@code StreamMap} that wraps
     * {@code TestOpenCloseMapFunction} and verifies that both records are emitted with the same
     * value and timestamp, and that the function's static {@code closeCalled} flag is set once
     * the task has completed.
     */
    @Test
    public void testOpenCloseAndTimestamps() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> harness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        final StreamConfig config = harness.getStreamConfig();
        config.setStreamOperator(new StreamMap<>(new TestOpenCloseMapFunction()));
        config.setOperatorID(new OperatorID());

        // The expectation does not depend on the task, so it is built up front.
        final long baseTime = 0L;
        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        expected.add(new StreamRecord<>("Hello", baseTime + 1L));
        expected.add(new StreamRecord<>("Ciao", baseTime + 2L));

        harness.invoke();
        harness.waitForTaskRunning();

        harness.processElement(new StreamRecord<>("Hello", baseTime + 1L));
        harness.processElement(new StreamRecord<>("Ciao", baseTime + 2L));
        harness.waitForInputProcessing();

        harness.endInput();
        harness.waitForTaskCompletion();

        Assert.assertTrue(
                "RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
        TestHarnessUtil.assertOutputEquals(
                "Output was not correct.", expected, harness.getOutput());
    }

    /**
     * Drives an identity {@code StreamMap} task that is fed through four input slots, addressed
     * as (0,0), (0,1), (1,0) and (1,1), and checks after every step which watermarks and
     * watermark-status events have been emitted so far.
     *
     * <p>The expectations encode that a watermark is emitted only once every non-idle slot has
     * reached it, and that a status change is emitted once for the task as a whole.
     */
    @Test
    public void testWatermarkAndWatermarkStatusForwarding() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> harness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        2,
                        2,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        final StreamConfig config = harness.getStreamConfig();
        config.setStreamOperator(new StreamMap<>(new IdentityMap()));
        config.setOperatorID(new OperatorID());

        final String mismatch = "Output was not correct.";
        final long t0 = 0L;
        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        harness.invoke();
        harness.waitForTaskRunning();

        // Three of the four slots have seen the watermark: nothing is expected downstream yet.
        harness.processElement(new Watermark(t0), 0, 0);
        harness.processElement(new Watermark(t0), 0, 1);
        harness.processElement(new Watermark(t0), 1, 0);
        harness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        // The fourth slot catches up, so the watermark is now expected in the output.
        harness.processElement(new Watermark(t0), 1, 1);
        harness.waitForInputProcessing();
        expected.add(new Watermark(t0));
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        // Records are expected to pass straight through; afterwards the slots advance to
        // 4, 3, 3 and 2, and the smallest of those values (2) is the expected watermark.
        harness.processElement(new StreamRecord<>("Hello", t0));
        harness.processElement(new StreamRecord<>("Ciao", t0));
        expected.add(new StreamRecord<>("Hello", t0));
        expected.add(new StreamRecord<>("Ciao", t0));
        harness.processElement(new Watermark(t0 + 4L), 0, 0);
        harness.processElement(new Watermark(t0 + 3L), 0, 1);
        harness.processElement(new Watermark(t0 + 3L), 1, 0);
        harness.processElement(new Watermark(t0 + 2L), 1, 1);
        harness.waitForInputProcessing();
        expected.add(new Watermark(t0 + 2L));
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        // Raising the slowest slot to 4 moves the minimum up to 3.
        harness.processElement(new Watermark(t0 + 4L), 1, 1);
        harness.waitForInputProcessing();
        expected.add(new Watermark(t0 + 3L));
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        // Raising the remaining two slots to 4 moves the minimum up to 4.
        harness.processElement(new Watermark(t0 + 4L), 0, 1);
        harness.processElement(new Watermark(t0 + 4L), 1, 0);
        harness.waitForInputProcessing();
        expected.add(new Watermark(t0 + 4L));
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        // Two slots go idle; of the two still-active slots one reports 6 and the other 5, so 5
        // is expected first. When the slot at 5 goes idle as well, 6 is expected.
        harness.processElement(WatermarkStatus.IDLE, 0, 1);
        harness.processElement(WatermarkStatus.IDLE, 1, 0);
        harness.processElement(new Watermark(t0 + 6L), 0, 0);
        harness.processElement(new Watermark(t0 + 5L), 1, 1);
        harness.processElement(WatermarkStatus.IDLE, 1, 1);
        harness.waitForInputProcessing();
        expected.add(new Watermark(t0 + 5L));
        expected.add(new Watermark(t0 + 6L));
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        // With the last slot idle too, a single IDLE status is expected downstream.
        harness.processElement(WatermarkStatus.IDLE, 0, 0);
        harness.waitForInputProcessing();
        expected.add(WatermarkStatus.IDLE);
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        // Two slots become active again, yet only one ACTIVE status is expected.
        harness.processElement(WatermarkStatus.ACTIVE, 1, 0);
        harness.processElement(WatermarkStatus.ACTIVE, 0, 1);
        harness.waitForInputProcessing();
        expected.add(WatermarkStatus.ACTIVE);
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        harness.endInput();
        harness.waitForTaskCompletion();

        // Only the two data records ("Hello" and "Ciao") count as raw elements.
        final List<String> rawElements =
                TestHarnessUtil.getRawElementsFromOutput(harness.getOutput());
        Assert.assertEquals(2, rawElements.size());
    }

    /**
     * Runs a three-operator chain (fail-on-watermark head, watermark generator, fail-on-watermark
     * tail) and checks the emitted stream across an active, an idle and a second active phase.
     *
     * <p>During the active phases every numeric record is expected to be followed by a watermark
     * carrying the same value; during the idle phase only the records themselves are expected.
     * The watermarks injected from outside (15 and 65) never appear in the expected output.
     * The watermarks that do appear are presumably produced by
     * {@code WatermarkGeneratingTestOperator}, which is defined elsewhere — confirm.
     */
    @Test
    public void testWatermarksNotForwardedWithinChainWhenIdle() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> harness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        1,
                        1,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        final TriggerableFailOnWatermarkTestOperator head =
                new TriggerableFailOnWatermarkTestOperator();
        final WatermarkGeneratingTestOperator generator = new WatermarkGeneratingTestOperator();
        final TriggerableFailOnWatermarkTestOperator tail =
                new TriggerableFailOnWatermarkTestOperator();

        harness.setupOperatorChain(new OperatorID(42L, 42L), (StreamOperator<?>) head)
                .chain(new OperatorID(4711L, 42L), generator, StringSerializer.INSTANCE)
                .chain(new OperatorID(123L, 123L), tail, StringSerializer.INSTANCE)
                .finish();

        final String mismatch = "Output was not correct.";
        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        harness.invoke();
        harness.waitForTaskRunning();

        // Phase 1 (active): each numeric record is expected to be trailed by its own watermark.
        harness.processElement(new StreamRecord<>("EXPECT_WATERMARKS"));
        harness.processElement(new StreamRecord<>("10"), 0, 0);
        harness.processElement(new Watermark(15L));
        harness.processElement(new StreamRecord<>("20"), 0, 0);
        harness.processElement(new StreamRecord<>("30"), 0, 0);
        harness.waitForInputProcessing();

        expected.add(new StreamRecord<>("EXPECT_WATERMARKS"));
        expected.add(new StreamRecord<>("10"));
        expected.add(new Watermark(10L));
        expected.add(new StreamRecord<>("20"));
        expected.add(new Watermark(20L));
        expected.add(new StreamRecord<>("30"));
        expected.add(new Watermark(30L));
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        // Phase 2 (idle): after the IDLE status only the records themselves are expected.
        harness.processElement(WatermarkStatus.IDLE);
        harness.processElement(new StreamRecord<>("NO_WATERMARKS"));
        harness.processElement(new StreamRecord<>("40"), 0, 0);
        harness.processElement(new StreamRecord<>("50"), 0, 0);
        harness.processElement(new StreamRecord<>("60"), 0, 0);
        harness.processElement(new Watermark(65L));
        harness.waitForInputProcessing();

        expected.add(WatermarkStatus.IDLE);
        expected.add(new StreamRecord<>("NO_WATERMARKS"));
        expected.add(new StreamRecord<>("40"));
        expected.add(new StreamRecord<>("50"));
        expected.add(new StreamRecord<>("60"));
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        // Phase 3 (active again): the record/watermark pairing is expected to resume.
        harness.processElement(WatermarkStatus.ACTIVE);
        harness.processElement(new StreamRecord<>("EXPECT_WATERMARKS"));
        harness.processElement(new StreamRecord<>("70"), 0, 0);
        harness.processElement(new StreamRecord<>("80"), 0, 0);
        harness.processElement(new StreamRecord<>("90"), 0, 0);
        harness.waitForInputProcessing();

        expected.add(WatermarkStatus.ACTIVE);
        expected.add(new StreamRecord<>("EXPECT_WATERMARKS"));
        expected.add(new StreamRecord<>("70"));
        expected.add(new Watermark(70L));
        expected.add(new StreamRecord<>("80"));
        expected.add(new Watermark(80L));
        expected.add(new StreamRecord<>("90"));
        expected.add(new Watermark(90L));
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        harness.endInput();
        harness.waitForTaskCompletion();

        // Twelve data records were sent in total: four per phase.
        final List<String> rawElements =
                TestHarnessUtil.getRawElementsFromOutput(harness.getOutput());
        Assert.assertEquals(12, rawElements.size());
    }

    /**
     * Sends the barrier of checkpoint 0 to an identity-map task one input slot at a time.
     *
     * <p>While only slot (0,0) has received the barrier, records arriving on slot (1,1) are still
     * expected in the output. After the barrier has arrived on all four slots, exactly one
     * barrier is expected downstream.
     */
    @Test
    public void testCheckpointBarriers() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> harness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        2,
                        2,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        final StreamConfig config = harness.getStreamConfig();
        config.setStreamOperator(new StreamMap<>(new IdentityMap()));
        config.setOperatorID(new OperatorID());

        final String mismatch = "Output was not correct.";
        final long t0 = 0L;
        final CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();
        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        harness.invoke();
        harness.waitForTaskRunning();

        // Barrier on the first slot only; records from slot (1,1) are expected to flow through.
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 0, 0);
        harness.processElement(new StreamRecord<>("Hello-1-1", t0), 1, 1);
        harness.processElement(new StreamRecord<>("Ciao-1-1", t0), 1, 1);
        expected.add(new StreamRecord<>("Hello-1-1", t0));
        expected.add(new StreamRecord<>("Ciao-1-1", t0));
        harness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        // Barrier on the three remaining slots; a single barrier is expected downstream.
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 0, 1);
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 1, 0);
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 1, 1);
        harness.waitForInputProcessing();
        expected.add(new CheckpointBarrier(0L, 0L, options));

        harness.endInput();
        harness.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());
    }

    /**
     * Starts checkpoint 0 on a single input slot and then delivers the barriers of checkpoint 1
     * on all four slots before checkpoint 0 has been completed.
     *
     * <p>The expected output is a {@code CancelCheckpointMarker} for checkpoint 0 followed by the
     * barrier of checkpoint 1. The late barriers of checkpoint 0 that arrive afterwards are
     * expected to produce no further output.
     */
    @Test
    public void testOvertakingCheckpointBarriers() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> harness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        2,
                        2,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        final StreamConfig config = harness.getStreamConfig();
        config.setStreamOperator(new StreamMap<>(new IdentityMap()));
        config.setOperatorID(new OperatorID());

        final String mismatch = "Output was not correct.";
        final long t0 = 0L;
        final CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();
        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        harness.invoke();
        harness.waitForTaskRunning();

        // Checkpoint 0 reaches slot (0,0) only; records from slot (1,1) are expected to pass.
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 0, 0);
        harness.processElement(new StreamRecord<>("Hello-1-1", t0), 1, 1);
        harness.processElement(new StreamRecord<>("Ciao-1-1", t0), 1, 1);
        expected.add(new StreamRecord<>("Hello-1-1", t0));
        expected.add(new StreamRecord<>("Ciao-1-1", t0));
        harness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        // Checkpoint 1 arrives on every slot and overtakes the still-pending checkpoint 0.
        harness.processEvent(new CheckpointBarrier(1L, 1L, options), 0, 1);
        harness.processEvent(new CheckpointBarrier(1L, 1L, options), 0, 0);
        harness.processEvent(new CheckpointBarrier(1L, 1L, options), 1, 0);
        harness.processEvent(new CheckpointBarrier(1L, 1L, options), 1, 1);
        expected.add(new CancelCheckpointMarker(0L));
        expected.add(new CheckpointBarrier(1L, 1L, options));
        harness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());

        // The remaining barriers of checkpoint 0 arrive late; no new output is expected.
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 0, 1);
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 1, 0);
        harness.processEvent(new CheckpointBarrier(0L, 0L, options), 1, 1);
        harness.waitForInputProcessing();

        harness.endInput();
        harness.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals(mismatch, expected, harness.getOutput());
    }

    /**
     * Takes a checkpoint of a task running a chain of eleven {@code TestingStreamOperator}s and
     * then starts a second task from that checkpoint.
     *
     * <p>Verifies that no restore call happens on the first (fresh) run, that the reported
     * checkpoint id matches the triggered one, that the snapshot holds one sub-task state entry
     * per chained operator, and that the restored run records one restore call per operator.
     *
     * <p>{@code TestingStreamOperator} tracks its calls in static counters. They are reset in a
     * {@code finally} block so that a failing assertion or a timeout in this test cannot leave
     * stale counts behind for other tests.
     */
    @Test
    public void testSnapshottingAndRestoring() throws Exception {
        final Deadline deadline = Deadline.fromNow(Duration.ofMinutes(2L));
        final long checkpointId = 1L;
        final long checkpointTimestamp = 1L;
        final int numberChainedTasks = 11;

        try {
            final OneInputStreamTaskTestHarness<String, String> testHarness =
                    new OneInputStreamTaskTestHarness<>(
                            OneInputStreamTask::new,
                            BasicTypeInfo.STRING_TYPE_INFO,
                            BasicTypeInfo.STRING_TYPE_INFO);
            testHarness.setupOutputForSingletonOperatorChain();

            // Declared raw exactly as before: the selector class is defined outside this block.
            final IdentityKeySelector keySelector = new IdentityKeySelector();
            testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);

            final StreamConfig streamConfig = testHarness.getStreamConfig();
            this.configureChainedTestingStreamOperator(streamConfig, numberChainedTasks);
            final TestTaskStateManager taskStateManager = testHarness.taskStateManager;

            // --- first run: take the checkpoint ---
            TestingStreamOperator.numberRestoreCalls = 0;
            testHarness.invoke();
            testHarness.waitForTaskRunning();

            final CheckpointMetaData checkpointMetaData =
                    new CheckpointMetaData(checkpointId, checkpointTimestamp);
            testHarness
                    .getTask()
                    .triggerCheckpointAsync(
                            checkpointMetaData,
                            CheckpointOptions.forCheckpointWithDefaultLocation())
                    .get();

            // Nothing was restored into the fresh task, so no restore call may have happened.
            Assert.assertEquals(0L, (long) TestingStreamOperator.numberRestoreCalls);

            taskStateManager.getWaitForReportLatch().await();
            Assert.assertEquals(checkpointId, taskStateManager.getReportedCheckpointId());

            testHarness.endInput();
            testHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());

            // --- second run: restore from the checkpoint taken above ---
            final OneInputStreamTaskTestHarness<String, String> restoredTaskHarness =
                    new OneInputStreamTaskTestHarness<>(
                            OneInputStreamTask::new,
                            BasicTypeInfo.STRING_TYPE_INFO,
                            BasicTypeInfo.STRING_TYPE_INFO);
            restoredTaskHarness.configureForKeyedStream(
                    keySelector, BasicTypeInfo.STRING_TYPE_INFO);
            restoredTaskHarness.setTaskStateSnapshot(
                    checkpointId, taskStateManager.getLastJobManagerTaskStateSnapshot());

            final StreamConfig restoredTaskStreamConfig = restoredTaskHarness.getStreamConfig();
            this.configureChainedTestingStreamOperator(
                    restoredTaskStreamConfig, numberChainedTasks);

            final TaskStateSnapshot stateHandles =
                    taskStateManager.getLastJobManagerTaskStateSnapshot();
            Assert.assertEquals(
                    numberChainedTasks, stateHandles.getSubtaskStateMappings().size());

            TestingStreamOperator.numberRestoreCalls = 0;
            restoredTaskHarness.taskStateManager.restoreLatestCheckpointState(
                    taskStateManager.getJobManagerTaskStateSnapshotsByCheckpointId());
            restoredTaskHarness.invoke();
            restoredTaskHarness.endInput();
            restoredTaskHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());

            // Every operator of the chain must have been restored exactly once.
            Assert.assertEquals(
                    (long) numberChainedTasks, (long) TestingStreamOperator.numberRestoreCalls);
        } finally {
            TestingStreamOperator.numberRestoreCalls = 0;
            TestingStreamOperator.numberSnapshotCalls = 0;
        }
    }

    /**
     * Runs a task around {@code TestOperator}, checks that its processing-time service is alive
     * while the task is running, and then lets the task finish.
     *
     * <p>NOTE(review): judging by the test name, the check that the timer service is quiesced
     * only after the operator was closed presumably lives in {@code TestOperator}, which is
     * defined elsewhere — confirm.
     *
     * <p>The timer service is shut down in a {@code finally} block so that it is released even
     * when the liveness assertion or the wait for task completion fails.
     */
    @Test
    public void testQuiesceTimerServiceAfterOpClose() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> harness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        2,
                        2,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        final StreamConfig config = harness.getStreamConfig();
        config.setStreamOperator(new TestOperator());
        config.setOperatorID(new OperatorID());

        harness.invoke();
        harness.waitForTaskRunning();

        final SystemProcessingTimeService timeService =
                (SystemProcessingTimeService) harness.getTimerService();
        try {
            Assert.assertTrue(timeService.isAlive());
            harness.endInput();
            harness.waitForTaskCompletion();
        } finally {
            // Previously skipped on failure, which left the timer service running.
            timeService.shutdownService();
        }
    }

    /**
     * Chains two {@code TestBoundedOneInputStreamOperator}s ("Operator0" and "Operator1"), sends
     * one record and ends the input. The output must then contain the record plus an
     * "End of input" and a "Finish" message from each of the two operators.
     *
     * <p>NOTE(review): {@code containsInAnyOrder} checks which elements are present but not
     * their order — confirm whether the emission order should be asserted as well.
     */
    @Test
    public void testClosingAllOperatorsOnChainProperly() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> harness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        harness.setupOperatorChain(
                        new OperatorID(),
                        (StreamOperator<?>) new TestBoundedOneInputStreamOperator("Operator0"))
                .chain(
                        new OperatorID(),
                        new TestBoundedOneInputStreamOperator("Operator1"),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                .finish();

        harness.invoke();
        harness.waitForTaskRunning();

        harness.processElement(new StreamRecord<>("Hello"));
        harness.endInput();
        harness.waitForTaskCompletion();

        final Object[] expected = {
            new StreamRecord<>("Hello"),
            new StreamRecord<>("[Operator0]: End of input"),
            new StreamRecord<>("[Operator0]: Finish"),
            new StreamRecord<>("[Operator1]: End of input"),
            new StreamRecord<>("[Operator1]: Finish")
        };
        MatcherAssert.assertThat(harness.getOutput(), Matchers.containsInAnyOrder(expected));
    }

    /**
     * Runs a chain of three {@code DuplicatingOperator}s against a real {@code TaskMetricGroup}
     * and checks the task-level record counters after a fixed number of input records.
     *
     * <p>The input counter must equal the number of records sent. The output counter must equal
     * {@code numRecords * 2 * 2 * 2}; this assumes each {@code DuplicatingOperator} emits every
     * record twice (the operator is defined elsewhere — confirm).
     */
    @Test
    public void testOperatorMetricReuse() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> testHarness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        testHarness
                .setupOperatorChain(new OperatorID(), (StreamOperator<?>) new DuplicatingOperator())
                .chain(
                        new OperatorID(),
                        new DuplicatingOperator(),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                .chain(
                        new OperatorID(),
                        new DuplicatingOperator(),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                .finish();

        final TaskMetricGroup taskMetricGroup =
                TaskManagerMetricGroup.createTaskManagerMetricGroup(
                                NoOpMetricRegistry.INSTANCE, "host", ResourceID.generate())
                        .addJob(new JobID(), "jobname")
                        .addTask(ExecutionGraphTestUtils.createExecutionAttemptId(), "task");

        // Environment that hands the task the metric group created above, so that the counters
        // read below are the ones the running task updates.
        final StreamMockEnvironment env =
                new StreamMockEnvironment(
                        testHarness.jobConfig,
                        testHarness.taskConfig,
                        testHarness.memorySize,
                        new MockInputSplitProvider(),
                        testHarness.bufferSize,
                        new TestTaskStateManager()) {

                    @Override
                    public TaskMetricGroup getMetricGroup() {
                        return taskMetricGroup;
                    }
                };

        final Counter numRecordsInCounter =
                taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
        final Counter numRecordsOutCounter =
                taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();

        testHarness.invoke(env);
        testHarness.waitForTaskRunning();

        // Single source of truth for the loop bound and both expectations.
        final int numRecords = 5;
        for (int x = 0; x < numRecords; x++) {
            testHarness.processElement(new StreamRecord<>("hello"));
        }
        testHarness.waitForInputProcessing();

        Assert.assertEquals(numRecords, numRecordsInCounter.getCount());
        Assert.assertEquals(numRecords * 2 * 2 * 2, numRecordsOutCounter.getCount());

        testHarness.endInput();
        testHarness.waitForTaskCompletion();
    }

    /**
     * Runs a chain of two {@code WatermarkMetricOperator}s with intercepting metric groups and
     * checks the five watermark gauges (task input, head input/output, chained input/output).
     *
     * <p>The gauges must be five distinct objects, must all start at {@code Long.MIN_VALUE}, and
     * after each input watermark must show the values asserted below. Those values are
     * consistent with each operator doubling the watermark it receives (the operator is defined
     * elsewhere — confirm).
     */
    @Test
    public void testWatermarkMetrics() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> harness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        final OperatorID headOperatorId = new OperatorID();
        final OperatorID chainedOperatorId = new OperatorID();
        final WatermarkMetricOperator headOperator = new WatermarkMetricOperator();
        final WatermarkMetricOperator chainedOperator = new WatermarkMetricOperator();

        harness.setupOperatorChain(headOperatorId, (StreamOperator<?>) headOperator)
                .chain(
                        chainedOperatorId,
                        chainedOperator,
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                .finish();

        final InterceptingOperatorMetricGroup headGroup = new InterceptingOperatorMetricGroup();
        final InterceptingOperatorMetricGroup chainedGroup = new InterceptingOperatorMetricGroup();

        // Task metric group that routes the two known operator ids to the intercepting groups
        // above and delegates every other id to the default implementation.
        final InterceptingTaskMetricGroup taskGroup =
                new InterceptingTaskMetricGroup() {

                    @Override
                    public InternalOperatorMetricGroup getOrAddOperator(
                            OperatorID id, String name) {
                        if (id.equals(headOperatorId)) {
                            return headGroup;
                        }
                        if (id.equals(chainedOperatorId)) {
                            return chainedGroup;
                        }
                        return super.getOrAddOperator(id, name);
                    }
                };

        final StreamMockEnvironment env =
                new StreamMockEnvironment(
                        harness.jobConfig,
                        harness.taskConfig,
                        harness.memorySize,
                        new MockInputSplitProvider(),
                        harness.bufferSize,
                        new TestTaskStateManager()) {

                    @Override
                    public TaskMetricGroup getMetricGroup() {
                        return taskGroup;
                    }
                };

        harness.invoke(env);
        harness.waitForTaskRunning();

        final Gauge<?> taskIn = (Gauge<?>) taskGroup.get("currentInputWatermark");
        final Gauge<?> headIn = (Gauge<?>) headGroup.get("currentInputWatermark");
        final Gauge<?> headOut = (Gauge<?>) headGroup.get("currentOutputWatermark");
        final Gauge<?> chainedIn = (Gauge<?>) chainedGroup.get("currentInputWatermark");
        final Gauge<?> chainedOut = (Gauge<?>) chainedGroup.get("currentOutputWatermark");

        // Five distinct gauge objects means no gauge was registered under two names.
        Assert.assertEquals(
                "A metric was registered multiple times.",
                5,
                new HashSet<>(Arrays.asList(taskIn, headIn, headOut, chainedIn, chainedOut))
                        .size());

        // Before any watermark has been processed.
        Assert.assertEquals(Long.MIN_VALUE, ((Long) taskIn.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) headIn.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) headOut.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) chainedIn.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) chainedOut.getValue()).longValue());

        // After watermark 1.
        harness.processElement(new Watermark(1L));
        harness.waitForInputProcessing();
        Assert.assertEquals(1L, ((Long) taskIn.getValue()).longValue());
        Assert.assertEquals(1L, ((Long) headIn.getValue()).longValue());
        Assert.assertEquals(2L, ((Long) headOut.getValue()).longValue());
        Assert.assertEquals(2L, ((Long) chainedIn.getValue()).longValue());
        Assert.assertEquals(4L, ((Long) chainedOut.getValue()).longValue());

        // After watermark 2.
        harness.processElement(new Watermark(2L));
        harness.waitForInputProcessing();
        Assert.assertEquals(2L, ((Long) taskIn.getValue()).longValue());
        Assert.assertEquals(2L, ((Long) headIn.getValue()).longValue());
        Assert.assertEquals(4L, ((Long) headOut.getValue()).longValue());
        Assert.assertEquals(4L, ((Long) chainedIn.getValue()).longValue());
        Assert.assertEquals(8L, ((Long) chainedOut.getValue()).longValue());

        harness.endInput();
        harness.waitForTaskCompletion();
    }

    /**
     * Starts a task around {@code TestOperator} with a task metric group that records every
     * registered metric into a map, and checks that the metrics named
     * "checkpointAlignmentTime" and "checkpointStartDelayNanos" have been registered by the
     * time the task is running.
     */
    @Test
    public void testCheckpointBarrierMetrics() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> harness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();
        harness.getStreamConfig().setStreamOperator(new TestOperator());

        // Every metric registered through the task metric group ends up in this map.
        final ConcurrentHashMap<String, Metric> registered = new ConcurrentHashMap<>();
        final StreamMockEnvironment env = harness.createEnvironment();
        env.setTaskMetricGroup(StreamTaskTestHarness.createTaskMetricGroup(registered));

        harness.invoke(env);
        harness.waitForTaskRunning();

        MatcherAssert.assertThat(registered, IsMapContaining.hasKey("checkpointAlignmentTime"));
        MatcherAssert.assertThat(registered, IsMapContaining.hasKey("checkpointStartDelayNanos"));

        harness.endInput();
        harness.waitForTaskCompletion();
    }

    /**
     * Configures {@code streamConfig} as the head of an operator chain made of
     * {@link TestingStreamOperator} instances.
     *
     * <p>The head operator is registered on {@code streamConfig} with {@code OperatorID(0, 0)}.
     * For every further chain position {@code i} in {@code [1, numberChainedTasks)} a separate
     * {@link StreamConfig} with {@code OperatorID(0, i)} is created, together with a
     * {@link StreamEdge} from node {@code i - 1} to node {@code i}. The edges are set as the
     * chained outputs of {@code streamConfig}, and the per-position configs are serialized and
     * stored as its transitive chained task configs.
     *
     * @param streamConfig config of the head operator; modified in place
     * @param numberChainedTasks total number of operators in the chain, at least 1
     */
    private void configureChainedTestingStreamOperator(StreamConfig streamConfig, int numberChainedTasks) {
        Preconditions.checkArgument(numberChainedTasks >= 1, "The operator chain must at least contain one operator.");

        TestingStreamOperator<Integer, Integer> headOperator = new TestingStreamOperator<>();
        streamConfig.setStreamOperator(headOperator);
        streamConfig.setOperatorID(new OperatorID(0L, 0L));

        int chainedCount = numberChainedTasks - 1;
        HashMap<Integer, StreamConfig> configsByPosition = new HashMap<>(chainedCount);
        List<StreamEdge> chainEdges = new ArrayList<>(chainedCount);

        for (int position = 1; position <= chainedCount; position++) {
            TestingStreamOperator<Integer, Integer> linkOperator = new TestingStreamOperator<>();
            StreamConfig linkConfig = new StreamConfig(new Configuration());
            linkConfig.setupNetworkInputs(new TypeSerializer[]{StringSerializer.INSTANCE});
            linkConfig.setStreamOperator(linkOperator);
            linkConfig.setOperatorID(new OperatorID(0L, (long) position));
            configsByPosition.put(position, linkConfig);

            // The cast on the null operator argument selects the intended StreamNode constructor.
            StreamNode upstream = new StreamNode(position - 1, null, null, (StreamOperator<?>) null, null, null);
            StreamNode downstream = new StreamNode(position, null, null, (StreamOperator<?>) null, null, null);
            chainEdges.add(new StreamEdge(upstream, downstream, 0, null, null));
        }

        streamConfig.setChainedOutputs(chainEdges);
        for (StreamConfig linkConfig : configsByPosition.values()) {
            linkConfig.serializeAllConfigs();
        }
        streamConfig.setAndSerializeTransitiveChainedTaskConfigs(configsByPosition);
    }

    private static class TriggerableFailOnWatermarkTestOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 2048954179291813243L;
        public static final String EXPECT_FORWARDED_WATERMARKS_MARKER = "EXPECT_WATERMARKS";
        public static final String NO_FORWARDED_WATERMARKS_MARKER = "NO_WATERMARKS";
        protected boolean expectForwardedWatermarks;

        private TriggerableFailOnWatermarkTestOperator() {
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            this.output.collect(element);
            if (((String)element.getValue()).equals(EXPECT_FORWARDED_WATERMARKS_MARKER)) {
                this.expectForwardedWatermarks = true;
            } else if (((String)element.getValue()).equals(NO_FORWARDED_WATERMARKS_MARKER)) {
                this.expectForwardedWatermarks = false;
            } else {
                this.handleElement(element);
            }
        }

        public void processWatermark(Watermark mark) throws Exception {
            if (!this.expectForwardedWatermarks) {
                throw new Exception("Received a " + mark + ", but this operator should not be forwarded watermarks.");
            }
            this.handleWatermark(mark);
        }

        protected void handleElement(StreamRecord<String> element) {
        }

        protected void handleWatermark(Watermark mark) {
            this.output.emitWatermark(mark);
        }
    }

    /**
     * Variant of {@link TriggerableFailOnWatermarkTestOperator} that derives watermarks from the
     * element values instead of forwarding incoming ones.
     *
     * <p>Each non-marker element value is parsed as a {@code long}; a watermark with that
     * timestamp is emitted whenever it exceeds the last emitted one. Of the incoming watermarks,
     * only those equal to {@code Watermark.MAX_WATERMARK} are forwarded.
     */
    private static class WatermarkGeneratingTestOperator
    extends TriggerableFailOnWatermarkTestOperator {
        private static final long serialVersionUID = -5064871833244157221L;
        /** Timestamp of the most recently emitted watermark; starts at 0. */
        private long lastWatermark;

        private WatermarkGeneratingTestOperator() {
        }

        /**
         * Parses the element value as a timestamp and emits a watermark if it advances
         * {@link #lastWatermark}.
         *
         * @throws NumberFormatException if the element value is not a parsable {@code long}
         */
        @Override
        protected void handleElement(StreamRecord<String> element) {
            long candidate = Long.parseLong(element.getValue());
            if (candidate <= this.lastWatermark) {
                return;
            }
            this.output.emitWatermark(new Watermark(candidate));
            this.lastWatermark = candidate;
        }

        /** Forwards the watermark only if it equals {@code Watermark.MAX_WATERMARK}. */
        @Override
        protected void handleWatermark(Watermark mark) {
            if (!mark.equals(Watermark.MAX_WATERMARK)) {
                return;
            }
            this.output.emitWatermark(mark);
            this.lastWatermark = Long.MAX_VALUE;
        }
    }

    private static class IdentityMap
    implements MapFunction<String, String> {
        private static final long serialVersionUID = 1L;

        private IdentityMap() {
        }

        public String map(String value) throws Exception {
            return value;
        }
    }

    private static class TestOpenCloseMapFunction
    extends RichMapFunction<String, String> {
        private static final long serialVersionUID = 1L;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        TestOpenCloseMapFunction() {
            openCalled = false;
            closeCalled = false;
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            if (closeCalled) {
                Assert.fail((String)"Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail((String)"Open was not called before close.");
            }
            closeCalled = true;
        }

        public String map(String value) throws Exception {
            if (!openCalled) {
                Assert.fail((String)"Open was not called before run.");
            }
            return value;
        }
    }

    private static class TestingStreamOperator<IN, OUT>
    extends AbstractStreamOperator<OUT>
    implements OneInputStreamOperator<IN, OUT> {
        private static final long serialVersionUID = 774614855940397174L;
        public static int numberRestoreCalls = 0;
        public static int numberSnapshotCalls = 0;

        private TestingStreamOperator() {
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            ListState partitionableState = this.getOperatorStateBackend().getListState(TEST_DESCRIPTOR);
            partitionableState.clear();
            partitionableState.add((Object)42);
            partitionableState.add((Object)4711);
            ++numberSnapshotCalls;
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            if (context.isRestored()) {
                ++numberRestoreCalls;
            }
            ListState partitionableState = context.getOperatorStateStore().getListState(TEST_DESCRIPTOR);
            if (numberSnapshotCalls == 0) {
                for (Integer v : (Iterable)partitionableState.get()) {
                    Assert.fail();
                }
            } else {
                HashSet<Integer> result = new HashSet<Integer>();
                for (Integer v : (Iterable)partitionableState.get()) {
                    result.add(v);
                }
                Assert.assertEquals((long)2L, (long)result.size());
                Assert.assertTrue((boolean)result.contains(42));
                Assert.assertTrue((boolean)result.contains(4711));
            }
        }

        public void processElement(StreamRecord<IN> element) throws Exception {
        }
    }

    private static class IdentityKeySelector<IN>
    implements KeySelector<IN, IN> {
        private static final long serialVersionUID = -3555913664416688425L;

        private IdentityKeySelector() {
        }

        public IN getKey(IN value) throws Exception {
            return value;
        }
    }

    static class WatermarkMetricOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        WatermarkMetricOperator() {
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            this.output.collect(element);
        }

        public void processWatermark(Watermark mark) {
            this.output.emitWatermark(new Watermark(mark.getTimestamp() * 2L));
        }
    }

    static class DuplicatingOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        DuplicatingOperator() {
        }

        public void processElement(StreamRecord<String> element) {
            this.output.collect(element);
            this.output.collect(element);
        }
    }

    private static class TestOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1L;

        private TestOperator() {
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            this.output.collect(element);
        }

        public void finish() throws Exception {
            Assert.assertTrue((boolean)((SystemProcessingTimeService)this.getContainingTask().getTimerService()).isAlive());
            super.close();
        }
    }
}

