/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.http.rest;

import com.azure.core.MockServer;
import com.azure.core.annotation.BodyParam;
import com.azure.core.annotation.Delete;
import com.azure.core.annotation.ExpectedResponses;
import com.azure.core.annotation.Get;
import com.azure.core.annotation.HeaderParam;
import com.azure.core.annotation.Host;
import com.azure.core.annotation.PathParam;
import com.azure.core.annotation.Put;
import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpPipelineBuilder;
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.AddDatePolicy;
import com.azure.core.http.policy.AddHeadersPolicy;
import com.azure.core.http.policy.HostPolicy;
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpLogOptions;
import com.azure.core.http.policy.HttpLoggingPolicy;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.RestProxy;
import com.azure.core.http.rest.StreamResponse;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Base64;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RestProxyStressTests {
    // REST proxy shared by the tests; assigned once in beforeClass().
    private static IOService service;
    // Handle of a mock server process started by launchTestServer(); afterClass() destroys it when non-null.
    private static Process testServer;
    // Port used to build the "http://localhost:<port>" host override; initialised to 8080 in the static
    // initializer and replaced by JAVA_SDK_TEST_PORT in launchTestServer().
    private static int port;
    // Folder the tests resolve the "100m-<n>-md5.dat" files against; assigned in beforeClass().
    private static Path tempFolderPath;
    // Presumably the number of test files; the tests in this class iterate over ids 0..99.
    private static final int NUM_FILES = 100;
    // 0x6400000 == 104,857,600, i.e. 100 * 1024 * 1024.
    private static final int FILE_SIZE = 0x6400000;
    // NOTE(review): looks like the unit is bytes - confirm against the file-generation code.
    private static final int CHUNK_SIZE = 8192;
    // Equals FILE_SIZE / CHUNK_SIZE (104,857,600 / 8192).
    private static final int CHUNKS_PER_FILE = 12800;

    /**
     * One-time setup for the stress tests.
     *
     * <p>The whole class is skipped unless the environment variable {@code JAVA_SDK_STRESS_TESTS} parses as
     * {@code true}. The method then builds the HTTP pipeline behind the shared {@code service} proxy, resolves
     * the temp folder from {@code JAVA_STRESS_TEST_TEMP_PATH} (falling back to {@code "temp"}), and finally
     * calls {@code create100MFiles(false)}.
     *
     * <p>When {@code JAVA_SDK_TEST_SAS} is unset or empty, {@link #launchTestServer()} is invoked and requests
     * are redirected to {@code http://localhost:<port>} through a {@link HostPolicy}.
     *
     * @throws IOException propagated from {@link #launchTestServer()} or {@code create100MFiles}
     */
    @BeforeAll
    public static void beforeClass() throws IOException {
        Assumptions.assumeTrue(Boolean.parseBoolean(System.getenv("JAVA_SDK_STRESS_TESTS")),
            "Set the environment variable JAVA_SDK_STRESS_TESTS to \"true\" to run stress tests");

        // Named differently from the static field so the two are never confused.
        String tempFolder = System.getenv("JAVA_STRESS_TEST_TEMP_PATH");
        if (tempFolder == null || tempFolder.isEmpty()) {
            tempFolder = "temp";
        }

        HttpHeaders headers = new HttpHeaders().put("x-ms-version", "2017-04-17");

        // Typed list: a non-policy element is now a compile error instead of an ArrayStoreException in toArray.
        ArrayList<HttpPipelinePolicy> policies = new ArrayList<>();
        policies.add(new AddDatePolicy());
        policies.add(new AddHeadersPolicy(headers));
        policies.add(new ThrottlingRetryPolicy());

        String liveStressTests = System.getenv("JAVA_SDK_TEST_SAS");
        if (liveStressTests == null || liveStressTests.isEmpty()) {
            // No SAS token configured: target the local test server instead of the host declared on IOService.
            launchTestServer();
            policies.add(new HostPolicy("http://localhost:" + port));
        }

        policies.add(new HttpLoggingPolicy(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BASIC)));

        HttpPipeline pipeline = new HttpPipelineBuilder()
            .policies(policies.toArray(new HttpPipelinePolicy[0]))
            .build();
        service = RestProxy.create(IOService.class, pipeline);

        tempFolderPath = Paths.get(tempFolder);
        create100MFiles(false);
    }

    /**
     * Points the tests at a local test server.
     *
     * <p>The port is read from the environment variable {@code JAVA_SDK_TEST_PORT}; the calling test run is
     * aborted through a JUnit assumption when the variable is not set. The branch that spawns
     * {@link MockServer} as a child JVM is kept, but cannot be reached while that assumption is in place.
     *
     * @throws IOException if the child process cannot be started
     */
    private static void launchTestServer() throws IOException {
        final String configuredPort = System.getenv("JAVA_SDK_TEST_PORT");
        Assumptions.assumeTrue(configuredPort != null, "JAVA_SDK_TEST_PORT must specify the port of a running local server");

        if (configuredPort == null) {
            // Spawn the mock server with the same JVM binary and classpath as the current process.
            final String javaBinary = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
            final ProcessBuilder serverLauncher = new ProcessBuilder(
                javaBinary,
                "-cp",
                System.getProperty("java.class.path"),
                MockServer.class.getCanonicalName());
            serverLauncher.redirectErrorStream(true);
            serverLauncher.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            testServer = serverLauncher.start();
            return;
        }

        // A server is expected to be listening already; only remember where it is.
        port = Integer.parseInt(configuredPort, 10);
        LoggerFactory.getLogger(RestProxyStressTests.class)
            .warn("Attempting to connect to already-running test server on port {}", port);
    }

    /**
     * Destroys the mock server process after all tests have run, if one was started by this class.
     *
     * @throws Exception declared for the test framework; nothing in the body throws a checked exception
     */
    @AfterAll
    public static void afterClass() throws Exception {
        final Process spawnedServer = testServer;
        if (spawnedServer == null) {
            // Nothing was spawned, so there is nothing to clean up.
            return;
        }
        spawnedServer.destroy();
    }

    /**
     * Deletes a file tree: every file first, then each directory once its contents have been visited.
     *
     * <p>A {@link NoSuchFileException} raised anywhere during the walk is swallowed, so calling this on a
     * path that does not exist is a no-op.
     *
     * @param tempFolderPath root of the tree to delete
     * @throws IOException if a file or directory cannot be deleted, or if iterating a directory failed
     */
    private static void deleteRecursive(Path tempFolderPath) throws IOException {
        final FileVisitor<Path> deletingVisitor = new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path entry, BasicFileAttributes attributes) throws IOException {
                Files.delete(entry);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path directory, IOException failure) throws IOException {
                if (failure == null) {
                    // All children are gone by now, so the directory itself can be removed.
                    Files.delete(directory);
                    return FileVisitResult.CONTINUE;
                }
                throw failure;
            }
        };

        try {
            Files.walkFileTree(tempFolderPath, deletingVisitor);
        } catch (NoSuchFileException ignored) {
            // Deliberately ignored: a missing path means there is nothing left to delete.
        }
    }

    /**
     * Placeholder for the test-file generator: it currently fails unconditionally through a JUnit assertion.
     *
     * @param recreate not used by the current body
     * @throws IOException declared for callers; the current body does not perform any I/O
     */
    private static void create100MFiles(boolean recreate) throws IOException {
        final String reason = "This method is not yet re-implemented";
        Assertions.fail(reason);
    }

    /**
     * Manual entry point that calls {@code create100MFiles} with {@code recreate = true}.
     * Disabled so that it never runs as part of a normal test pass.
     *
     * @throws Exception whatever {@code create100MFiles} throws
     */
    @Disabled("Should only be run manually")
    @Test
    public void prepare100MFiles() throws Exception {
        final boolean recreate = true;
        create100MFiles(recreate);
    }

    /**
     * Placeholder test: fails unconditionally until the parallel upload scenario is implemented again.
     */
    @Test
    public void upload100MParallelTest() {
        final String reason = "Need to implement this test again";
        Assertions.fail(reason);
    }

    /**
     * Placeholder test: fails unconditionally until the memory-mapped upload scenario is implemented again.
     */
    @Test
    public void uploadMemoryMappedTest() {
        final String reason = "Need to implement this test again";
        Assertions.fail(reason);
    }

    /**
     * Downloads every test file and checks its MD5 digest against the digest stored on disk.
     *
     * <p>For each id in {@code [0, NUM_FILES)} the expected digest is read from
     * {@code 100m-<id>-md5.dat} under {@code tempFolderPath}. The download is streamed through a
     * {@link MessageDigest} and the two digests are compared once the stream completes. Up to 15 downloads
     * are subscribed concurrently and errors are delayed until all of them have terminated.
     *
     * <p>The SAS token is read from {@code JAVA_SDK_TEST_SAS}; an unset variable is treated as an empty token.
     */
    @Test
    public void download100MParallelTest() {
        final String sasFromEnv = System.getenv("JAVA_SDK_TEST_SAS");
        final String sas = sasFromEnv == null ? "" : sasFromEnv;

        // Expected digests, emitted in id order so they can be zipped with the id range below.
        Flux<byte[]> md5s = Flux.range(0, NUM_FILES).map(integer -> {
            Path filePath = tempFolderPath.resolve("100m-" + integer + "-md5.dat");
            try {
                return Files.readAllBytes(filePath);
            } catch (IOException ioe) {
                throw Exceptions.propagate(ioe);
            }
        });

        Instant downloadStart = Instant.now();
        Flux.range(0, NUM_FILES)
            .zipWith(md5s, (id, md5) -> service.download100M(String.valueOf(id), sas).flatMap(response -> {
                try {
                    MessageDigest messageDigest = MessageDigest.getInstance("MD5");
                    // slice() gives the digest its own position/limit so the emitted buffer is left untouched.
                    Flux<ByteBuffer> content = response.getValue()
                        .doOnNext(buf -> messageDigest.update(buf.slice()));
                    return content.last().doOnSuccess(b -> {
                        Assertions.assertArrayEquals(md5, messageDigest.digest());
                        LoggerFactory.getLogger(this.getClass())
                            .info("Finished downloading and MD5 validated for {}", id);
                    });
                } catch (NoSuchAlgorithmException nsae) {
                    throw Exceptions.propagate(nsae);
                }
            }))
            .flatMapDelayError(m -> m, 15, 1)
            .blockLast();

        long durationMilliseconds = Duration.between(downloadStart, Instant.now()).toMillis();
        LoggerFactory.getLogger(this.getClass()).info("Download took {} milliseconds.", durationMilliseconds);
    }

    /**
     * Pipes each downloaded test file straight into an upload and validates the {@code Content-MD5} header of
     * the upload response against the digest stored on disk.
     *
     * <p>For each id in {@code [0, NUM_FILES)} the expected digest is read from
     * {@code 100m-<id>-md5.dat} under {@code tempFolderPath}. Up to 30 download/upload pairs are subscribed
     * concurrently and errors are delayed until all of them have terminated.
     *
     * <p>NOTE: the buffer-mapping stage of the download stream is a placeholder that throws
     * {@link IllegalStateException} for every buffer it receives, so the upload body fails as soon as
     * download content arrives.
     *
     * <p>The SAS token is read from {@code JAVA_SDK_TEST_SAS}; an unset variable is treated as an empty token.
     */
    @Test
    public void downloadUploadStreamingTest() {
        final String sasFromEnv = System.getenv("JAVA_SDK_TEST_SAS");
        final String sas = sasFromEnv == null ? "" : sasFromEnv;

        // Expected digests, emitted in id order so they can be zipped with the id range below.
        Flux<byte[]> md5s = Flux.range(0, NUM_FILES).map(integer -> {
            Path filePath = tempFolderPath.resolve("100m-" + integer + "-md5.dat");
            try {
                return Files.readAllBytes(filePath);
            } catch (IOException ioe) {
                throw Exceptions.propagate(ioe);
            }
        });

        Instant downloadStart = Instant.now();
        Flux.range(0, NUM_FILES)
            .zipWith(md5s, (integer, md5) -> {
                int id = integer;
                Flux<ByteBuffer> downloadContent = service.download100M(String.valueOf(id), sas)
                    .flatMapMany(StreamResponse::getValue)
                    .<ByteBuffer>map(reactorNettybb -> {
                        throw new IllegalStateException("This method is not yet re-implemented");
                    });

                return service.upload100MB("copy-" + integer, sas, "BlockBlob", downloadContent, FILE_SIZE)
                    .flatMap(uploadResponse -> {
                        String base64MD5 = uploadResponse.getHeaders().getValue("Content-MD5");
                        // Fail with a readable message instead of an NPE from the Base64 decoder.
                        Assertions.assertNotNull(base64MD5, "Upload response is missing the Content-MD5 header");
                        byte[] uploadMD5 = Base64.getDecoder().decode(base64MD5);
                        Assertions.assertArrayEquals(md5, uploadMD5);
                        LoggerFactory.getLogger(this.getClass())
                            .info("Finished upload and validation for id {}", id);
                        return Mono.just(uploadResponse);
                    });
            })
            .flatMapDelayError(m -> m, 30, 1)
            .blockLast();

        long durationMilliseconds = Duration.between(downloadStart, Instant.now()).toMillis();
        LoggerFactory.getLogger(this.getClass())
            .info("Download/Upload took {} milliseconds.", durationMilliseconds);
    }

    /**
     * Starts 100 streaming downloads, disposes the subscription after a 10 second delay, and then sleeps a
     * further 10 seconds before returning. The test makes no assertions.
     *
     * @throws Exception if the final sleep is interrupted
     */
    @Test
    public void cancellationTest() throws Exception {
        final String sasFromEnv = System.getenv("JAVA_SDK_TEST_SAS");
        final String sas = sasFromEnv != null ? sasFromEnv : "";

        final Flux<ByteBuffer> allDownloads = Flux.range(0, 100)
            .flatMap(index -> service.download100M(String.valueOf(index), sas)
                .flatMapMany(StreamResponse::getValue));
        final Disposable subscription = allDownloads.subscribe();

        // Block this thread for the delay, then cancel everything that is still in flight.
        Mono.delay(Duration.ofSeconds(10L))
            .then(Mono.defer(() -> {
                subscription.dispose();
                return Mono.empty();
            }))
            .block();

        Thread.sleep(10000L);
    }

    /**
     * Creates and then deletes 10,000 containers through a dedicated pipeline (one without HTTP logging).
     *
     * <p>An {@link HttpResponseException} with status 409 or 404 from the create call is swallowed, so the
     * delete call is still issued for that container. Any other error is propagated and fails the test.
     *
     * <p>The SAS token is read from {@code JAVA_SDK_TEST_SAS}; when it is unset or empty, requests are
     * redirected to {@code http://localhost:<port>}.
     */
    @Test
    public void testHighParallelism() {
        final String sasFromEnv = System.getenv("JAVA_SDK_TEST_SAS");
        final String sas = sasFromEnv == null ? "" : sasFromEnv;

        HttpHeaders headers = new HttpHeaders().put("x-ms-version", "2017-04-17");

        // Typed list: a non-policy element is now a compile error instead of an ArrayStoreException in toArray.
        ArrayList<HttpPipelinePolicy> policies = new ArrayList<>();
        policies.add(new AddDatePolicy());
        policies.add(new AddHeadersPolicy(headers));
        policies.add(new ThrottlingRetryPolicy());
        // sas is never null at this point, so an emptiness check is sufficient.
        if (sas.isEmpty()) {
            policies.add(new HostPolicy("http://localhost:" + port));
        }

        HttpPipeline pipeline = new HttpPipelineBuilder()
            .policies(policies.toArray(new HttpPipelinePolicy[0]))
            .build();
        IOService innerService = RestProxy.create(IOService.class, pipeline);

        Flux.range(0, 10000)
            .flatMap(integer -> innerService.createContainer(integer.toString(), sas)
                .onErrorResume(throwable -> {
                    if (throwable instanceof HttpResponseException) {
                        int statusCode = ((HttpResponseException) throwable).getResponse().getStatusCode();
                        if (statusCode == 409 || statusCode == 404) {
                            return Mono.empty();
                        }
                    }
                    return Mono.error(throwable);
                })
                .then(innerService.deleteContainer(integer.toString(), sas)))
            .blockLast();
    }

    // Default port for the local test server; launchTestServer() overwrites it with the value parsed from
    // JAVA_SDK_TEST_PORT.
    static {
        port = 8080;
    }

    /**
     * REST contract that {@link RestProxy} turns into the HTTP client used by the stress tests.
     *
     * <p>Every operation takes a {@code sas} string that is substituted into the query string as-is
     * ({@code encoded = true}).
     */
    @Host("https://javasdktest.blob.core.windows.net")
    interface IOService {
        /**
         * PUTs {@code content} to {@code /javasdktest/upload/100m-{id}.dat}; a 201 response is expected.
         *
         * @param id value substituted for {@code {id}} in the path
         * @param sas pre-encoded string substituted for {@code {sas}} in the query
         * @param blobType value of the {@code x-ms-blob-type} header
         * @param content request body, sent as {@code application/octet-stream}
         * @param contentLength value of the {@code content-length} header
         * @return the response, without a deserialized body
         */
        @ExpectedResponses({201})
        @Put("/javasdktest/upload/100m-{id}.dat?{sas}")
        Mono<Response<Void>> upload100MB(
            @PathParam("id") String id,
            @PathParam(value = "sas", encoded = true) String sas,
            @HeaderParam("x-ms-blob-type") String blobType,
            @BodyParam("application/octet-stream") Flux<ByteBuffer> content,
            @HeaderParam("content-length") long contentLength);

        /**
         * GETs {@code /javasdktest/upload/100m-{id}.dat} as a streamed response.
         *
         * @param id value substituted for {@code {id}} in the path
         * @param sas pre-encoded string substituted for {@code {sas}} in the query
         * @return the streaming response
         */
        @Get("/javasdktest/upload/100m-{id}.dat?{sas}")
        Mono<StreamResponse> download100M(
            @PathParam("id") String id,
            @PathParam(value = "sas", encoded = true) String sas);

        /**
         * PUTs {@code /testcontainer{id}?restype=container}; a 201 response is expected.
         *
         * @param id value substituted for {@code {id}} in the path
         * @param sas pre-encoded string substituted for {@code {sas}} in the query
         * @return the response, without a deserialized body
         */
        @ExpectedResponses({201})
        @Put("/testcontainer{id}?restype=container&{sas}")
        Mono<Response<Void>> createContainer(
            @PathParam("id") String id,
            @PathParam(value = "sas", encoded = true) String sas);

        /**
         * DELETEs {@code /testcontainer{id}?restype=container}; a 202 response is expected.
         *
         * @param id value substituted for {@code {id}} in the path
         * @param sas pre-encoded string substituted for {@code {sas}} in the query
         * @return the response, without a deserialized body
         */
        @ExpectedResponses({202})
        @Delete("/testcontainer{id}?restype=container&{sas}")
        Mono<Response<Void>> deleteContainer(
            @PathParam("id") String id,
            @PathParam(value = "sas", encoded = true) String sas);
    }

    private static final class ThrottlingRetryPolicy
    implements HttpPipelinePolicy {
        private ThrottlingRetryPolicy() {
        }

        public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
            return this.process(1 + ThreadLocalRandom.current().nextInt(5), context, next);
        }

        Mono<HttpResponse> process(int waitTimeSeconds, HttpPipelineCallContext context, HttpPipelineNextPolicy nextPolicy) {
            return nextPolicy.clone().process().flatMap(httpResponse -> {
                if (httpResponse.getStatusCode() != 503 && httpResponse.getStatusCode() != 500) {
                    return Mono.just((Object)httpResponse);
                }
                LoggerFactory.getLogger(this.getClass()).warn("Received " + httpResponse.getStatusCode() + " for request. Waiting " + waitTimeSeconds + " seconds before retry.");
                int nextWaitTime = 5 + ThreadLocalRandom.current().nextInt(10);
                httpResponse.getBody().subscribe().dispose();
                return Mono.delay((Duration)Duration.of(waitTimeSeconds, ChronoUnit.SECONDS)).then(this.process(nextWaitTime, context, nextPolicy));
            }).onErrorResume(throwable -> {
                if (throwable instanceof IOException) {
                    LoggerFactory.getLogger(this.getClass()).warn("I/O exception occurred: " + throwable.getMessage());
                    return this.process(context, nextPolicy).delaySubscription(Duration.of(waitTimeSeconds, ChronoUnit.SECONDS));
                }
                LoggerFactory.getLogger(this.getClass()).warn("Unrecoverable exception occurred: " + throwable.getMessage());
                return Mono.error((Throwable)throwable);
            });
        }
    }
}

