/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.broadcastsender;

import java.util.List;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.BroadcastSender;
import org.apache.drill.exec.physical.impl.BaseRootExec;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Root executor for a {@link BroadcastSender}: every batch pulled from the
 * incoming {@link RecordBatch} is sent once per destination endpoint.
 *
 * <p>Destinations are grouped by endpoint at construction time, so that a single
 * data tunnel (and a single {@link FragmentWritableBatch} per incoming batch)
 * addresses all receiving minor fragments that share that endpoint.
 */
public class BroadcastSenderRootExec extends BaseRootExec {
    private static final Logger logger = LoggerFactory.getLogger(BroadcastSenderRootExec.class);

    private final BroadcastSender config;

    /** {@code receivingMinorFragments[i]} lists the minor fragment ids addressed through {@code tunnels[i]}. */
    private final int[][] receivingMinorFragments;

    /** One tunnel per distinct destination endpoint; index-aligned with {@link #receivingMinorFragments}. */
    private final AccountingDataTunnel[] tunnels;

    private final ExecProtos.FragmentHandle handle;

    // NOTE(review): nothing visible in this class ever sets this to false; it is
    // returned as-is from innerNext() after a data batch has been sent.
    private volatile boolean ok = true;

    private final RecordBatch incoming;

    /**
     * Creates the sender and opens one data tunnel per distinct destination endpoint.
     *
     * @param context  fragment context used to create the operator context, obtain the
     *                 fragment handle and open data tunnels
     * @param incoming upstream batch whose data is broadcast
     * @param config   sender configuration supplying the destinations and the opposite
     *                 major fragment id
     * @throws OutOfMemoryException declared by the original signature; presumably raised
     *                              while creating the operator context - confirm against
     *                              {@code RootFragmentContext}
     */
    public BroadcastSenderRootExec(RootFragmentContext context, RecordBatch incoming, BroadcastSender config) throws OutOfMemoryException {
        super(context, context.newOperatorContext(config, null), config);
        this.incoming = incoming;
        this.config = config;
        this.handle = context.getHandle();

        // Group the receiving minor fragment ids by the endpoint that hosts them.
        ArrayListMultimap<CoordinationProtos.DrillbitEndpoint, Integer> dests = ArrayListMultimap.create();
        for (MinorFragmentEndpoint destination : config.getDestinations()) {
            dests.put(destination.getEndpoint(), destination.getId());
        }

        int destCount = dests.keySet().size();
        this.tunnels = new AccountingDataTunnel[destCount];
        this.receivingMinorFragments = new int[destCount][];

        int tunnelIndex = 0;
        for (CoordinationProtos.DrillbitEndpoint ep : dests.keySet()) {
            List<Integer> minorsList = dests.get(ep);
            int[] minorsArray = new int[minorsList.size()];
            int x = 0;
            for (Integer minor : minorsList) {
                minorsArray[x++] = minor;
            }
            this.receivingMinorFragments[tunnelIndex] = minorsArray;
            this.tunnels[tunnelIndex] = context.getDataTunnel(ep);
            ++tunnelIndex;
        }
    }

    /**
     * Pulls the next outcome from the incoming batch and forwards it to every tunnel.
     *
     * <ul>
     *   <li>{@code NONE}: sends the batch produced by
     *       {@link FragmentWritableBatch#getEmptyLast} through every tunnel and returns
     *       {@code false}.</li>
     *   <li>{@code OK} / {@code OK_NEW_SCHEMA}: transfers the incoming data to this
     *       operator's allocator, sends it through every tunnel and returns the current
     *       value of {@code ok}.</li>
     * </ul>
     *
     * @return {@code false} after the final empty batch was sent; otherwise the value of
     *         {@code ok}
     * @throws IllegalStateException for any other outcome
     */
    @Override
    public boolean innerNext() {
        RecordBatch.IterOutcome out = next(incoming);
        logger.debug("Outcome of sender next {}", out);
        switch (out) {
            case NONE:
                for (int i = 0; i < tunnels.length; ++i) {
                    FragmentWritableBatch lastBatch = FragmentWritableBatch.getEmptyLast(
                            handle.getQueryId(),
                            handle.getMajorFragmentId(),
                            handle.getMinorFragmentId(),
                            config.getOppositeMajorFragmentId(),
                            receivingMinorFragments[i]);
                    sendWithWaitStats(tunnels[i], lastBatch);
                }
                return false;

            case OK_NEW_SCHEMA:
            case OK:
                WritableBatch writableBatch = incoming.getWritableBatch().transfer(oContext.getAllocator());
                // The same buffers are handed to every tunnel, so add one extra retain per
                // additional tunnel (presumably each send releases one reference - confirm
                // against the data tunnel implementation).
                if (tunnels.length > 1) {
                    writableBatch.retainBuffers(tunnels.length - 1);
                }
                for (int i = 0; i < tunnels.length; ++i) {
                    FragmentWritableBatch batch = new FragmentWritableBatch(
                            false,
                            handle.getQueryId(),
                            handle.getMajorFragmentId(),
                            handle.getMinorFragmentId(),
                            config.getOppositeMajorFragmentId(),
                            receivingMinorFragments[i],
                            writableBatch);
                    updateStats(batch);
                    sendWithWaitStats(tunnels[i], batch);
                }
                return ok;

            default:
                throw new IllegalStateException("Unexpected outcome from incoming batch: " + out);
        }
    }

    /**
     * Sends one batch through one tunnel, bracketing the call with the operator's wait
     * accounting so that {@code stopWait()} runs even when the send throws.
     */
    private void sendWithWaitStats(AccountingDataTunnel tunnel, FragmentWritableBatch batch) {
        stats.startWait();
        try {
            tunnel.sendRecordBatch(batch);
        } finally {
            stats.stopWait();
        }
    }

    /**
     * Records the number of tunnels and adds the byte count of the given batch to the
     * operator statistics.
     *
     * @param writableBatch batch about to be sent
     */
    public void updateStats(FragmentWritableBatch writableBatch) {
        stats.setLongStat(Metric.N_RECEIVERS, tunnels.length);
        stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount());
    }

    /** Operator metrics reported by this sender; the metric id is the constant's ordinal. */
    public enum Metric implements MetricDef {
        N_RECEIVERS,
        BYTES_SENT;

        @Override
        public int metricId() {
            return ordinal();
        }
    }
}

