/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.hash;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
import org.apache.flink.runtime.operators.hash.AbstractHashTableProber;
import org.apache.flink.runtime.operators.hash.AbstractMutableHashTable;
import org.apache.flink.runtime.operators.hash.InMemoryPartition;
import org.apache.flink.runtime.util.IntArrayList;
import org.apache.flink.runtime.util.LongArrayList;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;

public class CompactingHashTable<T>
extends AbstractMutableHashTable<T> {
    private static final Log LOG = LogFactory.getLog(CompactingHashTable.class);
    private static final int MIN_NUM_MEMORY_SEGMENTS = 33;
    private static final int MAX_NUM_PARTITIONS = 32;
    private static final int DEFAULT_RECORD_LEN = 24;
    private static final int HASH_CODE_LEN = 4;
    private static final int POINTER_LEN = 8;
    private static final int RECORD_TABLE_BYTES = 12;
    private static final int RECORD_OVERHEAD_BYTES = 14;
    private static final int NUM_INTRA_BUCKET_BITS = 7;
    private static final int HASH_BUCKET_SIZE = 128;
    private static final int BUCKET_HEADER_LENGTH = 16;
    private static final int NUM_ENTRIES_PER_BUCKET = 9;
    private static final int BUCKET_POINTER_START_OFFSET = 52;
    private static final int HEADER_PARTITION_OFFSET = 0;
    private static final int HEADER_COUNT_OFFSET = 4;
    private static final int HEADER_FORWARD_OFFSET = 8;
    private static final long BUCKET_FORWARD_POINTER_NOT_SET = -1L;
    private final ArrayList<MemorySegment> availableMemory;
    private final int segmentSize;
    private final int bucketsPerSegmentMask;
    private final int bucketsPerSegmentBits;
    private final int avgRecordLen;
    private final ArrayList<InMemoryPartition<T>> partitions;
    private MemorySegment[] buckets;
    private InMemoryPartition<T> compactionMemory;
    private int numBuckets;
    private boolean isResizing = false;
    private AtomicBoolean closed = new AtomicBoolean();
    private boolean running = true;
    private int pageSizeInBits;

    public CompactingHashTable(TypeSerializer<T> buildSideSerializer, TypeComparator<T> buildSideComparator, List<MemorySegment> memorySegments) {
        this(buildSideSerializer, buildSideComparator, memorySegments, 24);
    }

    public CompactingHashTable(TypeSerializer<T> buildSideSerializer, TypeComparator<T> buildSideComparator, List<MemorySegment> memorySegments, int avgRecordLen) {
        super(buildSideSerializer, buildSideComparator);
        if (memorySegments == null) {
            throw new NullPointerException();
        }
        if (memorySegments.size() < 33) {
            throw new IllegalArgumentException("Too few memory segments provided. Hash Table needs at least 33 memory segments.");
        }
        this.availableMemory = memorySegments instanceof ArrayList ? (ArrayList<Object>)memorySegments : new ArrayList<MemorySegment>(memorySegments);
        this.avgRecordLen = buildSideSerializer.getLength() > 0 ? buildSideSerializer.getLength() : avgRecordLen;
        this.segmentSize = memorySegments.get(0).size();
        if ((this.segmentSize & this.segmentSize - 1) != 0) {
            throw new IllegalArgumentException("Hash Table requires buffers whose size is a power of 2.");
        }
        int bucketsPerSegment = this.segmentSize >> 7;
        if (bucketsPerSegment == 0) {
            throw new IllegalArgumentException("Hash Table requires buffers of at least 128 bytes.");
        }
        this.bucketsPerSegmentMask = bucketsPerSegment - 1;
        this.bucketsPerSegmentBits = MathUtils.log2strict(bucketsPerSegment);
        this.partitions = new ArrayList();
        this.closed.set(true);
    }

    @Override
    public void open() {
        if (!this.closed.compareAndSet(true, false)) {
            throw new IllegalStateException("Hash Table cannot be opened, because it is currently not closed.");
        }
        int partitionFanOut = CompactingHashTable.getPartitioningFanOutNoEstimates(this.availableMemory.size());
        this.createPartitions(partitionFanOut);
        int numBuckets = CompactingHashTable.getInitialTableSize(this.availableMemory.size(), this.segmentSize, partitionFanOut, this.avgRecordLen);
        this.initTable(numBuckets, (byte)partitionFanOut);
    }

    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        LOG.debug((Object)"Closing hash table and releasing resources.");
        this.releaseTable();
        this.clearPartitions();
    }

    @Override
    public void abort() {
        this.running = false;
        LOG.debug((Object)"Cancelling hash table operations.");
    }

    @Override
    public List<MemorySegment> getFreeMemory() {
        if (!this.closed.get()) {
            throw new IllegalStateException("Cannot return memory while join is open.");
        }
        return this.availableMemory;
    }

    @Override
    public void buildTable(MutableObjectIterator<T> input) throws IOException {
        Object record = this.buildSideSerializer.createInstance();
        while (this.running && (record = input.next(record)) != null) {
            this.insert(record);
        }
    }

    @Override
    public final void insert(T record) throws IOException {
        long pointer;
        if (this.closed.get()) {
            return;
        }
        int hashCode = CompactingHashTable.hash(this.buildSideComparator.hash(record));
        int posHashCode = hashCode % this.numBuckets;
        int bucketArrayPos = posHashCode >>> this.bucketsPerSegmentBits;
        int bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << 7;
        MemorySegment bucket = this.buckets[bucketArrayPos];
        byte partitionNumber = bucket.get(bucketInSegmentPos + 0);
        InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
        try {
            pointer = partition.appendRecord(record);
            if (pointer >> this.pageSizeInBits > (long)this.compactionMemory.getBlockCount()) {
                this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
            }
        }
        catch (EOFException e) {
            try {
                this.compactPartition(partitionNumber);
                partition = this.partitions.get(partitionNumber);
                pointer = partition.appendRecord(record);
            }
            catch (EOFException ex) {
                throw new RuntimeException("Memory ran out. Compaction failed. " + this.getMemoryConsumptionString() + " Message: " + ex.getMessage());
            }
            catch (IndexOutOfBoundsException ex) {
                throw new RuntimeException("Memory ran out. Compaction failed. " + this.getMemoryConsumptionString() + " Message: " + ex.getMessage());
            }
        }
        catch (IndexOutOfBoundsException e1) {
            try {
                this.compactPartition(partitionNumber);
                partition = this.partitions.get(partitionNumber);
                pointer = partition.appendRecord(record);
            }
            catch (EOFException ex) {
                throw new RuntimeException("Memory ran out. Compaction failed. " + this.getMemoryConsumptionString() + " Message: " + ex.getMessage());
            }
            catch (IndexOutOfBoundsException ex) {
                throw new RuntimeException("Memory ran out. Compaction failed. " + this.getMemoryConsumptionString() + " Message: " + ex.getMessage());
            }
        }
        this.insertBucketEntryFromStart(partition, bucket, bucketInSegmentPos, hashCode, pointer);
    }

    public <PT> HashTableProber<PT> getProber(TypeComparator<PT> probeSideComparator, TypePairComparator<PT, T> pairComparator) {
        return new HashTableProber(probeSideComparator, pairComparator);
    }

    @Override
    public MutableObjectIterator<T> getEntryIterator() {
        return new EntryIterator(this);
    }

    @Override
    public void insertOrReplaceRecord(T record, T tempHolder) throws IOException {
        if (this.closed.get()) {
            return;
        }
        int searchHashCode = CompactingHashTable.hash(this.buildSideComparator.hash(record));
        int posHashCode = searchHashCode % this.numBuckets;
        MemorySegment originalBucket = this.buckets[posHashCode >> this.bucketsPerSegmentBits];
        int originalBucketOffset = (posHashCode & this.bucketsPerSegmentMask) << 7;
        MemorySegment bucket = originalBucket;
        int bucketInSegmentOffset = originalBucketOffset;
        byte partitionNumber = bucket.get(bucketInSegmentOffset + 0);
        InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
        MemorySegment[] overflowSegments = partition.overflowSegments;
        this.buildSideComparator.setReference(record);
        int countInSegment = bucket.getInt(bucketInSegmentOffset + 4);
        int numInSegment = 0;
        int posInSegment = bucketInSegmentOffset + 16;
        long currentForwardPointer = -1L;
        while (true) {
            if (numInSegment < countInSegment) {
                int thisCode = bucket.getInt(posInSegment);
                posInSegment += 4;
                if (thisCode == searchHashCode) {
                    int pointerOffset = bucketInSegmentOffset + 52 + numInSegment * 8;
                    long pointer = bucket.getLong(pointerOffset);
                    ++numInSegment;
                    try {
                        if (!this.buildSideComparator.equalToReference(tempHolder = partition.readRecordAt(pointer, tempHolder))) continue;
                        long newPointer = partition.appendRecord(record);
                        bucket.putLong(pointerOffset, newPointer);
                        partition.setCompaction(false);
                        if (newPointer >> this.pageSizeInBits > (long)this.compactionMemory.getBlockCount()) {
                            this.compactionMemory.allocateSegments((int)(newPointer >> this.pageSizeInBits));
                        }
                        return;
                    }
                    catch (EOFException e) {
                        long newPointer;
                        try {
                            this.compactPartition(partition.getPartitionNumber());
                            partition = this.partitions.get(partitionNumber);
                            newPointer = partition.appendRecord(record);
                        }
                        catch (EOFException ex) {
                            throw new RuntimeException("Memory ran out. Compaction failed. " + this.getMemoryConsumptionString() + " Message: " + ex.getMessage());
                        }
                        catch (IndexOutOfBoundsException ex) {
                            throw new RuntimeException("Memory ran out. Compaction failed. " + this.getMemoryConsumptionString() + " Message: " + ex.getMessage());
                        }
                        bucket.putLong(pointerOffset, newPointer);
                        return;
                    }
                    catch (IndexOutOfBoundsException e) {
                        long newPointer;
                        try {
                            this.compactPartition(partition.getPartitionNumber());
                            partition = this.partitions.get(partitionNumber);
                            newPointer = partition.appendRecord(record);
                        }
                        catch (EOFException ex) {
                            throw new RuntimeException("Memory ran out. Compaction failed. " + this.getMemoryConsumptionString() + " Message: " + ex.getMessage());
                        }
                        catch (IndexOutOfBoundsException ex) {
                            throw new RuntimeException("Memory ran out. Compaction failed. " + this.getMemoryConsumptionString() + " Message: " + ex.getMessage());
                        }
                        bucket.putLong(pointerOffset, newPointer);
                        return;
                    }
                    catch (IOException e) {
                        throw new RuntimeException("Error deserializing record from the hashtable: " + e.getMessage(), e);
                    }
                }
                ++numInSegment;
                continue;
            }
            long newForwardPointer = bucket.getLong(bucketInSegmentOffset + 8);
            if (newForwardPointer == -1L) {
                long pointer = partition.appendRecord(record);
                this.insertBucketEntryFromSearch(partition, originalBucket, bucket, originalBucketOffset, bucketInSegmentOffset, countInSegment, currentForwardPointer, searchHashCode, pointer);
                if (pointer >> this.pageSizeInBits > (long)this.compactionMemory.getBlockCount()) {
                    this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
                }
                return;
            }
            int overflowSegNum = (int)(newForwardPointer >>> 32);
            bucket = overflowSegments[overflowSegNum];
            bucketInSegmentOffset = (int)(newForwardPointer & 0xFFFFFFFFFFFFFFFFL);
            countInSegment = bucket.getInt(bucketInSegmentOffset + 4);
            posInSegment = bucketInSegmentOffset + 16;
            numInSegment = 0;
            currentForwardPointer = newForwardPointer;
        }
    }

    private final void insertBucketEntryFromStart(InMemoryPartition<T> p, MemorySegment bucket, int bucketInSegmentPos, int hashCode, long pointer) throws IOException {
        boolean checkForResize = false;
        int count = bucket.getInt(bucketInSegmentPos + 4);
        if (count < 9) {
            bucket.putInt(bucketInSegmentPos + 16 + count * 4, hashCode);
            bucket.putLong(bucketInSegmentPos + 52 + count * 8, pointer);
            bucket.putInt(bucketInSegmentPos + 4, count + 1);
        } else {
            int overflowBucketNum;
            int overflowBucketOffset;
            MemorySegment overflowSeg;
            long forwardForNewBucket;
            long originalForwardPointer = bucket.getLong(bucketInSegmentPos + 8);
            if (originalForwardPointer != -1L) {
                int overflowSegNum = (int)(originalForwardPointer >>> 32);
                MemorySegment seg = p.overflowSegments[overflowSegNum];
                int segOffset = (int)(originalForwardPointer & 0xFFFFFFFFFFFFFFFFL);
                int obCount = seg.getInt(segOffset + 4);
                if (obCount < 9) {
                    seg.putInt(segOffset + 16 + obCount * 4, hashCode);
                    seg.putLong(segOffset + 52 + obCount * 8, pointer);
                    seg.putInt(segOffset + 4, obCount + 1);
                    return;
                }
                forwardForNewBucket = originalForwardPointer;
            } else {
                forwardForNewBucket = -1L;
            }
            if (p.nextOverflowBucket == 0) {
                overflowSeg = this.getNextBuffer();
                overflowBucketOffset = 0;
                overflowBucketNum = p.numOverflowSegments;
                if (p.overflowSegments.length <= p.numOverflowSegments) {
                    MemorySegment[] newSegsArray = new MemorySegment[p.overflowSegments.length * 2];
                    System.arraycopy(p.overflowSegments, 0, newSegsArray, 0, p.overflowSegments.length);
                    p.overflowSegments = newSegsArray;
                }
                p.overflowSegments[p.numOverflowSegments] = overflowSeg;
                ++p.numOverflowSegments;
                checkForResize = true;
            } else {
                overflowBucketNum = p.numOverflowSegments - 1;
                overflowSeg = p.overflowSegments[overflowBucketNum];
                overflowBucketOffset = p.nextOverflowBucket << 7;
            }
            p.nextOverflowBucket = p.nextOverflowBucket == this.bucketsPerSegmentMask ? 0 : p.nextOverflowBucket + 1;
            overflowSeg.putLong(overflowBucketOffset + 8, forwardForNewBucket);
            long pointerToNewBucket = (long)overflowBucketNum << 32 | (long)overflowBucketOffset;
            bucket.putLong(bucketInSegmentPos + 8, pointerToNewBucket);
            overflowSeg.putInt(overflowBucketOffset + 16, hashCode);
            overflowSeg.putLong(overflowBucketOffset + 52, pointer);
            overflowSeg.putInt(overflowBucketOffset + 4, 1);
            if (checkForResize && !this.isResizing && this.buckets.length <= this.getOverflowSegmentCount()) {
                this.resizeHashTable();
            }
        }
    }

    private final void insertBucketEntryFromSearch(InMemoryPartition<T> partition, MemorySegment originalBucket, MemorySegment currentBucket, int originalBucketOffset, int currentBucketOffset, int countInCurrentBucket, long currentForwardPointer, int hashCode, long pointer) throws IOException {
        boolean checkForResize = false;
        if (countInCurrentBucket < 9) {
            currentBucket.putInt(currentBucketOffset + 16 + countInCurrentBucket * 4, hashCode);
            currentBucket.putLong(currentBucketOffset + 52 + countInCurrentBucket * 8, pointer);
            currentBucket.putInt(currentBucketOffset + 4, countInCurrentBucket + 1);
        } else {
            int overflowBucketNum;
            int overflowBucketOffset;
            MemorySegment overflowSeg;
            if (partition.nextOverflowBucket == 0) {
                overflowSeg = this.getNextBuffer();
                overflowBucketOffset = 0;
                overflowBucketNum = partition.numOverflowSegments;
                if (partition.overflowSegments.length <= partition.numOverflowSegments) {
                    MemorySegment[] newSegsArray = new MemorySegment[partition.overflowSegments.length * 2];
                    System.arraycopy(partition.overflowSegments, 0, newSegsArray, 0, partition.overflowSegments.length);
                    partition.overflowSegments = newSegsArray;
                }
                partition.overflowSegments[partition.numOverflowSegments] = overflowSeg;
                ++partition.numOverflowSegments;
                checkForResize = true;
            } else {
                overflowBucketNum = partition.numOverflowSegments - 1;
                overflowSeg = partition.overflowSegments[overflowBucketNum];
                overflowBucketOffset = partition.nextOverflowBucket << 7;
            }
            partition.nextOverflowBucket = partition.nextOverflowBucket == this.bucketsPerSegmentMask ? 0 : partition.nextOverflowBucket + 1;
            overflowSeg.putLong(overflowBucketOffset + 8, currentForwardPointer);
            long pointerToNewBucket = (long)overflowBucketNum << 32 | (long)overflowBucketOffset;
            originalBucket.putLong(originalBucketOffset + 8, pointerToNewBucket);
            overflowSeg.putInt(overflowBucketOffset + 16, hashCode);
            overflowSeg.putLong(overflowBucketOffset + 52, pointer);
            overflowSeg.putInt(overflowBucketOffset + 4, 1);
            if (checkForResize && !this.isResizing && this.buckets.length <= this.getOverflowSegmentCount()) {
                this.resizeHashTable();
            }
        }
    }

    private void createPartitions(int numPartitions) {
        this.partitions.clear();
        ListMemorySegmentSource memSource = new ListMemorySegmentSource(this.availableMemory);
        this.pageSizeInBits = MathUtils.log2strict(this.segmentSize);
        for (int i = 0; i < numPartitions; ++i) {
            this.partitions.add(new InMemoryPartition(this.buildSideSerializer, i, memSource, this.segmentSize, this.pageSizeInBits));
        }
        this.compactionMemory = new InMemoryPartition(this.buildSideSerializer, -1, memSource, this.segmentSize, this.pageSizeInBits);
    }

    private void clearPartitions() {
        for (int i = 0; i < this.partitions.size(); ++i) {
            InMemoryPartition<T> p = this.partitions.get(i);
            p.clearAllMemory(this.availableMemory);
        }
        this.partitions.clear();
        this.compactionMemory.clearAllMemory(this.availableMemory);
    }

    private void initTable(int numBuckets, byte numPartitions) {
        int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
        int numSegs = (numBuckets >>> this.bucketsPerSegmentBits) + ((numBuckets & this.bucketsPerSegmentMask) == 0 ? 0 : 1);
        MemorySegment[] table = new MemorySegment[numSegs];
        int bucket = 0;
        for (int i = 0; i < numSegs && bucket < numBuckets; ++i) {
            MemorySegment seg = this.getNextBuffer();
            for (int k = 0; k < bucketsPerSegment && bucket < numBuckets; ++k, ++bucket) {
                int bucketOffset = k * 128;
                byte partition = CompactingHashTable.assignPartition(bucket, numPartitions);
                seg.put(bucketOffset + 0, partition);
                seg.putInt(bucketOffset + 4, 0);
                seg.putLong(bucketOffset + 8, -1L);
            }
            table[i] = seg;
        }
        this.buckets = table;
        this.numBuckets = numBuckets;
    }

    private void releaseTable() {
        this.numBuckets = 0;
        if (this.buckets != null) {
            for (int i = 0; i < this.buckets.length; ++i) {
                this.availableMemory.add(this.buckets[i]);
            }
            this.buckets = null;
        }
    }

    private final MemorySegment getNextBuffer() {
        int s = this.availableMemory.size();
        if (s > 0) {
            return this.availableMemory.remove(s - 1);
        }
        throw new RuntimeException("Memory ran out. " + this.getMemoryConsumptionString());
    }

    private static final int getPartitioningFanOutNoEstimates(int numBuffers) {
        return Math.max(10, Math.min(numBuffers / 10, 32));
    }

    private String getMemoryConsumptionString() {
        String result = new String("numPartitions: " + this.partitions.size() + " minPartition: " + this.getMinPartition() + " maxPartition: " + this.getMaxPartition() + " number of overflow segments: " + this.getOverflowSegmentCount() + " bucketSize: " + this.buckets.length + " Overall memory: " + this.getSize() + " Partition memory: " + this.getPartitionSize());
        return result;
    }

    private long getSize() {
        long numSegments = 0L;
        numSegments += (long)this.availableMemory.size();
        numSegments += (long)this.buckets.length;
        for (InMemoryPartition<T> p : this.partitions) {
            numSegments += (long)p.getBlockCount();
            numSegments += (long)p.numOverflowSegments;
        }
        return (numSegments += (long)this.compactionMemory.getBlockCount()) * (long)this.segmentSize;
    }

    private long getPartitionSize() {
        long numSegments = 0L;
        for (InMemoryPartition<T> p : this.partitions) {
            numSegments += (long)p.getBlockCount();
        }
        return numSegments * (long)this.segmentSize;
    }

    private int getMaxPartition() {
        int maxPartition = 0;
        for (InMemoryPartition<T> p1 : this.partitions) {
            if (p1.getBlockCount() <= maxPartition) continue;
            maxPartition = p1.getBlockCount();
        }
        return maxPartition;
    }

    private int getMinPartition() {
        int minPartition = Integer.MAX_VALUE;
        for (InMemoryPartition<T> p1 : this.partitions) {
            if (p1.getBlockCount() >= minPartition) continue;
            minPartition = p1.getBlockCount();
        }
        return minPartition;
    }

    private int getOverflowSegmentCount() {
        int result = 0;
        for (InMemoryPartition<T> p : this.partitions) {
            result += p.numOverflowSegments;
        }
        return result;
    }

    private static final int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) {
        long totalSize = (long)bufferSize * (long)numBuffers;
        long numRecordsStorable = totalSize / (long)(recordLenBytes + 14);
        long bucketBytes = numRecordsStorable * 14L;
        long numBuckets = bucketBytes / 256L + 1L;
        while (numBuckets % (long)numPartitions != 0L) {
            ++numBuckets;
        }
        return numBuckets > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)numBuckets;
    }

    private static final byte assignPartition(int bucket, byte numPartitions) {
        return (byte)(bucket % numPartitions);
    }

    private boolean resizeHashTable() throws IOException {
        int startSegment;
        int newNumBuckets = 2 * this.numBuckets;
        int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
        int newNumSegments = (newNumBuckets + (bucketsPerSegment - 1)) / bucketsPerSegment;
        int additionalSegments = newNumSegments - this.buckets.length;
        int numPartitions = this.partitions.size();
        if (this.availableMemory.size() < additionalSegments) {
            for (int i = 0; i < numPartitions; ++i) {
                this.compactPartition(i);
                if (this.availableMemory.size() >= additionalSegments) break;
            }
        }
        if (this.availableMemory.size() < additionalSegments || this.closed.get()) {
            return false;
        }
        this.isResizing = true;
        int startOffset = this.numBuckets * 128 % this.segmentSize;
        MemorySegment[] newBuckets = new MemorySegment[additionalSegments];
        int oldNumBuckets = this.numBuckets;
        int oldNumSegments = this.buckets.length;
        MemorySegment[] mergedBuckets = new MemorySegment[newNumSegments];
        System.arraycopy(this.buckets, 0, mergedBuckets, 0, this.buckets.length);
        System.arraycopy(newBuckets, 0, mergedBuckets, this.buckets.length, newBuckets.length);
        this.buckets = mergedBuckets;
        this.numBuckets = newNumBuckets;
        boolean oldSegment = startOffset != 0;
        int bucket = oldNumBuckets;
        for (int i = startSegment = oldSegment ? oldNumSegments - 1 : oldNumSegments; i < newNumSegments && bucket < this.numBuckets; ++i) {
            int k;
            MemorySegment seg;
            int bucketOffset = 0;
            if (oldSegment) {
                seg = this.buckets[i];
                for (k = oldNumBuckets % bucketsPerSegment; k < bucketsPerSegment && bucket < this.numBuckets; ++k, ++bucket) {
                    bucketOffset = k * 128;
                    seg.put(bucketOffset + 0, CompactingHashTable.assignPartition(bucket, (byte)numPartitions));
                    seg.putInt(bucketOffset + 4, 0);
                    seg.putLong(bucketOffset + 8, -1L);
                }
            } else {
                seg = this.getNextBuffer();
                for (k = 0; k < bucketsPerSegment && bucket < this.numBuckets; ++k, ++bucket) {
                    bucketOffset = k * 128;
                    seg.put(bucketOffset + 0, CompactingHashTable.assignPartition(bucket, (byte)numPartitions));
                    seg.putInt(bucketOffset + 4, 0);
                    seg.putLong(bucketOffset + 8, -1L);
                }
            }
            this.buckets[i] = seg;
            oldSegment = false;
        }
        int hashOffset = 0;
        int hash = 0;
        int pointerOffset = 0;
        long pointer = 0L;
        IntArrayList hashList = new IntArrayList(9);
        LongArrayList pointerList = new LongArrayList(9);
        IntArrayList overflowHashes = new IntArrayList(64);
        LongArrayList overflowPointers = new LongArrayList(64);
        for (int i = 0; i < numPartitions; ++i) {
            InMemoryPartition<T> partition = this.partitions.get(i);
            MemorySegment[] overflowSegments = partition.overflowSegments;
            int posHashCode = 0;
            int bucket2 = i;
            for (int j = 0; j < this.buckets.length && bucket2 < oldNumBuckets; ++j) {
                MemorySegment segment = this.buckets[j];
                for (int k = bucket2 % bucketsPerSegment; k < bucketsPerSegment && bucket2 < oldNumBuckets; k += numPartitions, bucket2 += numPartitions) {
                    int bucketOffset = k * 128;
                    if (segment.get(bucketOffset + 0) != i) {
                        throw new IOException("Accessed wrong bucket! wanted: " + i + " got: " + segment.get(bucketOffset + 0));
                    }
                    int countInSegment = segment.getInt(bucketOffset + 4);
                    int numInSegment = 0;
                    pointerOffset = bucketOffset + 52;
                    hashOffset = bucketOffset + 16;
                    while (true) {
                        if (numInSegment < countInSegment) {
                            hash = segment.getInt(hashOffset);
                            if (hash % this.numBuckets != bucket2 && hash % this.numBuckets != bucket2 + oldNumBuckets) {
                                throw new IOException("wanted: " + bucket2 + " or " + (bucket2 + oldNumBuckets) + " got: " + hash % this.numBuckets);
                            }
                            pointer = segment.getLong(pointerOffset);
                            hashList.add(hash);
                            pointerList.add(pointer);
                            pointerOffset += 8;
                            hashOffset += 4;
                            ++numInSegment;
                            continue;
                        }
                        long forwardPointer = segment.getLong(bucketOffset + 8);
                        if (forwardPointer == -1L) break;
                        int overflowSegNum = (int)(forwardPointer >>> 32);
                        segment = overflowSegments[overflowSegNum];
                        bucketOffset = (int)(forwardPointer & 0xFFFFFFFFFFFFFFFFL);
                        countInSegment = segment.getInt(bucketOffset + 4);
                        pointerOffset = bucketOffset + 52;
                        hashOffset = bucketOffset + 16;
                        numInSegment = 0;
                    }
                    segment = this.buckets[j];
                    bucketOffset = k * 128;
                    segment.putInt(bucketOffset + 4, 0);
                    segment.putLong(bucketOffset + 8, -1L);
                    if (hashList.size() != pointerList.size()) {
                        throw new IOException("Pointer and hash counts do not match. hashes: " + hashList.size() + " pointer: " + pointerList.size());
                    }
                    int newSegmentIndex = (bucket2 + oldNumBuckets) / bucketsPerSegment;
                    MemorySegment newSegment = this.buckets[newSegmentIndex];
                    int oldBucketCount = 0;
                    int newBucketCount = 0;
                    while (!hashList.isEmpty()) {
                        hash = hashList.removeInt(hashList.size() - 1);
                        pointer = pointerList.removeLong(pointerList.size() - 1);
                        posHashCode = hash % this.numBuckets;
                        if (posHashCode == bucket2 && oldBucketCount < 9) {
                            bucketOffset = bucket2 % bucketsPerSegment * 128;
                            this.insertBucketEntryFromStart(partition, segment, bucketOffset, hash, pointer);
                            ++oldBucketCount;
                            continue;
                        }
                        if (posHashCode == bucket2 + oldNumBuckets && newBucketCount < 9) {
                            bucketOffset = (bucket2 + oldNumBuckets) % bucketsPerSegment * 128;
                            this.insertBucketEntryFromStart(partition, newSegment, bucketOffset, hash, pointer);
                            ++newBucketCount;
                            continue;
                        }
                        if (posHashCode == bucket2 + oldNumBuckets || posHashCode == bucket2) {
                            overflowHashes.add(hash);
                            overflowPointers.add(pointer);
                            continue;
                        }
                        throw new IOException("Accessed wrong bucket. Target: " + bucket2 + " or " + (bucket2 + oldNumBuckets) + " Hit: " + posHashCode);
                    }
                    hashList.clear();
                    pointerList.clear();
                }
            }
            this.availableMemory.addAll(partition.resetOverflowBuckets());
            int bucketArrayPos = 0;
            int bucketInSegmentPos = 0;
            MemorySegment bucket3 = null;
            while (!overflowHashes.isEmpty()) {
                hash = overflowHashes.removeInt(overflowHashes.size() - 1);
                pointer = overflowPointers.removeLong(overflowPointers.size() - 1);
                posHashCode = hash % this.numBuckets;
                bucketArrayPos = posHashCode >>> this.bucketsPerSegmentBits;
                bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << 7;
                bucket3 = this.buckets[bucketArrayPos];
                this.insertBucketEntryFromStart(partition, bucket3, bucketInSegmentPos, hash, pointer);
            }
            overflowHashes.clear();
            overflowPointers.clear();
        }
        this.isResizing = false;
        return true;
    }

    private void compactPartition(int partitionNumber) throws IOException {
        if (this.closed.get() || partitionNumber >= this.partitions.size() || this.partitions.get(partitionNumber).isCompacted()) {
            return;
        }
        this.compactionMemory.clearAllMemory(this.availableMemory);
        this.compactionMemory.allocateSegments(1);
        this.compactionMemory.pushDownPages();
        Object tempHolder = this.buildSideSerializer.createInstance();
        int numPartitions = this.partitions.size();
        InMemoryPartition<Object> partition = this.partitions.remove(partitionNumber);
        MemorySegment[] overflowSegments = partition.overflowSegments;
        long pointer = 0L;
        int pointerOffset = 0;
        int bucketOffset = 0;
        int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
        int bucket = partitionNumber;
        for (int i = 0; i < this.buckets.length && bucket < this.numBuckets; ++i) {
            MemorySegment segment = this.buckets[i];
            for (int k = bucket % bucketsPerSegment; k < bucketsPerSegment && bucket < this.numBuckets; k += numPartitions, bucket += numPartitions) {
                bucketOffset = k * 128;
                if (segment.get(bucketOffset + 0) != partitionNumber) {
                    throw new IOException("Accessed wrong bucket! wanted: " + partitionNumber + " got: " + segment.get(bucketOffset + 0));
                }
                int countInSegment = segment.getInt(bucketOffset + 4);
                int numInSegment = 0;
                pointerOffset = bucketOffset + 52;
                while (true) {
                    if (numInSegment < countInSegment) {
                        pointer = segment.getLong(pointerOffset);
                        tempHolder = partition.readRecordAt(pointer, tempHolder);
                        pointer = this.compactionMemory.appendRecord(tempHolder);
                        segment.putLong(pointerOffset, pointer);
                        pointerOffset += 8;
                        ++numInSegment;
                        continue;
                    }
                    long forwardPointer = segment.getLong(bucketOffset + 8);
                    if (forwardPointer == -1L) break;
                    int overflowSegNum = (int)(forwardPointer >>> 32);
                    segment = overflowSegments[overflowSegNum];
                    bucketOffset = (int)(forwardPointer & 0xFFFFFFFFFFFFFFFFL);
                    countInSegment = segment.getInt(bucketOffset + 4);
                    pointerOffset = bucketOffset + 52;
                    numInSegment = 0;
                }
                segment = this.buckets[i];
            }
        }
        this.compactionMemory.setPartitionNumber(partitionNumber);
        this.partitions.add(partitionNumber, this.compactionMemory);
        this.partitions.get((int)partitionNumber).overflowSegments = partition.overflowSegments;
        this.partitions.get((int)partitionNumber).numOverflowSegments = partition.numOverflowSegments;
        this.partitions.get((int)partitionNumber).nextOverflowBucket = partition.nextOverflowBucket;
        this.partitions.get(partitionNumber).setCompaction(true);
        this.compactionMemory = partition;
        this.compactionMemory.resetRecordCounter();
        this.compactionMemory.setPartitionNumber(-1);
        this.compactionMemory.overflowSegments = null;
        this.compactionMemory.numOverflowSegments = 0;
        this.compactionMemory.nextOverflowBucket = 0;
        this.compactionMemory.clearAllMemory(this.availableMemory);
        int maxSegmentNumber = this.getMaxPartition();
        this.compactionMemory.allocateSegments(maxSegmentNumber);
        this.compactionMemory.resetRWViews();
        this.compactionMemory.pushDownPages();
    }

    private void fastCompactPartition(int partitionNumber) throws IOException {
        if (this.partitions.get(partitionNumber).isCompacted()) {
            return;
        }
    }

    private static final int hash(int code) {
        code = code + 2127912214 + (code << 12);
        code = code ^ 0xC761C23C ^ code >>> 19;
        code = code + 374761393 + (code << 5);
        code = code + -744332180 ^ code << 9;
        code = code + -42973499 + (code << 3);
        return (code = code ^ 0xB55A4F09 ^ code >>> 16) >= 0 ? code : -(code + 1);
    }

    public final class HashTableProber<PT>
    extends AbstractHashTableProber<PT, T> {
        private InMemoryPartition<T> partition;
        private MemorySegment bucket;
        private int pointerOffsetInBucket;

        private HashTableProber(TypeComparator<PT> probeTypeComparator, TypePairComparator<PT, T> pairComparator) {
            super(probeTypeComparator, pairComparator);
        }

        @Override
        public T getMatchFor(PT probeSideRecord, T targetForMatch) {
            if (CompactingHashTable.this.closed.get()) {
                return null;
            }
            int searchHashCode = CompactingHashTable.hash(this.probeTypeComparator.hash(probeSideRecord));
            int posHashCode = searchHashCode % CompactingHashTable.this.numBuckets;
            MemorySegment bucket = CompactingHashTable.this.buckets[posHashCode >> CompactingHashTable.this.bucketsPerSegmentBits];
            int bucketInSegmentOffset = (posHashCode & CompactingHashTable.this.bucketsPerSegmentMask) << 7;
            byte partitionNumber = bucket.get(bucketInSegmentOffset + 0);
            InMemoryPartition p = (InMemoryPartition)CompactingHashTable.this.partitions.get(partitionNumber);
            MemorySegment[] overflowSegments = p.overflowSegments;
            this.pairComparator.setReference(probeSideRecord);
            int countInSegment = bucket.getInt(bucketInSegmentOffset + 4);
            int numInSegment = 0;
            int posInSegment = bucketInSegmentOffset + 16;
            while (true) {
                if (numInSegment < countInSegment) {
                    int thisCode = bucket.getInt(posInSegment);
                    posInSegment += 4;
                    if (thisCode == searchHashCode) {
                        int pointerOffset = bucketInSegmentOffset + 52 + numInSegment * 8;
                        long pointer = bucket.getLong(pointerOffset);
                        ++numInSegment;
                        try {
                            if (!this.pairComparator.equalToReference(targetForMatch = p.readRecordAt(pointer, targetForMatch))) continue;
                            this.partition = p;
                            this.bucket = bucket;
                            this.pointerOffsetInBucket = pointerOffset;
                            return targetForMatch;
                        }
                        catch (IOException e) {
                            throw new RuntimeException("Error deserializing record from the hashtable: " + e.getMessage(), e);
                        }
                    }
                    ++numInSegment;
                    continue;
                }
                long forwardPointer = bucket.getLong(bucketInSegmentOffset + 8);
                if (forwardPointer == -1L) {
                    return null;
                }
                int overflowSegNum = (int)(forwardPointer >>> 32);
                bucket = overflowSegments[overflowSegNum];
                bucketInSegmentOffset = (int)(forwardPointer & 0xFFFFFFFFFFFFFFFFL);
                countInSegment = bucket.getInt(bucketInSegmentOffset + 4);
                posInSegment = bucketInSegmentOffset + 16;
                numInSegment = 0;
            }
        }

        @Override
        public void updateMatch(T record) throws IOException {
            long newPointer;
            if (CompactingHashTable.this.closed.get()) {
                return;
            }
            try {
                newPointer = this.partition.appendRecord(record);
            }
            catch (EOFException e) {
                try {
                    int partitionNumber = this.partition.getPartitionNumber();
                    CompactingHashTable.this.compactPartition(partitionNumber);
                    this.partition = (InMemoryPartition)CompactingHashTable.this.partitions.get(partitionNumber);
                    newPointer = this.partition.appendRecord(record);
                }
                catch (EOFException ex) {
                    throw new RuntimeException("Memory ran out. Compaction failed. " + CompactingHashTable.this.getMemoryConsumptionString() + " Message: " + ex.getMessage());
                }
                catch (IndexOutOfBoundsException ex) {
                    throw new RuntimeException("Memory ran out. Compaction failed. " + CompactingHashTable.this.getMemoryConsumptionString() + " Message: " + ex.getMessage());
                }
            }
            catch (IndexOutOfBoundsException e) {
                try {
                    int partitionNumber = this.partition.getPartitionNumber();
                    CompactingHashTable.this.compactPartition(partitionNumber);
                    this.partition = (InMemoryPartition)CompactingHashTable.this.partitions.get(partitionNumber);
                    newPointer = this.partition.appendRecord(record);
                }
                catch (EOFException ex) {
                    throw new RuntimeException("Memory ran out. Compaction failed. " + CompactingHashTable.this.getMemoryConsumptionString() + " Message: " + ex.getMessage());
                }
                catch (IndexOutOfBoundsException ex) {
                    throw new RuntimeException("Memory ran out. Compaction failed. " + CompactingHashTable.this.getMemoryConsumptionString() + " Message: " + ex.getMessage());
                }
            }
            this.bucket.putLong(this.pointerOffsetInBucket, newPointer);
            this.partition.setCompaction(false);
        }
    }

    public class EntryIterator
    implements MutableObjectIterator<T> {
        private CompactingHashTable<T> table;
        private ArrayList<T> cache;
        private int currentBucketIndex = 0;
        private int currentSegmentIndex = 0;
        private int currentBucketOffset = 0;
        private int bucketsPerSegment;
        private boolean done;

        private EntryIterator(CompactingHashTable<T> compactingHashTable2) {
            this.table = compactingHashTable2;
            this.cache = new ArrayList(64);
            this.done = false;
            this.bucketsPerSegment = this.table.bucketsPerSegmentMask + 1;
        }

        public T next(T reuse) throws IOException {
            if (this.done || this.table.closed.get()) {
                return null;
            }
            if (!this.cache.isEmpty()) {
                reuse = this.cache.remove(this.cache.size() - 1);
                return reuse;
            }
            while (!this.done && this.cache.isEmpty()) {
                this.done = !this.fillCache();
            }
            if (!this.done) {
                reuse = this.cache.remove(this.cache.size() - 1);
                return reuse;
            }
            return null;
        }

        private boolean fillCache() throws IOException {
            if (this.currentBucketIndex >= this.table.numBuckets) {
                return false;
            }
            MemorySegment bucket = this.table.buckets[this.currentSegmentIndex];
            byte partitionNumber = bucket.get(this.currentBucketOffset + 0);
            InMemoryPartition partition = (InMemoryPartition)this.table.partitions.get(partitionNumber);
            MemorySegment[] overflowSegments = partition.overflowSegments;
            int countInSegment = bucket.getInt(this.currentBucketOffset + 4);
            int numInSegment = 0;
            int posInSegment = this.currentBucketOffset + 52;
            int bucketOffset = this.currentBucketOffset;
            while (true) {
                if (numInSegment < countInSegment) {
                    long pointer = bucket.getLong(posInSegment);
                    posInSegment += 8;
                    ++numInSegment;
                    Object target = this.table.buildSideSerializer.createInstance();
                    try {
                        target = partition.readRecordAt(pointer, target);
                        this.cache.add(target);
                    }
                    catch (IOException e) {
                        throw new RuntimeException("Error deserializing record from the Hash Table: " + e.getMessage(), e);
                    }
                }
                long forwardPointer = bucket.getLong(bucketOffset + 8);
                if (forwardPointer == -1L) break;
                int overflowSegNum = (int)(forwardPointer >>> 32);
                bucket = overflowSegments[overflowSegNum];
                bucketOffset = (int)(forwardPointer & 0xFFFFFFFFFFFFFFFFL);
                countInSegment = bucket.getInt(bucketOffset + 4);
                posInSegment = bucketOffset + 52;
                numInSegment = 0;
            }
            ++this.currentBucketIndex;
            if (this.currentBucketIndex % this.bucketsPerSegment == 0) {
                ++this.currentSegmentIndex;
                this.currentBucketOffset = 0;
            } else {
                this.currentBucketOffset += 128;
            }
            return true;
        }
    }
}

