/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.join;

import com.sun.codemodel.JAssignmentTarget;
import com.sun.codemodel.JClass;
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
import com.sun.codemodel.JFieldVar;
import com.sun.codemodel.JStatement;
import com.sun.codemodel.JType;
import com.sun.codemodel.JVar;
import java.util.HashSet;
import java.util.List;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.TypedNullConstant;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.physical.impl.join.JoinStatus;
import org.apache.drill.exec.physical.impl.join.JoinUtils;
import org.apache.drill.exec.physical.impl.join.JoinWorker;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.RecordIterator;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MergeJoinBatch
extends AbstractBinaryRecordBatch<MergeJoinPOP> {
    private static final Logger logger = LoggerFactory.getLogger(MergeJoinBatch.class);
    // Mapping set installed first in generateNewWorker(), while the generated worker's fields are wired in its setup block.
    private final MappingSet setupMapping = new MappingSet("null", "null", GeneratorMapping.GM("doSetup", "doSetup", null, null), GeneratorMapping.GM("doSetup", "doSetup", null, null));
    // Mapping set active while the left-input column copies ("doCopyLeft") are generated; its value read/write
    // indexes are passed to copyFromSafe (presumably "leftIndex" is the read index and "outIndex" the write index — TODO confirm).
    private final MappingSet copyLeftMapping = new MappingSet("leftIndex", "outIndex", GeneratorMapping.GM("doSetup", "doSetup", null, null), GeneratorMapping.GM("doSetup", "doCopyLeft", null, null));
    // Mapping set active while the right-input column copies ("doCopyRight") are generated.
    // NOTE(review): the field name is spelled with three 'p's; a rename must also update its uses in generateNewWorker().
    private final MappingSet copyRightMappping = new MappingSet("rightIndex", "outIndex", GeneratorMapping.GM("doSetup", "doSetup", null, null), GeneratorMapping.GM("doSetup", "doCopyRight", null, null));
    // Mapping set used in generateDoCompare() when the left-side join expression is added to "doCompare".
    private final MappingSet compareMapping = new MappingSet("leftIndex", "rightIndex", GeneratorMapping.GM("doSetup", "doSetup", null, null), GeneratorMapping.GM("doSetup", "doCompare", null, null));
    // Mapping set used in generateDoCompare() when the right-side join expression is added to "doCompare".
    private final MappingSet compareRightMapping = new MappingSet("rightIndex", "null", GeneratorMapping.GM("doSetup", "doSetup", null, null), GeneratorMapping.GM("doSetup", "doCompare", null, null));
    // Iterators over the left (input index 0) and right (input index 1) incoming batches; created in the constructor.
    private final RecordIterator leftIterator;
    private final RecordIterator rightIterator;
    // Shared join state (outcome, output position, per-side status) handed to the generated worker.
    private final JoinStatus status;
    // Join conditions taken from the operator configuration; the constructor rejects an empty list.
    private final List<JoinCondition> conditions;
    // One comparator per entry of 'conditions', in the same order.
    private final List<Comparator> comparators;
    private final JoinRelType joinType;
    // Generated join worker; null until first needed and reset to null when it has to be regenerated.
    private JoinWorker worker;

    /**
     * Creates the merge join operator over the given left and right inputs.
     *
     * <p>Installs a {@link MergeJoinMemoryManager} sized from the configured output batch size,
     * wraps both inputs in record iterators and resolves one comparator per join condition.
     *
     * @param popConfig operator configuration supplying the join type and conditions
     * @param context   fragment context used to read options
     * @param left      left input batch (input index 0)
     * @param right     right input batch (input index 1)
     * @throws OutOfMemoryException          declared by this constructor
     * @throws UnsupportedOperationException if the configuration carries no join condition
     */
    protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
        super(popConfig, context, true, left, right);

        final int outputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
        batchMemoryManager = new MergeJoinMemoryManager(outputBatchSize, left, right);
        RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(), outputBatchSize);

        // A join without any condition would be a cartesian product, which this operator rejects.
        if (popConfig.getConditions().isEmpty()) {
            throw new UnsupportedOperationException("Merge Join currently does not support cartesian join.  This join operator was configured with 0 conditions");
        }

        leftIterator = new RecordIterator(left, this, oContext, 0, false, batchMemoryManager);
        rightIterator = new RecordIterator(right, this, oContext, 1, batchMemoryManager);
        joinType = popConfig.getJoinType();
        status = new JoinStatus(leftIterator, rightIterator, this);
        conditions = popConfig.getConditions();

        // Resolve the comparator of every condition up front, keeping the condition order.
        final int conditionCount = conditions.size();
        comparators = Lists.newArrayListWithExpectedSize(conditionCount);
        for (int idx = 0; idx < conditionCount; idx++) {
            comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(conditions.get(idx)));
        }
    }

    /**
     * Returns the join type this operator was configured with.
     *
     * @return the join type read from the operator configuration at construction time
     */
    public JoinRelType getJoinType() {
        return joinType;
    }

    /**
     * Reports the number of records in the current output, taken from the join status'
     * output position.
     *
     * @return the current output position of the join status
     */
    @Override
    public int getRecordCount() {
        final int outputPosition = status.getOutPosition();
        return outputPosition;
    }

    /**
     * Initializes the join status, and if the two upstream outcomes allow the batch to proceed,
     * allocates the output container for a new schema and marks it empty.
     */
    @Override
    public void buildSchema() {
        status.initialize();

        final RecordBatch.IterOutcome leftStatus = status.getLeftStatus();
        final RecordBatch.IterOutcome rightStatus = status.getRightStatus();

        // Only build the output container when the upstream outcomes were accepted.
        if (verifyOutcomeToSetBatchState(leftStatus, rightStatus)) {
            allocateBatch(true);
            container.setEmpty();
        }
    }

    /**
     * Produces the next output batch of the join.
     *
     * <p>Each pass first reacts to the outcome left in the join status (re-allocating the output
     * when a batch was returned or the schema changed), then generates a worker if none exists,
     * runs the join and maps the resulting outcome to an {@link RecordBatch.IterOutcome}.
     * {@code OK_NEW_SCHEMA} is reported only on a pass that created a new worker.
     *
     * @return the outcome for the batch now held in the container
     * @throws UserException         when the join status reports a failure
     * @throws IllegalStateException when the join status reports an outcome not handled here
     */
    @Override
    public RecordBatch.IterOutcome innerNext() {
        status.prepare();
        // Loop so that a schema change which produced no rows can rebuild the worker and try again.
        while (true) {
            boolean schemaChanged = false;
            // React to the outcome recorded before this pass.
            switch (status.getOutcome()) {
                case SCHEMA_CHANGED:
                    schemaChanged = true;
                    // intentional fall-through: a schema change also needs a freshly allocated output
                case BATCH_RETURNED:
                    allocateBatch(schemaChanged);
                    status.resetOutputPos();
                    status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount());
                    break;
                case NO_MORE_DATA:
                    status.resetOutputPos();
                    logger.debug("NO MORE DATA; returning NONE");
                    return RecordBatch.IterOutcome.NONE;
                case FAILURE:
                    throw clearInflightAndBuildMergeFailure();
                case WAITING:
                    return RecordBatch.IterOutcome.NOT_YET;
                default:
                    throw new IllegalStateException();
            }

            // 'first' is true only when this pass had to generate a new worker.
            boolean first = false;
            if (worker == null) {
                try {
                    logger.debug("Creating New Worker");
                    stats.startSetup();
                    worker = generateNewWorker();
                    first = true;
                } finally {
                    stats.stopSetup();
                }
            }

            // A false return from doJoin means the worker must be regenerated before its next use.
            final boolean workerStillUsable = worker.doJoin(status);
            if (!workerStillUsable) {
                worker = null;
            }

            // Translate the outcome of the join pass that just ran.
            switch (status.getOutcome()) {
                case BATCH_RETURNED:
                    logger.debug("BATCH RETURNED; returning {}", first ? "OK_NEW_SCHEMA" : "OK");
                    setRecordCountInContainer();
                    return first ? RecordBatch.IterOutcome.OK_NEW_SCHEMA : RecordBatch.IterOutcome.OK;
                case FAILURE:
                    throw clearInflightAndBuildMergeFailure();
                case NO_MORE_DATA:
                    logger.debug("NO MORE DATA; returning {}", status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? "OK_NEW_SCHEMA" : "NONE"));
                    setRecordCountInContainer();
                    state = AbstractRecordBatch.BatchState.DONE;
                    if (first) {
                        return RecordBatch.IterOutcome.OK_NEW_SCHEMA;
                    }
                    return status.getOutPosition() > 0 ? RecordBatch.IterOutcome.OK : RecordBatch.IterOutcome.NONE;
                case SCHEMA_CHANGED:
                    worker = null;
                    if (status.getOutPosition() > 0) {
                        // Rows were already produced under the old schema: hand them out first.
                        logger.debug("SCHEMA CHANGED; returning {} ", first ? "OK_NEW_SCHEMA" : "OK");
                        setRecordCountInContainer();
                        return first ? RecordBatch.IterOutcome.OK_NEW_SCHEMA : RecordBatch.IterOutcome.OK;
                    }
                    // Nothing to return yet: loop again to rebuild the worker.
                    continue;
                case WAITING:
                    return RecordBatch.IterOutcome.NOT_YET;
                default:
                    throw new IllegalStateException();
            }
        }
    }

    /**
     * Clears the in-flight batches of both iterators (left first, then right) and builds the
     * exception thrown for a failed merge.
     *
     * @return the "Merge failed" execution error, ready to be thrown by the caller
     */
    private UserException clearInflightAndBuildMergeFailure() {
        status.left.clearInflightBatches();
        status.right.clearInflightBatches();
        return UserException.executionError(null).message("Merge failed", new Object[0]).build(logger);
    }

    /**
     * Finalizes the outgoing batch: sets the container's value count to the current record count,
     * logs the output batch statistics and reports the record count to the memory manager.
     */
    private void setRecordCountInContainer() {
        // Stamp the container with the number of rows produced.
        container.setValueCount(getRecordCount());
        // Log this batch as operator output.
        RecordBatchStats.logRecordBatchStats(
                RecordBatchStats.RecordBatchIOType.OUTPUT,
                this,
                getRecordBatchStatsContext());
        // Let the memory manager account for the outgoing rows.
        batchMemoryManager.updateOutgoingStats(getRecordCount());
    }

    /**
     * Logs the aggregate incoming/outgoing batch statistics, closes the parent batch and then
     * closes both record iterators.
     *
     * <p>The right iterator is closed even when closing the left iterator fails; in that case the
     * left failure is rethrown afterwards wrapped in a {@link UserException}.
     */
    @Override
    public void close() {
        updateBatchMemoryManagerStats();

        RecordBatchStats.logRecordBatchStats(
                getRecordBatchStatsContext(),
                "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
                batchMemoryManager.getNumIncomingBatches(0),
                batchMemoryManager.getAvgInputBatchSize(0),
                batchMemoryManager.getAvgInputRowWidth(0),
                batchMemoryManager.getTotalInputRecords(0));
        RecordBatchStats.logRecordBatchStats(
                getRecordBatchStatsContext(),
                "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
                batchMemoryManager.getNumIncomingBatches(1),
                batchMemoryManager.getAvgInputBatchSize(1),
                batchMemoryManager.getAvgInputRowWidth(1),
                batchMemoryManager.getTotalInputRecords(1));
        RecordBatchStats.logRecordBatchStats(
                getRecordBatchStatsContext(),
                "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
                batchMemoryManager.getNumOutgoingBatches(),
                batchMemoryManager.getAvgOutputBatchSize(),
                batchMemoryManager.getAvgOutputRowWidth(),
                batchMemoryManager.getTotalOutputRecords());

        super.close();

        // Remember a failure from the left iterator so the right one is still closed before rethrowing.
        Exception leftCloseFailure = null;
        try {
            leftIterator.close();
        } catch (Exception e) {
            leftCloseFailure = e;
        }
        rightIterator.close();
        if (leftCloseFailure != null) {
            throw UserException.executionError(leftCloseFailure).message("Failed to close Iterator.", new Object[0]).build(logger);
        }
    }

    /**
     * Generates and sets up a {@link JoinWorker} for the current state of both inputs.
     *
     * <p>The generated class gets fields for the join status, the outgoing container and the
     * left/right iterators, a comparison routine built from the join conditions, and one
     * {@code copyFromSafe} statement per input column for the left and right copy routines.
     * Output vector ids are assigned left columns first, then right columns.
     *
     * @return the worker instance, already set up against this batch's status and container
     */
    private JoinWorker generateNewWorker() {
        ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getOptions());
        cg.getCodeGenerator().plainJavaCapable(true);
        ErrorCollector collector = new ErrorCollectorImpl();
        cg.setMappingSet(setupMapping);

        // Declare the worker's fields and wire them up in the generated setup block.
        JClass joinStatusClass = cg.getModel().ref(JoinStatus.class);
        JFieldVar joinStatus = cg.clazz.field(0, joinStatusClass, "status");
        cg.getSetupBlock().assign(JExpr._this().ref(joinStatus), JExpr.direct("status"));

        JClass vectorContainerClass = cg.getModel().ref(VectorContainer.class);
        JFieldVar outgoingVectorContainer = cg.clazz.field(0, vectorContainerClass, "outgoing");
        cg.getSetupBlock().assign(JExpr._this().ref(outgoingVectorContainer), JExpr.direct("outgoing"));

        JClass recordBatchClass = cg.getModel().ref(RecordIterator.class);
        JFieldVar incomingLeftRecordBatch = cg.clazz.field(0, recordBatchClass, "incomingLeft");
        cg.getSetupBlock().assign(JExpr._this().ref(incomingLeftRecordBatch), joinStatus.ref("left"));
        JFieldVar incomingRightRecordBatch = cg.clazz.field(0, recordBatchClass, "incomingRight");
        cg.getSetupBlock().assign(JExpr._this().ref(incomingRightRecordBatch), joinStatus.ref("right"));
        JFieldVar incomingRecordBatch = cg.clazz.field(0, recordBatchClass, "incoming");

        // Materialize both sides of every join condition against the matching input.
        LogicalExpression[] leftExpr = new LogicalExpression[conditions.size()];
        LogicalExpression[] rightExpr = new LogicalExpression[conditions.size()];
        RecordBatch.IterOutcome lastLeftStatus = status.getLeftStatus();
        RecordBatch.IterOutcome lastRightStatus = status.getRightStatus();
        for (int i = 0; i < conditions.size(); ++i) {
            JoinCondition condition = conditions.get(i);
            leftExpr[i] = materializeExpression(condition.getLeft(), lastLeftStatus, leftIterator, collector);
            rightExpr[i] = materializeExpression(condition.getRight(), lastRightStatus, rightIterator, collector);
        }
        // Casts are only added when the right side produced data.
        if (lastRightStatus != RecordBatch.IterOutcome.NONE) {
            JoinUtils.addLeastRestrictiveCasts(leftExpr, leftIterator, rightExpr, rightIterator, context);
        }
        generateDoCompare(cg, incomingRecordBatch, leftExpr, incomingLeftRecordBatch, rightExpr, incomingRightRecordBatch, collector);

        // Left copy routine: input vector id == output vector id.
        cg.setMappingSet(copyLeftMapping);
        int vectorId = 0;
        if (worker == null || !status.left.finished()) {
            // The element type must be VectorWrapper<?>; declaring it as Object cannot reach getField().
            for (VectorWrapper<?> vw : leftIterator) {
                TypeProtos.MajorType inputType = vw.getField().getType();
                // In a RIGHT join a REQUIRED left column is emitted as OPTIONAL.
                TypeProtos.MajorType outputType = joinType == JoinRelType.RIGHT && inputType.getMode() == TypeProtos.DataMode.REQUIRED
                        ? Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL)
                        : inputType;
                TypedFieldId inTypedFieldId = new TypedFieldId.Builder().finalType(inputType).addId(vectorId).build();
                JVar vvIn = cg.declareVectorValueSetupAndMember("incomingLeft", inTypedFieldId);
                TypedFieldId outTypedFieldId = new TypedFieldId.Builder().finalType(outputType).addId(vectorId).build();
                JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing", outTypedFieldId);
                cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
                        .arg(copyLeftMapping.getValueReadIndex())
                        .arg(copyLeftMapping.getValueWriteIndex())
                        .arg(vvIn));
                cg.rotateBlock();
                ++vectorId;
            }
        }

        // Right copy routine: output ids continue after the left columns, input ids restart at 0.
        cg.setMappingSet(copyRightMappping);
        int rightVectorBase = vectorId;
        if (!(status.getRightStatus() == RecordBatch.IterOutcome.NONE || worker != null && status.right.finished())) {
            for (VectorWrapper<?> vw : rightIterator) {
                TypeProtos.MajorType inputType = vw.getField().getType();
                // In a LEFT join a REQUIRED right column is emitted as OPTIONAL.
                TypeProtos.MajorType outputType = joinType == JoinRelType.LEFT && inputType.getMode() == TypeProtos.DataMode.REQUIRED
                        ? Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL)
                        : inputType;
                TypedFieldId inTypedFieldId = new TypedFieldId.Builder().finalType(inputType).addId(vectorId - rightVectorBase).build();
                JVar vvIn = cg.declareVectorValueSetupAndMember("incomingRight", inTypedFieldId);
                TypedFieldId outTypedFieldId = new TypedFieldId.Builder().finalType(outputType).addId(vectorId).build();
                JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing", outTypedFieldId);
                cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
                        .arg(copyRightMappping.getValueReadIndex())
                        .arg(copyRightMappping.getValueWriteIndex())
                        .arg(vvIn));
                cg.rotateBlock();
                ++vectorId;
            }
        }

        JoinWorker w = context.getImplementationClass(cg);
        try {
            w.setupJoin(context, status, container);
        } catch (SchemaChangeException e) {
            throw schemaChangeException(e, logger);
        }
        return w;
    }

    /**
     * Prepares the output container for the next batch.
     *
     * <p>With a new schema the container is cleared and repopulated from the left and then the
     * right iterator, skipping a side whose status is {@code NONE}; otherwise the existing vectors
     * are zeroed. Every output vector is then allocated for the memory manager's current output
     * row count and the container schema is rebuilt.
     *
     * @param newSchema {@code true} to rebuild the output vectors from the current input schemas,
     *                  {@code false} to reuse the existing vectors
     */
    private void allocateBatch(boolean newSchema) {
        boolean leftAllowed = status.getLeftStatus() != RecordBatch.IterOutcome.NONE;
        boolean rightAllowed = status.getRightStatus() != RecordBatch.IterOutcome.NONE;
        if (newSchema) {
            container.clear();
            if (leftAllowed) {
                // Left columns become OPTIONAL in a RIGHT join.
                addOutputVectors(leftIterator, JoinRelType.RIGHT);
            }
            if (rightAllowed) {
                // Right columns become OPTIONAL in a LEFT join.
                addOutputVectors(rightIterator, JoinRelType.LEFT);
            }
        } else {
            container.zeroVectors();
        }

        int outputRowCount = batchMemoryManager.getOutputRowCount();
        for (VectorWrapper<?> w : container) {
            RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName());
            colSize.allocateVector(w.getValueVector(), outputRowCount);
        }
        container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
        logger.debug("Built joined schema: {}", container.getSchema());
    }

    /**
     * Adds one output vector to the container for every column of the given input.
     *
     * @param incoming            iterator whose columns are appended to the output container
     * @param nullableForJoinType join type for which REQUIRED columns of this input are emitted
     *                            as OPTIONAL
     */
    private void addOutputVectors(RecordIterator incoming, JoinRelType nullableForJoinType) {
        for (VectorWrapper<?> w : incoming) {
            TypeProtos.MajorType inputType = w.getField().getType();
            TypeProtos.MajorType outputType = joinType == nullableForJoinType && inputType.getMode() == TypeProtos.DataMode.REQUIRED
                    ? Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL)
                    : inputType;
            MaterializedField newField = MaterializedField.create(w.getField().getName(), outputType);
            // Typed as ValueVector so clear() is reachable; an Object-typed local cannot call it.
            ValueVector v = container.addOrGet(newField);
            if (v instanceof AbstractContainerVector) {
                // Only container vectors get a transfer pair from the input vector, then are cleared.
                w.getValueVector().makeTransferPair(v);
                v.clear();
            }
        }
    }

    /**
     * Emits the body of the generated comparison routine.
     *
     * <p>When the right side has data, each join condition contributes a nulls-high ordering
     * comparison of its left and right expressions that returns as soon as the result is non-zero.
     * For an EQUALS condition whose operands are both optional, two unset values return 1 instead.
     * The routine finally returns 0.
     *
     * @param cg                       class generator receiving the code
     * @param incomingRecordBatch      generated field re-pointed at the left or right input before
     *                                 each expression is added
     * @param leftExpression           materialized left expressions, one per condition
     * @param incomingLeftRecordBatch  generated field holding the left input
     * @param rightExpression          materialized right expressions, one per condition
     * @param incomingRightRecordBatch generated field holding the right input
     * @param collector                error collector (not read by this method)
     */
    private void generateDoCompare(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch, LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch, LogicalExpression[] rightExpression, JVar incomingRightRecordBatch, ErrorCollector collector) {
        cg.setMappingSet(compareMapping);
        final boolean rightHasData = status.getRightStatus() != RecordBatch.IterOutcome.NONE;
        if (rightHasData) {
            assert leftExpression.length == rightExpression.length;
            final int conditionCount = leftExpression.length;
            for (int idx = 0; idx < conditionCount; idx++) {
                // Left operand: evaluated with 'incoming' pointing at the left input.
                cg.setMappingSet(compareMapping);
                cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
                ClassGenerator.HoldingContainer leftHolder = cg.addExpr(leftExpression[idx], ClassGenerator.BlkCreateMode.FALSE);

                // Right operand: evaluated with 'incoming' pointing at the right input.
                cg.setMappingSet(compareRightMapping);
                cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingRightRecordBatch));
                ClassGenerator.HoldingContainer rightHolder = cg.addExpr(rightExpression[idx], ClassGenerator.BlkCreateMode.FALSE);

                LogicalExpression ordering = FunctionGenerationHelper.getOrderingComparatorNullsHigh(leftHolder, rightHolder, context.getFunctionRegistry());
                ClassGenerator.HoldingContainer result = cg.addExpr(ordering, ClassGenerator.BlkCreateMode.FALSE);

                final boolean nullAwareEquals = leftHolder.isOptional() && rightHolder.isOptional() && comparators.get(idx) == Comparator.EQUALS;
                if (nullAwareEquals) {
                    // Both values unset: report 1 rather than treating the two nulls as equal.
                    JConditional bothUnset = cg.getEvalBlock()._if(leftHolder.getIsSet().eq(JExpr.lit(0)).cand(rightHolder.getIsSet().eq(JExpr.lit(0))));
                    bothUnset._then()._return(JExpr.lit(1));
                    bothUnset._elseif(result.getValue().ne(JExpr.lit(0)))._then()._return(result.getValue());
                } else {
                    cg.getEvalBlock()._if(result.getValue().ne(JExpr.lit(0)))._then()._return(result.getValue());
                }
            }
        }
        // Every condition compared equal, or there was nothing to compare.
        cg.getEvalBlock()._return(JExpr.lit(0));
    }

    /**
     * Materializes a join expression against one input, or substitutes a typed null when that
     * input reported {@code NONE}.
     *
     * @param expression the logical expression to materialize
     * @param lastStatus last outcome seen for the input the expression refers to
     * @param input      vectors the expression is resolved against
     * @param collector  receives materialization errors, which are then reported to the logger
     * @return the materialized expression, or an optional-INT null constant when
     *         {@code lastStatus} is {@code NONE}
     */
    private LogicalExpression materializeExpression(LogicalExpression expression, RecordBatch.IterOutcome lastStatus, VectorAccessible input, ErrorCollector collector) {
        final LogicalExpression result;
        if (lastStatus == RecordBatch.IterOutcome.NONE) {
            // No data on this side: stand in a nullable INT constant for the expression.
            result = new TypedNullConstant(Types.optional(TypeProtos.MinorType.INT));
        } else {
            result = ExpressionTreeMaterializer.materialize(expression, input, collector, context.getFunctionRegistry(), unionTypeEnabled);
        }
        collector.reportErrors(logger);
        return result;
    }

    /**
     * Logs the operator's state at error level: output container, both inputs and their last
     * upstream outcomes, the join type, both iterators and the join status.
     */
    @Override
    public void dump() {
        // joinType is logged once; the format previously repeated the same field at the end.
        logger.error("MergeJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, leftIterator={}, rightIterator={}, joinStatus={}]", new Object[]{container, left, right, leftUpstream, rightUpstream, joinType, leftIterator, rightIterator, status});
    }

    /**
     * Memory manager for this merge join. On every input update it passes the join's current
     * output position to the base manager, pushes the resulting outgoing row limit into the join
     * status and logs the statistics of the input batch.
     */
    private class MergeJoinMemoryManager
    extends JoinBatchMemoryManager {
        /**
         * @param outputBatchSize configured output batch size
         * @param leftBatch       left input batch
         * @param rightBatch      right input batch
         */
        MergeJoinMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
            super(outputBatchSize, leftBatch, rightBatch, new HashSet<String>());
        }

        /**
         * Updates the manager for a new batch on the given input.
         *
         * @param inputIndex 0 for the left input; any other value is logged as the right input
         */
        @Override
        public void update(int inputIndex) {
            final JoinStatus joinStatus = MergeJoinBatch.this.status;
            super.update(inputIndex, joinStatus.getOutPosition());
            joinStatus.setTargetOutputRowCount(super.getCurrentOutgoingMaxRowCount());

            final RecordBatchStats.RecordBatchIOType ioType;
            if (inputIndex == 0) {
                ioType = RecordBatchStats.RecordBatchIOType.INPUT_LEFT;
            } else {
                ioType = RecordBatchStats.RecordBatchIOType.INPUT_RIGHT;
            }
            RecordBatchStats.logRecordBatchStats(ioType, this.getRecordBatchSizer(inputIndex), MergeJoinBatch.this.getRecordBatchStatsContext());
        }
    }
}

