/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.join;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.LateralContract;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LateralJoinBatch
extends AbstractBinaryRecordBatch<LateralJoinPOP>
implements LateralContract {
    private static final Logger logger = LoggerFactory.getLogger(LateralJoinBatch.class);
    /** Upper bound on rows written to the output container; compared with {@code outputIndex} to decide when the batch is full. */
    private int maxOutputRowCount;
    /** Left input schema after excluded columns are filtered out (built in setupNewSchema). */
    private BatchSchema leftSchema;
    /** Right input schema after excluded columns are filtered out (built in setupNewSchema). */
    private BatchSchema rightSchema;
    /** Next row position to write in the output container; reset to 0 whenever the output is finalized. */
    private int outputIndex;
    /** Index of the left row currently being joined; -1 means a new left batch has to be fetched. */
    private int leftJoinIndex = -1;
    /** Index of the next right row to consume; -1 means a new right batch has to be fetched. */
    private int rightJoinIndex = -1;
    /** Set when a left batch was already fetched but must be handled on a later call because the output still holds rows. */
    private boolean processLeftBatchInFuture;
    /** True once the current left row has been matched by at least one right row. */
    private boolean matchedRecordFound;
    /** When true the max output row count is taken from the batch memory manager; can be switched off through the test setter. */
    private boolean useMemoryManager = true;
    /** Set when a left batch is fetched and cleared by updateMemoryManager so each left batch is reported once. */
    private boolean isNewLeftBatch;
    /** Names of input columns that are not copied to the output; always contains the implicit column name. */
    private final HashSet<String> excludedFieldNames = new HashSet();
    /** Name of the implicit row-id column, read from the operator config. */
    private final String implicitColumn;
    /** Set for a LEFT join when unmatched left rows remain after the output batch filled up. */
    private boolean hasRemainderForLeftJoin;
    /** Vector of the implicit column found in the right batch; it is read as an IntVector of row ids. */
    private ValueVector implicitVector;
    /** Maps each copied left input vector to its output vector. */
    private final Map<ValueVector, ValueVector> leftInputOutputVector = new HashMap<ValueVector, ValueVector>();
    /** Maps each copied right input vector to its output vector. */
    private final Map<ValueVector, ValueVector> rightInputOutputVector = new HashMap<ValueVector, ValueVector>();

    /**
     * Creates the lateral join operator over the given left and right inputs.
     *
     * @param popConfig operator configuration supplying the implicit row-id column and the excluded columns
     * @param context fragment context used to read the configured output batch size
     * @param left left input batch, must not be null
     * @param right right input batch, must not be null
     * @throws OutOfMemoryException declared by the constructor contract
     */
    public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
        super(popConfig, context, left, right);
        Preconditions.checkNotNull(left);
        Preconditions.checkNotNull(right);

        final int outputBatchSizeOption = (int)context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
        RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(), outputBatchSizeOption);

        // The implicit column name has to be set first: the excluded-field set always includes it.
        implicitColumn = popConfig.getImplicitRIDColumn();
        populateExcludedField(popConfig);

        batchMemoryManager = new JoinBatchMemoryManager(outputBatchSizeOption, left, right, excludedFieldNames);
        // Start with the row count the memory manager reports before any input has been seen.
        maxOutputRowCount = batchMemoryManager.getOutputRowCount();
    }

    /**
     * Copies the not yet emitted rows of the current left batch to the output, one per output row,
     * until either the left batch is exhausted or the output batch is full.
     *
     * @return true when every row of the left batch has been consumed, false when rows are still pending
     * @throws IllegalStateException if the join type is not LEFT
     */
    private boolean handleRemainingLeftRows() {
        final boolean isLeftJoin = ((LateralJoinPOP)popConfig).getJoinType() == JoinRelType.LEFT;
        Preconditions.checkState(isLeftJoin, "Unexpected leftover rows from previous left batch when join type is not left join");
        for (; leftJoinIndex < left.getRecordCount() && !isOutgoingBatchFull(); ++outputIndex, ++leftJoinIndex) {
            emitLeft(leftJoinIndex, outputIndex, 1);
        }
        return !(leftJoinIndex < left.getRecordCount());
    }

    /**
     * Produces the next output batch of the lateral join.
     *
     * Pending left rows from a previous call are emitted first. Then a left batch and, when the left batch
     * has rows, a right batch are obtained, the memory manager and output vectors are prepared, and the
     * actual joining is delegated to produceOutputBatch.
     *
     * @return the outcome to report downstream for the batch now held in the output container
     */
    @Override
    public RecordBatch.IterOutcome innerNext() {
        // Left rows that did not fit into the previous output batch (flag is set in produceOutputBatch) go out first.
        if (this.hasRemainderForLeftJoin) {
            boolean hasMoreRows;
            this.allocateVectors();
            // bl is an unused decompiler temporary; hasMoreRows stays true while left rows are still pending.
            boolean bl = hasMoreRows = !this.handleRemainingLeftRows();
            // Return now if the left batch came with EMIT or if the output filled up again before the left batch was drained.
            if (this.leftUpstream == RecordBatch.IterOutcome.EMIT || hasMoreRows) {
                logger.debug("Sending current output batch with EMIT outcome since left is received with EMIT and is fully consumed now in output batch");
                this.hasRemainderForLeftJoin = hasMoreRows;
                this.finalizeOutputContainer();
                return this.leftUpstream == RecordBatch.IterOutcome.EMIT ? RecordBatch.IterOutcome.EMIT : RecordBatch.IterOutcome.OK;
            }
            // The left batch is fully consumed: mark it as such and release its vectors.
            this.leftJoinIndex = -1;
            VectorAccessibleUtilities.clear(this.left);
        }
        RecordBatch.IterOutcome childOutcome = this.processLeftBatch();
        logger.debug("Received left batch with outcome {}", (Object)childOutcome);
        // processLeftBatch deferred the new left batch because the output already holds the leftover rows
        // copied above; send those rows first and handle the deferred batch on the next call.
        if (this.processLeftBatchInFuture && this.hasRemainderForLeftJoin) {
            this.finalizeOutputContainer();
            this.hasRemainderForLeftJoin = false;
            return RecordBatch.IterOutcome.OK;
        }
        this.processLeftBatchInFuture = false;
        this.hasRemainderForLeftJoin = false;
        // Nothing to join: left is finished or delivered an empty batch, so the right side is not called.
        if (this.isTerminalOutcome(childOutcome) || this.left.getRecordCount() == 0) {
            this.container.setRecordCount(0);
            return childOutcome;
        }
        childOutcome = this.processRightBatch();
        logger.debug("Received right batch with outcome {}", (Object)childOutcome);
        // A right-side schema change is reported immediately; both stored outcomes are normalised to OK
        // (left keeps EMIT) so the following call continues with the batches already fetched.
        if (childOutcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA) {
            this.leftUpstream = this.leftUpstream != RecordBatch.IterOutcome.EMIT ? RecordBatch.IterOutcome.OK : this.leftUpstream;
            this.rightUpstream = RecordBatch.IterOutcome.OK;
            return childOutcome;
        }
        if (this.isTerminalOutcome(childOutcome)) {
            return childOutcome;
        }
        // Schema changed on the left only: rebuild the output schema from the new left and current right schema.
        if (this.leftUpstream == RecordBatch.IterOutcome.OK_NEW_SCHEMA) {
            this.handleSchemaChange();
        }
        this.state = AbstractRecordBatch.BatchState.NOT_FIRST;
        // 0 = left input, 1 = right input; updateMemoryManager itself decides whether the batch is new enough to be counted.
        this.updateMemoryManager(0);
        this.updateMemoryManager(1);
        // Output already holds rows, so allocateVectors below will not refresh the row limit; refresh it here.
        if (this.outputIndex > 0 && this.useMemoryManager) {
            this.setMaxOutputRowCount(this.batchMemoryManager.getCurrentOutgoingMaxRowCount());
        }
        this.allocateVectors();
        return this.produceOutputBatch();
    }

    /**
     * Logs the aggregated statistics of both inputs and of the output collected by the batch memory
     * manager, then closes the operator through the superclass.
     */
    @Override
    public void close() {
        updateBatchMemoryManagerStats();

        // Index 0 is the left input.
        RecordBatchStats.logRecordBatchStats(
            getRecordBatchStatsContext(),
            "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
            batchMemoryManager.getNumIncomingBatches(0),
            batchMemoryManager.getAvgInputBatchSize(0),
            batchMemoryManager.getAvgInputRowWidth(0),
            batchMemoryManager.getTotalInputRecords(0));

        // Index 1 is the right input.
        RecordBatchStats.logRecordBatchStats(
            getRecordBatchStatsContext(),
            "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
            batchMemoryManager.getNumIncomingBatches(1),
            batchMemoryManager.getAvgInputBatchSize(1),
            batchMemoryManager.getAvgInputRowWidth(1),
            batchMemoryManager.getTotalInputRecords(1));

        RecordBatchStats.logRecordBatchStats(
            getRecordBatchStatsContext(),
            "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
            batchMemoryManager.getNumOutgoingBatches(),
            batchMemoryManager.getAvgOutputBatchSize(),
            batchMemoryManager.getAvgOutputRowWidth(),
            batchMemoryManager.getTotalOutputRecords());

        super.close();
    }

    /**
     * @return the record count currently set on the output container
     */
    @Override
    public int getRecordCount() {
        final int rowsInOutput = container.getRecordCount();
        return rowsInOutput;
    }

    /**
     * Exposes the left input batch.
     *
     * @return the left batch
     * @throws IllegalStateException if the left batch is null
     */
    @Override
    public RecordBatch getIncoming() {
        if (left == null) {
            throw new IllegalStateException("Retuning null left batch. It's unexpected since right side will only be called iff there is any valid left batch");
        }
        return left;
    }

    /**
     * Exposes the index of the left row currently being joined.
     *
     * @return the current left join index
     * @throws IllegalStateException if the index is not below the left batch record count
     */
    @Override
    public int getRecordIndex() {
        final int leftRecordCount = left.getRecordCount();
        final boolean withinLeftBatch = leftJoinIndex < leftRecordCount;
        Preconditions.checkState(withinLeftBatch, "Left join index: %s is out of bounds: %s", leftJoinIndex, leftRecordCount);
        return leftJoinIndex;
    }

    /**
     * @return the outcome currently stored for the left input
     */
    @Override
    public RecordBatch.IterOutcome getLeftOutcome() {
        final RecordBatch.IterOutcome currentLeftOutcome = leftUpstream;
        return currentLeftOutcome;
    }

    /**
     * Fetches the first batch from the left input and, if that one is usable, from the right input.
     *
     * @return true when both sides delivered a usable first batch
     * @throws IllegalStateException if either side answered with EMIT
     */
    @Override
    protected boolean prefetchFirstBatchFromBothSides() {
        leftUpstream = next(0, left);

        boolean bothSidesUsable;
        if (!setBatchState(leftUpstream)) {
            // Left side is unusable, so the right side is not asked at all.
            bothSidesUsable = false;
        } else {
            isNewLeftBatch = true;
            rightUpstream = next(1, right);
            bothSidesUsable = setBatchState(rightUpstream);
        }

        final boolean emitSeen = RecordBatch.IterOutcome.EMIT == leftUpstream
            || RecordBatch.IterOutcome.EMIT == rightUpstream;
        if (emitSeen) {
            throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in buildSchema phase");
        }
        return bothSidesUsable;
    }

    /**
     * Builds the initial output schema from the first batches of both inputs and initialises the join
     * indexes. Does nothing when the prefetch did not deliver usable batches.
     *
     * @throws IllegalStateException if the first right batch contains records
     */
    @Override
    protected void buildSchema() {
        if (prefetchFirstBatchFromBothSides()) {
            final boolean firstRightBatchEmpty = right.getRecordCount() == 0;
            Preconditions.checkState(firstRightBatchEmpty, "Unexpected non-empty first right batch received");

            setupNewSchema();
            // The first right batch only carries schema; release its vectors.
            VectorAccessibleUtilities.clear(right);

            if (left.getRecordCount() <= 0) {
                leftJoinIndex = -1;
            } else {
                leftJoinIndex = 0;
            }
            rightJoinIndex = -1;

            // From here on both first batches are treated as if they had arrived with OK.
            leftUpstream = RecordBatch.IterOutcome.OK;
            rightUpstream = RecordBatch.IterOutcome.OK;
        }
    }

    /**
     * Cancels both inputs, left first and then right.
     */
    @Override
    protected void cancelIncoming() {
        left.cancel();
        right.cancel();
    }

    /**
     * Rebuilds the output schema from the current input schemas, with the work accounted as setup
     * time in the operator stats. The setup timer is stopped even when the rebuild throws.
     */
    private void handleSchemaChange() {
        try {
            stats.startSetup();
            final BatchSchema previousOutputSchema = container.getSchema();
            logger.debug("Setting up new schema based on incoming batch. Old output schema: {}", previousOutputSchema);
            setupNewSchema();
        } finally {
            stats.stopSetup();
        }
    }

    /**
     * @param outcome outcome to test
     * @return true only for {@code NONE}
     */
    private boolean isTerminalOutcome(RecordBatch.IterOutcome outcome) {
        return RecordBatch.IterOutcome.NONE == outcome;
    }

    /**
     * Makes sure a left batch with rows is available when the previous one is consumed.
     *
     * Does nothing (and returns the stored left outcome) while {@code leftJoinIndex} is not -1. Otherwise it
     * pulls left batches until one has rows, or until an outcome is seen that has to be reported to the caller.
     *
     * @return the outcome of the left side that the caller has to act on
     * @throws IllegalStateException for an outcome not handled by the switch
     */
    private RecordBatch.IterOutcome processLeftBatch() {
        boolean needLeftBatch;
        // bl is an unused decompiler temporary.
        boolean bl = needLeftBatch = this.leftJoinIndex == -1;
        block9: while (needLeftBatch) {
            // When a batch was deferred on an earlier call it is already fetched; do not pull another one.
            if (!this.processLeftBatchInFuture) {
                this.leftUpstream = this.next(0, this.left);
                this.isNewLeftBatch = true;
            }
            boolean emptyLeftBatch = this.left.getRecordCount() <= 0;
            logger.trace("Received a left batch and isEmpty: {}", (Object)emptyLeftBatch);
            switch (this.leftUpstream) {
                case OK_NEW_SCHEMA: {
                    // Output already holds rows: defer this batch so the caller can flush the output first.
                    if (this.outputIndex > 0) {
                        this.processLeftBatchInFuture = true;
                        return RecordBatch.IterOutcome.OK_NEW_SCHEMA;
                    }
                    // Empty batch with a new schema: rebuild the output schema and report it right away.
                    if (emptyLeftBatch) {
                        this.handleSchemaChange();
                        this.leftJoinIndex = -1;
                        return RecordBatch.IterOutcome.OK_NEW_SCHEMA;
                    }
                    // Non-empty batch: intentionally falls through to the OK case, which sets leftJoinIndex to 0.
                }
                case OK: {
                    // Empty OK batches are skipped; keep pulling.
                    if (emptyLeftBatch) {
                        this.leftJoinIndex = -1;
                        continue block9;
                    }
                    this.leftJoinIndex = 0;
                    break;
                }
                case EMIT: {
                    // An empty EMIT batch is reported to the caller instead of being skipped.
                    if (emptyLeftBatch) {
                        this.leftJoinIndex = -1;
                        return RecordBatch.IterOutcome.EMIT;
                    }
                    this.leftJoinIndex = 0;
                    break;
                }
                case NONE: {
                    // Rows already in the output have to be sent before NONE is handled.
                    if (this.outputIndex > 0) {
                        this.processLeftBatchInFuture = true;
                    }
                    return this.leftUpstream;
                }
                case NOT_YET: {
                    // Back off briefly, then poll again (leftJoinIndex is still -1 so the loop continues).
                    // NOTE(review): the interrupt is only logged and the interrupt flag is not restored - confirm this is intended.
                    try {
                        Thread.sleep(5L);
                    }
                    catch (InterruptedException ex) {
                        logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it received NOT_YET");
                    }
                    break;
                }
                default: {
                    throw new IllegalStateException("Unexpected iter outcome: " + this.leftUpstream.name());
                }
            }
            needLeftBatch = this.leftJoinIndex == -1;
        }
        return this.leftUpstream;
    }

    /**
     * Fetches a right batch when a left row is active ({@code leftJoinIndex >= 0}) and the previous right
     * batch is consumed ({@code rightJoinIndex == -1}). Otherwise the stored right outcome is returned as is.
     *
     * @return OK_NEW_SCHEMA right after a right-side schema change was applied, otherwise the stored right outcome
     * @throws IllegalStateException if the right schema changes while the output already holds rows, or if
     *         the right side answers with an outcome not handled here
     */
    private RecordBatch.IterOutcome processRightBatch() {
        boolean needNewRightBatch = leftJoinIndex >= 0 && rightJoinIndex == -1;
        while (needNewRightBatch) {
            rightUpstream = next(1, right);
            switch (rightUpstream) {
                case OK_NEW_SCHEMA: {
                    if (outputIndex > 0) {
                        throw new IllegalStateException("SchemaChange on right batch is not expected in between the rows of current left batch or a new non-empty left batch with no schema change");
                    }
                    // Rebuild the output schema and hand an empty batch downstream; rows of this right
                    // batch (if any) are joined on a later call, starting at index 0.
                    handleSchemaChange();
                    container.setEmpty();
                    rightJoinIndex = right.getRecordCount() > 0 ? 0 : -1;
                    return RecordBatch.IterOutcome.OK_NEW_SCHEMA;
                }
                case OK:
                case EMIT: {
                    // An empty batch is accepted too; the index then stays at -1.
                    rightJoinIndex = right.getRecordCount() > 0 ? 0 : -1;
                    needNewRightBatch = false;
                    break;
                }
                case NONE: {
                    needNewRightBatch = false;
                    break;
                }
                case NOT_YET: {
                    // Back off briefly, then poll the right side again.
                    try {
                        Thread.sleep(10L);
                    }
                    catch (InterruptedException ex) {
                        logger.debug("Thread interrupted while sleeping to call next on right branch of LATERAL since it received NOT_YET");
                    }
                    break;
                }
                default: {
                    // Report the right outcome; it is the one that was just inspected.
                    throw new IllegalStateException("Unexpected iter outcome: " + rightUpstream.name());
                }
            }
        }
        return rightUpstream;
    }

    /**
     * Fills the output container by joining the current left and right batches, fetching further batches
     * from both sides while the output still has room.
     *
     * @return the outcome to report downstream together with the finalized output container
     */
    private RecordBatch.IterOutcome produceOutputBatch() {
        boolean isLeftProcessed = false;
        while (!this.isOutgoingBatchFull()) {
            boolean isRightProcessed;
            this.crossJoinAndOutputRecords();
            // bl and bl2 below are unused decompiler temporaries.
            boolean bl = isRightProcessed = this.rightJoinIndex == -1 || this.rightJoinIndex >= this.right.getRecordCount();
            // Right batch fully consumed: release its vectors and mark that a new one is needed.
            if (isRightProcessed) {
                VectorAccessibleUtilities.clear(this.right);
                this.rightJoinIndex = -1;
            }
            // Only when the right side sent EMIT is the matched left row finished, so only then move past it.
            if (isRightProcessed && this.rightUpstream == RecordBatch.IterOutcome.EMIT && this.matchedRecordFound) {
                ++this.leftJoinIndex;
                this.matchedRecordFound = false;
            }
            boolean bl2 = isLeftProcessed = this.rightUpstream == RecordBatch.IterOutcome.EMIT && this.leftJoinIndex >= this.left.getRecordCount();
            // Right side is done (EMIT, fully consumed) but left rows remain without a match.
            if (!isLeftProcessed && this.rightUpstream == RecordBatch.IterOutcome.EMIT && isRightProcessed) {
                if (((LateralJoinPOP)this.popConfig).getJoinType() == JoinRelType.LEFT) {
                    // LEFT join: emit those rows; what does not fit is remembered for the next innerNext call.
                    isLeftProcessed = this.handleRemainingLeftRows();
                    this.hasRemainderForLeftJoin = !isLeftProcessed;
                } else {
                    // Other join types: the unmatched left rows are dropped.
                    isLeftProcessed = true;
                }
            }
            if (isLeftProcessed) {
                this.leftJoinIndex = -1;
                VectorAccessibleUtilities.clear(this.left);
                this.matchedRecordFound = false;
            }
            // Output is full: re-evaluating the loop condition ends the loop.
            if (this.isOutgoingBatchFull()) continue;
            if (isLeftProcessed) {
                // A left batch that came with EMIT or OK_NEW_SCHEMA ends the output batch.
                if (this.leftUpstream == RecordBatch.IterOutcome.EMIT || this.leftUpstream == RecordBatch.IterOutcome.OK_NEW_SCHEMA) break;
                logger.debug("Output batch still has some space left, getting new batches from left and right. OutIndex: {}", (Object)this.outputIndex);
                this.leftUpstream = this.processLeftBatch();
                logger.debug("Received left batch with outcome {}", (Object)this.leftUpstream);
                // The new left batch was deferred: send what is in the output now with OK.
                if (this.processLeftBatchInFuture) {
                    logger.debug("Received left batch such that we have to return the current outgoing batch and process the new batch in subsequent next call");
                    this.finalizeOutputContainer();
                    return RecordBatch.IterOutcome.OK;
                }
                if (this.isTerminalOutcome(this.leftUpstream)) {
                    this.finalizeOutputContainer();
                    return this.leftUpstream;
                }
                // Empty left batch with EMIT/OK_NEW_SCHEMA: nothing to join, leave the loop and report below.
                if ((this.leftUpstream == RecordBatch.IterOutcome.EMIT || this.leftUpstream == RecordBatch.IterOutcome.OK_NEW_SCHEMA) && this.left.getRecordCount() == 0) {
                    isLeftProcessed = true;
                    break;
                }
                this.updateMemoryManager(0);
            }
            this.rightUpstream = this.processRightBatch();
            logger.debug("Received right batch with outcome {}", (Object)this.rightUpstream);
            // Right-side schema change: normalise stored outcomes (left keeps EMIT) and report the new schema.
            if (this.rightUpstream == RecordBatch.IterOutcome.OK_NEW_SCHEMA) {
                this.leftUpstream = this.leftUpstream != RecordBatch.IterOutcome.EMIT ? RecordBatch.IterOutcome.OK : this.leftUpstream;
                this.rightUpstream = RecordBatch.IterOutcome.OK;
                this.finalizeOutputContainer();
                return RecordBatch.IterOutcome.OK_NEW_SCHEMA;
            }
            if (this.isTerminalOutcome(this.rightUpstream)) {
                this.finalizeOutputContainer();
                return this.rightUpstream;
            }
            this.updateMemoryManager(1);
            // Left-only schema change with an empty output: rebuild the schema and allocate the new vectors.
            if (this.leftUpstream == RecordBatch.IterOutcome.OK_NEW_SCHEMA && this.outputIndex == 0) {
                this.handleSchemaChange();
                this.allocateVectors();
                continue;
            }
            if (!this.useMemoryManager) continue;
            // Output vectors are reused; take the row limit recomputed by the memory manager for the new input.
            this.setMaxOutputRowCount(this.batchMemoryManager.getCurrentOutgoingMaxRowCount());
        }
        this.finalizeOutputContainer();
        if (this.leftUpstream == RecordBatch.IterOutcome.EMIT && isLeftProcessed) {
            logger.debug("Sending current output batch with EMIT outcome since left is received with EMIT and is fully consumed in output batch");
            return RecordBatch.IterOutcome.EMIT;
        }
        if (this.leftUpstream == RecordBatch.IterOutcome.OK_NEW_SCHEMA) {
            logger.debug("Sending current output batch with OK_NEW_SCHEMA and resetting the left outcome to OK for next set of batches");
            this.leftUpstream = RecordBatch.IterOutcome.OK;
            return RecordBatch.IterOutcome.OK_NEW_SCHEMA;
        }
        return RecordBatch.IterOutcome.OK;
    }

    /**
     * Closes the current output batch: sets value and record counts to the number of rows written,
     * reports the batch to the memory manager and the stats log, and resets the write position to 0.
     */
    private void finalizeOutputContainer() {
        final int rowsWritten = outputIndex;

        VectorAccessibleUtilities.setValueCount(container, rowsWritten);
        container.setRecordCount(rowsWritten);
        batchMemoryManager.updateOutgoingStats(rowsWritten);

        RecordBatchStats.logRecordBatchStats(RecordBatchStats.RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
        RecordBatchStats.logRecordBatchStats(
            getRecordBatchStatsContext(),
            "Number of records emitted: %d and Allocator Stats: [AllocatedMem: %d, PeakMem: %d]",
            rowsWritten,
            container.getAllocator().getAllocatedMemory(),
            container.getAllocator().getPeakMemoryAllocation());

        // The next output batch starts writing at row 0.
        outputIndex = 0;
    }

    /**
     * @param newSchema schema of the incoming batch, may be null
     * @param oldSchema previously known schema, may be null
     * @return true when either schema is null or the two are not equivalent
     */
    private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema oldSchema) {
        if (newSchema == null || oldSchema == null) {
            return true;
        }
        return !newSchema.isEquivalent(oldSchema);
    }

    /**
     * Checks that an input schema can be handled: it must be non-null and must not use a selection vector.
     * The reason for a rejection is logged at error level.
     *
     * @param schema input schema to check, may be null
     * @return true when the schema is accepted
     */
    private boolean verifyInputSchema(BatchSchema schema) {
        if (schema == null) {
            logger.error("Null schema found for the incoming batch");
            return false;
        }
        final BatchSchema.SelectionVectorMode mode = schema.getSelectionVectorMode();
        if (mode == BatchSchema.SelectionVectorMode.NONE) {
            return true;
        }
        logger.error("Incoming batch schema found with selection vector which is not supported. SVMode: {}", (Object)mode.toString());
        return false;
    }

    /**
     * Returns a copy of the given schema without the excluded columns. The implicit column is kept when
     * the schema belongs to the left batch and dropped when it belongs to the right batch.
     *
     * @param originSchema schema to filter
     * @param isRightBatch true when the schema belongs to the right input
     * @return the same schema instance when nothing is excluded, otherwise a newly built filtered schema
     */
    private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema, boolean isRightBatch) {
        if (excludedFieldNames.size() == 0) {
            return originSchema;
        }
        final SchemaBuilder filtered = BatchSchema.newBuilder().setSelectionVectorMode(originSchema.getSelectionVectorMode());
        for (MaterializedField field : originSchema) {
            final boolean keep = !excludedFieldNames.contains(field.getName())
                || (field.getName().equals(implicitColumn) && !isRightBatch);
            if (keep) {
                filtered.addField(field);
            }
        }
        return filtered.build();
    }

    /**
     * Rebuilds the output container schema and the input-to-output vector mappings from the current
     * schemas of the left and right batches.
     *
     * @throws UserException (schema change error) when either filtered input schema fails verifyInputSchema
     * @throws IllegalStateException when no implicit column vector is known after scanning the right batch
     */
    private void setupNewSchema() {
        logger.debug("Setting up new schema based on incoming batch. New left schema: {} and New right schema: {}", (Object)this.left.getSchema(), (Object)this.right.getSchema());
        // Drop the previous output vectors and mappings.
        this.container.clear();
        this.leftInputOutputVector.clear();
        this.rightInputOutputVector.clear();
        this.leftSchema = this.batchSchemaWithNoExcludedCols(this.left.getSchema(), false);
        this.rightSchema = this.batchSchemaWithNoExcludedCols(this.right.getSchema(), true);
        if (!this.verifyInputSchema(this.leftSchema)) {
            throw UserException.schemaChangeError().message("Invalid Schema found for left incoming batch of lateral join", new Object[0]).build(logger);
        }
        if (!this.verifyInputSchema(this.rightSchema)) {
            throw UserException.schemaChangeError().message("Invalid Schema found for right incoming batch of lateral join", new Object[0]).build(logger);
        }
        // Left columns first; excluded ones are skipped, except the implicit column, which is kept on the left.
        for (VectorWrapper vectorWrapper : this.left) {
            MaterializedField leftField = vectorWrapper.getField();
            if (this.excludedFieldNames.contains(leftField.getName()) && !leftField.getName().equals(this.implicitColumn)) continue;
            this.container.addOrGet(leftField);
        }
        // Then right columns; excluded ones are skipped, and the implicit column's vector is remembered for row-id lookups.
        for (VectorWrapper vectorWrapper : this.right) {
            MaterializedField rightField = vectorWrapper.getField();
            if (this.excludedFieldNames.contains(rightField.getName())) {
                if (!rightField.getName().equals(this.implicitColumn)) continue;
                this.implicitVector = vectorWrapper.getValueVector();
                continue;
            }
            TypeProtos.MajorType rightFieldType = vectorWrapper.getField().getType();
            // For a LEFT join, REQUIRED right columns are switched to OPTIONAL in the output.
            if (((LateralJoinPOP)this.popConfig).getJoinType() == JoinRelType.LEFT && rightFieldType.getMode() == TypeProtos.DataMode.REQUIRED) {
                TypeProtos.MajorType outputType = Types.overrideMode(rightField.getType(), TypeProtos.DataMode.OPTIONAL);
                rightField = rightField.withType(outputType);
            }
            this.container.addOrGet(rightField);
        }
        // NOTE(review): implicitVector is not reset before the scan above, so a value from an earlier call would also pass this check - confirm.
        Preconditions.checkState(this.implicitVector != null, "Implicit column vector %s not found in right incoming batch", (Object)this.implicitColumn);
        this.outputIndex = 0;
        this.container.setRecordCount(this.outputIndex);
        this.container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
        // Output columns are laid out as left columns followed by right columns, hence the base offset for the right side.
        this.setupInputOutputVectors(this.left, 0, this.leftSchema.getFieldCount(), 0, false);
        this.setupInputOutputVectors(this.right, 0, this.rightSchema.getFieldCount(), this.leftSchema.getFieldCount(), true);
        logger.debug("Output Schema created {} based on input left schema {} and right schema {}", new Object[]{this.container.getSchema(), this.leftSchema, this.rightSchema});
    }

    /**
     * Allocates the output vectors for a fresh output batch. When the output already holds rows the
     * existing allocation is kept and nothing happens.
     */
    private void allocateVectors() {
        final boolean outputAlreadyInUse = outputIndex > 0;
        if (outputAlreadyInUse) {
            logger.trace("Allocation is already done for output container vectors since it already holds some record");
            return;
        }

        // Take the row limit from the memory manager unless that has been switched off.
        if (useMemoryManager) {
            setMaxOutputRowCount(batchMemoryManager.getOutputRowCount());
        }

        for (VectorWrapper<?> wrapper : container) {
            final String columnName = wrapper.getField().getName();
            final RecordBatchSizer.ColumnSize columnSize = batchMemoryManager.getColumnSize(columnName);
            columnSize.allocateVector((ValueVector)wrapper.getValueVector(), maxOutputRowCount);
        }

        logger.debug("Allocator Stats: [AllocatedMem: {}, PeakMem: {}]",
            (Object)container.getAllocator().getAllocatedMemory(),
            (Object)container.getAllocator().getPeakMemoryAllocation());
    }

    /**
     * Marks the operator as DONE when the given outcome is EMIT, NONE or NOT_YET.
     *
     * @param outcome outcome received from an input
     * @return false when the state was set to DONE, true for every other outcome
     */
    private boolean setBatchState(RecordBatch.IterOutcome outcome) {
        switch (outcome) {
            case EMIT:
            case NONE:
            case NOT_YET:
                state = AbstractRecordBatch.BatchState.DONE;
                return false;
            default:
                return true;
        }
    }

    /**
     * Counts, starting at {@code rightJoinIndex}, how many consecutive right rows carry each row id.
     * If a row id shows up in more than one run, the count of its last run is the one kept.
     *
     * @return map from row id to the length of its (last) run in the remaining right rows
     */
    private Map<Integer, Integer> getRowIdToRowCountMapping() {
        final Map<Integer, Integer> runLengthByRowId = new HashMap<Integer, Integer>();
        final IntVector rowIds = (IntVector)implicitVector;

        int runRowId = rowIds.getAccessor().get(rightJoinIndex);
        int runLength = 1;
        int cursor = rightJoinIndex + 1;
        while (cursor < right.getRecordCount()) {
            final int rowId = rowIds.getAccessor().get(cursor);
            if (rowId != runRowId) {
                // Run ended: store it and start counting the new one from zero.
                runLengthByRowId.put(runRowId, runLength);
                runRowId = rowId;
                runLength = 0;
            }
            ++runLength;
            ++cursor;
        }
        runLengthByRowId.put(runRowId, runLength);
        return runLengthByRowId;
    }

    /**
     * Joins rows of the current right batch with rows of the current left batch by row id and copies the
     * result to the output, until the right batch is consumed or the output has no free row slot left.
     * A right row belongs to the left row at index {@code rowId - 1}.
     *
     * @throws IllegalStateException when the right batch has rows but no valid index, or when a right row id
     *         does not fit the left batch being processed
     */
    private void crossJoinAndOutputRecords() {
        int rightRecordCount = this.right.getRecordCount();
        // An empty right batch produces nothing here.
        if (rightRecordCount <= 0) {
            return;
        }
        Preconditions.checkState(this.rightJoinIndex != -1, "Right batch record count is >0 but index is -1");
        int currentOutIndex = this.outputIndex;
        // Free row slots in the output at the time of the call.
        int maxAvailableRowSlot = this.maxOutputRowCount - currentOutIndex;
        if (logger.isDebugEnabled()) {
            logger.debug("Producing output for leftIndex: {}, rightIndex: {}, rightRecordCount: {}, outputIndex: {} and availableSlotInOutput: {}", new Object[]{this.leftJoinIndex, this.rightJoinIndex, rightRecordCount, this.outputIndex, maxAvailableRowSlot});
            logger.debug("Output Batch stats before copying new data: {}", (Object)new RecordBatchSizer(this));
        }
        IntVector rowIdVector = (IntVector)this.implicitVector;
        int leftRecordCount = this.left.getRecordCount();
        while (maxAvailableRowSlot > 0 && this.rightJoinIndex < rightRecordCount) {
            int currentRowId = rowIdVector.getAccessor().get(this.rightJoinIndex);
            // Row ids in the right batch are compared against the left index plus one.
            int leftRowId = this.leftJoinIndex + 1;
            int numRowsCopied = 0;
            if (currentRowId > leftRecordCount || this.leftJoinIndex > leftRecordCount) {
                throw new IllegalStateException(String.format("Either RowId in right batch is greater than total records in left batch or all rows in left batch is processed but there are still rows in right batch. Details[RightRowId: %s, LeftRecordCount: %s, LeftJoinIndex: %s, RightJoinIndex: %s]", currentRowId, leftRecordCount, this.leftJoinIndex, this.rightJoinIndex));
            }
            if (logger.isTraceEnabled()) {
                logger.trace("leftRowId and currentRowId are: {}, {}", (Object)leftRowId, (Object)currentRowId);
            }
            if (leftRowId == currentRowId) {
                // Match: write one output row from the right row and the current left row, then advance the right side only.
                this.matchedRecordFound = true;
                numRowsCopied = 1;
                this.emitRight(this.rightJoinIndex, this.outputIndex, numRowsCopied);
                this.emitLeft(this.leftJoinIndex, this.outputIndex, numRowsCopied);
                this.outputIndex += numRowsCopied;
                this.rightJoinIndex += numRowsCopied;
            } else if (leftRowId < currentRowId) {
                // The right row belongs to a later left row.
                if (this.matchedRecordFound) {
                    // The current left row already produced output; just move to the next left row.
                    this.matchedRecordFound = false;
                    ++this.leftJoinIndex;
                    continue;
                }
                // The current left row had no match: a LEFT join still emits it, other join types skip it.
                if (JoinRelType.LEFT == ((LateralJoinPOP)this.popConfig).getJoinType()) {
                    numRowsCopied = 1;
                    this.emitLeft(this.leftJoinIndex, this.outputIndex, numRowsCopied);
                    ++this.outputIndex;
                }
                ++this.leftJoinIndex;
            } else {
                // Only reached when leftRowId > currentRowId, so this check always fails and throws.
                Preconditions.checkState(leftRowId <= currentRowId, "Unexpected case where rowId %s in right batch of lateral is smaller than rowId %s in left batch being processed", currentRowId, leftRowId);
            }
            maxAvailableRowSlot -= numRowsCopied;
        }
    }

    /**
     * Pairs every output vector in the given output range with the input vector it is filled from and
     * stores the pairs in the left or right input-to-output mapping.
     *
     * Vectors are matched by position, not by field: for each output column the input columns are scanned
     * forward, skipping excluded columns, and the first remaining one must carry the same name as the
     * output column. On the left side the implicit column is not skipped even though it is in the
     * excluded set.
     *
     * @param batch input batch whose vectors are mapped
     * @param startVectorIndex first index (inclusive) of the range, relative to {@code baseVectorIndex}
     * @param endVectorIndex end index (exclusive) of the range, relative to {@code baseVectorIndex}
     * @param baseVectorIndex offset of this input's first column within the output container
     * @param isRightBatch true when {@code batch} is the right input
     * @throws IllegalStateException when the non-excluded input columns and the output columns differ in order
     */
    private void setupInputOutputVectors(RecordBatch batch, int startVectorIndex, int endVectorIndex, int baseVectorIndex, boolean isRightBatch) {
        int inputIndex = 0;
        final Map<ValueVector, ValueVector> mappingToUse = isRightBatch ? rightInputOutputVector : leftInputOutputVector;
        for (int i = startVectorIndex; i < endVectorIndex; ++i) {
            final int outputVectorIndex = i + baseVectorIndex;
            final Class<?> outputValueClass = getSchema().getColumn(outputVectorIndex).getValueClass();
            final ValueVector outputVector = getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();
            final String outputFieldName = outputVector.getField().getName();

            ValueVector inputVector;
            String inputFieldName;
            do {
                final Class<?> inputValueClass = batch.getSchema().getColumn(inputIndex).getValueClass();
                inputVector = batch.getValueAccessorById(inputValueClass, inputIndex).getValueVector();
                inputFieldName = inputVector.getField().getName();
                // Advance exactly once per inspected input column.
                ++inputIndex;
                // The implicit column coming from the left side is kept although it is in the excluded set.
                if (!isRightBatch && inputFieldName.equals(implicitColumn)) {
                    break;
                }
            } while (excludedFieldNames.contains(inputFieldName));

            Preconditions.checkState(outputFieldName.equals(inputFieldName), "Non-excluded Input and output container fields are not in same order. [Output Schema: %s and Input Schema:%s]", (Object)getSchema(), (Object)batch.getSchema());
            mappingToUse.put(inputVector, outputVector);
        }
    }

    /**
     * Copies rows from every mapped input vector to its output vector.
     *
     * @param fromRowIndex first input row to read
     * @param toRowIndex first output row to write
     * @param mapping input vector to output vector pairs
     * @param numRowsToCopy number of output rows to write
     * @param isRightBatch when true consecutive input rows are read; when false the same input row is repeated
     */
    private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, Map<ValueVector, ValueVector> mapping, int numRowsToCopy, boolean isRightBatch) {
        for (Map.Entry<ValueVector, ValueVector> pair : mapping.entrySet()) {
            final ValueVector source = pair.getKey();
            final ValueVector target = pair.getValue();
            if (logger.isTraceEnabled()) {
                logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: (RowIndex: {}, ColumnName: {}), OutputBatch: (RowIndex: {}, ColumnName: {}) and Other: (TimeEachValue: {})]",
                    fromRowIndex, source.getField().getName(), toRowIndex, target.getField().getName(), numRowsToCopy);
            }
            for (int offset = 0; offset < numRowsToCopy; ++offset) {
                final int sourceRow = isRightBatch ? fromRowIndex + offset : fromRowIndex;
                target.copyEntry(toRowIndex + offset, source, sourceRow);
            }
        }
    }

    /**
     * Copies a left row into the output through the left vector mapping.
     *
     * @param leftIndex left row to read
     * @param outIndex first output row to write
     * @param numRowsToCopy number of output rows that receive the left row
     */
    private void emitLeft(int leftIndex, int outIndex, int numRowsToCopy) {
        if (logger.isTraceEnabled()) {
            logger.trace("Copying the left batch data. Details: [leftIndex: {}, outputIndex: {}, numsCopy: {}]", leftIndex, outIndex, numRowsToCopy);
        }
        copyDataToOutputVectors(leftIndex, outIndex, leftInputOutputVector, numRowsToCopy, false);
    }

    /**
     * Copies right rows into the output through the right vector mapping.
     *
     * @param rightIndex first right row to read
     * @param outIndex first output row to write
     * @param numRowsToCopy number of consecutive rows to copy
     */
    private void emitRight(int rightIndex, int outIndex, int numRowsToCopy) {
        if (logger.isTraceEnabled()) {
            logger.trace("Copying the right batch data. Details: [rightIndex: {}, outputIndex: {}, numsCopy: {}]", rightIndex, outIndex, numRowsToCopy);
        }
        copyDataToOutputVectors(rightIndex, outIndex, rightInputOutputVector, numRowsToCopy, true);
    }

    /**
     * Sets the maximum number of rows an output batch may hold. The old and new value are logged when
     * record batch stats logging is enabled.
     *
     * @param outputRowCount new row limit
     */
    @VisibleForTesting
    public void setMaxOutputRowCount(int outputRowCount) {
        if (isRecordBatchStatsLoggingEnabled()) {
            RecordBatchStats.logRecordBatchStats(
                getRecordBatchStatsContext(),
                "Previous OutputRowCount: %d, New OutputRowCount: %d",
                maxOutputRowCount,
                outputRowCount);
        }
        maxOutputRowCount = outputRowCount;
    }

    /**
     * Switches on or off the use of the batch memory manager for choosing the output row limit.
     *
     * @param useMemoryManager true to let the memory manager set the row limit
     */
    @VisibleForTesting
    public void setUseMemoryManager(final boolean useMemoryManager) {
        this.useMemoryManager = useMemoryManager;
    }

    /**
     * @return true when the output write position has reached the maximum output row count
     */
    private boolean isOutgoingBatchFull() {
        return maxOutputRowCount <= outputIndex;
    }

    /**
     * Reports an input batch to the batch memory manager, but only when it has not been reported yet:
     * the left input (0) only while the new-left-batch flag is set, the right input (1) only while the
     * right join index is 0 or -1. Any other call returns without effect.
     *
     * @param inputIndex 0 for the left input, 1 for the right input
     */
    private void updateMemoryManager(int inputIndex) {
        final boolean isLeftInput = inputIndex == 0;
        if (isLeftInput && isNewLeftBatch) {
            // Consume the flag so the same left batch is not reported again.
            isNewLeftBatch = false;
        } else {
            final boolean rightBatchUntouched = inputIndex == 1 && (rightJoinIndex == 0 || rightJoinIndex == -1);
            if (!rightBatchUntouched) {
                return;
            }
        }

        batchMemoryManager.update(inputIndex, outputIndex);

        if (isRecordBatchStatsLoggingEnabled()) {
            final RecordBatchStats.RecordBatchIOType ioType = isLeftInput
                ? RecordBatchStats.RecordBatchIOType.INPUT_LEFT
                : RecordBatchStats.RecordBatchIOType.INPUT_RIGHT;
            RecordBatchStats.logRecordBatchStats(ioType, batchMemoryManager.getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
        }
    }

    /**
     * Fills the excluded-field set with the implicit column name and the root names of the excluded
     * columns configured on the operator, if any.
     *
     * @param lateralPop operator configuration; it is cast to {@code LateralJoinPOP}
     */
    private void populateExcludedField(PhysicalOperator lateralPop) {
        excludedFieldNames.add(implicitColumn);
        final List<SchemaPath> configuredExclusions = ((LateralJoinPOP)lateralPop).getExcludedColumns();
        if (configuredExclusions == null) {
            return;
        }
        for (SchemaPath path : configuredExclusions) {
            excludedFieldNames.add(path.rootName());
        }
    }

    /**
     * Logs the operator's current state (container, inputs, outcomes, schemas and indexes) at error level.
     */
    @Override
    public void dump() {
        logger.error("LateralJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, leftSchema={}, rightSchema={}, outputIndex={}, leftJoinIndex={}, rightJoinIndex={}, hasRemainderForLeftJoin={}]",
            container, left, right,
            leftUpstream, rightUpstream,
            leftSchema, rightSchema,
            outputIndex, leftJoinIndex, rightJoinIndex,
            hasRemainderForLeftJoin);
    }
}

