/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamConfigChainer;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.Assert;

@Deprecated
public class StreamTaskTestHarness<OUT> {
    /** Default managed memory size in bytes (0x100000 = 1,048,576). */
    public static final int DEFAULT_MEMORY_MANAGER_SIZE = 0x100000;
    /** Default network buffer size (1024). */
    public static final int DEFAULT_NETWORK_BUFFER_SIZE = 1024;
    /** Factory that creates the task under test from the environment handed to {@code invoke}. */
    private final FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory;
    /** Memory size passed to the mock environment; the memory manager is only verified on shutdown when this is positive. */
    public long memorySize;
    /** Buffer size passed to the mock environment. */
    public int bufferSize;
    /** Environment the task runs in; assigned by {@code invoke(StreamMockEnvironment)}. */
    protected StreamMockEnvironment mockEnv;
    /** Execution config used to create the output serializer and the mock environment. */
    protected ExecutionConfig executionConfig;
    /** Job-level configuration passed to the mock environment. */
    public Configuration jobConfig;
    /** Task-level configuration passed to the mock environment; {@link #streamConfig} is a view on it. */
    public Configuration taskConfig;
    /** Stream config wrapping {@link #taskConfig}. */
    protected StreamConfig streamConfig;
    /** Runtime info applied to environments created by {@code createEnvironment()} when non-null. */
    protected TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
    /** Task state manager handed to the mock environment. */
    protected TestTaskStateManager taskStateManager;
    /** Serializer for the task's output type. */
    private TypeSerializer<OUT> outputSerializer;
    /** Serializer for output stream elements, wrapping {@link #outputSerializer}. */
    private TypeSerializer<StreamElement> outputStreamRecordSerializer;
    /** Queue registered as the environment's output; created when the task is invoked. */
    private LinkedBlockingQueue<Object> outputList;
    /** Thread running the task; assigned by {@code invoke(StreamMockEnvironment)}. */
    protected TaskThread taskThread;
    // NOTE(review): the three input-related fields below are never assigned in this class;
    // presumably subclasses populate them (e.g. from initializeInputs()) - confirm.
    /** Number of input gates iterated by the input helpers. */
    protected int numInputGates;
    /** Number of input channels per gate (not read in this class). */
    protected int numInputChannelsPerGate;
    /** Guards against running one of the setup methods more than once. */
    private boolean setupCalled = false;
    /** Input gates used to feed elements and events into the task. */
    protected StreamTestSingleInputGate[] inputGates;

    /**
     * Creates a harness with local recovery disabled.
     *
     * @param taskFactory creates the task under test from its environment
     * @param outputType type information of the task's output
     */
    public StreamTaskTestHarness(
            FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory,
            TypeInformation<OUT> outputType) {
        this(
                taskFactory,
                outputType,
                TestLocalRecoveryConfig.disabled());
    }

    /**
     * Creates a harness whose local recovery config is built from a directory provider rooted
     * at {@code localRootDir}, using a fresh job ID, a fresh job vertex ID and subtask index 0.
     *
     * @param taskFactory creates the task under test from its environment
     * @param outputType type information of the task's output
     * @param localRootDir root directory handed to the local recovery directory provider
     */
    public StreamTaskTestHarness(
            FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory,
            TypeInformation<OUT> outputType,
            File localRootDir) {
        this(
                taskFactory,
                outputType,
                new LocalRecoveryConfig(
                        new LocalRecoveryDirectoryProviderImpl(
                                localRootDir, new JobID(), new JobVertexID(), 0)));
    }

    /**
     * Creates a harness with an explicit local recovery configuration.
     *
     * <p>Initializes the default memory and buffer sizes, fresh job/task configurations, a
     * stream config that marks the state backend as using managed memory (with the full
     * fraction assigned to the state backend use case), the output serializers and the task
     * state manager.
     *
     * @param taskFactory creates the task under test from its environment; must not be null
     * @param outputType type information of the task's output
     * @param localRecoveryConfig local recovery configuration for the task state manager
     */
    public StreamTaskTestHarness(
            FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory,
            TypeInformation<OUT> outputType,
            LocalRecoveryConfig localRecoveryConfig) {
        this.taskFactory = Preconditions.checkNotNull(taskFactory);

        // Sizing defaults (same values as the public constants).
        memorySize = DEFAULT_MEMORY_MANAGER_SIZE;
        bufferSize = DEFAULT_NETWORK_BUFFER_SIZE;

        // Configuration objects; the stream config is backed by the task configuration.
        jobConfig = new Configuration();
        taskConfig = new Configuration();
        executionConfig = new ExecutionConfig();
        streamConfig = new StreamConfig(taskConfig);
        streamConfig.setStateBackendUsesManagedMemory(true);
        streamConfig.setManagedMemoryFractionOperatorOfUseCase(
                ManagedMemoryUseCase.STATE_BACKEND, 1.0);

        // Output serialization.
        outputSerializer = outputType.createSerializer(executionConfig);
        outputStreamRecordSerializer = new StreamElementSerializer<>(outputSerializer);

        taskStateManager = new TestTaskStateManager(localRecoveryConfig);
    }

    /**
     * Returns the mock environment of the running task.
     *
     * @return the environment, or {@code null} if the task has not been invoked yet
     */
    public StreamMockEnvironment getEnvironment() {
        return mockEnv;
    }

    /**
     * Returns the timer service of the task under test. Requires the task to have been invoked.
     *
     * @return the task's timer service
     */
    public TimerService getTimerService() {
        final StreamTask<OUT, ?> runningTask = taskThread.task;
        return runningTask.getTimerService();
    }

    /**
     * Returns the task manager runtime info that is applied to newly created environments.
     *
     * @return the configured runtime info
     */
    public TaskManagerRuntimeInfo getTaskManagerRuntimeInfo() {
        return taskManagerRuntimeInfo;
    }

    /**
     * Returns the main operator of the task under test, cast to the caller's expected type.
     * Requires the task to have been invoked.
     *
     * @param <OP> operator type expected by the caller (unchecked)
     * @return the task's main operator
     */
    @SuppressWarnings("unchecked")
    public <OP extends StreamOperator<OUT>> OP getHeadOperator() {
        final StreamTask<OUT, ?> runningTask = taskThread.task;
        return (OP) runningTask.getMainOperator();
    }

    /**
     * Hook called by {@code invoke(StreamMockEnvironment)} before the output is initialized.
     * This implementation does nothing.
     *
     * @throws IOException declared for overriding implementations
     * @throws InterruptedException declared for overriding implementations
     */
    protected void initializeInputs() throws IOException, InterruptedException {
        // Intentionally empty: this harness sets up no inputs itself.
    }

    /**
     * Returns the task state manager handed to the mock environment.
     *
     * @return the test task state manager
     */
    public TestTaskStateManager getTaskStateManager() {
        return taskStateManager;
    }

    /**
     * Registers a single checkpoint's state snapshot with the task state manager and marks
     * that checkpoint as the reported one.
     *
     * @param checkpointId id of the checkpoint the snapshot belongs to
     * @param taskStateSnapshot snapshot to register for that checkpoint
     */
    public void setTaskStateSnapshot(long checkpointId, TaskStateSnapshot taskStateSnapshot) {
        taskStateManager.setReportedCheckpointId(checkpointId);

        final Map<Long, TaskStateSnapshot> snapshotsByCheckpoint =
                Collections.singletonMap(checkpointId, taskStateSnapshot);
        taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(snapshotsByCheckpoint);
    }

    /** Creates a fresh output queue and registers it, with its serializer, on the environment. */
    private void initializeOutput() {
        final LinkedBlockingQueue<Object> freshOutput = new LinkedBlockingQueue<>();
        outputList = freshOutput;
        mockEnv.addOutput(freshOutput, outputStreamRecordSerializer);
    }

    /**
     * Configures the stream config for a chain that consists of a single operator with one
     * non-chained output, then serializes the config.
     *
     * @throws IllegalStateException if a setup method was already called on this harness
     */
    public void setupOutputForSingletonOperatorChain() {
        Preconditions.checkState(!setupCalled, "This harness was already setup.");
        setupCalled = true;

        // Basic single-operator chain settings.
        streamConfig.setChainStart();
        streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
        streamConfig.setNumberOfOutputs(1);
        streamConfig.setTypeSerializerOut(outputSerializer);
        streamConfig.setVertexID(0);
        streamConfig.setOperatorID(new OperatorID(4711L, 123L));

        // Placeholder operator backing the dummy source vertex below.
        final AbstractStreamOperator<OUT> placeholderOperator =
                new AbstractStreamOperator<OUT>() {
                    private static final long serialVersionUID = 1L;
                };
        final LinkedList<NonChainedOutput> nonChainedOutputs = new LinkedList<>();
        final StreamNode dummySourceVertex =
                new StreamNode(
                        0,
                        "group",
                        null,
                        placeholderOperator,
                        "source dummy",
                        SourceStreamTask.class);

        nonChainedOutputs.add(
                new NonChainedOutput(
                        true,
                        dummySourceVertex.getId(),
                        1,
                        1,
                        100L,
                        false,
                        new IntermediateDataSetID(),
                        null,
                        new BroadcastPartitioner<>(),
                        ResultPartitionType.PIPELINED_BOUNDED));

        // The same list is used for both the vertex-level and the operator-level outputs.
        streamConfig.setVertexNonChainedOutputs(nonChainedOutputs);
        streamConfig.setOperatorNonChainedOutputs(nonChainedOutputs);
        streamConfig.serializeAllConfigs();
    }

    /**
     * Builds a new mock environment from the harness' current configuration, memory and buffer
     * sizes and task state manager. The task manager runtime info is applied when it is set.
     *
     * @return a newly created environment (not stored in the harness)
     */
    public StreamMockEnvironment createEnvironment() {
        final StreamMockEnvironment environment =
                new StreamMockEnvironment(
                        jobConfig,
                        taskConfig,
                        executionConfig,
                        memorySize,
                        new MockInputSplitProvider(),
                        bufferSize,
                        taskStateManager);
        if (taskManagerRuntimeInfo == null) {
            return environment;
        }
        environment.setTaskManagerInfo(taskManagerRuntimeInfo);
        return environment;
    }

    /**
     * Serializes the stream config, creates a fresh environment and starts the task in it.
     *
     * @return the thread running the task
     * @throws Exception if starting the task fails
     */
    public Thread invoke() throws Exception {
        streamConfig.serializeAllConfigs();
        final StreamMockEnvironment environment = createEnvironment();
        return invoke(environment);
    }

    /**
     * Starts the task in the given environment on a dedicated thread and blocks until the task
     * instance has been created.
     *
     * <p>If the task thread records an error before the task instance becomes visible, that
     * error is rethrown to the caller.
     *
     * @param mockEnv environment to run the task in; must not be null
     * @return the thread running the task
     * @throws IllegalStateException if the harness was already invoked
     * @throws Exception if initializing inputs fails or the task thread reported an error
     */
    public Thread invoke(StreamMockEnvironment mockEnv) throws Exception {
        // A harness instance can only be invoked once.
        Preconditions.checkState(this.mockEnv == null);
        Preconditions.checkState(taskThread == null);

        this.mockEnv = Preconditions.checkNotNull(mockEnv);
        initializeInputs();
        initializeOutput();
        streamConfig.serializeAllConfigs();

        final TaskThread thread = new TaskThread(() -> taskFactory.apply(mockEnv));
        taskThread = thread;
        thread.start();

        // Poll until the thread has published the task instance, surfacing early failures.
        while (thread.task == null) {
            final Throwable startupFailure = thread.error;
            if (startupFailure != null) {
                ExceptionUtils.rethrow(startupFailure);
            }
            Thread.sleep(10L);
        }
        return thread;
    }

    /**
     * Waits for the task thread to finish, using {@link Long#MAX_VALUE} as the timeout.
     *
     * @throws Exception if the task thread recorded an error
     */
    public void waitForTaskCompletion() throws Exception {
        final long maxTimeout = Long.MAX_VALUE;
        waitForTaskCompletion(maxTimeout);
    }

    /**
     * Waits up to {@code timeout} milliseconds for the task thread to finish; cancellation and
     * interruption errors are not ignored.
     *
     * @param timeout maximum time to wait, in milliseconds, as passed to {@link Thread#join(long)}
     * @throws Exception if the task thread recorded an error
     */
    public void waitForTaskCompletion(long timeout) throws Exception {
        final boolean ignoreCancellation = false;
        waitForTaskCompletion(timeout, ignoreCancellation);
    }

    /**
     * Joins the task thread for at most {@code timeout} milliseconds and reports any error the
     * thread has recorded by then.
     *
     * @param timeout maximum time to wait, in milliseconds, as passed to {@link Thread#join(long)}
     * @param ignoreCancellationOrInterruptedException when true, an error whose cause chain
     *     contains a {@link CancelTaskException} or an {@link InterruptedException} is ignored
     * @throws IllegalStateException if the task thread was never started
     * @throws Exception wrapping the task thread's error, unless it is ignored as described
     */
    public void waitForTaskCompletion(long timeout, boolean ignoreCancellationOrInterruptedException) throws Exception {
        Preconditions.checkState(taskThread != null, "Task thread was not started.");
        taskThread.join(timeout);

        final Throwable failure = taskThread.getError();
        if (failure == null) {
            return;
        }

        final boolean cancelledOrInterrupted =
                ExceptionUtils.findThrowable(failure, CancelTaskException.class).isPresent()
                        || ExceptionUtils.findThrowable(failure, InterruptedException.class)
                                .isPresent();
        if (cancelledOrInterrupted && ignoreCancellationOrInterruptedException) {
            return;
        }
        throw new Exception("error in task", failure);
    }

    /**
     * Polls every 10 ms until the task reports that it is running.
     *
     * @throws IllegalStateException if the task thread was never started
     * @throws Exception if the task thread terminates before the task is running; the thread's
     *     recorded error, if any, is attached as the cause
     */
    public void waitForTaskRunning() throws Exception {
        Preconditions.checkState(taskThread != null, "Task thread was not started.");

        final StreamTask<OUT, ?> runningTask = taskThread.task;
        while (!runningTask.isRunning()) {
            Thread.sleep(10L);
            if (!taskThread.isAlive()) {
                // The thread ended without the task ever reporting a running state.
                final Throwable failure = taskThread.getError();
                if (failure != null) {
                    throw new Exception("Task Thread failed due to an error.", failure);
                }
                throw new Exception("Task Thread unexpectedly shut down.");
            }
        }
    }

    /**
     * Returns the task instance created by the task thread. Requires the task to have been
     * invoked.
     *
     * @return the task under test
     */
    public StreamTask<OUT, ?> getTask() {
        final TaskThread thread = taskThread;
        return thread.task;
    }

    /**
     * Returns the thread that runs the task.
     *
     * @return the task thread, or {@code null} if the task has not been invoked yet
     */
    public Thread getTaskThread() {
        return taskThread;
    }

    /**
     * Returns the queue registered as the environment's output.
     *
     * @return the output queue, or {@code null} if the task has not been invoked yet
     */
    public LinkedBlockingQueue<Object> getOutput() {
        return outputList;
    }

    /**
     * Returns the stream config that wraps the task configuration.
     *
     * @return the harness' stream config
     */
    public StreamConfig getStreamConfig() {
        return streamConfig;
    }

    /**
     * Returns the execution config used by this harness.
     *
     * @return the execution config
     */
    public ExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    /**
     * Closes the I/O manager of the mock environment.
     *
     * @throws Exception if closing the I/O manager fails
     */
    private void shutdownIOManager() throws Exception {
        mockEnv.getIOManager().close();
    }

    /**
     * Asserts that the environment's memory manager has no outstanding allocations and shuts
     * it down. Does nothing when the configured memory size is not positive or the environment
     * has no memory manager.
     */
    private void shutdownMemoryManager() {
        if (memorySize <= 0L) {
            return;
        }
        final MemoryManager memoryManager = mockEnv.getMemoryManager();
        if (memoryManager == null) {
            return;
        }
        Assert.assertTrue(
                "Memory Manager managed memory was not completely freed.",
                memoryManager.verifyEmpty());
        memoryManager.shutdown();
    }

    /**
     * Sends an element to channel 0 of input gate 0.
     *
     * @param element element to send
     */
    public void processElement(Object element) {
        final StreamTestSingleInputGate firstGate = inputGates[0];
        firstGate.sendElement(element, 0);
    }

    /**
     * Sends an element to the given channel of the given input gate.
     *
     * @param element element to send
     * @param inputGate index of the target input gate
     * @param channel index of the target channel within that gate
     */
    public void processElement(Object element, int inputGate, int channel) {
        final StreamTestSingleInputGate targetGate = inputGates[inputGate];
        targetGate.sendElement(element, channel);
    }

    /**
     * Sends an event to channel 0 of input gate 0.
     *
     * @param event event to send
     */
    public void processEvent(AbstractEvent event) {
        final StreamTestSingleInputGate firstGate = inputGates[0];
        firstGate.sendEvent(event, 0);
    }

    /**
     * Sends an event to the given channel of the given input gate.
     *
     * @param event event to send
     * @param inputGate index of the target input gate
     * @param channel index of the target channel within that gate
     */
    public void processEvent(AbstractEvent event, int inputGate, int channel) {
        final StreamTestSingleInputGate targetGate = inputGates[inputGate];
        targetGate.sendEvent(event, channel);
    }

    /**
     * Blocks until the task has consumed all input that was sent so far, or the task thread
     * has terminated.
     *
     * <p>Works in two phases: first it spins until every input gate reports empty queues; then
     * it repeatedly submits a mail to the task's main mailbox executor that records whether
     * the mailbox default action is no longer available, and stops once that is the case.
     *
     * @throws Exception if the task thread has recorded an error, or if the calling thread is
     *     interrupted while waiting for the mailbox query to complete
     */
    public void waitForInputProcessing() throws Exception {
        // Phase 1: busy-wait until all gate queues are drained (or the task thread died).
        boolean queuesDrained = false;
        while (!queuesDrained && taskThread.isAlive()) {
            queuesDrained = true;
            for (int gate = 0; gate < numInputGates; gate++) {
                // Non-short-circuit '&' on purpose: every gate is queried on each pass.
                queuesDrained &= inputGates[gate].allQueuesEmpty();
            }
        }

        // Phase 2: ask the task, via its mailbox, whether the default action is still available.
        final AtomicBoolean inputFullyProcessed = new AtomicBoolean();
        final MailboxProcessor mailbox = taskThread.task.mailboxProcessor;
        final MailboxExecutor mainExecutor = mailbox.getMainMailboxExecutor();
        while (taskThread.isAlive()) {
            try {
                final CountDownLatch answered = new CountDownLatch(1);
                mainExecutor.execute(
                        () -> {
                            inputFullyProcessed.set(!mailbox.isDefaultActionAvailable());
                            answered.countDown();
                        },
                        "query-whether-processInput-has-suspend-itself");
                // Bounded wait so a task that stops mid-query cannot block this loop forever.
                answered.await(1L, TimeUnit.SECONDS);
            } catch (RejectedExecutionException ignored) {
                // The mail was not accepted; fall through and re-check the loop condition.
            }

            if (inputFullyProcessed.get()) {
                break;
            }

            try {
                Thread.sleep(1L);
            } catch (InterruptedException ignored) {
                // Deliberately ignored, as before: keep polling until the task settles.
            }
        }

        final Throwable failure = taskThread.getError();
        if (failure != null) {
            throw new Exception("Exception in the task thread", failure);
        }
    }

    /** Signals end of input on each of the first {@code numInputGates} input gates. */
    public void endInput() {
        int gate = 0;
        while (gate < numInputGates) {
            inputGates[gate].endInput();
            gate++;
        }
    }

    /**
     * Ends the input of a single channel, emitting an end-of-data event first.
     *
     * @param gateIndex index of the input gate
     * @param channelIndex index of the channel within that gate
     */
    public void endInput(int gateIndex, int channelIndex) {
        final boolean emitEndOfData = true;
        endInput(gateIndex, channelIndex, emitEndOfData);
    }

    /**
     * Ends the input of a single channel by sending an end-of-partition event, optionally
     * preceded by an end-of-data event with {@link StopMode#DRAIN}.
     *
     * @param gateIndex index of the input gate
     * @param channelIndex index of the channel within that gate
     * @param emitEndOfData whether to send the end-of-data event before the end-of-partition
     *     event
     */
    public void endInput(int gateIndex, int channelIndex, boolean emitEndOfData) {
        if (emitEndOfData) {
            final AbstractEvent endOfData = new EndOfData(StopMode.DRAIN);
            inputGates[gateIndex].sendEvent(endOfData, channelIndex);
        }
        final AbstractEvent endOfPartition = EndOfPartitionEvent.INSTANCE;
        inputGates[gateIndex].sendEvent(endOfPartition, channelIndex);
    }

    /**
     * Starts configuring an operator chain whose head is the given operator, by wrapping it
     * in a {@link SimpleOperatorFactory} and delegating to the factory-based overload.
     *
     * @param headOperatorId operator id of the chain's head
     * @param headOperator operator at the head of the chain
     * @return a chainer for appending further operators
     */
    public StreamConfigChainer<StreamTaskTestHarness<OUT>> setupOperatorChain(OperatorID headOperatorId, StreamOperator<?> headOperator) {
        final StreamOperatorFactory<?> headOperatorFactory = SimpleOperatorFactory.of(headOperator);
        return setupOperatorChain(headOperatorId, headOperatorFactory);
    }

    /**
     * Starts configuring an operator chain whose head is created by the given factory. Stores
     * the factory in the stream config, serializes the config and returns a chainer for it.
     *
     * @param headOperatorId operator id of the chain's head
     * @param headOperatorFactory factory for the operator at the head of the chain
     * @return a chainer for appending further operators
     * @throws IllegalStateException if a setup method was already called on this harness
     */
    public StreamConfigChainer<StreamTaskTestHarness<OUT>> setupOperatorChain(OperatorID headOperatorId, StreamOperatorFactory<?> headOperatorFactory) {
        Preconditions.checkState(!setupCalled, "This harness was already setup.");
        setupCalled = true;

        final StreamConfig headConfig = getStreamConfig();
        headConfig.setStreamOperatorFactory(headOperatorFactory);
        headConfig.serializeAllConfigs();

        return new StreamConfigChainer<>(headOperatorId, headConfig, this, 1);
    }

    /**
     * Creates a task metric group whose registry stores every registered metric in the given
     * map, keyed by metric name.
     *
     * @param metrics map that receives the registered metrics
     * @return a task metric group for job "jobName" and task "test" on host "localhost"
     */
    static TaskMetricGroup createTaskMetricGroup(Map<String, Metric> metrics) {
        final MetricRegistry recordingRegistry = new TestMetricRegistry(metrics);
        final TaskManagerMetricGroup taskManagerGroup =
                TaskManagerMetricGroup.createTaskManagerMetricGroup(
                        recordingRegistry, "localhost", ResourceID.generate());
        return taskManagerGroup
                .addJob(new JobID(), "jobName")
                .addTask(ExecutionGraphTestUtils.createExecutionAttemptId(), "test");
    }

    /** Metric registry that records every registered metric in a caller-supplied map. */
    static class TestMetricRegistry extends NoOpMetricRegistry {
        /** Receives the registered metrics, keyed by metric name. */
        private final Map<String, Metric> metrics;

        /**
         * @param metrics map that receives the registered metrics
         */
        TestMetricRegistry(Map<String, Metric> metrics) {
            this.metrics = metrics;
        }

        /**
         * Stores the metric under its name; the metric group is not used.
         *
         * @param metric metric being registered
         * @param metricName name used as the map key
         * @param group group the metric belongs to (ignored)
         */
        public void register(Metric metric, String metricName, AbstractMetricGroup group) {
            metrics.put(metricName, metric);
        }
    }

    /**
     * Thread that creates the task from the supplied factory, invokes it, shuts down the
     * harness' I/O and memory managers, and finally cleans the task up. Any failure is recorded
     * and can be read through {@link #getError()}.
     */
    class TaskThread extends Thread {
        /** Creates the task instance on this thread. */
        private final SupplierWithException<? extends StreamTask<OUT, ?>, Exception> taskFactory;
        /** The created task; stays {@code null} if the factory failed. Read by the harness. */
        private volatile StreamTask<OUT, ?> task;
        /** First failure observed while creating, running or cleaning up the task. */
        private volatile Throwable error;

        /**
         * @param taskFactory supplier that creates the task to run
         */
        TaskThread(SupplierWithException<? extends StreamTask<OUT, ?>, Exception> taskFactory) {
            super("Task Thread");
            this.taskFactory = taskFactory;
        }

        /**
         * Creates and runs the task, recording any failure instead of propagating it. Clean-up
         * is always attempted for a task that was successfully created.
         */
        @Override
        public void run() {
            try {
                this.task = this.taskFactory.get();
                this.task.invoke();
                StreamTaskTestHarness.this.shutdownIOManager();
                StreamTaskTestHarness.this.shutdownMemoryManager();
            } catch (Throwable throwable) {
                this.error = throwable;
            } finally {
                final StreamTask<OUT, ?> createdTask = this.task;
                // If the factory itself failed there is no task to clean up; calling cleanUp on
                // null would only attach a misleading NullPointerException to the real error.
                if (createdTask != null) {
                    try {
                        createdTask.cleanUp(this.error);
                    } catch (Exception cleanUpException) {
                        final Throwable primary = this.error;
                        if (primary == null) {
                            this.error = cleanUpException;
                        } else if (primary != cleanUpException) {
                            // Throwable.addSuppressed rejects self-suppression, so only attach
                            // the clean-up failure when it is a different exception.
                            primary.addSuppressed(cleanUpException);
                        }
                    }
                }
            }
        }

        /**
         * Returns the failure recorded by this thread.
         *
         * @return the recorded error, or {@code null} if none has occurred so far
         */
        public Throwable getError() {
            return this.error;
        }
    }
}

