/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.impl;

import com.mapr.db.exceptions.AccessDeniedException;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.impl.StreamsDocument;
import com.mapr.streams.impl.StreamsDocumentTranslator;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.ojai.Document;
import org.ojai.DocumentListener;
import org.ojai.DocumentReader;
import org.ojai.DocumentStream;
import org.ojai.FieldPath;
import org.ojai.exceptions.StreamInUseException;
import org.ojai.store.exceptions.StoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarlinDocumentStream
implements DocumentStream {
    static final Logger LOG = LoggerFactory.getLogger(MarlinDocumentStream.class);
    private final ExecutorService pool;
    DocumentListNode first;
    DocumentListNode last;
    int scansInProgress;
    boolean iteratorOpened;
    boolean closed;
    boolean accessExceptionOccurred;
    long totalCachedMemory;
    long maxCacheMemory;
    StoreException exception;

    public MarlinDocumentStream(List<DocumentStream> dbDocumentStreams, List<FieldPath> ps, int parallelScans, long maxMemory) {
        List<Integer> pathIds = StreamsDocument.getProjectionIdList(ps);
        this.iteratorOpened = false;
        this.closed = false;
        this.pool = Executors.newFixedThreadPool(parallelScans);
        this.first = null;
        this.last = null;
        this.scansInProgress = dbDocumentStreams.size();
        this.totalCachedMemory = 0L;
        this.maxCacheMemory = maxMemory;
        this.accessExceptionOccurred = false;
        this.exception = null;
        ArrayList<Future<Void>> futureList = new ArrayList<Future<Void>>();
        for (DocumentStream documentStream : dbDocumentStreams) {
            PartitionScanner pScanObj = new PartitionScanner(documentStream, this, pathIds);
            Future<Void> future = this.pool.submit(pScanObj);
            futureList.add(future);
        }
        for (Future future : futureList) {
            try {
                future.get();
            }
            catch (ExecutionException ee) {
                Throwable te = ee.getCause();
                if (te instanceof AccessDeniedException) {
                    this.accessExceptionOccurred = true;
                    break;
                }
                this.exception = new StoreException(te);
                break;
            }
            catch (InterruptedException ie) {
                this.exception = new StoreException((Throwable)ie);
                break;
            }
        }
        this.pool.shutdown();
    }

    private synchronized DocumentListNode getAllScannedDocuments() {
        while (true) {
            if (this.first != null) {
                DocumentListNode ret = this.first;
                this.first = null;
                this.last = null;
                this.totalCachedMemory = 0L;
                this.notifyAll();
                return ret;
            }
            if (this.scansInProgress == 0) {
                return null;
            }
            try {
                this.wait();
            }
            catch (InterruptedException e) {
                LOG.error("getScannedDocuments: Interrupted: " + e.getMessage());
                return null;
            }
        }
    }

    private synchronized int markScanComplete() {
        --this.scansInProgress;
        this.notifyAll();
        return this.scansInProgress;
    }

    private synchronized void addScannedDocuments(DocumentListNode node) {
        if (this.first == null) {
            this.first = node;
            this.last = node;
        } else {
            this.last.setNextDocumentList(node);
            node.setNextDocumentList(null);
            this.last = node;
        }
        this.totalCachedMemory += (long)node.getBatchSize();
        this.notifyAll();
        while (this.totalCachedMemory > this.maxCacheMemory) {
            try {
                this.wait();
            }
            catch (InterruptedException e) {
                return;
            }
        }
    }

    public synchronized void close() {
        if (!this.closed) {
            this.pool.shutdownNow();
            this.closed = true;
        }
    }

    public void streamTo(DocumentListener l) {
        try {
            Iterator<Document> iterator = this.iterator();
            while (iterator.hasNext()) {
                Document record = iterator.next();
                l.documentArrived(record);
            }
        }
        catch (Exception e) {
            try {
                this.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
            l.failed(e);
        }
    }

    private void checkDocStreamIteratorOpened() {
        if (this.iteratorOpened) {
            throw new StreamInUseException("An iterator has already been opened on this document stream.");
        }
    }

    private synchronized void checkDocStreamClosed() {
        if (this.closed) {
            throw new IllegalStateException("DocumentStream already closed.");
        }
    }

    public Iterator<Document> iterator() {
        this.checkDocStreamIteratorOpened();
        this.checkDocStreamClosed();
        this.iteratorOpened = true;
        return new Iterator<Document>(){
            DocumentListNode allScannedDocs = null;
            boolean done = false;

            @Override
            public boolean hasNext() {
                if (this.done) {
                    return false;
                }
                MarlinDocumentStream.this.checkDocStreamClosed();
                while (this.allScannedDocs != null && this.allScannedDocs.getNumTotalDocuments() == 0) {
                    this.allScannedDocs = this.allScannedDocs.getNextDocumentList();
                }
                if (this.allScannedDocs == null) {
                    this.allScannedDocs = MarlinDocumentStream.this.getAllScannedDocuments();
                    if (this.allScannedDocs == null) {
                        this.done = true;
                        try {
                            MarlinDocumentStream.this.close();
                        }
                        catch (Exception e) {
                            LOG.error("Error while closing stream: " + e.getMessage());
                        }
                    }
                }
                return !this.done;
            }

            @Override
            public Document next() {
                if (!this.hasNext()) {
                    throw new NoSuchElementException("next() called after hasNext() return false.");
                }
                return this.allScannedDocs.getNextDocument();
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    public Iterable<DocumentReader> documentReaders() {
        throw new UnsupportedOperationException("documentReaders() not supported for MarlinDocumentStream");
    }

    public Document getQueryPlan() {
        throw new UnsupportedOperationException();
    }

    private class PartitionScanner
    implements Callable<Void> {
        DocumentStream docStream;
        MarlinDocumentStream marlinStream;
        List<Integer> pathIds;

        PartitionScanner(DocumentStream dStream, MarlinDocumentStream mStream, List<Integer> pIds) {
            this.docStream = dStream;
            this.marlinStream = mStream;
            this.pathIds = pIds;
        }

        @Override
        public Void call() throws Exception {
            try {
                Iterator iter = this.docStream.iterator();
                Marlinserver.MarlinInternalDefaults mdef = Marlinserver.MarlinInternalDefaults.getDefaultInstance();
                while (iter.hasNext()) {
                    Document dbDoc = (Document)iter.next();
                    StreamsDocumentTranslator t = new StreamsDocumentTranslator(dbDoc, this.pathIds);
                    DocumentListNode newNode = new DocumentListNode(mdef.getMaxMsgsPerRow());
                    int totalBatchSize = 0;
                    while (t.hasNext()) {
                        StreamsDocument doc = (StreamsDocument)t.next();
                        totalBatchSize += doc.size();
                        newNode.addDocument((Document)doc);
                    }
                    newNode.setBatchSize(totalBatchSize);
                    this.marlinStream.addScannedDocuments(newNode);
                }
                this.docStream.close();
                LOG.debug("Num scanners active = " + this.marlinStream.markScanComplete());
            }
            catch (Exception e) {
                LOG.error("Error while scanning stream: " + e.getCause());
                throw e;
            }
            return null;
        }
    }

    private class DocumentListNode {
        Document[] docs;
        int totalDocs;
        int consumedDocs;
        int batchSize;
        DocumentListNode next;

        public DocumentListNode(int maxMsgsPerRow) {
            this.docs = new Document[maxMsgsPerRow];
            this.totalDocs = 0;
            this.consumedDocs = 0;
            this.batchSize = 0;
            this.next = null;
        }

        public void addDocument(Document doc) {
            assert (this.totalDocs < this.docs.length);
            this.docs[this.totalDocs++] = doc;
        }

        public int getNumTotalDocuments() {
            return this.totalDocs - this.consumedDocs;
        }

        public Document getNextDocument() {
            if (this.consumedDocs >= this.totalDocs) {
                return null;
            }
            Document doc = this.docs[this.consumedDocs];
            this.docs[this.consumedDocs] = null;
            ++this.consumedDocs;
            return doc;
        }

        public void setBatchSize(int size) {
            this.batchSize = size;
        }

        public int getBatchSize() {
            return this.batchSize;
        }

        public void setNextDocumentList(DocumentListNode node) {
            this.next = node;
        }

        public DocumentListNode getNextDocumentList() {
            return this.next;
        }
    }
}

