/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.FlinkException;

/**
 * Stream task whose head operator is a {@link StreamSource}.
 *
 * <p>The source operator is executed on a dedicated {@link LegacySourceFunctionThread}. The thread
 * that calls {@link #processInput} meanwhile drains the task mailbox, running each letter while
 * holding the checkpoint lock, until the source thread posts {@link #SOURCE_POISON_LETTER}.
 *
 * <p>If the user function is an {@link ExternallyInducedSource}, checkpoints are started by the
 * source itself through a trigger hook installed in {@link #init()}.
 *
 * @param <OUT> element type handled by the source operator
 * @param <SRC> type of the source function
 * @param <OP> type of the stream source operator
 */
@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
extends StreamTask<OUT, OP> {
    /** Sentinel letter; taking it from the mailbox terminates {@link #runAlternativeMailboxLoop()}. */
    private static final Runnable SOURCE_POISON_LETTER = () -> {};

    /** Thread on which the head operator's {@code run(...)} is executed. */
    private final LegacySourceFunctionThread sourceThread = new LegacySourceFunctionThread();

    /** Set to {@code true} in {@link #init()} when the user function is an {@link ExternallyInducedSource}. */
    private volatile boolean externallyInducedCheckpoints;

    /** Set by {@link #finishTask()}; when set, a failure recorded by the source thread is not rethrown. */
    private volatile boolean isFinished = false;

    /**
     * Creates the task for the given environment.
     *
     * @param env environment handed to the {@link StreamTask} constructor
     */
    public SourceStreamTask(Environment env) {
        super(env);
    }

    /**
     * Inspects the user function of the head operator. When it is an {@link ExternallyInducedSource},
     * marks this task as externally induced and installs a trigger hook that forwards to
     * {@link #triggerCheckpointFromSource(long)}. For any other source this method does nothing.
     */
    @Override
    protected void init() {
        SourceFunction userFunction = (SourceFunction)((StreamSource)this.headOperator).getUserFunction();
        if (!(userFunction instanceof ExternallyInducedSource)) {
            return;
        }
        this.externallyInducedCheckpoints = true;
        ExternallyInducedSource.CheckpointTrigger hook = new ExternallyInducedSource.CheckpointTrigger(){

            @Override
            public void triggerCheckpoint(long checkpointId) throws FlinkException {
                SourceStreamTask.this.triggerCheckpointFromSource(checkpointId);
            }
        };
        ((ExternallyInducedSource)userFunction).setCheckpointTrigger(hook);
    }

    /**
     * Starts a checkpoint on behalf of an externally induced source.
     *
     * <p>Calls the superclass {@code triggerCheckpoint} directly (not this class's override) with
     * default-location checkpoint options, the current wall-clock time as timestamp, and
     * {@code advanceToEndOfEventTime = false}.
     *
     * @param checkpointId id of the checkpoint to trigger
     * @throws FlinkException if triggering fails with a checked exception; runtime exceptions and
     *     {@link FlinkException}s are rethrown unchanged, any other exception is wrapped as cause
     */
    private void triggerCheckpointFromSource(long checkpointId) throws FlinkException {
        CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();
        CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, System.currentTimeMillis());
        try {
            super.triggerCheckpoint(metaData, options, false);
        }
        catch (RuntimeException | FlinkException passThrough) {
            throw passThrough;
        }
        catch (Exception other) {
            throw new FlinkException(other.getMessage(), (Throwable)other);
        }
    }

    /**
     * Delegates to the head operator's {@code advanceToEndOfEventTime()}.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void advanceToEndOfEventTime() throws Exception {
        StreamSource head = (StreamSource)this.headOperator;
        head.advanceToEndOfEventTime();
    }

    /** Intentionally empty: this task performs no cleanup of its own. */
    @Override
    protected void cleanup() {
    }

    /**
     * Runs the source to completion in a single, blocking call.
     *
     * <p>Sequence: name and start the source thread, drain the mailbox until the poison letter
     * arrives, join the source thread, rethrow a recorded source failure unless the task was
     * finished via {@link #finishTask()}, and finally signal {@code allActionsCompleted()}.
     *
     * @param context action context that is notified once all work is done
     * @throws Exception any exception escaping the mailbox loop (after cancelling the source if the
     *     task is not already canceled), an interruption while joining, or the wrapped source failure
     */
    @Override
    protected void processInput(StreamTask.ActionContext context) throws Exception {
        LegacySourceFunctionThread worker = this.sourceThread;
        worker.setTaskDescription(this.getName());
        worker.start();
        try {
            this.runAlternativeMailboxLoop();
        }
        catch (Exception escaped) {
            // Something escaped the mailbox loop: make sure the source gets cancelled before propagating.
            if (!this.isCanceled()) {
                this.cancelTask();
            }
            throw escaped;
        }
        worker.join();
        if (!this.isFinished) {
            worker.checkThrowSourceExecutionException();
        }
        context.allActionsCompleted();
    }

    /**
     * Takes letters from the mailbox one at a time and runs each while synchronized on the
     * checkpoint lock. Returns as soon as the letter taken is {@link #SOURCE_POISON_LETTER}
     * (compared by identity); the poison letter itself is not run.
     *
     * @throws InterruptedException propagated from {@code mailbox.takeMail()}
     */
    private void runAlternativeMailboxLoop() throws InterruptedException {
        while (true) {
            Runnable letter = this.mailbox.takeMail();
            if (letter == SOURCE_POISON_LETTER) {
                return;
            }
            synchronized (this.getCheckpointLock()) {
                letter.run();
            }
        }
    }

    /** Cancels the head operator, if one is set. */
    @Override
    protected void cancelTask() {
        if (this.headOperator == null) {
            return;
        }
        ((StreamSource)this.headOperator).cancel();
    }

    /**
     * Marks the task as finished and then cancels the source. Because {@code isFinished} is set
     * first, {@link #processInput} will not rethrow a failure recorded by the source thread.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void finishTask() throws Exception {
        this.isFinished = true;
        this.cancelTask();
    }

    /**
     * Returns the thread on which the source operator runs.
     *
     * @return a non-empty {@link Optional} holding the source thread
     */
    public Optional<Thread> getExecutingThread() {
        return Optional.of(this.sourceThread);
    }

    /**
     * Handles a checkpoint trigger request.
     *
     * <p>For externally induced sources no checkpoint is started here; the method only reports,
     * under the checkpoint lock, whether the task is running. Otherwise the request is forwarded
     * to the superclass implementation.
     *
     * @param checkpointMetaData meta data of the requested checkpoint
     * @param checkpointOptions options of the requested checkpoint
     * @param advanceToEndOfEventTime forwarded unchanged to the superclass
     * @return the superclass result, or {@code isRunning()} for externally induced sources
     * @throws Exception propagated from the superclass implementation
     */
    @Override
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
        if (this.externallyInducedCheckpoints) {
            synchronized (this.getCheckpointLock()) {
                return this.isRunning();
            }
        }
        return super.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
    }

    /**
     * Thread that runs the head operator and records any {@link Throwable} it throws.
     * Whether the run succeeds or fails, the poison letter is posted to the mailbox afterwards.
     */
    private class LegacySourceFunctionThread
    extends Thread {
        /** Failure thrown by the source run, or {@code null} if none was recorded. */
        private Throwable sourceExecutionThrowable = null;

        /**
         * Runs the head operator with the task's checkpoint lock, stream status maintainer and
         * operator chain. Any throwable is stored instead of propagated; in every case the poison
         * letter is put into the mailbox via {@code clearAndPut} so the mailbox loop can end.
         */
        @Override
        public void run() {
            try {
                StreamSource head = (StreamSource)SourceStreamTask.this.headOperator;
                head.run(SourceStreamTask.this.getCheckpointLock(), SourceStreamTask.this.getStreamStatusMaintainer(), SourceStreamTask.this.operatorChain);
            }
            catch (Throwable failure) {
                this.sourceExecutionThrowable = failure;
            }
            finally {
                SourceStreamTask.this.mailbox.clearAndPut(SOURCE_POISON_LETTER);
            }
        }

        /**
         * Names this thread after the task it serves.
         *
         * @param taskDescription text appended to the fixed thread-name prefix
         */
        public void setTaskDescription(String taskDescription) {
            String threadName = "Legacy Source Thread - " + taskDescription;
            this.setName(threadName);
        }

        /**
         * Rethrows a recorded source failure, wrapped in a new {@link Exception}.
         *
         * @throws Exception wrapping the recorded throwable as its cause, if one was recorded
         */
        void checkThrowSourceExecutionException() throws Exception {
            Throwable failure = this.sourceExecutionThrowable;
            if (failure == null) {
                return;
            }
            throw new Exception(failure);
        }
    }
}

