/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.utils.Preconditions;

public class FiniteTestSource<T>
implements SourceFunction<T>,
CheckpointedFunction,
CheckpointListener {
    private static final long serialVersionUID = 1L;
    private final List<T> elements;
    private final boolean emitOnce;
    private volatile boolean running = true;
    private transient int numCheckpointsComplete;
    private transient ListState<Integer> checkpointedState;
    private volatile int numTimesEmitted;

    public FiniteTestSource(List<T> elements, boolean emitOnce) {
        this.elements = elements;
        this.emitOnce = emitOnce;
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.checkpointedState = context.getOperatorStateStore().getListState(new ListStateDescriptor("emit-times", (TypeSerializer)IntSerializer.INSTANCE));
        if (context.isRestored()) {
            ArrayList<Integer> retrievedStates = new ArrayList<Integer>();
            for (Integer entry : (Iterable)this.checkpointedState.get()) {
                retrievedStates.add(entry);
            }
            Preconditions.checkArgument((retrievedStates.size() == 1 ? 1 : 0) != 0, (Object)(this.getClass().getSimpleName() + " retrieved invalid state."));
            this.numTimesEmitted = (Integer)retrievedStates.get(0);
            Preconditions.checkArgument((this.numTimesEmitted <= 2 ? 1 : 0) != 0, (Object)(this.getClass().getSimpleName() + " retrieved invalid numTimesEmitted: " + this.numTimesEmitted));
        } else {
            this.numTimesEmitted = 0;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
        switch (this.numTimesEmitted) {
            case 0: {
                this.emitElementsAndWaitForCheckpoints(ctx, false);
                this.emitElementsAndWaitForCheckpoints(ctx, true);
                break;
            }
            case 1: {
                this.emitElementsAndWaitForCheckpoints(ctx, true);
                break;
            }
            case 2: {
                Object lock;
                Object object = lock = ctx.getCheckpointLock();
                synchronized (object) {
                    int checkpointToAwait = this.numCheckpointsComplete + 2;
                    while (this.running && this.numCheckpointsComplete < checkpointToAwait) {
                        lock.wait(1L);
                    }
                    break;
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void emitElementsAndWaitForCheckpoints(SourceFunction.SourceContext<T> ctx, boolean isSecond) throws InterruptedException {
        int checkpointToAwait;
        Object lock;
        Object object = lock = ctx.getCheckpointLock();
        synchronized (object) {
            checkpointToAwait = this.numCheckpointsComplete + 2;
            if (!isSecond || !this.emitOnce) {
                for (T t : this.elements) {
                    ctx.collect(t);
                }
            }
            ++this.numTimesEmitted;
        }
        object = lock;
        synchronized (object) {
            while (this.running && this.numCheckpointsComplete < checkpointToAwait) {
                lock.wait(1L);
            }
        }
    }

    public void cancel() {
        this.running = false;
    }

    public void notifyCheckpointComplete(long checkpointId) {
        ++this.numCheckpointsComplete;
    }

    public void notifyCheckpointAborted(long checkpointId) {
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkState((this.checkpointedState != null ? 1 : 0) != 0, (Object)("The " + this.getClass().getSimpleName() + " has not been properly initialized."));
        this.checkpointedState.clear();
        this.checkpointedState.add((Object)this.numTimesEmitted);
    }
}

