/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.impl.admin;

import com.mapr.db.Admin;
import com.mapr.db.FamilyDescriptor;
import com.mapr.db.MapRDB;
import com.mapr.db.Table;
import com.mapr.db.TableDescriptor;
import com.mapr.db.impl.TableDescriptorImpl;
import com.mapr.fs.AceHelper;
import com.mapr.fs.MapRFileSystem;
import com.mapr.fs.MapRTabletScanner;
import com.mapr.fs.jni.Errno;
import com.mapr.fs.jni.MapRConstants;
import com.mapr.fs.jni.MarlinJniAdmin;
import com.mapr.fs.proto.Common;
import com.mapr.fs.proto.Dbserver;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.fs.tables.CFPermissions;
import com.mapr.fs.tables.MapRAdmin;
import com.mapr.security.UnixUserGroupHelper;
import com.mapr.streams.impl.MarlinRowKeyDecoder;
import com.mapr.streams.impl.admin.AssignInfo;
import com.mapr.streams.impl.admin.CursorInfo;
import com.mapr.streams.impl.admin.MStreamDescriptor;
import com.mapr.streams.impl.admin.TopicFeedInfo;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.ojai.Document;
import org.ojai.DocumentConstants;
import org.ojai.DocumentStream;
import org.ojai.store.QueryCondition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarlinAdminImpl
extends MarlinJniAdmin {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(MarlinAdminImpl.class);
    /** Largest number of default partitions a stream descriptor may request. */
    private static final int MAX_PARTITIONS = 4095;
    /** Default number of partitions per topic when the descriptor does not specify one. */
    private static final int DEFAULT_PARTITIONS = 1;
    /** Default message time-to-live in seconds (604800 s = 7 days). */
    private static final int DEFAULT_TTL_SECS = 604800;
    // NOTE(review): purpose of this key is not visible in this part of the file — confirm where it is consumed.
    private static final String COMPRESSION_RAW = "compression_raw";
    /** Internal defaults (column family names, key prefixes/formats); set to the protobuf default instance in the constructor. */
    Marlinserver.MarlinInternalDefaults mdef;
    /** MapR-DB admin handle used to create, alter, describe and delete the table backing a stream. */
    Admin dbAdmin;
    /** Admin wrapper around {@link #maprfs}; used to read and write table / column family permissions. */
    MapRAdmin maprAdmin;
    /** File system handle initialised against {@code maprfs:///} in the constructor. */
    MapRFileSystem maprfs;
    /** Numeric id of the current user, resolved in the constructor; used to build the default {@code u:<id>} ACE. */
    long userId;
    /** Thread pool shared by all instances for parallel stat requests; created on first construction. */
    private static ExecutorService topicFeedStatService = null;
    /** Number of instances constructed so far; incremented under the class lock in the constructor. */
    private static int numAdmins = 0;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Opens the native admin client and initialises the MapR-DB / MapR-FS handles this
     * instance works with.
     *
     * <p>Also resolves the numeric id of the current user and, under the class lock,
     * lazily creates the shared 16-thread stat pool and bumps the instance counter.
     *
     * @param conf Hadoop configuration used to initialise the {@code maprfs:///} file system
     * @throws IOException if the native client cannot be opened or the file system cannot be initialised
     */
    public MarlinAdminImpl(Configuration conf) throws IOException {
        this._clntPtr = this.OpenAdmin();
        if (this._clntPtr == 0L) {
            throw new IOException("Could not create MarlinAdminImpl");
        }

        this.mdef = Marlinserver.MarlinInternalDefaults.getDefaultInstance();
        this.dbAdmin = MapRDB.newAdmin();

        try {
            this.maprfs = new MapRFileSystem();
            this.maprfs.initialize(new URI("maprfs:///"), conf);
        } catch (URISyntaxException badUri) {
            // IOExceptions propagate unchanged; only the URI failure needs wrapping.
            throw new IOException(badUri);
        }
        this.maprAdmin = new MapRAdmin(this.maprfs);

        String currentUser = UserGroupInformation.getCurrentUser().getUserName();
        this.userId = new UnixUserGroupHelper().getUserId(currentUser);

        synchronized (MarlinAdminImpl.class) {
            if (topicFeedStatService == null) {
                topicFeedStatService = Executors.newFixedThreadPool(16);
            }
            ++numAdmins;
        }
    }

    /**
     * Creates a new stream backed by a MapR-DB table and applies its permissions.
     *
     * <p>Values missing from {@code sdesc} fall back to: one default partition, a TTL of
     * 604800 seconds, topic auto-creation enabled, inherited compression and client
     * compression enabled. The table is created with four column families (messages,
     * topic meta, cursors, feed assigns) and pre-split once per default partition.
     *
     * @param streamName path of the stream; its last path component must not contain ':'
     * @param sdesc      requested stream settings and permissions
     * @throws IOException              if table creation or permission setup fails
     * @throws IllegalArgumentException if an argument is null or fails validation
     */
    public void createStream(String streamName, MStreamDescriptor sdesc) throws IOException, IllegalArgumentException {
        if (streamName == null) {
            throw new IllegalArgumentException("streamName cannot be null");
        }
        // Only the last path component is checked for the stream/topic separator.
        String[] pathParts = streamName.split("/");
        String leafName = streamName;
        if (pathParts.length >= 1) {
            leafName = pathParts[pathParts.length - 1];
        }
        if (leafName.contains(":")) {
            throw new IllegalArgumentException("streamName " + leafName + " cannot contain ':'");
        }
        if (sdesc == null) {
            throw new IllegalArgumentException("MStreamDescriptor cannot be null");
        }
        this.validateStreamDesc(sdesc);

        // Resolve each setting, falling back to the built-in default when unset.
        int partitionCount = sdesc.hasDefaultPartitions() ? sdesc.getDefaultPartitions() : DEFAULT_PARTITIONS;
        long ttlSecs = sdesc.hasTimeToLiveSec() ? sdesc.getTimeToLiveSec() : (long)DEFAULT_TTL_SECS;
        boolean autoCreate = sdesc.hasAutoCreateTopics() ? sdesc.getAutoCreateTopics() : true;
        FamilyDescriptor.Compression compression = sdesc.hasCompressionAlgo()
                ? this.compressionNameToType(sdesc.getCompressionAlgo())
                : FamilyDescriptor.Compression.Inherited;
        boolean compressOnClient = sdesc.hasClientCompression() ? sdesc.getClientCompression() : true;

        String[] splitKeys = this.getSplits(partitionCount);

        TableDescriptorImpl tableDesc = (TableDescriptorImpl)MapRDB.newTableDescriptor(streamName);
        tableDesc.setInsertionOrder(false);
        tableDesc.setMaxValueSizeInMemIndex(Integer.MAX_VALUE);
        tableDesc.setClientCompression(compressOnClient);
        tableDesc.setStream();
        tableDesc.setStreamAutoCreate(autoCreate);
        tableDesc.setStreamDefaultPartitions(partitionCount);

        // Messages family carries the TTL and the user-selected compression.
        FamilyDescriptor messagesFamily = MapRDB.newFamilyDescriptor()
                .setName(this.mdef.getCfMessages())
                .setTTL(ttlSecs)
                .setCompression(compression);
        tableDesc.addFamily(messagesFamily);

        // The three bookkeeping families are JSON-path families and are never compressed.
        FamilyDescriptor topicMetaFamily = MapRDB.newFamilyDescriptor()
                .setName(this.mdef.getCfTopicMeta())
                .setJsonFieldPath(this.mdef.getCfTopicMeta())
                .setCompression(FamilyDescriptor.Compression.None);
        tableDesc.addFamily(topicMetaFamily);

        FamilyDescriptor cursorsFamily = MapRDB.newFamilyDescriptor()
                .setName(this.mdef.getCfCursors())
                .setJsonFieldPath(this.mdef.getCfCursors())
                .setCompression(FamilyDescriptor.Compression.None);
        tableDesc.addFamily(cursorsFamily);

        FamilyDescriptor feedAssignsFamily = MapRDB.newFamilyDescriptor()
                .setName(this.mdef.getCfFeedAssigns())
                .setJsonFieldPath(this.mdef.getCfFeedAssigns())
                .setCompression(FamilyDescriptor.Compression.None);
        tableDesc.addFamily(feedAssignsFamily);

        this.dbAdmin.createTable((TableDescriptor)tableDesc, splitKeys);

        Map<String, String> tableAces = this.buildTablePermissions(sdesc);
        List<CFPermissions> familyAces = this.buildFamilyPermissions(sdesc, null);
        this.setPermissions(streamName, tableAces, familyAces);
    }

    /**
     * Applies the settings present in {@code sdesc} to an existing stream.
     *
     * <p>Table-level settings (default partitions, topic auto-creation, client compression)
     * are applied through a table alteration; TTL and compression are applied to the
     * messages column family. If any permission is present in {@code sdesc}, the current
     * permissions are read from the table and merged with the new ones before being written.
     *
     * @param streamName path of the stream to modify
     * @param sdesc      settings to change; unset fields are left untouched
     * @throws IOException              if the alteration fails, or if the current admin
     *                                  permission cannot be read from the table
     * @throws IllegalArgumentException if an argument is null, fails validation, or
     *                                  {@code streamName} is not a stream
     */
    public void editStream(String streamName, MStreamDescriptor sdesc) throws IOException, IllegalArgumentException {
        if (streamName == null) {
            throw new IllegalArgumentException("streamName cannot be null");
        }
        if (sdesc == null) {
            throw new IllegalArgumentException("MStreamDescriptor cannot be null");
        }
        this.validateStreamDesc(sdesc);
        TableDescriptorImpl desc = this.checkStreamAndGetTableDescriptor(streamName);
        TableDescriptorImpl modDesc = (TableDescriptorImpl)MapRDB.newTableDescriptor((boolean)false);
        boolean alterTable = false;
        FamilyDescriptor fMsgs = MapRDB.newFamilyDescriptor();
        boolean alterFamily = false;
        if (sdesc.hasDefaultPartitions()) {
            modDesc.setStreamDefaultPartitions(sdesc.getDefaultPartitions());
            alterTable = true;
        }
        if (sdesc.hasTimeToLiveSec()) {
            fMsgs.setTTL(sdesc.getTimeToLiveSec());
            alterFamily = true;
        }
        if (sdesc.hasAutoCreateTopics()) {
            modDesc.setStreamAutoCreate(sdesc.getAutoCreateTopics());
            alterTable = true;
        }
        if (sdesc.hasCompressionAlgo()) {
            fMsgs.setCompression(this.compressionNameToType(sdesc.getCompressionAlgo()));
            alterFamily = true;
        }
        if (sdesc.hasClientCompression()) {
            modDesc.setClientCompression(sdesc.getClientCompression());
            alterTable = true;
        }
        if (alterTable) {
            // The alteration descriptor carries the existing families along with the changed attributes.
            modDesc.setPath(streamName);
            modDesc.setFamilies(desc.getFamilies());
            this.dbAdmin.alterTable((TableDescriptor)modDesc);
        }
        if (alterFamily) {
            this.dbAdmin.alterFamily(new Path(streamName), this.mdef.getCfMessages(), fMsgs);
        }
        // The original condition tested hasAdminPerms() twice; each permission is now checked once.
        boolean permsChanged = sdesc.hasAdminPerms() || sdesc.hasProducePerms() || sdesc.hasListenPerms()
                || sdesc.hasTopicPerms() || sdesc.hasCopyPerms();
        if (permsChanged) {
            MStreamDescriptor oldDesc = new MStreamDescriptor();
            this.populatePermissionsFromTable(streamName, oldDesc);
            // populatePermissionsFromTable leaves the descriptor empty when the admin ACE is not readable.
            if (!oldDesc.hasAdminPerms()) {
                throw new IOException("Access denied for table permissions");
            }
            Map<String, String> tablePerms = this.buildTablePermissions(sdesc);
            List<CFPermissions> cfPerms = this.buildFamilyPermissions(sdesc, oldDesc);
            this.setPermissions(streamName, tablePerms, cfPerms);
        }
    }

    /**
     * Builds a descriptor reflecting the current settings of a stream.
     *
     * <p>Table-level attributes come from the table descriptor, TTL and compression from
     * the messages column family, and permissions from the table's ACEs.
     *
     * @param streamName path of the stream
     * @return descriptor populated from the stream's table
     * @throws IOException              if the table metadata or permissions cannot be read
     * @throws IllegalArgumentException if {@code streamName} is not a stream
     */
    public MStreamDescriptor getStreamDescriptor(String streamName) throws IOException, IllegalArgumentException {
        TableDescriptorImpl tableDesc = this.checkStreamAndGetTableDescriptor(streamName);

        MStreamDescriptor result = new MStreamDescriptor();
        result.setAutoCreateTopics(tableDesc.isStreamAutoCreate());
        result.setDefaultPartitions(tableDesc.getStreamDefaultPartitions());
        result.setClientCompression(tableDesc.getClientCompression());

        FamilyDescriptor messagesFamily = tableDesc.getFamily(this.mdef.getCfMessages());
        result.setTimeToLiveSec(messagesFamily.getTTL());
        String compressionName = this.compressionTypeToName(messagesFamily.getCompression());
        result.setCompressionAlgo(compressionName);

        this.populatePermissionsFromTable(streamName, result);
        return result;
    }

    /**
     * Creates {@code replicaStreamName} with the same descriptor as {@code srcStreamName}.
     *
     * <p>A fresh admin built from a default {@link Configuration} is used for both the
     * lookup and the creation.
     *
     * @param replicaStreamName path of the stream to create
     * @param srcStreamName     path of the existing stream whose settings are copied
     * @throws IOException              if reading the source or creating the replica fails
     * @throws IllegalArgumentException if the source is not a stream or the descriptor is invalid
     */
    public static void createStreamForCopy(String replicaStreamName, String srcStreamName) throws IOException, IllegalArgumentException {
        // NOTE(review): this admin instance is never released here — confirm whether a close/cleanup call is required.
        MarlinAdminImpl admin = new MarlinAdminImpl(new Configuration());
        MStreamDescriptor sourceDesc = admin.getStreamDescriptor(srcStreamName);
        admin.createStream(replicaStreamName, sourceDesc);
    }

    /**
     * Deletes the table backing a stream after verifying that the path really is a stream.
     *
     * @param streamName path of the stream to delete
     * @throws IOException              if the descriptor lookup or the deletion fails
     * @throws IllegalArgumentException if {@code streamName} is not a stream
     */
    public void deleteStream(String streamName) throws IOException, IllegalArgumentException {
        // Called only for its validation side effect; the descriptor itself is not needed.
        checkStreamAndGetTableDescriptor(streamName);
        dbAdmin.deleteTable(streamName);
    }

    /**
     * Counts the topics of a stream that are not marked as deleted.
     *
     * <p>Scans the topic-meta rows (row keys strictly between the topic-meta key prefix and
     * its end marker) and counts every entry whose deleted flag is not set. The table handle
     * and the document stream are closed before returning; the original code leaked both.
     *
     * @param streamName path of the stream
     * @return number of live topics
     * @throws IOException              if the stream metadata cannot be read
     * @throws IllegalArgumentException if {@code streamName} is not a stream
     */
    public int countTopics(String streamName) throws IOException, IllegalArgumentException {
        this.checkStreamAndGetTableDescriptor(streamName);
        QueryCondition condition = MapRDB.newCondition().and().is(DocumentConstants.ID_FIELD, QueryCondition.Op.GREATER, this.mdef.getKeyPrefixTopicMeta()).is(DocumentConstants.ID_FIELD, QueryCondition.Op.LESS, this.mdef.getKeyPrefixTopicMetaEnd()).close().build();
        int numTopics = 0;
        // Resources are closed in reverse order: the stream first, then the table.
        try (Table table = MapRDB.getTable(new Path(streamName));
             DocumentStream recStream = table.find(condition, new String[]{this.mdef.getCfTopicMeta()})) {
            for (Document rec : recStream) {
                if (!this.jsonRecToTopicMeta(rec).getIsDeleted()) {
                    ++numTopics;
                }
            }
        }
        return numTopics;
    }

    /**
     * Maps a user-facing compression name to the column family compression type.
     *
     * @param name one of "off", "lz4", "lzf" or "zlib" (case-insensitive)
     * @return the matching compression type
     * @throws IllegalArgumentException if the name is not recognised
     */
    private FamilyDescriptor.Compression compressionNameToType(String name) {
        FamilyDescriptor.Compression type;
        if (name.equalsIgnoreCase("off")) {
            type = FamilyDescriptor.Compression.None;
        } else if (name.equalsIgnoreCase("lz4")) {
            type = FamilyDescriptor.Compression.LZ4;
        } else if (name.equalsIgnoreCase("lzf")) {
            type = FamilyDescriptor.Compression.LZF;
        } else if (name.equalsIgnoreCase("zlib")) {
            type = FamilyDescriptor.Compression.ZLIB;
        } else {
            throw new IllegalArgumentException("Unknown compression type " + name);
        }
        return type;
    }

    /**
     * Maps a column family compression type to its user-facing name.
     *
     * @param ctype compression type read from a family descriptor
     * @return "lz4", "lzf" or "zlib" for those types; "off" for every other value
     */
    private String compressionTypeToName(FamilyDescriptor.Compression ctype) {
        switch (ctype) {
            case LZ4:
                return "lz4";
            case LZF:
                return "lzf";
            case ZLIB:
                return "zlib";
            default:
                // Any type without a dedicated name is reported as "off".
                return "off";
        }
    }

    /**
     * Validates every field that is set on a stream descriptor.
     *
     * <p>Checks the partition count range, that the TTL is non-negative, that the
     * compression name is known, and that each permission string parses as an ACE expression.
     *
     * @param sdesc descriptor to validate
     * @throws IOException              if an ACE expression cannot be parsed
     * @throws IllegalArgumentException if a numeric value is out of range or the compression name is unknown
     */
    private void validateStreamDesc(MStreamDescriptor sdesc) throws IOException {
        if (sdesc.hasDefaultPartitions()) {
            int partitions = sdesc.getDefaultPartitions();
            if (partitions < 1 || partitions > MAX_PARTITIONS) {
                throw new IllegalArgumentException("defaultPartitions has an invalid value " + partitions + ", it must between 1 and " + MAX_PARTITIONS);
            }
        }
        if (sdesc.hasTimeToLiveSec()) {
            long ttlSecs = sdesc.getTimeToLiveSec();
            if (ttlSecs < 0L) {
                throw new IllegalArgumentException("timeToLive has an invalid value " + ttlSecs + ", it must be >= 0");
            }
        }
        if (sdesc.hasCompressionAlgo()) {
            // Result is discarded; the call is made only because it throws on unknown names.
            this.compressionNameToType(sdesc.getCompressionAlgo());
        }

        // Each permission that is present must be a parseable ACE expression.
        if (sdesc.hasProducePerms()) {
            this.validateAceExpression(sdesc.getProducePerms());
        }
        if (sdesc.hasListenPerms()) {
            this.validateAceExpression(sdesc.getListenPerms());
        }
        if (sdesc.hasTopicPerms()) {
            this.validateAceExpression(sdesc.getTopicPerms());
        }
        if (sdesc.hasCopyPerms()) {
            this.validateAceExpression(sdesc.getCopyPerms());
        }
        if (sdesc.hasAdminPerms()) {
            this.validateAceExpression(sdesc.getAdminPerms());
        }
    }

    /**
     * Checks an ACE expression by handing it to {@code AceHelper.toPostfix}; the converted
     * form is discarded and only a failure of the conversion matters.
     *
     * @param aceExpression expression to check
     * @throws IOException propagated from the conversion
     */
    private void validateAceExpression(String aceExpression) throws IOException {
        AceHelper.toPostfix(aceExpression);
    }

    /**
     * Produces the split keys used when creating the stream's table: one key per index in
     * {@code [0, numSplits)}, formatted with the feed-id key prefix followed by the feed-id
     * key format.
     *
     * @param numSplits number of split keys to generate
     * @return array of {@code numSplits} split keys
     */
    private String[] getSplits(int numSplits) {
        String keyFormat = this.mdef.getKeyPrefixFeedId() + this.mdef.getKeyFmtFeedId();
        String[] splitKeys = new String[numSplits];
        int index = 0;
        while (index < numSplits) {
            splitKeys[index] = String.format(keyFormat, index);
            index++;
        }
        return splitKeys;
    }

    /**
     * Fetches the table descriptor for a path and verifies that the table is a stream.
     *
     * @param streamName path to look up
     * @return the table descriptor of the stream
     * @throws IllegalArgumentException if the table exists but is not a stream
     * @throws IOException              if the descriptor cannot be fetched
     */
    private TableDescriptorImpl checkStreamAndGetTableDescriptor(String streamName) throws IllegalArgumentException, IOException {
        TableDescriptorImpl tableDesc = (TableDescriptorImpl)this.dbAdmin.getTableDescriptor(streamName);
        if (tableDesc.isStream()) {
            return tableDesc;
        }
        throw new IllegalArgumentException(streamName + " is not a stream");
    }

    /**
     * Builds the table-level permission map for a descriptor.
     *
     * <p>When the descriptor carries admin permissions, every ACE name listed in
     * {@code AceHelper.tblPermissionMap} is mapped to that expression; otherwise the
     * returned map is empty.
     *
     * @param sdesc descriptor supplying the admin permission expression
     * @return map from table ACE name to expression, possibly empty
     */
    private Map<String, String> buildTablePermissions(MStreamDescriptor sdesc) {
        Map<String, String> tableAces = new HashMap<String, String>();
        if (sdesc.hasAdminPerms()) {
            String adminExpr = sdesc.getAdminPerms();
            for (String permName : AceHelper.tblPermissionMap.values()) {
                tableAces.put(permName, adminExpr);
            }
        }
        return tableAces;
    }

    /**
     * Combines two ACE expressions into one that grants access if either does.
     *
     * <p>An empty expression yields the other one; expressions equal ignoring case collapse
     * to the first; if either is exactly {@code "p"} the result is {@code "p"}; otherwise
     * the two are joined as {@code (ace1) | (ace2)}.
     *
     * @param ace1 first expression
     * @param ace2 second expression
     * @return the merged expression
     */
    private String mergeAces(String ace1, String ace2) {
        String mergedAce;
        if (ace1.length() == 0) {
            mergedAce = ace2;
        } else if (ace2.length() == 0) {
            mergedAce = ace1;
        } else if (ace1.equalsIgnoreCase(ace2)) {
            mergedAce = ace1;
        } else if (ace1.equals("p") || ace2.equals("p")) {
            mergedAce = "p";
        } else {
            mergedAce = "(" + ace1 + ") | (" + ace2 + ")";
        }
        LOG.debug("Merged " + ace1 + " + " + ace2 + " to -> " + mergedAce);
        return mergedAce;
    }

    /**
     * Derives the per-column-family permissions for a stream.
     *
     * <p>Each role (produce, listen, topic, copy, admin) is taken from {@code newDesc} when
     * set there, else from {@code oldDesc} when one is supplied, else defaults to the current
     * user ({@code u:<userId>}). Read and write ACEs on each family are merges of the roles
     * that need them. The {@code traverseperm} entry of each family stores one raw role
     * expression: produce on messages, listen on cursors, topic on topic-meta and copy on
     * feed-assigns.
     *
     * @param newDesc descriptor carrying the requested permissions
     * @param oldDesc previously stored permissions, or {@code null} when creating a stream
     * @return permissions for the messages, topic-meta, cursors and feed-assigns families, in that order
     */
    private List<CFPermissions> buildFamilyPermissions(MStreamDescriptor newDesc, MStreamDescriptor oldDesc) {
        String currentUserAce = "u:" + this.userId;

        String producers;
        if (newDesc.hasProducePerms()) {
            producers = newDesc.getProducePerms();
        } else if (oldDesc != null) {
            producers = oldDesc.getProducePerms();
        } else {
            producers = currentUserAce;
        }

        String listeners;
        if (newDesc.hasListenPerms()) {
            listeners = newDesc.getListenPerms();
        } else if (oldDesc != null) {
            listeners = oldDesc.getListenPerms();
        } else {
            listeners = currentUserAce;
        }

        String topicEditors;
        if (newDesc.hasTopicPerms()) {
            topicEditors = newDesc.getTopicPerms();
        } else if (oldDesc != null) {
            topicEditors = oldDesc.getTopicPerms();
        } else {
            topicEditors = currentUserAce;
        }

        String copiers;
        if (newDesc.hasCopyPerms()) {
            copiers = newDesc.getCopyPerms();
        } else if (oldDesc != null) {
            copiers = oldDesc.getCopyPerms();
        } else {
            copiers = currentUserAce;
        }

        String admins;
        if (newDesc.hasAdminPerms()) {
            admins = newDesc.getAdminPerms();
        } else if (oldDesc != null) {
            admins = oldDesc.getAdminPerms();
        } else {
            admins = currentUserAce;
        }

        // Union of all roles; a role is merged in only if it differs from those already considered.
        String everyone = this.mergeAces(producers, listeners);
        boolean copiersAlreadyCovered = copiers.equals(producers) || copiers.equals(listeners);
        if (!copiersAlreadyCovered) {
            everyone = this.mergeAces(everyone, copiers);
        }
        boolean editorsAlreadyCovered = topicEditors.equals(producers) || topicEditors.equals(listeners) || topicEditors.equals(copiers);
        if (!editorsAlreadyCovered) {
            everyone = this.mergeAces(everyone, topicEditors);
        }
        boolean adminsAlreadyCovered = admins.equals(producers) || admins.equals(listeners) || admins.equals(copiers) || admins.equals(topicEditors);
        if (!adminsAlreadyCovered) {
            everyone = this.mergeAces(everyone, admins);
        }

        CFPermissions messagesAces = new CFPermissions(this.mdef.getCfMessages());
        messagesAces.addCFPermission("readperm", this.mergeAces(listeners, copiers));
        messagesAces.addCFPermission("writeperm", this.mergeAces(producers, copiers));
        messagesAces.addCFPermission("traverseperm", producers);

        CFPermissions topicMetaAces = new CFPermissions(this.mdef.getCfTopicMeta());
        topicMetaAces.addCFPermission("readperm", everyone);
        topicMetaAces.addCFPermission("writeperm", this.mergeAces(topicEditors, copiers));
        topicMetaAces.addCFPermission("traverseperm", topicEditors);

        CFPermissions cursorAces = new CFPermissions(this.mdef.getCfCursors());
        cursorAces.addCFPermission("readperm", everyone);
        cursorAces.addCFPermission("writeperm", this.mergeAces(listeners, copiers));
        cursorAces.addCFPermission("traverseperm", listeners);

        CFPermissions feedAssignAces = new CFPermissions(this.mdef.getCfFeedAssigns());
        feedAssignAces.addCFPermission("readperm", everyone);
        feedAssignAces.addCFPermission("writeperm", this.mergeAces(listeners, copiers));
        feedAssignAces.addCFPermission("traverseperm", copiers);

        List<CFPermissions> result = new ArrayList<CFPermissions>(4);
        result.add(messagesAces);
        result.add(topicMetaAces);
        result.add(cursorAces);
        result.add(feedAssignAces);
        return result;
    }

    /**
     * Writes column family permissions, then table permissions, to a stream.
     *
     * @param streamName       path of the stream
     * @param tablePermissions table-level ACEs; nothing is written at table level when empty
     * @param cfPermissions    per-family ACEs, applied in list order
     * @throws IOException if any permission update fails
     */
    private void setPermissions(String streamName, Map<String, String> tablePermissions, List<CFPermissions> cfPermissions) throws IOException {
        Path tablePath = new Path(streamName);
        for (CFPermissions familyAces : cfPermissions) {
            String familyName = familyAces.getFamily();
            this.maprAdmin.setFamilyPermissions(tablePath, familyName, familyAces);
        }
        if (tablePermissions.isEmpty()) {
            return;
        }
        this.maprAdmin.setTablePermissions(tablePath, tablePermissions);
    }

    /**
     * Fills the permission fields of a descriptor from a stream's stored ACEs.
     *
     * <p>The admin permission comes from the table's {@code adminaccessperm} entry; if that
     * entry is absent nothing is populated. The remaining roles are read from the
     * {@code traverseperm} entry of each column family: messages → produce, cursors → listen,
     * topic-meta → topic, feed-assigns → copy.
     *
     * @param streamName path of the stream
     * @param sdesc      descriptor to populate
     * @throws IOException if the permissions cannot be read
     */
    private void populatePermissionsFromTable(String streamName, MStreamDescriptor sdesc) throws IOException {
        Path tablePath = new Path(streamName);
        Map tableAces = this.maprAdmin.getTablePermissions(tablePath);
        Object adminAce = tableAces.get("adminaccessperm");
        if (adminAce == null) {
            return;
        }
        sdesc.setAdminPerms((String)adminAce);

        List<CFPermissions> familyAcesList = this.maprAdmin.getFamilyPermissions(tablePath);
        for (CFPermissions familyAces : familyAcesList) {
            String familyName = familyAces.getFamily();
            Map aceByName = familyAces.getCfPermissions();
            if (familyName.equals(this.mdef.getCfMessages())) {
                sdesc.setProducePerms((String)aceByName.get("traverseperm"));
            } else if (familyName.equals(this.mdef.getCfCursors())) {
                sdesc.setListenPerms((String)aceByName.get("traverseperm"));
            } else if (familyName.equals(this.mdef.getCfTopicMeta())) {
                sdesc.setTopicPerms((String)aceByName.get("traverseperm"));
            } else if (familyName.equals(this.mdef.getCfFeedAssigns())) {
                sdesc.setCopyPerms((String)aceByName.get("traverseperm"));
            }
        }
    }

    /**
     * Creates a topic through the native client's default-feeds call.
     *
     * @param topicFullName full topic name passed to the native client
     * @throws IOException if the native call returns a non-zero error code
     */
    public void createTopic(String topicFullName) throws IOException {
        int status = this.CreateTopicWithDefaultFeeds(this._clntPtr, topicFullName);
        if (status == 0) {
            return;
        }
        throw new IOException("Create topic failed with error : " + Errno.toString(status) + " (" + status + ")");
    }

    /**
     * Creates a topic with an explicit feed count.
     *
     * @param topicFullName full topic name passed to the native client
     * @param nfeeds        feed count passed to the native client
     * @throws IOException if the native call returns a non-zero error code
     */
    public void createTopic(String topicFullName, int nfeeds) throws IOException {
        int status = this.CreateTopic(this._clntPtr, topicFullName, nfeeds);
        if (status == 0) {
            return;
        }
        throw new IOException("Create topic failed with error : " + Errno.toString(status) + " (" + status + ")");
    }

    /**
     * Edits an existing topic through the native client.
     *
     * @param topicFullName full topic name passed to the native client
     * @param nfeeds        feed count passed to the native client
     * @throws IOException if the native call returns a non-zero error code
     */
    public void editTopic(String topicFullName, int nfeeds) throws IOException {
        int status = this.EditTopic(this._clntPtr, topicFullName, nfeeds);
        if (status == 0) {
            return;
        }
        throw new IOException("Edit topic failed with error : " + Errno.toString(status) + " (" + status + ")");
    }

    /**
     * Deletes a topic through the native client.
     *
     * @param topicFullName full topic name passed to the native client
     * @throws IOException if the native call returns a non-zero error code
     */
    public void deleteTopic(String topicFullName) throws IOException {
        int status = this.DeleteTopic(this._clntPtr, topicFullName);
        if (status == 0) {
            return;
        }
        throw new IOException("Delete topic failed with error : " + Errno.toString(status) + " (" + status + ")");
    }

    /**
     * Fetches and decodes the metadata entry of a topic.
     *
     * <p>A null reply from the native client is reported as an {@link IOException}, as the
     * other native-call wrappers in this class do, instead of failing inside the protobuf
     * parser.
     *
     * @param topicFullName full topic name passed to the native client
     * @return the parsed topic metadata entry
     * @throws IOException if the native call reports an error, returns no data, or the
     *                     reply cannot be parsed
     */
    public Marlinserver.MarlinTopicMetaEntry getTopicMetaEntry(String topicFullName) throws IOException {
        MapRConstants.ErrorValue errVal = new MapRConstants.ErrorValue();
        errVal.error = 0;
        byte[] entry = this.GetTopicMetaEntry(this._clntPtr, topicFullName, errVal);
        if (errVal.error != 0) {
            int err = errVal.error;
            throw new IOException("GetTopicMetaEntry failed with error : " + Errno.toString((int)err) + " (" + err + ")");
        }
        if (entry == null) {
            throw new IOException("GetTopicMetaEntry for topic " + topicFullName + " returned null");
        }
        return Marlinserver.MarlinTopicMetaEntry.parseFrom((byte[])entry);
    }

    /**
     * Requests stat information for one feed of a topic from the native client.
     *
     * @param topicFullName full topic name passed to the native client
     * @param feedId        feed (partition) id to stat
     * @param headOnly      flag forwarded unchanged to the native call
     * @return the parsed stat information
     * @throws IOException if the native call reports an error, returns null, or the reply cannot be parsed
     */
    private Marlinserver.TopicFeedStatInfo statTopicFeed(String topicFullName, int feedId, boolean headOnly) throws IOException {
        MapRConstants.ErrorValue status = new MapRConstants.ErrorValue();
        status.error = 0;
        byte[] payload = this.StatTopicFeed(this._clntPtr, topicFullName, feedId, headOnly, status);

        int errorCode = status.error;
        if (errorCode != 0) {
            throw new IOException("StatTopicFeed for topic " + topicFullName + " partitionId " + feedId + " failed with error : " + Errno.toString(errorCode) + "(" + errorCode + ")");
        }
        if (payload == null) {
            throw new IOException("StatTopicFeed for topic " + topicFullName + " returned null");
        }
        return Marlinserver.TopicFeedStatInfo.parseFrom(payload);
    }

    /**
     * Stats every feed of a topic in parallel and stores each result in the matching
     * {@link TopicFeedInfo}.
     *
     * <p>One task per list index is submitted to the shared stat pool; the index is used as
     * the partition id. Results are collected in submission order, so result {@code i}
     * updates {@code feedList.get(i)}.
     *
     * <p>Failures are reported as {@link IOException} with the underlying exception kept as
     * the cause. If the waiting thread is interrupted, its interrupt flag is restored before
     * the exception is thrown.
     *
     * @param topicFullName full topic name passed to the native client
     * @param feedList      per-feed info objects to update, indexed by partition id
     * @param headOnly      flag forwarded unchanged to the native stat call
     * @throws IOException if any stat task fails or the wait is interrupted
     */
    private void statTabletsAndPopulateFeedInfo(final String topicFullName, List<TopicFeedInfo> feedList, final boolean headOnly) throws IOException {
        LOG.debug("Begin : stat for all feeds");
        List<Future<Marlinserver.TopicFeedStatInfo>> futureResponses = new ArrayList<Future<Marlinserver.TopicFeedStatInfo>>(feedList.size());
        for (int i = 0; i < feedList.size(); ++i) {
            final int partitionId = i;
            futureResponses.add(topicFeedStatService.submit(new Callable<Marlinserver.TopicFeedStatInfo>(){

                @Override
                public Marlinserver.TopicFeedStatInfo call() {
                    try {
                        LOG.debug("StatTopicFeed topic " + topicFullName + " partitionId " + partitionId);
                        return MarlinAdminImpl.this.statTopicFeed(topicFullName, partitionId, headOnly);
                    }
                    catch (Exception e) {
                        // A null result tells the collecting loop that this partition failed.
                        LOG.error(e.getMessage(), e);
                        return null;
                    }
                }
            }));
        }
        int feedId = 0;
        for (Future<Marlinserver.TopicFeedStatInfo> future : futureResponses) {
            Marlinserver.TopicFeedStatInfo resp;
            try {
                resp = future.get();
            }
            catch (InterruptedException e) {
                // Restore the flag so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
                LOG.error(e.getMessage(), e);
                throw new IOException(e.toString(), e);
            }
            catch (Exception e) {
                LOG.error(e.getMessage(), e);
                throw new IOException(e.toString(), e);
            }
            if (resp == null) {
                String msg = "TopicFeedStat failed on one or more partitions";
                LOG.error(msg);
                throw new IOException(msg);
            }
            feedList.get(feedId).updateStat(resp);
            ++feedId;
        }
        LOG.debug("Done : stat for all feeds");
    }

    /**
     * Collects per-feed stat and cursor information for a topic.
     *
     * <p>Fetches the topic metadata, creates one {@link TopicFeedInfo} per feed id listed
     * there, stats all feeds in parallel, then scans the topic's cursors and attaches each
     * one to the feed whose list index equals the cursor's partition id.
     *
     * @param topicFullName topic name in {@code <stream path>:<topic>} form
     * @param headOnly      flag forwarded unchanged to the native stat call
     * @return one entry per feed, in the order of the metadata's feed id list
     * @throws IOException              if the metadata cannot be fetched, the topic is
     *                                  deleted, or statting / cursor scanning fails
     * @throws IllegalArgumentException if {@code topicFullName} has no ':' separator
     */
    public List<TopicFeedInfo> infoTopicCommon(String topicFullName, boolean headOnly) throws IOException {
        LOG.debug("Begin : get topic meta");
        Marlinserver.MarlinTopicMetaEntry tmeta = this.getTopicMetaEntry(topicFullName);
        if (tmeta.getIsDeleted()) {
            throw new IOException("Topic " + topicFullName + " is already deleted");
        }
        LOG.debug("Done : get topic meta");
        // Split "<stream>:<topic>" up front so a malformed name fails before any stat work.
        String[] fullPathSplits = topicFullName.split(":", 2);
        if (fullPathSplits.length < 2) {
            throw new IllegalArgumentException("Topic name " + topicFullName + " must be of the form <stream>:<topic>");
        }
        HashMap<String, Marlinserver.MarlinTopicMetaEntry> topicMetaMap = new HashMap<String, Marlinserver.MarlinTopicMetaEntry>();
        topicMetaMap.put(topicFullName, tmeta);
        List<TopicFeedInfo> statList = new ArrayList<TopicFeedInfo>();
        for (int feedId : tmeta.getFeedIdsList()) {
            statList.add(new TopicFeedInfo(feedId));
        }
        this.statTabletsAndPopulateFeedInfo(topicFullName, statList, headOnly);
        LOG.debug("Begin : get cursors");
        List<CursorInfo> cursorList = this.scanCursors(fullPathSplits[0], null, fullPathSplits[1], -1, topicMetaMap);
        for (CursorInfo ci : cursorList) {
            int feedId = ci.feedId();
            // Cursors for partitions outside the current feed list are ignored.
            if (feedId < 0 || feedId >= statList.size()) continue;
            statList.get(feedId).addCursor(ci);
        }
        LOG.debug("Done : get cursors");
        return statList;
    }

    /**
     * Collects per-feed information for a topic; equivalent to
     * {@link #infoTopicCommon(String, boolean)} with {@code headOnly} set to {@code false}.
     *
     * @param topicFullName full topic name
     * @return one entry per feed of the topic
     * @throws IOException if the information cannot be collected
     */
    public List<TopicFeedInfo> infoTopic(String topicFullName) throws IOException {
        final boolean headOnly = false;
        return infoTopicCommon(topicFullName, headOnly);
    }

    /**
     * Converts a topic-meta document into a {@code MarlinTopicMetaEntry}.
     *
     * <p>All fields are read from under the topic-meta column family path. A field that is
     * absent from the document is left unset on the result; this now includes the feed id
     * list, which previously caused a {@link NullPointerException} when missing.
     *
     * @param rec document read from the topic-meta column family
     * @return the populated metadata entry
     */
    private Marlinserver.MarlinTopicMetaEntry jsonRecToTopicMeta(Document rec) {
        LOG.debug("Key:: {} doc:: {}", (Object)rec.getIdString(), (Object)rec);
        Marlinserver.MarlinTopicMetaEntry.Builder tm = Marlinserver.MarlinTopicMetaEntry.newBuilder();
        String fieldPrefix = this.mdef.getCfTopicMeta() + '.';

        Long updateSeq = rec.getLongObj(fieldPrefix + this.mdef.getFTopicUpdateSeq());
        if (updateSeq != null) {
            tm.setUpdateSeq(updateSeq.longValue());
        }
        Boolean isDeleted = rec.getBooleanObj(fieldPrefix + this.mdef.getFTopicIsDeleted());
        if (isDeleted != null) {
            tm.setIsDeleted(isDeleted.booleanValue());
        }
        Long topicUniq = rec.getLongObj(fieldPrefix + this.mdef.getFTopicUniq());
        if (topicUniq != null) {
            // Stored as a long in the document; the protobuf field is an int.
            tm.setTopicUniq(topicUniq.intValue());
        }
        List<?> feedIds = rec.getList(fieldPrefix + this.mdef.getFTopicFeedIds());
        if (feedIds != null) {
            for (Object obj : feedIds) {
                // Accept any numeric element type; the value is narrowed to int as before.
                tm.addFeedIds(((Number)obj).intValue());
            }
        }
        return tm.build();
    }

    /**
     * Formats a fid as {@code <cid>.<cinum>.<uniq>} for log and error messages.
     *
     * @param fid fid to format
     * @return dotted textual form of the fid
     */
    private String printableFid(Common.FidMsg fid) {
        StringBuilder text = new StringBuilder();
        text.append(fid.getCid());
        text.append(".");
        text.append(fid.getCinum());
        text.append(".");
        text.append(fid.getUniq());
        return text.toString();
    }

    /**
     * Requests stat information for all feeds on one tablet from the native client.
     *
     * @param streamName path of the stream
     * @param fid        fid of the tablet to query
     * @return the parsed stat response
     * @throws IOException if the native call reports an error, returns null, or the reply cannot be parsed
     */
    private Marlinserver.TopicFeedStatResponse statFeedsOnTablet(String streamName, Common.FidMsg fid) throws IOException {
        MapRConstants.ErrorValue status = new MapRConstants.ErrorValue();
        status.error = 0;
        byte[] payload = this.StatFeedsOnTablet(this._clntPtr, streamName, fid.getCid(), fid.getCinum(), fid.getUniq(), status);

        int errorCode = status.error;
        if (errorCode != 0) {
            throw new IOException("StatFeedsOnTablet for tablet " + this.printableFid(fid) + " failed with error : " + Errno.toString(errorCode) + "(" + errorCode + ")");
        }
        if (payload == null) {
            throw new IOException("StatFeedsOnTablet for tablet " + this.printableFid(fid) + " returned null");
        }
        return Marlinserver.TopicFeedStatResponse.parseFrom(payload);
    }

    /**
     * Lists the fids of a stream's tablets, skipping the very first tablet returned by the
     * scanner.
     *
     * @param streamName path of the stream
     * @return fids of every tablet except the first, in scan order
     * @throws IOException if the tablet scan fails
     */
    private List<Common.FidMsg> getFeedTabletFids(String streamName) throws IOException {
        MapRTabletScanner tabletScanner = this.maprfs.getTabletScanner(new Path(streamName));
        List<Common.FidMsg> fids = new ArrayList<Common.FidMsg>();
        // NOTE(review): presumably the first tablet holds non-feed rows and is skipped for that reason — confirm.
        boolean firstSkipped = false;
        for (List<Dbserver.TabletDesc> batch = tabletScanner.nextSet(); batch != null; batch = tabletScanner.nextSet()) {
            for (Dbserver.TabletDesc tablet : batch) {
                if (!firstSkipped) {
                    firstSkipped = true;
                    continue;
                }
                fids.add(tablet.getFid());
            }
        }
        return fids;
    }

    /**
     * Stats every feed tablet of a stream in parallel and distributes the results to the
     * per-topic feed lists.
     *
     * <p>Each returned stat entry is decoded from its sequence prefix (minus the first
     * character) into topic, partition and topic-uniq. Entries are dropped when the topic is
     * unknown to {@code topicMetaMap}, when the uniq value differs from the metadata, or
     * when the partition has no slot in the topic's feed list.
     *
     * <p>Failures are reported as {@link IOException} with the underlying exception kept as
     * the cause. If the waiting thread is interrupted, its interrupt flag is restored before
     * the exception is thrown.
     *
     * @param streamName   path of the stream
     * @param topicMetaMap topic metadata keyed by {@code <streamName>:<topic>}
     * @param topicStatMap feed lists to update, looked up here by the short topic name
     * @throws IOException if listing tablets fails, any stat task fails, or the wait is interrupted
     */
    private void scanAllTabletsAndPopulateFeedInfo(final String streamName, Map<String, Marlinserver.MarlinTopicMetaEntry> topicMetaMap, Map<String, List<TopicFeedInfo>> topicStatMap) throws IOException {
        LOG.debug("Begin : Fetch list of tablets");
        List<Common.FidMsg> tabletFids = this.getFeedTabletFids(streamName);
        LOG.debug("Done : Fetch list of tablets");
        LOG.debug("Begin : stat for all tablets");
        List<Future<Marlinserver.TopicFeedStatResponse>> futureResponses = new ArrayList<Future<Marlinserver.TopicFeedStatResponse>>(tabletFids.size());
        for (final Common.FidMsg fidMsg : tabletFids) {
            futureResponses.add(topicFeedStatService.submit(new Callable<Marlinserver.TopicFeedStatResponse>(){

                @Override
                public Marlinserver.TopicFeedStatResponse call() {
                    try {
                        LOG.debug("StatFeedsOnTablet on tablet " + MarlinAdminImpl.this.printableFid(fidMsg));
                        return MarlinAdminImpl.this.statFeedsOnTablet(streamName, fidMsg);
                    }
                    catch (Exception e) {
                        // A null result tells the collecting loop that this tablet failed.
                        LOG.error(e.getMessage(), e);
                        return null;
                    }
                }
            }));
        }
        for (Future<Marlinserver.TopicFeedStatResponse> future : futureResponses) {
            Marlinserver.TopicFeedStatResponse resp;
            try {
                resp = future.get();
            }
            catch (InterruptedException e) {
                // Restore the flag so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
                LOG.error(e.getMessage(), e);
                throw new IOException(e.toString(), e);
            }
            catch (Exception e) {
                LOG.error(e.getMessage(), e);
                throw new IOException(e.toString(), e);
            }
            if (resp == null) {
                String msg = "TopicFeedStat failed on one or more tablets";
                LOG.error(msg);
                throw new IOException(msg);
            }
            for (Marlinserver.TopicFeedStatInfo stat : resp.getFeedInfosList()) {
                // The first character of the prefix is dropped before decoding the row key.
                String seqPrefix = stat.getSeqPrefix().substring(1);
                Document rdoc = MarlinRowKeyDecoder.decodeTopicFeedKey(seqPrefix);
                String topic = rdoc.getString(MarlinRowKeyDecoder.TOPIC);
                int feedId = rdoc.getInt(MarlinRowKeyDecoder.PARTITION);
                int uniq = rdoc.getInt(MarlinRowKeyDecoder.TOPIC_UNIQ);
                String topicFullName = streamName + ":" + topic;
                Marlinserver.MarlinTopicMetaEntry tmeta = topicMetaMap.get(topicFullName);
                if (tmeta == null) {
                    LOG.debug("missing entry in topicMetaMap for topic " + topic);
                    continue;
                }
                if (tmeta.getTopicUniq() != uniq) {
                    LOG.debug("mismatch in topicUniq for topic " + topic);
                    continue;
                }
                List<TopicFeedInfo> feedList = topicStatMap.get(topic);
                if (feedList == null || feedId < 0 || feedId >= feedList.size()) continue;
                feedList.get(feedId).updateStat(stat);
            }
        }
        LOG.debug("Done : stat for all tablets");
    }

    /**
     * Lists the non-deleted topics of a stream together with one {@link TopicFeedInfo} per feed.
     *
     * <p>The topic-meta rows of the stream table are scanned to create the feed infos, then
     * {@code scanAllTabletsAndPopulateFeedInfo} is asked to populate them, and finally every cursor
     * returned by {@code scanCursors} is attached to the feed info it refers to. Cursors whose
     * topic is unknown or whose feed id is beyond the topic's feed list are ignored.
     *
     * <p>The table is always closed, including when one of the steps above throws.
     *
     * @param streamName path of the stream table
     * @return map from topic name (row key without its first character) to the topic's feed infos
     * @throws IOException if any step fails, or if closing the table fails after a successful run
     * @throws IllegalArgumentException declared by the signature; presumably raised while
     *         validating {@code streamName} (confirm against checkStreamAndGetTableDescriptor)
     */
    public Map<String, List<TopicFeedInfo>> listTopics(String streamName) throws IOException, IllegalArgumentException {
        this.checkStreamAndGetTableDescriptor(streamName);
        HashMap<String, List<TopicFeedInfo>> topicStatMap = new HashMap<String, List<TopicFeedInfo>>();
        HashMap<String, Marlinserver.MarlinTopicMetaEntry> topicMetaMap = new HashMap<String, Marlinserver.MarlinTopicMetaEntry>();
        QueryCondition condition = MapRDB.newCondition().and().is(DocumentConstants.ID_FIELD, QueryCondition.Op.GREATER, this.mdef.getKeyPrefixTopicMeta()).is(DocumentConstants.ID_FIELD, QueryCondition.Op.LESS, this.mdef.getKeyPrefixTopicMetaEnd()).close().build();
        Table table = MapRDB.getTable(new Path(streamName));
        boolean completed = false;
        try {
            DocumentStream recStream = table.find(condition, new String[]{this.mdef.getCfTopicMeta()});
            LOG.debug("Begin : get topic names");
            for (Document rec : recStream) {
                Marlinserver.MarlinTopicMetaEntry tmeta = this.jsonRecToTopicMeta(rec);
                if (tmeta.getIsDeleted()) {
                    continue;
                }
                // The first character of the row key is dropped to obtain the topic name.
                String topicName = rec.getIdString().substring(1);
                topicMetaMap.put(streamName + ":" + topicName, tmeta);
                ArrayList<TopicFeedInfo> fstatList = new ArrayList<TopicFeedInfo>();
                for (int feedId : tmeta.getFeedIdsList()) {
                    fstatList.add(new TopicFeedInfo(feedId));
                }
                topicStatMap.put(topicName, fstatList);
            }
            LOG.debug("Done : get topic names");
            this.scanAllTabletsAndPopulateFeedInfo(streamName, topicMetaMap, topicStatMap);
            LOG.debug("Begin : get cursors");
            for (CursorInfo ci : this.scanCursors(streamName, null, null, -1, topicMetaMap)) {
                int feedId = ci.feedId();
                List<TopicFeedInfo> feedList = topicStatMap.get(ci.topic());
                if (feedList == null || feedId >= feedList.size()) {
                    continue;
                }
                feedList.get(feedId).addCursor(ci);
            }
            LOG.debug("Done : get cursors");
            completed = true;
        }
        finally {
            try {
                table.close();
            }
            catch (Exception e) {
                if (completed) {
                    throw new IOException(e.getMessage(), e);
                }
                // A failure is already propagating; do not let the close error replace it.
                LOG.warn("Failed to close table for stream " + streamName, e);
            }
        }
        return topicStatMap;
    }

    /**
     * Opens a scan over the cursor rows of {@code table}, projecting only the cursor column family.
     *
     * <p>Rows are restricted to the id range between the cursor key prefix and its end marker, and
     * additionally to ids matching {@code .*topic.*feedId.*listenerGID.*}. Passing an empty string
     * for a filter leaves that part of the pattern unconstrained.
     *
     * <p>NOTE(review): the three values are inserted into the regular expression unescaped, so any
     * regex metacharacters they contain are interpreted as such; confirm this is intended.
     *
     * @param table       table to scan
     * @param listenerGID listener group id fragment to match
     * @param topic       topic name fragment to match
     * @param feedId      feed id fragment to match, as it appears in the row key
     * @return the stream of matching rows
     * @throws IOException declared by the signature
     */
    private DocumentStream GetCursorList(Table table, String listenerGID, String topic, String feedId) throws IOException {
        StringBuilder idPattern = new StringBuilder(".*");
        idPattern.append(topic).append(".*");
        idPattern.append(feedId).append(".*");
        idPattern.append(listenerGID).append(".*");
        QueryCondition cursorRows = MapRDB.newCondition()
                .and()
                .is(DocumentConstants.ID_FIELD, QueryCondition.Op.GREATER, this.mdef.getKeyCursorPrefix())
                .is(DocumentConstants.ID_FIELD, QueryCondition.Op.LESS, this.mdef.getKeyCursorPrefixEnd())
                .matches("_id", idPattern.toString())
                .close()
                .build();
        String[] projection = new String[]{this.mdef.getCfCursors()};
        return table.find(cursorRows, projection);
    }

    /**
     * Checks a cursor row against the requested filters and, on a match, initialises {@code ci}.
     *
     * <p>The row key is decoded into topic, topic uniquifier, partition and consumer group. The
     * row is rejected when the topic, feed id or listener group differ from the requested ones,
     * when the topic's metadata cannot be fetched, when the topic is marked deleted, or when the
     * uniquifier in the key differs from the one in the topic metadata. Metadata fetched here is
     * stored in {@code topicMetaMap} so later rows of the same topic reuse it.
     *
     * @param rec          cursor row to examine
     * @param streamName   stream the row belongs to
     * @param listenerGID  required listener group, or {@code null} for any
     * @param topic        required topic, or {@code null} for any
     * @param feedId       required feed id, or a negative value for any
     * @param ci           cursor info to initialise when the row is accepted
     * @param topicMetaMap cache of topic metadata keyed by {@code stream:topic}
     * @return {@code true} if the row was accepted and {@code ci} initialised
     * @throws IOException if the accepted row lacks a cursor or timestamp value
     */
    private boolean FilterResult(Document rec, String streamName, String listenerGID, String topic, int feedId, CursorInfo ci, Map<String, Marlinserver.MarlinTopicMetaEntry> topicMetaMap) throws IOException {
        Document keyFields = MarlinRowKeyDecoder.decodeCursorKey(rec.getIdString());
        String rowTopic = keyFields.getString(MarlinRowKeyDecoder.TOPIC);
        boolean topicAccepted = topic == null || rowTopic.equals(topic);
        if (!topicAccepted) {
            return false;
        }

        Integer rowTopicUniq = keyFields.getInt(MarlinRowKeyDecoder.TOPIC_UNIQ);
        String fullName = streamName + ":" + rowTopic;
        Marlinserver.MarlinTopicMetaEntry meta = topicMetaMap.get(fullName);
        if (meta == null) {
            // Not cached yet: fetch the metadata through the native client and remember it.
            MapRConstants.ErrorValue lookupError = new MapRConstants.ErrorValue();
            byte[] rawMeta = this.GetTopicMetaEntry(this._clntPtr, fullName, lookupError);
            if (lookupError.error != 0) {
                return false;
            }
            meta = Marlinserver.MarlinTopicMetaEntry.parseFrom(rawMeta);
            topicMetaMap.put(fullName, meta);
        }
        if (meta.getIsDeleted() || meta.getTopicUniq() != rowTopicUniq.intValue()) {
            return false;
        }

        Integer rowFeedId = keyFields.getInt(MarlinRowKeyDecoder.PARTITION);
        boolean feedAccepted = feedId < 0 || feedId == rowFeedId;
        if (!feedAccepted) {
            return false;
        }

        String rowGroup = keyFields.getString(MarlinRowKeyDecoder.CONSUMER_GROUP);
        boolean groupAccepted = listenerGID == null || listenerGID.equals(rowGroup);
        if (!groupAccepted) {
            return false;
        }

        Long cursorValue = rec.getLongObj(this.mdef.getCfCursors() + '.' + this.mdef.getFCursor());
        if (cursorValue == null) {
            throw new IOException("invalid cursor value");
        }
        Long timestampValue = rec.getLongObj(this.mdef.getCfCursors() + '.' + this.mdef.getFTimestamp());
        if (timestampValue == null) {
            throw new IOException("invalid timestamp value");
        }
        ci.Init(streamName, rowTopic, rowGroup, rowFeedId, cursorValue, timestampValue);
        return true;
    }

    /**
     * Lists cursors of a stream, optionally restricted by listener group, topic and feed id.
     *
     * <p>With no filter at all the call is delegated to {@link #listAllCursors(String)}; with only
     * a topic filter it is delegated to {@link #listCursorsForTopic(String)}. Otherwise the cursor
     * rows are scanned, and each returned cursor is linked to the {@link TopicFeedInfo} of its
     * feed. The feed infos of a topic are built and populated once, on the first cursor seen for
     * that topic.
     *
     * <p>A cursor whose feed id does not index into its topic's feed list is skipped rather than
     * causing an {@link IndexOutOfBoundsException}, mirroring the guard used by {@code listTopics}.
     *
     * @param streamName  path of the stream
     * @param listenerGID listener group to match, or {@code null} for any
     * @param topicName   topic to match, or {@code null} for any
     * @param feedId      feed id to match, or a negative value for any
     * @return the matching cursors, each with its topic feed info set
     * @throws IOException if scanning or populating the feed infos fails
     */
    public List<CursorInfo> listCursors(String streamName, String listenerGID, String topicName, int feedId) throws IOException {
        if (listenerGID == null && topicName == null && feedId < 0) {
            return this.listAllCursors(streamName);
        }
        if (listenerGID == null && feedId < 0) {
            return this.listCursorsForTopic(streamName + ":" + topicName);
        }
        HashMap<String, Marlinserver.MarlinTopicMetaEntry> topicMetaMap = new HashMap<String, Marlinserver.MarlinTopicMetaEntry>();
        List<CursorInfo> ciList = this.scanCursors(streamName, listenerGID, topicName, feedId, topicMetaMap);
        Map<String, List<TopicFeedInfo>> feedInfosByTopic = new HashMap<String, List<TopicFeedInfo>>();
        ArrayList<CursorInfo> cursorList = new ArrayList<CursorInfo>();
        for (CursorInfo ci : ciList) {
            String topicFullName = ci.streamName() + ":" + ci.topic();
            Marlinserver.MarlinTopicMetaEntry tmeta = topicMetaMap.get(topicFullName);
            if (tmeta == null) {
                continue;
            }
            List<TopicFeedInfo> feedInfos = feedInfosByTopic.get(ci.topic());
            if (feedInfos == null) {
                ArrayList<TopicFeedInfo> created = new ArrayList<TopicFeedInfo>();
                for (int idx : tmeta.getFeedIdsList()) {
                    created.add(new TopicFeedInfo(idx));
                }
                this.statTabletsAndPopulateFeedInfo(topicFullName, created, true);
                feedInfosByTopic.put(ci.topic(), created);
                feedInfos = created;
            }
            int cursorFeedId = ci.feedId();
            if (cursorFeedId < 0 || cursorFeedId >= feedInfos.size()) {
                // No feed info at this position; skip instead of failing the whole listing.
                LOG.debug("skipping cursor with out-of-range feed id " + cursorFeedId + " for topic " + ci.topic());
                continue;
            }
            ci.setTopicFeedInfo(feedInfos.get(cursorFeedId));
            cursorList.add(ci);
        }
        return cursorList;
    }

    /**
     * Collects every cursor of every feed of every topic reported by {@code listTopics}.
     *
     * <p>Each cursor is linked back to the feed info it was found on before being returned.
     *
     * @param streamName path of the stream
     * @return all cursors of the stream
     * @throws IOException if listing the topics fails
     */
    public List<CursorInfo> listAllCursors(String streamName) throws IOException {
        ArrayList<CursorInfo> allCursors = new ArrayList<CursorInfo>();
        Map<String, List<TopicFeedInfo>> feedsByTopic = this.listTopics(streamName);
        for (Map.Entry<String, List<TopicFeedInfo>> topicEntry : feedsByTopic.entrySet()) {
            for (TopicFeedInfo feed : topicEntry.getValue()) {
                for (CursorInfo cursor : feed.cursorList()) {
                    cursor.setTopicFeedInfo(feed);
                    allCursors.add(cursor);
                }
            }
        }
        return allCursors;
    }

    /**
     * Collects the cursors of all feeds of a single topic.
     *
     * <p>The feed infos come from {@code infoTopicCommon(topicFullName, true)}; each cursor found
     * on a feed is linked back to that feed info before being returned.
     *
     * @param topicFullName topic name in {@code stream:topic} form, as built by {@code listCursors}
     * @return the cursors of the topic
     * @throws IOException if retrieving the topic's feed infos fails
     */
    public List<CursorInfo> listCursorsForTopic(String topicFullName) throws IOException {
        ArrayList<CursorInfo> topicCursors = new ArrayList<CursorInfo>();
        Iterator<TopicFeedInfo> feeds = this.infoTopicCommon(topicFullName, true).iterator();
        while (feeds.hasNext()) {
            TopicFeedInfo feed = feeds.next();
            for (CursorInfo cursor : feed.cursorList()) {
                cursor.setTopicFeedInfo(feed);
                topicCursors.add(cursor);
            }
        }
        return topicCursors;
    }

    /**
     * Scans the cursor rows of a stream and returns those accepted by the cursor filter.
     *
     * <p>{@code null} filters (negative for {@code feedId}) are passed to the row scan as empty
     * pattern fragments; the feed id is rendered in hexadecimal for the scan. The table is always
     * closed, including when the scan or the filter throws.
     *
     * @param streamName   path of the stream table
     * @param listenerGID  listener group to match, or {@code null} for any
     * @param topicName    topic to match, or {@code null} for any
     * @param feedId       feed id to match, or a negative value for any
     * @param topicMetaMap topic metadata cache, read and extended by the filter
     * @return the accepted cursors
     * @throws IOException if scanning or filtering fails, or if closing the table fails after a
     *         successful scan
     */
    private List<CursorInfo> scanCursors(String streamName, String listenerGID, String topicName, int feedId, Map<String, Marlinserver.MarlinTopicMetaEntry> topicMetaMap) throws IOException {
        ArrayList<CursorInfo> cursorList = new ArrayList<CursorInfo>();
        Table table = MapRDB.getTable(new Path(streamName));
        boolean completed = false;
        try {
            DocumentStream stream = this.GetCursorList(table, listenerGID == null ? "" : listenerGID, topicName == null ? "" : topicName, feedId >= 0 ? Integer.toString(feedId, 16) : "");
            for (Document rec : stream) {
                CursorInfo ci = new CursorInfo();
                if (this.FilterResult(rec, streamName, listenerGID, topicName, feedId, ci, topicMetaMap)) {
                    cursorList.add(ci);
                }
            }
            completed = true;
        }
        finally {
            try {
                table.close();
            }
            catch (Exception e) {
                if (completed) {
                    throw new IOException(e.getMessage(), e);
                }
                // A failure is already propagating; do not let the close error replace it.
                LOG.warn("Failed to close table for stream " + streamName, e);
            }
        }
        return cursorList;
    }

    /**
     * Deletes the cursor rows of a stream that are accepted by the cursor filter.
     *
     * <p>{@code null} filters (negative for {@code feedId}) are passed to the row scan as empty
     * pattern fragments; the feed id is rendered in hexadecimal for the scan. The table is always
     * closed, including when the scan, the filter or a delete throws.
     *
     * @param streamName  path of the stream table
     * @param listenerGID listener group to match, or {@code null} for any
     * @param topicName   topic to match, or {@code null} for any
     * @param feedId      feed id to match, or a negative value for any
     * @throws IOException if scanning or filtering fails, or if closing the table fails after all
     *         deletes were issued
     */
    public void deleteCursors(String streamName, String listenerGID, String topicName, int feedId) throws IOException {
        Table table = MapRDB.getTable(new Path(streamName));
        boolean completed = false;
        try {
            DocumentStream stream = this.GetCursorList(table, listenerGID == null ? "" : listenerGID, topicName == null ? "" : topicName, feedId >= 0 ? Integer.toString(feedId, 16) : "");
            HashMap<String, Marlinserver.MarlinTopicMetaEntry> topicMetaMap = new HashMap<String, Marlinserver.MarlinTopicMetaEntry>();
            for (Document rec : stream) {
                // The filter requires a CursorInfo to fill in; its contents are not used here.
                CursorInfo ci = new CursorInfo();
                if (this.FilterResult(rec, streamName, listenerGID, topicName, feedId, ci, topicMetaMap)) {
                    table.delete(rec.getIdString());
                }
            }
            completed = true;
        }
        finally {
            try {
                table.close();
            }
            catch (Exception e) {
                if (completed) {
                    throw new IOException(e.getMessage(), e);
                }
                // A failure is already propagating; do not let the close error replace it.
                LOG.warn("Failed to close table for stream " + streamName, e);
            }
        }
    }

    /**
     * Opens a scan over the assignment rows of {@code table}, projecting only the feed-assign
     * column family.
     *
     * <p>Rows are restricted to the id range between the assign key prefix and its end marker, and
     * additionally to ids matching {@code .*topic.*listenerGID.*}. Passing an empty string for a
     * filter leaves that part of the pattern unconstrained.
     *
     * <p>NOTE(review): the values are inserted into the regular expression unescaped, so any regex
     * metacharacters they contain are interpreted as such; confirm this is intended.
     *
     * @param table       table to scan
     * @param listenerGID listener group id fragment to match
     * @param topic       topic name fragment to match
     * @return the stream of matching rows
     * @throws IOException declared by the signature
     */
    private DocumentStream GetAssignList(Table table, String listenerGID, String topic) throws IOException {
        StringBuilder idPattern = new StringBuilder(".*");
        idPattern.append(topic).append(".*");
        idPattern.append(listenerGID).append(".*");
        QueryCondition assignRows = MapRDB.newCondition()
                .and()
                .is(DocumentConstants.ID_FIELD, QueryCondition.Op.GREATER, this.mdef.getKeyAssignPrefix())
                .is(DocumentConstants.ID_FIELD, QueryCondition.Op.LESS, this.mdef.getKeyAssignPrefixEnd())
                .matches("_id", idPattern.toString())
                .close()
                .build();
        String[] projection = new String[]{this.mdef.getCfFeedAssigns()};
        return table.find(assignRows, projection);
    }

    /**
     * Checks an assignment row against the requested filters and, on a match, initialises
     * {@code ai}.
     *
     * <p>The row key is decoded into topic, topic uniquifier and consumer group. The row is
     * rejected when the topic or listener group differ from the requested ones, when the topic's
     * metadata cannot be fetched, when the topic is marked deleted, or when the uniquifier in the
     * key differs from the one in the topic metadata. Metadata fetched here is stored in
     * {@code topicMetaMap} so later rows of the same topic reuse it.
     *
     * <p>For an accepted row, the per-feed assignment columns are grouped by listener: listeners
     * are recorded in the order they are first seen, and each one gets the list of feed indexes
     * assigned to it.
     *
     * @param rec          assignment row to examine
     * @param streamName   stream the row belongs to
     * @param listenerGID  required listener group, or {@code null} for any
     * @param topic        required topic, or {@code null} for any
     * @param ai           assignment info to initialise when the row is accepted
     * @param topicMetaMap cache of topic metadata keyed by {@code stream:topic}
     * @return {@code true} if the row was accepted and {@code ai} initialised
     * @throws IOException if the accepted row has a missing or negative feed count
     */
    public boolean FilterResult(Document rec, String streamName, String listenerGID, String topic, AssignInfo ai, Map<String, Marlinserver.MarlinTopicMetaEntry> topicMetaMap) throws IOException {
        Document rowKeyDoc = MarlinRowKeyDecoder.decodeAssignKey(rec.getIdString());
        String rtopic = rowKeyDoc.getString(MarlinRowKeyDecoder.TOPIC);
        if (topic != null && !rtopic.equals(topic)) {
            return false;
        }
        Integer topicUniq = rowKeyDoc.getInt(MarlinRowKeyDecoder.TOPIC_UNIQ);
        String topicFullName = streamName + ":" + rtopic;
        Marlinserver.MarlinTopicMetaEntry tmeta = topicMetaMap.get(topicFullName);
        if (tmeta == null) {
            // Not cached yet: fetch the metadata through the native client and remember it.
            MapRConstants.ErrorValue errVal = new MapRConstants.ErrorValue();
            byte[] entry = this.GetTopicMetaEntry(this._clntPtr, topicFullName, errVal);
            if (errVal.error != 0) {
                return false;
            }
            tmeta = Marlinserver.MarlinTopicMetaEntry.parseFrom(entry);
            topicMetaMap.put(topicFullName, tmeta);
        }
        if (tmeta.getIsDeleted()) {
            return false;
        }
        if (tmeta.getTopicUniq() != topicUniq.intValue()) {
            return false;
        }
        String rlGId = rowKeyDoc.getString(MarlinRowKeyDecoder.CONSUMER_GROUP);
        if (listenerGID != null && !listenerGID.equals(rlGId)) {
            return false;
        }
        Long assignSeqNum = rec.getLongObj(this.mdef.getCfFeedAssigns() + '.' + this.mdef.getFAssignSeqNum());
        Long numFeeds = rec.getLongObj(this.mdef.getCfFeedAssigns() + '.' + this.mdef.getFAssignFeeds());
        if (numFeeds == null) {
            // Report a malformed row the same way the cursor filter does, not as an NPE.
            throw new IOException("invalid feed count value");
        }
        int feedCount = (int)numFeeds.longValue();
        if (feedCount < 0) {
            throw new IOException("invalid feed count value");
        }
        String[] listeners = new String[feedCount];
        int numListeners = 0;
        ArrayList<List<Integer>> listenerAssignment = new ArrayList<List<Integer>>();
        // Index from listener to its feed list, so each feed is placed without rescanning listeners.
        Map<String, List<Integer>> feedsByListener = new HashMap<String, List<Integer>>();
        for (int i = 0; i < feedCount; ++i) {
            String feedStr = String.format("%s%03x", this.mdef.getKeyPrefixFeedId(), i);
            String assignment = rec.getString(this.mdef.getCfFeedAssigns() + '.' + feedStr);
            if (assignment == null) {
                continue;
            }
            List<Integer> assignedFeeds = feedsByListener.get(assignment);
            if (assignedFeeds == null) {
                assignedFeeds = new ArrayList<Integer>();
                feedsByListener.put(assignment, assignedFeeds);
                listeners[numListeners++] = assignment;
                listenerAssignment.add(assignedFeeds);
            }
            assignedFeeds.add(i);
        }
        ai.Init(streamName, rtopic, rlGId, assignSeqNum, listeners, numListeners, listenerAssignment);
        return true;
    }

    /**
     * Lists the feed assignments of a stream, optionally restricted by listener group and topic.
     *
     * <p>{@code null} filters are passed to the row scan as empty pattern fragments. The table is
     * always closed, including when the scan or the filter throws.
     *
     * @param streamName  path of the stream table
     * @param listenerGID listener group to match, or {@code null} for any
     * @param topicName   topic to match, or {@code null} for any
     * @return the accepted assignments
     * @throws IOException if scanning or filtering fails, or if closing the table fails after a
     *         successful scan
     */
    public List<AssignInfo> listAssigns(String streamName, String listenerGID, String topicName) throws IOException {
        ArrayList<AssignInfo> assignList = new ArrayList<AssignInfo>();
        Table table = MapRDB.getTable(new Path(streamName));
        boolean completed = false;
        try {
            DocumentStream stream = this.GetAssignList(table, listenerGID == null ? "" : listenerGID, topicName == null ? "" : topicName);
            HashMap<String, Marlinserver.MarlinTopicMetaEntry> topicMetaMap = new HashMap<String, Marlinserver.MarlinTopicMetaEntry>();
            for (Document rec : stream) {
                AssignInfo ai = new AssignInfo();
                if (this.FilterResult(rec, streamName, listenerGID, topicName, ai, topicMetaMap)) {
                    assignList.add(ai);
                }
            }
            completed = true;
        }
        finally {
            try {
                table.close();
            }
            catch (Exception e) {
                if (completed) {
                    throw new IOException(e.getMessage(), e);
                }
                // A failure is already propagating; do not let the close error replace it.
                LOG.warn("Failed to close table for stream " + streamName, e);
            }
        }
        return assignList;
    }

    /**
     * Closes the native admin client and clears its pointer. When the shared admin count drops to
     * zero, the shared topic-feed stat executor is shut down as well.
     *
     * <p>The executor is given up to 60 seconds to finish; after that, or if the wait fails, it is
     * shut down forcibly. If the waiting thread is interrupted, the interrupt status is restored
     * so callers can still observe it. The executor reference is cleared in every case.
     */
    public void close() {
        this.CloseAdmin(this._clntPtr);
        this._clntPtr = 0L;
        synchronized (MarlinAdminImpl.class) {
            if (--numAdmins == 0) {
                try {
                    topicFeedStatService.shutdown();
                    if (!topicFeedStatService.awaitTermination(60L, TimeUnit.SECONDS)) {
                        topicFeedStatService.shutdownNow();
                    }
                }
                catch (InterruptedException e) {
                    topicFeedStatService.shutdownNow();
                    // Preserve the interrupt for the caller instead of silently consuming it.
                    Thread.currentThread().interrupt();
                }
                catch (Exception e) {
                    topicFeedStatService.shutdownNow();
                }
                finally {
                    topicFeedStatService = null;
                }
            }
        }
    }
}

