/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.sort.ExceptionHandler;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.util.MutableObjectIterator;

public class AsynchronousPartialSorter<E>
extends UnilateralSortMerger<E> {
    private BufferQueueIterator bufferIterator;

    public AsynchronousPartialSorter(MemoryManager memoryManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction) throws IOException, MemoryAllocationException {
        super(memoryManager, null, input, parentTask, serializerFactory, comparator, memoryFraction, 1, 2, 0.0f, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        try {
            if (this.bufferIterator != null) {
                this.bufferIterator.close();
                this.bufferIterator = null;
            }
        }
        finally {
            super.close();
        }
    }

    @Override
    protected UnilateralSortMerger.ThreadBase<E> getSpillingThread(ExceptionHandler<IOException> exceptionHandler, UnilateralSortMerger.CircularQueues<E> queues, AbstractInvokable parentTask, MemoryManager memoryManager, IOManager ioManager, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles) {
        this.bufferIterator = new BufferQueueIterator(queues);
        this.setResultIterator(this.bufferIterator);
        return null;
    }

    private final class BufferQueueIterator
    implements MutableObjectIterator<E> {
        private final UnilateralSortMerger.CircularQueues<E> queues;
        private UnilateralSortMerger.CircularElement<E> currentElement;
        private MutableObjectIterator<E> currentIterator;
        private volatile boolean closed = false;

        protected BufferQueueIterator(UnilateralSortMerger.CircularQueues<E> queues) {
            this.queues = queues;
        }

        public E next(E reuse) throws IOException {
            Object result;
            if (this.currentIterator != null && (result = this.currentIterator.next(reuse)) != null) {
                return result;
            }
            if (this.closed) {
                throw new IllegalStateException("The sorter has been closed.");
            }
            if (AsynchronousPartialSorter.this.iteratorException != null) {
                throw new IOException("The sorter has ancountered an error.", AsynchronousPartialSorter.this.iteratorException);
            }
            while (this.currentElement != UnilateralSortMerger.endMarker()) {
                if (this.currentElement != null) {
                    this.currentElement.buffer.reset();
                    this.queues.empty.add(this.currentElement);
                }
                try {
                    this.currentElement = null;
                    while (!this.closed && this.currentElement == null) {
                        this.currentElement = this.queues.spill.poll(1000L, TimeUnit.MILLISECONDS);
                    }
                    if (AsynchronousPartialSorter.this.iteratorException != null) {
                        throw new IOException("The sorter has ancountered an error.", AsynchronousPartialSorter.this.iteratorException);
                    }
                    if (this.currentElement == UnilateralSortMerger.endMarker()) {
                        this.releaseSortBuffers();
                        return null;
                    }
                    if (this.currentElement == UnilateralSortMerger.spillingMarker()) {
                        this.currentElement = null;
                        continue;
                    }
                }
                catch (InterruptedException e) {
                    throw new RuntimeException("Iterator was interrupted getting the next sortedBuffer.");
                }
                this.currentIterator = this.currentElement.buffer.getIterator();
                result = this.currentIterator.next(reuse);
                if (result != null) {
                    return result;
                }
                this.currentIterator = null;
            }
            return null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void close() {
            BufferQueueIterator bufferQueueIterator = this;
            synchronized (bufferQueueIterator) {
                if (this.closed) {
                    return;
                }
                this.closed = true;
            }
            if (this.currentElement != null) {
                this.queues.empty.add(this.currentElement);
                this.currentElement = null;
            }
            if (this.currentIterator != null) {
                this.currentIterator = null;
            }
        }

        private final void releaseSortBuffers() {
            while (!this.queues.empty.isEmpty()) {
                UnilateralSortMerger.CircularElement elem = (UnilateralSortMerger.CircularElement)this.queues.empty.poll();
                if (elem == null) continue;
                InMemorySorter sorter = elem.buffer;
                List<MemorySegment> segments = sorter.dispose();
                AsynchronousPartialSorter.this.memoryManager.release(segments);
            }
        }
    }
}

