/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk;

import java.io.EOFException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Round-trip tests for {@link ChannelWriterOutputView} and {@link ChannelReaderInputView}.
 *
 * <p>Each test serializes a deterministic sequence of generated {@code Tuple2<Integer, String>}
 * records into a file channel, reads them back, and compares every record that was read with a
 * freshly re-generated one. After each test the memory manager is checked to make sure that all
 * pages have been returned.
 */
class ChannelViewsTest {
    private static final long SEED = 649180756312423613L;
    private static final int KEY_MAX = Integer.MAX_VALUE;
    private static final int VALUE_SHORT_LENGTH = 114;
    private static final int VALUE_LONG_LENGTH = 112 * 1024;
    private static final int NUM_PAIRS_SHORT = 1000000;
    private static final int NUM_PAIRS_LONG = 3000;
    private static final int MEMORY_SIZE = 1024 * 1024;
    private static final int MEMORY_PAGE_SIZE = 64 * 1024;
    private static final int NUM_MEMORY_SEGMENTS = 3;

    /** Owner handed to the memory manager for every page allocation. */
    private final AbstractInvokable parentTask = new DummyInvokable();

    private IOManager ioManager;
    private MemoryManager memoryManager;

    // ------------------------------------------------------------------------
    //  Fixture
    // ------------------------------------------------------------------------

    /** Creates a fresh memory manager and I/O manager for every test. */
    @BeforeEach
    void beforeTest() {
        this.memoryManager =
                MemoryManagerBuilder.newBuilder()
                        .setMemorySize(MEMORY_SIZE)
                        .setPageSize(MEMORY_PAGE_SIZE)
                        .build();
        this.ioManager = new IOManagerAsync();
    }

    /**
     * Closes the I/O manager, verifies that no memory pages are still checked out, and shuts the
     * memory manager down.
     *
     * <p>The steps are nested in {@code finally} blocks so that the memory manager is always shut
     * down, even when closing the I/O manager throws or the leak assertion fails.
     *
     * @throws Exception if closing the I/O manager fails
     */
    @AfterEach
    void afterTest() throws Exception {
        try {
            if (this.ioManager != null) {
                this.ioManager.close();
            }
        } finally {
            if (this.memoryManager != null) {
                try {
                    Assertions.assertThat(this.memoryManager.verifyEmpty())
                            .withFailMessage(
                                    "Memory leak: not all segments have been returned to the memory manager.")
                            .isTrue();
                } finally {
                    this.memoryManager.shutdown();
                    this.memoryManager = null;
                }
            }
        }
    }

    // ------------------------------------------------------------------------
    //  Tests
    // ------------------------------------------------------------------------

    /** Writes and reads back {@code NUM_PAIRS_SHORT} records with short values. */
    @Test
    void testWriteReadSmallRecords() throws Exception {
        writeAndReadBack(VALUE_SHORT_LENGTH, NUM_MEMORY_SEGMENTS, NUM_PAIRS_SHORT, NUM_PAIRS_SHORT);
    }

    /**
     * Writes and reads back {@code NUM_PAIRS_LONG} records whose value length bound ({@code
     * VALUE_LONG_LENGTH}) exceeds the page size.
     */
    @Test
    void testWriteAndReadLongRecords() throws Exception {
        writeAndReadBack(VALUE_LONG_LENGTH, NUM_MEMORY_SEGMENTS, NUM_PAIRS_LONG, NUM_PAIRS_LONG);
    }

    /** Reads all written records and then expects one additional read to fail with an EOF. */
    @Test
    void testReadTooMany() throws Exception {
        final TestData.TupleGenerator generator = newGenerator(VALUE_SHORT_LENGTH);
        final FileIOChannel.ID channel = this.ioManager.createChannel();
        final TypeSerializer<Tuple2<Integer, String>> serializer =
                TestData.getIntStringTupleSerializer();

        final int blockCount =
                writeRecords(generator, serializer, channel, NUM_MEMORY_SEGMENTS, NUM_PAIRS_SHORT);

        final List<MemorySegment> memory =
                this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
        final BlockChannelReader<MemorySegment> reader =
                this.ioManager.createBlockChannelReader(channel);
        final ChannelReaderInputView inView =
                new ChannelReaderInputView(reader, memory, blockCount, true);
        try {
            verifyRecords(generator, serializer, inView, NUM_PAIRS_SHORT);

            // Everything has been consumed, so one more record must hit the end of the input.
            final Tuple2<Integer, String> readRec = new Tuple2<>();
            Assertions.assertThatThrownBy(() -> serializer.deserialize(readRec, inView))
                    .withFailMessage("Expected an EOFException which did not occur.")
                    .isInstanceOf(EOFException.class);
        } finally {
            releaseReader(reader, inView);
        }
    }

    /** Reads all records through an input view that is not told how many blocks were written. */
    @Test
    void testReadWithoutKnownBlockCount() throws Exception {
        final TestData.TupleGenerator generator = newGenerator(VALUE_SHORT_LENGTH);
        final FileIOChannel.ID channel = this.ioManager.createChannel();
        final TypeSerializer<Tuple2<Integer, String>> serializer =
                TestData.getIntStringTupleSerializer();

        writeRecords(generator, serializer, channel, NUM_MEMORY_SEGMENTS, NUM_PAIRS_SHORT);

        final List<MemorySegment> memory =
                this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
        final BlockChannelReader<MemorySegment> reader =
                this.ioManager.createBlockChannelReader(channel);
        // Constructor variant without a block count.
        final ChannelReaderInputView inView = new ChannelReaderInputView(reader, memory, true);
        try {
            verifyRecords(generator, serializer, inView, NUM_PAIRS_SHORT);
        } finally {
            releaseReader(reader, inView);
        }
    }

    /** Writes and reads back the short records using a single memory page on each side. */
    @Test
    void testWriteReadOneBufferOnly() throws Exception {
        writeAndReadBack(VALUE_SHORT_LENGTH, 1, NUM_PAIRS_SHORT, NUM_PAIRS_SHORT);
    }

    /** Writes all short records but closes the reader after consuming only half of them. */
    @Test
    void testWriteReadNotAll() throws Exception {
        writeAndReadBack(
                VALUE_SHORT_LENGTH, NUM_MEMORY_SEGMENTS, NUM_PAIRS_SHORT, NUM_PAIRS_SHORT / 2);
    }

    // ------------------------------------------------------------------------
    //  Helpers
    // ------------------------------------------------------------------------

    /**
     * Runs one complete round trip: writes {@code numToWrite} generated records into a new
     * channel, then reads {@code numToRead} of them back through an input view that knows the
     * written block count, comparing each one with the re-generated record.
     *
     * @param valueLength value length argument passed to the tuple generator
     * @param numSegments number of memory pages allocated for the writer and for the reader
     * @param numToWrite number of records to serialize
     * @param numToRead number of records to deserialize and compare
     */
    private void writeAndReadBack(int valueLength, int numSegments, int numToWrite, int numToRead)
            throws Exception {
        final TestData.TupleGenerator generator = newGenerator(valueLength);
        final FileIOChannel.ID channel = this.ioManager.createChannel();
        final TypeSerializer<Tuple2<Integer, String>> serializer =
                TestData.getIntStringTupleSerializer();

        final int blockCount = writeRecords(generator, serializer, channel, numSegments, numToWrite);

        final List<MemorySegment> memory =
                this.memoryManager.allocatePages(this.parentTask, numSegments);
        final BlockChannelReader<MemorySegment> reader =
                this.ioManager.createBlockChannelReader(channel);
        final ChannelReaderInputView inView =
                new ChannelReaderInputView(reader, memory, blockCount, true);
        try {
            verifyRecords(generator, serializer, inView, numToRead);
        } finally {
            releaseReader(reader, inView);
        }
    }

    /** Creates a tuple generator with the fixed seed, random keys and random-length values. */
    private static TestData.TupleGenerator newGenerator(int valueLength) {
        return new TestData.TupleGenerator(
                SEED,
                KEY_MAX,
                valueLength,
                TestData.TupleGenerator.KeyMode.RANDOM,
                TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
    }

    /**
     * Serializes {@code numRecords} generated records into {@code channel} through a {@link
     * ChannelWriterOutputView}, closes the view and returns its memory to the memory manager.
     *
     * @return the block count reported by the output view after it was closed
     */
    private int writeRecords(
            TestData.TupleGenerator generator,
            TypeSerializer<Tuple2<Integer, String>> serializer,
            FileIOChannel.ID channel,
            int numSegments,
            int numRecords)
            throws Exception {
        final List<MemorySegment> memory =
                this.memoryManager.allocatePages(this.parentTask, numSegments);
        final BlockChannelWriter<MemorySegment> writer =
                this.ioManager.createBlockChannelWriter(channel);
        final ChannelWriterOutputView outView =
                new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);

        final Tuple2<Integer, String> rec = new Tuple2<>();
        for (int i = 0; i < numRecords; i++) {
            generator.next(rec);
            serializer.serialize(rec, outView);
        }
        this.memoryManager.release(outView.close());
        return outView.getBlockCount();
    }

    /**
     * Resets the generator, then deserializes {@code numRecords} records from {@code inView} and
     * asserts that each one equals the record the generator produces at the same position.
     */
    private static void verifyRecords(
            TestData.TupleGenerator generator,
            TypeSerializer<Tuple2<Integer, String>> serializer,
            ChannelReaderInputView inView,
            int numRecords)
            throws IOException {
        generator.reset();
        final Tuple2<Integer, String> expected = new Tuple2<>();
        final Tuple2<Integer, String> readRec = new Tuple2<>();
        for (int i = 0; i < numRecords; i++) {
            generator.next(expected);
            serializer.deserialize(readRec, inView);
            assertReadRecordMatchRegenerated(readRec, expected);
        }
    }

    /**
     * Closes the input view, hands its memory back to the memory manager and deletes the
     * channel. The channel is deleted even if closing the view fails.
     */
    private void releaseReader(BlockChannelReader<MemorySegment> reader, ChannelReaderInputView inView)
            throws IOException {
        try {
            this.memoryManager.release(inView.close());
        } finally {
            reader.deleteChannel();
        }
    }

    /**
     * Asserts that the key and the value of the record read from the channel equal those of the
     * re-generated record.
     *
     * @param readRec the record that was deserialized from the channel
     * @param rec the record produced by the re-set generator
     */
    private static void assertReadRecordMatchRegenerated(
            Tuple2<Integer, String> readRec, Tuple2<Integer, String> rec) {
        final int k1 = rec.f0;
        final String v1 = rec.f1;
        final int k2 = readRec.f0;
        final String v2 = readRec.f1;
        Assertions.assertThat(k2)
                .withFailMessage("The re-generated and the read record do not match.")
                .isEqualTo(k1);
        Assertions.assertThat(v2)
                .withFailMessage("The re-generated and the read record do not match.")
                .isEqualTo(v1);
    }
}

