/*
 * Decompiled with CFR 0.152.
 */
package org.talend.sdk.component.runtime.testing.spark.internal;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import lombok.Generated;
import org.apache.xbean.finder.UrlSet;
import org.apache.ziplock.ClassLoaders;
import org.apache.ziplock.JarLocation;
import org.jboss.shrinkwrap.resolver.api.ResolutionStrategy;
import org.jboss.shrinkwrap.resolver.api.maven.ConfigurableMavenResolverSystem;
import org.jboss.shrinkwrap.resolver.api.maven.Maven;
import org.jboss.shrinkwrap.resolver.api.maven.MavenFormatStage;
import org.jboss.shrinkwrap.resolver.api.maven.MavenStrategyStage;
import org.jboss.shrinkwrap.resolver.api.maven.ScopeType;
import org.jboss.shrinkwrap.resolver.api.maven.strategy.AcceptScopesStrategy;
import org.jboss.shrinkwrap.resolver.api.maven.strategy.MavenResolutionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.sdk.component.runtime.testing.spark.SparkClusterRule;
import org.talend.sdk.component.runtime.testing.spark.internal.SparkVersions;

public abstract class BaseSpark<T extends BaseSpark<?>> {
    // Shared logger; note it is registered under SparkClusterRule's name, not BaseSpark's.
    private static final Logger LOGGER = LoggerFactory.getLogger(SparkClusterRule.class);
    // Cluster description for the calling thread: set by start() and cleared by its first closing task.
    private final ThreadLocal<ClusterConfig> config = new ThreadLocal();
    // Number of worker processes start() launches next to the master.
    private int slaves = 1;
    // Scala binary version: part of the resolved Maven coordinates and exported as SPARK_SCALA_VERSION.
    private String scalaVersion = SparkVersions.SPARK_SCALA_VERSION.getValue();
    // Spark version: part of the resolved Maven coordinates and used to pick the Version layout.
    private String sparkVersion = SparkVersions.SPARK_VERSION.getValue();
    // Base URL under which "/hadoop-<version>/bin/{hadoop.dll,winutils.exe}" is downloaded on Windows.
    private String hadoopBase = "https://github.com/steveloughran/winutils/blob/master";
    // Hadoop version used to build the winutils download URLs.
    private String hadoopVersion = "2.6.4";
    // When true (and running on Windows) winutils is downloaded and HADOOP_HOME is set for child JVMs.
    private boolean installWinUtils = true;
    // Exported through _JAVA_OPTIONS to child JVMs when the current JVM is not Java 8.
    protected static final String EXTRA_JVM_ARGS = "--add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED";

    /**
     * Reports a failure to the hosting test framework.
     *
     * <p>NOTE(review): callers in this class keep executing after this call (for example
     * returning a fallback value), so an implementation may either throw or return - confirm
     * the intended contract with the concrete subclasses.
     *
     * @param var1 the failure message
     */
    protected abstract void fail(String var1);

    /**
     * Asserts a condition through the hosting test framework.
     *
     * @param var1 the message describing the expectation
     * @param var2 the condition that is expected to be {@code true}
     */
    protected abstract void assertTrue(String var1, boolean var2);

    /**
     * Provides the working directory of the cluster: the {@code spark/} home and the jars
     * generated from classpath directories are created underneath it.
     *
     * @return the root directory to write into
     */
    protected abstract File getRoot();

    /**
     * Sets how many worker processes {@code start()} launches.
     *
     * @param slaves the number of workers
     * @return this instance, for chaining
     */
    @SuppressWarnings("unchecked")
    public T withSlaves(int slaves) {
        final T self = (T) this;
        this.slaves = slaves;
        return self;
    }

    /**
     * Sets the Scala binary version used in the Spark artifact coordinates and exported
     * to the child processes as {@code SPARK_SCALA_VERSION}.
     *
     * @param scalaVersion the Scala version suffix of the Spark artifacts
     * @return this instance, for chaining
     */
    @SuppressWarnings("unchecked")
    public T withScalaVersion(String scalaVersion) {
        final T self = (T) this;
        this.scalaVersion = scalaVersion;
        return self;
    }

    /**
     * Sets the Spark version to resolve; it also decides which {@code Version} layout is used.
     *
     * @param sparkVersion the Spark artifact version
     * @return this instance, for chaining
     */
    @SuppressWarnings("unchecked")
    public T withSparkVersion(String sparkVersion) {
        final T self = (T) this;
        this.sparkVersion = sparkVersion;
        return self;
    }

    /**
     * Enables or disables the winutils download performed on Windows hosts.
     *
     * @param installWinUtils {@code true} to download winutils when running on Windows
     * @return this instance, for chaining
     */
    @SuppressWarnings("unchecked")
    public T withInstallWinUtils(boolean installWinUtils) {
        final T self = (T) this;
        this.installWinUtils = installWinUtils;
        return self;
    }

    /**
     * Sets the Hadoop version used to build the winutils download URLs.
     *
     * @param version the Hadoop version
     * @return this instance, for chaining
     */
    @SuppressWarnings("unchecked")
    public T withHadoopVersion(String version) {
        final T self = (T) this;
        this.hadoopVersion = version;
        return self;
    }

    /**
     * Sets the base URL the winutils binaries are downloaded from.
     *
     * @param base the URL prefix placed before {@code /hadoop-<version>/bin/...}
     * @return this instance, for chaining
     */
    @SuppressWarnings("unchecked")
    public T withHadoopBase(String base) {
        final T self = (T) this;
        this.hadoopBase = base;
        return self;
    }

    /**
     * Turns the winutils download off.
     *
     * @return this instance, for chaining
     */
    @SuppressWarnings("unchecked")
    public T skipWinUtils() {
        final T self = (T) this;
        this.installWinUtils = false;
        return self;
    }

    /**
     * Prepares the Spark home, then launches one master and the configured number of
     * workers as child JVMs.
     *
     * <p>Every resource that is created registers a closing task; the returned handle runs
     * these tasks in registration order when closed. Anything thrown while launching the
     * processes is captured rather than propagated and is exposed through
     * {@link Instances#getException()}.
     *
     * @return the handle used to stop the cluster
     */
    protected Instances start() {
        final Version version = Version.find(this.sparkVersion);
        final File sparkHome = this.buildSparkHome(version);
        LOGGER.info("Copied spark libraries in " + sparkHome);

        // Fall back to "localhost" when the local host name cannot be resolved.
        String resolvedHost;
        try {
            resolvedHost = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException unknownHost) {
            resolvedHost = "localhost";
        }
        final String host = resolvedHost;
        final int masterPort = this.newPort();
        final int webMasterPort = this.newPort();

        final List<Runnable> closingTasks = new ArrayList<>();
        final ClusterConfig localConfig = new ClusterConfig(host, masterPort, webMasterPort, sparkHome, closingTasks, version);
        this.config.set(localConfig);
        closingTasks.add(this.config::remove);

        Throwable failure = null;
        try {
            final BooleanSupplier masterCheck = () -> BaseSpark.isOpen(host, masterPort);
            final SparkProcessMonitor master = new SparkProcessMonitor(localConfig, "spark-master-monitor", masterCheck,
                    "org.apache.spark.deploy.master.Master",
                    "--host", host,
                    "--port", Integer.toString(masterPort),
                    "--webui-port", Integer.toString(webMasterPort));
            final Thread masterHook = new Thread(master::close);
            Runtime.getRuntime().addShutdownHook(masterHook);
            closingTasks.add(() -> Runtime.getRuntime().removeShutdownHook(masterHook));
            closingTasks.add(master::close);
            master.start();
            this.assertTrue("master didn't start", master.isStarted());
            LOGGER.info("Started Master on " + this.getSparkMaster());

            // Each worker uses two consecutive ports (RPC, then web UI) after the probed one.
            final int firstSlavePort = this.newPort();
            final List<SparkProcessMonitor> workers = new ArrayList<>();
            for (int index = 0; index < this.slaves; index++) {
                final int workerPort = firstSlavePort + 1 + 2 * index;
                workers.add(new SparkProcessMonitor(localConfig, "spark-slave-" + index + "-monitor",
                        () -> BaseSpark.isOpen(host, workerPort),
                        "org.apache.spark.deploy.worker.Worker",
                        "--host", host,
                        "--port", Integer.toString(workerPort),
                        "--webui-port", Integer.toString(workerPort + 1),
                        this.getSparkMaster()));
            }
            for (final SparkProcessMonitor worker : workers) {
                closingTasks.add(worker::close);
                final Thread workerHook = new Thread(worker::close);
                Runtime.getRuntime().addShutdownHook(workerHook);
                closingTasks.add(() -> Runtime.getRuntime().removeShutdownHook(workerHook));
            }
            for (final SparkProcessMonitor worker : workers) {
                worker.start();
            }
            boolean allStarted = true;
            for (final SparkProcessMonitor worker : workers) {
                if (!worker.isStarted()) {
                    allStarted = false;
                    break;
                }
            }
            if (!allStarted) {
                this.fail("Some slave(s) didn't start");
            }
        } catch (Throwable error) {
            failure = error;
        }

        // A failing closing task is logged so the remaining ones still run.
        final AutoCloseable shutdown = () -> {
            for (final Runnable task : closingTasks) {
                try {
                    task.run();
                } catch (RuntimeException re) {
                    LOGGER.warn(re.getMessage(), (Throwable) re);
                }
            }
        };
        return new Instances(shutdown, false, failure);
    }

    /**
     * Creates the Spark home under {@code getRoot()/spark} and fills it.
     *
     * <p>Steps: create the library and {@code conf} folders, resolve spark-core and
     * spark-streaming (compile and runtime scopes) through Maven and copy every resolved file
     * into the library folder, add the marker files the Spark 1 launcher looks for, and on
     * Windows - when enabled - download the Hadoop winutils binaries into {@code bin}.
     *
     * @param version the layout to produce (decides the library folder name)
     * @return the Spark home directory
     */
    private File buildSparkHome(Version version) {
        File sparkHome = new File(this.getRoot(), "spark/");
        Stream.of(version.libFolder(), "conf").map(n -> new File(sparkHome, n)).forEach(File::mkdirs);
        File libFolder = new File(sparkHome, version.libFolder());
        ConfigurableMavenResolverSystem resolver = Maven.configureResolver();
        AcceptScopesStrategy resolutionStrategy = new AcceptScopesStrategy(new ScopeType[]{ScopeType.COMPILE, ScopeType.RUNTIME});
        Stream.of("org.apache.spark:spark-core_" + this.scalaVersion + ":" + this.sparkVersion, "org.apache.spark:spark-streaming_" + this.scalaVersion + ":" + this.sparkVersion)
                .peek(dep -> LOGGER.info("Resolving " + dep + "..."))
                .flatMap(dep -> Stream.of(resolver.resolve(dep).using(resolutionStrategy).asFile()))
                .distinct()
                .forEach(dep -> {
                    try {
                        LOGGER.debug("Copying " + dep.getName() + " dependency");
                        Files.copy(dep.toPath(), new File(libFolder, dep.getName()).toPath(), StandardCopyOption.REPLACE_EXISTING);
                    }
                    catch (IOException e) {
                        this.fail(e.getMessage());
                    }
                });
        if (version == Version.SPARK_1) {
            try {
                Files.write(new File(sparkHome, "RELEASE").toPath(), "fake release file cause it is tested in 1.6.3".getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE_NEW);
            }
            catch (IOException e) {
                this.fail(e.getMessage());
            }
            try (JarOutputStream file = new JarOutputStream(new FileOutputStream(new File(sparkHome, version.libFolder() + "/spark-assembly-" + this.sparkVersion + "-hadoop2.6.0.jar")))) {
                file.putNextEntry(new ZipEntry("META-INF/marker"));
                file.write("just to let spark find the jar".getBytes(StandardCharsets.UTF_8));
            }
            catch (IOException e) {
                this.fail(e.getMessage());
            }
        }
        if (BaseSpark.isWin() && this.installWinUtils) {
            this.downloadWinUtils(sparkHome);
        }
        return sparkHome;
    }

    /**
     * Downloads {@code hadoop.dll} and {@code winutils.exe} into {@code <sparkHome>/bin}.
     *
     * <p>NOTE(review): the default base URL points at a GitHub "blob" page; confirm that it
     * serves the raw binaries rather than an HTML page.
     *
     * @param sparkHome the Spark home receiving the {@code bin} folder
     * @throws IllegalArgumentException when a download URL is malformed
     * @throws IllegalStateException when a download fails
     */
    private void downloadWinUtils(File sparkHome) {
        LOGGER.info("Downloading Hadoop winutils");
        String binBase = this.hadoopBase + "/hadoop-" + this.hadoopVersion + "/bin/";
        File binFolder = new File(sparkHome, "bin");
        binFolder.mkdirs();
        for (String fileName : Arrays.asList("hadoop.dll", "winutils.exe")) {
            File target = new File(binFolder, fileName);
            try (InputStream stream = new URL(binBase + fileName).openStream()) {
                // Copies the whole stream; the former manual loop wrote the byte count
                // (a single byte per chunk) instead of the chunk content.
                Files.copy(stream, target.toPath(), StandardCopyOption.REPLACE_EXISTING);
            }
            catch (MalformedURLException e) {
                throw new IllegalArgumentException(e);
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /**
     * Submits {@code main} with the current context classpath passed through {@code --jars}.
     *
     * <p>The classpath is read from the thread context class loader (JVM entries excluded)
     * and filtered with {@code classpathFilter}. Directories are packaged once into a
     * generated jar under {@code getRoot()} and cached per cluster.
     *
     * @param main the class to run
     * @param classpathFilter selects the classpath entries to ship
     * @param args extra spark-submit arguments, may be {@code null}
     * @throws IllegalArgumentException when the classpath cannot be listed
     */
    public void submitClasspath(Class<?> main, Predicate<File> classpathFilter, String ... args) {
        final Set<File> candidates;
        try {
            final ClassLoader loader = Thread.currentThread().getContextClassLoader();
            candidates = new UrlSet(ClassLoaders.findUrls(loader))
                    .excludeJvm()
                    .getUrls()
                    .stream()
                    .map(ClassLoaders::toFile)
                    .collect(Collectors.toSet());
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }

        final List<String> entries = new ArrayList<>();
        for (final File candidate : candidates) {
            if (!classpathFilter.test(candidate)) {
                continue;
            }
            if (!candidate.isDirectory()) {
                entries.add(candidate.getAbsolutePath());
                continue;
            }
            // Spark needs jars: package the directory once and reuse the result afterwards.
            final File packaged = this.config.get().jarCache.computeIfAbsent(candidate, dir -> {
                final File generated = new File(this.getRoot(), candidate.getName() + "_generated_" + System.nanoTime() + ".jar");
                try (JarOutputStream jar = new JarOutputStream(new FileOutputStream(generated))) {
                    this.zip(candidate, jar, "");
                } catch (IOException e) {
                    this.fail(e.getMessage());
                }
                return generated;
            });
            entries.add(packaged.getAbsolutePath());
        }
        final String classpath = String.join(File.pathSeparator, entries);

        final List<String> submitArgs = new ArrayList<>();
        if (args != null) {
            submitArgs.addAll(Arrays.asList(args));
        }
        submitArgs.add("--jars");
        submitArgs.add(classpath);
        this.submit(main, submitArgs.toArray(new String[0]));
    }

    /**
     * Submits {@code main} with the whole context classpath (no entry filtered out).
     *
     * @param main the class to run
     * @param args extra spark-submit arguments, may be {@code null}
     */
    public void submitClasspath(Class<?> main, String ... args) {
        final Predicate<File> acceptAll = candidate -> true;
        this.submitClasspath(main, acceptAll, args);
    }

    /**
     * Runs {@code org.apache.spark.deploy.SparkSubmit} for {@code main} against the started
     * master and waits for the submission process to finish.
     *
     * <p>Default options (512m executor and driver memory, one executor core, cluster deploy
     * mode) are added unless the caller already passes the same option. The child process is
     * destroyed by a shutdown hook if the JVM exits before the submission completes. The
     * wait is bounded to 500 rounds of 750 ms.
     *
     * @param main the class to run; its bundle jar is looked up in the generated jar cache
     *             first, then next to the location the class was loaded from
     * @param args extra spark-submit arguments, may be {@code null}
     * @throws IllegalStateException when no jar containing {@code main} can be found
     */
    public void submit(Class<?> main, String ... args) {
        // A null varargs array is treated as "no extra argument" everywhere in this method.
        String[] userArgs = args == null ? new String[0] : args;
        List<String> provided = Arrays.asList(userArgs);
        String bundle = this.findBundle(main).getAbsolutePath();

        Map<String, String> defaults = new HashMap<String, String>();
        defaults.put("--executor-memory", "512m");
        defaults.put("--driver-memory", "512m");
        defaults.put("--total-executor-cores", "1");
        defaults.put("--deploy-mode", "cluster");

        List<String> command = new ArrayList<String>();
        command.add("org.apache.spark.deploy.SparkSubmit");
        command.add("--verbose");
        for (Map.Entry<String, String> option : defaults.entrySet()) {
            if (!provided.contains(option.getKey())) {
                command.add(option.getKey());
                command.add(option.getValue());
            }
        }
        command.addAll(Arrays.asList("--master", this.getSparkMaster(), "--class", main.getName(), bundle));
        command.addAll(provided);
        String[] submitArgs = command.toArray(new String[0]);
        LOGGER.info("Submitting: " + Arrays.asList(submitArgs));

        SparkProcessMonitor monitor = new SparkProcessMonitor(this.config.get(), "spark-submit-" + main.getSimpleName() + "-monitor", () -> true, submitArgs);
        Thread hook = new Thread(monitor::close);
        // The hook must be registered for the removals below to have any effect.
        Runtime.getRuntime().addShutdownHook(hook);
        Runnable shutdownHookCleanup = () -> Runtime.getRuntime().removeShutdownHook(hook);
        this.config.get().cleanupTasks.add(shutdownHookCleanup);
        monitor.start();
        this.assertTrue("monitor is not started", monitor.isStarted());

        int retries = 500;
        while (retries-- > 0) {
            // Read the volatile field once: close() may reset it to null concurrently.
            Process process = monitor.process;
            if (process == null) {
                break;
            }
            try {
                if (!process.waitFor(750L, TimeUnit.MILLISECONDS)) {
                    continue;
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.fail(e.getMessage());
                break;
            }
            LOGGER.info("Submit result: " + process.exitValue());
            monitor.close();
            this.config.get().cleanupTasks.remove(shutdownHookCleanup);
            Runtime.getRuntime().removeShutdownHook(hook);
            break;
        }
    }

    /**
     * Finds the jar to submit for {@code main}: a generated jar whose source directory
     * contains the class file, else the jar the class was loaded from, else the first jar
     * (not prefixed with {@code original-}) in the parent folder of that location.
     */
    private File findBundle(Class<?> main) {
        String resource = main.getName().replace('.', '/') + ".class";
        for (Map.Entry<File, File> jar : this.config.get().jarCache.entrySet()) {
            if (new File(jar.getKey(), resource).exists()) {
                return jar.getValue();
            }
        }
        File location = JarLocation.jarLocation(main);
        if (location.getName().endsWith(".jar")) {
            return location;
        }
        File[] siblings = location.getParentFile().listFiles();
        if (siblings != null) {
            for (File sibling : siblings) {
                String name = sibling.getName();
                if (name.endsWith(".jar") && !name.startsWith("original-")) {
                    return sibling;
                }
            }
        }
        throw new IllegalStateException("No bundle jar found from " + main + ", run tests after packaging (IT with failsafe for instance)");
    }

    /**
     * Recursively adds the content of {@code root/prefix} to {@code out}.
     *
     * <p>Entries whose name starts with a dot are skipped. Entry names are relative to
     * {@code root} and always use {@code /} as separator; directories get a trailing
     * {@code /}. I/O errors are reported through {@code fail}.
     *
     * @param root the directory being packaged
     * @param out the jar receiving the entries
     * @param prefix the sub-path of {@code root} to list, empty for the root itself
     */
    private void zip(File root, JarOutputStream out, String prefix) {
        // Both sides of relativize() must be absolute, otherwise a relative root fails
        // with IllegalArgumentException.
        Path rootPath = root.getAbsoluteFile().toPath();
        File[] children = new File(root, prefix).listFiles();
        if (children == null) {
            return;
        }
        for (File child : children) {
            if (child.getName().startsWith(".")) {
                continue;
            }
            Path asPath = child.getAbsoluteFile().toPath();
            String name = rootPath.relativize(asPath).toString().replace(File.separatorChar, '/');
            try {
                if (child.isDirectory()) {
                    out.putNextEntry(new JarEntry(name + "/"));
                    this.zip(root, out, prefix + "/" + child.getName());
                } else {
                    out.putNextEntry(new JarEntry(name));
                    Files.copy(asPath, out);
                }
            }
            catch (IOException e) {
                this.fail(e.getMessage());
            }
        }
    }

    /**
     * Builds a URL on the master web UI.
     *
     * <p>The segments are joined with {@code /} and separated from the port by a single
     * {@code /}; a leading slash supplied by the caller is kept as is, so both
     * {@code getSparkMasterHttp("json")} and {@code getSparkMasterHttp("/json")} give
     * {@code http://host:port/json}.
     *
     * @param path optional path segments, may be {@code null} or empty
     * @return the web UI base URL, followed by the path when one is given
     */
    public String getSparkMasterHttp(String ... path) {
        String base = "http://" + this.getMasterHost() + ":" + this.getWebMasterPort();
        if (path == null || path.length == 0) {
            return base;
        }
        String joined = String.join("/", path);
        if (joined.isEmpty() || joined.startsWith("/")) {
            return base + joined;
        }
        return base + "/" + joined;
    }

    /**
     * Gives the master URL in the form {@code spark://<host>:<port>}.
     *
     * @return the URL to pass as {@code --master}
     */
    public String getSparkMaster() {
        final StringBuilder url = new StringBuilder("spark://");
        url.append(this.getMasterHost());
        url.append(":");
        url.append(this.getMasterPort());
        return url.toString();
    }

    /**
     * Reads the master web UI port from the cluster configuration of the calling thread.
     *
     * @return the web UI port of the master
     */
    public int getWebMasterPort() {
        final ClusterConfig current = this.config.get();
        return current.masterWebPort;
    }

    /**
     * Reads the master host from the cluster configuration of the calling thread.
     *
     * @return the host name the master was started with
     */
    public String getMasterHost() {
        final ClusterConfig current = this.config.get();
        return current.masterHost;
    }

    /**
     * Reads the master port from the cluster configuration of the calling thread.
     *
     * @return the port the master was started with
     */
    public int getMasterPort() {
        final ClusterConfig current = this.config.get();
        return current.masterPort;
    }

    /**
     * Tells whether the current JVM runs on Windows.
     *
     * <p>The test is a prefix match on {@code os.name}: a substring match on "win" would
     * also accept names such as "Darwin".
     *
     * @return {@code true} when {@code os.name} starts with "win", ignoring case
     */
    private static boolean isWin() {
        return System.getProperty("os.name", "").toLowerCase(Locale.ENGLISH).startsWith("win");
    }

    /**
     * Checks whether something accepts TCP connections on {@code host:port}.
     *
     * <p>The connection attempt itself is inside the guarded block, so a refused or
     * unreachable endpoint yields {@code false} instead of an exception.
     *
     * @param host the host to connect to
     * @param port the port to connect to
     * @return {@code true} when the connection could be established, {@code false} on any
     *         I/O error
     */
    private static boolean isOpen(String host, int port) {
        try (Socket client = new Socket(host, port)) {
            client.getInputStream().close();
            return true;
        }
        catch (IOException ioe) {
            return false;
        }
    }

    /**
     * Asks the operating system for a currently free port by briefly binding port 0.
     *
     * <p>The socket is closed before returning, so the port is only likely - not
     * guaranteed - to still be free when it is used.
     *
     * @return the port number, or {@code -1} after reporting through {@code fail} when the
     *         probe socket cannot be opened or closed
     */
    private int newPort() {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            return serverSocket.getLocalPort();
        }
        catch (IOException e) {
            this.fail(e.getMessage());
            return -1;
        }
    }

    /**
     * Compiler-generated lambda body: resolves one Maven coordinate with the given strategy
     * and streams the resolved files.
     */
    private static /* synthetic */ Stream lambda$buildSparkHome$13(ConfigurableMavenResolverSystem resolver, MavenResolutionStrategy resolutionStrategy, String dep) {
        final MavenStrategyStage strategyStage = (MavenStrategyStage)resolver.resolve(dep);
        final MavenFormatStage formatStage = (MavenFormatStage)strategyStage.using((ResolutionStrategy)resolutionStrategy);
        return Stream.of(formatStage.asFile());
    }

    /*
     * Uses 'sealed' constructs - enable with --sealed true
     */
    public static enum Version {
        /** Spark 1.x layout: libraries live in {@code lib}. */
        SPARK_1("lib"),
        /** Layout used for every other version: libraries live in {@code jars}. */
        SPARK_2("jars");

        private final String libFolder;

        Version(String libFolder) {
            this.libFolder = libFolder;
        }

        /**
         * @return the name of the folder, relative to the Spark home, holding the libraries
         */
        String libFolder() {
            return this.libFolder;
        }

        /**
         * Picks the layout for a Spark version string.
         *
         * @param sparkVersion the Spark version, for instance {@code 1.6.3}
         * @return {@link #SPARK_1} when the version starts with {@code "1."}, else {@link #SPARK_2}
         */
        public static Version find(String sparkVersion) {
            if (sparkVersion.startsWith("1.")) {
                return SPARK_1;
            }
            return SPARK_2;
        }
    }

    /**
     * Immutable description of one started cluster, plus its per-cluster mutable state:
     * the closing tasks and the cache of jars generated from classpath directories.
     */
    private static class ClusterConfig {
        private final String masterHost;
        private final int masterPort;
        private final int masterWebPort;
        private final File sparkHome;
        private final Version version;
        // Run when the cluster is closed; the collection is shared with the caller.
        private final Collection<Runnable> cleanupTasks;
        // Source directory -> jar generated from it.
        private final Map<File, File> jarCache;

        private ClusterConfig(String masterHost, int masterPort, int masterWebPort, File sparkHome, Collection<Runnable> cleanupTasks, Version version) {
            this.jarCache = new HashMap<File, File>();
            this.sparkHome = sparkHome;
            this.version = version;
            this.cleanupTasks = cleanupTasks;
            this.masterHost = masterHost;
            this.masterPort = masterPort;
            this.masterWebPort = masterWebPort;
        }
    }

    /**
     * Thread that launches one child JVM (a Spark main class plus its arguments) and polls a
     * health check until the process looks ready.
     *
     * <p>This is an inner (non-static) class because it reads the Scala version and the
     * winutils flag of the enclosing instance. {@link #run()} and {@link #close()} share the
     * instance monitor, so {@code close()} waits for a running {@code run()} to return.
     */
    private class SparkProcessMonitor
    extends Thread
    implements AutoCloseable {
        private final ClusterConfig config;
        // Main class name followed by its program arguments.
        private final String[] mainAndArgs;
        // Tells whether the launched process is ready.
        private final BooleanSupplier healthCheck;
        // Released when run() exits, whatever the outcome.
        private final CountDownLatch started = new CountDownLatch(1);
        // The child process; null before launch and after close().
        private volatile Process process;
        // Set by close(); prevents a later launch and a second close.
        private volatile boolean quit;

        private SparkProcessMonitor(ClusterConfig config, String name, BooleanSupplier healthCheck, String ... mainAndArgs) {
            this.setName(name);
            this.config = config;
            this.mainAndArgs = mainAndArgs;
            this.healthCheck = healthCheck;
        }

        /**
         * Launches the child JVM and waits until it is healthy, dead, or the retries are used up.
         *
         * @throws IllegalArgumentException when the library folder of the Spark home cannot be listed
         * @throws IllegalStateException when the process cannot be started
         */
        @Override
        public synchronized void run() {
            if (this.quit) {
                return;
            }
            File sparkHome = this.config.sparkHome;
            try {
                // Classpath = every file of the library folder of the Spark home.
                String classpath = Stream.of(Optional.ofNullable(new File(sparkHome, this.config.version.libFolder()).listFiles()).orElseThrow(() -> new IllegalArgumentException("No spark dependencies in " + sparkHome))).map(File::getAbsolutePath).collect(Collectors.joining(File.pathSeparator));
                LOGGER.debug("Launching " + Arrays.asList(this.mainAndArgs));
                // Same java binary as the current JVM; stderr is merged into stdout.
                ProcessBuilder builder = new ProcessBuilder(new String[0]).redirectErrorStream(true).command(Stream.concat(Stream.of(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath(), "-cp", classpath), Stream.of(this.mainAndArgs)).collect(Collectors.toList()));
                Map<String, String> environment = builder.environment();
                String jvmVersion = System.getProperty("java.version", "1.8");
                Boolean isJava8 = jvmVersion.startsWith("1.8.") || jvmVersion.startsWith("8.");
                if (!isJava8.booleanValue()) {
                    // Anything but Java 8 gets the extra --add-opens flags.
                    environment.put("_JAVA_OPTIONS", BaseSpark.EXTRA_JVM_ARGS);
                }
                environment.put("SPARK_HOME", sparkHome.getAbsolutePath());
                environment.put("SPARK_SCALA_VERSION", BaseSpark.this.scalaVersion);
                if (this.config.version == Version.SPARK_1) {
                    environment.put("SPARK_CLASSPATH", classpath);
                }
                if (BaseSpark.isWin() && BaseSpark.this.installWinUtils) {
                    // winutils was downloaded into <sparkHome>/bin.
                    environment.put("HADOOP_HOME", sparkHome.getAbsolutePath());
                }
                this.process = builder.start();
                // Forward the process output to the logger from a separate thread.
                new Thread(new SurefireWorkaroundOutput(this.getName(), this.process.getInputStream())).start();
                int maxRetries = 500;
                // Loop until the health check passes twice in a row, the retries are exhausted
                // or the process died; 500 ms pause between rounds.
                while (!(this.healthCheck.getAsBoolean() && this.healthCheck.getAsBoolean() || maxRetries-- <= 0 || !this.process.isAlive())) {
                    try {
                        SparkProcessMonitor.sleep(500L);
                    }
                    catch (InterruptedException e) {
                        // Interruption only stops the wait; the interrupt flag is not restored.
                        break;
                    }
                }
                LOGGER.info(this.getName() + " done");
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
            finally {
                // Always release waiters of isStarted(), also on failure.
                this.started.countDown();
            }
        }

        /**
         * Waits for {@link #run()} to finish, at most {@code talend.junit.spark.timeout}
         * minutes (system property, default 5), then evaluates the health check.
         *
         * @return the health check result at the time of the call, whether or not the wait timed out
         * @throws IllegalStateException when interrupted while waiting
         */
        private boolean isStarted() {
            long end = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(Integer.getInteger("talend.junit.spark.timeout", 5).intValue());
            while (!this.quit && end - System.currentTimeMillis() > 0L) {
                try {
                    if (!this.started.await(500L, TimeUnit.MILLISECONDS)) continue;
                    break;
                }
                catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }
            return this.healthCheck.getAsBoolean();
        }

        /**
         * Stops the child process if it is still running. Only the first call has an effect.
         */
        @Override
        public synchronized void close() {
            if (this.quit) {
                return;
            }
            this.quit = true;
            if (this.process == null) {
                return;
            }
            try {
                // exitValue() throws while the process is still running.
                this.process.exitValue();
            }
            catch (IllegalThreadStateException itse) {
                this.process.destroyForcibly();
                try {
                    this.process.exitValue();
                }
                catch (IllegalThreadStateException illegalThreadStateException) {
                    // Still terminating: nothing more to do, the kill was requested.
                }
            }
            finally {
                this.process = null;
            }
        }
    }

    /**
     * Handle on a started cluster: closes it at most once and carries the error, if any,
     * captured while starting it.
     */
    protected static class Instances
    implements AutoCloseable {
        private final AutoCloseable delegate;
        private final Throwable exception;
        private boolean closed;

        @Generated
        public Instances(AutoCloseable delegate, boolean closed, Throwable exception) {
            this.delegate = delegate;
            this.closed = closed;
            this.exception = exception;
        }

        /**
         * Closes the delegate on the first call; later calls do nothing.
         *
         * @throws Exception whatever the delegate throws
         */
        @Override
        public synchronized void close() throws Exception {
            if (!this.closed) {
                // Flag first, so a failing delegate is not closed a second time.
                this.closed = true;
                this.delegate.close();
            }
        }

        /**
         * @return the error captured at startup, or {@code null} when there was none
         */
        @Generated
        public Throwable getException() {
            return this.exception;
        }
    }

    /**
     * Pumps a process output stream to the logger, one log statement per non-blank line,
     * each prefixed with the name of the monitored process.
     */
    private static class SurefireWorkaroundOutput
    implements Runnable {
        private final String name;
        private final InputStream stream;
        // Bytes of the line currently being read.
        private final ByteArrayOutputStream builder = new ByteArrayOutputStream();

        private SurefireWorkaroundOutput(String name, InputStream stream) {
            this.name = name;
            this.stream = stream;
        }

        /**
         * Reads the stream until its end; a read error silently ends the pump.
         */
        @Override
        public void run() {
            final byte[] chunk = new byte[64];
            try {
                for (int count = this.stream.read(chunk); count != -1; count = this.stream.read(chunk)) {
                    for (int index = 0; index < count; index++) {
                        final byte current = chunk[index];
                        final boolean lineBreak = current == '\r' || current == '\n';
                        if (lineBreak) {
                            this.doLog();
                            this.builder.reset();
                        } else {
                            this.builder.write(current);
                        }
                    }
                }
                // Flush a last line that has no trailing line break.
                if (this.builder.size() > 0) {
                    this.doLog();
                }
            }
            catch (IOException ignored) {
                // Best effort: the stream usually breaks because the process was stopped.
            }
        }

        /** Logs the buffered line, trimmed, unless it is blank. */
        private void doLog() {
            final String line = this.builder.toString().trim();
            if (!line.isEmpty()) {
                LOGGER.info("[" + this.name + "] " + line);
            }
        }
    }
}

