/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.sort.impl;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.comparator.ProxyComparator;
import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;

public class PipelinedSorter
extends ExternalSorter {
    private static final Log LOG = LogFactory.getLog(PipelinedSorter.class);
    // Same value as the literal 24 that spill() multiplies by the partition count for the index-file size hint.
    public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
    // Same value as the literal 150 that spill() multiplies by the partition count for the spill-file size hint.
    private static final int APPROX_HEADER_LENGTH = 150;
    // Number of high bits of each metadata prefix reserved for the partition id (bitcount(partitions) + 1).
    private final int partitionBits;
    // Slot offsets inside one metadata record; the code below uses the literal offsets 0..3 in this order.
    private static final int PARTITION = 0;
    private static final int KEYSTART = 1;
    private static final int VALSTART = 2;
    private static final int VALLEN = 3;
    // Each record occupies 4 ints (16 bytes) of metadata.
    private static final int NMETA = 4;
    private static final int METASIZE = 16;
    // NOTE(review): never read or written in the visible code.
    volatile Throwable sortSpillException = null;
    // Number of spill files written so far; incremented at the end of a successful spill().
    int numSpills = 0;
    // flush() only runs the combiner during the final merge when numSpills reaches this value.
    private final int minSpillsForCombine;
    // Non-null only when the configured comparator is a ProxyComparator; supplies the key prefix in collect().
    private final ProxyComparator hasher;
    // The span currently receiving records.
    private SortSpan span;
    // Backing buffer that spans are carved from; set to null by flush().
    private ByteBuffer largeBuffer;
    // Merges sorted spans and feeds spill().
    private final SpanMerger merger;
    // Thread pool that sorts completed spans in the background.
    private final ExecutorService sortmaster;
    // NOTE(review): the next two fields are not used in the visible code (flush() uses a local list instead).
    private final ArrayList<TezSpillRecord> indexCacheList = new ArrayList();
    private int totalIndexCacheMemory;
    // Read from "tez.runtime.index.cache.memory.limit.bytes" in the constructor; not otherwise used here.
    private int indexCacheMemoryLimit;

    /**
     * Validates the sort configuration, allocates the in-memory sort buffer, creates the first
     * span and the background sort pool, and points both serializers at the first span.
     *
     * @param outputContext context of the output this sorter writes for; its destination vertex
     *                      name is used in the sort thread names
     * @param conf configuration handed to the superclass
     * @param numOutputs forwarded to the superclass
     * @param initialMemoryAvailable forwarded to the superclass
     * @throws IOException if the spill percent is not in (0, 1] or the sort memory (in MB) does
     *                     not fit in 11 bits
     */
    public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs, long initialMemoryAvailable) throws IOException {
        super(outputContext, conf, numOutputs, initialMemoryAvailable);
        // One bit more than needed to represent the partition count.
        this.partitionBits = this.bitcount(this.partitions) + 1;

        float spillPercent = this.conf.getFloat("tez.runtime.sort.spill.percent", TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT_DEFAULT);
        int sortMb = this.availableMemoryMb;
        this.indexCacheMemoryLimit = this.conf.getInt("tez.runtime.index.cache.memory.limit.bytes", 0x100000);
        if (spillPercent <= 0.0f || spillPercent > 1.0f) {
            throw new IOException("Invalid \"tez.runtime.sort.spill.percent\": " + spillPercent);
        }
        // Only values in [0, 2047] survive the mask, which keeps (sortMb << 20) inside an int.
        if (sortMb != (sortMb & 0x7FF)) {
            throw new IOException("Invalid \"tez.runtime.io.sort.mb\": " + sortMb);
        }

        // Round the buffer down to a whole number of 16-byte metadata records.
        int requestedBytes = sortMb << 20;
        int alignedBytes = requestedBytes - requestedBytes % 16;
        this.largeBuffer = ByteBuffer.allocate(alignedBytes);
        LOG.info("tez.runtime.io.sort.mb = " + sortMb);
        this.span = new SortSpan(this.largeBuffer, 0x100000, 16);
        this.merger = new SpanMerger(this.comparator);

        int sortThreads = this.conf.getInt("tez.runtime.sort.threads", 1);
        String threadNameFormat = "Sorter [" + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "] #%d";
        this.sortmaster = Executors.newFixedThreadPool(sortThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadNameFormat).build());

        boolean comparatorIsProxy = this.comparator instanceof ProxyComparator;
        this.hasher = comparatorIsProxy ? (ProxyComparator) this.comparator : null;
        if (comparatorIsProxy) {
            LOG.info("Using the HashComparator");
        }

        this.valSerializer.open(this.span.out);
        this.keySerializer.open(this.span.out);
        this.minSpillsForCombine = this.conf.getInt("tez.runtime.combine.min.spills", 3);
    }

    /**
     * Returns the number of bits needed to represent {@code n}, i.e. the position of its highest
     * set bit (0 for {@code n == 0}).
     *
     * <p>For non-negative input this matches a shift-and-count loop. Unlike a loop built on the
     * sign-propagating {@code >>=}, it also terminates for negative input, returning 32.
     *
     * @param n value to measure
     * @return number of significant bits in {@code n}
     */
    private int bitcount(int n) {
        return Integer.SIZE - Integer.numberOfLeadingZeros(n);
    }

    /**
     * Closes the current span and makes a new one current.
     *
     * <p>If the buffer still has room after the current span, the finished span is handed to the
     * sort pool and a follow-on span takes over. Otherwise the finished span is sorted inline,
     * everything is spilled, and a new span is laid out over the whole buffer using the record
     * size observed in the span that just ended. In both cases the serializers are re-opened on
     * the new span.
     *
     * @throws IOException if spilling or re-opening the serializers fails
     */
    public void sort() throws IOException {
        SortSpan successor = this.span.next();
        if (successor != null) {
            // Room left in the buffer: sort the finished span in the background and keep collecting.
            SortTask backgroundSort = new SortTask(this.span, this.sorter, this.comparator);
            Future<SpanIterator> pending = this.sortmaster.submit(backgroundSort);
            this.merger.add(pending);
            this.span = successor;
        } else {
            // Buffer exhausted: sort synchronously, spill, then restart from the top of the buffer.
            this.merger.add(this.span.sort(this.sorter, this.comparator));
            this.spill();
            int items = 0x100000;
            int perItem = 16;
            int finishedLength = this.span.length();
            if (finishedLength != 0) {
                perItem = this.span.kvbuffer.limit() / finishedLength;
                items = Math.min(this.largeBuffer.capacity() / (16 + perItem), 0x100000);
            }
            this.span = new SortSpan(this.largeBuffer, items, perItem);
        }
        this.valSerializer.open(this.span.out);
        this.keySerializer.open(this.span.out);
    }

    /**
     * Asks the partitioner which partition the pair belongs to and collects it there.
     *
     * @param key record key
     * @param value record value
     * @throws IOException if collecting the record fails
     */
    @Override
    public void write(Object key, Object value) throws IOException {
        int targetPartition = this.partitioner.getPartition(key, value, this.partitions);
        this.collect(key, value, targetPartition);
    }

    /**
     * Serializes one key/value pair into the current span and records its metadata.
     *
     * <p>If the span's data region overflows while serializing, the partial write is rolled back,
     * {@link #sort()} installs a new span, and the record is retried. A record that overflowed a
     * span holding no other records is retried only if the new span offers a larger data region;
     * otherwise the retry could never succeed and an {@link IOException} is thrown instead of
     * recursing without bound.
     *
     * @param key record key; its class must be exactly the configured key class
     * @param value record value; its class must be exactly the configured value class
     * @param partition target partition, in {@code [0, partitions)}
     * @throws IOException on a type mismatch, an illegal partition, a record too large for the
     *                     sort buffer, or a failure while sorting/spilling
     */
    synchronized void collect(Object key, Object value, int partition) throws IOException {
        if (key.getClass() != this.keyClass) {
            throw new IOException("Type mismatch in key from map: expected " + this.keyClass.getName() + ", received " + key.getClass().getName());
        }
        if (value.getClass() != this.valClass) {
            throw new IOException("Type mismatch in value from map: expected " + this.valClass.getName() + ", received " + value.getClass().getName());
        }
        if (partition < 0 || partition >= this.partitions) {
            throw new IOException("Illegal partition for " + key + " (" + partition + ")");
        }
        // NOTE(review): kvmeta is an IntBuffer, so this threshold is 16 ints (4 records), not 16 bytes.
        if (this.span.kvmeta.remaining() < 16) {
            this.sort();
        }
        int keystart = this.span.kvbuffer.position();
        int valstart = -1;
        int valend = -1;
        try {
            this.keySerializer.serialize(key);
            valstart = this.span.kvbuffer.position();
            this.valSerializer.serialize(value);
            valend = this.span.kvbuffer.position();
        }
        catch (BufferOverflowException overflow) {
            SortSpan overflowed = this.span;
            boolean overflowedWhileEmpty = overflowed.kvmeta.position() == 0;
            // Discard the partially written record before the span is sorted.
            overflowed.kvbuffer.position(keystart);
            this.sort();
            // An empty span that overflowed is always followed by the default full-buffer layout;
            // if that layout has no more data room than the span that just failed, retrying
            // would overflow again forever.
            if (overflowedWhileEmpty && this.span.kvbuffer.capacity() <= overflowed.kvbuffer.capacity()) {
                throw new IOException("Record for partition " + partition + " does not fit in the sort buffer (" + this.span.kvbuffer.capacity() + " bytes available for data)", overflow);
            }
            this.collect(key, value, partition);
            return;
        }
        int prefix = 0;
        if (this.hasher != null) {
            prefix = this.hasher.getProxy(key);
        }
        // High partitionBits bits carry the partition id, the remaining bits the top of the key proxy.
        prefix = partition << 32 - this.partitionBits | prefix >>> this.partitionBits;
        this.span.kvmeta.put(prefix);
        this.span.kvmeta.put(keystart);
        this.span.kvmeta.put(valstart);
        this.span.kvmeta.put(valend - valstart);
        // Track the largest key/value seen; the span sizes its compare scratch arrays from keymax.
        if (valstart - keystart > this.span.keymax) {
            this.span.keymax = valstart - keystart;
        }
        if (valend - valstart > this.span.valmax) {
            this.span.valmax = valend - valstart;
        }
        this.mapOutputRecordCounter.increment(1L);
        this.mapOutputByteCounter.increment((long)(valend - keystart));
    }

    /**
     * Writes everything currently held by the merger to a new spill file, one IFile segment per
     * partition, followed by the matching spill index file, and increments {@code numSpills}.
     *
     * <p>The spill is abandoned with an {@link IOException} if the sorted spans cannot all be
     * gathered or if the thread is interrupted. In either case neither the index file nor the
     * spill count is updated, so a partial spill is never treated as complete.
     *
     * @throws IOException if gathering the sorted spans fails, the thread is interrupted (the
     *                     interrupt flag is restored), or any file operation fails
     */
    public void spill() throws IOException {
        // Widen before adding so a large buffer plus many partitions cannot overflow int.
        long size = (long) this.largeBuffer.capacity() + (long) this.partitions * 150L;
        TezSpillRecord spillRec = new TezSpillRecord(this.partitions);
        Path filename = this.mapOutputFile.getSpillFileForWrite(this.numSpills, size);
        FSDataOutputStream out = this.rfs.create(filename, true, 4096);
        try {
            if (!this.merger.ready()) {
                throw new IOException("Could not gather all sorted spans for spill " + this.numSpills + " (" + filename + ")");
            }
            LOG.info("Spilling to " + filename.toString());
            for (int i = 0; i < this.partitions; ++i) {
                TezRawKeyValueIterator kvIter = this.merger.filter(i);
                long segmentStart = out.getPos();
                IFile.Writer writer = new IFile.Writer(this.conf, out, this.keyClass, this.valClass, this.codec, this.spilledRecordsCounter, null, this.merger.needsRLE());
                if (this.combiner == null) {
                    while (kvIter.next()) {
                        writer.append(kvIter.getKey(), kvIter.getValue());
                    }
                } else {
                    this.runCombineProcessor(kvIter, writer);
                }
                writer.close();
                TezIndexRecord rec = new TezIndexRecord(segmentStart, writer.getRawLength(), writer.getCompressedLength());
                spillRec.putIndex(rec, i);
            }
            Path indexFilename = this.mapOutputFile.getSpillIndexFileForWrite(this.numSpills, this.partitions * 24);
            spillRec.writeToFile(indexFilename, this.conf);
            ++this.numSpills;
        }
        catch (InterruptedException ie) {
            // Restore the flag for callers further up and surface the failure instead of
            // silently leaving a spill file with no index.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while spilling to " + filename, ie);
        }
        finally {
            out.close();
        }
    }

    /**
     * Sorts and spills whatever is still buffered, shuts the sort pool down, and produces the
     * final output and index files.
     *
     * <p>With exactly one spill, the spill file and its index are renamed into place. Otherwise
     * every partition is merged across all spill files into a single output file, an index is
     * written for it, and the intermediate spill files are deleted.
     *
     * @throws IOException if sorting, spilling, merging or any file operation fails; the final
     *                     output stream is closed even when the merge fails
     */
    @Override
    public void flush() throws IOException {
        String uniqueIdentifier = this.outputContext.getUniqueIdentifier();
        Path finalOutputFile = this.mapOutputFile.getOutputFileForWrite(0L);
        Path finalIndexFile = this.mapOutputFile.getOutputIndexFileForWrite(0L);
        LOG.info("Starting flush of map output");
        this.span.end();
        this.merger.add(this.span.sort(this.sorter, this.comparator));
        this.spill();
        this.sortmaster.shutdown();
        this.largeBuffer = null;
        if (this.numSpills == 1) {
            // A single spill already is the final output; just move it into place.
            Path spillFile = this.mapOutputFile.getSpillFile(0);
            Path spillIndexFile = this.mapOutputFile.getSpillIndexFile(0);
            this.sameVolRename(spillFile, this.mapOutputFile.getOutputFileForWriteInVolume(spillFile));
            this.sameVolRename(spillIndexFile, this.mapOutputFile.getOutputIndexFileForWriteInVolume(spillIndexFile));
            return;
        }
        FSDataOutputStream finalOut = this.rfs.create(finalOutputFile, true, 4096);
        try {
            TezSpillRecord finalIndex = new TezSpillRecord(this.partitions);
            // Load every spill's index once, up front.
            ArrayList<TezSpillRecord> spillIndexes = new ArrayList<TezSpillRecord>();
            for (int spill = 0; spill < this.numSpills; ++spill) {
                Path spillIndexFile = this.mapOutputFile.getSpillIndexFile(spill);
                spillIndexes.add(new TezSpillRecord(spillIndexFile, this.conf));
            }
            int mergeFactor = this.conf.getInt("tez.runtime.io.sort.factor", 100);
            for (int parts = 0; parts < this.partitions; ++parts) {
                // One segment per spill file, covering this partition's byte range in that file.
                ArrayList<TezMerger.Segment> segmentList = new ArrayList<TezMerger.Segment>(this.numSpills);
                for (int spill = 0; spill < this.numSpills; ++spill) {
                    Path spillFile = this.mapOutputFile.getSpillFile(spill);
                    TezIndexRecord indexRecord = spillIndexes.get(spill).getIndex(parts);
                    TezMerger.Segment segment = new TezMerger.Segment(this.conf, this.rfs, spillFile, indexRecord.getStartOffset(), indexRecord.getPartLength(), this.codec, this.ifileReadAhead, this.ifileReadAheadLength, this.ifileBufferSize, true);
                    segmentList.add(spill, segment);
                }
                boolean sortSegments = segmentList.size() > mergeFactor;
                TezRawKeyValueIterator kvIter = TezMerger.merge(this.conf, this.rfs, this.keyClass, this.valClass, this.codec, segmentList, mergeFactor, new Path(uniqueIdentifier), ConfigUtils.getIntermediateOutputKeyComparator(this.conf), this.nullProgressable, sortSegments, true, null, this.spilledRecordsCounter, null, null);
                long segmentStart = finalOut.getPos();
                IFile.Writer writer = new IFile.Writer(this.conf, finalOut, this.keyClass, this.valClass, this.codec, this.spilledRecordsCounter, null, this.merger.needsRLE());
                if (this.combiner == null || this.numSpills < this.minSpillsForCombine) {
                    TezMerger.writeFile(kvIter, writer, this.nullProgressable, 10000L);
                } else {
                    this.runCombineProcessor(kvIter, writer);
                }
                writer.close();
                TezIndexRecord rec = new TezIndexRecord(segmentStart, writer.getRawLength(), writer.getCompressedLength());
                finalIndex.putIndex(rec, parts);
            }
            finalIndex.writeToFile(finalIndexFile, this.conf);
        }
        finally {
            // Release the output stream even if merging or writing failed part-way.
            finalOut.close();
        }
        for (int spill = 0; spill < this.numSpills; ++spill) {
            Path spillIndexFile = this.mapOutputFile.getSpillIndexFile(spill);
            Path spillFile = this.mapOutputFile.getSpillFile(spill);
            this.rfs.delete(spillIndexFile, true);
            this.rfs.delete(spillFile, true);
        }
    }

    /**
     * Intentionally does nothing here.
     * NOTE(review): the sort pool is only shut down by flush(); confirm nothing needs releasing
     * when close() is called without a preceding flush().
     */
    @Override
    public void close() {
    }

    /**
     * Merges sorted spans into a single ordered stream by keeping one {@link SpanIterator} per
     * span in a priority queue.
     *
     * <p>Spans sorted in the background are registered as futures and folded into the queue by
     * {@link #ready()}. When the same span wins twice in a row, {@code pop()} asks it how many of
     * its following records also precede the next span's head ("gallop") and emits that run
     * without touching the queue.
     */
    private class SpanMerger
    implements PartitionedRawKeyValueIterator {
        private final RawComparator comparator;
        // Copies of the most recently returned key/value and its metadata prefix.
        InputByteBuffer key;
        InputByteBuffer value;
        int partition;
        // Background sorts that have not been folded into the heap yet.
        private ArrayList<Future<SpanIterator>> futures;
        private SpanHeap heap;
        private PartitionFilter partIter;
        // Remaining records to take straight from `horse` without consulting the heap.
        private int gallop;
        // The span iterator returned by the previous pop().
        private SpanIterator horse;
        // Running totals over all spans seen by ready(): record count and equal-key comparisons.
        private long total;
        private long count;
        private long eq;

        public SpanMerger(RawComparator comparator) {
            this.key = new InputByteBuffer();
            this.value = new InputByteBuffer();
            this.futures = new ArrayList<Future<SpanIterator>>();
            this.heap = new SpanHeap();
            this.gallop = 0;
            this.total = 0L;
            this.count = 0L;
            this.eq = 0L;
            this.comparator = comparator;
            this.partIter = new PartitionFilter(this);
        }

        /** Advances {@code iter} and queues it if it still has a record to offer. */
        public void add(SpanIterator iter) throws IOException {
            if (iter.next()) {
                this.heap.add(iter);
            }
        }

        /** Registers a span that is still being sorted; it is queued by {@link #ready()}. */
        public void add(Future<SpanIterator> iter) throws IOException {
            this.futures.add(iter);
        }

        /**
         * Waits for every pending background sort, queues the results, and updates the
         * statistics used by {@link #needsRLE()}.
         *
         * @return {@code true} once all pending spans have been queued
         * @throws IOException if a background sort failed; the failure is attached as the cause
         *                     so it is not mistaken for a successful, smaller merge
         * @throws InterruptedException if interrupted while waiting for a background sort
         */
        public boolean ready() throws IOException, InterruptedException {
            while (!this.futures.isEmpty()) {
                Future<SpanIterator> pending = this.futures.remove(0);
                SpanIterator sorted;
                try {
                    sorted = pending.get();
                }
                catch (java.util.concurrent.ExecutionException e) {
                    Throwable cause = e.getCause() != null ? e.getCause() : e;
                    throw new IOException("Background sort of a span failed: " + cause, cause);
                }
                this.add(sorted);
            }
            StringBuilder sb = new StringBuilder();
            for (SpanIterator sp : this.heap) {
                sb.append(sp.toString());
                sb.append(",");
                this.total += (long)sp.span.length();
                this.eq += sp.span.getEq();
            }
            LOG.info("Heap = " + sb.toString());
            return true;
        }

        /**
         * Returns the iterator holding the next record in merge order, or {@code null} when the
         * heap is empty. While galloping it keeps returning the same iterator.
         */
        private SpanIterator pop() throws IOException {
            if (this.gallop > 0) {
                --this.gallop;
                return this.horse;
            }
            SpanIterator current = this.heap.pop();
            SpanIterator next = this.heap.peek();
            // Only gallop when the same span won twice in a row and there is a competitor to
            // bound the run.
            if (next != null && current != null && this.horse == current) {
                this.gallop = current.bisect(next.getKey(), next.getPartition()) - 1;
            }
            this.horse = current;
            return current;
        }

        /** Reports whether more than 10% of the records seen so far compared equal during sorting. */
        public boolean needsRLE() {
            return (double)this.eq > 0.1 * (double)this.total;
        }

        private SpanIterator peek() throws IOException {
            if (this.gallop > 0) {
                return this.horse;
            }
            return this.heap.peek();
        }

        /**
         * Copies the next record in merge order into this merger's key/value buffers.
         *
         * @return {@code false} when every span is exhausted
         */
        @Override
        public boolean next() throws IOException {
            SpanIterator current = this.pop();
            if (current != null) {
                this.key.reset(current.getKey());
                this.value.reset(current.getValue());
                this.partition = current.getPartition();
                if (this.gallop <= 0) {
                    // Not galloping: advance the span and put it back in the heap if it has more.
                    this.add(current);
                } else {
                    // Galloping: just advance; pop() will hand the same span out again.
                    current.next();
                }
                return true;
            }
            return false;
        }

        @Override
        public DataInputBuffer getKey() throws IOException {
            return this.key;
        }

        @Override
        public DataInputBuffer getValue() throws IOException {
            return this.value;
        }

        /** Returns the metadata prefix of the current record (partition id in its high bits). */
        @Override
        public int getPartition() {
            return this.partition;
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public Progress getProgress() {
            return new Progress();
        }

        /** Returns the shared partition filter, re-targeted at {@code partition}. */
        public TezRawKeyValueIterator filter(int partition) {
            this.partIter.reset(partition);
            return this.partIter;
        }
    }

    /** Priority queue of span iterators, ordered by {@link SpanIterator#compareTo}. */
    private class SpanHeap extends PriorityQueue<SpanIterator> {
        public SpanHeap() {
            // Initial capacity of 256 entries.
            super(256);
        }

        /** Removes and returns the head of the queue, or {@code null} if it is empty. */
        public SpanIterator pop() {
            SpanIterator head = this.poll();
            return head;
        }
    }

    /**
     * View over a partition-ordered iterator that yields only the records of one partition.
     *
     * <p>When the underlying iterator produces a record for a different partition, that record
     * is left pending ("dirty") so it is re-examined, not skipped, on the next call, including
     * after {@link #reset(int)} selects another partition.
     */
    private class PartitionFilter implements TezRawKeyValueIterator {
        private final PartitionedRawKeyValueIterator iter;
        private int partition;
        // True while the underlying iterator's current record has been read but not yet handed out.
        private boolean dirty = false;

        public PartitionFilter(PartitionedRawKeyValueIterator iter) {
            this.iter = iter;
        }

        @Override
        public DataInputBuffer getKey() throws IOException {
            return this.iter.getKey();
        }

        @Override
        public DataInputBuffer getValue() throws IOException {
            return this.iter.getValue();
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public Progress getProgress() {
            return new Progress();
        }

        /**
         * Moves to the next record if it belongs to the selected partition.
         *
         * @return {@code true} if the underlying iterator is positioned on a record of the
         *         selected partition; {@code false} if it is exhausted or positioned on a record
         *         of another partition
         */
        @Override
        public boolean next() throws IOException {
            boolean havePendingRecord = this.dirty || this.iter.next();
            if (!havePendingRecord) {
                return false;
            }
            // The partition id lives in the top partitionBits bits of the stored prefix.
            int recordPartition = this.iter.getPartition() >>> (32 - PipelinedSorter.this.partitionBits);
            boolean belongsHere = recordPartition == this.partition;
            this.dirty = !belongsHere;
            return belongsHere;
        }

        /** Selects the partition whose records subsequent calls to {@link #next()} accept. */
        public void reset(int partition) {
            this.partition = partition;
        }

        public int getPartition() {
            return this.partition;
        }
    }

    /** Unit of work for the sort pool: sorts one span and returns an iterator over it. */
    private class SortTask implements Callable<SpanIterator> {
        private final SortSpan target;
        private final IndexedSorter algorithm;
        private final RawComparator keyComparator;

        public SortTask(SortSpan sortable, IndexedSorter sorter, RawComparator comparator) {
            this.target = sortable;
            this.algorithm = sorter;
            this.keyComparator = comparator;
        }

        @Override
        public SpanIterator call() {
            SpanIterator sorted = this.target.sort(this.algorithm, this.keyComparator);
            return sorted;
        }
    }

    /**
     * Cursor over the records of one sorted {@link SortSpan}, ordered against other cursors by
     * the record each is currently positioned on.
     */
    private class SpanIterator
    implements PartitionedRawKeyValueIterator,
    Comparable<SpanIterator> {
        // Index of the current record; -1 until next() is called for the first time.
        private int kvindex = -1;
        // Index of the last record in the span (-1 for an empty span).
        private int maxindex;
        private IntBuffer kvmeta;
        private ByteBuffer kvbuffer;
        private SortSpan span;
        private InputByteBuffer key = new InputByteBuffer();
        private InputByteBuffer value = new InputByteBuffer();
        private Progress progress = new Progress();
        // Minimum number of remaining records for which bisect() attempts a run.
        private final int minrun = 16;

        public SpanIterator(SortSpan span) {
            this.kvmeta = span.kvmeta;
            this.kvbuffer = span.kvbuffer;
            this.span = span;
            this.maxindex = this.kvmeta.limit() / 4 - 1;
        }

        /** Copies the current record's key bytes out of the span and returns them. */
        @Override
        public DataInputBuffer getKey() throws IOException {
            int keystart = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 1);
            int valstart = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 2);
            // The key occupies the bytes between its start and the start of the value.
            this.key.reset(this.kvbuffer, keystart, valstart - keystart);
            return this.key;
        }

        /** Copies the current record's value bytes out of the span and returns them. */
        @Override
        public DataInputBuffer getValue() throws IOException {
            int valstart = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 2);
            int vallen = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 3);
            this.value.reset(this.kvbuffer, valstart, vallen);
            return this.value;
        }

        /**
         * Advances to the next record.
         *
         * @return {@code false} if the cursor is already on the last record (or the span is empty)
         */
        @Override
        public boolean next() throws IOException {
            if (this.kvindex == this.maxindex) {
                return false;
            }
            if (this.kvindex % 100 == 0) {
                // Fraction of the span consumed so far. kvindex is a non-negative multiple of
                // 100 and differs from maxindex here, so maxindex is at least 1.
                this.progress.set((float)this.kvindex / (float)this.maxindex);
            }
            ++this.kvindex;
            return true;
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public Progress getProgress() {
            return this.progress;
        }

        /** Returns the current record's metadata prefix (slot 0 of its metadata record). */
        @Override
        public int getPartition() {
            int partition = this.kvmeta.get(this.span.offsetFor(this.kvindex) + 0);
            return partition;
        }

        /** Returns how many records remain after the current one. */
        public int size() {
            return this.maxindex - this.kvindex;
        }

        /** Orders cursors by comparing this cursor's current record with {@code other}'s. */
        @Override
        public int compareTo(SpanIterator other) {
            try {
                return this.span.compareInternal(other.getKey(), other.getPartition(), this.kvindex);
            }
            catch (IOException iOException) {
                return -1;
            }
        }

        public String toString() {
            return String.format("SpanIterator<%d:%d> (span=%s)", this.kvindex, this.maxindex, this.span.toString());
        }

        /**
         * Estimates how many records, starting at the current one, can be emitted before the
         * record described by {@code needle}/{@code needlePart} must be emitted.
         *
         * <p>The answer is conservative: 0 means "no run, fall back to the heap". The search is
         * a binary search capped at 16 probes.
         *
         * @param needle key of the competing record
         * @param needlePart metadata prefix of the competing record
         * @return length of the run, relative to the current record
         */
        int bisect(DataInputBuffer needle, int needlePart) {
            int start = this.kvindex;
            int end = this.maxindex - 1;
            int mid = start;
            int cmp = 0;
            if (end - start < this.minrun) {
                return 0;
            }
            // The current record already sorts after the needle: nothing may be emitted ahead
            // of it. The result is a relative count, so this is 0, not the absolute index.
            if (this.span.compareInternal(needle, needlePart, start) > 0) {
                return 0;
            }
            // Bail out early if a minimum run is not available.
            if (this.span.compareInternal(needle, needlePart, start + this.minrun) > 0) {
                return 0;
            }
            // Everything up to `end` precedes the needle.
            if (this.span.compareInternal(needle, needlePart, end) < 0) {
                return end - this.kvindex;
            }
            boolean found = false;
            for (int i = 0; start < end && i < this.minrun; ++i) {
                mid = start + (end - start) / 2;
                cmp = this.span.compareInternal(needle, needlePart, mid);
                if (cmp <= 0) {
                    // Record at mid does not sort after the needle: the run reaches at least mid.
                    start = mid;
                    found = true;
                } else {
                    end = mid;
                }
            }
            if (found) {
                return start - this.kvindex;
            }
            return 0;
        }
    }

    /**
     * One contiguous slice of the sort buffer, split into a metadata region (four ints per
     * record: prefix, key start, value start, value length) followed by a data region holding
     * the serialized keys and values. Sorting permutes the metadata records only.
     */
    private class SortSpan
    implements IndexedSortable {
        final IntBuffer kvmeta;
        final ByteBuffer kvbuffer;
        // Stream the serializers write through; it appends to kvbuffer.
        final DataOutputStream out;
        // Set by sort(); compare() and compareInternal() must not be used before that.
        private RawComparator comparator;
        // Scratch space used by swap() to exchange two metadata records.
        final int[] imeta = new int[4];
        final int[] jmeta = new int[4];
        // Largest key/value length collected into this span (maintained by collect()).
        int keymax = 1;
        int valmax = 1;
        // NOTE(review): i and j are only written by swap() and never read in the visible code.
        private int i;
        private int j;
        // Scratch arrays compare() copies the two keys into; sized from keymax in sort().
        private byte[] ki;
        private byte[] kj;
        private int index = 0;
        private InputByteBuffer hay = new InputByteBuffer();
        // Number of compare() calls that found two equal keys.
        private long eq = 0L;

        /**
         * Lays a span out over {@code source}.
         *
         * @param source buffer to carve the span from
         * @param maxItems number of records to reserve metadata for
         * @param perItem expected data bytes per record; used to rebalance the split when
         *                {@code source} cannot hold {@code maxItems} records of that size
         */
        public SortSpan(ByteBuffer source, int maxItems, int perItem) {
            int capacity = source.remaining();
            int metasize = 16 * maxItems;
            int dataSize = maxItems * perItem;
            if (capacity < metasize + dataSize) {
                // Not enough room: reserve metadata for as many (16 + perItem)-byte records as fit.
                metasize = 16 * (capacity / (perItem + 16));
            }
            ByteBuffer reserved = source.duplicate();
            reserved.mark();
            LOG.info("reserved.remaining() = " + reserved.remaining());
            LOG.info("reserved.size = " + metasize);
            // Data region: everything after the first metasize bytes.
            reserved.position(metasize);
            this.kvbuffer = reserved.slice();
            // Metadata region: the first metasize bytes, viewed as ints in native byte order.
            reserved.flip();
            reserved.limit(metasize);
            this.kvmeta = reserved.slice().order(ByteOrder.nativeOrder()).asIntBuffer();
            this.out = new DataOutputStream(new BufferStreamWrapper(this.kvbuffer));
        }

        /**
         * Sorts the span's metadata records and returns a cursor over the result.
         *
         * @param sorter sorting algorithm to apply
         * @param comparator key comparator used when two prefixes are equal
         */
        public SpanIterator sort(IndexedSorter sorter, RawComparator comparator) {
            this.comparator = comparator;
            this.ki = new byte[this.keymax];
            this.kj = new byte[this.keymax];
            long start = System.currentTimeMillis();
            if (this.length() > 1) {
                sorter.sort((IndexedSortable)this, 0, this.length(), PipelinedSorter.this.nullProgressable);
            }
            LOG.info("done sorting span=" + this.index + ", length=" + this.length() + ", " + "time=" + (System.currentTimeMillis() - start));
            return new SpanIterator(this);
        }

        /** Returns the position in {@code kvmeta} of the first int of record {@code i}. */
        int offsetFor(int i) {
            return i * 4;
        }

        /** Exchanges the metadata records at positions {@code mi} and {@code mj}. */
        public void swap(int mi, int mj) {
            int kvi = this.offsetFor(mi);
            int kvj = this.offsetFor(mj);
            this.kvmeta.position(kvi);
            this.kvmeta.get(this.imeta);
            this.kvmeta.position(kvj);
            this.kvmeta.get(this.jmeta);
            this.kvmeta.position(kvj);
            this.kvmeta.put(this.imeta);
            this.kvmeta.position(kvi);
            this.kvmeta.put(this.jmeta);
            // NOTE(review): both conditions are identical as written; the fields they reset are
            // not read anywhere in the visible code.
            if (this.i == mi || this.j == mj) {
                this.i = -1;
            }
            if (this.i == mi || this.j == mj) {
                this.j = -1;
            }
        }

        /**
         * Compares records {@code mi} and {@code mj}: first by metadata prefix, then, on a tie,
         * by key bytes using the comparator passed to {@link #sort}.
         */
        public int compare(int mi, int mj) {
            int kvjp;
            int kvi = this.offsetFor(mi);
            int kvj = this.offsetFor(mj);
            int kvip = this.kvmeta.get(kvi + 0);
            if (kvip != (kvjp = this.kvmeta.get(kvj + 0))) {
                // NOTE(review): plain subtraction is only overflow-safe if prefixes are never
                // negative; collect() appears to guarantee that via the extra partition bit.
                return kvip - kvjp;
            }
            int istart = this.kvmeta.get(kvi + 1);
            int jstart = this.kvmeta.get(kvj + 1);
            int ilen = this.kvmeta.get(kvi + 2) - istart;
            int jlen = this.kvmeta.get(kvj + 2) - jstart;
            this.kvbuffer.position(istart);
            this.kvbuffer.get(this.ki, 0, ilen);
            this.kvbuffer.position(jstart);
            this.kvbuffer.get(this.kj, 0, jlen);
            int cmp = this.comparator.compare(this.ki, 0, ilen, this.kj, 0, jlen);
            if (cmp == 0) {
                ++this.eq;
            }
            return cmp;
        }

        /**
         * Ends this span and, if enough of the buffer is left over, returns a new span laid out
         * over the remainder using this span's record count and average record size.
         *
         * @return the follow-on span, or {@code null} if {@link #end()} reported no usable space
         */
        public SortSpan next() {
            ByteBuffer remaining = this.end();
            if (remaining != null) {
                int items = this.length();
                int perItem = this.kvbuffer.position() / items;
                SortSpan newSpan = new SortSpan(remaining, items, perItem);
                newSpan.index = this.index + 1;
                return newSpan;
            }
            return null;
        }

        /** Returns the number of records, derived from the metadata buffer's limit. */
        public int length() {
            return this.kvmeta.limit() / 4;
        }

        /**
         * Freezes this span at its current fill level by setting both buffers' limits to their
         * positions.
         *
         * @return the unused tail of the data region, or {@code null} if the span holds no
         *         records or the tail is smaller than one record (16 metadata bytes plus the
         *         average data size)
         */
        public ByteBuffer end() {
            ByteBuffer remaining = this.kvbuffer.duplicate();
            remaining.position(this.kvbuffer.position());
            remaining = remaining.slice();
            this.kvbuffer.limit(this.kvbuffer.position());
            this.kvmeta.limit(this.kvmeta.position());
            int items = this.length();
            if (items == 0) {
                return null;
            }
            int perItem = this.kvbuffer.position() / items;
            LOG.info(String.format("Span%d.length = %d, perItem = %d", this.index, this.length(), perItem));
            if (remaining.remaining() < 16 + perItem) {
                return null;
            }
            return remaining;
        }

        /**
         * Compares the record at {@code index} in this span with an external record.
         *
         * @param needle key of the external record
         * @param needlePart metadata prefix of the external record
         * @param index position of the record in this span
         * @return negative, zero or positive as this span's record sorts before, equal to or
         *         after the external record
         */
        private int compareInternal(DataInputBuffer needle, int needlePart, int index) {
            int cmp = 0;
            // Offsets are computed against this span, not whichever span the sorter is
            // currently filling.
            int base = this.offsetFor(index);
            int partition = this.kvmeta.get(base + 0);
            if (partition != needlePart) {
                cmp = partition - needlePart;
            } else {
                int keystart = this.kvmeta.get(base + 1);
                int valstart = this.kvmeta.get(base + 2);
                this.hay.reset(this.kvbuffer, keystart, valstart - keystart);
                cmp = this.comparator.compare(this.hay.getData(), this.hay.getPosition(), this.hay.getLength(), needle.getData(), needle.getPosition(), needle.getLength());
            }
            return cmp;
        }

        public long getEq() {
            return this.eq;
        }

        public String toString() {
            return String.format("Span[%d,%d]", 4 * this.kvmeta.capacity(), this.kvbuffer.limit());
        }
    }

    /**
     * {@link DataInputBuffer} that owns its bytes: every reset copies the requested range into a
     * private array (grown on demand) so the contents no longer depend on the source.
     */
    protected class InputByteBuffer extends DataInputBuffer {
        private byte[] buffer = new byte[256];
        private ByteBuffer wrapped = ByteBuffer.wrap(this.buffer);

        protected InputByteBuffer() {
        }

        /** Ensures the private array holds at least {@code length} bytes; never shrinks it. */
        private void resize(int length) {
            boolean tooSmall = length > this.buffer.length;
            if (tooSmall) {
                byte[] replacement = new byte[length];
                this.buffer = replacement;
                this.wrapped = ByteBuffer.wrap(replacement);
            }
            this.wrapped.limit(length);
        }

        /**
         * Loads {@code length} bytes of {@code b} starting at absolute position {@code start}.
         * Moves {@code b}'s position as a side effect.
         */
        public void reset(ByteBuffer b, int start, int length) {
            this.resize(length);
            b.position(start);
            b.get(this.buffer, 0, length);
            super.reset(this.buffer, 0, length);
        }

        /**
         * Loads {@code clone.getLength()} bytes of {@code clone}'s backing array, starting at
         * {@code clone.getPosition()}.
         */
        public void reset(DataInputBuffer clone) {
            int length = clone.getLength();
            int start = clone.getPosition();
            byte[] source = clone.getData();
            this.resize(length);
            System.arraycopy(source, start, this.buffer, 0, length);
            super.reset(this.buffer, 0, length);
        }
    }

    /**
     * {@link OutputStream} that appends to a {@link ByteBuffer} at its current position. When the
     * buffer is full, the buffer's own {@code BufferOverflowException} propagates to the caller.
     */
    private class BufferStreamWrapper extends OutputStream {
        private final ByteBuffer out;

        public BufferStreamWrapper(ByteBuffer out) {
            this.out = out;
        }

        /** Appends the low-order byte of {@code b}. */
        @Override
        public void write(int b) throws IOException {
            byte lowByte = (byte) b;
            this.out.put(lowByte);
        }

        /** Appends every byte of {@code b}. */
        @Override
        public void write(byte[] b) throws IOException {
            this.out.put(b, 0, b.length);
        }

        /** Appends {@code len} bytes of {@code b} starting at {@code off}. */
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            this.out.put(b, off, len);
        }
    }

    /** A raw key/value iterator that can also report an int "partition" for its current record. */
    private static interface PartitionedRawKeyValueIterator
    extends TezRawKeyValueIterator {
        // Implementations in this file return either the stored metadata prefix or a plain partition id.
        public int getPartition();
    }
}

