/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.work.filter;

import com.google.protobuf.ProtocolStringList;
import io.netty.buffer.DrillBuf;
import java.io.Closeable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.Consumer;
import org.apache.drill.exec.ops.DataTunnelStatusHandler;
import org.apache.drill.exec.ops.SendingAccountor;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RuntimeFilterSink
implements Closeable {
    private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<RuntimeFilterWritable>();
    private Map<Integer, Integer> joinMjId2rfNumber;
    private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap<Integer, List<CoordinationProtos.DrillbitEndpoint>>();
    private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<Integer, Integer>();
    private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new HashMap<Integer, RuntimeFilterWritable>();
    private Map<Integer, Stopwatch> joinMjId2Stopwatch = new HashMap<Integer, Stopwatch>();
    private DrillbitContext drillbitContext;
    private SendingAccountor sendingAccountor;
    private AsyncAggregateWorker asyncAggregateWorker;
    private AtomicBoolean running = new AtomicBoolean(true);
    private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);

    public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor) {
        this.drillbitContext = drillbitContext;
        this.sendingAccountor = sendingAccountor;
        this.asyncAggregateWorker = new AsyncAggregateWorker();
        drillbitContext.getExecutor().submit(this.asyncAggregateWorker);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void add(RuntimeFilterWritable runtimeFilterWritable) {
        if (!this.running.get()) {
            runtimeFilterWritable.close();
            return;
        }
        runtimeFilterWritable.retainBuffers(1);
        int joinMjId = runtimeFilterWritable.getRuntimeFilterBDef().getMajorFragmentId();
        if (this.joinMjId2Stopwatch.get(joinMjId) == null) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            this.joinMjId2Stopwatch.put(joinMjId, stopwatch);
        }
        BlockingQueue<RuntimeFilterWritable> blockingQueue = this.rfQueue;
        synchronized (blockingQueue) {
            if (!this.running.get()) {
                runtimeFilterWritable.close();
                return;
            }
            this.rfQueue.add(runtimeFilterWritable);
            this.rfQueue.notify();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        this.running.set(false);
        if (this.asyncAggregateWorker != null) {
            BlockingQueue<RuntimeFilterWritable> blockingQueue = this.rfQueue;
            synchronized (blockingQueue) {
                this.rfQueue.notify();
            }
        }
        while (!this.asyncAggregateWorker.over.get()) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                logger.error("interrupted while sleeping to wait for the aggregating worker thread to exit", (Throwable)e);
            }
        }
        for (RuntimeFilterWritable runtimeFilterWritable : this.joinMjId2AggregatedRF.values()) {
            runtimeFilterWritable.close();
        }
    }

    private void aggregate(RuntimeFilterWritable srcRuntimeFilterWritable) {
        BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
        int joinMajorId = runtimeFilterB.getMajorFragmentId();
        RuntimeFilterWritable toAggregated = null;
        int buildSideRfNumber = this.joinMjId2rfNumber.get(joinMajorId);
        this.joinMjId2rfNumber.put(joinMajorId, --buildSideRfNumber);
        toAggregated = this.joinMjId2AggregatedRF.get(joinMajorId);
        if (toAggregated == null) {
            toAggregated = srcRuntimeFilterWritable;
            toAggregated.retainBuffers(1);
        } else {
            toAggregated.aggregate(srcRuntimeFilterWritable);
        }
        this.joinMjId2AggregatedRF.put(joinMajorId, toAggregated);
        if (buildSideRfNumber == 0) {
            this.joinMjId2AggregatedRF.remove(joinMajorId);
            this.route(toAggregated);
            this.joinMjId2rfNumber.remove(joinMajorId);
            Stopwatch stopwatch = this.joinMjId2Stopwatch.get(joinMajorId);
            logger.info("received all the RFWs belonging to the majorId {}'s HashJoin nodes and flushed aggregated RFW out elapsed {} ms", (Object)joinMajorId, (Object)stopwatch.elapsed(TimeUnit.MILLISECONDS));
        }
    }

    private void route(RuntimeFilterWritable srcRuntimeFilterWritable) {
        BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
        int joinMajorId = runtimeFilterB.getMajorFragmentId();
        UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
        ProtocolStringList probeFields = runtimeFilterB.getProbeFieldsList();
        List<Integer> sizeInBytes = runtimeFilterB.getBloomFilterSizeInBytesList();
        long rfIdentifier = runtimeFilterB.getRfIdentifier();
        DrillBuf[] data = srcRuntimeFilterWritable.getData();
        List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = this.joinMjId2probeScanEps.get(joinMajorId);
        int scanNodeSize = scanNodeEps.size();
        srcRuntimeFilterWritable.retainBuffers(scanNodeSize - 1);
        int scanNodeMjId = this.joinMjId2ScanMjId.get(joinMajorId);
        for (int minorId = 0; minorId < scanNodeEps.size(); ++minorId) {
            BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder();
            for (String probeField : probeFields) {
                builder.addProbeFields(probeField);
            }
            BitData.RuntimeFilterBDef runtimeFilterBDef = builder.setQueryId(queryId).setMajorFragmentId(scanNodeMjId).setMinorFragmentId(minorId).setToForeman(false).setRfIdentifier(rfIdentifier).addAllBloomFilterSizeInBytes(sizeInBytes).build();
            RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data);
            CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId);
            DataTunnel dataTunnel = this.drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
            Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>(){

                @Override
                public void accept(RpcException e) {
                    logger.warn("fail to broadcast a runtime filter to the probe side scan node", (Throwable)e);
                }

                @Override
                public void interrupt(InterruptedException e) {
                    logger.warn("fail to broadcast a runtime filter to the probe side scan node", (Throwable)e);
                }
            };
            DataTunnelStatusHandler statusHandler = new DataTunnelStatusHandler(exceptionConsumer, this.sendingAccountor);
            AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, this.sendingAccountor, statusHandler);
            accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable);
        }
    }

    public void setJoinMjId2rfNumber(Map<Integer, Integer> joinMjId2rfNumber) {
        this.joinMjId2rfNumber = joinMjId2rfNumber;
    }

    public void setJoinMjId2probeScanEps(Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps) {
        this.joinMjId2probeScanEps = joinMjId2probeScanEps;
    }

    public void setJoinMjId2ScanMjId(Map<Integer, Integer> joinMjId2ScanMjId) {
        this.joinMjId2ScanMjId = joinMjId2ScanMjId;
    }

    private class AsyncAggregateWorker
    implements Runnable {
        private AtomicBoolean over = new AtomicBoolean(false);

        private AsyncAggregateWorker() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while ((RuntimeFilterSink.this.joinMjId2rfNumber == null || !RuntimeFilterSink.this.joinMjId2rfNumber.isEmpty()) && RuntimeFilterSink.this.running.get()) {
                RuntimeFilterWritable toAggregate = null;
                BlockingQueue blockingQueue = RuntimeFilterSink.this.rfQueue;
                synchronized (blockingQueue) {
                    try {
                        toAggregate = (RuntimeFilterWritable)RuntimeFilterSink.this.rfQueue.poll();
                        while (toAggregate == null && RuntimeFilterSink.this.running.get()) {
                            RuntimeFilterSink.this.rfQueue.wait();
                            toAggregate = (RuntimeFilterWritable)RuntimeFilterSink.this.rfQueue.poll();
                        }
                    }
                    catch (InterruptedException ex) {
                        logger.error("RFW_Aggregator thread being interrupted", (Throwable)ex);
                        continue;
                    }
                }
                if (toAggregate == null) continue;
                try {
                    RuntimeFilterSink.this.aggregate(toAggregate);
                }
                catch (Exception ex) {
                    logger.error("Failed to aggregate or route the RFW", (Throwable)ex);
                    BlockingQueue blockingQueue2 = RuntimeFilterSink.this.rfQueue;
                    synchronized (blockingQueue2) {
                        RuntimeFilterSink.this.running.set(false);
                    }
                    this.cleanupQueue();
                    throw new DrillRuntimeException(ex);
                }
                finally {
                    toAggregate.close();
                }
            }
            this.cleanupQueue();
        }

        private void cleanupQueue() {
            if (!RuntimeFilterSink.this.running.get()) {
                RuntimeFilterWritable toClose;
                while ((toClose = (RuntimeFilterWritable)RuntimeFilterSink.this.rfQueue.poll()) != null) {
                    toClose.close();
                }
            }
            this.over.set(true);
        }
    }
}

