/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.xsort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.physical.impl.xsort.BatchGroup;
import org.apache.drill.exec.physical.impl.xsort.PriorityQueueCopierWrapper;
import org.apache.drill.exec.physical.impl.xsort.SortImpl;
import org.apache.drill.exec.physical.impl.xsort.SpilledRun;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorInitializer;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpilledRuns {
    private static final Logger logger = LoggerFactory.getLogger(SpilledRuns.class);
    private final SpillSet spillSet;
    private final LinkedList<SpilledRun> spilledRuns = Lists.newLinkedList();
    private final PriorityQueueCopierWrapper copierHolder;
    private BatchSchema schema;
    private final OperatorContext context;

    public SpilledRuns(OperatorContext opContext, SpillSet spillSet, PriorityQueueCopierWrapper copier) {
        this.context = opContext;
        this.spillSet = spillSet;
        this.copierHolder = copier;
    }

    public void setSchema(BatchSchema schema) {
        this.schema = schema;
        for (BatchGroup batchGroup : this.spilledRuns) {
            batchGroup.setSchema(schema);
        }
        this.copierHolder.close();
    }

    public int size() {
        return this.spilledRuns.size();
    }

    public boolean hasSpilled() {
        return this.spillSet.hasSpilled();
    }

    public long getWriteBytes() {
        return this.spillSet.getWriteBytes();
    }

    public static List<BatchGroup> prepareSpillBatches(LinkedList<? extends BatchGroup> source, int spillCount) {
        ArrayList<BatchGroup> batchesToSpill = Lists.newArrayList();
        spillCount = Math.min(source.size(), spillCount);
        assert (spillCount > 0) : "Spill count to mergeAndSpill must not be zero";
        for (int i = 0; i < spillCount; ++i) {
            batchesToSpill.add(source.pollFirst());
        }
        return batchesToSpill;
    }

    public void mergeAndSpill(List<BatchGroup> batchesToSpill, int spillBatchRowCount, VectorInitializer allocHelper) {
        this.spilledRuns.add(this.safeMergeAndSpill(batchesToSpill, spillBatchRowCount, allocHelper));
        logger.trace("Completed spill: memory = {}", (Object)this.context.getAllocator().getAllocatedMemory());
    }

    public void mergeRuns(int targetCount, long mergeMemoryPool, int spillBatchRowCount, VectorInitializer allocHelper) {
        SpilledRun run;
        long batchSize;
        long allocated = this.context.getAllocator().getAllocatedMemory();
        logger.trace("Merging {} on-disk runs, alloc. memory = {}, avail. memory = {}", new Object[]{targetCount, allocated, mergeMemoryPool -= this.context.getAllocator().getAllocatedMemory()});
        int mergeCount = 0;
        long mergeSize = 0L;
        Iterator iterator = this.spilledRuns.iterator();
        while (iterator.hasNext() && mergeSize + (batchSize = (run = (SpilledRun)iterator.next()).getBatchSize()) <= mergeMemoryPool) {
            mergeSize += batchSize;
            if (++mergeCount != targetCount) continue;
            break;
        }
        mergeCount = Math.max(mergeCount, 2);
        mergeCount = Math.min(mergeCount, this.spilledRuns.size());
        List<BatchGroup> batchesToSpill = SpilledRuns.prepareSpillBatches(this.spilledRuns, mergeCount);
        this.mergeAndSpill(batchesToSpill, spillBatchRowCount, allocHelper);
    }

    private SpilledRun safeMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount, VectorInitializer allocHelper) {
        try {
            return this.doMergeAndSpill(batchesToSpill, spillBatchRowCount, allocHelper);
        }
        catch (UserException ue) {
            throw ue;
        }
        catch (Throwable ex) {
            throw UserException.resourceError(ex).message("External Sort encountered an error while spilling to disk", new Object[0]).build(logger);
        }
    }

    /*
     * Enabled aggressive exception aggregation
     */
    private SpilledRun doMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount, VectorInitializer allocHelper) throws Throwable {
        String outputFile = this.spillSet.getNextSpillFile();
        SpilledRun newGroup = null;
        VectorContainer dest = new VectorContainer();
        try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);){
            PriorityQueueCopierWrapper.BatchMerger merger = this.copierHolder.startMerge(this.schema, batchesToSpill, dest, spillBatchRowCount, allocHelper);
            try {
                newGroup = new SpilledRun(this.spillSet, outputFile, this.context.getAllocator());
                logger.trace("Spilling {} batches, into spill batches of {} rows, to {}", new Object[]{batchesToSpill.size(), spillBatchRowCount, outputFile});
                while (merger.next()) {
                    newGroup.spillBatch(dest);
                }
                this.context.injectChecked("spilling", IOException.class);
                newGroup.closeWriter();
                logger.trace("Spilled {} output batches, each of {} bytes, {} records, to {}", new Object[]{merger.getBatchCount(), merger.getEstBatchSize(), spillBatchRowCount, outputFile});
                newGroup.setBatchSize(merger.getEstBatchSize());
                SpilledRun spilledRun = newGroup;
                if (merger != null) {
                    merger.close();
                }
                return spilledRun;
            }
            catch (Throwable throwable) {
                if (merger != null) {
                    try {
                        merger.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                }
                throw throwable;
            }
        }
        catch (Throwable e) {
            try {
                if (newGroup != null) {
                    AutoCloseables.close(e, newGroup);
                }
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            throw e;
        }
    }

    public SortImpl.SortResults finalMerge(List<? extends BatchGroup> bufferedBatches, VectorContainer container, int mergeRowCount, VectorInitializer allocHelper) {
        LinkedList<? extends BatchGroup> allBatches = new LinkedList<BatchGroup>();
        allBatches.addAll(bufferedBatches);
        bufferedBatches.clear();
        allBatches.addAll(this.spilledRuns);
        this.spilledRuns.clear();
        logger.debug("Starting merge phase. Runs = {}, Alloc. memory = {}", (Object)allBatches.size(), (Object)this.context.getAllocator().getAllocatedMemory());
        return this.copierHolder.startMerge(this.schema, allBatches, container, mergeRowCount, allocHelper);
    }

    public void close() {
        if (this.spillSet.getWriteBytes() > 0L) {
            logger.debug("End of sort. Total write bytes: {}, Total read bytes: {}", (Object)this.spillSet.getWriteBytes(), (Object)this.spillSet.getWriteBytes());
        }
        AutoCloseable[] autoCloseableArray = new AutoCloseable[3];
        autoCloseableArray[0] = () -> BatchGroup.closeAll(this.spilledRuns);
        autoCloseableArray[1] = this.copierHolder::close;
        autoCloseableArray[2] = this.spillSet::close;
        AutoCloseables.closeWithUserException(autoCloseableArray);
    }
}

