/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.kafka.api.PartitionFetchInfo;
import io.confluent.kafka.cluster.Broker;
import io.confluent.kafka.cluster.EndPoint;
import io.confluent.kafka.common.TopicAndPartition;
import io.confluent.kafka.javaapi.FetchRequest;
import io.confluent.kafka.javaapi.FetchResponse;
import io.confluent.kafka.javaapi.message.ByteBufferMessageSet;
import io.confluent.kafka.message.MessageAndMetadata;
import io.confluent.kafka.message.MessageAndOffset;
import io.confluent.kafka.serializer.Decoder;
import io.confluent.kafka.serializer.DefaultDecoder;
import io.confluent.kafka.shaded.serializers.KafkaAvroDecoder;
import io.confluent.kafka.shaded.serializers.KafkaJsonDecoder;
import io.confluent.kafka.utils.VerifiableProperties;
import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.MetadataObserver;
import io.confluent.kafkarest.SimpleConsumerConfig;
import io.confluent.kafkarest.SimpleConsumerFactory;
import io.confluent.kafkarest.SimpleConsumerPool;
import io.confluent.kafkarest.SimpleFetcher;
import io.confluent.kafkarest.Time;
import io.confluent.kafkarest.converters.AvroConverter;
import io.confluent.kafkarest.entities.AvroConsumerRecord;
import io.confluent.kafkarest.entities.BinaryConsumerRecord;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.JsonConsumerRecord;
import io.confluent.org.apache.kafka.common.record.TimestampType;
import io.confluent.org.apache.kafka.common.security.auth.SecurityProtocol;
import io.confluent.rest.exceptions.RestException;
import io.confluent.rest.exceptions.RestServerErrorException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

/**
 * Serves one-shot "read {@code count} messages starting at {@code offset}" requests against a
 * single topic partition, using fetchers borrowed from one {@link SimpleConsumerPool} per broker.
 *
 * <p>Pools are created on first use for a broker and kept in a concurrent map until
 * {@link #shutdown()} is called.
 */
public class SimpleConsumerManager {
    private static final Logger log = LoggerFactory.getLogger(SimpleConsumerManager.class);

    private final int maxPoolSize;
    private final int poolInstanceAvailabilityTimeoutMs;
    private final Time time;
    private final MetadataObserver mdObserver;
    private final SimpleConsumerFactory simpleConsumerFactory;
    private final ConcurrentMap<Broker, SimpleConsumerPool> simpleConsumersPools;
    // Source of correlation ids for outgoing fetch requests; shared by all fetches of this manager.
    private final AtomicInteger correlationId = new AtomicInteger(0);
    private final Decoder<Object> avroDecoder;
    private final Decoder<byte[]> binaryDecoder;
    private final Decoder<Object> jsonDecoder;

    /**
     * Creates a manager with no pools; pools are added as brokers are first contacted.
     *
     * @param config                source of the pool size / pool timeout settings, the clock, and
     *                              the schema registry properties handed to the Avro decoder
     * @param mdObserver            used to look up the leader broker of a partition
     * @param simpleConsumerFactory passed to every pool and used to obtain the fetch configuration
     */
    public SimpleConsumerManager(KafkaRestConfig config, MetadataObserver mdObserver, SimpleConsumerFactory simpleConsumerFactory) {
        this.mdObserver = mdObserver;
        this.simpleConsumerFactory = simpleConsumerFactory;
        this.maxPoolSize = config.getInt("simpleconsumer.pool.size.max");
        this.poolInstanceAvailabilityTimeoutMs = config.getInt("simpleconsumer.pool.timeout.ms");
        this.time = config.getTime();
        this.simpleConsumersPools = new ConcurrentHashMap<Broker, SimpleConsumerPool>();

        // The Avro decoder gets the registry URL plus every original setting under "schema.registry".
        Properties props = new Properties();
        props.setProperty("schema.registry.url", config.getString("schema.registry.url"));
        props.putAll(config.originalsWithPrefix("schema.registry", false));
        this.avroDecoder = new KafkaAvroDecoder(new VerifiableProperties(props));
        this.binaryDecoder = new DefaultDecoder(new VerifiableProperties());
        this.jsonDecoder = new KafkaJsonDecoder<Object>(new VerifiableProperties());
    }

    /** Builds a new pool from the limits and collaborators captured at construction time. */
    private SimpleConsumerPool createSimpleConsumerPool() {
        return new SimpleConsumerPool(this.maxPoolSize, this.poolInstanceAvailabilityTimeoutMs, this.time, this.simpleConsumerFactory);
    }

    /**
     * Obtains a fetcher for the first PLAINTEXT endpoint of {@code broker}, creating the broker's
     * pool if it does not exist yet.
     *
     * @throws RuntimeException the exception produced by {@code Errors.noSslSupportException()} when
     *                          the broker exposes no PLAINTEXT endpoint
     */
    private SimpleFetcher getSimpleFetcher(Broker broker) {
        SimpleConsumerPool pool = this.simpleConsumersPools.get(broker);
        if (pool == null) {
            SimpleConsumerPool candidate = this.createSimpleConsumerPool();
            // putIfAbsent returns the pool another thread registered first, or null if ours won.
            SimpleConsumerPool existing = this.simpleConsumersPools.putIfAbsent(broker, candidate);
            pool = existing != null ? existing : candidate;
        }
        for (EndPoint ep : JavaConversions.asJavaCollection(broker.endPoints())) {
            if (ep.securityProtocol() == SecurityProtocol.PLAINTEXT) {
                return pool.get(ep.host(), ep.port());
            }
        }
        throw Errors.noSslSupportException();
    }

    /**
     * Reads up to {@code count} messages from a partition, starting at {@code offset}, and hands
     * the outcome to {@code callback}.
     *
     * <p>Fetching repeats until {@code count} records are collected or a fetch returns no messages.
     * Nothing is thrown to the caller: any {@link Throwable} raised while fetching is delivered to
     * the callback, as-is when it is a {@link RestException} and wrapped via
     * {@code Errors.kafkaErrorException} otherwise. Records collected before a failure are still
     * passed along with the exception; the record list is {@code null} if the failure happened
     * before the list was created.
     *
     * @param topicName      topic to read from
     * @param partitionId    partition to read from
     * @param offset         first offset to return
     * @param count          maximum number of records to return
     * @param embeddedFormat selects how keys and values are decoded
     * @param callback       invoked exactly once with the records and the failure, if any
     */
    public void consume(String topicName, int partitionId, long offset, long count, EmbeddedFormat embeddedFormat, ConsumerReadCallback callback) {
        ArrayList<ConsumerRecord> records = null;
        RestException exception = null;
        SimpleFetcher simpleFetcher = null;
        try {
            Broker broker = this.mdObserver.getLeader(topicName, partitionId);
            simpleFetcher = this.getSimpleFetcher(broker);
            records = new ArrayList<ConsumerRecord>();
            int fetchIterations = 0;
            while (count > 0L) {
                ++fetchIterations;
                log.debug("Simple consumer {}: fetch {}; {} messages remaining", simpleFetcher.clientId(), fetchIterations, count);
                ByteBufferMessageSet messageAndOffsets = this.fetchRecords(topicName, partitionId, offset, simpleFetcher);
                if (!messageAndOffsets.iterator().hasNext()) {
                    // Nothing more to read from this position.
                    break;
                }
                for (MessageAndOffset messageAndOffset : messageAndOffsets) {
                    // A fetch may hand back messages located before the requested position
                    // (presumably when they share a compressed set with it); skip those.
                    if (messageAndOffset.offset() < offset) {
                        continue;
                    }
                    records.add(this.createConsumerRecord(messageAndOffset, topicName, partitionId, embeddedFormat));
                    // Continue after the message actually consumed. Incrementing the cursor by one
                    // would fall behind whenever offsets are not contiguous, and the next fetch
                    // would then return (and re-add) messages that were already collected.
                    offset = messageAndOffset.offset() + 1L;
                    if (--count == 0L) {
                        break;
                    }
                }
            }
        }
        catch (Throwable e) {
            if (e instanceof RestException) {
                exception = (RestException)e;
            } else {
                exception = Errors.kafkaErrorException(e);
            }
        }
        finally {
            if (simpleFetcher != null) {
                try {
                    simpleFetcher.close();
                }
                catch (Exception e) {
                    // A failed close must not hide the result of the read itself.
                    log.error("Unable to release SimpleConsumer {} into the pool", simpleFetcher.clientId(), e);
                }
            }
        }
        callback.onCompletion(records, exception);
    }

    /** Decodes key and value with the binary decoder and wraps them in a binary record. */
    private BinaryConsumerRecord createBinaryConsumerRecord(MessageAndOffset messageAndOffset, String topicName, int partitionId) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = new MessageAndMetadata<byte[], byte[]>(topicName, partitionId, messageAndOffset.message(), messageAndOffset.offset(), this.binaryDecoder, this.binaryDecoder, 0L, TimestampType.CREATE_TIME);
        return new BinaryConsumerRecord(topicName, messageAndMetadata.key(), messageAndMetadata.message(), partitionId, messageAndOffset.offset());
    }

    /** Decodes key and value with the Avro decoder and stores their JSON conversions in the record. */
    private AvroConsumerRecord createAvroConsumerRecord(MessageAndOffset messageAndOffset, String topicName, int partitionId) {
        MessageAndMetadata<Object, Object> messageAndMetadata = new MessageAndMetadata<Object, Object>(topicName, partitionId, messageAndOffset.message(), messageAndOffset.offset(), this.avroDecoder, this.avroDecoder, 0L, TimestampType.CREATE_TIME);
        return new AvroConsumerRecord(topicName, AvroConverter.toJson(messageAndMetadata.key()).json, AvroConverter.toJson(messageAndMetadata.message()).json, partitionId, messageAndOffset.offset());
    }

    /** Decodes key and value with the JSON decoder and wraps them in a JSON record. */
    private JsonConsumerRecord createJsonConsumerRecord(MessageAndOffset messageAndOffset, String topicName, int partitionId) {
        MessageAndMetadata<Object, Object> messageAndMetadata = new MessageAndMetadata<Object, Object>(topicName, partitionId, messageAndOffset.message(), messageAndOffset.offset(), this.jsonDecoder, this.jsonDecoder, 0L, TimestampType.CREATE_TIME);
        return new JsonConsumerRecord(topicName, messageAndMetadata.key(), messageAndMetadata.message(), partitionId, messageAndOffset.offset());
    }

    /**
     * Converts a fetched message into the record type matching {@code embeddedFormat}.
     *
     * @throws RestServerErrorException if the format is none of BINARY, AVRO or JSON
     */
    private ConsumerRecord createConsumerRecord(MessageAndOffset messageAndOffset, String topicName, int partitionId, EmbeddedFormat embeddedFormat) {
        switch (embeddedFormat) {
            case BINARY: {
                return this.createBinaryConsumerRecord(messageAndOffset, topicName, partitionId);
            }
            case AVRO: {
                return this.createAvroConsumerRecord(messageAndOffset, topicName, partitionId);
            }
            case JSON: {
                return this.createJsonConsumerRecord(messageAndOffset, topicName, partitionId);
            }
        }
        throw new RestServerErrorException("Invalid embedded format for new consumer.", Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    /**
     * Issues a single fetch request for one partition, starting at {@code offset}, and returns the
     * message set of the response.
     *
     * <p>Fetch size, maximum wait and minimum bytes come from the factory's consumer configuration.
     *
     * @throws RuntimeException the exception produced by {@code Errors.kafkaErrorException} when the
     *                          response reports an error; its cause message carries the error code
     */
    private ByteBufferMessageSet fetchRecords(String topicName, int partitionId, long offset, SimpleFetcher simpleFetcher) {
        SimpleConsumerConfig simpleConsumerConfig = this.simpleConsumerFactory.getSimpleConsumerConfig();
        HashMap<TopicAndPartition, PartitionFetchInfo> requestInfo = new HashMap<TopicAndPartition, PartitionFetchInfo>();
        requestInfo.put(new TopicAndPartition(topicName, partitionId), new PartitionFetchInfo(offset, simpleConsumerConfig.fetchMessageMaxBytes()));
        int corId = this.correlationId.incrementAndGet();
        FetchRequest req = new FetchRequest(corId, simpleFetcher.clientId(), simpleConsumerConfig.fetchWaitMaxMs(), simpleConsumerConfig.fetchMinBytes(), requestInfo);
        FetchResponse fetchResponse = simpleFetcher.fetch(req);
        if (fetchResponse.hasError()) {
            short kafkaErrorCode = fetchResponse.errorCode(topicName, partitionId);
            throw Errors.kafkaErrorException(new Exception("Fetch response contains an error code: " + kafkaErrorCode));
        }
        return fetchResponse.messageSet(topicName, partitionId);
    }

    /** Shuts down every pool created so far. */
    public void shutdown() {
        for (SimpleConsumerPool pool : this.simpleConsumersPools.values()) {
            pool.shutdown();
        }
    }
}

