/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileSegmentReader;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriterReaderTest;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.FileSegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.disk.iomanager.QueuingCallback;
import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.apache.flink.util.IOUtils;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class BufferFileWriterFileSegmentReaderTest {
    private static final int BUFFER_SIZE = 32768;
    private static final BufferRecycler BUFFER_RECYCLER = FreeingBufferRecycler.INSTANCE;
    private static final Random random = new Random();
    private static final IOManager ioManager = new IOManagerAsync();
    private BufferFileWriter writer;
    private AsynchronousBufferFileSegmentReader reader;
    private LinkedBlockingQueue<FileSegment> returnedFileSegments = new LinkedBlockingQueue();

    BufferFileWriterFileSegmentReaderTest() {
    }

    @AfterAll
    static void shutdown() throws Exception {
        ioManager.close();
    }

    @BeforeEach
    void setUpWriterAndReader() {
        FileIOChannel.ID channel = ioManager.createChannel();
        try {
            this.writer = ioManager.createBufferFileWriter(channel);
            this.reader = (AsynchronousBufferFileSegmentReader)ioManager.createBufferFileSegmentReader(channel, (RequestDoneCallback)new QueuingCallback(this.returnedFileSegments));
        }
        catch (IOException e) {
            this.tearDownWriterAndReader();
            Assertions.fail((String)"Failed to setup writer and reader.");
        }
    }

    @AfterEach
    void tearDownWriterAndReader() {
        if (this.writer != null) {
            if (!this.writer.isClosed()) {
                IOUtils.closeQuietly(() -> ((BufferFileWriter)this.writer).close());
            }
            this.writer.deleteChannel();
        }
        if (this.reader != null) {
            if (!this.reader.isClosed()) {
                IOUtils.closeQuietly(() -> ((AsynchronousBufferFileSegmentReader)this.reader).close());
            }
            this.reader.deleteChannel();
        }
        this.returnedFileSegments.clear();
    }

    @Test
    void testWriteRead() throws IOException, InterruptedException {
        FileSegment fileSegment;
        int i;
        int numBuffers = 1024;
        int currentNumber = 0;
        int minBufferSize = 8192;
        for (i = 0; i < numBuffers; ++i) {
            Buffer buffer = this.createBuffer();
            int size = this.getNextMultipleOf(this.getRandomNumberInRange(8192, 32768), 4);
            currentNumber = BufferFileWriterReaderTest.fillBufferWithAscendingNumbers(buffer, currentNumber, size);
            this.writer.writeBlock((Object)buffer);
        }
        this.writer.close();
        for (i = 0; i < numBuffers; ++i) {
            Assertions.assertThat((boolean)this.reader.hasReachedEndOfFile()).isFalse();
            this.reader.read();
        }
        CountDownLatch sync = new CountDownLatch(1);
        NotificationListener listener = sync::countDown;
        if (this.reader.registerAllRequestsProcessedListener(listener)) {
            sync.await();
        }
        Assertions.assertThat((boolean)this.reader.hasReachedEndOfFile()).isTrue();
        ((AbstractCollectionAssert)Assertions.assertThat(this.returnedFileSegments).withFailMessage("Read less buffers than written.", new Object[0])).hasSize(numBuffers);
        currentNumber = 0;
        ByteBuffer buffer = ByteBuffer.allocate(32768);
        while ((fileSegment = this.returnedFileSegments.poll()) != null) {
            buffer.position(0);
            buffer.limit(fileSegment.getLength());
            fileSegment.getFileChannel().read(buffer, fileSegment.getPosition());
            NetworkBuffer buffer1 = new NetworkBuffer(MemorySegmentFactory.wrap((byte[])buffer.array()), BUFFER_RECYCLER);
            buffer1.setSize(fileSegment.getLength());
            currentNumber = BufferFileWriterReaderTest.verifyBufferFilledWithAscendingNumbers((Buffer)buffer1, currentNumber);
        }
        this.reader.close();
    }

    private int getRandomNumberInRange(int min, int max) {
        return random.nextInt(max - min + 1) + min;
    }

    private int getNextMultipleOf(int number, int multiple) {
        int mod = number % multiple;
        if (mod == 0) {
            return number;
        }
        return number + multiple - mod;
    }

    private Buffer createBuffer() {
        return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment((int)32768), BUFFER_RECYCLER);
    }
}

