/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.mergereceiver;

import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.logical.data.Order;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.ops.ExchangeFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.ops.QueryCancelledException;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.physical.impl.mergereceiver.MergingReceiverGeneratorBase;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.record.RawFragmentBatchProvider;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MergingRecordBatch
extends AbstractRecordBatch<MergingReceiverPOP>
implements RecordBatch {
    private static final Logger logger = LoggerFactory.getLogger(MergingRecordBatch.class);
    private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(MergingRecordBatch.class);
    private static final int OUTGOING_BATCH_SIZE = 32768;
    private RecordBatchLoader[] batchLoaders;
    private final RawFragmentBatchProvider[] fragProviders;
    private final ExchangeFragmentContext context;
    private MergingReceiverGeneratorBase merger;
    private final MergingReceiverPOP config;
    private boolean hasRun;
    private boolean outgoingBatchHasSpace = true;
    private boolean hasMoreIncoming = true;
    private int outgoingPosition;
    private int senderCount;
    private RawFragmentBatch[] incomingBatches;
    private int[] batchOffsets;
    private PriorityQueue<Node> pqueue;
    private RawFragmentBatch[] tempBatchHolder;
    private final long[] inputCounts;
    private final long[] outputCounts;
    private final MappingSet MAIN_MAPPING = new MappingSet((String)null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
    private final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
    private final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
    private final GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
    private final MappingSet COPIER_MAPPING_SET = new MappingSet(this.COPIER_MAPPING, this.COPIER_MAPPING);

    public MergingRecordBatch(ExchangeFragmentContext context, MergingReceiverPOP config, RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
        super(config, context, true, context.newOperatorContext(config));
        this.fragProviders = fragProviders;
        this.context = context;
        this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
        this.config = config;
        this.inputCounts = new long[config.getNumSenders()];
        this.outputCounts = new long[config.getNumSenders()];
        context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(this.oContext.getAllocator());
    }

    private RawFragmentBatch getNext(int providerIndex) {
        this.stats.startWait();
        RawFragmentBatchProvider provider = this.fragProviders[providerIndex];
        try {
            RawFragmentBatch b;
            injector.injectInterruptiblePause(this.context.getExecutionControls(), "waiting-for-data", logger);
            try {
                b = provider.getNext();
            }
            catch (IOException e) {
                throw UserException.dataReadError(e).addContext("Failed to read incoming merge batch").build(logger);
            }
            if (b != null) {
                this.stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
                this.stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
                int n = providerIndex;
                this.inputCounts[n] = this.inputCounts[n] + (long)b.getHeader().getDef().getRecordCount();
            }
            RawFragmentBatch rawFragmentBatch = b;
            return rawFragmentBatch;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new QueryCancelledException();
        }
        finally {
            this.stats.stopWait();
        }
    }

    private void clearBatches(List<RawFragmentBatch> batches) {
        for (RawFragmentBatch batch : batches) {
            if (batch == null) continue;
            batch.release();
        }
    }

    @Override
    public RecordBatch.IterOutcome innerNext() {
        Node node;
        if (this.fragProviders.length == 0) {
            return RecordBatch.IterOutcome.NONE;
        }
        boolean schemaChanged = false;
        if (!this.outgoingBatchHasSpace) {
            logger.debug("Outgoing vectors were full on last iteration");
            this.allocateOutgoing();
            this.outgoingPosition = 0;
            this.outgoingBatchHasSpace = true;
        }
        if (!this.hasMoreIncoming) {
            logger.debug("next() was called after all values have been processed");
            this.outgoingPosition = 0;
            return RecordBatch.IterOutcome.NONE;
        }
        List<UserBitShared.SerializedField> fieldList = null;
        boolean createDummyBatch = false;
        if (!this.hasRun) {
            int i;
            schemaChanged = true;
            ArrayList<RawFragmentBatch> rawBatches = Lists.newArrayList();
            try {
                int p = 0;
                for (RawFragmentBatchProvider provider : this.fragProviders) {
                    RawFragmentBatch rawBatch;
                    if (this.tempBatchHolder[p] != null) {
                        rawBatch = this.tempBatchHolder[p];
                        this.tempBatchHolder[p] = null;
                    } else {
                        rawBatch = this.getNext(p);
                    }
                    this.checkContinue();
                    if (rawBatch == null) {
                        this.checkContinue();
                        createDummyBatch = true;
                        rawBatches.add(rawBatch);
                        ++p;
                        continue;
                    }
                    if (fieldList == null && rawBatch.getHeader().getDef().getFieldCount() != 0) {
                        fieldList = rawBatch.getHeader().getDef().getFieldList();
                    }
                    if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
                        rawBatches.add(rawBatch);
                    } else {
                        while ((rawBatch = this.getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
                        }
                        if (rawBatch == null) {
                            this.checkContinue();
                            createDummyBatch = true;
                        }
                        if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) {
                            createDummyBatch = true;
                        }
                        rawBatches.add(rawBatch);
                    }
                    ++p;
                }
                if (fieldList == null) {
                    return RecordBatch.IterOutcome.NONE;
                }
                if (createDummyBatch) {
                    RawFragmentBatch[] dummyDef = UserBitShared.RecordBatchDef.newBuilder().addAllField(this.createDummyFieldList(fieldList)).setRecordCount(0).build();
                    BitData.FragmentRecordBatch dummyHeader = BitData.FragmentRecordBatch.newBuilder().setIsLastBatch(true).setDef((UserBitShared.RecordBatchDef)dummyDef).build();
                    for (int i2 = 0; i2 < p; ++i2) {
                        RawFragmentBatch rawBatch = (RawFragmentBatch)rawBatches.get(i2);
                        if (rawBatch != null && rawBatch.getHeader().getDef().getFieldCount() != 0) continue;
                        rawBatch = new RawFragmentBatch(dummyHeader, null, null);
                        rawBatches.set(i2, rawBatch);
                    }
                }
            }
            catch (Throwable t) {
                this.clearBatches(rawBatches);
                throw t;
            }
            this.senderCount = rawBatches.size();
            this.incomingBatches = new RawFragmentBatch[this.senderCount];
            this.batchOffsets = new int[this.senderCount];
            this.batchLoaders = new RecordBatchLoader[this.senderCount];
            for (i = 0; i < this.senderCount; ++i) {
                this.incomingBatches[i] = (RawFragmentBatch)rawBatches.get(i);
                this.batchLoaders[i] = new RecordBatchLoader(this.oContext.getAllocator());
            }
            rawBatches.clear();
            i = 0;
            for (RawFragmentBatch batch : this.incomingBatches) {
                UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
                this.batchLoaders[i].load(rbd, batch.getBody());
                batch.release();
                int n = i++;
                this.batchOffsets[n] = this.batchOffsets[n] + 1;
            }
            this.checkSameSchemaAmongBatches(this.batchLoaders);
            SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
            for (VectorWrapper<?> v : this.batchLoaders[0]) {
                bldr.addField(v.getField());
                this.container.addOrGet(v.getField());
            }
            this.allocateOutgoing();
            this.container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
            this.merger = this.createMerger();
            this.pqueue = new PriorityQueue<Node>(this.fragProviders.length, new Comparator<Node>(){

                @Override
                public int compare(Node node1, Node node2) {
                    int leftIndex = (node1.batchId << 16) + node1.valueIndex;
                    int rightIndex = (node2.batchId << 16) + node2.valueIndex;
                    try {
                        return MergingRecordBatch.this.merger.doEval(leftIndex, rightIndex);
                    }
                    catch (SchemaChangeException e) {
                        throw new UnsupportedOperationException(e);
                    }
                }
            });
            for (int b = 0; b < this.senderCount; ++b) {
                while (this.batchLoaders[b] != null && this.batchLoaders[b].getRecordCount() == 0) {
                    RawFragmentBatch batch;
                    this.incomingBatches[b] = batch = this.getNext(b);
                    if (batch != null) {
                        this.batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
                        continue;
                    }
                    this.batchLoaders[b].clear();
                    this.batchLoaders[b] = null;
                    this.checkContinue();
                }
                if (this.batchLoaders[b] == null) continue;
                this.pqueue.add(new Node(b, 0));
            }
            this.hasRun = true;
        }
        while (this.outgoingBatchHasSpace && (node = this.pqueue.poll()) != null) {
            this.outgoingBatchHasSpace = this.copyRecordToOutgoingBatch(node);
            if (node.valueIndex == this.batchLoaders[node.batchId].getRecordCount() - 1) {
                RawFragmentBatch nextBatch = this.getNext(node.batchId);
                while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
                    nextBatch = this.getNext(node.batchId);
                }
                assert (nextBatch != null || this.inputCounts[node.batchId] == this.outputCounts[node.batchId]) : String.format("Stream %d input count: %d output count %d", node.batchId, this.inputCounts[node.batchId], this.outputCounts[node.batchId]);
                if (nextBatch == null) {
                    this.checkContinue();
                }
                this.incomingBatches[node.batchId] = nextBatch;
                if (nextBatch == null) {
                    boolean allBatchesEmpty = true;
                    for (RawFragmentBatch batch : this.incomingBatches) {
                        if (batch == null) continue;
                        allBatchesEmpty = false;
                        break;
                    }
                    if (!allBatchesEmpty) continue;
                    this.hasMoreIncoming = false;
                    break;
                }
                UserBitShared.RecordBatchDef rbd = this.incomingBatches[node.batchId].getHeader().getDef();
                this.batchLoaders[node.batchId].load(rbd, this.incomingBatches[node.batchId].getBody());
                this.incomingBatches[node.batchId].release();
                this.batchOffsets[node.batchId] = 0;
                if (this.batchLoaders[node.batchId].getRecordCount() == 0) continue;
                node.valueIndex = 0;
                this.pqueue.add(node);
                continue;
            }
            ++node.valueIndex;
            this.pqueue.add(node);
        }
        this.container.setValueCount(this.outgoingPosition);
        if (this.pqueue.isEmpty()) {
            this.state = AbstractRecordBatch.BatchState.DONE;
        }
        if (schemaChanged) {
            return RecordBatch.IterOutcome.OK_NEW_SCHEMA;
        }
        return RecordBatch.IterOutcome.OK;
    }

    private UserBitShared.SerializedField createDummyField(UserBitShared.SerializedField field) {
        UserBitShared.SerializedField.Builder newDummyFieldBuilder = UserBitShared.SerializedField.newBuilder().setVarByteLength(0).setBufferLength(0).setValueCount(0).setNamePart(field.getNamePart()).setMajorType(field.getMajorType());
        int index = 0;
        for (UserBitShared.SerializedField childField : field.getChildList()) {
            newDummyFieldBuilder.addChild(index, this.createDummyField(childField));
            ++index;
        }
        UserBitShared.SerializedField newDummyField = newDummyFieldBuilder.build();
        return newDummyField;
    }

    private List<UserBitShared.SerializedField> createDummyFieldList(List<UserBitShared.SerializedField> fieldList) {
        ArrayList<UserBitShared.SerializedField> dummyFieldList = new ArrayList<UserBitShared.SerializedField>();
        for (UserBitShared.SerializedField field : fieldList) {
            dummyFieldList.add(this.createDummyField(field));
        }
        return dummyFieldList;
    }

    @Override
    public FragmentContext getContext() {
        return this.context;
    }

    @Override
    public BatchSchema getSchema() {
        if (this.container.hasSchema()) {
            return this.container.getSchema();
        }
        return null;
    }

    @Override
    public void buildSchema() {
        block4: {
            RawFragmentBatch batch;
            this.tempBatchHolder = new RawFragmentBatch[this.fragProviders.length];
            int i = 0;
            while (true) {
                if (i >= this.fragProviders.length) {
                    this.state = AbstractRecordBatch.BatchState.DONE;
                    return;
                }
                batch = this.getNext(i);
                if (batch == null) {
                    this.checkContinue();
                    break block4;
                }
                if (batch.getHeader().getDef().getFieldCount() != 0) break;
                ++i;
            }
            this.tempBatchHolder[i] = batch;
            for (UserBitShared.SerializedField field : batch.getHeader().getDef().getFieldList()) {
                Object v = this.container.addOrGet(MaterializedField.create(field));
                v.allocateNew();
            }
        }
        this.container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
        this.container.setEmpty();
    }

    @Override
    public int getRecordCount() {
        return this.outgoingPosition;
    }

    @Override
    public void cancel() {
        this.informSenders();
        for (RawFragmentBatchProvider provider : this.fragProviders) {
            provider.kill(this.context);
        }
    }

    private void informSenders() {
        logger.info("Informing senders of request to terminate sending.");
        ExecProtos.FragmentHandle handlePrototype = ExecProtos.FragmentHandle.newBuilder().setMajorFragmentId(this.config.getOppositeMajorFragmentId()).setQueryId(this.context.getHandle().getQueryId()).build();
        for (MinorFragmentEndpoint providingEndpoint : this.config.getProvidingEndpoints()) {
            ExecProtos.FragmentHandle sender = ExecProtos.FragmentHandle.newBuilder(handlePrototype).setMinorFragmentId(providingEndpoint.getId()).build();
            BitControl.FinishedReceiver finishedReceiver = BitControl.FinishedReceiver.newBuilder().setReceiver(this.context.getHandle()).setSender(sender).build();
            this.context.getController().getTunnel(providingEndpoint.getEndpoint()).informReceiverFinished(new OutcomeListener(), finishedReceiver);
        }
    }

    @Override
    protected void cancelIncoming() {
    }

    private void checkSameSchemaAmongBatches(RecordBatchLoader[] batchLoaders) {
        Preconditions.checkArgument(batchLoaders.length > 0, "0 batch is not allowed!");
        BatchSchema schema = batchLoaders[0].getSchema();
        for (int i = 1; i < batchLoaders.length; ++i) {
            if (schema.equals(batchLoaders[i].getSchema())) continue;
            throw UserException.schemaChangeError().message("Incoming batches for merging receiver have different schemas!", new Object[0]).addContext("Schema 1: %s, Schema 2: %s", schema.toString()).addContext("Schema 2: %s", batchLoaders[i].getSchema().toString()).build(logger);
        }
    }

    private void allocateOutgoing() {
        for (VectorWrapper<?> w : this.container) {
            Object v = w.getValueVector();
            if (v instanceof FixedWidthVector) {
                AllocationHelper.allocate(v, 32768, 1);
                continue;
            }
            v.allocateNewSafe();
        }
    }

    private MergingReceiverGeneratorBase createMerger() {
        this.stats.startSetup();
        CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, this.context.getOptions());
        cg.plainJavaCapable(true);
        ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
        ExpandableHyperContainer batch = null;
        boolean first = true;
        for (RecordBatchLoader loader : this.batchLoaders) {
            if (first) {
                batch = new ExpandableHyperContainer(loader);
                first = false;
                continue;
            }
            batch.addBatch(loader);
        }
        try {
            this.generateComparisons(g, batch);
            g.setMappingSet(this.COPIER_MAPPING_SET);
            CopyUtil.generateCopies(g, batch, true);
            g.setMappingSet(this.MAIN_MAPPING);
            MergingReceiverGeneratorBase merger = this.context.getImplementationClass(cg);
            merger.doSetup(this.context, batch, this.container);
            MergingReceiverGeneratorBase mergingReceiverGeneratorBase = merger;
            return mergingReceiverGeneratorBase;
        }
        catch (SchemaChangeException e) {
            throw this.schemaChangeException(e, logger);
        }
        finally {
            this.stats.stopSetup();
        }
    }

    private void generateComparisons(ClassGenerator<?> g, VectorAccessible batch) throws SchemaChangeException {
        g.setMappingSet(this.MAIN_MAPPING);
        for (Order.Ordering od : ((MergingReceiverPOP)this.popConfig).getOrderings()) {
            ErrorCollectorImpl collector = new ErrorCollectorImpl();
            LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, this.context.getFunctionRegistry());
            if (collector.hasErrors()) {
                throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
            }
            g.setMappingSet(this.LEFT_MAPPING);
            ClassGenerator.HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
            g.setMappingSet(this.RIGHT_MAPPING);
            ClassGenerator.HoldingContainer right = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
            g.setMappingSet(this.MAIN_MAPPING);
            LogicalExpression fh = FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right, this.context.getFunctionRegistry());
            ClassGenerator.HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
            JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit((int)0)));
            if (od.getDirection() == RelFieldCollation.Direction.ASCENDING) {
                jc._then()._return((JExpression)out.getValue());
                continue;
            }
            jc._then()._return(out.getValue().minus());
        }
        g.getEvalBlock()._return(JExpr.lit((int)0));
    }

    private boolean copyRecordToOutgoingBatch(Node node) {
        assert (this.outgoingPosition < 32768) : String.format("Outgoing position %d must be less than bath size %d", this.outgoingPosition, 32768);
        if (!$assertionsDisabled) {
            int n = node.batchId;
            this.outputCounts[n] = this.outputCounts[n] + 1L;
            if (this.outputCounts[n] > this.inputCounts[node.batchId]) {
                throw new AssertionError((Object)String.format("Stream %d input count: %d output count %d", node.batchId, this.inputCounts[node.batchId], this.outputCounts[node.batchId]));
            }
        }
        int inIndex = (node.batchId << 16) + node.valueIndex;
        try {
            this.merger.doCopy(inIndex, this.outgoingPosition);
        }
        catch (SchemaChangeException e) {
            throw new UnsupportedOperationException(e);
        }
        if (++this.outgoingPosition == 32768) {
            logger.debug("Outgoing vectors space is full (batch size {}).", (Object)32768);
            return false;
        }
        return true;
    }

    @Override
    public void close() {
        this.container.clear();
        if (this.batchLoaders != null) {
            for (RecordBatchLoader rbl : this.batchLoaders) {
                if (rbl == null) continue;
                rbl.clear();
            }
        }
        super.close();
    }

    @Override
    public void dump() {
        logger.error("MergingRecordBatch[container={}, outgoingPosition={}, incomingBatches={}, batchOffsets={}, tempBatchHolder={}, inputCounts={}, outputCounts={}]", new Object[]{this.container, this.outgoingPosition, Arrays.toString(this.incomingBatches), Arrays.toString(this.batchOffsets), Arrays.toString(this.tempBatchHolder), Arrays.toString(this.inputCounts), Arrays.toString(this.outputCounts)});
    }

    public static enum Metric implements MetricDef
    {
        BYTES_RECEIVED,
        NUM_SENDERS,
        NEXT_WAIT_NANOS;


        @Override
        public int metricId() {
            return this.ordinal();
        }
    }

    public class Node {
        public int batchId;
        public int valueIndex;

        Node(int batchId, int valueIndex) {
            this.batchId = batchId;
            this.valueIndex = valueIndex;
        }
    }

    private class OutcomeListener
    implements RpcOutcomeListener<GeneralRPCProtos.Ack> {
        private OutcomeListener() {
        }

        @Override
        public void failed(RpcException ex) {
            logger.warn("Failed to inform upstream that receiver is finished");
        }

        @Override
        public void success(GeneralRPCProtos.Ack value, ByteBuf buffer) {
        }

        @Override
        public void interrupted(InterruptedException e) {
            if (MergingRecordBatch.this.context.getExecutorState().shouldContinue()) {
                String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
                logger.error("Received an interrupt RPC outcome while sending ReceiverFinished message", (Throwable)e);
                MergingRecordBatch.this.context.getExecutorState().fail(new RpcException("Received an interrupt RPC outcome while sending ReceiverFinished message", e));
            }
        }
    }
}

