/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.program.rest;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.stop.StopWithSavepointRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.stop.StopWithSavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.CheckedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RestClusterClient<T>
implements ClusterClient<T> {
    private static final Logger LOG = LoggerFactory.getLogger(RestClusterClient.class);
    private final RestClusterClientConfiguration restClusterClientConfiguration;
    private final Configuration configuration;
    private final RestClient restClient;
    private final ExecutorService executorService = Executors.newFixedThreadPool(4, (ThreadFactory)new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
    private final WaitStrategy waitStrategy;
    private final T clusterId;
    private final ClientHighAvailabilityServices clientHAServices;
    private final LeaderRetrievalService webMonitorRetrievalService;
    private final LeaderRetriever webMonitorLeaderRetriever = new LeaderRetriever();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private ScheduledExecutorService retryExecutorService;
    private final Predicate<Throwable> unknownJobStateRetryable = exception -> ExceptionUtils.findThrowable((Throwable)exception, JobStateUnknownException.class).isPresent();

    public RestClusterClient(Configuration config, T clusterId) throws Exception {
        this(config, clusterId, HighAvailabilityServicesUtils.createClientHAService((Configuration)config));
    }

    public RestClusterClient(Configuration config, T clusterId, ClientHighAvailabilityServices clientHAServices) throws Exception {
        this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L), clientHAServices);
    }

    @VisibleForTesting
    RestClusterClient(Configuration configuration, @Nullable RestClient restClient, T clusterId, WaitStrategy waitStrategy) throws Exception {
        this(configuration, restClient, clusterId, waitStrategy, HighAvailabilityServicesUtils.createClientHAService((Configuration)configuration));
    }

    private RestClusterClient(Configuration configuration, @Nullable RestClient restClient, T clusterId, WaitStrategy waitStrategy, ClientHighAvailabilityServices clientHAServices) throws Exception {
        this.configuration = (Configuration)Preconditions.checkNotNull((Object)configuration);
        this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);
        this.restClient = restClient != null ? restClient : new RestClient(this.restClusterClientConfiguration.getRestClientConfiguration(), (Executor)this.executorService);
        this.waitStrategy = (WaitStrategy)Preconditions.checkNotNull((Object)waitStrategy);
        this.clusterId = Preconditions.checkNotNull(clusterId);
        this.clientHAServices = (ClientHighAvailabilityServices)Preconditions.checkNotNull((Object)clientHAServices);
        this.webMonitorRetrievalService = clientHAServices.getClusterRestEndpointLeaderRetriever();
        this.retryExecutorService = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
        this.startLeaderRetrievers();
    }

    private void startLeaderRetrievers() throws Exception {
        this.webMonitorRetrievalService.start((LeaderRetrievalListener)this.webMonitorLeaderRetriever);
    }

    @Override
    public Configuration getFlinkConfiguration() {
        return new Configuration(this.configuration);
    }

    @Override
    public void close() {
        if (this.running.compareAndSet(true, false)) {
            ExecutorUtils.gracefulShutdown((long)this.restClusterClientConfiguration.getRetryDelay(), (TimeUnit)TimeUnit.MILLISECONDS, (ExecutorService[])new ExecutorService[]{this.retryExecutorService});
            this.restClient.shutdown(Time.seconds((long)5L));
            ExecutorUtils.gracefulShutdown((long)5L, (TimeUnit)TimeUnit.SECONDS, (ExecutorService[])new ExecutorService[]{this.executorService});
            try {
                this.webMonitorRetrievalService.stop();
            }
            catch (Exception e) {
                LOG.error("An error occurred during stopping the WebMonitorRetrievalService", (Throwable)e);
            }
            try {
                this.clientHAServices.close();
            }
            catch (Exception e) {
                LOG.error("An error occurred during stopping the ClientHighAvailabilityServices", (Throwable)e);
            }
        }
    }

    public CompletableFuture<JobDetailsInfo> getJobDetails(JobID jobId) {
        JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
        JobMessageParameters params = new JobMessageParameters();
        params.jobPathParameter.resolve((Object)jobId);
        return this.sendRequest(detailsHeaders, params);
    }

    @Override
    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
        CheckedSupplier operation = () -> this.requestJobStatus(jobId);
        return this.retry(operation, this.unknownJobStateRetryable);
    }

    @Override
    public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
        CheckedSupplier operation = () -> this.requestJobResultInternal(jobId);
        return this.retry(operation, this.unknownJobStateRetryable);
    }

    @Override
    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
        CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
            try {
                java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin", new FileAttribute[0]);
                try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile, new OpenOption[0]));){
                    objectOut.writeObject(jobGraph);
                }
                return jobGraphFile;
            }
            catch (IOException e) {
                throw new CompletionException((Throwable)new FlinkException("Failed to serialize JobGraph.", (Throwable)e));
            }
        }, this.executorService);
        CompletionStage requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
            ArrayList<String> jarFileNames = new ArrayList<String>(8);
            ArrayList<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<JobSubmitRequestBody.DistributedCacheFile>(8);
            ArrayList<FileUpload> filesToUpload = new ArrayList<FileUpload>(8);
            filesToUpload.add(new FileUpload(jobGraphFile, "application/octet-stream"));
            for (Path path : jobGraph.getUserJars()) {
                jarFileNames.add(path.getName());
                filesToUpload.add(new FileUpload(Paths.get(path.toUri()), "application/java-archive"));
            }
            for (Map.Entry entry : jobGraph.getUserArtifacts().entrySet()) {
                Path artifactFilePath = new Path(((DistributedCache.DistributedCacheEntry)entry.getValue()).filePath);
                try {
                    if (artifactFilePath.getFileSystem().isDistributedFS()) continue;
                    artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile((String)entry.getKey(), artifactFilePath.getName()));
                    filesToUpload.add(new FileUpload(Paths.get(((DistributedCache.DistributedCacheEntry)entry.getValue()).filePath, new String[0]), "application/octet-stream"));
                }
                catch (IOException e) {
                    throw new CompletionException((Throwable)new FlinkException("Failed to get the FileSystem of artifact " + artifactFilePath + ".", (Throwable)e));
                }
            }
            JobSubmitRequestBody requestBody = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), jarFileNames, artifactFileNames);
            return Tuple2.of((Object)requestBody, Collections.unmodifiableCollection(filesToUpload));
        });
        CompletionStage submissionFuture = ((CompletableFuture)requestFuture).thenCompose(requestAndFileUploads -> this.sendRetriableRequest(JobSubmitHeaders.getInstance(), EmptyMessageParameters.getInstance(), (RequestBody)requestAndFileUploads.f0, (Collection)requestAndFileUploads.f1, RestClusterClient.isConnectionProblemOrServiceUnavailable()));
        ((CompletableFuture)((CompletableFuture)submissionFuture).thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)).thenAccept(jobGraphFile -> {
            try {
                Files.delete(jobGraphFile);
            }
            catch (IOException e) {
                LOG.warn("Could not delete temporary file {}.", jobGraphFile, (Object)e);
            }
        });
        return ((CompletableFuture)((CompletableFuture)submissionFuture).thenApply(ignore -> jobGraph.getJobID())).exceptionally(throwable -> {
            throw new CompletionException((Throwable)new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", ExceptionUtils.stripCompletionException((Throwable)throwable)));
        });
    }

    @Override
    public CompletableFuture<Acknowledge> cancel(JobID jobID) {
        JobCancellationMessageParameters params = new JobCancellationMessageParameters();
        params.jobPathParameter.resolve((Object)jobID);
        params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL));
        CompletableFuture responseFuture = this.sendRequest(JobCancellationHeaders.getInstance(), params);
        return responseFuture.thenApply(ignore -> Acknowledge.get());
    }

    @Override
    public CompletableFuture<String> stopWithSavepoint(JobID jobId, boolean advanceToEndOfTime, @Nullable String savepointDirectory) {
        StopWithSavepointTriggerHeaders stopWithSavepointTriggerHeaders = StopWithSavepointTriggerHeaders.getInstance();
        SavepointTriggerMessageParameters stopWithSavepointTriggerMessageParameters = stopWithSavepointTriggerHeaders.getUnresolvedMessageParameters();
        stopWithSavepointTriggerMessageParameters.jobID.resolve((Object)jobId);
        CompletableFuture responseFuture = this.sendRequest(stopWithSavepointTriggerHeaders, stopWithSavepointTriggerMessageParameters, new StopWithSavepointRequestBody(savepointDirectory, advanceToEndOfTime));
        return ((CompletableFuture)responseFuture.thenCompose(savepointTriggerResponseBody -> {
            TriggerId savepointTriggerId = savepointTriggerResponseBody.getTriggerId();
            return this.pollSavepointAsync(jobId, savepointTriggerId);
        })).thenApply(savepointInfo -> {
            if (savepointInfo.getFailureCause() != null) {
                throw new CompletionException((Throwable)savepointInfo.getFailureCause());
            }
            return savepointInfo.getLocation();
        });
    }

    @Override
    public CompletableFuture<String> cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) {
        return this.triggerSavepoint(jobId, savepointDirectory, true);
    }

    @Override
    public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory) {
        return this.triggerSavepoint(jobId, savepointDirectory, false);
    }

    @Override
    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId, OperatorID operatorId, CoordinationRequest request) {
        SerializedValue serializedRequest;
        ClientCoordinationHeaders headers = ClientCoordinationHeaders.getInstance();
        ClientCoordinationMessageParameters params = new ClientCoordinationMessageParameters();
        params.jobPathParameter.resolve((Object)jobId);
        params.operatorPathParameter.resolve((Object)operatorId);
        try {
            serializedRequest = new SerializedValue((Object)request);
        }
        catch (IOException e) {
            return FutureUtils.completedExceptionally((Throwable)e);
        }
        ClientCoordinationRequestBody requestBody = new ClientCoordinationRequestBody(serializedRequest);
        return this.sendRequest(headers, params, requestBody).thenApply(responseBody -> {
            try {
                return (CoordinationResponse)responseBody.getSerializedCoordinationResponse().deserializeValue(this.getClass().getClassLoader());
            }
            catch (IOException | ClassNotFoundException e) {
                throw new CompletionException("Failed to deserialize coordination response", e);
            }
        });
    }

    private CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory, boolean cancelJob) {
        SavepointTriggerHeaders savepointTriggerHeaders = SavepointTriggerHeaders.getInstance();
        SavepointTriggerMessageParameters savepointTriggerMessageParameters = savepointTriggerHeaders.getUnresolvedMessageParameters();
        savepointTriggerMessageParameters.jobID.resolve((Object)jobId);
        CompletableFuture responseFuture = this.sendRequest(savepointTriggerHeaders, savepointTriggerMessageParameters, new SavepointTriggerRequestBody(savepointDirectory, cancelJob));
        return ((CompletableFuture)responseFuture.thenCompose(savepointTriggerResponseBody -> {
            TriggerId savepointTriggerId = savepointTriggerResponseBody.getTriggerId();
            return this.pollSavepointAsync(jobId, savepointTriggerId);
        })).thenApply(savepointInfo -> {
            if (savepointInfo.getFailureCause() != null) {
                throw new CompletionException((Throwable)savepointInfo.getFailureCause());
            }
            return savepointInfo.getLocation();
        });
    }

    @Override
    public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) {
        JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
        JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters();
        accMsgParams.jobPathParameter.resolve((Object)jobID);
        accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));
        CompletableFuture responseFuture = this.sendRequest(accumulatorsHeaders, accMsgParams);
        return ((CompletableFuture)responseFuture.thenApply(JobAccumulatorsInfo::getSerializedUserAccumulators)).thenApply(accumulators -> {
            try {
                return AccumulatorHelper.deserializeAndUnwrapAccumulators((Map)accumulators, (ClassLoader)loader);
            }
            catch (Exception e) {
                throw new CompletionException("Cannot deserialize and unwrap accumulators properly.", e);
            }
        });
    }

    private CompletableFuture<SavepointInfo> pollSavepointAsync(JobID jobId, TriggerId triggerID) {
        return this.pollResourceAsync(() -> {
            SavepointStatusHeaders savepointStatusHeaders = SavepointStatusHeaders.getInstance();
            SavepointStatusMessageParameters savepointStatusMessageParameters = savepointStatusHeaders.getUnresolvedMessageParameters();
            savepointStatusMessageParameters.jobIdPathParameter.resolve((Object)jobId);
            savepointStatusMessageParameters.triggerIdPathParameter.resolve((Object)triggerID);
            return this.sendRequest(savepointStatusHeaders, savepointStatusMessageParameters);
        });
    }

    @Override
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
        return this.sendRequest(JobsOverviewHeaders.getInstance()).thenApply(multipleJobsDetails -> multipleJobsDetails.getJobs().stream().map(detail -> new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())).collect(Collectors.toList()));
    }

    @Override
    public T getClusterId() {
        return this.clusterId;
    }

    @Override
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
        SavepointDisposalRequest savepointDisposalRequest = new SavepointDisposalRequest(savepointPath);
        CompletableFuture savepointDisposalTriggerFuture = this.sendRequest(SavepointDisposalTriggerHeaders.getInstance(), savepointDisposalRequest);
        CompletionStage savepointDisposalFuture = savepointDisposalTriggerFuture.thenCompose(triggerResponse -> {
            TriggerId triggerId = triggerResponse.getTriggerId();
            SavepointDisposalStatusHeaders savepointDisposalStatusHeaders = SavepointDisposalStatusHeaders.getInstance();
            SavepointDisposalStatusMessageParameters savepointDisposalStatusMessageParameters = savepointDisposalStatusHeaders.getUnresolvedMessageParameters();
            savepointDisposalStatusMessageParameters.triggerIdPathParameter.resolve((Object)triggerId);
            return this.pollResourceAsync(() -> this.sendRequest(savepointDisposalStatusHeaders, savepointDisposalStatusMessageParameters));
        });
        return ((CompletableFuture)savepointDisposalFuture).thenApply(asynchronousOperationInfo -> {
            if (asynchronousOperationInfo.getFailureCause() == null) {
                return Acknowledge.get();
            }
            throw new CompletionException((Throwable)asynchronousOperationInfo.getFailureCause());
        });
    }

    @Override
    public void shutDownCluster() {
        try {
            this.sendRequest(ShutdownHeaders.getInstance()).get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            LOG.error("Error while shutting down cluster", (Throwable)e);
        }
    }

    private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R> pollResourceAsync(Supplier<CompletableFuture<A>> resourceFutureSupplier) {
        return this.pollResourceAsync(resourceFutureSupplier, new CompletableFuture(), 0L);
    }

    private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R> pollResourceAsync(Supplier<CompletableFuture<A>> resourceFutureSupplier, CompletableFuture<R> resultFuture, long attempt) {
        resourceFutureSupplier.get().whenComplete((asynchronouslyCreatedResource, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally((Throwable)throwable);
            } else if (asynchronouslyCreatedResource.queueStatus().getId() == QueueStatus.Id.COMPLETED) {
                resultFuture.complete(asynchronouslyCreatedResource.resource());
            } else {
                this.retryExecutorService.schedule(() -> this.lambda$null$23((Supplier)resourceFutureSupplier, resultFuture, attempt), this.waitStrategy.sleepTime(attempt), TimeUnit.MILLISECONDS);
            }
        });
        return resultFuture;
    }

    @Override
    public String getWebInterfaceURL() {
        try {
            return this.getWebMonitorBaseUrl().get().toString();
        }
        catch (InterruptedException | ExecutionException e) {
            ExceptionUtils.checkInterrupted((Throwable)e);
            LOG.warn("Could not retrieve the web interface URL for the cluster.", (Throwable)e);
            return "Unknown address.";
        }
    }

    private CompletableFuture<JobStatus> requestJobStatus(JobID jobId) {
        return ((CompletableFuture)this.getJobDetails(jobId).thenApply(JobDetailsInfo::getJobStatus)).thenApply(jobStatus -> {
            if (jobStatus == JobStatus.SUSPENDED) {
                throw new JobStateUnknownException(String.format("Job %s is in state SUSPENDED", jobId));
            }
            return jobStatus;
        });
    }

    private CompletableFuture<JobResult> requestJobResultInternal(@Nonnull JobID jobId) {
        return this.pollResourceAsync(() -> {
            JobMessageParameters messageParameters = new JobMessageParameters();
            messageParameters.jobPathParameter.resolve((Object)jobId);
            return this.sendRequest(JobExecutionResultHeaders.getInstance(), messageParameters);
        }).thenApply(jobResult -> {
            if (jobResult.getApplicationStatus() == ApplicationStatus.UNKNOWN) {
                throw new JobStateUnknownException(String.format("Result for Job %s is UNKNOWN", jobId));
            }
            return jobResult;
        });
    }

    private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters) {
        return this.sendRequest(messageHeaders, messageParameters, EmptyRequestBody.getInstance());
    }

    private <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, R request) {
        return this.sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), request);
    }

    @VisibleForTesting
    <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders) {
        return this.sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
    }

    @VisibleForTesting
    public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) {
        return this.sendRetriableRequest(messageHeaders, messageParameters, request, RestClusterClient.isConnectionProblemOrServiceUnavailable());
    }

    private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRetriableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable> retryPredicate) {
        return this.sendRetriableRequest(messageHeaders, messageParameters, request, Collections.emptyList(), retryPredicate);
    }

    private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRetriableRequest(M messageHeaders, U messageParameters, R request, Collection<FileUpload> filesToUpload, Predicate<Throwable> retryPredicate) {
        return this.retry(() -> this.getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> {
            try {
                return this.restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request, filesToUpload);
            }
            catch (IOException e) {
                throw new CompletionException(e);
            }
        }), retryPredicate);
    }

    private <C> CompletableFuture<C> retry(CheckedSupplier<CompletableFuture<C>> operation, Predicate<Throwable> retryPredicate) {
        return FutureUtils.retryWithDelay((Supplier)CheckedSupplier.unchecked(operation), (int)this.restClusterClientConfiguration.getRetryMaxAttempts(), (Time)Time.milliseconds((long)this.restClusterClientConfiguration.getRetryDelay()), retryPredicate, (ScheduledExecutor)new ScheduledExecutorServiceAdapter(this.retryExecutorService));
    }

    private static Predicate<Throwable> isConnectionProblemOrServiceUnavailable() {
        return RestClusterClient.isConnectionProblemException().or(RestClusterClient.isServiceUnavailable());
    }

    private static Predicate<Throwable> isConnectionProblemException() {
        return throwable -> ExceptionUtils.findThrowable((Throwable)throwable, ConnectException.class).isPresent() || ExceptionUtils.findThrowable((Throwable)throwable, SocketTimeoutException.class).isPresent() || ExceptionUtils.findThrowable((Throwable)throwable, ConnectTimeoutException.class).isPresent() || ExceptionUtils.findThrowable((Throwable)throwable, IOException.class).isPresent();
    }

    private static Predicate<Throwable> isServiceUnavailable() {
        return RestClusterClient.httpExceptionCodePredicate(code -> code.intValue() == HttpResponseStatus.SERVICE_UNAVAILABLE.code());
    }

    private static Predicate<Throwable> httpExceptionCodePredicate(Predicate<Integer> statusCodePredicate) {
        return throwable -> ExceptionUtils.findThrowable((Throwable)throwable, RestClientException.class).map(restClientException -> {
            int code = restClientException.getHttpResponseStatus().code();
            return statusCodePredicate.test(code);
        }).orElse(false);
    }

    @VisibleForTesting
    CompletableFuture<URL> getWebMonitorBaseUrl() {
        return FutureUtils.orTimeout((CompletableFuture)this.webMonitorLeaderRetriever.getLeaderFuture(), (long)this.restClusterClientConfiguration.getAwaitLeaderTimeout(), (TimeUnit)TimeUnit.MILLISECONDS).thenApplyAsync(leaderAddressSessionId -> {
            String url = (String)leaderAddressSessionId.f0;
            try {
                return new URL(url);
            }
            catch (MalformedURLException e) {
                throw new IllegalArgumentException("Could not parse URL from " + url, e);
            }
        }, (Executor)this.executorService);
    }

    private /* synthetic */ void lambda$null$23(Supplier resourceFutureSupplier, CompletableFuture resultFuture, long attempt) {
        this.pollResourceAsync(resourceFutureSupplier, resultFuture, attempt + 1L);
    }

    private static class JobStateUnknownException
    extends RuntimeException {
        public JobStateUnknownException(String message) {
            super(message);
        }
    }
}

