/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.query.monitor;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kylin.common.KylinConfig;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparderContextCanary {
    private static final Logger logger = LoggerFactory.getLogger(SparderContextCanary.class);
    private static volatile boolean isStarted = false;
    private static final int THRESHOLD_TO_RESTART_SPARK = KylinConfig.getInstanceFromEnv().getThresholdToRestartSparder();
    private static final int PERIOD_MINUTES = KylinConfig.getInstanceFromEnv().getSparderCanaryPeriodMinutes();
    private static volatile int errorAccumulated = 0;
    private static volatile long lastResponseTime = -1L;
    private static volatile boolean sparderRestarting = false;

    private SparderContextCanary() {
    }

    public static int getErrorAccumulated() {
        return errorAccumulated;
    }

    public long getLastResponseTime() {
        return lastResponseTime;
    }

    public boolean isSparderRestarting() {
        return sparderRestarting;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public static void init() {
        if (isStarted) return;
        Class<SparderContextCanary> clazz = SparderContextCanary.class;
        synchronized (SparderContextCanary.class) {
            if (isStarted) return;
            isStarted = true;
            logger.info("Start monitoring Sparder");
            Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(SparderContextCanary::monitor, PERIOD_MINUTES, PERIOD_MINUTES, TimeUnit.MINUTES);
            // ** MonitorExit[var0] (shouldn't be in output)
            return;
        }
    }

    public static boolean isError() {
        return errorAccumulated >= THRESHOLD_TO_RESTART_SPARK;
    }

    public static void monitor() {
        block10: {
            try {
                long startTime = System.currentTimeMillis();
                if (!SparderContext.isSparkAvailable()) {
                    logger.info("Sparder is unavailable, need to restart immediately.");
                    errorAccumulated = Math.max(errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK);
                } else {
                    try {
                        JavaSparkContext jsc = JavaSparkContext.fromSparkContext((SparkContext)SparderContext.getOriginalSparkSession().sparkContext());
                        jsc.setLocalProperty("spark.scheduler.pool", "vip_tasks");
                        long t = System.currentTimeMillis();
                        long ret = (Long)SparderContextCanary.numberCount(jsc).get((long)KylinConfig.getInstanceFromEnv().getSparderCanaryErrorResponseMs(), TimeUnit.MILLISECONDS);
                        logger.info("SparderContextCanary numberCount returned successfully with value {}, takes {} ms.", (Object)ret, (Object)(System.currentTimeMillis() - t));
                        errorAccumulated = 0;
                    }
                    catch (TimeoutException te) {
                        logger.error("SparderContextCanary numberCount timeout, didn't return in {} ms, error {} times.", (Object)KylinConfig.getInstanceFromEnv().getSparderCanaryErrorResponseMs(), (Object)(++errorAccumulated));
                    }
                    catch (ExecutionException ee) {
                        logger.error("SparderContextCanary numberCount occurs exception, need to restart immediately.", (Throwable)ee);
                        errorAccumulated = Math.max(errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK);
                    }
                    catch (Exception e) {
                        ++errorAccumulated;
                        logger.error("SparderContextCanary numberCount occurs exception.", (Throwable)e);
                    }
                }
                lastResponseTime = System.currentTimeMillis() - startTime;
                logger.debug("Sparder context errorAccumulated:{}", (Object)errorAccumulated);
                if (!SparderContextCanary.isError()) break block10;
                sparderRestarting = true;
                try {
                    logger.warn("Repairing sparder context");
                    SparderContext.restartSpark();
                }
                catch (Throwable th) {
                    logger.error("Restart sparder context failed.", th);
                }
                sparderRestarting = false;
            }
            catch (Throwable th) {
                logger.error("Error when monitoring Sparder.", th);
            }
        }
    }

    private static JavaFutureAction<Long> numberCount(JavaSparkContext jsc) {
        ArrayList<Integer> list = new ArrayList<Integer>();
        for (int i = 0; i < 10; ++i) {
            list.add(i);
        }
        return jsc.parallelize(list, 1).countAsync();
    }
}

