/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.test.client.hotrod;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Random;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeDataSupport;
import org.infinispan.arquillian.core.InfinispanResource;
import org.infinispan.arquillian.core.RemoteInfinispanServer;
import org.infinispan.arquillian.core.RunningServer;
import org.infinispan.arquillian.core.WithRunningServer;
import org.infinispan.arquillian.utils.MBeanServerConnectionProvider;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.StreamingRemoteCache;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.test.Exceptions;
import org.jboss.arquillian.container.test.api.ContainerController;
import org.jboss.arquillian.junit.Arquillian;
import org.jboss.arquillian.test.api.ArquillianResource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(value=Arquillian.class)
@WithRunningServer(value={@RunningServer(name="default-clustered-manual-1"), @RunningServer(name="default-clustered-manual-2")})
public class HotRodRemoteStreamingIT {
    private static final String SERVER_1_NAME = "default-clustered-manual-1";
    private static final String SERVER_2_NAME = "default-clustered-manual-2";
    private static final String USED_MEMORY_KEY = "used";
    @InfinispanResource(value="default-clustered-manual-1")
    private static RemoteInfinispanServer server1;
    private static RemoteCacheManager rcm1;
    @InfinispanResource(value="default-clustered-manual-2")
    private static RemoteInfinispanServer server2;
    private static RemoteCacheManager rcm2;
    private StreamingRemoteCache<Object> src1;
    private StreamingRemoteCache<Object> src2;
    @ArquillianResource
    private ContainerController controller;
    private Configuration conf1;
    private Configuration conf2;
    private Boolean finalized = new Boolean(false);
    private static Random random;

    @Before
    public void setUp() {
        if (this.conf1 == null || this.conf2 == null) {
            this.conf1 = new ConfigurationBuilder().addServer().host(server1.getHotrodEndpoint().getInetAddress().getHostName()).port(server1.getHotrodEndpoint().getPort()).build();
            this.conf2 = new ConfigurationBuilder().addServer().host(server2.getHotrodEndpoint().getInetAddress().getHostName()).port(server2.getHotrodEndpoint().getPort()).build();
        }
        rcm1 = new RemoteCacheManager(this.conf1);
        rcm2 = new RemoteCacheManager(this.conf2);
        this.src1 = rcm1.getCache("streamingTestCache").streaming();
        this.src2 = rcm2.getCache("streamingTestCache").streaming();
    }

    private void checkServers() {
        if (!this.controller.isStarted(SERVER_1_NAME)) {
            this.controller.start(SERVER_1_NAME);
        }
        if (!this.controller.isStarted(SERVER_2_NAME)) {
            this.controller.start(SERVER_2_NAME);
        }
    }

    @Test
    public void testBasicFunctionality() throws Exception {
        int streamCount = 100;
        ArrayList<RandomInserter> inserterList = new ArrayList<RandomInserter>();
        for (int i = 0; i < streamCount; ++i) {
            inserterList.add(new RandomInserter(random.nextLong(), i % 2 == 0 ? this.src1 : this.src2, i % 2 == 0 ? this.src2 : this.src1));
        }
        boolean result = true;
        while (result) {
            Collections.shuffle(inserterList);
            for (RandomInserter r : inserterList) {
                result = result && r.process();
            }
            result = !result;
        }
    }

    @Test
    public void testGCOpenStream() throws Exception {
        Long seed = random.nextLong();
        RandomInserter ri = new RandomInserter(seed, this.src1, this.src1);
        ri.process();
        ri.process();
        ri = null;
        System.gc();
        for (int i = 0; i < 10 && !this.isFinalized(); ++i) {
            Thread.sleep(1000L);
        }
        if (!this.isFinalized()) {
            Assert.fail((String)"Testing object was not garbage collected in time limit");
        }
        Assert.assertNull((String)"Partial object found in cache1", (Object)this.src1.get((Object)seed));
        Assert.assertNull((String)"Partial object found in cache2", (Object)this.src2.get((Object)seed));
    }

    @Test
    public void testNegativeOneInStream() throws IOException {
        Long seed = random.nextLong();
        byte[] ba = new byte[1000];
        random.nextBytes(ba);
        for (int i = 1; i < 100; ++i) {
            ba[i * 10] = -1;
        }
        OutputStream out = this.src1.put((Object)seed);
        for (int i = 0; i < 1000; ++i) {
            out.write(ba[i]);
        }
        out.close();
        InputStream in = this.src2.get((Object)seed);
        for (int i = 0; i < 1000; ++i) {
            Assert.assertEquals((long)ba[i], (long)((byte)in.read()));
        }
        in.close();
    }

    @Test
    public void testKeyConcurency() throws IOException {
        int val1 = 123;
        int val2 = 234;
        Long seed = random.nextLong();
        OutputStream out1 = this.src1.put((Object)seed);
        out1.write(val1);
        OutputStream out2 = this.src2.put((Object)seed);
        out2.write(val2);
        out1.close();
        out2.close();
        InputStream in = this.src1.get((Object)seed);
        Assert.assertEquals((String)"", (long)val2, (long)in.read());
        in.close();
        seed = random.nextLong();
        out1 = this.src1.putIfAbsent((Object)seed);
        out1.write(val1);
        out2 = this.src2.putIfAbsent((Object)seed);
        out2.write(val2);
        out1.close();
        out2.close();
        in = this.src1.get((Object)seed);
        Assert.assertEquals((String)"", (long)val1, (long)in.read());
        in.close();
    }

    @Test
    public void RCMStopTest() throws IOException, InterruptedException {
        byte[] value = new byte[100];
        byte[] ret = new byte[100];
        random.nextBytes(value);
        Long key = random.nextLong();
        OutputStream out = this.src1.put((Object)key);
        out.write(value, 0, 50);
        rcm1.stop();
        Exceptions.expectException(IOException.class, () -> {
            out.write(value, 50, 50);
            out.close();
        });
        Exceptions.expectException(TransportException.class, () -> this.src1.get((Object)key));
        Exceptions.expectException(TransportException.class, () -> this.src1.put((Object)key));
        rcm1.start();
        this.src1 = rcm1.getCache("streamingTestCache").streaming();
        Assert.assertEquals(null, (Object)this.src1.get((Object)key));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Ignore(value="ISPN-8724")
    public void serverShutdownTest() throws IOException, InterruptedException {
        byte[] value = new byte[5000];
        random.nextBytes(value);
        try {
            for (int i = 0; i < 10; ++i) {
                Long key = random.nextLong();
                OutputStream out = this.src1.put((Object)key);
                out.write(value);
                out.write(value);
                this.stopServer(i);
                try {
                    out.write(value);
                    out.close();
                }
                catch (Exception e) {
                    this.startServer(i);
                    if (this.src1.get((Object)key) == null) continue;
                    Assert.fail((String)"Failed key found in te cache");
                }
                this.startServer(i);
            }
        }
        finally {
            this.checkServers();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Ignore(value="ISPN-8724")
    public void serverKillTest() throws IOException, InterruptedException {
        byte[] value = new byte[5000];
        random.nextBytes(value);
        try {
            for (int i = 0; i < 2; ++i) {
                Long key = random.nextLong();
                OutputStream out = this.src1.put((Object)key);
                out.write(value);
                out.write(value);
                this.killServer(i);
                try {
                    out.write(value);
                    out.close();
                }
                catch (Exception e) {
                    this.startServer(i);
                    if (this.src1.get((Object)key) == null) continue;
                    Assert.fail((String)"Failed key found in te cache");
                }
                this.startServer(i);
            }
        }
        finally {
            this.checkServers();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Ignore
    public void performanceTest() throws Exception {
        try {
            byte[] ba = new byte[0xA00000];
            byte[] baForStream = new byte[1024];
            this.controller.stop(SERVER_2_NAME);
            MBeanServerConnectionProvider provider = new MBeanServerConnectionProvider(server1.getHotrodEndpoint().getInetAddress().getHostName(), 9990);
            RemoteCache cache = rcm1.getCache("streamingTestCache");
            CompositeDataSupport attribute = (CompositeDataSupport)provider.getConnection().getAttribute(new ObjectName("java.lang:type=Memory"), "HeapMemoryUsage");
            Runtime runtime = Runtime.getRuntime();
            Long averageMemoryConsumptionStatistic = new Long(0L);
            Long totalMemoryConsumptionStatistic = new Long(0L);
            for (int top = 0; top < 10; ++top) {
                System.gc();
                MemoryUsage serverMem = new MemoryUsage((long)((Long)attribute.get(USED_MEMORY_KEY)));
                MemoryUsage clientMem = new MemoryUsage(runtime.totalMemory() - runtime.freeMemory());
                for (int i = 0; i < 10; ++i) {
                    Long key = random.nextLong();
                    random.nextBytes(ba);
                    cache.put((Object)key.toString(), (Object)ba);
                    serverMem.update((Long)((CompositeDataSupport)provider.getConnection().getAttribute(new ObjectName("java.lang:type=Memory"), "HeapMemoryUsage")).get(USED_MEMORY_KEY));
                    clientMem.update(runtime.totalMemory() - runtime.freeMemory());
                    System.gc();
                    cache.remove((Object)key.toString());
                }
                System.gc();
                MemoryUsage serverMemStreaming = new MemoryUsage((long)((Long)attribute.get(USED_MEMORY_KEY)));
                MemoryUsage clientMemStreaming = new MemoryUsage(runtime.totalMemory() - runtime.freeMemory());
                StreamingRemoteCache streamingCache = cache.streaming();
                for (int i = 0; i < 10; ++i) {
                    Long key = random.nextLong();
                    OutputStream out = streamingCache.put((Object)key.toString());
                    for (int y = 0; y < 10240; ++y) {
                        random.nextBytes(baForStream);
                        out.write(baForStream);
                    }
                    out.close();
                    serverMemStreaming.update((Long)((CompositeDataSupport)provider.getConnection().getAttribute(new ObjectName("java.lang:type=Memory"), "HeapMemoryUsage")).get(USED_MEMORY_KEY));
                    clientMemStreaming.update(runtime.totalMemory() - runtime.freeMemory());
                    System.gc();
                    cache.remove((Object)key.toString());
                }
                Long averageMemoryConsumptionDifference = (clientMemStreaming.getAverage() - clientMemStreaming.getMin()) / ((clientMem.getAverage() - clientMem.getMin()) / 100L);
                Long totalMemoryConsumptionDifference = (clientMemStreaming.getMax() - clientMemStreaming.getMin()) / ((clientMem.getMax() - clientMem.getMin()) / 100L);
                averageMemoryConsumptionStatistic = averageMemoryConsumptionStatistic == 0L ? averageMemoryConsumptionDifference : Long.valueOf((averageMemoryConsumptionStatistic + averageMemoryConsumptionDifference) / 2L);
                totalMemoryConsumptionStatistic = totalMemoryConsumptionStatistic == 0L ? totalMemoryConsumptionDifference : Long.valueOf((totalMemoryConsumptionStatistic + totalMemoryConsumptionDifference) / 2L);
            }
            Assert.assertTrue((String)("Average memory consumption difference outside limit, max 15, actual " + averageMemoryConsumptionStatistic), (averageMemoryConsumptionStatistic < 15L ? 1 : 0) != 0);
            Assert.assertTrue((String)("Total memory consumption difference outside limit, max 30, actual " + totalMemoryConsumptionStatistic), (totalMemoryConsumptionStatistic < 30L ? 1 : 0) != 0);
        }
        finally {
            this.checkServers();
        }
    }

    private boolean isFinalized() {
        return this.finalized;
    }

    private void setFinalized(boolean finalized) {
        this.finalized = finalized;
    }

    private void killServer(int i) {
        if (i % 2 == 0) {
            this.controller.kill(SERVER_1_NAME);
        } else {
            this.controller.kill(SERVER_2_NAME);
        }
        rcm1 = null;
        rcm2 = null;
    }

    private void stopServer(int i) {
        if (i % 2 == 0) {
            this.controller.stop(SERVER_1_NAME);
        } else {
            this.controller.stop(SERVER_2_NAME);
        }
        rcm1 = null;
        rcm2 = null;
    }

    private void startServer(int i) {
        if (i % 2 == 0) {
            this.controller.start(SERVER_1_NAME);
        } else {
            this.controller.start(SERVER_2_NAME);
        }
        this.setUp();
    }

    static {
        random = new Random();
    }

    private class MemoryUsage {
        private Long min;
        private Long max;
        private Long average;

        public MemoryUsage(Long startValue) {
            this(startValue, startValue, startValue);
        }

        public MemoryUsage(Long min, Long max, Long average) {
            this.min = min;
            this.max = max;
            this.average = average;
        }

        public void update(Long value) {
            this.setMax(value);
            this.setMin(value);
            this.addToAverage(value);
        }

        public Long getMax() {
            return this.max;
        }

        private void setMax(Long max) {
            if (this.max < max) {
                this.max = max;
            }
        }

        public Long getMin() {
            return this.min;
        }

        private void setMin(Long min) {
            if (this.min > min) {
                this.min = min;
            }
        }

        public Long getAverage() {
            return this.average;
        }

        private void addToAverage(Long average) {
            this.average = (this.average + average) / 2L;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("MemoryStats:\n");
            sb.append("Max memory: " + this.getMax() + "\n");
            sb.append("Min memory: " + this.getMin() + "\n");
            sb.append("Avg memory: " + this.getAverage() + "\n");
            return sb.toString();
        }
    }

    private class RandomInserter {
        private Long seed;
        private int size = HotRodRemoteStreamingIT.access$000().nextInt(1000000);
        private Random randForData;
        private StreamingRemoteCache<Object> cache1;
        private StreamingRemoteCache<Object> cache2;
        private OutputStream outStream;
        private InputStream inStream;
        private int count = 0;
        private Boolean state;

        public RandomInserter(Long seed, StreamingRemoteCache<Object> cache1, StreamingRemoteCache<Object> cache2) {
            this.seed = seed != null ? seed.longValue() : random.nextLong();
            this.cache1 = cache1;
            this.cache2 = cache2;
            this.randForData = new Random(this.seed);
        }

        public boolean process() throws Exception {
            if (this.count == this.size && !this.state.booleanValue()) {
                return true;
            }
            int randomInt = random.nextInt(100);
            if (this.state == null) {
                this.state = true;
                this.outStream = this.cache1.put((Object)this.seed);
            }
            if (this.state.booleanValue()) {
                if (randomInt + this.count > this.size) {
                    randomInt = this.size - this.count;
                }
                byte[] arr = new byte[randomInt];
                this.randomBytes(arr, arr.length);
                this.outStream.write(arr);
                this.count += arr.length;
            } else {
                byte[] arr = new byte[randomInt];
                int ret = this.inStream.read(arr);
                byte[] fromrand = new byte[arr.length];
                this.randomBytes(fromrand, ret);
                if (ret < arr.length) {
                    for (int i = ret; i < fromrand.length; ++i) {
                        fromrand[i] = 0;
                    }
                }
                if (!Arrays.equals(arr, fromrand)) {
                    throw new Exception("Data returned from stream were not correct: \n\n" + Arrays.toString(arr) + "\n\n" + Arrays.toString(fromrand));
                }
                this.count += ret;
            }
            if (this.count == this.size) {
                if (this.state.booleanValue()) {
                    this.state = false;
                    this.outStream.close();
                    this.inStream = this.cache2.get((Object)this.seed);
                    this.randForData = new Random(this.seed);
                    this.count = 0;
                } else {
                    this.inStream.close();
                    return true;
                }
            }
            return false;
        }

        public void finalize() throws Throwable {
            super.finalize();
            HotRodRemoteStreamingIT.this.setFinalized(true);
        }

        private void randomBytes(byte[] ba, int count) {
            for (int i = 0; i < count; ++i) {
                ba[i] = (byte)this.randForData.nextInt();
            }
        }
    }
}

