/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.utils.FlinkClassloading;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnboundedSourceWrapper<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
extends RichParallelSourceFunction<WindowedValue<ValueWithRecordId<OutputT>>>
implements ProcessingTimeCallback,
StoppableFunction,
CheckpointListener,
CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
    private final String stepName;
    private final SerializablePipelineOptions serializedOptions;
    private final boolean isConvertedBoundedSource;
    private final KvCoder<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> checkpointCoder;
    private final List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources;
    private final boolean shutdownOnFinalWatermark;
    private transient List<UnboundedSource<OutputT, CheckpointMarkT>> localSplitSources;
    private transient List<UnboundedSource.UnboundedReader<OutputT>> localReaders;
    private volatile boolean isRunning = true;
    private transient StreamingRuntimeContext runtimeContext;
    private transient SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> context;
    private transient LinkedHashMap<Long, List<CheckpointMarkT>> pendingCheckpoints;
    private static final int MAX_NUMBER_PENDING_CHECKPOINTS = 32;
    private transient ListState<KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> stateForCheckpoint;
    private transient boolean isRestored = false;

    public UnboundedSourceWrapper(String stepName, PipelineOptions pipelineOptions, UnboundedSource<OutputT, CheckpointMarkT> source, int parallelism) throws Exception {
        Coder checkpointMarkCoder;
        this.stepName = stepName;
        this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
        this.isConvertedBoundedSource = source instanceof UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
        if (source.requiresDeduping()) {
            LOG.warn("Source {} requires deduping but Flink runner doesn't support this yet.", source);
        }
        if ((checkpointMarkCoder = source.getCheckpointMarkCoder()) == null) {
            LOG.info("No CheckpointMarkCoder specified for this source. Won't create snapshots.");
            this.checkpointCoder = null;
        } else {
            SerializableCoder sourceCoder = SerializableCoder.of((TypeDescriptor)new TypeDescriptor<UnboundedSource>(){});
            this.checkpointCoder = KvCoder.of((Coder)sourceCoder, (Coder)checkpointMarkCoder);
        }
        this.splitSources = source.split(parallelism, pipelineOptions);
        this.shutdownOnFinalWatermark = ((FlinkPipelineOptions)pipelineOptions.as(FlinkPipelineOptions.class)).isShutdownSourcesOnFinalWatermark();
    }

    public void open(Configuration parameters) throws Exception {
        this.runtimeContext = (StreamingRuntimeContext)this.getRuntimeContext();
        int subtaskIndex = this.runtimeContext.getIndexOfThisSubtask();
        int numSubtasks = this.runtimeContext.getNumberOfParallelSubtasks();
        this.localSplitSources = new ArrayList<UnboundedSource<OutputT, CheckpointMarkT>>();
        this.localReaders = new ArrayList<UnboundedSource.UnboundedReader<OutputT>>();
        this.pendingCheckpoints = new LinkedHashMap();
        if (this.isRestored) {
            for (KV restored : (Iterable)this.stateForCheckpoint.get()) {
                this.localSplitSources.add((UnboundedSource)restored.getKey());
                this.localReaders.add(((UnboundedSource)restored.getKey()).createReader(this.serializedOptions.get(), (UnboundedSource.CheckpointMark)restored.getValue()));
            }
        } else {
            for (int i = 0; i < this.splitSources.size(); ++i) {
                if (i % numSubtasks != subtaskIndex) continue;
                UnboundedSource<OutputT, CheckpointMarkT> source = this.splitSources.get(i);
                UnboundedSource.UnboundedReader reader = source.createReader(this.serializedOptions.get(), null);
                this.localSplitSources.add(source);
                this.localReaders.add(reader);
            }
        }
        LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}", new Object[]{subtaskIndex + 1, numSubtasks, this.localSplitSources});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) throws Exception {
        this.context = ctx;
        FlinkMetricContainer metricContainer = new FlinkMetricContainer(this.getRuntimeContext());
        ReaderInvocationUtil readerInvoker = new ReaderInvocationUtil(this.stepName, this.serializedOptions.get(), metricContainer);
        if (this.localReaders.isEmpty()) {
            LOG.info("Number of readers is 0 for this task executor, idle");
        } else if (this.isConvertedBoundedSource) {
            this.setNextWatermarkTimer(this.runtimeContext);
            for (int i = 0; i < this.localReaders.size() && this.isRunning; ++i) {
                boolean dataAvailable;
                UnboundedSource.UnboundedReader<OutputT> reader = this.localReaders.get(i);
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    boolean dataAvailable2 = readerInvoker.invokeStart(reader);
                    if (dataAvailable2) {
                        this.emitElement(ctx, reader);
                    }
                }
                do {
                    Object dataAvailable2 = ctx.getCheckpointLock();
                    synchronized (dataAvailable2) {
                        dataAvailable = readerInvoker.invokeAdvance(reader);
                        if (dataAvailable) {
                            this.emitElement(ctx, reader);
                        }
                    }
                } while (dataAvailable && this.isRunning);
            }
        } else {
            boolean dataAvailable;
            Object object;
            int numReaders = this.localReaders.size();
            int currentReader = 0;
            for (UnboundedSource.UnboundedReader<OutputT> reader : this.localReaders) {
                object = ctx.getCheckpointLock();
                synchronized (object) {
                    dataAvailable = readerInvoker.invokeStart(reader);
                    if (dataAvailable) {
                        this.emitElement(ctx, reader);
                    }
                }
            }
            this.setNextWatermarkTimer(this.runtimeContext);
            boolean hadData = false;
            while (this.isRunning) {
                UnboundedSource.UnboundedReader<OutputT> reader;
                reader = this.localReaders.get(currentReader);
                object = ctx.getCheckpointLock();
                synchronized (object) {
                    dataAvailable = readerInvoker.invokeAdvance(reader);
                    if (dataAvailable) {
                        this.emitElement(ctx, reader);
                        hadData = true;
                    }
                }
                currentReader = (currentReader + 1) % numReaders;
                if (currentReader == 0 && !hadData) {
                    Thread.sleep(50L);
                    continue;
                }
                if (currentReader != 0) continue;
                hadData = false;
            }
        }
        ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
        this.finalizeSource();
    }

    private void finalizeSource() {
        if (!this.shutdownOnFinalWatermark) {
            while (this.isRunning) {
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {
                    if (this.isRunning) continue;
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private void emitElement(SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx, UnboundedSource.UnboundedReader<OutputT> reader) {
        Object item = reader.getCurrent();
        byte[] recordId = reader.getCurrentRecordId();
        Instant timestamp = reader.getCurrentTimestamp();
        WindowedValue windowedValue = WindowedValue.of((Object)new ValueWithRecordId(item, recordId), (Instant)timestamp, (BoundedWindow)GlobalWindow.INSTANCE, (PaneInfo)PaneInfo.NO_FIRING);
        ctx.collect((Object)windowedValue);
    }

    public void close() throws Exception {
        try {
            super.close();
            if (this.localReaders != null) {
                for (UnboundedSource.UnboundedReader<OutputT> reader : this.localReaders) {
                    reader.close();
                }
            }
        }
        finally {
            FlinkClassloading.deleteStaticCaches();
        }
    }

    public void cancel() {
        this.isRunning = false;
    }

    public void stop() {
        this.isRunning = false;
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!this.isRunning) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            if (this.checkpointCoder == null) {
                return;
            }
            this.stateForCheckpoint.clear();
            long checkpointId = functionSnapshotContext.getCheckpointId();
            ArrayList<UnboundedSource.CheckpointMark> checkpointMarks = new ArrayList<UnboundedSource.CheckpointMark>(this.localSplitSources.size());
            for (int i = 0; i < this.localSplitSources.size(); ++i) {
                UnboundedSource<OutputT, CheckpointMarkT> source = this.localSplitSources.get(i);
                UnboundedSource.UnboundedReader<OutputT> reader = this.localReaders.get(i);
                UnboundedSource.CheckpointMark mark = reader.getCheckpointMark();
                checkpointMarks.add(mark);
                KV kv = KV.of(source, (Object)mark);
                this.stateForCheckpoint.add((Object)kv);
            }
            int diff = this.pendingCheckpoints.size() - 32;
            if (diff >= 0) {
                Iterator<Long> iterator = this.pendingCheckpoints.keySet().iterator();
                while (diff >= 0) {
                    iterator.next();
                    iterator.remove();
                    --diff;
                }
            }
            this.pendingCheckpoints.put(checkpointId, checkpointMarks);
        }
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        if (this.checkpointCoder == null) {
            return;
        }
        OperatorStateStore stateStore = context.getOperatorStateStore();
        CoderTypeInformation typeInformation = new CoderTypeInformation(this.checkpointCoder);
        this.stateForCheckpoint = stateStore.getOperatorState(new ListStateDescriptor("_default_", typeInformation.createSerializer(new ExecutionConfig())));
        if (context.isRestored()) {
            this.isRestored = true;
            LOG.info("Restoring state in the UnboundedSourceWrapper.");
        } else {
            LOG.info("No restore state for UnboundedSourceWrapper.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onProcessingTime(long timestamp) {
        if (this.isRunning) {
            Object object = this.context.getCheckpointLock();
            synchronized (object) {
                long watermarkMillis = Long.MAX_VALUE;
                for (UnboundedSource.UnboundedReader<OutputT> reader : this.localReaders) {
                    Instant watermark = reader.getWatermark();
                    if (watermark == null) continue;
                    watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis);
                }
                this.context.emitWatermark(new Watermark(watermarkMillis));
                if (this.shutdownOnFinalWatermark && watermarkMillis >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
                    this.isRunning = false;
                }
            }
            this.setNextWatermarkTimer(this.runtimeContext);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
        if (this.isRunning) {
            long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
            Object object = this.context.getCheckpointLock();
            synchronized (object) {
                long currentProcessingTime = runtime.getProcessingTimeService().getCurrentProcessingTime();
                if (currentProcessingTime < Long.MAX_VALUE) {
                    long nextTriggerTime = currentProcessingTime + watermarkInterval;
                    if (nextTriggerTime < currentProcessingTime) {
                        nextTriggerTime = Long.MAX_VALUE;
                    }
                    runtime.getProcessingTimeService().registerTimer(nextTriggerTime, (ProcessingTimeCallback)this);
                }
            }
        }
    }

    @VisibleForTesting
    public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() {
        return this.splitSources;
    }

    @VisibleForTesting
    List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getLocalSplitSources() {
        return this.localSplitSources;
    }

    @VisibleForTesting
    List<UnboundedSource.UnboundedReader<OutputT>> getLocalReaders() {
        return this.localReaders;
    }

    @VisibleForTesting
    boolean isRunning() {
        return this.isRunning;
    }

    @VisibleForTesting
    public void setSourceContext(SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) {
        this.context = ctx;
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        List<CheckpointMarkT> checkpointMarks = this.pendingCheckpoints.get(checkpointId);
        if (checkpointMarks != null) {
            long currentId;
            Iterator<Long> iterator = this.pendingCheckpoints.keySet().iterator();
            do {
                currentId = iterator.next();
                iterator.remove();
            } while (currentId != checkpointId);
            for (UnboundedSource.CheckpointMark mark : checkpointMarks) {
                mark.finalizeCheckpoint();
            }
        }
    }
}

