/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.kafka.common.MessageStreamsExistException;
import io.confluent.kafka.consumer.KafkaStream;
import io.confluent.kafka.javaapi.consumer.ConsumerConnector;
import io.confluent.kafka.message.MessageAndMetadata;
import io.confluent.kafka.serializer.Decoder;
import io.confluent.kafkarest.ConsumerInstanceId;
import io.confluent.kafkarest.ConsumerRecordAndSize;
import io.confluent.kafkarest.ConsumerTopicState;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public abstract class ConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> {
    private KafkaRestConfig config;
    private ConsumerInstanceId instanceId;
    private ConsumerConnector consumer;
    private Map<String, ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> topics;
    volatile long expiration;
    private ReadWriteLock lock;

    public ConsumerState(KafkaRestConfig config, ConsumerInstanceId instanceId, ConsumerConnector consumer) {
        this.config = config;
        this.instanceId = instanceId;
        this.consumer = consumer;
        this.topics = new HashMap<String, ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>>();
        this.expiration = config.getTime().milliseconds() + (long)config.getInt("consumer.instance.timeout.ms").intValue();
        this.lock = new ReentrantReadWriteLock();
    }

    public ConsumerInstanceId getId() {
        return this.instanceId;
    }

    protected abstract Decoder<KafkaKeyT> getKeyDecoder();

    protected abstract Decoder<KafkaValueT> getValueDecoder();

    public abstract ConsumerRecordAndSize<ClientKeyT, ClientValueT> createConsumerRecord(MessageAndMetadata<KafkaKeyT, KafkaValueT> var1);

    public void startRead(ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> topicState) {
        this.lock.readLock().lock();
        topicState.lock();
    }

    public void finishRead(ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> topicState) {
        topicState.unlock();
        this.lock.readLock().unlock();
    }

    public List<TopicPartitionOffset> commitOffsets() {
        this.lock.writeLock().lock();
        try {
            List<TopicPartitionOffset> result;
            this.consumer.commitOffsets();
            List<TopicPartitionOffset> list = result = this.getOffsets(true);
            return list;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public void close() {
        this.lock.writeLock().lock();
        try {
            this.consumer.shutdown();
            this.consumer = null;
            this.topics = null;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public boolean expired(long nowMs) {
        return this.expiration <= nowMs;
    }

    public void updateExpiration() {
        this.expiration = this.config.getTime().milliseconds() + (long)this.config.getInt("consumer.instance.timeout.ms").intValue();
    }

    public KafkaRestConfig getConfig() {
        return this.config;
    }

    public void setConfig(KafkaRestConfig config) {
        this.config = config;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> getOrCreateTopicState(String topic) {
        ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> state;
        this.lock.readLock().lock();
        try {
            if (this.topics == null) {
                ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> consumerTopicState = null;
                return consumerTopicState;
            }
            state = this.topics.get(topic);
            if (state != null) {
                ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> consumerTopicState = state;
                return consumerTopicState;
            }
        }
        finally {
            this.lock.readLock().unlock();
        }
        this.lock.writeLock().lock();
        try {
            if (this.topics == null) {
                state = null;
                return state;
            }
            state = this.topics.get(topic);
            if (state != null) {
                ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> consumerTopicState = state;
                return consumerTopicState;
            }
            TreeMap<String, Integer> subscriptions = new TreeMap<String, Integer>();
            subscriptions.put(topic, 1);
            Map<String, List<KafkaStream<KafkaKeyT, KafkaValueT>>> streamsByTopic = this.consumer.createMessageStreams(subscriptions, this.getKeyDecoder(), this.getValueDecoder());
            KafkaStream<KafkaKeyT, KafkaValueT> stream = streamsByTopic.get(topic).get(0);
            state = new ConsumerTopicState(stream);
            this.topics.put(topic, state);
            ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> consumerTopicState = state;
            return consumerTopicState;
        }
        catch (MessageStreamsExistException e) {
            throw Errors.consumerAlreadySubscribedException();
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<TopicPartitionOffset> getOffsets(boolean updateCommitOffsets) {
        Vector<TopicPartitionOffset> result = new Vector<TopicPartitionOffset>();
        for (Map.Entry<String, ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> entry : this.topics.entrySet()) {
            ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> state = entry.getValue();
            state.lock();
            try {
                for (Map.Entry<Integer, Long> partEntry : state.getConsumedOffsets().entrySet()) {
                    Integer partition = partEntry.getKey();
                    Long offset = partEntry.getValue();
                    Long committedOffset = 0L;
                    if (updateCommitOffsets) {
                        state.getCommittedOffsets().put(partition, offset);
                        committedOffset = offset;
                    } else {
                        committedOffset = state.getCommittedOffsets().get(partition);
                    }
                    result.add(new TopicPartitionOffset(entry.getKey(), partition.intValue(), offset.longValue(), committedOffset == null ? -1L : committedOffset));
                }
            }
            finally {
                state.unlock();
            }
        }
        return result;
    }
}

