/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.registry.impl;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.SlotZnode;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hive.com.google.common.base.Preconditions;
import org.apache.hive.com.google.common.collect.Lists;
import org.apache.hive.com.google.common.collect.Sets;
import org.apache.hive.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LlapZookeeperRegistryImpl
implements ServiceRegistry {
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class);
    // Names passed as the first argument to the RegistryTypeUtils endpoint factories below.
    private static final String IPC_SERVICES = "services";
    private static final String IPC_MNG = "llapmng";
    private static final String IPC_SHUFFLE = "shuffle";
    private static final String IPC_LLAP = "llap";
    private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
    // Fallback Curator namespaces (secure / unsecure) used by the constructor when
    // LLAP_ZK_REGISTRY_NAMESPACE yields null.
    private static final String SASL_NAMESPACE = "llap-sasl";
    private static final String UNSECURE_NAMESPACE = "llap-unsecure";
    // Prefix of the per-user path segment; see userPathPrefix.
    private static final String USER_SCOPE_PATH_PREFIX = "user-";
    // Hint appended to ACL-related log and exception messages.
    private static final String DISABLE_MESSAGE = "Set " + HiveConf.ConfVars.LLAP_VALIDATE_ACLS.varname + " to false to disable ACL validation";
    // Name prefixes used to tell worker znodes and slot znodes apart under workersPath.
    private static final String WORKER_PREFIX = "worker-";
    private static final String SLOT_PREFIX = "slot-";
    // Private copy of the configuration passed to the constructor (with yarn-site.xml added).
    private final Configuration conf;
    // Curator client built in the constructor and started in start().
    private final CuratorFramework zooKeeperClient;
    // USER_SCOPE_PATH_PREFIX + the registry user; also used by the ACL provider to pick ACLs.
    private final String userPathPrefix;
    // "/<userPathPrefix>/<instanceName>/workers": parent path of the worker and slot znodes.
    private final String workersPath;
    // Set by setZookeeperClientKerberosJaasConfig(); read by checkAndSetAcls().
    private String userNameFromPrincipal;
    // Worker znode, slot znode and actual worker znode path; all assigned in register().
    private PersistentEphemeralNode znode;
    private SlotZnode slotZnode;
    private String znodePath;
    // Serializes/deserializes ServiceRecord instances to/from znode data.
    private final RegistryUtils.ServiceRecordMarshal encoder;
    // Lazily created by getInstances() and checkPathChildrenCache() respectively.
    private DynamicServiceInstanceSet instances;
    private PathChildrenCache instancesCache;
    // Random id created once per class load; stored in the service record under
    // UNIQUE_IDENTIFIER and returned by register().
    private static final UUID uniq = UUID.randomUUID();
    private static final String UNIQUE_IDENTIFIER = "llap.unique.id";
    // Listeners notified by InstanceStateChangeListener on child add/update/remove events.
    private Set<ServiceInstanceStateChangeListener> stateChangeListeners;
    // Service instances keyed by znode path and by host; written via addToCache()/removeFromCache().
    private final Map<String, Set<ServiceInstance>> pathToInstanceCache;
    private final Map<String, Set<ServiceInstance>> nodeToInstanceCache;
    // Held while the two caches above are updated together.
    private final Lock instanceCacheLock = new ReentrantLock();
    // Canonical local host name, or "localhost" if it cannot be resolved (see static initializer).
    private static final String hostname;

    /**
     * Creates a registry for the given LLAP instance name and builds (but does not start)
     * the Curator client used to talk to ZooKeeper.
     *
     * @param instanceName name of the LLAP instance; becomes part of the workers path
     * @param conf configuration to copy; {@code yarn-site.xml} is added to the private copy
     */
    public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) {
        this.conf = new Configuration(conf);
        this.conf.addResource("yarn-site.xml");
        final String quorum = this.getQuorumServers(this.conf);
        this.encoder = new RegistryUtils.ServiceRecordMarshal();

        // Connection tuning is read from the configuration passed in by the caller.
        final int sessionTimeoutMs = (int)HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
        final int retryBaseSleepMs = (int)HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
        final int retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);

        this.userPathPrefix = USER_SCOPE_PATH_PREFIX + this.getZkPathUser(this.conf);
        this.workersPath = "/" + this.userPathPrefix + "/" + instanceName + "/workers";
        this.instancesCache = null;
        this.instances = null;
        this.stateChangeListeners = new HashSet<ServiceInstanceStateChangeListener>();
        this.pathToInstanceCache = new ConcurrentHashMap<String, Set<ServiceInstance>>();
        this.nodeToInstanceCache = new ConcurrentHashMap<String, Set<ServiceInstance>>();

        final boolean isSecure = UserGroupInformation.isSecurityEnabled();
        ACLProvider aclProvider = new ACLProvider(){

            public List<ACL> getDefaultAcl() {
                LOG.warn("getDefaultAcl was called");
                return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
            }

            public List<ACL> getAclForPath(String path) {
                // Only paths under this user's prefix get the restricted ACLs, and only in secure mode.
                boolean userScoped = isSecure && path != null && path.contains(LlapZookeeperRegistryImpl.this.userPathPrefix);
                if (userScoped) {
                    return LlapZookeeperRegistryImpl.createSecureAcls();
                }
                return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
            }
        };

        String namespace = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_ZK_REGISTRY_NAMESPACE);
        if (namespace == null) {
            if (isSecure) {
                namespace = SASL_NAMESPACE;
            } else {
                namespace = UNSECURE_NAMESPACE;
            }
        }

        this.zooKeeperClient = CuratorFrameworkFactory.builder()
            .connectString(quorum)
            .sessionTimeoutMs(sessionTimeoutMs)
            .aclProvider(aclProvider)
            .namespace(namespace)
            .retryPolicy(new ExponentialBackoffRetry(retryBaseSleepMs, retryLimit))
            .build();
        LOG.info("Llap Zookeeper Registry is enabled with registryid: " + instanceName);
    }

    /**
     * Builds the ACL list applied to secured paths: the entries of
     * {@code READ_ACL_UNSAFE} followed by those of {@code CREATOR_ALL_ACL}.
     *
     * @return a new, mutable list of ACLs
     */
    private static List<ACL> createSecureAcls() {
        List<ACL> secured = new ArrayList<ACL>();
        secured.addAll(ZooDefs.Ids.READ_ACL_UNSAFE);
        secured.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
        return secured;
    }

    /**
     * Builds the comma-separated ZooKeeper connect string from the configured quorum hosts.
     * A host that carries no {@code :} gets the configured client port appended.
     *
     * @param conf configuration holding the quorum hosts and the client port
     * @return connect string of the form {@code host1:port,host2:port,...}
     */
    private String getQuorumServers(Configuration conf) {
        String[] hosts = conf.getTrimmedStrings(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM.varname);
        String port = conf.get(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue());
        StringBuilder connectString = new StringBuilder();
        String separator = "";
        for (String host : hosts) {
            connectString.append(separator).append(host.trim());
            if (!host.contains(":")) {
                connectString.append(":").append(port);
            }
            // Every entry after the first is preceded by a comma.
            separator = ",";
        }
        return connectString.toString();
    }

    /**
     * Returns the user name used in the registry path: the value of
     * {@code LLAP_ZK_REGISTRY_USER}, with {@link RegistryUtils#currentUser()} passed as the default.
     *
     * @param conf configuration to read the user from
     * @return the user name for the registry path
     */
    private String getZkPathUser(Configuration conf) {
        return HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser());
    }

    /**
     * Builds the {@code "llap"} IPC endpoint from the local host name and the configured daemon RPC port.
     *
     * @return the RPC endpoint
     */
    public Endpoint getRpcEndpoint() {
        InetSocketAddress rpcAddress = new InetSocketAddress(hostname, HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT));
        return RegistryTypeUtils.ipcEndpoint(IPC_LLAP, rpcAddress);
    }

    /**
     * Builds the {@code "shuffle"} TCP endpoint from the local host name and the configured shuffle port.
     *
     * @return the shuffle endpoint
     */
    public Endpoint getShuffleEndpoint() {
        return RegistryTypeUtils.inetAddrEndpoint(
            IPC_SHUFFLE,
            "tcp",
            hostname,
            HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT));
    }

    /**
     * Builds the {@code "services"} web endpoint for the daemon web UI, using {@code https}
     * when {@code LLAP_DAEMON_WEB_SSL} is enabled and {@code http} otherwise.
     *
     * @return the web endpoint
     * @throws RuntimeException if the URL or URI for the local host cannot be built
     */
    public Endpoint getServicesEndpoint() {
        final int webPort = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_WEB_PORT);
        final String scheme;
        if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_WEB_SSL)) {
            scheme = "https";
        } else {
            scheme = "http";
        }
        try {
            URI webUri = new URL(scheme, hostname, webPort, "").toURI();
            return RegistryTypeUtils.webEndpoint(IPC_SERVICES, new URI[]{webUri});
        } catch (MalformedURLException e) {
            throw new RuntimeException(e);
        } catch (URISyntaxException e) {
            throw new RuntimeException("llap service URI for " + hostname + " is invalid", e);
        }
    }

    /**
     * Builds the {@code "llapmng"} IPC endpoint from the local host name and the configured management RPC port.
     *
     * @return the management endpoint
     */
    public Endpoint getMngEndpoint() {
        int managementPort = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT);
        InetSocketAddress managementAddress = new InetSocketAddress(hostname, managementPort);
        return RegistryTypeUtils.ipcEndpoint(IPC_MNG, managementAddress);
    }

    /**
     * Builds the {@code "llapoutputformat"} IPC endpoint from the local host name and the
     * configured output service port.
     *
     * @return the output format endpoint
     */
    public Endpoint getOutputFormatEndpoint() {
        int outputServicePort = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
        InetSocketAddress outputAddress = new InetSocketAddress(hostname, outputServicePort);
        return RegistryTypeUtils.ipcEndpoint(IPC_OUTPUTFORMAT, outputAddress);
    }

    /**
     * Registers this LLAP instance in ZooKeeper: publishes a service record (endpoints plus
     * all {@code llap.*} / {@code hive.llap.*} configuration entries) as a worker znode,
     * creates the matching slot znode, and optionally validates the ACLs.
     *
     * @return the unique identifier stored in the service record
     * @throws IOException if the znodes cannot be created or ACL validation fails; both
     *         znodes are closed quietly before the exception is propagated
     */
    @Override
    public String register() throws IOException {
        ServiceRecord record = new ServiceRecord();
        Endpoint rpc = this.getRpcEndpoint();
        record.addInternalEndpoint(rpc);
        record.addInternalEndpoint(this.getMngEndpoint());
        record.addInternalEndpoint(this.getShuffleEndpoint());
        record.addExternalEndpoint(this.getServicesEndpoint());
        record.addInternalEndpoint(this.getOutputFormatEndpoint());

        // Copy every LLAP-related configuration entry into the record.
        for (Map.Entry<String, String> entry : this.conf) {
            String key = entry.getKey();
            if (key.startsWith("llap.") || key.startsWith("hive.llap.")) {
                record.set(key, entry.getValue());
            }
        }
        record.set(UNIQUE_IDENTIFIER, uniq.toString());

        final long createTimeoutSeconds = 120L;
        try {
            String workerPathPrefix = this.workersPath + "/" + WORKER_PREFIX;
            this.znode = new PersistentEphemeralNode(this.zooKeeperClient, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, workerPathPrefix, this.encoder.toBytes(record));
            this.znode.start();
            boolean workerCreated = this.znode.waitForInitialCreate(createTimeoutSeconds, TimeUnit.SECONDS);
            if (!workerCreated) {
                throw new Exception("Max znode creation wait time: " + createTimeoutSeconds + "s exhausted");
            }
            this.znodePath = this.znode.getActualPath();

            this.slotZnode = new SlotZnode(this.zooKeeperClient, this.workersPath, SLOT_PREFIX, WORKER_PREFIX, uniq.toString());
            boolean slotCreated = this.slotZnode.start(createTimeoutSeconds, TimeUnit.SECONDS);
            if (!slotCreated) {
                throw new Exception("Max znode creation wait time: " + createTimeoutSeconds + "s exhausted");
            }

            if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.LLAP_VALIDATE_ACLS)) {
                try {
                    this.checkAndSetAcls();
                } catch (Exception ex) {
                    throw new IOException("Error validating or setting ACLs. " + DISABLE_MESSAGE, ex);
                }
            }

            // Final sanity check that the worker znode really exists.
            if (this.zooKeeperClient.checkExists().forPath(this.znodePath) == null) {
                throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper.");
            }
            LOG.info("Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, shuffle: {}, webui: {}, mgmt: {}, znodePath: {} ", rpc, this.getShuffleEndpoint(), this.getServicesEndpoint(), this.getMngEndpoint(), this.znodePath);
        } catch (Exception e) {
            LOG.error("Unable to create a znode for this server instance", e);
            CloseableUtils.closeQuietly((Closeable)this.znode);
            CloseableUtils.closeQuietly((Closeable)this.slotZnode);
            if (e instanceof IOException) {
                throw (IOException)e;
            }
            throw new IOException(e);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Created zknode with path: {} service record: {}", (Object)this.znodePath, (Object)record);
        }
        return uniq.toString();
    }

    /**
     * Validates the ACLs on the workers path when security is enabled and resets them via
     * {@link #setUpAcls(String)} when none are present or when an entry is not acceptable.
     * An entry is acceptable when it has no permission bit other than bit 0 set, or when it
     * belongs to the current SASL user.
     *
     * @throws Exception if reading or setting ACLs fails
     */
    private void checkAndSetAcls() throws Exception {
        if (!UserGroupInformation.isSecurityEnabled()) {
            return;
        }
        final String pathToCheck = this.workersPath;
        List<ACL> existing = this.zooKeeperClient.getACL().forPath(pathToCheck);
        boolean noAcls = existing == null || existing.isEmpty();
        if (noAcls) {
            LOG.warn("No ACLs on " + pathToCheck + "; setting up ACLs. " + DISABLE_MESSAGE);
            this.setUpAcls(pathToCheck);
            return;
        }
        assert (this.userNameFromPrincipal != null);
        Id self = new Id("sasl", this.userNameFromPrincipal);
        for (ACL acl : existing) {
            // 0xFFFFFFFE masks out bit 0 (presumably ZooDefs.Perms.READ - confirm).
            boolean onlyLowestBit = (acl.getPerms() & 0xFFFFFFFE) == 0;
            if (onlyLowestBit || self.equals(acl.getId())) {
                continue;
            }
            LOG.warn("The ACL " + acl + " is unnacceptable for " + pathToCheck + "; setting up ACLs. " + DISABLE_MESSAGE);
            this.setUpAcls(pathToCheck);
            return;
        }
    }

    /**
     * Applies the secure ACLs to {@code path} and to every znode below it, visiting the
     * tree breadth-first. For each node the children are listed and queued before the
     * node's own ACL is set.
     *
     * @param path root of the subtree to secure
     * @throws Exception if listing children or setting an ACL fails
     */
    private void setUpAcls(String path) throws Exception {
        final List<ACL> secureAcls = LlapZookeeperRegistryImpl.createSecureAcls();
        LinkedList<String> pending = new LinkedList<String>();
        pending.add(path);
        String node;
        while ((node = pending.poll()) != null) {
            List<String> children = this.zooKeeperClient.getChildren().forPath(node);
            if (children != null) {
                for (String child : children) {
                    pending.add(node + "/" + child);
                }
            }
            this.zooKeeperClient.setACL().withACL(secureAcls).forPath(node);
        }
    }

    /**
     * No-op in this implementation: nothing is removed from ZooKeeper here.
     * The znodes created by {@link #register()} are closed in {@link #stop()}.
     */
    @Override
    public void unregister() throws IOException {
    }

    /**
     * Records {@code instance} in both lookup caches (by znode path and by host) while
     * holding the instance cache lock, then logs the resulting cache sizes at debug level.
     *
     * @param path znode path of the instance
     * @param host host the instance runs on
     * @param instance the instance to cache
     */
    private void addToCache(String path, String host, ServiceInstance instance) {
        final Lock lock = this.instanceCacheLock;
        lock.lock();
        try {
            this.putInCache(path, this.pathToInstanceCache, instance);
            this.putInCache(host, this.nodeToInstanceCache, instance);
        } finally {
            lock.unlock();
        }
        int byPath = this.pathToInstanceCache.size();
        int byHost = this.nodeToInstanceCache.size();
        LOG.debug("Added path={}, host={} instance={} to cache. pathToInstanceCache:size={}, nodeToInstanceCache:size={}", path, host, instance, byPath, byHost);
    }

    /**
     * Drops the entries for {@code path} and {@code host} from the two lookup caches while
     * holding the instance cache lock, then logs the resulting cache sizes at debug level.
     *
     * @param path znode path whose entry is removed from the path cache
     * @param host host whose entry is removed from the host cache
     */
    private void removeFromCache(String path, String host) {
        final Lock lock = this.instanceCacheLock;
        lock.lock();
        try {
            this.pathToInstanceCache.remove(path);
            this.nodeToInstanceCache.remove(host);
        } finally {
            lock.unlock();
        }
        int byPath = this.pathToInstanceCache.size();
        int byHost = this.nodeToInstanceCache.size();
        LOG.debug("Removed path={}, host={} from cache. pathToInstanceCache:size={}, nodeToInstanceCache:size={}", path, host, byPath, byHost);
    }

    /**
     * Adds {@code instance} to the set stored under {@code key}, creating and registering
     * a new set first when the key has no entry yet.
     *
     * @param key cache key (a znode path or a host name)
     * @param cache the cache to update
     * @param instance the instance to add
     */
    private void putInCache(String key, Map<String, Set<ServiceInstance>> cache, ServiceInstance instance) {
        Set<ServiceInstance> existing = cache.get(key);
        if (existing != null) {
            existing.add(instance);
            return;
        }
        Set<ServiceInstance> created = Sets.newHashSet();
        cache.put(key, created);
        created.add(instance);
    }

    /**
     * Decodes the data of a slot znode into a string using {@code SlotZnode.CHARSET}.
     *
     * @param childData slot znode data
     * @return the decoded payload, used by callers as the worker identity
     */
    private static String extractWorkerIdFromSlot(ChildData childData) {
        byte[] payload = childData.getData();
        return new String(payload, SlotZnode.CHARSET);
    }

    /**
     * Returns the last segment of the znode path, i.e. everything after the final {@code /}.
     * A path without any {@code /} is returned unchanged.
     *
     * @param childData znode whose name is wanted
     * @return the node name
     */
    private static String extractNodeName(ChildData childData) {
        String path = childData.getPath();
        int lastSlash = path.lastIndexOf('/');
        return lastSlash < 0 ? path : path.substring(lastSlash + 1);
    }

    /**
     * Decodes the service record stored in {@code childData} into a service instance.
     *
     * @param event the cache event being processed; only its type is used, for logging
     * @param childData the znode whose data is decoded
     * @return the decoded instance, or {@code null} when the znode has no data or the
     *         data cannot be decoded (the failure is logged together with its cause)
     */
    private ServiceInstance extractServiceInstance(PathChildrenCacheEvent event, ChildData childData) {
        byte[] data = childData.getData();
        if (data == null) {
            return null;
        }
        // Use the path of the node actually being decoded rather than re-reading it from the event.
        String path = childData.getPath();
        try {
            ServiceRecord srv = (ServiceRecord)this.encoder.fromBytes(path, data);
            return new DynamicServiceInstance(srv);
        } catch (IOException e) {
            // The trailing throwable is logged with its stack trace so the decode failure can be diagnosed.
            LOG.error("Unable to decode data for zknode: {}. Dropping notification of type: {}", path, event.getType(), e);
            return null;
        }
    }

    /**
     * Returns the set of known service instances, creating it on first use once the
     * path children cache is available.
     *
     * @param component not used by this implementation
     * @param clusterReadyTimeoutMs how long to keep retrying the cache start-up, in milliseconds
     * @return the shared instance set
     * @throws IOException if the path children cache cannot be started
     */
    @Override
    public ServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException {
        this.checkPathChildrenCache(clusterReadyTimeoutMs);
        DynamicServiceInstanceSet current = this.instances;
        if (current == null) {
            current = new DynamicServiceInstanceSet(this.instancesCache);
            this.instances = current;
        }
        return current;
    }

    /**
     * Returns the application id reported by the instance set, initialising the set first
     * (with no cluster-ready wait) if necessary.
     *
     * @return the application id, as resolved by the instance set
     * @throws IOException if the instance set cannot be initialised
     */
    @Override
    public ApplicationId getApplicationId() throws IOException {
        // Called for its side effect of populating this.instances.
        this.getInstances("LLAP", 0L);
        DynamicServiceInstanceSet instanceSet = this.instances;
        return instanceSet.getApplicationId();
    }

    /**
     * Registers a listener for instance create/update/remove notifications. The path
     * children cache is started first (without any cluster-ready wait) if it is not running yet.
     *
     * @param listener the listener to add
     * @throws IOException if the path children cache cannot be started
     */
    @Override
    public synchronized void registerStateChangeListener(ServiceInstanceStateChangeListener listener) throws IOException {
        final long noWaitMs = 0L;
        this.checkPathChildrenCache(noWaitMs);
        Set<ServiceInstanceStateChangeListener> listeners = this.stateChangeListeners;
        listeners.add(listener);
    }

    /**
     * Ensures the Curator {@link PathChildrenCache} over the workers path is started.
     * Does nothing when the cache already exists. When start-up fails with
     * {@code InvalidACLException}, it is retried with a doubling sleep (starting at up to
     * 16 ms) until {@code clusterReadyTimeoutMs} has elapsed.
     *
     * <p>The single-thread executor that delivers change notifications is shut down again
     * whenever this method fails, so failed attempts do not leave a thread pool behind.
     *
     * @param clusterReadyTimeoutMs total time to keep retrying, in milliseconds; {@code 0}
     *        means fail on the first {@code InvalidACLException}
     * @throws IOException if the cache cannot be started, the timeout is exhausted, or the
     *         thread is interrupted while waiting (the interrupt flag is restored)
     * @throws IllegalArgumentException if the ZooKeeper client has not been started
     */
    private synchronized void checkPathChildrenCache(long clusterReadyTimeoutMs) throws IOException {
        Preconditions.checkArgument(this.zooKeeperClient != null && this.zooKeeperClient.getState() == CuratorFrameworkState.STARTED, "client is not started");
        if (this.instancesCache != null) {
            return;
        }
        ExecutorService tp = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("StateChangeNotificationHandler").build());
        boolean started = false;
        try {
            long startTimeNs = System.nanoTime();
            long deltaNs = clusterReadyTimeoutMs * 1000000L;
            long sleepTimeMs = Math.min(16L, clusterReadyTimeoutMs);
            while (!started) {
                PathChildrenCache instancesCache = new PathChildrenCache(this.zooKeeperClient, this.workersPath, true);
                instancesCache.getListenable().addListener(new InstanceStateChangeListener(), tp);
                try {
                    instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                    this.instancesCache = instancesCache;
                    started = true;
                } catch (KeeperException.InvalidACLException e) {
                    CloseableUtils.closeQuietly(instancesCache);
                    long elapsedNs = System.nanoTime() - startTimeNs;
                    if (deltaNs == 0L || deltaNs <= elapsedNs) {
                        LOG.error("Unable to start curator PathChildrenCache", e);
                        throw new IOException(e);
                    }
                    LOG.warn("The cluster is not started yet (InvalidACL); will retry");
                    try {
                        // Never sleep past the overall deadline.
                        Thread.sleep(Math.min(sleepTimeMs, (deltaNs - elapsedNs) / 1000000L));
                    } catch (InterruptedException e1) {
                        // Restore the flag so callers further up can still observe the interruption.
                        Thread.currentThread().interrupt();
                        LOG.error("Interrupted while retrying the PathChildrenCache startup");
                        throw new IOException(e1);
                    }
                    sleepTimeMs <<= 1;
                } catch (Exception e) {
                    CloseableUtils.closeQuietly(instancesCache);
                    LOG.error("Unable to start curator PathChildrenCache", e);
                    throw new IOException(e);
                }
            }
        } finally {
            if (!started) {
                // No cache is using the executor; release its worker thread.
                tp.shutdownNow();
            }
        }
    }

    /**
     * Sets up ZooKeeper authentication and starts the Curator client, if one exists.
     *
     * @throws IOException if the Kerberos principal or keytab required for ZK auth is missing
     */
    @Override
    public void start() throws IOException {
        final CuratorFramework client = this.zooKeeperClient;
        if (client != null) {
            this.setupZookeeperAuth(this.conf);
            client.start();
        }
        // NOTE(review): presumably forces CloseableUtils to be loaded up front - confirm intent.
        CloseableUtils.class.getName();
    }

    /**
     * Quietly closes, in this order, the worker znode, the slot znode, the path children
     * cache and the ZooKeeper client.
     */
    @Override
    public void stop() throws IOException {
        Closeable[] resourcesInCloseOrder = new Closeable[]{
            (Closeable)this.znode,
            (Closeable)this.slotZnode,
            (Closeable)this.instancesCache,
            (Closeable)this.zooKeeperClient
        };
        for (Closeable resource : resourcesInCloseOrder) {
            CloseableUtils.closeQuietly(resource);
        }
    }

    /**
     * Configures Kerberos-based ZooKeeper authentication when security is enabled and the
     * process is an LLAP daemon; otherwise only logs that the set-up is skipped.
     *
     * @param conf configuration holding the LLAP Kerberos principal and keytab
     * @throws IOException if the principal or the keytab is null or empty
     */
    private void setupZookeeperAuth(Configuration conf) throws IOException {
        if (!UserGroupInformation.isSecurityEnabled() || !LlapProxy.isDaemon()) {
            LOG.info("UGI security is not enabled, or non-daemon environment. Skipping setting up ZK auth.");
            return;
        }
        LOG.info("UGI security is enabled. Setting up ZK auth.");

        final String principal = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_KERBEROS_PRINCIPAL);
        if (principal == null || principal.isEmpty()) {
            throw new IOException("Llap Kerberos principal is empty");
        }
        final String keytab = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
        if (keytab == null || keytab.isEmpty()) {
            throw new IOException("Llap Kerberos keytab is empty");
        }
        this.setZookeeperClientKerberosJaasConfig(principal, keytab);
    }

    /**
     * Installs a JAAS configuration for the ZooKeeper client that logs in from the given
     * keytab, and remembers the user name derived from the resolved principal.
     *
     * @param principal Kerberos principal; resolved through
     *        {@code SecurityUtil.getServerPrincipal} with host {@code "0.0.0.0"}
     * @param keyTabFile path of the keytab file
     * @throws IOException if the principal cannot be resolved
     */
    private void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) throws IOException {
        final String loginContextName = "LlapZooKeeperClient";
        System.setProperty("zookeeper.sasl.clientconfig", loginContextName);
        final String resolvedPrincipal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0");
        this.userNameFromPrincipal = LlapUtil.getUserNameFromPrincipal(resolvedPrincipal);
        javax.security.auth.login.Configuration.setConfiguration(
            new JaasConfiguration(loginContextName, resolvedPrincipal, keyTabFile));
    }

    // Resolves the canonical local host name once per class load. Resolution failure is
    // not fatal: the registry falls back to "localhost", but the failure is logged
    // because the fallback is what ends up in the published endpoints.
    static {
        String localhost = "localhost";
        try {
            localhost = InetAddress.getLocalHost().getCanonicalHostName();
        } catch (UnknownHostException e) {
            LOG.warn("Unable to resolve the canonical local hostname; falling back to {}", localhost, e);
        }
        hostname = localhost;
    }

    /**
     * JAAS configuration that supplies a keytab-based Kerberos login entry for one named
     * login context and delegates every other context name to the configuration that was
     * installed when this object was created.
     */
    private static class JaasConfiguration
    extends javax.security.auth.login.Configuration {
        // Captured at construction time, before this configuration replaces it.
        private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration.getConfiguration();
        private final String loginContextName;
        private final String principal;
        private final String keyTabFile;

        /**
         * @param llapLoginContextName login context name answered by this configuration
         * @param principal Kerberos principal to log in as
         * @param keyTabFile keytab file holding the principal's key
         */
        public JaasConfiguration(String llapLoginContextName, String principal, String keyTabFile) {
            this.loginContextName = llapLoginContextName;
            this.principal = principal;
            this.keyTabFile = keyTabFile;
        }

        /**
         * Returns the Kerberos keytab login entry for the configured context name;
         * otherwise delegates to the base configuration, or returns {@code null} when
         * there is none.
         */
        @Override
        public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
            if (!this.loginContextName.equals(appName)) {
                return this.baseConfig == null ? null : this.baseConfig.getAppConfigurationEntry(appName);
            }
            Map<String, String> options = new HashMap<String, String>();
            options.put("doNotPrompt", "true");
            options.put("storeKey", "true");
            options.put("useKeyTab", "true");
            options.put("principal", this.principal);
            options.put("keyTab", this.keyTabFile);
            options.put("refreshKrb5Config", "true");
            return new AppConfigurationEntry[]{
                new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(), AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options)
            };
        }
    }

    private class InstanceStateChangeListener
    implements PathChildrenCacheListener {
        private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class);

        private InstanceStateChangeListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            Preconditions.checkArgument(client != null && client.getState() == CuratorFrameworkState.STARTED, "client is not started");
            InstanceStateChangeListener instanceStateChangeListener = this;
            synchronized (instanceStateChangeListener) {
                ChildData childData = event.getData();
                if (childData == null) {
                    return;
                }
                String nodeName = LlapZookeeperRegistryImpl.extractNodeName(childData);
                if (!nodeName.startsWith(LlapZookeeperRegistryImpl.WORKER_PREFIX)) {
                    return;
                }
                this.LOG.info("{} for zknode {} in llap namespace", (Object)event.getType(), (Object)childData.getPath());
                ServiceInstance instance = LlapZookeeperRegistryImpl.this.extractServiceInstance(event, childData);
                switch (event.getType()) {
                    case CHILD_ADDED: {
                        LlapZookeeperRegistryImpl.this.addToCache(childData.getPath(), instance.getHost(), instance);
                        for (ServiceInstanceStateChangeListener listener : LlapZookeeperRegistryImpl.this.stateChangeListeners) {
                            listener.onCreate(instance);
                        }
                        break;
                    }
                    case CHILD_UPDATED: {
                        LlapZookeeperRegistryImpl.this.addToCache(childData.getPath(), instance.getHost(), instance);
                        for (ServiceInstanceStateChangeListener listener : LlapZookeeperRegistryImpl.this.stateChangeListeners) {
                            listener.onUpdate(instance);
                        }
                        break;
                    }
                    case CHILD_REMOVED: {
                        LlapZookeeperRegistryImpl.this.removeFromCache(childData.getPath(), instance.getHost());
                        for (ServiceInstanceStateChangeListener listener : LlapZookeeperRegistryImpl.this.stateChangeListeners) {
                            listener.onRemove(instance);
                        }
                        break;
                    }
                }
            }
        }
    }

    private class DynamicServiceInstanceSet
    implements ServiceInstanceSet {
        private final PathChildrenCache instancesCache;

        /**
         * Wraps the given path children cache and immediately seeds the registry's
         * instance caches from the cache's current contents.
         *
         * @param cache started path children cache over the workers path
         */
        public DynamicServiceInstanceSet(PathChildrenCache cache) {
            this.instancesCache = cache;
            // Seed the lookup caches from whatever the cache already holds.
            populateCache();
        }

        /**
         * Decodes every worker znode currently held by the path children cache and adds
         * the resulting instance to the registry's caches. Entries that are not worker
         * nodes, have no data, or cannot be decoded are skipped.
         */
        private void populateCache() {
            for (ChildData child : this.instancesCache.getCurrentData()) {
                byte[] payload = this.getWorkerData(child);
                if (payload != null) {
                    String path = child.getPath();
                    try {
                        ServiceRecord record = (ServiceRecord)LlapZookeeperRegistryImpl.this.encoder.fromBytes(path, payload);
                        DynamicServiceInstance decoded = new DynamicServiceInstance(record);
                        LlapZookeeperRegistryImpl.this.addToCache(path, decoded.getHost(), decoded);
                    } catch (IOException e) {
                        LOG.error("Unable to decode data for zkpath: {}. Ignoring from current instances list..", (Object)path);
                    }
                }
            }
        }

        /**
         * Returns every cached service instance, merged across all znode paths.
         *
         * @return a new set holding all cached instances
         */
        @Override
        public Collection<ServiceInstance> getAll() {
            Set<ServiceInstance> merged = new HashSet<ServiceInstance>();
            Collection<Set<ServiceInstance>> perPath = LlapZookeeperRegistryImpl.this.pathToInstanceCache.values();
            for (Set<ServiceInstance> instancesAtPath : perPath) {
                merged.addAll(instancesAtPath);
            }
            return merged;
        }

        /**
         * Derives the application id from the first worker znode whose service record
         * carries a non-empty container id.
         *
         * @return the application id, or {@code null} when no worker record provides a container id
         */
        public ApplicationId getApplicationId() {
            for (ChildData child : this.instancesCache.getCurrentData()) {
                byte[] payload = this.getWorkerData(child);
                if (payload == null) {
                    continue;
                }
                ServiceRecord record;
                try {
                    record = (ServiceRecord)LlapZookeeperRegistryImpl.this.encoder.fromBytes(child.getPath(), payload);
                } catch (IOException e) {
                    LOG.error("Unable to decode data for zkpath: {}. Ignoring from current instances list..", (Object)child.getPath());
                    continue;
                }
                String containerId = record.get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
                if (containerId != null && !containerId.isEmpty()) {
                    return ContainerId.fromString(containerId).getApplicationAttemptId().getApplicationId();
                }
            }
            return null;
        }

        /**
         * Returns the data of {@code childData} when it is a worker znode that has data.
         *
         * @param childData znode to inspect; may be {@code null}
         * @return the znode data, or {@code null} when the argument is {@code null}, has no
         *         data, or its node name does not start with the worker prefix
         */
        private byte[] getWorkerData(ChildData childData) {
            if (childData == null) {
                return null;
            }
            byte[] payload = childData.getData();
            boolean isWorkerWithData = payload != null
                && LlapZookeeperRegistryImpl.extractNodeName(childData).startsWith(LlapZookeeperRegistryImpl.WORKER_PREFIX);
            return isWorkerWithData ? payload : null;
        }

        /**
         * Returns the known workers ordered by their slot number.
         *
         * <p>Worker znodes contribute the cached instances for their path; slot znodes map
         * a worker identity (the slot's data) to a slot number (the numeric suffix of the
         * slot node name). Workers without a known slot are logged and left out.
         *
         * @param consistentIndexes when {@code true}, every missing slot number between 0
         *        and the highest occupied slot is filled with an {@link InactiveServiceInstance}
         *        placeholder so that positions in the result equal slot numbers
         * @return the instances in ascending slot order
         */
        @Override
        public Collection<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) {
            Map<String, Long> slotByWorker = new HashMap<String, Long>();
            Set<ServiceInstance> unsorted = Sets.newHashSet();
            for (ChildData childData : this.instancesCache.getCurrentData()) {
                if (childData == null || childData.getData() == null) {
                    continue;
                }
                String nodeName = LlapZookeeperRegistryImpl.extractNodeName(childData);
                if (nodeName.startsWith(LlapZookeeperRegistryImpl.WORKER_PREFIX)) {
                    Set<ServiceInstance> cached = LlapZookeeperRegistryImpl.this.pathToInstanceCache.get(childData.getPath());
                    if (cached != null) {
                        unsorted.addAll(cached);
                    }
                } else if (nodeName.startsWith(LlapZookeeperRegistryImpl.SLOT_PREFIX)) {
                    long slotNumber = Long.parseLong(nodeName.substring(LlapZookeeperRegistryImpl.SLOT_PREFIX.length()));
                    slotByWorker.put(LlapZookeeperRegistryImpl.extractWorkerIdFromSlot(childData), slotNumber);
                } else {
                    LOG.info("Ignoring unknown node {}", (Object)childData.getPath());
                }
            }

            TreeMap<Long, ServiceInstance> sorted = new TreeMap<Long, ServiceInstance>();
            for (ServiceInstance worker : unsorted) {
                Long slot = slotByWorker.get(worker.getWorkerIdentity());
                if (slot == null) {
                    LOG.info("Unknown slot for {}", (Object)worker.getWorkerIdentity());
                    continue;
                }
                sorted.put(slot, worker);
            }

            if (consistentIndexes) {
                // Placeholders are collected separately so the key set is not modified while iterating.
                TreeMap<Long, InactiveServiceInstance> dummies = new TreeMap<Long, InactiveServiceInstance>();
                long expected = 0L;
                Long ts = null;
                for (Long slot : sorted.keySet()) {
                    assert (slot >= expected);
                    while (slot > expected) {
                        if (ts == null) {
                            // One timestamp is shared by all placeholders created in this call.
                            ts = System.nanoTime();
                        }
                        dummies.put(expected, new InactiveServiceInstance("inactive-" + expected + "-" + ts));
                        ++expected;
                    }
                    ++expected;
                }
                sorted.putAll(dummies);
            }
            return sorted.values();
        }

        /**
         * Looks up an instance by its worker identity.
         *
         * @param name worker identity to search for
         * @return the first instance from {@code getAll()} whose worker identity equals
         *         {@code name}, or {@code null} when none matches
         */
        @Override
        public ServiceInstance getInstance(String name) {
            ServiceInstance match = null;
            Iterator<ServiceInstance> candidates = this.getAll().iterator();
            // Stop at the first hit; later entries are never examined.
            while (match == null && candidates.hasNext()) {
                ServiceInstance candidate = candidates.next();
                if (candidate.getWorkerIdentity().equals(name)) {
                    match = candidate;
                }
            }
            return match;
        }

        /**
         * Returns the instances recorded for the given host.
         *
         * @param host key to look up in the outer registry's node-to-instance cache
         * @return the cached set for {@code host}, or a new empty set when the cache has no
         *         entry for it; never {@code null}
         */
        @Override
        public Set<ServiceInstance> getByHost(String host) {
            // The cast is only unchecked if the outer cache is declared without type arguments.
            @SuppressWarnings("unchecked")
            Set<ServiceInstance> byHost = (Set<ServiceInstance>)LlapZookeeperRegistryImpl.this.nodeToInstanceCache.get(host);
            if (byHost == null) {
                byHost = Sets.newHashSet();
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
            }
            return byHost;
        }

        /**
         * Reports how many entries the outer registry's node-to-instance cache holds.
         *
         * <p>That cache is the one {@code getByHost} looks hosts up in, so this is a count of
         * cache keys rather than of individual service instances.
         *
         * @return the current size of the node-to-instance cache
         */
        @Override
        public int size() {
            int cachedNodeCount = LlapZookeeperRegistryImpl.this.nodeToInstanceCache.size();
            return cachedNodeCount;
        }
    }

    /**
     * {@link ServiceInstance} backed by a single {@link ServiceRecord}.
     *
     * <p>Host, ports, services address and resource sizing are extracted from the record once,
     * at construction time. Identity ({@link #equals(Object)} / {@link #hashCode()}) is based
     * solely on the record's unique-identifier attribute.
     */
    private class DynamicServiceInstance
    implements ServiceInstance {
        private final ServiceRecord srv;
        private final String host;
        private final int rpcPort;
        private final int mngPort;
        private final int shufflePort;
        private final int outputFormatPort;
        private final String serviceAddress;
        private final Resource resource;

        /**
         * Builds an instance from a registry record.
         *
         * @param srv record describing the daemon; its endpoints and attributes are read here
         * @throws IOException if the memory or executor-count attribute is not a valid integer
         *         (the {@link NumberFormatException} is attached as the cause), or if reading an
         *         address field fails with an {@code IOException}
         * @throws NumberFormatException if one of the endpoint port fields is not a valid integer
         */
        public DynamicServiceInstance(ServiceRecord srv) throws IOException {
            this.srv = srv;
            if (LOG.isTraceEnabled()) {
                LOG.trace("Working with ServiceRecord: {}", srv);
            }
            Endpoint shuffle = srv.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_SHUFFLE);
            Endpoint rpc = srv.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_LLAP);
            Endpoint mng = srv.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_MNG);
            Endpoint outputFormat = srv.getInternalEndpoint(LlapZookeeperRegistryImpl.IPC_OUTPUTFORMAT);
            Endpoint services = srv.getExternalEndpoint(LlapZookeeperRegistryImpl.IPC_SERVICES);
            // The host is taken from the RPC endpoint; the other endpoints only contribute a port or URI.
            this.host = this.firstAddressField(rpc, "host");
            this.rpcPort = Integer.parseInt(this.firstAddressField(rpc, "port"));
            this.mngPort = Integer.parseInt(this.firstAddressField(mng, "port"));
            this.shufflePort = Integer.parseInt(this.firstAddressField(shuffle, "port"));
            this.outputFormatPort = Integer.parseInt(this.firstAddressField(outputFormat, "port"));
            this.serviceAddress = this.firstAddressField(services, "uri");
            String memStr = srv.get(HiveConf.ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, "");
            String coreStr = srv.get(HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, "");
            try {
                this.resource = Resource.newInstance(Integer.parseInt(memStr), Integer.parseInt(coreStr));
            }
            catch (NumberFormatException ex) {
                // Keep the original parse failure as the cause so the offending value can be traced.
                throw new IOException("Invalid resource configuration for a LLAP node: memory " + memStr + ", vcores " + coreStr, ex);
            }
        }

        /** Reads {@code field} from the first address entry of {@code endpoint}. */
        private String firstAddressField(Endpoint endpoint, String field) throws IOException {
            return RegistryTypeUtils.getAddressField(endpoint.addresses.get(0), field);
        }

        /**
         * Two instances are equal when they are of the same class and report the same
         * worker identity.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            DynamicServiceInstance other = (DynamicServiceInstance)o;
            return this.getWorkerIdentity().equals(other.getWorkerIdentity());
        }

        /** Hash of the worker identity, consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return this.getWorkerIdentity().hashCode();
        }

        /** @return the record's unique-identifier attribute */
        @Override
        public String getWorkerIdentity() {
            return this.srv.get(LlapZookeeperRegistryImpl.UNIQUE_IDENTIFIER);
        }

        /** @return the host field of the record's RPC endpoint */
        @Override
        public String getHost() {
            return this.host;
        }

        /** @return the port field of the record's RPC endpoint */
        @Override
        public int getRpcPort() {
            return this.rpcPort;
        }

        /** @return the port field of the record's shuffle endpoint */
        @Override
        public int getShufflePort() {
            return this.shufflePort;
        }

        /** @return the URI field of the record's external services endpoint */
        @Override
        public String getServicesAddress() {
            return this.serviceAddress;
        }

        /** @return the attributes of the underlying record, as returned by the record itself */
        @Override
        public Map<String, String> getProperties() {
            return this.srv.attributes();
        }

        /** @return the resource built from the record's memory and executor-count attributes */
        @Override
        public Resource getResource() {
            return this.resource;
        }

        @Override
        public String toString() {
            return "DynamicServiceInstance [id=" + this.getWorkerIdentity() + ", host=" + this.host + ":" + this.rpcPort + " with resources=" + this.getResource() + ", shufflePort=" + this.getShufflePort() + ", servicesAddress=" + this.getServicesAddress() + ", mgmtPort=" + this.getManagementPort() + "]";
        }

        /** @return the port field of the record's management endpoint */
        @Override
        public int getManagementPort() {
            return this.mngPort;
        }

        /** @return the port field of the record's output-format endpoint */
        @Override
        public int getOutputFormatPort() {
            return this.outputFormatPort;
        }
    }
}

