/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.ws.emr.hadoop.fs.dynamodb.impl;

import com.amazon.ws.emr.hadoop.fs.concurrent.Consumer;
import com.amazon.ws.emr.hadoop.fs.concurrent.Producer;
import com.amazon.ws.emr.hadoop.fs.concurrent.ProducerConsumerExecutor;
import com.amazon.ws.emr.hadoop.fs.dynamodb.Entity;
import com.amazon.ws.emr.hadoop.fs.dynamodb.impl.NativeDynamoDBEntityStore;
import com.amazon.ws.emr.hadoop.fs.dynamodb.impl.NativeDynamoDBRateLimiter;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.dynamodbv2.model.ScanResult;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.base.Optional;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.collect.AbstractIterator;
import com.amazon.ws.emr.hadoop.fs.shaded.com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Iterable over the entities returned by a segmented DynamoDB scan.
 *
 * <p>Each call to {@link #iterator()} starts a daemon thread that runs
 * {@value #PRODUCER_COUNT} {@link EntityProducer}s (one per scan segment) and a
 * single consumer through a {@link ProducerConsumerExecutor}. The consumer
 * pushes entities into a bounded queue which the returned iterator drains; an
 * absent {@link Optional} on the queue marks the end of the data.
 */
public class NativeDynamoDBDumpResult
implements Iterable<Entity> {
    /** Number of scan segments, and therefore of producers, used per iteration. */
    private static final int PRODUCER_COUNT = 5;
    /** Maximum number of entities buffered between the scan thread and the iterator. */
    private static final int QUEUE_CAPACITY = 1000;
    private static final String HASH_KEY_ATTRIBUTE = "hashKey";
    private static final String RANGE_KEY_ATTRIBUTE = "rangeKey";
    // Key values of the item that is filtered out of every scan page.
    // NOTE(review): presumably a table-level metadata marker rather than a real entity -- confirm.
    private static final String MARKER_HASH_KEY = "MultiKeyStoreTag";
    private static final String MARKER_RANGE_KEY = "TableRole";

    private final ScanRequest originalScanRequest;
    private final NativeDynamoDBRateLimiter rateLimiter;
    private final AmazonDynamoDB dynamoDB;

    /**
     * @param dynamoDB    client used to issue the scan requests
     * @param rateLimiter notified before and after every scan call
     * @param scanRequest template whose table name, limit, exclusive start key and
     *                    scan filter are copied into each per-segment request
     */
    public NativeDynamoDBDumpResult(AmazonDynamoDB dynamoDB, NativeDynamoDBRateLimiter rateLimiter, ScanRequest scanRequest) {
        this.originalScanRequest = scanRequest;
        this.rateLimiter = rateLimiter;
        this.dynamoDB = dynamoDB;
    }

    /**
     * Starts a new background scan and returns an iterator over its results.
     *
     * <p>The iterator blocks in {@code hasNext()}/{@code next()} until an entity
     * or the end-of-data marker is available.
     *
     * @return an iterator fed by a freshly started daemon thread
     * @throws RuntimeException from the iterator's methods if the calling thread
     *         is interrupted while waiting, or if the background execution threw
     */
    @Override
    public Iterator<Entity> iterator() {
        final BlockingQueue<Optional<Entity>> queue = new ArrayBlockingQueue<Optional<Entity>>(QUEUE_CAPACITY);
        // Holds whatever the background execution threw, so the iterator can
        // surface it instead of waiting forever for an end marker.
        final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
        AbstractIterator<Entity> iterator = new AbstractIterator<Entity>(){

            @Override
            protected Entity computeNext() {
                Optional<Entity> next;
                try {
                    next = queue.take();
                }
                catch (InterruptedException e) {
                    // Restore the flag so callers further up can still observe the interrupt.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                if (next.isPresent()) {
                    return next.get();
                }
                Throwable cause = failure.get();
                if (cause != null) {
                    throw new RuntimeException("DynamoDB dump scan failed", cause);
                }
                return this.endOfData();
            }
        };
        final ProducerConsumerExecutor executor = new ProducerConsumerExecutor();
        final EntityConsumer consumer = new EntityConsumer(queue);
        final List<EntityProducer> producers = new ArrayList<EntityProducer>(PRODUCER_COUNT);
        for (int segment = 0; segment < PRODUCER_COUNT; ++segment) {
            ScanRequest scanRequest = new ScanRequest()
                .withTableName(this.originalScanRequest.getTableName())
                .withLimit(this.originalScanRequest.getLimit())
                .withExclusiveStartKey(this.originalScanRequest.getExclusiveStartKey())
                .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
                .withScanFilter(this.originalScanRequest.getScanFilter())
                .withSegment(segment)
                .withTotalSegments(PRODUCER_COUNT);
            producers.add(new EntityProducer(this.dynamoDB, scanRequest));
        }
        Thread executionThread = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    executor.execute(producers, Lists.newArrayList(consumer));
                }
                catch (RuntimeException e) {
                    this.abort(e);
                    throw e;
                }
                catch (Error e) {
                    this.abort(e);
                    throw e;
                }
            }

            /**
             * Records the failure and queues an end marker so a waiting iterator wakes up.
             * If the consumer had already queued its own marker, the extra one is simply
             * never read.
             */
            private void abort(Throwable cause) {
                failure.compareAndSet(null, cause);
                try {
                    queue.put(Optional.<Entity>absent());
                }
                catch (InterruptedException ie) {
                    // Cannot block any longer: make one non-blocking attempt, then restore the flag.
                    queue.offer(Optional.<Entity>absent());
                    Thread.currentThread().interrupt();
                }
            }
        });
        executionThread.setDaemon(true);
        executionThread.setName("NativeDynamoDBDumpResult|" + Integer.toString(System.identityHashCode(iterator)));
        executionThread.start();
        return iterator;
    }

    /**
     * Tells whether a scanned item carries the marker hash/range key pair that is
     * excluded from the results. Items lacking either key attribute are not markers.
     */
    private static boolean isMarkerItem(Map<String, AttributeValue> item) {
        AttributeValue hashKey = item.get(HASH_KEY_ATTRIBUTE);
        if (hashKey == null || !MARKER_HASH_KEY.equals(hashKey.getS())) {
            return false;
        }
        AttributeValue rangeKey = item.get(RANGE_KEY_ATTRIBUTE);
        return rangeKey != null && MARKER_RANGE_KEY.equals(rangeKey.getS());
    }

    /**
     * Consumer that forwards every entity into the iterator's queue and appends
     * an absent {@link Optional} as the end-of-data marker once consumption finishes.
     */
    private static class EntityConsumer
    extends Consumer<Entity> {
        private final BlockingQueue<Optional<Entity>> queue;

        public EntityConsumer(BlockingQueue<Optional<Entity>> queue) {
            this.queue = queue;
        }

        @Override
        public void beforeConsumption() {
            // Nothing to prepare.
        }

        /** Queues the entity, blocking while the queue is full. */
        @Override
        public void consume(Entity value) {
            try {
                this.queue.put(Optional.of(value));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /** Queues the end-of-data marker read by the iterator. */
        @Override
        public void afterConsumption() {
            try {
                this.queue.put(Optional.<Entity>absent());
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Producer that pages through one scan request and hands out the resulting
     * entities one at a time.
     */
    public class EntityProducer
    extends Producer<Entity> {
        private final ScanRequest scanRequest;
        private final AmazonDynamoDB dynamoDB;
        /** Becomes false once a scan result carries no last-evaluated key. */
        private boolean moreToScan = true;
        /** Iterator over the most recently fetched non-empty batch; null before the first fetch. */
        private Iterator<Entity> iterator;

        /**
         * @param dynamoDB    client used to run the scan
         * @param scanRequest request to page through; its exclusive start key is
         *                    updated in place after each page
         */
        public EntityProducer(AmazonDynamoDB dynamoDB, ScanRequest scanRequest) {
            this.dynamoDB = dynamoDB;
            this.scanRequest = scanRequest;
        }

        /**
         * Reports whether another entity is available, fetching the next batch
         * when the current one is exhausted.
         */
        @Override
        protected boolean canProduce() {
            if (this.iterator == null || !this.iterator.hasNext()) {
                List<Entity> batch = this.getNextBatch();
                if (batch == null || batch.isEmpty()) {
                    return false;
                }
                this.iterator = batch.iterator();
            }
            return true;
        }

        /** Returns the next entity of the current batch. */
        @Override
        protected Entity produce() {
            return this.iterator.next();
        }

        /**
         * Scans pages until one yields at least one entity or the scan is finished.
         *
         * @return the entities of the last page read (possibly empty when the scan
         *         ended on an empty page), or {@code null} if the scan had already
         *         finished before this call
         */
        private List<Entity> getNextBatch() {
            if (!this.moreToScan) {
                return null;
            }
            List<Entity> entities;
            do {
                NativeDynamoDBDumpResult.this.rateLimiter.beforeRead();
                ScanResult scanResult = this.dynamoDB.scan(this.scanRequest);
                NativeDynamoDBDumpResult.this.rateLimiter.afterRead(scanResult.getConsumedCapacity());
                Map<String, AttributeValue> lastEvaluatedKey = scanResult.getLastEvaluatedKey();
                if (lastEvaluatedKey == null) {
                    this.moreToScan = false;
                } else {
                    // Continue the next page right after the last item of this one.
                    this.scanRequest.setExclusiveStartKey(lastEvaluatedKey);
                }
                List<Map<String, AttributeValue>> items = scanResult.getItems();
                entities = new ArrayList<Entity>(items.size());
                for (Map<String, AttributeValue> item : items) {
                    if (NativeDynamoDBDumpResult.isMarkerItem(item)) {
                        continue;
                    }
                    entities.add(NativeDynamoDBEntityStore.itemToEntity(item));
                }
                // A page can be empty (or hold only the marker) while more pages remain.
            } while (this.moreToScan && entities.isEmpty());
            return entities;
        }
    }
}

