/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming.continuous;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import org.apache.spark.SparkEnv$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.CurrentDate;
import org.apache.spark.sql.catalyst.expressions.CurrentTimestamp;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SparkDataStream;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits$;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation$;
import org.apache.spark.sql.execution.streaming.ACTIVE$;
import org.apache.spark.sql.execution.streaming.CommitMetadata;
import org.apache.spark.sql.execution.streaming.CommitMetadata$;
import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
import org.apache.spark.sql.execution.streaming.IncrementalExecution;
import org.apache.spark.sql.execution.streaming.OffsetSeq;
import org.apache.spark.sql.execution.streaming.OffsetSeq$;
import org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor;
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
import org.apache.spark.sql.execution.streaming.RECONFIGURING$;
import org.apache.spark.sql.execution.streaming.State;
import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.execution.streaming.StreamExecution$;
import org.apache.spark.sql.execution.streaming.TERMINATED$;
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$;
import org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef$;
import org.apache.spark.sql.execution.streaming.continuous.IncrementAndGetEpoch$;
import org.apache.spark.sql.execution.streaming.continuous.StopContinuousExecutionWrites$;
import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSource;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Clock;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LambdaDeserialize;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\t-b\u0001\u0002\u0016,\u0001iB\u0011b\u0010\u0001\u0003\u0002\u0003\u0006I\u0001\u0011#\t\u0013\u0015\u0003!\u0011!Q\u0001\n\u0019\u001b\u0006\u0002\u0003+\u0001\u0005\u0003\u0005\u000b\u0011\u0002$\t\u0011U\u0003!\u0011!Q\u0001\nYC\u0011\u0002\u0019\u0001\u0003\u0002\u0003\u0006I!Y5\t\u0013)\u0004!\u0011!Q\u0001\n-\u0004\b\"C9\u0001\u0005\u0003\u0005\u000b\u0011\u0002:y\u0011%I\bA!A!\u0002\u0013QX\u0010\u0003\u0005\u007f\u0001\t\u0005\t\u0015!\u0003\u0000\u0011)\t)\u0001\u0001B\u0001B\u0003%\u0011q\u0001\u0005\b\u0003\u001f\u0001A\u0011AA\t\u0011%\tY\u0003\u0001a\u0001\n#\ti\u0003C\u0005\u0002P\u0001\u0001\r\u0011\"\u0005\u0002R!A\u0011Q\f\u0001!B\u0013\ty\u0003\u0003\u0007\u0002h\u0001\u0001\r\u00111A\u0005\u0002E\nI\u0007\u0003\u0007\u0002l\u0001\u0001\r\u00111A\u0005\u0002E\ni\u0007\u0003\u0006\u0002r\u0001\u0001\r\u0011!Q!\n\u0019C\u0011\"a\u001d\u0001\u0005\u0004%I!!\u001e\t\u0011\u0005M\u0005\u0001)A\u0005\u0003oB\u0011\"!&\u0001\u0005\u0004%\t%a&\t\u0011\u0005}\u0005\u0001)A\u0005\u00033C\u0011\"!)\u0001\u0005\u0004%I!a)\t\u0011\u0005-\u0006\u0001)A\u0005\u0003KCq!!,\u0001\t#\ny\u000bC\u0004\u00026\u0002!I!a.\t\u000f\u0005\r\u0007\u0001\"\u0003\u0002F\"9\u00111\u001a\u0001\u0005\u0002\u00055\u0007bBAu\u0001\u0011\u0005\u00111\u001e\u0005\t\u0003_\u0004A\u0011A\u0019\u0002r\"9\u0011Q\u001f\u0001\u0005\u0002\u0005]\bbBA{\u0001\u0011%\u0011Q 
\u0005\b\u0003\u007f\u0004A\u0011IA\u007f\u00115\u0011\t\u0001\u0001I\u0001\u0004\u0003\u0005I\u0011\u0002B\u0002{\"i!Q\u0001\u0001\u0011\u0002\u0007\u0005\t\u0011\"\u0003\u0003\b\u0011;qA!\u0003,\u0011\u0003\u0011YA\u0002\u0004+W!\u0005!Q\u0002\u0005\b\u0003\u001f!C\u0011\u0001B\u000b\u0011%\u00119\u0002\nb\u0001\n\u0003\u0011I\u0002\u0003\u0005\u0003&\u0011\u0002\u000b\u0011\u0002B\u000e\u0011%\u00119\u0003\nb\u0001\n\u0003\u0011I\u0002\u0003\u0005\u0003*\u0011\u0002\u000b\u0011\u0002B\u000e\u0005M\u0019uN\u001c;j]V|Wo]#yK\u000e,H/[8o\u0015\taS&\u0001\u0006d_:$\u0018N\\;pkNT!AL\u0018\u0002\u0013M$(/Z1nS:<'B\u0001\u00192\u0003%)\u00070Z2vi&|gN\u0003\u00023g\u0005\u00191/\u001d7\u000b\u0005Q*\u0014!B:qCJ\\'B\u0001\u001c8\u0003\u0019\t\u0007/Y2iK*\t\u0001(A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001wA\u0011A(P\u0007\u0002[%\u0011a(\f\u0002\u0010'R\u0014X-Y7Fq\u0016\u001cW\u000f^5p]\u0006a1\u000f]1sWN+7o]5p]B\u0011\u0011IQ\u0007\u0002c%\u00111)\r\u0002\r'B\f'o[*fgNLwN\\\u0005\u0003\u007fu\nAA\\1nKB\u0011q\t\u0015\b\u0003\u0011:\u0003\"!\u0013'\u000e\u0003)S!aS\u001d\u0002\rq\u0012xn\u001c;?\u0015\u0005i\u0015!B:dC2\f\u0017BA(M\u0003\u0019\u0001&/\u001a3fM&\u0011\u0011K\u0015\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005=c\u0015BA#>\u00039\u0019\u0007.Z2la>Lg\u000e\u001e*p_R\fA\"\u00198bYfTX\r\u001a)mC:\u0004\"a\u00160\u000e\u0003aS!!\u0017.\u0002\u000f1|w-[2bY*\u00111\fX\u0001\u0006a2\fgn\u001d\u0006\u0003;F\n\u0001bY1uC2L8\u000f^\u0005\u0003?b\u00131\u0002T8hS\u000e\fG\u000e\u00157b]\u0006!1/\u001b8l!\t\u0011w-D\u0001d\u0015\t!W-A\u0004dCR\fGn\\4\u000b\u0005\u0019\f\u0014!C2p]:,7\r^8s\u0013\tA7MA\u0007TkB\u0004xN\u001d;t/JLG/Z\u0005\u0003Av\nq\u0001\u001e:jO\u001e,'\u000f\u0005\u0002m]6\tQN\u0003\u0002/c%\u0011q.\u001c\u0002\b)JLwmZ3s\u0013\tQW(\u0001\u0007ue&<w-\u001a:DY>\u001c7\u000e\u0005\u0002tm6\tAO\u0003\u0002vg\u0005!Q\u000f^5m\u0013\t9HOA\u0003DY>\u001c7.\u0003\u0002r{\u0005Qq.\u001e;qkRlu\u000eZ3\u0011\u00051\\\u0018B\u0001?n\u0005)yU\u000f\u001e9vi6{G-Z\u0005
\u0003sv\nA\"\u001a=ue\u0006|\u0005\u000f^5p]N\u0004RaRA\u0001\r\u001aK1!a\u0001S\u0005\ri\u0015\r]\u0001\u0017I\u0016dW\r^3DQ\u0016\u001c7\u000e]8j]R|en\u0015;paB!\u0011\u0011BA\u0006\u001b\u0005a\u0015bAA\u0007\u0019\n9!i\\8mK\u0006t\u0017A\u0002\u001fj]&$h\b\u0006\f\u0002\u0014\u0005]\u0011\u0011DA\u000e\u0003;\ty\"!\t\u0002$\u0005\u0015\u0012qEA\u0015!\r\t)\u0002A\u0007\u0002W!)qh\u0003a\u0001\u0001\")Qi\u0003a\u0001\r\")Ak\u0003a\u0001\r\")Qk\u0003a\u0001-\")\u0001m\u0003a\u0001C\")!n\u0003a\u0001W\")\u0011o\u0003a\u0001e\")\u0011p\u0003a\u0001u\")ap\u0003a\u0001\u007f\"9\u0011QA\u0006A\u0002\u0005\u001d\u0011aB:pkJ\u001cWm]\u000b\u0003\u0003_\u0001b!!\r\u0002<\u0005\u0005c\u0002BA\u001a\u0003oq1!SA\u001b\u0013\u0005i\u0015bAA\u001d\u0019\u00069\u0001/Y2lC\u001e,\u0017\u0002BA\u001f\u0003\u007f\u00111aU3r\u0015\r\tI\u0004\u0014\t\u0005\u0003\u0007\nY%\u0004\u0002\u0002F)\u0019a&a\u0012\u000b\u0007\u0005%S-\u0001\u0003sK\u0006$\u0017\u0002BA'\u0003\u000b\u0012\u0001cQ8oi&tWo\\;t'R\u0014X-Y7\u0002\u0017M|WO]2fg~#S-\u001d\u000b\u0005\u0003'\nI\u0006\u0005\u0003\u0002\n\u0005U\u0013bAA,\u0019\n!QK\\5u\u0011%\tY&DA\u0001\u0002\u0004\ty#A\u0002yIE\n\u0001b]8ve\u000e,7\u000f\t\u0015\u0004\u001d\u0005\u0005\u0004\u0003BA\u0005\u0003GJ1!!\u001aM\u0005!1x\u000e\\1uS2,\u0017!G2veJ,g\u000e^#q_\u000eD7i\\8sI&t\u0017\r^8s\u0013\u0012,\u0012AR\u0001\u001eGV\u0014(/\u001a8u\u000bB|7\r[\"p_J$\u0017N\\1u_JLEm\u0018\u0013fcR!\u00111KA8\u0011!\tY\u0006EA\u0001\u0002\u00041\u0015AG2veJ,g\u000e^#q_\u000eD7i\\8sI&t\u0017\r^8s\u0013\u0012\u0004\u0013a\u00024bS2,(/Z\u000b\u0003\u0003o\u0002b!!\u001f\u0002\n\u00065UBAA>\u0015\u0011\ti(a 
\u0002\r\u0005$x.\\5d\u0015\u0011\t\t)a!\u0002\u0015\r|gnY;se\u0016tGOC\u0002v\u0003\u000bS!!a\"\u0002\t)\fg/Y\u0005\u0005\u0003\u0017\u000bYHA\bBi>l\u0017n\u0019*fM\u0016\u0014XM\\2f!\u0011\t\t$a$\n\t\u0005E\u0015q\b\u0002\n)\"\u0014xn^1cY\u0016\f\u0001BZ1jYV\u0014X\rI\u0001\fY><\u0017nY1m!2\fg.\u0006\u0002\u0002\u001aB!\u0011QCAN\u0013\r\tij\u000b\u0002\u001c/JLG/\u001a+p\u0007>tG/\u001b8v_V\u001cH)\u0019;b'>,(oY3\u0002\u00191|w-[2bYBc\u0017M\u001c\u0011\u0002\u001fQ\u0014\u0018nZ4fe\u0016CXmY;u_J,\"!!*\u0011\u0007q\n9+C\u0002\u0002*6\u0012a\u0003\u0015:pG\u0016\u001c8/\u001b8h)&lW-\u0012=fGV$xN]\u0001\u0011iJLwmZ3s\u000bb,7-\u001e;pe\u0002\n!C];o\u0003\u000e$\u0018N^1uK\u0012\u001cFO]3b[R!\u00111KAY\u0011\u0019\t\u0019\f\u0007a\u0001\u0001\u0006)2\u000f]1sWN+7o]5p]\u001a{'o\u0015;sK\u0006l\u0017aD4fiN#\u0018M\u001d;PM\u001a\u001cX\r^:\u0015\t\u0005e\u0016q\u0018\t\u0004y\u0005m\u0016bAA_[\tIqJ\u001a4tKR\u001cV-\u001d\u0005\u0007\u0003\u0003L\u0002\u0019\u0001!\u00021M\u0004\u0018M]6TKN\u001c\u0018n\u001c8U_J+hNQ1uG\",7/A\u0007sk:\u001cuN\u001c;j]V|Wo\u001d\u000b\u0005\u0003'\n9\r\u0003\u0004\u0002Jj\u0001\r\u0001Q\u0001\u0015gB\f'o[*fgNLwN\u001c$peF+XM]=\u0002\u0013\u0005$Gm\u00144gg\u0016$H\u0003CA*\u0003\u001f\fI.!8\t\u000f\u0005E7\u00041\u0001\u0002T\u0006)Q\r]8dQB!\u0011\u0011BAk\u0013\r\t9\u000e\u0014\u0002\u0005\u0019>tw\rC\u0004\u0002\\n\u0001\r!!\u0011\u0002\rM$(/Z1n\u0011\u001d\tyn\u0007a\u0001\u0003C\f\u0001\u0003]1si&$\u0018n\u001c8PM\u001a\u001cX\r^:\u0011\r\u0005E\u00121HAr!\u0011\t\u0019%!:\n\t\u0005\u001d\u0018Q\t\u0002\u0010!\u0006\u0014H/\u001b;j_:|eMZ:fi\u000611m\\7nSR$B!a\u0015\u0002n\"9\u0011\u0011\u001b\u000fA\u0002\u0005M\u0017AC1xC&$X\t]8dQR!\u00111KAz\u0011\u001d\t\t.\ba\u0001\u0003'\fqb\u001d;pa&sg*Z<UQJ,\u0017\r\u001a\u000b\u0005\u0003'\nI\u0010C\u0004\u0002|z\u0001\r!!$\u0002\u000b\u0015\u0014(o\u001c:\u0015\u0005\u0005M\u0013\u0001B:u_B\f\u0001c];qKJ$s.\u001e;qkRlu\u000eZ3\u0016\u0003i\f!c];qKJ$3\u000f]1sWN+7o]5p]V\t\u0001)A\nD_:$\u001
8N\\;pkN,\u00050Z2vi&|g\u000eE\u0002\u0002\u0016\u0011\u001a2\u0001\nB\b!\u0011\tIA!\u0005\n\u0007\tMAJ\u0001\u0004B]f\u0014VM\u001a\u000b\u0003\u0005\u0017\tqb\u0015+B%R{V\tU(D\u0011~[U)W\u000b\u0003\u00057\u0001BA!\b\u0003$5\u0011!q\u0004\u0006\u0005\u0005C\t))\u0001\u0003mC:<\u0017bA)\u0003 \u0005\u00012\u000bV!S)~+\u0005kT\"I?.+\u0015\fI\u0001\u0019\u000bB{5\tS0D\u001f>\u0013F)\u0013(B)>\u0013v,\u0013#`\u0017\u0016K\u0016!G#Q\u001f\u000eCulQ(P%\u0012Ke*\u0011+P%~KEiX&F3\u0002\u0002")
public class ContinuousExecution
extends StreamExecution {
    /** Continuous streams used by this query; volatile, and initialised to an empty sequence. */
    private volatile Seq<ContinuousStream> sources = (Seq)Seq$.MODULE$.apply((Seq)Nil$.MODULE$);
    /** Identifier most recently stored through {@code currentEpochCoordinatorId_$eq}; no initial value is assigned here. */
    private String currentEpochCoordinatorId;
    /**
     * Error recorded by {@code stopInNewThread(Throwable)} via compare-and-set from {@code null};
     * {@code runContinuous} rethrows it once plan execution returns.
     */
    // The type argument must match the declared field type: AtomicReference<Object> is not
    // assignable to AtomicReference<Throwable>.
    private final AtomicReference<Throwable> failure = new AtomicReference<Throwable>(null);
    /** Plan returned by {@link #logicalPlan()}. No initializer here; as a final field it must be assigned by the constructor. */
    private final WriteToContinuousDataSource logicalPlan;
    /** Executor that drives the epoch update thread in {@code runContinuous}. No initializer here; as a final field it must be assigned by the constructor. */
    private final ProcessingTimeExecutor org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor;

    /**
     * Static forwarder to the {@code ContinuousExecution$} singleton.
     *
     * @return the value of {@code EPOCH_COORDINATOR_ID_KEY()} on the companion object
     */
    public static String EPOCH_COORDINATOR_ID_KEY() {
        final ContinuousExecution$ companion = ContinuousExecution$.MODULE$;
        return companion.EPOCH_COORDINATOR_ID_KEY();
    }

    /**
     * Static forwarder to the {@code ContinuousExecution$} singleton.
     *
     * @return the value of {@code START_EPOCH_KEY()} on the companion object
     */
    public static String START_EPOCH_KEY() {
        final ContinuousExecution$ companion = ContinuousExecution$.MODULE$;
        return companion.START_EPOCH_KEY();
    }

    /**
     * Synthetic accessor that exposes the superclass {@code outputMode()} to lambdas in this class.
     *
     * @return whatever the superclass {@code outputMode()} returns
     */
    private /* synthetic */ OutputMode super$outputMode() {
        final OutputMode inherited = super.outputMode();
        return inherited;
    }

    /**
     * Synthetic accessor that exposes the superclass {@code sparkSession()} to lambdas in this class.
     *
     * @return whatever the superclass {@code sparkSession()} returns
     */
    private /* synthetic */ SparkSession super$sparkSession() {
        final SparkSession inherited = super.sparkSession();
        return inherited;
    }

    /**
     * Reads the volatile {@code sources} field.
     *
     * @return the current sequence of continuous streams
     */
    public Seq<ContinuousStream> sources() {
        return sources;
    }

    /**
     * Replaces the volatile {@code sources} field.
     *
     * @param x$1 the new sequence of continuous streams
     */
    public void sources_$eq(Seq<ContinuousStream> x$1) {
        sources = x$1;
    }

    /**
     * Reads the {@code currentEpochCoordinatorId} field.
     *
     * @return the identifier last stored, or {@code null} if none has been stored yet
     */
    public String currentEpochCoordinatorId() {
        return currentEpochCoordinatorId;
    }

    /**
     * Stores a new value in the {@code currentEpochCoordinatorId} field.
     *
     * @param x$1 the identifier to store
     */
    public void currentEpochCoordinatorId_$eq(String x$1) {
        currentEpochCoordinatorId = x$1;
    }

    /**
     * Accessor for the {@code failure} holder.
     *
     * @return the atomic reference holding the recorded error, if any
     */
    private AtomicReference<Throwable> failure() {
        return failure;
    }

    /**
     * Accessor for the {@code logicalPlan} field.
     *
     * @return the {@code WriteToContinuousDataSource} plan held by this execution
     */
    @Override
    public WriteToContinuousDataSource logicalPlan() {
        return logicalPlan;
    }

    /**
     * Name-mangled accessor for the trigger executor field, used by the epoch update thread.
     *
     * @return the {@code ProcessingTimeExecutor} held by this execution
     */
    public ProcessingTimeExecutor org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor() {
        return org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor;
    }

    /**
     * Runs the continuous query, restarting it for as long as each run ends in a reconfiguration.
     *
     * <p>After every {@code runContinuous} call the query state is atomically updated:
     * {@code RECONFIGURING} becomes {@code ACTIVE}, any other state is left unchanged. The loop
     * repeats only while the resulting state equals {@code ACTIVE}. Once the loop ends,
     * {@code stopSources()} is called.
     *
     * @param sparkSessionForStream session passed through to {@code runContinuous}
     */
    @Override
    public void runActivatedStream(SparkSession sparkSessionForStream) {
        // A lambda replaces the decompiled `new UnaryOperator<State>(null){...}`, which is not
        // valid Java (an anonymous interface implementation cannot take constructor arguments).
        UnaryOperator<State> stateUpdate = s -> RECONFIGURING$.MODULE$.equals(s) ? ACTIVE$.MODULE$ : s;
        State stateAfterRun;
        do {
            this.runContinuous(sparkSessionForStream);
            stateAfterRun = this.state().updateAndGet(stateUpdate);
        } while (stateAfterRun != null && stateAfterRun.equals(ACTIVE$.MODULE$));
        this.stopSources();
    }

    /**
     * Determines the offsets this run starts from and sets {@code currentBatchId} to match.
     *
     * <p>When the commit log has a latest entry, the offsets logged for that epoch become the
     * committed offsets and {@code currentBatchId} is set to that epoch plus one. When the commit
     * log is empty, {@code currentBatchId} is set to 0 and the result is
     * {@code OffsetSeq$.fill} applied to one {@code null} per source.
     *
     * @param sparkSessionToRunBatches not read by this method
     * @return the offsets logged for the latest committed epoch, or the {@code null}-filled
     *         sequence when nothing has been committed
     * @throws IllegalStateException if the latest committed epoch has no entry in the offset log
     * @throws MatchError if the commit log result is neither a {@code Some} holding a non-null
     *         tuple nor {@code None$}
     */
    private OffsetSeq getStartOffsets(SparkSession sparkSessionToRunBatches) {
        Option latestCommit = this.commitLog().getLatest();

        // Resume path: a committed epoch exists.
        if (latestCommit instanceof Some) {
            Tuple2 latestEntry = (Tuple2)((Some)latestCommit).value();
            if (latestEntry != null) {
                long latestEpochId = latestEntry._1$mcJ$sp();
                this.updateStatusMessage("Starting new streaming query " + "and getting offsets from latest epoch " + latestEpochId);
                OffsetSeq nextOffsets = (OffsetSeq)this.offsetLog().get(latestEpochId).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
                    throw new IllegalStateException("Batch " + latestEpochId + " was committed without end epoch offsets!");
                });
                this.committedOffsets_$eq(nextOffsets.toStreamProgress(this.sources()));
                this.currentBatchId_$eq(latestEpochId + 1L);
                this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Resuming at epoch " + this.currentBatchId() + " with committed offsets " + this.committedOffsets());
                return nextOffsets;
            }
        }

        // Anything that is not None at this point is an unexpected shape.
        if (!None$.MODULE$.equals(latestCommit)) {
            throw new MatchError(latestCommit);
        }

        // Fresh-start path: nothing committed yet.
        this.updateStatusMessage("Starting new streaming query");
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Starting new streaming query.");
        this.currentBatchId_$eq(0L);
        return OffsetSeq$.MODULE$.fill((Seq<Offset>)((Seq)this.sources().map((Function1 & Serializable & scala.Serializable)x$1 -> null, Seq$.MODULE$.canBuildFrom())));
    }

    /**
     * Performs one run of the continuous query.
     *
     * <p>Steps, in order: resolve the start offsets; rewrite each
     * {@code StreamingDataSourceV2Relation} in the logical plan with a start offset; reject plans
     * containing {@code CurrentTimestamp} / {@code CurrentDate}; build the
     * {@code IncrementalExecution}; publish the run's local properties on the Spark context;
     * create the epoch coordinator endpoint; start the epoch update thread; execute the plan.
     * The {@code finally} section stops continuous writes, stops the endpoint, interrupts and
     * joins the epoch update thread, and cancels the job group named after {@code runId()}.
     *
     * <p>An interruption exception seen while the state is {@code RECONFIGURING} is logged and
     * swallowed; every other throwable propagates to the caller.
     *
     * @param sparkSessionForQuery session used to build the execution and to set local properties
     */
    private void runContinuous(SparkSession sparkSessionForQuery) {
        // block8 is a decompiler label: `break block8` below leaves the method body normally
        // (the finally section still runs).
        block8: {
            OffsetSeq offsets = this.getStartOffsets(sparkSessionForQuery);
            // Rewrite every StreamingDataSourceV2Relation so it carries an explicit start offset.
            LogicalPlan withNewSources = (LogicalPlan)this.logicalPlan().transform((PartialFunction)new scala.Serializable(null, offsets){
                public static final long serialVersionUID = 0L;
                private final OffsetSeq offsets$1;

                public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                    Object object;
                    A1 A1 = x1;
                    if (A1 instanceof StreamingDataSourceV2Relation) {
                        StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                        // Only the offset at index 0 is consulted, for every relation matched.
                        Option loggedOffset = (Option)this.offsets$1.offsets().apply(0);
                        // A logged offset is re-read through the stream's own deserializer (via its JSON form).
                        Option realOffset = loggedOffset.map((Function1 & Serializable & scala.Serializable)off -> streamingDataSourceV2Relation.stream().deserializeOffset(off.json()));
                        // With no logged offset, fall back to the stream's initial offset.
                        Offset startOffset = (Offset)realOffset.getOrElse((Function0 & Serializable & scala.Serializable)() -> streamingDataSourceV2Relation.stream().initialOffset());
                        // Copy the relation, overriding only the fourth argument with Some(startOffset);
                        // the other arguments are the relation's copy defaults.
                        Some x$1 = new Some((Object)startOffset);
                        Seq x$2 = streamingDataSourceV2Relation.copy$default$1();
                        Scan x$3 = streamingDataSourceV2Relation.copy$default$2();
                        SparkDataStream x$4 = streamingDataSourceV2Relation.copy$default$3();
                        Option x$5 = streamingDataSourceV2Relation.copy$default$5();
                        object = streamingDataSourceV2Relation.copy(x$2, x$3, x$4, (Option)x$1, x$5);
                    } else {
                        object = function1.apply(x1);
                    }
                    return (B1)object;
                }

                public final boolean isDefinedAt(LogicalPlan x1) {
                    LogicalPlan logicalPlan2 = x1;
                    boolean bl = logicalPlan2 instanceof StreamingDataSourceV2Relation;
                    return bl;
                }
                {
                    this.offsets$1 = offsets$1;
                }

                // Decompiler residue of the compiler-generated lambda deserialization hook; not valid Java as printed.
                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$applyOrElse$3(org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation org.apache.spark.sql.connector.read.streaming.Offset ), $anonfun$applyOrElse$4(org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation )}, serializedLambda);
                }
            });
            // Validation pass: the transformed result is discarded; the traversal exists only to
            // throw when a CurrentTimestamp or CurrentDate expression is present.
            withNewSources.transformAllExpressions((PartialFunction)new scala.Serializable(null){
                public static final long serialVersionUID = 0L;

                public final <A1 extends Expression, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                    A1 A1 = x2;
                    boolean bl = A1 instanceof CurrentTimestamp ? true : A1 instanceof CurrentDate;
                    if (bl) {
                        throw new IllegalStateException("CurrentTimestamp and CurrentDate not yet supported for continuous processing");
                    }
                    Object object = function1.apply(x2);
                    return (B1)object;
                }

                public final boolean isDefinedAt(Expression x2) {
                    Expression expression = x2;
                    boolean bl = expression instanceof CurrentTimestamp ? true : expression instanceof CurrentDate;
                    boolean bl2 = bl;
                    return bl2;
                }
            });
            // Build the IncrementalExecution for this run and request its executed plan,
            // timed under the "queryPlanning" label.
            this.reportTimeTaken("queryPlanning", (Function0 & Serializable & scala.Serializable)() -> {
                this.lastExecution_$eq(new IncrementalExecution(sparkSessionForQuery, withNewSources, this.super$outputMode(), this.checkpointFile("state"), this.id(), this.runId(), this.currentBatchId(), this.offsetSeqMetadata()));
                return this.lastExecution().executedPlan();
            });
            // Take the stream of the first StreamingDataSourceV2Relation found in the rewritten plan.
            ContinuousStream stream = (ContinuousStream)withNewSources.collect((PartialFunction)new scala.Serializable(null){
                public static final long serialVersionUID = 0L;

                public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x3, Function1<A1, B1> function1) {
                    Object object;
                    A1 A1 = x3;
                    if (A1 instanceof StreamingDataSourceV2Relation) {
                        StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                        object = (ContinuousStream)streamingDataSourceV2Relation.stream();
                    } else {
                        object = function1.apply(x3);
                    }
                    return (B1)object;
                }

                public final boolean isDefinedAt(LogicalPlan x3) {
                    LogicalPlan logicalPlan2 = x3;
                    boolean bl = logicalPlan2 instanceof StreamingDataSourceV2Relation;
                    return bl;
                }
            }).head();
            // Publish run parameters as Spark context local properties: the continuous-processing
            // flag, the starting epoch, and (below) the epoch coordinator id.
            sparkSessionForQuery.sparkContext().setLocalProperty(StreamExecution$.MODULE$.IS_CONTINUOUS_PROCESSING(), ((Object)BoxesRunTime.boxToBoolean((boolean)true)).toString());
            sparkSessionForQuery.sparkContext().setLocalProperty(ContinuousExecution$.MODULE$.START_EPOCH_KEY(), ((Object)BoxesRunTime.boxToLong((long)this.currentBatchId())).toString());
            // Coordinator id is "<runId>--<random UUID>", so each call to this method gets a fresh id.
            String epochCoordinatorId = new StringBuilder(2).append(this.runId()).append("--").append(UUID.randomUUID()).toString();
            this.currentEpochCoordinatorId_$eq(epochCoordinatorId);
            sparkSessionForQuery.sparkContext().setLocalProperty(ContinuousExecution$.MODULE$.EPOCH_COORDINATOR_ID_KEY(), epochCoordinatorId);
            RpcEndpointRef epochEndpoint = EpochCoordinatorRef$.MODULE$.create(this.logicalPlan().write(), stream, this, epochCoordinatorId, this.currentBatchId(), super.sparkSession(), SparkEnv$.MODULE$.get());
            // Background thread that hands a per-trigger callback to the trigger executor.
            Thread epochUpdateThread = new Thread(new Runnable(this, stream, epochEndpoint){
                private final /* synthetic */ ContinuousExecution $outer;
                private final ContinuousStream stream$1;
                private final RpcEndpointRef epochEndpoint$1;

                public void run() {
                    try {
                        // NOTE(review): the boolean returned by the callback presumably tells the
                        // executor whether to keep triggering — confirm against ProcessingTimeExecutor.
                        this.$outer.org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor().execute((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
                            boolean bl;
                            $this.$outer.startTrigger();
                            // Reconfiguration requested and we won the ACTIVE -> RECONFIGURING transition:
                            // interrupt the query execution thread (if alive) and return false.
                            if ($this.stream$1.needsReconfiguration() && $this.$outer.state().compareAndSet(ACTIVE$.MODULE$, RECONFIGURING$.MODULE$)) {
                                if ($this.$outer.queryExecutionThread().isAlive()) {
                                    $this.$outer.queryExecutionThread().interrupt();
                                }
                                bl = false;
                            } else if ($this.$outer.isActive()) {
                                // Still active: ask the coordinator to advance the epoch and record the new id.
                                $this.$outer.currentBatchId_$eq(BoxesRunTime.unboxToLong((Object)$this.epochEndpoint$1.askSync((Object)IncrementAndGetEpoch$.MODULE$, ClassTag$.MODULE$.Long())));
                                $this.$outer.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(23).append("New epoch ").append($this.$outer.currentBatchId()).append(" is starting.").toString());
                                bl = true;
                            } else {
                                bl = false;
                            }
                            return bl;
                        });
                    }
                    // Interruption simply ends this thread; the finally section below interrupts it on purpose.
                    catch (InterruptedException interruptedException) {
                        return;
                    }
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.stream$1 = stream$1;
                    this.epochEndpoint$1 = epochEndpoint$1;
                }

                // Decompiler residue of the compiler-generated lambda deserialization hook; not valid Java as printed.
                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$1(org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anon$2 ), $anonfun$run$2(org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anon$2 )}, serializedLambda);
                }
            }, new StringBuilder(24).append("epoch update thread for ").append(this.prettyIdString()).toString());
            try {
                try {
                    epochUpdateThread.setDaemon(true);
                    epochUpdateThread.start();
                    this.updateStatusMessage("Running");
                    // Execute the plan under a new SQL execution id, timed under the "runContinuous" label.
                    this.reportTimeTaken("runContinuous", (Function0 & Serializable & scala.Serializable)() -> (RDD)SQLExecution$.MODULE$.withNewExecutionId(this.lastExecution(), SQLExecution$.MODULE$.withNewExecutionId$default$2(), (Function0 & Serializable & scala.Serializable)() -> this.lastExecution().executedPlan().execute()));
                    // Execution returned: surface any error recorded through stopInNewThread(Throwable).
                    Throwable f = this.failure().get();
                    if (f != null) {
                        throw f;
                    }
                }
                catch (Throwable throwable) {
                    Throwable throwable2;
                    Throwable throwable3 = throwable;
                    if (throwable3 != null && StreamExecution$.MODULE$.isInterruptionException(throwable2 = throwable3, super.sparkSession().sparkContext())) {
                        State state = this.state().get();
                        RECONFIGURING$ rECONFIGURING$ = RECONFIGURING$.MODULE$;
                        // The condition below is true exactly when state equals RECONFIGURING (null-safe):
                        // in that case the interruption is logged and swallowed.
                        if (!(state != null ? !state.equals(rECONFIGURING$) : rECONFIGURING$ != null)) {
                            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append("Query ").append(this.id()).append(" ignoring exception from reconfiguring: ").append(throwable2).toString());
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                            break block8;
                        }
                    }
                    // Anything else (including interruptions outside reconfiguration) is rethrown.
                    throw throwable;
                }
            }
            finally {
                // Cleanup is run through runUninterruptibly on the query execution thread.
                this.queryExecutionThread().runUninterruptibly((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                    try {
                        epochEndpoint.askSync((Object)StopContinuousExecutionWrites$.MODULE$, ClassTag$.MODULE$.Unit());
                    }
                    finally {
                        // Runs even if the stop request above throws: tear down the endpoint,
                        // end the epoch update thread, and cancel the job group named after runId().
                        SparkEnv$.MODULE$.get().rpcEnv().stop(epochEndpoint);
                        epochUpdateThread.interrupt();
                        epochUpdateThread.join();
                        this.super$sparkSession().sparkContext().cancelJobGroup(this.runId().toString());
                    }
                });
                // Clears the current thread's interrupt flag; the returned value is ignored.
                Thread.interrupted();
            }
        }
    }

    /**
     * Records the merged offset for an epoch and wakes up threads waiting on progress.
     *
     * <p>The partition offsets are merged by the stream into one global offset, which is written
     * to the offset log for {@code epoch} while holding this object's monitor. If the entry
     * logged for {@code epoch - 1} equals the newly built entry, {@code noNewData} is set to true.
     * All waiters on {@code awaitProgressLockCondition} are then signalled.
     *
     * @param epoch epoch the offsets belong to
     * @param stream stream used to merge the partition offsets
     * @param partitionOffsets per-partition offsets reported for the epoch
     */
    public void addOffset(long epoch, ContinuousStream stream, Seq<PartitionOffset> partitionOffsets) {
        Predef$.MODULE$.assert(this.sources().length() == 1, (Function0 & Serializable & scala.Serializable)() -> "only one continuous source supported currently");

        PartitionOffset[] reportedOffsets = (PartitionOffset[])partitionOffsets.toArray(ClassTag$.MODULE$.apply(PartitionOffset.class));
        Offset globalOffset = stream.mergeOffsets(reportedOffsets);

        // Write this epoch's entry and read the previous epoch's entry under the same monitor.
        Option oldOffset;
        synchronized (this) {
            this.offsetLog().add(epoch, OffsetSeq$.MODULE$.fill((Seq<Offset>)Predef$.MODULE$.wrapRefArray((Object[])new Offset[]{globalOffset})));
            oldOffset = this.offsetLog().get(epoch - 1L);
        }

        // Same entry as the previous epoch: flag that there was no new data.
        boolean sameAsPreviousEpoch = oldOffset.contains((Object)OffsetSeq$.MODULE$.fill((Seq<Offset>)Predef$.MODULE$.wrapRefArray((Object[])new Offset[]{globalOffset})));
        if (sameAsPreviousEpoch) {
            this.noNewData_$eq(true);
        }

        this.awaitProgressLock().lock();
        try {
            this.awaitProgressLockCondition().signalAll();
        } finally {
            this.awaitProgressLock().unlock();
        }
    }

    /**
     * Commits an epoch: writes it to the commit log, advances the committed offsets, and notifies
     * the source and any threads waiting on progress.
     *
     * <p>Under this object's monitor the trigger offsets are recorded first. If the query
     * execution thread is no longer alive the method returns right there, without writing to the
     * commit log, purging, or signalling. Otherwise the offset logged for {@code epoch} (index 0)
     * is deserialized by the source at index 0, merged into the committed offsets, and passed to
     * that source's {@code commit}. Afterwards old entries are purged when {@code epoch} is at
     * least {@code minLogEntriesToMaintain()}, and all waiters are signalled.
     *
     * @param epoch epoch to commit; its offsets must already be in the offset log
     */
    public void commit(long epoch) {
        this.updateStatusMessage("Committing epoch " + epoch);
        Predef$.MODULE$.assert(this.sources().length() == 1, (Function0 & Serializable & scala.Serializable)() -> "only one continuous source supported currently");
        Predef$.MODULE$.assert(this.offsetLog().get(epoch).isDefined(), (Function0 & Serializable & scala.Serializable)() -> "offset for epoch " + epoch + " not reported before commit");

        synchronized (this) {
            this.recordTriggerOffsets(this.committedOffsets(), this.availableOffsets());
            if (!this.queryExecutionThread().isAlive()) {
                // Nothing further happens once the execution thread is gone.
                return;
            }
            this.commitLog().add(epoch, new CommitMetadata(CommitMetadata$.MODULE$.apply$default$1()));

            // Re-read the logged offset through the source's own deserializer (via its JSON form).
            SparkDataStream source = (SparkDataStream)this.sources().apply(0);
            OffsetSeq loggedSeq = (OffsetSeq)this.offsetLog().get(epoch).get();
            Offset loggedOffset = (Offset)((Option)loggedSeq.offsets().apply(0)).get();
            Offset offset = source.deserializeOffset(loggedOffset.json());

            this.committedOffsets_$eq(
                this.committedOffsets().$plus$plus(
                    (GenTraversableOnce<Tuple2<SparkDataStream, Offset>>)Seq$.MODULE$.apply(
                        (Seq)Predef$.MODULE$.wrapRefArray(
                            (Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(this.sources().apply(0)), (Object)offset)}))));
            ((SparkDataStream)this.sources().apply(0)).commit(offset);
        }

        long entriesToKeep = (long)this.minLogEntriesToMaintain();
        if (entriesToKeep <= epoch) {
            this.purge(epoch + 1L - (long)this.minLogEntriesToMaintain());
        }

        this.awaitProgressLock().lock();
        try {
            this.awaitProgressLockCondition().signalAll();
        } finally {
            this.awaitProgressLock().unlock();
        }
    }

    /**
     * Blocks until the latest epoch in the commit log is at least {@code epoch}.
     *
     * <p>Between checks the caller waits on {@code awaitProgressLockCondition} for up to 100
     * milliseconds. After each wait, a non-null {@code streamDeathCause()} is thrown.
     *
     * @param epoch epoch to wait for
     */
    public void awaitEpoch(long epoch) {
        while (true) {
            if (!this.notDone$1(epoch)) {
                return;
            }
            this.awaitProgressLock().lock();
            try {
                this.awaitProgressLockCondition().await(100L, TimeUnit.MILLISECONDS);
                if (this.streamDeathCause() != null) {
                    throw this.streamDeathCause();
                }
            } finally {
                this.awaitProgressLock().unlock();
            }
        }
    }

    /**
     * Records {@code error} as the failure of this query and stops the query on a separate thread.
     *
     * <p>Only the first call has any effect: the error is stored with a compare-and-set from
     * {@code null}, and later calls that lose that race do nothing.
     *
     * @param error the error that caused the stop
     */
    public void stopInNewThread(Throwable error) {
        boolean firstFailure = this.failure().compareAndSet(null, error);
        if (firstFailure) {
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Query " + this.prettyIdString() + " received exception " + error);
            this.stopInNewThread();
        }
    }

    /**
     * Calls {@link #stop()} on a new daemon thread named {@code "stop-continuous-execution"}.
     *
     * <p>Any throwable raised by {@code stop()} is logged with its message and then rethrown on
     * that thread.
     */
    private void stopInNewThread() {
        // Rebuilt as a plain named Thread subclass: the decompiled form (`new Thread(this){...}`
        // with `super(...)` called after other statements in an initializer block) is not valid Java.
        Thread stopper = new Thread("stop-continuous-execution") {
            @Override
            public void run() {
                try {
                    ContinuousExecution.this.stop();
                } catch (Throwable e) {
                    ContinuousExecution.this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> e.getMessage(), e);
                    throw e;
                }
            }
        };
        stopper.setDaemon(true);
        stopper.start();
    }

    /**
     * Stops this query.
     *
     * <p>Sets the state to {@code TERMINATED}, then, if the query execution thread is still
     * alive, calls {@code interruptAndAwaitExecutionThreadTermination()}, and finally logs
     * that the query was stopped.
     */
    @Override
    public void stop() {
        // The state is flipped before the execution thread is touched.
        this.state().set(TERMINATED$.MODULE$);
        boolean executionThreadAlive = this.queryExecutionThread().isAlive();
        if (executionThreadAlive) {
            this.interruptAndAwaitExecutionThreadTermination();
        }
        Function0<String> stoppedMessage = (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Query " + this.prettyIdString() + " was stopped";
        this.logInfo(stoppedMessage);
    }

    /**
     * Reports whether the commit log has not yet reached the given epoch.
     *
     * @param epoch$2 the epoch being waited for
     * @return {@code true} when the commit log has no latest entry, or when the epoch of its
     *         latest entry is strictly less than {@code epoch$2}; {@code false} otherwise
     * @throws MatchError if the latest entry is neither a {@code Some} holding a non-null
     *         tuple nor {@code None}
     */
    private final boolean notDone$1(long epoch$2) {
        Option latest = this.commitLog().getLatest();
        if (latest instanceof Some) {
            Tuple2 entry = (Tuple2)((Some)latest).value();
            if (entry != null) {
                // First tuple element is the epoch of the most recent commit.
                return entry._1$mcJ$sp() < epoch$2;
            }
        }
        if (None$.MODULE$.equals(latest)) {
            // Nothing committed yet.
            return true;
        }
        throw new MatchError(latest);
    }

    /**
     * Builds a continuous execution for the given analyzed plan and sink.
     *
     * <p>Steps performed, in order:
     * <ol>
     *   <li>Forwards all arguments except {@code extraOptions} to the superclass constructor.</li>
     *   <li>Rewrites {@code analyzedPlan}, replacing each {@code StreamingRelationV2} whose table
     *       is a {@code SupportsRead} with a {@code StreamingDataSourceV2Relation} backed by a
     *       {@code ContinuousStream}.</li>
     *   <li>Collects the streams of all {@code StreamingDataSourceV2Relation} nodes as the
     *       sources, and maps each distinct source to {@code ReadLimit.allAvailable()}.</li>
     *   <li>Wraps the rewritten plan in a {@code WriteToContinuousDataSource} using the
     *       streaming write created from the sink and {@code extraOptions}.</li>
     *   <li>Creates a {@code ProcessingTimeExecutor} from the continuous trigger's interval.</li>
     * </ol>
     *
     * @param extraOptions options handed to {@code createStreamingWrite}
     * @throws UnsupportedOperationException if a matched table does not support
     *         {@code TableCapability.CONTINUOUS_READ}
     * @throws IllegalStateException if the trigger is not a {@code ContinuousTrigger}
     */
    public ContinuousExecution(SparkSession sparkSession, String name, String checkpointRoot, LogicalPlan analyzedPlan, SupportsWrite sink, Trigger trigger, Clock triggerClock, OutputMode outputMode, Map<String, String> extraOptions, boolean deleteCheckpointOnStop) {
        super(sparkSession, name, checkpointRoot, analyzedPlan, (Table)sink, trigger, triggerClock, outputMode, deleteCheckpointOnStop);
        // Cache from StreamingRelationV2 node to its replacement relation, plus a counter
        // used to number each source's metadata directory.
        scala.collection.mutable.Map v2ToRelationMap = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
        IntRef nextSourceId = IntRef.create((int)0);
        // Partial function applied by transform(): handles StreamingRelationV2 nodes backed by
        // a SupportsRead table and defers everything else to the fallback function.
        LogicalPlan _logicalPlan = (LogicalPlan)analyzedPlan.transform((PartialFunction)new scala.Serializable(this, v2ToRelationMap, nextSourceId){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ContinuousExecution $outer;
            private final scala.collection.mutable.Map v2ToRelationMap$1;
            private final IntRef nextSourceId$1;

            /*
             * Enabled aggressive block sorting
             */
            public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x1;
                if (A1 instanceof StreamingRelationV2) {
                    StreamingRelationV2 streamingRelationV2 = (StreamingRelationV2)A1;
                    Option ds = streamingRelationV2.source();
                    String sourceName = streamingRelationV2.sourceName();
                    Table table = streamingRelationV2.table();
                    CaseInsensitiveStringMap options = streamingRelationV2.extraOptions();
                    Seq output = streamingRelationV2.output();
                    if (table instanceof SupportsRead) {
                        String dsStr;
                        SupportsRead supportsRead = (SupportsRead)table;
                        // dsStr is "[<source>]" when a source is present, otherwise empty; it is
                        // only used in the log message below. The extra local `string` is unused.
                        String string = dsStr = ds.nonEmpty() ? new StringBuilder(2).append("[").append(ds.get()).append("]").toString() : "";
                        if (!DataSourceV2Implicits$.MODULE$.TableHelper((Table)supportsRead).supports(TableCapability.CONTINUOUS_READ)) {
                            throw new UnsupportedOperationException(new StringBuilder(52).append("Data source ").append(sourceName).append(" does not support continuous processing.").toString());
                        }
                        // Reuse the relation already built for this node, or build and cache one.
                        object = this.v2ToRelationMap$1.getOrElseUpdate((Object)streamingRelationV2, (Function0 & Serializable & scala.Serializable)() -> {
                            // Metadata path is "<resolvedCheckpointRoot>/sources/<id>"; the id is
                            // incremented after being used.
                            String metadataPath = new StringBuilder(9).append($this.$outer.resolvedCheckpointRoot()).append("/sources/").append($this.nextSourceId$1.elem).toString();
                            ++$this.nextSourceId$1.elem;
                            $this.$outer.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(44).append("Reading table [").append(supportsRead).append("] from DataSourceV2 named '").append(sourceName).append("' ").append(dsStr).toString());
                            Scan scan = supportsRead.newScanBuilder(options).build();
                            ContinuousStream stream = scan.toContinuousStream(metadataPath);
                            return new StreamingDataSourceV2Relation(output, scan, (SparkDataStream)stream, StreamingDataSourceV2Relation$.MODULE$.apply$default$4(), StreamingDataSourceV2Relation$.MODULE$.apply$default$5());
                        });
                        return (B1)object;
                    }
                }
                // Not a StreamingRelationV2 with a readable table: use the fallback.
                object = function1.apply(x1);
                return (B1)object;
            }

            // Defined only for StreamingRelationV2 nodes whose table is a SupportsRead.
            public final boolean isDefinedAt(LogicalPlan x1) {
                StreamingRelationV2 streamingRelationV2;
                Table table;
                LogicalPlan logicalPlan2 = x1;
                boolean bl = logicalPlan2 instanceof StreamingRelationV2 && (table = (streamingRelationV2 = (StreamingRelationV2)logicalPlan2).table()) instanceof SupportsRead;
                return bl;
            }
            // NOTE(review): initializer appears to be the decompiler's rendering of the
            // anonymous class constructor capturing the outer instance and the two locals.
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.v2ToRelationMap$1 = v2ToRelationMap$1;
                this.nextSourceId$1 = nextSourceId$1;
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$applyOrElse$1(org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution$$anonfun$1 org.apache.spark.sql.connector.catalog.SupportsRead java.lang.String java.lang.String org.apache.spark.sql.util.CaseInsensitiveStringMap scala.collection.Seq ), $anonfun$applyOrElse$2(org.apache.spark.sql.connector.catalog.SupportsRead java.lang.String java.lang.String )}, serializedLambda);
            }
        });
        // Sources are the streams of every StreamingDataSourceV2Relation in the rewritten plan,
        // each cast to ContinuousStream.
        this.sources_$eq((Seq<ContinuousStream>)_logicalPlan.collect((PartialFunction)new scala.Serializable(null){
            public static final long serialVersionUID = 0L;

            public final <A1 extends LogicalPlan, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x2;
                if (A1 instanceof StreamingDataSourceV2Relation) {
                    StreamingDataSourceV2Relation streamingDataSourceV2Relation = (StreamingDataSourceV2Relation)A1;
                    object = (ContinuousStream)streamingDataSourceV2Relation.stream();
                } else {
                    object = function1.apply(x2);
                }
                return (B1)object;
            }

            public final boolean isDefinedAt(LogicalPlan x2) {
                LogicalPlan logicalPlan2 = x2;
                boolean bl = logicalPlan2 instanceof StreamingDataSourceV2Relation;
                return bl;
            }
        }));
        // Every distinct source is paired with ReadLimit.allAvailable().
        this.uniqueSources_$eq((Map<SparkDataStream, ReadLimit>)((TraversableOnce)((TraversableLike)this.sources().distinct()).map((Function1 & Serializable & scala.Serializable)s -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(s), (Object)ReadLimit.allAvailable()), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
        // Final logical plan: the rewritten plan wrapped in a continuous write to the sink.
        this.logicalPlan = new WriteToContinuousDataSource(this.createStreamingWrite((SupportsWrite)super.sink(), extraOptions, _logicalPlan), _logicalPlan);
        Trigger trigger2 = super.trigger();
        // Only ContinuousTrigger is accepted; anything else is rejected.
        if (!(trigger2 instanceof ContinuousTrigger)) {
            throw new IllegalStateException(new StringBuilder(29).append("Unsupported type of trigger: ").append(super.trigger()).toString());
        }
        ContinuousTrigger continuousTrigger = (ContinuousTrigger)trigger2;
        // The trigger's interval in milliseconds drives a processing-time executor.
        long t = continuousTrigger.intervalMs();
        ProcessingTimeExecutor processingTimeExecutor = new ProcessingTimeExecutor(new ProcessingTimeTrigger(t), super.triggerClock());
        this.org$apache$spark$sql$execution$streaming$continuous$ContinuousExecution$$triggerExecutor = processingTimeExecutor;
    }
}

