/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.shaded.curator5.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.sort.SortingDataInput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInputFactory;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

@Internal
public class OneInputStreamTask<IN, OUT>
extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
    @Nullable
    private CheckpointBarrierHandler checkpointBarrierHandler;
    private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();

    public OneInputStreamTask(Environment env) throws Exception {
        super(env);
    }

    @VisibleForTesting
    public OneInputStreamTask(Environment env, @Nullable TimerService timeProvider) throws Exception {
        super(env, timeProvider);
    }

    @Override
    public void init() throws Exception {
        StreamConfig configuration = this.getConfiguration();
        int numberOfInputs = configuration.getNumberOfNetworkInputs();
        if (numberOfInputs > 0) {
            CheckpointedInputGate inputGate = this.createCheckpointedInputGate();
            Counter numRecordsIn = this.setupNumRecordsInCounter(this.mainOperator);
            PushingAsyncDataInput.DataOutput<IN> output = this.createDataOutput(numRecordsIn);
            StreamTaskInput<IN> input = this.createTaskInput(inputGate);
            StreamConfig.InputConfig[] inputConfigs = configuration.getInputs(this.getUserCodeClassLoader());
            StreamConfig.InputConfig inputConfig = inputConfigs[0];
            if (StreamConfig.requiresSorting(inputConfig)) {
                Preconditions.checkState((!CheckpointingOptions.isCheckpointingEnabled((Configuration)this.getJobConfiguration()) ? 1 : 0) != 0, (Object)"Checkpointing is not allowed with sorted inputs.");
                input = this.wrapWithSorted(input);
            }
            this.getEnvironment().getMetricGroup().getIOMetricGroup().reuseRecordsInputCounter(numRecordsIn);
            this.inputProcessor = new StreamOneInputProcessor<IN>(input, output, this.operatorChain);
        }
        ((OneInputStreamOperator)this.mainOperator).getMetricGroup().gauge("currentInputWatermark", (Gauge)this.inputWatermarkGauge);
        this.getEnvironment().getMetricGroup().gauge("currentInputWatermark", this.inputWatermarkGauge::getValue);
    }

    @Override
    protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {
        return Optional.ofNullable(this.checkpointBarrierHandler);
    }

    private StreamTaskInput<IN> wrapWithSorted(StreamTaskInput<IN> input) {
        ClassLoader userCodeClassLoader = this.getUserCodeClassLoader();
        return new SortingDataInput(input, this.configuration.getTypeSerializerIn(input.getInputIndex(), userCodeClassLoader), this.configuration.getStateKeySerializer(userCodeClassLoader), this.configuration.getStatePartitioner(input.getInputIndex(), userCodeClassLoader), this.getEnvironment().getMemoryManager(), this.getEnvironment().getIOManager(), this.getExecutionConfig().isObjectReuseEnabled(), this.configuration.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.OPERATOR, this.getJobConfiguration(), this.getEnvironment().getTaskConfiguration(), userCodeClassLoader), this.getEnvironment().getTaskManagerInfo().getConfiguration(), this, this.getExecutionConfig());
    }

    private CheckpointedInputGate createCheckpointedInputGate() {
        IndexedInputGate[] inputGates = this.getEnvironment().getAllInputGates();
        this.checkpointBarrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(this, this.getJobConfiguration(), this.configuration, this.getCheckpointCoordinator(), this.getTaskNameWithSubtaskAndId(), new List[]{Arrays.asList(inputGates)}, Collections.emptyList(), this.mainMailboxExecutor, this.systemTimerService);
        CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedMultipleInputGate(this.mainMailboxExecutor, new List[]{Arrays.asList(inputGates)}, this.getEnvironment().getMetricGroup().getIOMetricGroup(), this.checkpointBarrierHandler, this.configuration);
        return (CheckpointedInputGate)Iterables.getOnlyElement(Arrays.asList(checkpointedInputGates));
    }

    private PushingAsyncDataInput.DataOutput<IN> createDataOutput(Counter numRecordsIn) {
        return new StreamTaskNetworkOutput(this.operatorChain.getFinishedOnRestoreInputOrDefault((Input)((Object)this.mainOperator)), this.inputWatermarkGauge, numRecordsIn);
    }

    private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate inputGate) {
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(inputGate);
        TypeSerializer inSerializer = this.configuration.getTypeSerializerIn1(this.getUserCodeClassLoader());
        Set<AbstractInternalWatermarkDeclaration<?>> watermarkDeclarationSet = this.configuration.getWatermarkDeclarations(this.getUserCodeClassLoader());
        return StreamTaskNetworkInputFactory.create(inputGate, inSerializer, this.getEnvironment().getIOManager(), statusWatermarkValve, 0, this.getEnvironment().getTaskStateManager().getInputRescalingDescriptor(), gateIndex -> this.configuration.getInPhysicalEdges(this.getUserCodeClassLoader()).get((int)gateIndex).getPartitioner(), this.getEnvironment().getTaskInfo(), this.getCanEmitBatchOfRecords(), watermarkDeclarationSet);
    }

    private static class StreamTaskNetworkOutput<IN>
    implements PushingAsyncDataInput.DataOutput<IN> {
        private final Input<IN> operator;
        private final WatermarkGauge watermarkGauge;
        private final Counter numRecordsIn;
        private final ThrowingConsumer<StreamRecord<IN>, Exception> recordProcessor;

        private StreamTaskNetworkOutput(Input<IN> operator, WatermarkGauge watermarkGauge, Counter numRecordsIn) {
            this.operator = (Input)Preconditions.checkNotNull(operator);
            this.watermarkGauge = (WatermarkGauge)Preconditions.checkNotNull((Object)watermarkGauge);
            this.numRecordsIn = (Counter)Preconditions.checkNotNull((Object)numRecordsIn);
            this.recordProcessor = RecordProcessorUtils.getRecordProcessor(operator);
        }

        @Override
        public void emitRecord(StreamRecord<IN> record) throws Exception {
            this.numRecordsIn.inc();
            this.recordProcessor.accept(record);
        }

        @Override
        public void emitWatermark(Watermark watermark) throws Exception {
            this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            this.operator.processWatermark(watermark);
        }

        @Override
        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
            this.operator.processWatermarkStatus(watermarkStatus);
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            this.operator.processLatencyMarker(latencyMarker);
        }

        @Override
        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
            this.operator.processRecordAttributes(recordAttributes);
        }

        @Override
        public void emitWatermark(WatermarkEvent watermark) throws Exception {
            this.operator.processWatermark(watermark);
        }
    }
}

