/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.impl.listener;

import com.google.protobuf.InvalidProtocolBufferException;
import com.mapr.fs.MapRFileSystem;
import com.mapr.fs.jni.ListenerRecord;
import com.mapr.fs.jni.MapRUserInfo;
import com.mapr.fs.jni.MarlinJniListener;
import com.mapr.fs.jni.NativeData;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.impl.MarlinClient;
import com.mapr.streams.impl.MarlinTopicInfo;
import com.mapr.streams.impl.listener.MarlinListener;
import com.mapr.streams.impl.listener.NativeDataParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarlinListenerImpl
extends MarlinJniListener {
    private static final Logger LOG = LoggerFactory.getLogger(MarlinListenerImpl.class);
    private boolean closed = false;
    private String defaultStreamName;
    private boolean recordStripStreamPath = false;

    /**
     * Verifies that every consumer setting this listener reads can be fetched from
     * {@code config} with the expected type, and that the numeric settings checked
     * here are not negative.
     *
     * <p>Each setting is probed on its own so that the error log names the offending
     * setting before the {@link ConfigException} is rethrown unchanged to the caller.
     *
     * @param config consumer configuration to validate
     * @throws ConfigException if a setting cannot be read or a numeric setting is negative
     */
    private static void checkConsumerConfig(ConsumerConfig config) throws KafkaException {
        Marlinserver.MarlinConfigDefaults mConfDef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        // The getters below are called only for their validation side effect; values are discarded.
        try {
            config.getString(mConfDef.getClientID());
        }
        catch (ConfigException e) {
            LOG.error("Invalid client id configuration", e);
            throw e;
        }
        try {
            config.getString(mConfDef.getGroupID());
        }
        catch (ConfigException e) {
            LOG.error("Invalid group.id configuration", e);
            throw e;
        }
        try {
            config.getBoolean(mConfDef.getAutoCommitEnabled());
        }
        catch (ConfigException e) {
            LOG.error("Invalid auto commit enabled configuration", e);
            throw e;
        }
        try {
            Long autoCommitInt = config.getLong(mConfDef.getAutoCommitInterval());
            if (autoCommitInt < 0L) {
                throw new ConfigException(mConfDef.getAutoCommitInterval() + " cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error("Invalid " + mConfDef.getAutoCommitInterval() + " configuration", e);
            throw e;
        }
        try {
            Long metadataMaxAge = config.getLong(mConfDef.getMetadataMaxAge());
            if (metadataMaxAge < 0L) {
                throw new ConfigException(mConfDef.getMetadataMaxAge() + " cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error("Invalid " + mConfDef.getMetadataMaxAge() + " configuration", e);
            throw e;
        }
        try {
            Integer fetchMsgMaxBytes = config.getInt(mConfDef.getFetchMsgMaxBytesPerPartition());
            if (fetchMsgMaxBytes < 0) {
                throw new ConfigException(mConfDef.getFetchMsgMaxBytesPerPartition() + " cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error("Invalid " + mConfDef.getFetchMsgMaxBytesPerPartition() + " configuration", e);
            throw e;
        }
        try {
            Integer fetchMinBytes = config.getInt(mConfDef.getFetchMinBytes());
            if (fetchMinBytes < 0) {
                throw new ConfigException(mConfDef.getFetchMinBytes() + " cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error("Invalid " + mConfDef.getFetchMinBytes() + " configuration", e);
            throw e;
        }
        try {
            config.getString(mConfDef.getAutoOffsetReset());
        }
        catch (ConfigException e) {
            LOG.error("Invalid " + mConfDef.getAutoOffsetReset() + " configuration", e);
            throw e;
        }
        try {
            config.getBoolean(mConfDef.getRecordStripStreamPath());
        }
        catch (ConfigException e) {
            LOG.error("Invalid " + mConfDef.getRecordStripStreamPath() + " configuration", e);
            throw e;
        }
    }

    /**
     * Creates a listener backed by a native client opened through {@code OpenListener}.
     *
     * <p>The configuration is validated first. The default stream, strip-stream-path flag,
     * RPC timeout and hard-mount flag are then read on a best-effort basis: if reading any
     * of them raises a {@link ConfigException}, the failure is logged at debug level and
     * that setting plus the ones read after it keep their defaults
     * ({@code null}, {@code false}, {@code 0}, {@code true}).
     *
     * @param config consumer configuration used to open the native listener
     * @throws KafkaException if the current user information cannot be obtained
     * @throws NetworkException if the native listener handle comes back as zero
     */
    public MarlinListenerImpl(ConsumerConfig config) {
        LOG.debug("Starting Streams Listener");
        MarlinListenerImpl.checkConsumerConfig(config);
        Marlinserver.MarlinConfigDefaults mConfDef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        this.defaultStreamName = null;
        int rpcTimeout = 0;
        boolean hardMount = true;
        try {
            this.defaultStreamName = config.getString(mConfDef.getConsumerDefaultStream());
            this.recordStripStreamPath = config.getBoolean(mConfDef.getRecordStripStreamPath());
            rpcTimeout = config.getInt(mConfDef.getRpcTimeout());
            hardMount = config.getBoolean(mConfDef.getHardMount());
        }
        catch (ConfigException e) {
            // Deliberately non-fatal: these settings are optional, but leave a trace for diagnosis.
            LOG.debug("Could not read optional listener configuration; keeping defaults for the unread settings", e);
        }
        MapRUserInfo userInfo;
        try {
            userInfo = MapRFileSystem.CurrentUserInfo();
        }
        catch (IOException e) {
            throw new KafkaException("Could not create MarlinListener", e);
        }
        this._clntPtr = this.OpenListener(
            config.getString(mConfDef.getClientID()),
            config.getString(mConfDef.getGroupID()),
            rpcTimeout,
            hardMount,
            config.getBoolean(mConfDef.getAutoCommitEnabled()),
            config.getLong(mConfDef.getAutoCommitInterval()),
            config.getLong(mConfDef.getMetadataMaxAge()),
            config.getInt(mConfDef.getFetchMsgMaxBytesPerPartition()),
            config.getInt(mConfDef.getFetchMinBytes()),
            config.getInt(mConfDef.getFetchMaxWaitMs()),
            config.getString(mConfDef.getAutoOffsetReset()),
            this.defaultStreamName,
            config.getLong(mConfDef.getConsumerBufferMemory()),
            config.getBoolean(mConfDef.getNegativeOffsetOnEof()),
            userInfo);
        if (this._clntPtr == 0L) {
            throw new NetworkException("Could not create Consumer. Please ensure that the CLDB service is configured properly and is available");
        }
        LOG.debug("Streams listener created");
    }

    /**
     * Returns the topic partitions reported by the native {@code AssignmentList} call.
     *
     * @return a new set of partitions; empty if the native call returns a non-zero status
     */
    public Set<TopicPartition> assignment() {
        NativeData response = new NativeData();
        HashSet<TopicPartition> assigned = new HashSet<TopicPartition>();
        if (AssignmentList(_clntPtr, response) != 0) {
            // Errors are reported to the caller as "nothing assigned".
            return assigned;
        }
        for (NativeDataParser parser = new NativeDataParser(response); parser.HasData(); ) {
            assigned.add(parser.getNextTopicPartition());
        }
        return assigned;
    }

    /**
     * Returns the names reported by the native {@code SubscriptionList} call.
     *
     * @return the subscribed names; a new empty set if the native call returns a non-zero status
     */
    public Set<String> subscription() {
        MarlinStringArrayWrapperImpl names = new MarlinStringArrayWrapperImpl();
        int status = SubscriptionList(_clntPtr, names);
        if (status == 0) {
            return names.GetStringSet();
        }
        return new HashSet<String>();
    }

    /**
     * Subscribes this listener to the given topics.
     *
     * @param topics topic names to subscribe to
     * @param callback rebalance listener to notify, or {@code null} for none
     * @throws KafkaException if the native subscribe call returns a non-zero status
     */
    public void subscribe(List<String> topics, ConsumerRebalanceListener callback) throws KafkaException {
        String[] topicNames = topics.toArray(new String[topics.size()]);
        // Only wrap the callback when one was supplied; the native layer receives null otherwise.
        MarlinRebalanceCallbackWrapperImpl wrapper =
            callback == null ? null : new MarlinRebalanceCallbackWrapperImpl(callback);
        int status = SubscribeTopics(_clntPtr, topicNames, wrapper);
        if (status != 0) {
            throw MarlinClient.jniErrToException(status, "Could not subscribe to topics");
        }
    }

    /**
     * Assigns the given topic partitions to this listener through the native
     * {@code AssignFeeds} call.
     *
     * @param partitions partitions to assign; topic names and partition ids are passed
     *                   to the native layer as two parallel arrays
     */
    public void assign(List<TopicPartition> partitions) {
        int count = partitions.size();
        String[] topicNames = new String[count];
        int[] feedIds = new int[count];
        int slot = 0;
        for (TopicPartition tp : partitions) {
            topicNames[slot] = tp.topic();
            feedIds[slot] = tp.partition();
            slot++;
        }
        int status = AssignFeeds(_clntPtr, topicNames, feedIds);
        if (status != 0) {
            throw MarlinClient.jniErrToException(status, "Could not assign partitions");
        }
    }

    /**
     * Subscribes this listener to all topics matching a regular expression.
     *
     * @param pattern pattern whose string form is validated and handed to the native layer
     * @param callback rebalance listener to notify, or {@code null} for none
     */
    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
        MarlinRebalanceCallbackWrapperImpl wrapper =
            callback == null ? null : new MarlinRebalanceCallbackWrapperImpl(callback);
        String regex = pattern.toString();
        if (!checkPatternValid(regex)) {
            // NOTE(review): 22 is presumably EINVAL - confirm against the native error table.
            throw MarlinClient.jniErrToException(22, "Could not subscribe, as invalid pattern " + regex + " is passed");
        }
        int status = SubscribeRegex(_clntPtr, regex, wrapper);
        if (status != 0) {
            throw MarlinClient.jniErrToException(status, "Could not subscribe to pattern " + regex);
        }
    }

    /**
     * Invokes the native {@code Unsubscribe} call for this listener and raises the mapped
     * exception if it returns a non-zero status.
     */
    public void unsubscribe() {
        int status = Unsubscribe(_clntPtr);
        if (status == 0) {
            return;
        }
        throw MarlinClient.jniErrToException(status, "unsubscribe failed");
    }

    /**
     * Polls the native listener and parses the response into records grouped by partition.
     *
     * <p>A failure can be signalled either by the status returned from the native
     * {@code Poll} call or by the error code carried in the response. Whichever is
     * non-zero is the code mapped to the thrown exception, so a response-level error is
     * never reported with status {@code 0}.
     *
     * @param timeout timeout value forwarded unchanged to the native poll call
     * @return the parsed records keyed by topic partition
     */
    public Map<TopicPartition, List<ListenerRecord>> poll(long timeout) {
        NativeData response = new NativeData();
        int err = Poll(_clntPtr, timeout, response);
        if (err == 0) {
            // The call itself succeeded; fall back to the error embedded in the response, if any.
            err = (int) response.error();
        }
        if (err != 0) {
            throw MarlinClient.jniErrToException(err, "poll failed");
        }
        NativeDataParser parser = new NativeDataParser(response);
        return parser.parseListenerRecords(recordStripStreamPath);
    }

    /**
     * Calls the native {@code CommitAll} with the synchronous flag set and no callback,
     * raising the mapped exception on a non-zero status.
     */
    public void commitSync() {
        int status = CommitAll(_clntPtr, true, null);
        if (status == 0) {
            return;
        }
        throw MarlinClient.jniErrToException(status, "Could not commitSync()");
    }

    /**
     * Commits the given offsets with the synchronous flag set and no callback.
     *
     * @param offsets offsets to commit, keyed by topic partition
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        final boolean synchronous = true;
        commitMap(offsets, synchronous, null);
    }

    /**
     * Calls the native {@code CommitAll} with the synchronous flag cleared and no callback.
     *
     * <p>If the native call returns a non-zero status, the mapped exception is thrown
     * with a message that identifies this asynchronous variant.
     */
    public void commitAsync() {
        int err = CommitAll(_clntPtr, false, null);
        if (err != 0) {
            throw MarlinClient.jniErrToException(err, "Could not commitAsync()");
        }
    }

    /**
     * Calls the native {@code CommitAll} with the synchronous flag cleared, passing a
     * wrapper around the supplied callback.
     *
     * @param callback callback to wrap and hand to the native layer, or {@code null} for none
     */
    public void commitAsync(OffsetCommitCallback callback) {
        MarlinCommitCallbackWrapperImpl wrapper =
            callback == null ? null : new MarlinCommitCallbackWrapperImpl(callback);
        int status = CommitAll(_clntPtr, false, wrapper);
        if (status != 0) {
            throw MarlinClient.jniErrToException(status, "Could not commitAsync(callback)");
        }
    }

    /**
     * Commits the given offsets with the synchronous flag cleared, passing a wrapper
     * around the supplied callback.
     *
     * @param offsets offsets to commit, keyed by topic partition
     * @param callback callback to wrap and hand to the native layer, or {@code null} for none
     */
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        MarlinCommitCallbackWrapperImpl wrapper;
        if (callback == null) {
            wrapper = null;
        } else {
            wrapper = new MarlinCommitCallbackWrapperImpl(callback);
        }
        commitMap(offsets, false, wrapper);
    }

    /**
     * Flattens an offsets map into three parallel arrays (topic, partition id, offset)
     * and passes them to the native {@code Commit} call.
     *
     * @param offsets offsets to commit, keyed by topic partition
     * @param commitType flag forwarded to the native call; callers in this class pass
     *                   {@code true} for synchronous and {@code false} for asynchronous commits
     * @param callback wrapped completion callback, or {@code null} for none
     */
    private void commitMap(Map<TopicPartition, OffsetAndMetadata> offsets, boolean commitType, MarlinCommitCallbackWrapperImpl callback) {
        int count = offsets.size();
        String[] topicNames = new String[count];
        int[] feedIds = new int[count];
        long[] positions = new long[count];
        int slot = 0;
        for (Map.Entry<TopicPartition, OffsetAndMetadata> item : offsets.entrySet()) {
            TopicPartition tp = item.getKey();
            topicNames[slot] = tp.topic();
            feedIds[slot] = tp.partition();
            positions[slot] = item.getValue().offset();
            slot++;
        }
        int status = Commit(_clntPtr, topicNames, feedIds, positions, commitType, callback);
        if (status != 0) {
            throw MarlinClient.jniErrToException(status, "Could not commit");
        }
    }

    /**
     * Seeks every given partition to the same offset through the native {@code Seek} call.
     *
     * @param offset offset applied to all of the partitions
     * @param partitions partitions to reposition
     * @throws KafkaException if the native call returns a non-zero status
     */
    private void seekInternal(long offset, TopicPartition ... partitions) throws KafkaException {
        int count = partitions.length;
        String[] topicNames = new String[count];
        int[] feedIds = new int[count];
        long[] targets = new long[count];
        for (int k = 0; k < count; ++k) {
            TopicPartition tp = partitions[k];
            topicNames[k] = tp.topic();
            feedIds[k] = tp.partition();
            targets[k] = offset;
        }
        int status = Seek(_clntPtr, topicNames, feedIds, targets);
        if (status != 0) {
            throw MarlinClient.jniErrToException(status, "Could not seek");
        }
    }

    /**
     * Seeks a single partition to the given offset.
     *
     * @param partition partition to reposition
     * @param offset offset to seek to
     */
    public void seek(TopicPartition partition, long offset) {
        seekInternal(offset, partition);
    }

    /**
     * Seeks the given partitions to offset {@code 0}.
     *
     * @param partitions partitions to reposition
     */
    public void seekToBeginning(TopicPartition ... partitions) {
        final long firstOffset = 0L;
        seekInternal(firstOffset, partitions);
    }

    /**
     * Seeks the given partitions to offset {@link Long#MAX_VALUE}.
     *
     * @param partitions partitions to reposition
     */
    public void seekToEnd(TopicPartition ... partitions) {
        final long endMarker = Long.MAX_VALUE;
        seekInternal(endMarker, partitions);
    }

    /**
     * Queries the native layer for the position of a partition.
     *
     * @param partition partition to query
     * @return the first {@code long} value of the native response
     * @throws KafkaException if the native {@code QueryPosition} call returns a non-zero status
     */
    public long position(TopicPartition partition) throws KafkaException {
        NativeData response = new NativeData();
        int status = QueryPosition(_clntPtr, partition.topic(), partition.partition(), response);
        if (status == 0) {
            return response.long_data[0];
        }
        throw MarlinClient.jniErrToException(status, "Could not query position");
    }

    /**
     * Queries the native layer for the committed cursor of a partition.
     *
     * @param partition partition to query
     * @return the committed offset, or {@code null} when the native call returns status 42
     * @throws KafkaException if the native {@code QueryCursor} call returns any other non-zero status
     */
    public OffsetAndMetadata committed(TopicPartition partition) throws KafkaException {
        NativeData response = new NativeData();
        int status = QueryCursor(_clntPtr, partition.topic(), partition.partition(), response);
        // NOTE(review): 42 presumably means "no committed offset yet" (ENOMSG on Linux) - confirm.
        if (status == 42) {
            return null;
        }
        if (status != 0) {
            throw MarlinClient.jniErrToException(status, "Could not query committed offset");
        }
        return new OffsetAndMetadata(response.long_data[0]);
    }

    /**
     * Looks up a topic through the native {@code GetTopicInfo} call and builds its
     * partition list.
     *
     * @param topic topic name to look up
     * @return partition info built from the topic name and the count returned by the native call
     * @throws UnknownTopicOrPartitionException if the native call returns a negative value;
     *         the negated value is included in the message as the error code
     */
    public List<PartitionInfo> getTopicInfo(String topic) throws KafkaException {
        int feedCount = GetTopicInfo(_clntPtr, topic);
        if (feedCount >= 0) {
            return new MarlinTopicInfo(topic, feedCount).getKafkaPartitionInfo();
        }
        throw new UnknownTopicOrPartitionException("could not get TopicInfo, err " + -feedCount);
    }

    /**
     * Lists topics by delegating to {@link #listTopics(String)} with a {@code null} stream.
     *
     * @return topic names mapped to their partition info
     */
    public Map<String, List<PartitionInfo>> listTopics() {
        // The cast selects the String overload; a bare null would be ambiguous with the Pattern one.
        String noStream = (String) null;
        return listTopics(noStream);
    }

    /**
     * Lists topics for a pattern by validating its string form and delegating to
     * {@link #listTopics(String)}.
     *
     * @param pattern pattern whose string form is validated and passed on
     * @return topic names mapped to their partition info
     */
    public Map<String, List<PartitionInfo>> listTopics(Pattern pattern) {
        String regex = pattern.toString();
        if (checkPatternValid(regex)) {
            return listTopics(regex);
        }
        // NOTE(review): 22 is presumably EINVAL - confirm against the native error table.
        throw MarlinClient.jniErrToException(22, "Could not listTopics, as invalid pattern " + regex + " is passed");
    }

    /**
     * Lists the topics returned by the native {@code GetTopicsFromStream} call together
     * with their partition info.
     *
     * <p>The native call is retried once when it returns status 116. Any remaining error,
     * or a mismatch between the number of topic names and partition counts, is logged and
     * results in an empty map rather than an exception.
     *
     * @param stream value passed to the native call as the stream; may be {@code null}
     * @return topic names mapped to their partition info; empty on failure
     */
    public Map<String, List<PartitionInfo>> listTopics(String stream) {
        HashMap<String, List<PartitionInfo>> byTopic = new HashMap<String, List<PartitionInfo>>();
        MarlinStringArrayWrapperImpl holder = new MarlinStringArrayWrapperImpl();
        int status = GetTopicsFromStream(_clntPtr, stream, holder);
        // NOTE(review): 116 is presumably ESTALE (stale handle) - confirm; retried exactly once.
        if (status == 116) {
            status = GetTopicsFromStream(_clntPtr, stream, holder);
        }
        if (status != 0) {
            LOG.debug("Could not get list of topics for stream " + stream + ", err " + status);
            return byTopic;
        }
        List<String> names = holder.GetStringList();
        int[] partitionCounts = holder.GetNumPartitions();
        if (names.size() != partitionCounts.length) {
            LOG.error("Could not get list of topics for stream " + stream + ", got " + names.size() + " topics, but got " + partitionCounts.length + " partitions");
            return byTopic;
        }
        int slot = 0;
        Iterator<String> it = names.iterator();
        while (it.hasNext()) {
            String name = it.next();
            byTopic.put(name, new MarlinTopicInfo(name, partitionCounts[slot]).getKafkaPartitionInfo());
            slot++;
        }
        return byTopic;
    }

    /**
     * Pauses the given partitions through the native {@code Pause} call.
     *
     * <p>On a non-zero status the thrown exception's message lists the partitions
     * themselves (via {@link Arrays#toString(Object[])}) rather than the array's
     * identity string.
     *
     * @param partitions partitions to pause; topic names and partition ids are passed to
     *                   the native layer as two parallel arrays
     */
    public void pause(TopicPartition ... partitions) {
        String[] topicArr = new String[partitions.length];
        int[] feedIdArr = new int[partitions.length];
        for (int i = 0; i < partitions.length; ++i) {
            topicArr[i] = partitions[i].topic();
            feedIdArr[i] = partitions[i].partition();
        }
        int err = Pause(_clntPtr, topicArr, feedIdArr);
        if (err != 0) {
            throw MarlinClient.jniErrToException(err, "Error while pausing topic partitions " + Arrays.toString(partitions));
        }
    }

    /**
     * Resumes the given partitions through the native {@code Resume} call.
     *
     * <p>On a non-zero status the thrown exception's message lists the partitions
     * themselves (via {@link Arrays#toString(Object[])}) rather than the array's
     * identity string.
     *
     * @param partitions partitions to resume; topic names and partition ids are passed to
     *                   the native layer as two parallel arrays
     */
    public void resume(TopicPartition ... partitions) {
        String[] topicArr = new String[partitions.length];
        int[] feedIdArr = new int[partitions.length];
        for (int i = 0; i < partitions.length; ++i) {
            topicArr[i] = partitions[i].topic();
            feedIdArr[i] = partitions[i].partition();
        }
        int err = Resume(_clntPtr, topicArr, feedIdArr);
        if (err != 0) {
            throw MarlinClient.jniErrToException(err, "Error while resuming topic partitions " + Arrays.toString(partitions));
        }
    }

    /**
     * Sends a serialized join-group descriptor through the native {@code Join} call and
     * parses the bytes it returns.
     *
     * @param joinDesc descriptor to serialize and send
     * @param cb join callback to wrap and hand to the native layer, or {@code null} for none
     * @return the parsed join response
     * @throws KafkaException if the response bytes cannot be parsed; the parse failure is
     *         logged and attached as the cause
     */
    public Marlinserver.JoinGroupResponse join(Marlinserver.JoinGroupDesc joinDesc, MarlinListener.MarlinJoinCallback cb) {
        byte[] serialDesc = joinDesc.toByteArray();
        MarlinJoinCallbackWrapperImpl cbWrapper = cb == null ? null : new MarlinJoinCallbackWrapperImpl(cb);
        byte[] resp = Join(_clntPtr, serialDesc, cbWrapper);
        try {
            return Marlinserver.JoinGroupResponse.parseFrom(resp);
        }
        catch (InvalidProtocolBufferException e) {
            LOG.error("Error parsing Join response", e);
            throw new KafkaException("Error parsing Join response", e);
        }
    }

    /**
     * Closes the native listener if its handle is non-zero, then resets the handle to zero.
     *
     * <p>The check, the native close and the reset all run while holding this instance's
     * monitor. A second call finds a zero handle and skips the native close.
     *
     * <p>NOTE: this method comes from decompiled code, and the decompiler warned that it
     * removed a try/catch here ("possible behaviour change"), so the original source may
     * have reset the handle even when the native close failed.
     */
    public void close() {
        synchronized (this) {
            long handle = this._clntPtr;
            if (handle != 0L) {
                CloseListener(handle);
            }
            this._clntPtr = 0L;
        }
    }

    /**
     * Forwards to the native {@code Wakeup} call using this listener's handle.
     */
    public void wakeup() {
        long handle = _clntPtr;
        Wakeup(handle);
    }

    /**
     * Checks whether the topic portion of a pattern string compiles as a regular expression.
     *
     * <p>If the text after the last {@code '/'} contains a {@code ':'}, only what follows
     * the first such {@code ':'} is compiled; otherwise the whole string is compiled.
     *
     * @param pattern pattern string to check
     * @return {@code true} if the selected text compiles, {@code false} if compiling throws
     */
    private boolean checkPatternValid(String pattern) {
        String topicRegex = pattern;
        int lastSlash = pattern.lastIndexOf('/');
        if (lastSlash >= 0) {
            String tail = pattern.substring(lastSlash + 1);
            int colon = tail.indexOf(':');
            if (colon >= 0) {
                topicRegex = tail.substring(colon + 1);
            }
        }
        try {
            Pattern.compile(topicRegex);
            return true;
        }
        catch (Exception e) {
            return false;
        }
    }

    public class MarlinJoinCallbackWrapperImpl
    implements MarlinJniListener.MarlinJoinCallbackWrapper {
        private MarlinListener.MarlinJoinCallback joincb;

        public MarlinJoinCallbackWrapperImpl(MarlinListener.MarlinJoinCallback cb) {
            this.joincb = cb;
        }

        public void onJoin(byte[] data) {
            try {
                Marlinserver.JoinGroupInfo joinInfo = Marlinserver.JoinGroupInfo.parseFrom((byte[])data);
                this.joincb.onJoin(joinInfo);
            }
            catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
                return;
            }
        }

        public void onRejoin(byte[] data) {
            try {
                Marlinserver.JoinGroupInfo joinInfo = Marlinserver.JoinGroupInfo.parseFrom((byte[])data);
                this.joincb.onRejoin(joinInfo);
            }
            catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
                return;
            }
        }
    }

    /**
     * Adapts an {@link OffsetCommitCallback} to the native-facing
     * {@link MarlinJniListener.MarlinCommitCallbackWrapper} interface.
     */
    public class MarlinCommitCallbackWrapperImpl
    implements MarlinJniListener.MarlinCommitCallbackWrapper {
        private OffsetCommitCallback commitcb;

        /**
         * @param cb callback that receives the committed offsets and any mapped exception
         */
        public MarlinCommitCallbackWrapperImpl(OffsetCommitCallback cb) {
            this.commitcb = cb;
        }

        /**
         * Pairs each topic partition parsed from {@code data} with the offset at the same
         * index in {@code offsets}, then invokes the wrapped callback.
         *
         * @param data native data holding the topic partitions
         * @param offsets offsets, indexed in the order the partitions are parsed
         * @param errorCode native status, converted with {@code MarlinClient.jniErrToException}
         *                  and passed to the callback as its exception argument
         */
        public void onComplete(NativeData data, long[] offsets, int errorCode) {
            Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<TopicPartition, OffsetAndMetadata>();
            NativeDataParser parser = new NativeDataParser(data);
            for (int slot = 0; parser.HasData(); ++slot) {
                TopicPartition tp = parser.getNextTopicPartition();
                committed.put(tp, new OffsetAndMetadata(offsets[slot]));
            }
            Exception failure = (Exception)MarlinClient.jniErrToException(errorCode, null);
            this.commitcb.onComplete(committed, failure);
        }
    }

    /**
     * Adapts a {@link ConsumerRebalanceListener} to the native-facing
     * {@link MarlinJniListener.MarlinRebalanceCallbackWrapper} interface by parsing the
     * native data into topic partitions before notifying the listener.
     */
    public class MarlinRebalanceCallbackWrapperImpl
    implements MarlinJniListener.MarlinRebalanceCallbackWrapper {
        private ConsumerRebalanceListener rebalanceListener;

        /**
         * @param rl listener to notify; when {@code null} both callbacks do nothing
         */
        public MarlinRebalanceCallbackWrapperImpl(ConsumerRebalanceListener rl) {
            this.rebalanceListener = rl;
        }

        /**
         * Notifies the listener of the partitions parsed from {@code data} as assigned.
         *
         * @param data native data holding the topic partitions
         */
        public void onPartitionsAssigned(NativeData data) {
            if (this.rebalanceListener == null) {
                return;
            }
            this.rebalanceListener.onPartitionsAssigned(readPartitions(data));
        }

        /**
         * Notifies the listener of the partitions parsed from {@code data} as revoked.
         *
         * @param data native data holding the topic partitions
         */
        public void onPartitionsRevoked(NativeData data) {
            if (this.rebalanceListener == null) {
                return;
            }
            this.rebalanceListener.onPartitionsRevoked(readPartitions(data));
        }

        /** Drains every topic partition from {@code data} into a new list, in parse order. */
        private ArrayList<TopicPartition> readPartitions(NativeData data) {
            ArrayList<TopicPartition> parsed = new ArrayList<TopicPartition>();
            for (NativeDataParser parser = new NativeDataParser(data); parser.HasData(); ) {
                parsed.add(parser.getNextTopicPartition());
            }
            return parsed;
        }
    }

    /**
     * Holds a batch of topic names delivered as one concatenated string plus per-name
     * lengths, alongside a partition-count array, and exposes the names as collections.
     */
    public class MarlinStringArrayWrapperImpl
    implements MarlinJniListener.MarlinStringArrayWrapper {
        // All names concatenated back to back, with no separator.
        private String topicNames;
        // Length of each name inside topicNames, in order.
        private int[] topicNameSizes;
        // Partition counts as supplied by the caller of SetStringArrayElements.
        private int[] numPartitions;
        // Number of names to split out of topicNames.
        private int numTopics;

        /**
         * Stores the supplied values without copying them.
         *
         * @param tn concatenated topic names
         * @param tns length of each name within {@code tn}
         * @param np partition-count array
         * @param nt number of names contained in {@code tn}
         */
        public void SetStringArrayElements(String tn, int[] tns, int[] np, int nt) {
            this.topicNames = tn;
            this.topicNameSizes = tns;
            this.numPartitions = np;
            this.numTopics = nt;
        }

        /**
         * @return a new set containing the stored topic names
         */
        public Set<String> GetStringSet() {
            HashSet<String> unique = new HashSet<String>();
            unique.addAll(splitTopicNames());
            return unique;
        }

        /**
         * @return a new list containing the stored topic names in their stored order
         */
        public List<String> GetStringList() {
            return splitTopicNames();
        }

        /**
         * @return the partition-count array exactly as stored (not copied)
         */
        public int[] GetNumPartitions() {
            return this.numPartitions;
        }

        /** Cuts the concatenated name string into individual names using the stored lengths. */
        private ArrayList<String> splitTopicNames() {
            ArrayList<String> names = new ArrayList<String>(this.numTopics);
            int start = 0;
            for (int i = 0; i < this.numTopics; ++i) {
                int end = start + this.topicNameSizes[i];
                names.add(this.topicNames.substring(start, end));
                start = end;
            }
            return names;
        }
    }
}

