/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.functions;

import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ImpulseSourceFunction
implements SourceFunction<WindowedValue<byte[]>>,
CheckpointedFunction {
    private final boolean keepSourceAlive;
    private volatile boolean running;
    private transient ListState<Boolean> impulseEmitted;

    public ImpulseSourceFunction(boolean keepSourceAlive) {
        this.keepSourceAlive = keepSourceAlive;
        this.running = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<WindowedValue<byte[]>> sourceContext) throws Exception {
        if (Iterables.isEmpty((Iterable)((Iterable)this.impulseEmitted.get()))) {
            Object object = sourceContext.getCheckpointLock();
            synchronized (object) {
                sourceContext.collect((Object)WindowedValue.valueInGlobalWindow((Object)new byte[0]));
                this.impulseEmitted.add((Object)true);
            }
        }
        if (this.keepSourceAlive) {
            Object waitLock = new Object();
            while (this.running) {
                try {
                    Object object = waitLock;
                    synchronized (object) {
                        waitLock.wait(1000L);
                    }
                }
                catch (InterruptedException e) {
                    if (this.running) continue;
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public void cancel() {
        this.running = false;
    }

    public void snapshotState(FunctionSnapshotContext context) {
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.impulseEmitted = context.getOperatorStateStore().getListState(new ListStateDescriptor("impulse-emitted", (TypeSerializer)BooleanSerializer.INSTANCE));
    }
}

