/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobServerPutTest;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.util.concurrent.FutureUtils;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests how a size-limited {@link PermanentBlobCache} behaves when more blobs are retrieved
 * through it than its {@link BlobCacheSizeTracker} limit can hold.
 *
 * <p>Every blob is {@link #BLOB_SIZE} bytes and the tracker limit is
 * {@code MAX_NUM_OF_ACCEPTED_BLOBS * BLOB_SIZE} bytes, so the tests expect at most
 * {@link #MAX_NUM_OF_ACCEPTED_BLOBS} of the {@link #TOTAL_NUM_OF_BLOBS} blob files to exist in the
 * cache at the same time.
 */
public class PermanentBlobCacheSizeLimitTest {
    private static final Random RANDOM = new Random();
    private static final BlobKey.BlobType BLOB_TYPE = BlobKey.BlobType.PERMANENT_BLOB;

    /** Size in bytes of every blob created by these tests. */
    private static final int BLOB_SIZE = 10000;

    /** Number of blobs that fit into the cache's size limit. */
    private static final int MAX_NUM_OF_ACCEPTED_BLOBS = 2;

    /** Number of blobs uploaded to the server and then fetched through the cache. */
    private static final int TOTAL_NUM_OF_BLOBS = 3;

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /**
     * Retrieves the blobs one after another and checks which cached files survive.
     *
     * <p>The assertions encode a least-recently-used expectation: after all blobs were fetched in
     * order the first one is gone, and after touching the second and re-fetching the first, the
     * third one is gone.
     */
    @Test
    public void testTrackSizeLimitAndDeleteExcessSequentially() throws Exception {
        final Configuration config = new Configuration();
        try (BlobServer server =
                        new BlobServer(config, TEMPORARY_FOLDER.newFolder(), new VoidBlobStore());
                BlobCacheService cache =
                        initBlobCacheServiceWithSizeLimit(
                                config, new InetSocketAddress("localhost", server.getPort()))) {
            server.start();
            final BlobInfo[] blobs = putBlobsIntoBlobServer(server);

            // Fetch every blob through the cache; each file must exist right after its retrieval.
            for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
                readFileAndVerifyContent(cache, blobs[i].jobId, blobs[i].blobKey, blobs[i].data);
                blobs[i].blobFile = getFile(cache, blobs[i].jobId, blobs[i].blobKey);
                Assert.assertTrue(blobs[i].blobFile.exists());
            }

            // Only two blobs fit, so the first retrieved one is expected to be gone by now.
            Assert.assertFalse(blobs[0].blobFile.exists());
            Assert.assertTrue(blobs[1].blobFile.exists());

            // Access the second blob again so that it is more recently used than the third one.
            readFileAndVerifyContent(cache, blobs[1].jobId, blobs[1].blobKey, blobs[1].data);

            // Upload the first blob to the server again (taking the key returned by that upload)
            // and fetch it through the cache once more.
            blobs[0].blobKey = BlobServerPutTest.put(server, blobs[0].jobId, blobs[0].data, BLOB_TYPE);
            readFileAndVerifyContent(cache, blobs[0].jobId, blobs[0].blobKey, blobs[0].data);
            blobs[0].blobFile = getFile(cache, blobs[0].jobId, blobs[0].blobKey);

            // This time the third blob is the one expected to have been removed.
            Assert.assertTrue(blobs[0].blobFile.exists());
            Assert.assertTrue(blobs[1].blobFile.exists());
            Assert.assertFalse(blobs[2].blobFile.exists());
        }
    }

    /**
     * Retrieves all blobs concurrently, one task per blob, and checks that exactly
     * {@link #MAX_NUM_OF_ACCEPTED_BLOBS} cached files exist afterwards. Which blob is missing is
     * not asserted, only the counts.
     */
    @Test
    public void testTrackSizeLimitAndDeleteExcessConcurrently() throws Exception {
        final ExecutorService executor = Executors.newFixedThreadPool(TOTAL_NUM_OF_BLOBS);
        final Configuration config = new Configuration();
        try (BlobServer server =
                        new BlobServer(config, TEMPORARY_FOLDER.newFolder(), new VoidBlobStore());
                BlobCacheService cache =
                        initBlobCacheServiceWithSizeLimit(
                                config, new InetSocketAddress("localhost", server.getPort()))) {
            server.start();
            final BlobInfo[] blobs = putBlobsIntoBlobServer(server);

            final ArrayList<CompletableFuture<Void>> futures = new ArrayList<>(TOTAL_NUM_OF_BLOBS);
            for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
                final int idx = i;
                final CompletableFuture<Void> future =
                        CompletableFuture.supplyAsync(
                                () -> {
                                    try {
                                        readFileAndVerifyContent(
                                                cache,
                                                blobs[idx].jobId,
                                                blobs[idx].blobKey,
                                                blobs[idx].data);
                                        blobs[idx].blobFile =
                                                getFile(cache, blobs[idx].jobId, blobs[idx].blobKey);
                                        return null;
                                    } catch (IOException e) {
                                        // Lambdas passed to supplyAsync cannot throw checked
                                        // exceptions; surface the failure through the future.
                                        throw new CompletionException(e);
                                    }
                                },
                                executor);
                futures.add(future);
            }

            // Block until every retrieval finished; a failed retrieval fails the test here.
            FutureUtils.waitForAll(futures).get();

            int exists = 0;
            int nonExists = 0;
            for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
                if (blobs[i].blobFile.exists()) {
                    exists++;
                } else {
                    nonExists++;
                }
            }
            Assert.assertEquals(MAX_NUM_OF_ACCEPTED_BLOBS, exists);
            Assert.assertEquals(TOTAL_NUM_OF_BLOBS - MAX_NUM_OF_ACCEPTED_BLOBS, nonExists);
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Creates {@link #TOTAL_NUM_OF_BLOBS} blobs with random content and uploads each of them to
     * the given server.
     *
     * @param server blob server that receives the blobs
     * @return the created blobs, each with the key returned by the upload
     * @throws IOException if an upload fails
     */
    private static BlobInfo[] putBlobsIntoBlobServer(BlobServer server) throws IOException {
        final BlobInfo[] blobs = new BlobInfo[TOTAL_NUM_OF_BLOBS];
        for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
            blobs[i] = new BlobInfo();
            blobs[i].blobKey = BlobServerPutTest.put(server, blobs[i].jobId, blobs[i].data, BLOB_TYPE);
            Assert.assertNotNull(blobs[i].blobKey);
        }
        return blobs;
    }

    /**
     * Builds a {@link BlobCacheService} whose permanent cache uses a {@link BlobCacheSizeTracker}
     * limited to {@code MAX_NUM_OF_ACCEPTED_BLOBS * BLOB_SIZE} bytes.
     *
     * @param config configuration handed to both caches
     * @param serverAddress address of the blob server the caches connect to
     * @return the cache service wrapping the permanent and the transient cache
     * @throws IOException if a storage folder or a cache cannot be created
     */
    private static BlobCacheService initBlobCacheServiceWithSizeLimit(
            Configuration config, @Nullable InetSocketAddress serverAddress) throws IOException {
        final long sizeLimit = (long) MAX_NUM_OF_ACCEPTED_BLOBS * BLOB_SIZE;
        final PermanentBlobCache permanentBlobCache =
                new PermanentBlobCache(
                        config,
                        TEMPORARY_FOLDER.newFolder(),
                        new VoidBlobStore(),
                        serverAddress,
                        new BlobCacheSizeTracker(sizeLimit));
        final TransientBlobCache transientBlobCache =
                new TransientBlobCache(config, TEMPORARY_FOLDER.newFolder(), serverAddress);
        return new BlobCacheService(permanentBlobCache, transientBlobCache);
    }

    /**
     * Reads a permanent blob through the given service and asserts that its content equals
     * {@code expected}.
     *
     * @param blobService service to read the blob from
     * @param jobId job the blob belongs to; must not be null
     * @param blobKey key of the blob; must be a {@link PermanentBlobKey}
     * @param expected expected blob content
     * @throws IOException if reading the blob fails
     */
    private static void readFileAndVerifyContent(
            BlobService blobService, JobID jobId, BlobKey blobKey, byte[] expected)
            throws IOException {
        Assert.assertNotNull(jobId);
        Assert.assertNotNull(blobKey);
        Assert.assertTrue(blobKey instanceof PermanentBlobKey);
        final byte[] target =
                blobService.getPermanentBlobService().readFile(jobId, (PermanentBlobKey) blobKey);
        Assert.assertArrayEquals(expected, target);
    }

    /**
     * Returns the storage location that the permanent blob cache reports for the given blob.
     *
     * @param blobCacheService cache service to query
     * @param jobId job the blob belongs to
     * @param blobKey key of the blob
     * @return the file reported as the blob's storage location
     * @throws IOException if the location cannot be determined
     */
    private static File getFile(BlobCacheService blobCacheService, JobID jobId, BlobKey blobKey)
            throws IOException {
        return blobCacheService.getPermanentBlobService().getStorageLocation(jobId, blobKey);
    }

    /** Bundles a blob's job id, random content, server key and cached file for the tests. */
    private static class BlobInfo {
        private final JobID jobId = new JobID();
        private final byte[] data = new byte[BLOB_SIZE];
        private BlobKey blobKey;
        private File blobFile;

        private BlobInfo() {
            // Fill the payload with random bytes.
            RANDOM.nextBytes(this.data);
        }
    }
}

