/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Queue;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferWithChannel;
import org.apache.flink.runtime.io.network.partition.PartitionSortedBuffer;
import org.apache.flink.runtime.io.network.partition.SortBuffer;
import org.junit.Assert;
import org.junit.Test;

public class PartitionSortedBufferTest {
    /**
     * Appends randomly sized records with a random data type to random subpartitions until the
     * sort buffer rejects an append, finishes the buffer, drains it segment by segment and then
     * verifies per subpartition that the bytes and events read match what was written.
     */
    @Test
    public void testWriteAndReadSortBuffer() throws Exception {
        int numSubpartitions = 10;
        int bufferSize = 1024;
        int bufferPoolSize = 1000;
        // Fixed seed keeps the generated workload reproducible between runs.
        Random random = new Random(1111L);

        // Java cannot create generic arrays directly, so the raw arrays are narrowed here;
        // only the typed queues created in the loop below are ever stored in them.
        @SuppressWarnings("unchecked")
        Queue<DataAndType>[] dataWritten = new Queue[numSubpartitions];
        @SuppressWarnings("unchecked")
        Queue<Buffer>[] buffersRead = new Queue[numSubpartitions];
        for (int i = 0; i < numSubpartitions; ++i) {
            dataWritten[i] = new ArrayDeque<DataAndType>();
            buffersRead[i] = new ArrayDeque<Buffer>();
        }

        // Freshly allocated int arrays are already zero-filled.
        int[] numBytesWritten = new int[numSubpartitions];
        int[] numBytesRead = new int[numSubpartitions];
        int totalBytesWritten = 0;

        SortBuffer sortBuffer = createSortBuffer(bufferPoolSize, bufferSize, numSubpartitions, getRandomSubpartitionOrder(numSubpartitions));

        // Write phase: keep appending until the sort buffer refuses a record.
        while (true) {
            // Record size is in [1, bufferSize * 4 - 1], so records may span several buffers.
            int recordSize = random.nextInt(bufferSize * 4 - 1) + 1;
            byte[] bytes = new byte[recordSize];
            random.nextBytes(bytes);
            ByteBuffer record = ByteBuffer.wrap(bytes);
            int subpartition = random.nextInt(numSubpartitions);
            boolean isBuffer = random.nextBoolean();
            Buffer.DataType dataType = isBuffer ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.EVENT_BUFFER;

            if (!sortBuffer.append(record, subpartition, dataType)) {
                // The rejected record is not tracked as written.
                sortBuffer.finish();
                break;
            }

            // Reset the position so the record can be re-read during verification.
            record.rewind();
            dataWritten[subpartition].add(new DataAndType(record, dataType));
            numBytesWritten[subpartition] += recordSize;
            totalBytesWritten += recordSize;
        }

        // Read phase: drain the sort buffer one unpooled segment at a time.
        while (sortBuffer.hasRemaining()) {
            MemorySegment readBuffer = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
            BufferWithChannel bufferAndChannel = sortBuffer.copyIntoSegment(readBuffer);
            int subpartition = bufferAndChannel.getChannelIndex();
            buffersRead[subpartition].add(bufferAndChannel.getBuffer());
            numBytesRead[subpartition] += bufferAndChannel.getBuffer().readableBytes();
        }

        Assert.assertEquals(totalBytesWritten, sortBuffer.numBytes());
        checkWriteReadResult(numSubpartitions, numBytesWritten, numBytesRead, dataWritten, buffersRead);
    }

    /*
     * WARNING - void declaration
     */
    public static void checkWriteReadResult(int numSubpartitions, int[] numBytesWritten, int[] numBytesRead, Queue<DataAndType>[] dataWritten, Queue<Buffer>[] buffersRead) {
        for (int subpartitionIndex = 0; subpartitionIndex < numSubpartitions; ++subpartitionIndex) {
            void var10_10;
            Assert.assertEquals((long)numBytesWritten[subpartitionIndex], (long)numBytesRead[subpartitionIndex]);
            ArrayList<DataAndType> eventsWritten = new ArrayList<DataAndType>();
            ArrayList<Buffer> eventsRead = new ArrayList<Buffer>();
            ByteBuffer subpartitionDataWritten = ByteBuffer.allocate(numBytesWritten[subpartitionIndex]);
            for (DataAndType dataAndType : dataWritten[subpartitionIndex]) {
                subpartitionDataWritten.put(dataAndType.data);
                dataAndType.data.rewind();
                if (!dataAndType.dataType.isEvent()) continue;
                eventsWritten.add(dataAndType);
            }
            ByteBuffer subpartitionDataRead = ByteBuffer.allocate(numBytesRead[subpartitionIndex]);
            for (Buffer buffer : buffersRead[subpartitionIndex]) {
                subpartitionDataRead.put(buffer.getNioBufferReadable());
                if (buffer.isBuffer()) continue;
                eventsRead.add(buffer);
            }
            subpartitionDataWritten.flip();
            subpartitionDataRead.flip();
            Assert.assertEquals((Object)subpartitionDataWritten, (Object)subpartitionDataRead);
            Assert.assertEquals((long)eventsWritten.size(), (long)eventsRead.size());
            boolean bl = false;
            while (var10_10 < eventsWritten.size()) {
                Assert.assertEquals((Object)((DataAndType)eventsWritten.get((int)var10_10)).dataType, (Object)((Buffer)eventsRead.get((int)var10_10)).getDataType());
                Assert.assertEquals((Object)((DataAndType)eventsWritten.get((int)var10_10)).data, (Object)((Buffer)eventsRead.get((int)var10_10)).getNioBufferReadable());
                ++var10_10;
            }
        }
    }

    /**
     * Writes one record to subpartitions 0, 2 and 4 while leaving 1 and 3 without data, then
     * expects the reads to yield, in order: the 128-byte record of subpartition 0, the first
     * 1024 bytes of subpartition 2, the remaining bytes of subpartition 2, and the 1024-byte
     * record of subpartition 4.
     */
    @Test
    public void testWriteReadWithEmptyChannel() throws Exception {
        final int bufferPoolSize = 10;
        final int bufferSize = 1024;
        final int numSubpartitions = 5;

        // Slots left null mark subpartitions that receive no data.
        ByteBuffer[] records = new ByteBuffer[numSubpartitions];
        records[0] = ByteBuffer.allocate(128);
        records[2] = ByteBuffer.allocate(1536);
        records[4] = ByteBuffer.allocate(1024);

        SortBuffer sortBuffer = createSortBuffer(bufferPoolSize, bufferSize, numSubpartitions);
        for (int channel = 0; channel < numSubpartitions; channel++) {
            ByteBuffer current = records[channel];
            if (current != null) {
                sortBuffer.append(current, channel, Buffer.DataType.DATA_BUFFER);
                // Restore the position so the record can serve as the expected value below.
                current.rewind();
            }
        }
        sortBuffer.finish();

        checkReadResult(sortBuffer, records[0], 0, bufferSize);

        // The 1536-byte record exceeds one read segment, so it is expected in two pieces.
        ByteBuffer head = records[2].duplicate();
        head.limit(bufferSize);
        checkReadResult(sortBuffer, head.slice(), 2, bufferSize);

        ByteBuffer tail = records[2].duplicate();
        tail.position(bufferSize);
        checkReadResult(sortBuffer, tail.slice(), 2, bufferSize);

        checkReadResult(sortBuffer, records[4], 4, bufferSize);
    }

    /**
     * Copies the next chunk of the sort buffer into a freshly allocated unpooled segment of
     * {@code bufferSize} bytes and asserts both the channel it belongs to and its content.
     *
     * @param sortBuffer sort buffer to read from
     * @param expectedBuffer expected readable content of the returned buffer
     * @param expectedChannel expected channel index of the returned buffer
     * @param bufferSize size in bytes of the segment to read into
     */
    private void checkReadResult(SortBuffer sortBuffer, ByteBuffer expectedBuffer, int expectedChannel, int bufferSize) {
        BufferWithChannel result = sortBuffer.copyIntoSegment(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
        Assert.assertEquals(expectedChannel, result.getChannelIndex());
        Assert.assertEquals(expectedBuffer, result.getBuffer().getNioBufferReadable());
    }

    /**
     * Appends a record that has no remaining bytes; the test passes only if an
     * {@link IllegalArgumentException} is thrown.
     */
    @Test(expected=IllegalArgumentException.class)
    public void testWriteEmptyData() throws Exception {
        SortBuffer sortBuffer = createSortBuffer(1, 1024, 1);

        // Moving the position to the limit leaves zero readable bytes in the record.
        ByteBuffer emptyRecord = ByteBuffer.allocate(1);
        emptyRecord.position(1);

        sortBuffer.append(emptyRecord, 0, Buffer.DataType.DATA_BUFFER);
    }

    /**
     * Appends to a sort buffer after {@code finish()} was called; the test passes only if an
     * {@link IllegalStateException} is thrown.
     */
    @Test(expected=IllegalStateException.class)
    public void testWriteFinishedSortBuffer() throws Exception {
        SortBuffer finishedBuffer = createSortBuffer(1, 1024, 1);
        finishedBuffer.finish();

        ByteBuffer record = ByteBuffer.allocate(1);
        finishedBuffer.append(record, 0, Buffer.DataType.DATA_BUFFER);
    }

    /**
     * Appends to a sort buffer after {@code release()} was called; the test passes only if an
     * {@link IllegalStateException} is thrown.
     */
    @Test(expected=IllegalStateException.class)
    public void testWriteReleasedSortBuffer() throws Exception {
        SortBuffer releasedBuffer = createSortBuffer(1, 1024, 1);
        releasedBuffer.release();

        ByteBuffer record = ByteBuffer.allocate(1);
        releasedBuffer.append(record, 0, Buffer.DataType.DATA_BUFFER);
    }

    /**
     * Appends buffer-sized records and expects the first {@code bufferPoolSize - 1} appends to
     * succeed and the following one to be rejected, with the byte and record counters left at
     * the values reached after the last successful append.
     */
    @Test
    public void testWriteMoreDataThanCapacity() throws Exception {
        final int bufferPoolSize = 10;
        final int bufferSize = 1024;
        final int maxRecords = bufferPoolSize - 1;

        SortBuffer sortBuffer = createSortBuffer(bufferPoolSize, bufferSize, 1);

        for (int recordCount = 1; recordCount <= maxRecords; recordCount++) {
            appendAndCheckResult(sortBuffer, bufferSize, true, bufferSize * recordCount, recordCount, true);
        }

        // One more record is expected to be refused without changing the counters.
        appendAndCheckResult(sortBuffer, bufferSize, false, bufferSize * maxRecords, maxRecords, true);
    }

    /**
     * Appends a single record of {@code bufferPoolSize * bufferSize} bytes and expects the append
     * to be rejected, leaving the sort buffer with zero bytes, zero records and nothing to read.
     */
    @Test
    public void testWriteLargeRecord() throws Exception {
        final int bufferPoolSize = 10;
        final int bufferSize = 1024;
        final int oversizedRecord = bufferPoolSize * bufferSize;

        SortBuffer sortBuffer = createSortBuffer(bufferPoolSize, bufferSize, 1);
        appendAndCheckResult(sortBuffer, oversizedRecord, false, 0L, 0L, false);
    }

    /**
     * Appends one data record of {@code recordSize} bytes to subpartition 0 and asserts the
     * outcome of the append together with the resulting state of the sort buffer.
     *
     * @param sortBuffer sort buffer under test
     * @param recordSize size in bytes of the record to append
     * @param isSuccessful expected return value of the append
     * @param numBytes expected {@code numBytes()} after the append
     * @param numRecords expected {@code numRecords()} after the append
     * @param hasRemaining expected {@code hasRemaining()} after the append
     * @throws IOException declared because {@code append} may throw it
     */
    private void appendAndCheckResult(SortBuffer sortBuffer, int recordSize, boolean isSuccessful, long numBytes, long numRecords, boolean hasRemaining) throws IOException {
        ByteBuffer record = ByteBuffer.allocate(recordSize);
        boolean appended = sortBuffer.append(record, 0, Buffer.DataType.DATA_BUFFER);

        Assert.assertEquals(isSuccessful, appended);
        Assert.assertEquals(numBytes, sortBuffer.numBytes());
        Assert.assertEquals(numRecords, sortBuffer.numRecords());
        Assert.assertEquals(hasRemaining, sortBuffer.hasRemaining());
    }

    /**
     * Reads from a sort buffer that holds data but was never finished; the test passes only if
     * an {@link IllegalStateException} is thrown.
     */
    @Test(expected=IllegalStateException.class)
    public void testReadUnfinishedSortBuffer() throws Exception {
        final int bufferSize = 1024;
        SortBuffer unfinishedBuffer = createSortBuffer(1, bufferSize, 1);

        ByteBuffer record = ByteBuffer.allocate(1);
        unfinishedBuffer.append(record, 0, Buffer.DataType.DATA_BUFFER);
        Assert.assertTrue(unfinishedBuffer.hasRemaining());

        MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
        unfinishedBuffer.copyIntoSegment(target);
    }

    /**
     * Reads from a finished sort buffer after it was released; the test asserts that releasing
     * flips {@code hasRemaining()} from true to false and passes only if an
     * {@link IllegalStateException} is thrown.
     */
    @Test(expected=IllegalStateException.class)
    public void testReadReleasedSortBuffer() throws Exception {
        final int bufferSize = 1024;
        SortBuffer sortBuffer = createSortBuffer(1, bufferSize, 1);

        ByteBuffer record = ByteBuffer.allocate(1);
        sortBuffer.append(record, 0, Buffer.DataType.DATA_BUFFER);
        sortBuffer.finish();
        Assert.assertTrue(sortBuffer.hasRemaining());

        sortBuffer.release();
        Assert.assertFalse(sortBuffer.hasRemaining());

        MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
        sortBuffer.copyIntoSegment(target);
    }

    /**
     * Reads from a finished sort buffer to which nothing was appended; the test passes only if
     * an {@link IllegalStateException} is thrown.
     */
    @Test(expected=IllegalStateException.class)
    public void testReadEmptySortBuffer() throws Exception {
        final int bufferSize = 1024;
        SortBuffer emptyBuffer = createSortBuffer(1, bufferSize, 1);
        emptyBuffer.finish();
        Assert.assertFalse(emptyBuffer.hasRemaining());

        MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
        emptyBuffer.copyIntoSegment(target);
    }

    /**
     * Appends one record of {@code (bufferPoolSize - 1) * bufferSize} bytes and asserts that the
     * buffer pool reports every buffer as used; after {@code release()} it asserts that the pool
     * reports none in use and that the sort buffer reports no remaining data, records or bytes.
     */
    @Test
    public void testReleaseSortBuffer() throws Exception {
        final int bufferPoolSize = 10;
        final int bufferSize = 1024;
        final int recordSize = (bufferPoolSize - 1) * bufferSize;

        // The pool is built here instead of via createSortBuffer so its usage can be inspected.
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(bufferPoolSize, bufferSize);
        BufferPool localPool = networkBufferPool.createBufferPool(bufferPoolSize, bufferPoolSize);
        SortBuffer sortBuffer = new PartitionSortedBuffer(new Object(), localPool, 1, bufferSize, bufferPoolSize, null);

        ByteBuffer record = ByteBuffer.allocate(recordSize);
        sortBuffer.append(record, 0, Buffer.DataType.DATA_BUFFER);

        // State before release.
        Assert.assertEquals(bufferPoolSize, localPool.bestEffortGetNumOfUsedBuffers());
        Assert.assertTrue(sortBuffer.hasRemaining());
        Assert.assertEquals(1L, sortBuffer.numRecords());
        Assert.assertEquals(recordSize, sortBuffer.numBytes());

        sortBuffer.release();

        // State after release.
        Assert.assertEquals(0L, localPool.bestEffortGetNumOfUsedBuffers());
        Assert.assertFalse(sortBuffer.hasRemaining());
        Assert.assertEquals(0L, sortBuffer.numRecords());
        Assert.assertEquals(0L, sortBuffer.numBytes());
    }

    /**
     * Creates a sort buffer without a custom read order by delegating to the four-argument
     * overload with a {@code null} order.
     *
     * @param bufferPoolSize number of buffers in the backing pool
     * @param bufferSize size in bytes of each buffer
     * @param numSubpartitions number of subpartitions of the sort buffer
     * @return the newly created sort buffer
     * @throws IOException propagated from the four-argument overload
     */
    private SortBuffer createSortBuffer(int bufferPoolSize, int bufferSize, int numSubpartitions) throws IOException {
        final int[] noCustomReadOrder = null;
        return createSortBuffer(bufferPoolSize, bufferSize, numSubpartitions, noCustomReadOrder);
    }

    /**
     * Creates a {@link PartitionSortedBuffer} backed by a dedicated {@link NetworkBufferPool}
     * that holds {@code bufferPoolSize} buffers of {@code bufferSize} bytes each.
     *
     * @param bufferPoolSize number of buffers in the global pool; also used as both size
     *     arguments of the local buffer pool and passed on to the sort buffer constructor
     * @param bufferSize size in bytes of each buffer
     * @param numSubpartitions number of subpartitions of the sort buffer
     * @param customReadOrder value handed to the sort buffer constructor as its last argument;
     *     may be {@code null}
     * @return the newly created sort buffer
     * @throws IOException declared because creating the local buffer pool may throw it
     */
    private SortBuffer createSortBuffer(int bufferPoolSize, int bufferSize, int numSubpartitions, int[] customReadOrder) throws IOException {
        BufferPool localPool = new NetworkBufferPool(bufferPoolSize, bufferSize).createBufferPool(bufferPoolSize, bufferPoolSize);
        // NOTE(review): the first constructor argument is presumably a lock object - confirm.
        Object firstArgument = new Object();
        return new PartitionSortedBuffer(firstArgument, localPool, numSubpartitions, bufferSize, bufferPoolSize, customReadOrder);
    }

    /**
     * Builds a subpartition read order that is the identity order {@code 0..n-1} rotated by a
     * pseudo-random shift. The shift comes from a {@link Random} with a fixed seed, so repeated
     * calls with the same argument return the same order.
     *
     * @param numSubpartitions number of subpartitions; must be positive
     * @return array of length {@code numSubpartitions} containing each index exactly once
     */
    public static int[] getRandomSubpartitionOrder(int numSubpartitions) {
        Random generator = new Random(1111L);
        int[] order = new int[numSubpartitions];

        // Start at the random shift and walk forward, wrapping around at numSubpartitions.
        int next = generator.nextInt(numSubpartitions);
        for (int position = 0; position < order.length; position++) {
            order[position] = next;
            next = (next + 1) % numSubpartitions;
        }
        return order;
    }

    /** Immutable pair of a written record's bytes and the data type it was appended with. */
    public static class DataAndType {
        /** Data type passed to the sort buffer together with the record. */
        private final Buffer.DataType dataType;
        /** Bytes of the record. */
        private final ByteBuffer data;

        /**
         * @param data bytes of the record
         * @param dataType data type the record was appended with
         */
        DataAndType(final ByteBuffer data, final Buffer.DataType dataType) {
            this.dataType = dataType;
            this.data = data;
        }
    }
}

