/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.producer;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.QueryCancelledException;
import org.apache.drill.exec.physical.config.ProducerConsumer;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerConsumerBatch
extends AbstractRecordBatch<ProducerConsumer> {
    private static final Logger logger = LoggerFactory.getLogger(ProducerConsumerBatch.class);
    private final RecordBatch incoming;
    private final Thread producer = new Thread((Runnable)new Producer(), Thread.currentThread().getName() + " - Producer Thread");
    private boolean running = false;
    private final BlockingDeque<RecordBatchDataWrapper> queue;
    private int recordCount;
    private BatchSchema schema;
    private final CountDownLatch cleanUpLatch = new CountDownLatch(1);

    protected ProducerConsumerBatch(ProducerConsumer popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
        super(popConfig, context);
        this.incoming = incoming;
        this.queue = new LinkedBlockingDeque<RecordBatchDataWrapper>(popConfig.getSize());
    }

    @Override
    public RecordBatch.IterOutcome innerNext() {
        RecordBatchDataWrapper wrapper;
        if (!this.running) {
            this.producer.start();
            this.running = true;
        }
        try {
            this.stats.startWait();
            wrapper = this.queue.take();
            logger.debug("Got batch from queue");
        }
        catch (InterruptedException e) {
            throw new QueryCancelledException();
        }
        finally {
            this.stats.stopWait();
        }
        if (wrapper.finished) {
            return RecordBatch.IterOutcome.NONE;
        }
        this.recordCount = wrapper.batch.getRecordCount();
        boolean newSchema = this.load(wrapper.batch);
        return newSchema ? RecordBatch.IterOutcome.OK_NEW_SCHEMA : RecordBatch.IterOutcome.OK;
    }

    private boolean load(RecordBatchData batch) {
        VectorContainer newContainer = batch.getContainer();
        if (this.schema != null && newContainer.getSchema().equals(this.schema)) {
            this.container.zeroVectors();
            BatchSchema schema = this.container.getSchema();
            for (int i = 0; i < this.container.getNumberOfColumns(); ++i) {
                MaterializedField field = schema.getColumn(i);
                TypeProtos.MajorType type = field.getType();
                Object vOut = this.container.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()), this.container.getValueVectorId(SchemaPath.getSimplePath(field.getName())).getFieldIds()).getValueVector();
                Object vIn = newContainer.getValueAccessorById(TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()), newContainer.getValueVectorId(SchemaPath.getSimplePath(field.getName())).getFieldIds()).getValueVector();
                TransferPair tp = vIn.makeTransferPair((ValueVector)vOut);
                tp.transfer();
            }
            return false;
        }
        this.container.clear();
        for (VectorWrapper<?> w : newContainer) {
            this.container.add((ValueVector)w.getValueVector());
        }
        this.container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
        this.schema = this.container.getSchema();
        return true;
    }

    private void clearQueue() {
        RecordBatchDataWrapper wrapper;
        while ((wrapper = this.queue.poll()) != null) {
            if (wrapper.batch == null) continue;
            wrapper.batch.getContainer().clear();
        }
    }

    @Override
    protected void cancelIncoming() {
    }

    @Override
    public void close() {
        this.producer.interrupt();
        try {
            this.producer.join();
        }
        catch (InterruptedException e) {
            logger.warn("Interrupted while waiting for producer thread");
        }
        try {
            this.cleanUpLatch.await();
        }
        catch (InterruptedException e) {
            logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", (Throwable)e);
            throw new QueryCancelledException();
        }
        finally {
            super.close();
            this.clearQueue();
        }
    }

    @Override
    public int getRecordCount() {
        return this.recordCount;
    }

    @Override
    public void dump() {
        logger.error("ProducerConsumerBatch[container={}, recordCount={}, schema={}]", new Object[]{this.container, this.recordCount, this.schema});
    }

    private class Producer
    implements Runnable {
        RecordBatchDataWrapper wrapper;

        private Producer() {
        }

        @Override
        public void run() {
            block16: {
                boolean stop = false;
                try {
                    while (true) {
                        RecordBatch.IterOutcome upstream = ProducerConsumerBatch.this.incoming.next();
                        switch (upstream) {
                            case NONE: {
                                stop = true;
                                break block16;
                            }
                            case OK_NEW_SCHEMA: 
                            case OK: {
                                this.wrapper = RecordBatchDataWrapper.batch(new RecordBatchData(ProducerConsumerBatch.this.incoming, ProducerConsumerBatch.this.oContext.getAllocator()));
                                ProducerConsumerBatch.this.queue.put(this.wrapper);
                                this.wrapper = null;
                                break;
                            }
                            default: {
                                throw new UnsupportedOperationException();
                            }
                        }
                    }
                }
                catch (InterruptedException e) {
                    logger.warn("Producer thread is interrupted.", (Throwable)e);
                    throw new QueryCancelledException();
                }
                finally {
                    if (stop) {
                        try {
                            ProducerConsumerBatch.this.clearQueue();
                            ProducerConsumerBatch.this.queue.put(RecordBatchDataWrapper.finished());
                        }
                        catch (InterruptedException e) {
                            logger.error("Unable to enqueue the last batch indicator. Something is broken.", (Throwable)e);
                        }
                    }
                    if (this.wrapper != null) {
                        this.wrapper.batch.clear();
                    }
                    ProducerConsumerBatch.this.cleanUpLatch.countDown();
                }
            }
        }
    }

    private static class RecordBatchDataWrapper {
        final RecordBatchData batch;
        final boolean finished;

        RecordBatchDataWrapper(RecordBatchData batch, boolean finished) {
            this.batch = batch;
            this.finished = finished;
        }

        public static RecordBatchDataWrapper batch(RecordBatchData batch) {
            return new RecordBatchDataWrapper(batch, false);
        }

        public static RecordBatchDataWrapper finished() {
            return new RecordBatchDataWrapper(null, true);
        }
    }
}

