/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.testframe.container;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
import org.apache.flink.connector.testframe.container.FlinkTestcontainersConfigurator;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.MountableFile;

/**
 * A Flink cluster made of Testcontainers containers: one JobManager, a list of TaskManagers and
 * an optional high-availability (HA) service.
 *
 * <p>The class can be registered as a JUnit 5 extension: {@link #beforeAll(ExtensionContext)}
 * delegates to {@link #start()} and {@link #afterAll(ExtensionContext)} delegates to
 * {@link #stop()}. Instances are created through {@link #builder()}.
 *
 * <p>No synchronization is performed on the internal state of this class.
 */
public class FlinkContainers implements BeforeAllCallback, AfterAllCallback {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkContainers.class);

    /** Timeout applied when waiting for the REST endpoint and for TaskManager registration. */
    public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30L);

    /** Matches the client output of a detached submission; group 1 is the job id. */
    private static final Pattern DETACHED_JOB_ID_PATTERN =
            Pattern.compile("Job has been submitted with JobID (.*)");

    /** Matches the client output of an attached (blocking) submission; group 1 is the job id. */
    private static final Pattern FINISHED_JOB_ID_PATTERN =
            Pattern.compile("Job with JobID (.*) has finished.");

    /** Scheme prefix that is stripped from configured directories before deleting them. */
    private static final String FILE_SCHEME_PREFIX = "file:";

    private final GenericContainer<?> jobManager;
    private final List<GenericContainer<?>> taskManagers;
    @Nullable
    private final GenericContainer<?> haService;
    private final Configuration conf;
    @Nullable
    private RestClusterClient<StandaloneClusterId> restClusterClient;
    private boolean isStarted = false;

    /** Creates a builder for {@link FlinkContainers}. */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * @param jobManager container running the JobManager
     * @param taskManagers containers running the TaskManagers
     * @param haService optional container of the HA service, {@code null} if HA is not used
     * @param conf Flink configuration; read here for the REST port and the checkpoint / HA
     *     storage directories
     */
    FlinkContainers(
            GenericContainer<?> jobManager,
            List<GenericContainer<?>> taskManagers,
            @Nullable GenericContainer<?> haService,
            Configuration conf) {
        this.jobManager = jobManager;
        this.taskManagers = taskManagers;
        this.haService = haService;
        this.conf = conf;
    }

    /**
     * Starts the HA service (if any), the JobManager and all TaskManagers, creates the REST
     * cluster client and waits until every TaskManager is reported as connected.
     *
     * @throws Exception if a container fails to start or the cluster does not become ready
     */
    public void start() throws Exception {
        if (haService != null) {
            LOG.debug("Starting HA service container");
            haService.start();
        }
        LOG.debug("Starting JobManager container");
        jobManager.start();
        waitUntilJobManagerRESTReachable(jobManager);
        LOG.debug("Starting TaskManager containers");
        taskManagers.parallelStream().forEach(GenericContainer::start);
        LOG.debug("Creating REST cluster client");
        restClusterClient = createClusterClient();
        waitUntilAllTaskManagerConnected();
        isStarted = true;
    }

    /**
     * Closes the REST client, stops the TaskManagers, deletes the checkpoint / HA files inside
     * the JobManager container and finally stops the JobManager and the HA service.
     *
     * <p>The JobManager and HA service containers are stopped even when an earlier step fails,
     * so a failed cleanup cannot leak containers.
     */
    public void stop() {
        isStarted = false;
        try {
            if (restClusterClient != null) {
                restClusterClient.close();
                // Do not hand out a closed client through getRestClusterClient().
                restClusterClient = null;
            }
            taskManagers.forEach(GenericContainer::stop);
            // The cleanup executes a command inside the JobManager container, which is only
            // possible while that container is running (e.g. not after a failed start()).
            if (jobManager.isRunning()) {
                deleteJobManagerTemporaryFiles();
            }
        } finally {
            try {
                jobManager.stop();
            } finally {
                if (haService != null) {
                    haService.stop();
                }
            }
        }
    }

    /** Returns whether {@link #start()} completed and {@link #stop()} has not been called since. */
    public boolean isStarted() {
        return isStarted;
    }

    /** Returns the JobManager container. */
    public GenericContainer<?> getJobManager() {
        return jobManager;
    }

    /** Returns the host under which the JobManager container is reachable. */
    public String getJobManagerHost() {
        return jobManager.getHost();
    }

    /** Returns the host-side port mapped to the REST port configured in the Flink configuration. */
    public int getJobManagerPort() {
        return jobManager.getMappedPort(conf.get(RestOptions.PORT));
    }

    /**
     * Returns the REST cluster client, or {@code null} if the cluster has not been started or
     * has been stopped.
     */
    @Nullable
    public RestClusterClient<StandaloneClusterId> getRestClusterClient() {
        return restClusterClient;
    }

    /**
     * Stops and restarts the JobManager container, then re-creates the REST client and waits for
     * all TaskManagers to be connected again.
     *
     * @param afterFailAction action executed after the JobManager was stopped and before it is
     *     started again
     * @throws Exception if the action fails or the cluster does not become ready again
     */
    public void restartJobManager(RunnableWithException afterFailAction) throws Exception {
        if (haService == null) {
            LOG.warn("Restarting JobManager without HA service. This might drop all your running jobs");
        }
        jobManager.stop();
        afterFailAction.run();
        jobManager.start();
        waitUntilJobManagerRESTReachable(jobManager);
        restClusterClient = createClusterClient();
        waitUntilAllTaskManagerConnected();
    }

    /**
     * Stops all TaskManager containers, runs the given action and starts the containers again.
     * This method does not wait for the TaskManagers to re-register at the JobManager.
     *
     * @param afterFailAction action executed while all TaskManagers are stopped
     * @throws Exception if the action fails or a container cannot be started
     */
    public void restartTaskManager(RunnableWithException afterFailAction) throws Exception {
        taskManagers.forEach(GenericContainer::stop);
        afterFailAction.run();
        taskManagers.forEach(GenericContainer::start);
    }

    /**
     * Submits a SQL job by piping the job's SQL lines into {@code bin/sql-client.sh} inside the
     * JobManager container. Jars of the job are copied to {@code /tmp} in the container and
     * passed via {@code --jar}.
     *
     * @param job SQL job to submit
     * @throws IllegalStateException if the cluster is not started
     * @throws AssertionError if the SQL client exits with a non-zero code
     * @throws IOException if the script cannot be written or the command cannot be executed
     * @throws InterruptedException if interrupted while executing the command
     */
    public void submitSQLJob(SQLJobSubmission job) throws IOException, InterruptedException {
        Preconditions.checkState(
                isStarted(), "SQL job submission is only applicable for a running cluster");
        final List<String> commands = new ArrayList<>();

        // Stage the script in a host temp directory, copy it into the container and remove the
        // host copy again so that repeated submissions do not pile up temp directories.
        final Path scriptDir = Files.createTempDirectory("sql-script");
        final Path script = scriptDir.resolve("script");
        try {
            Files.write(script, job.getSqlLines());
            jobManager.copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.sql");
        } finally {
            try {
                Files.deleteIfExists(script);
                Files.deleteIfExists(scriptDir);
            } catch (IOException e) {
                // Best effort: a leftover temp file must not fail the submission.
                LOG.warn("Failed to delete temporary SQL script directory {}", scriptDir, e);
            }
        }

        commands.add("cat /tmp/script.sql | ");
        commands.add("bin/sql-client.sh");
        for (String jar : job.getJars()) {
            commands.add("--jar");
            final Path hostJar = Paths.get(jar);
            final String containerPath = "/tmp/" + hostJar.getFileName();
            jobManager.copyFileToContainer(MountableFile.forHostPath(hostJar), containerPath);
            commands.add(containerPath);
        }

        final Container.ExecResult execResult =
                jobManager.execInContainer("bash", "-c", String.join(" ", commands));
        LOG.info(execResult.getStdout());
        LOG.error(execResult.getStderr());
        if (execResult.getExitCode() != 0) {
            throw new AssertionError("Failed when submitting the SQL job.");
        }
    }

    /**
     * Submits a jar job with {@code bin/flink run} inside the JobManager container and extracts
     * the job id from the client's standard output.
     *
     * @param job job to submit; its jar is copied to {@code /tmp} in the container
     * @return id of the submitted job
     * @throws IllegalStateException if no job id can be found in the client output
     * @throws IOException if the command cannot be executed
     * @throws InterruptedException if interrupted while executing the command
     */
    public JobID submitJob(JobSubmission job) throws IOException, InterruptedException {
        final List<String> commands = new ArrayList<>();
        commands.add("bin/flink");
        commands.add("run");
        if (job.isDetached()) {
            commands.add("-d");
        }
        if (job.getParallelism() > 0) {
            commands.add("-p");
            commands.add(String.valueOf(job.getParallelism()));
        }
        job.getMainClass()
                .ifPresent(
                        mainClass -> {
                            commands.add("--class");
                            commands.add(mainClass);
                        });
        final Path jobJar = job.getJar();
        final String containerPath = "/tmp/" + jobJar.getFileName();
        commands.add(containerPath);
        jobManager.copyFileToContainer(
                MountableFile.forHostPath(jobJar.toAbsolutePath()), containerPath);
        commands.addAll(job.getArguments());

        final String command = String.join(" ", commands);
        LOG.info("Running {}.", command);
        final Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", command);

        // The client prints a different message for detached and attached submissions.
        final Pattern pattern =
                job.isDetached() ? DETACHED_JOB_ID_PATTERN : FINISHED_JOB_ID_PATTERN;
        final String stdout = execResult.getStdout();
        LOG.info(stdout);
        LOG.error(execResult.getStderr());
        final Matcher matcher = pattern.matcher(stdout);
        Preconditions.checkState(matcher.find(), "Cannot extract JobID from stdout.");
        return JobID.fromHexString(matcher.group(1));
    }

    /** Starts the cluster before all tests of the extended test class. */
    @Override
    public void beforeAll(ExtensionContext context) throws Exception {
        start();
    }

    /** Stops the cluster after all tests of the extended test class. */
    @Override
    public void afterAll(ExtensionContext context) {
        stop();
    }

    /**
     * Creates a REST client pointing at the mapped REST port of the JobManager container. A
     * previously created client is closed first.
     */
    private RestClusterClient<StandaloneClusterId> createClusterClient() throws Exception {
        Preconditions.checkState(
                jobManager.isRunning(), "JobManager should be running for creating a REST client");
        if (restClusterClient != null) {
            restClusterClient.close();
        }
        final Configuration clientConfiguration = new Configuration();
        clientConfiguration.set(RestOptions.ADDRESS, getJobManagerHost());
        clientConfiguration.set(RestOptions.PORT, getJobManagerPort());
        return new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance());
    }

    /** Blocks until {@code /overview} on the JobManager's REST port answers with HTTP 200. */
    private void waitUntilJobManagerRESTReachable(GenericContainer<?> jobManager) {
        LOG.debug("Waiting for JobManager's REST interface getting ready");
        new HttpWaitStrategy()
                .forPort(conf.get(RestOptions.PORT))
                .forPath("/overview")
                .forStatusCode(200)
                .withReadTimeout(DEFAULT_TIMEOUT)
                .waitUntilReady(jobManager);
    }

    /**
     * Polls the cluster overview until the number of connected TaskManagers equals the number of
     * TaskManager containers, or {@link #DEFAULT_TIMEOUT} elapses.
     */
    private void waitUntilAllTaskManagerConnected() throws InterruptedException, TimeoutException {
        LOG.debug("Waiting for all TaskManagers connecting to JobManager");
        final RestClusterClient<StandaloneClusterId> client =
                Preconditions.checkNotNull(
                        restClusterClient,
                        "REST cluster client should not be null when checking TaskManager status");
        CommonTestUtils.waitUtil(
                () -> {
                    final ClusterOverviewWithVersion clusterOverview;
                    try {
                        clusterOverview =
                                client.sendRequest(
                                                ClusterOverviewHeaders.getInstance(),
                                                EmptyMessageParameters.getInstance(),
                                                EmptyRequestBody.getInstance())
                                        .get();
                    } catch (Exception e) {
                        throw new RuntimeException("Failed to get cluster overview", e);
                    }
                    return clusterOverview.getNumTaskManagersConnected() == taskManagers.size();
                },
                DEFAULT_TIMEOUT,
                "TaskManagers are not ready within 30 seconds");
    }

    /**
     * Deletes the contents of the configured checkpoint and HA storage directories by running
     * {@code rm -rf} inside the JobManager container. Does nothing if neither directory is
     * configured.
     */
    private void deleteJobManagerTemporaryFiles() {
        final String checkpointDir = conf.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
        final String haDir = conf.get(HighAvailabilityOptions.HA_STORAGE_PATH);
        final Collection<String> usedPaths =
                Lists.newArrayList(checkpointDir, haDir).stream()
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList());
        if (usedPaths.isEmpty()) {
            return;
        }
        final String deletionCommand =
                usedPaths.stream()
                        .map(this::formatFilePathForDeletion)
                        .collect(Collectors.joining("", "rm -rf", ""));
        final String[] command = {"bash", "-c", deletionCommand};
        try {
            final Container.ExecResult result = jobManager.execInContainer(command);
            if (result.getExitCode() != 0) {
                throw new IllegalStateException(
                        String.format(
                                "Command \"%s\" returned non-zero exit code %d. \nSTDOUT: %s\nSTDERR: %s",
                                String.join(" ", command),
                                result.getExitCode(),
                                result.getStdout(),
                                result.getStderr()));
            }
        } catch (IOException e) {
            throw new RuntimeException(
                    "Failed to delete temporary files generated by the flink cluster.", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(
                    "Failed to delete temporary files generated by the flink cluster.", e);
        }
    }

    /**
     * Turns a configured directory into an {@code rm} argument that matches the directory's
     * contents: a leading space, the path without its {@code file:} scheme, and {@code /*}.
     *
     * <p>Paths without a {@code file:} scheme are used as they are.
     *
     * @throws IllegalArgumentException if nothing is left of the path after removing the scheme,
     *     because the resulting argument would be {@code /*}
     */
    private String formatFilePathForDeletion(String path) {
        final String normalized = Paths.get(path).toString();
        final String localPath =
                normalized.startsWith(FILE_SCHEME_PREFIX)
                        ? normalized.substring(FILE_SCHEME_PREFIX.length())
                        : normalized;
        Preconditions.checkArgument(
                !localPath.isEmpty(), "Refusing to delete files for an empty directory path.");
        return " " + localPath + "/*";
    }

    /** Builder for {@link FlinkContainers}; both settings start with their default values. */
    public static final class Builder {
        private FlinkContainersSettings flinkContainersSettings =
                FlinkContainersSettings.defaultConfig();
        private TestcontainersSettings testcontainersSettings =
                TestcontainersSettings.defaultSettings();

        private Builder() {
        }

        /** Replaces the Flink container settings used by {@link #build()}. */
        public Builder withFlinkContainersSettings(FlinkContainersSettings flinkContainersSettings) {
            this.flinkContainersSettings = flinkContainersSettings;
            return this;
        }

        /** Replaces the Testcontainers settings used by {@link #build()}. */
        public Builder withTestcontainersSettings(TestcontainersSettings testcontainersSettings) {
            this.testcontainersSettings = testcontainersSettings;
            return this;
        }

        /** Builds the {@link FlinkContainers} from the current settings. */
        public FlinkContainers build() {
            final FlinkTestcontainersConfigurator configurator =
                    new FlinkTestcontainersConfigurator(
                            flinkContainersSettings, testcontainersSettings);
            return configurator.configure();
        }
    }
}

