/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.hybrid.HsConsumerId;
import org.apache.flink.runtime.io.network.partition.hybrid.HsDataView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionConsumerInternalOperations;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReader;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class HsFileDataManager
implements Runnable,
BufferRecycler {
    private static final Logger LOG = LoggerFactory.getLogger(HsFileDataManager.class);
    private final ScheduledExecutorService ioExecutor;
    private final int maxRequestedBuffers;
    private final Duration bufferRequestTimeout;
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private final CompletableFuture<?> releaseFuture = new CompletableFuture();
    private final BatchShuffleReadBufferPool bufferPool;
    private final Path dataFilePath;
    private final HsFileDataIndex dataIndex;
    private final HsSubpartitionFileReader.Factory fileReaderFactory;
    private final HybridShuffleConfiguration hybridShuffleConfiguration;
    private final ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();
    @GuardedBy(value="lock")
    private final Set<HsSubpartitionFileReader> allReaders = new HashSet<HsSubpartitionFileReader>();
    @GuardedBy(value="lock")
    private boolean isRunning;
    @GuardedBy(value="lock")
    private volatile int numRequestedBuffers;
    @GuardedBy(value="lock")
    private volatile boolean isReleased;
    @GuardedBy(value="lock")
    private FileChannel dataFileChannel;

    public HsFileDataManager(BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, HsFileDataIndex dataIndex, Path dataFilePath, HsSubpartitionFileReader.Factory fileReaderFactory, HybridShuffleConfiguration hybridShuffleConfiguration) {
        this.fileReaderFactory = fileReaderFactory;
        this.hybridShuffleConfiguration = (HybridShuffleConfiguration)Preconditions.checkNotNull((Object)hybridShuffleConfiguration);
        this.dataIndex = (HsFileDataIndex)Preconditions.checkNotNull((Object)dataIndex);
        this.dataFilePath = (Path)Preconditions.checkNotNull((Object)dataFilePath);
        this.bufferPool = (BatchShuffleReadBufferPool)Preconditions.checkNotNull((Object)bufferPool);
        this.ioExecutor = (ScheduledExecutorService)Preconditions.checkNotNull((Object)ioExecutor);
        this.maxRequestedBuffers = hybridShuffleConfiguration.getMaxRequestedBuffers();
        this.bufferRequestTimeout = (Duration)Preconditions.checkNotNull((Object)hybridShuffleConfiguration.getBufferRequestTimeout());
    }

    public void setup() {
        this.bufferPool.initialize();
    }

    @Override
    public synchronized void run() {
        int numBuffersRead = this.tryRead();
        this.endCurrentRoundOfReading(numBuffersRead);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public HsDataView registerNewConsumer(int subpartitionId, HsConsumerId consumerId, HsSubpartitionConsumerInternalOperations operation) throws IOException {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((!this.isReleased ? 1 : 0) != 0, (Object)"HsFileDataManager is already released.");
            this.lazyInitialize();
            HsSubpartitionFileReader subpartitionReader = this.fileReaderFactory.createFileReader(subpartitionId, consumerId, this.dataFileChannel, operation, this.dataIndex, this.hybridShuffleConfiguration.getMaxBuffersReadAhead(), this::releaseSubpartitionReader, this.headerBuf);
            this.allReaders.add(subpartitionReader);
            this.mayTriggerReading();
            return subpartitionReader;
        }
    }

    public void closeDataIndexAndDeleteShuffleFile() {
        this.dataIndex.close();
        IOUtils.deleteFileQuietly((Path)this.dataFilePath);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void releaseSubpartitionReader(HsSubpartitionFileReader subpartitionFileReader) {
        Object object = this.lock;
        synchronized (object) {
            this.removeSubpartitionReaders(Collections.singleton(subpartitionFileReader));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void release() {
        Object object = this.lock;
        synchronized (object) {
            if (this.isReleased) {
                return;
            }
            this.isReleased = true;
            ArrayList<HsSubpartitionFileReader> pendingReaders = new ArrayList<HsSubpartitionFileReader>(this.allReaders);
            this.mayNotifyReleased();
            this.failSubpartitionReaders(pendingReaders, new IllegalStateException("Result partition has been already released."));
            this.releaseFuture.thenRun(this::closeDataIndexAndDeleteShuffleFile);
        }
    }

    private int tryRead() {
        Queue<MemorySegment> buffers;
        Queue<HsSubpartitionFileReader> availableReaders = this.prepareAndGetAvailableReaders();
        if (availableReaders.isEmpty()) {
            return 0;
        }
        try {
            buffers = this.allocateBuffers();
        }
        catch (Exception exception) {
            this.failSubpartitionReaders(availableReaders, exception);
            LOG.error("Failed to request buffers for data reading.", (Throwable)exception);
            return 0;
        }
        int numBuffersAllocated = buffers.size();
        if (numBuffersAllocated <= 0) {
            return 0;
        }
        this.readData(availableReaders, buffers);
        int numBuffersRead = numBuffersAllocated - buffers.size();
        this.releaseBuffers(buffers);
        return numBuffersRead;
    }

    /*
     * Unable to fully structure code
     */
    private Queue<MemorySegment> allocateBuffers() throws Exception {
        timeoutTime = this.getBufferRequestTimeoutTime();
        do lbl-1000:
        // 3 sources

        {
            if (!(buffers = this.bufferPool.requestBuffers()).isEmpty()) {
                return new ArrayDeque<MemorySegment>(buffers);
            }
            Preconditions.checkState((boolean)(this.isReleased == false), (Object)"Result partition has been already released.");
            if (System.nanoTime() < timeoutTime) ** GOTO lbl-1000
            timeoutTime = this.getBufferRequestTimeoutTime();
        } while (System.nanoTime() < timeoutTime);
        throw new TimeoutException(String.format("Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase '%s'.", new Object[]{TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()}));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void mayTriggerReading() {
        Object object = this.lock;
        synchronized (object) {
            if (!this.isRunning && !this.allReaders.isEmpty() && this.numRequestedBuffers + this.bufferPool.getNumBuffersPerRequest() <= this.maxRequestedBuffers && this.numRequestedBuffers < this.bufferPool.getAverageBuffersPerRequester()) {
                this.isRunning = true;
                this.ioExecutor.execute(() -> {
                    try {
                        this.run();
                    }
                    catch (Throwable throwable) {
                        FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), throwable);
                    }
                });
            }
        }
    }

    @GuardedBy(value="lock")
    private void mayNotifyReleased() {
        assert (Thread.holdsLock(this.lock));
        if (this.isReleased && this.allReaders.isEmpty()) {
            this.releaseFuture.complete(null);
        }
    }

    private long getBufferRequestTimeoutTime() {
        return this.bufferPool.getLastBufferOperationTimestamp() + this.bufferRequestTimeout.toNanos();
    }

    private void releaseBuffers(Queue<MemorySegment> buffers) {
        if (!buffers.isEmpty()) {
            try {
                this.bufferPool.recycle(buffers);
                buffers.clear();
            }
            catch (Throwable throwable) {
                FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), throwable);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Queue<HsSubpartitionFileReader> prepareAndGetAvailableReaders() {
        Object object = this.lock;
        synchronized (object) {
            if (this.isReleased) {
                return new ArrayDeque<HsSubpartitionFileReader>();
            }
            for (HsSubpartitionFileReader reader : this.allReaders) {
                reader.prepareForScheduling();
            }
            return new PriorityQueue<HsSubpartitionFileReader>(this.allReaders);
        }
    }

    private void readData(Queue<HsSubpartitionFileReader> availableReaders, Queue<MemorySegment> buffers) {
        while (!availableReaders.isEmpty() && !buffers.isEmpty()) {
            HsSubpartitionFileReader subpartitionReader = availableReaders.poll();
            try {
                subpartitionReader.readBuffers(buffers, this);
            }
            catch (IOException throwable) {
                this.failSubpartitionReaders(Collections.singletonList(subpartitionReader), throwable);
                LOG.debug("Failed to read shuffle data.", (Throwable)throwable);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void failSubpartitionReaders(Collection<HsSubpartitionFileReader> readers, Throwable failureCause) {
        Iterator<HsSubpartitionFileReader> iterator = this.lock;
        synchronized (iterator) {
            this.removeSubpartitionReaders(readers);
        }
        for (HsSubpartitionFileReader reader : readers) {
            reader.fail(failureCause);
        }
    }

    @GuardedBy(value="lock")
    private void removeSubpartitionReaders(Collection<HsSubpartitionFileReader> readers) {
        this.allReaders.removeAll(readers);
        if (this.allReaders.isEmpty()) {
            this.bufferPool.unregisterRequester(this);
            this.closeFileChannel();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void endCurrentRoundOfReading(int numBuffersRead) {
        Object object = this.lock;
        synchronized (object) {
            this.numRequestedBuffers += numBuffersRead;
            this.isRunning = false;
            this.mayNotifyReleased();
        }
        if (numBuffersRead == 0) {
            this.ioExecutor.schedule(this::mayTriggerReading, 5L, TimeUnit.MILLISECONDS);
        } else {
            this.mayTriggerReading();
        }
    }

    @GuardedBy(value="lock")
    private void lazyInitialize() throws IOException {
        assert (Thread.holdsLock(this.lock));
        try {
            if (this.allReaders.isEmpty()) {
                this.dataFileChannel = this.openFileChannel(this.dataFilePath);
                this.bufferPool.registerRequester(this);
            }
        }
        catch (IOException exception) {
            if (this.allReaders.isEmpty()) {
                this.bufferPool.unregisterRequester(this);
                this.closeFileChannel();
            }
            throw exception;
        }
    }

    private FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.READ);
    }

    @GuardedBy(value="lock")
    private void closeFileChannel() {
        assert (Thread.holdsLock(this.lock));
        IOUtils.closeQuietly((AutoCloseable)this.dataFileChannel);
        this.dataFileChannel = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void recycle(MemorySegment segment) {
        Object object = this.lock;
        synchronized (object) {
            this.bufferPool.recycle(segment);
            --this.numRequestedBuffers;
            this.mayTriggerReading();
        }
    }
}

