/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.curator.discovery;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.GetDataWatchBackgroundStatable;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.curator.cache.PathChildrenCacheFactory;
import org.apache.druid.curator.discovery.CuratorDruidNodeAnnouncer;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.utils.CloseableUtils;

@ManageLifecycle
public class CuratorDruidNodeDiscoveryProvider
extends DruidNodeDiscoveryProvider {
    private static final Logger log = new Logger(CuratorDruidNodeDiscoveryProvider.class);
    private final CuratorFramework curatorFramework;
    private final ZkPathsConfig config;
    private final ObjectMapper jsonMapper;
    private ExecutorService listenerExecutor;
    private final ConcurrentHashMap<NodeRole, NodeRoleWatcher> nodeRoleWatchers = new ConcurrentHashMap();
    private final ConcurrentLinkedQueue<NodeDiscoverer> nodeDiscoverers = new ConcurrentLinkedQueue();
    private final LifecycleLock lifecycleLock = new LifecycleLock();

    @Inject
    public CuratorDruidNodeDiscoveryProvider(CuratorFramework curatorFramework, ZkPathsConfig config, @Json ObjectMapper jsonMapper) {
        this.curatorFramework = curatorFramework;
        this.config = config;
        this.jsonMapper = jsonMapper;
    }

    @Override
    public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole) {
        Preconditions.checkState((boolean)this.lifecycleLock.isStarted());
        log.debug("Creating a NodeDiscoverer for node [%s] and role [%s]", new Object[]{node, nodeRole});
        NodeDiscoverer nodeDiscoverer = new NodeDiscoverer(this.config, this.jsonMapper, this.curatorFramework, node, nodeRole);
        this.nodeDiscoverers.add(nodeDiscoverer);
        return () -> nodeDiscoverer.nodeDiscovered();
    }

    @Override
    public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole) {
        Preconditions.checkState((boolean)this.lifecycleLock.isStarted());
        return this.nodeRoleWatchers.computeIfAbsent(nodeRole, role -> {
            log.debug("Creating NodeRoleWatcher for nodeRole [%s].", new Object[]{role});
            NodeRoleWatcher nodeRoleWatcher = new NodeRoleWatcher(this.listenerExecutor, this.curatorFramework, this.config.getInternalDiscoveryPath(), this.jsonMapper, (NodeRole)((Object)role));
            log.debug("Created NodeRoleWatcher for nodeRole [%s].", new Object[]{role});
            return nodeRoleWatcher;
        });
    }

    @LifecycleStart
    public void start() {
        if (!this.lifecycleLock.canStart()) {
            throw new ISE("can't start.", new Object[0]);
        }
        try {
            this.listenerExecutor = Execs.singleThreaded((String)"CuratorDruidNodeDiscoveryProvider-ListenerExecutor");
            log.debug("Started.", new Object[0]);
            this.lifecycleLock.started();
        }
        finally {
            this.lifecycleLock.exitStart();
        }
    }

    @LifecycleStop
    public void stop() throws IOException {
        if (!this.lifecycleLock.canStop()) {
            throw new ISE("can't stop.", new Object[0]);
        }
        log.debug("Stopping.", new Object[0]);
        Closer closer = Closer.create();
        closer.registerAll(this.nodeRoleWatchers.values());
        closer.registerAll(this.nodeDiscoverers);
        CloseableUtils.closeBoth((Closeable)closer, this.listenerExecutor::shutdownNow);
    }

    private static class NodeDiscoverer
    implements Closeable {
        private final ObjectMapper jsonMapper;
        private final NodeCache nodeCache;
        private final NodeRole nodeRole;

        private NodeDiscoverer(ZkPathsConfig config, ObjectMapper jsonMapper, CuratorFramework curatorFramework, DruidNode node, NodeRole nodeRole) {
            this.jsonMapper = jsonMapper;
            String path = CuratorDruidNodeAnnouncer.makeNodeAnnouncementPath(config, nodeRole, node);
            this.nodeCache = new NodeCache(curatorFramework, path, true);
            this.nodeRole = nodeRole;
            try {
                this.nodeCache.start(true);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private boolean nodeDiscovered() {
            DiscoveryDruidNode druidNode;
            ChildData currentChild = this.nodeCache.getCurrentData();
            if (currentChild == null) {
                return false;
            }
            byte[] data = currentChild.getData();
            try {
                druidNode = (DiscoveryDruidNode)this.jsonMapper.readValue(data, DiscoveryDruidNode.class);
            }
            catch (IOException e) {
                log.error((Throwable)e, "Exception occurred when reading node's value", new Object[0]);
                return false;
            }
            if (!this.nodeRole.equals((Object)druidNode.getNodeRole())) {
                log.error("Node[%s] of role[%s] add is discovered by node watcher of different node role. Ignored.", new Object[]{druidNode.getDruidNode().getUriToUse(), druidNode.getNodeRole().getJsonName()});
                return false;
            }
            log.info("Node[%s] of role[%s] appeared.", new Object[]{druidNode.getDruidNode().getUriToUse(), druidNode.getNodeRole().getJsonName()});
            return true;
        }

        @Override
        public void close() throws IOException {
            this.nodeCache.close();
        }
    }

    private static class NodeRoleWatcher
    implements DruidNodeDiscovery,
    Closeable {
        private static final Logger log = new Logger(NodeRoleWatcher.class);
        private final CuratorFramework curatorFramework;
        private final NodeRole nodeRole;
        private final ObjectMapper jsonMapper;
        private final ConcurrentMap<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<String, DiscoveryDruidNode>();
        private final Collection<DiscoveryDruidNode> unmodifiableNodes = Collections.unmodifiableCollection(this.nodes.values());
        private final PathChildrenCache cache;
        private final ExecutorService cacheExecutor;
        private final ExecutorService listenerExecutor;
        private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<DruidNodeDiscovery.Listener>();
        private final Object lock = new Object();
        private final CountDownLatch cacheInitialized = new CountDownLatch(1);

        NodeRoleWatcher(ExecutorService listenerExecutor, CuratorFramework curatorFramework, String basePath, ObjectMapper jsonMapper, NodeRole nodeRole) {
            this.listenerExecutor = listenerExecutor;
            this.curatorFramework = curatorFramework;
            this.nodeRole = nodeRole;
            this.jsonMapper = jsonMapper;
            this.cacheExecutor = Execs.singleThreaded((String)StringUtils.format((String)"NodeRoleWatcher[%s]", (Object[])new Object[]{nodeRole}));
            this.cache = new PathChildrenCacheFactory.Builder().withCacheData(true).withCompressed(true).withExecutorService(this.cacheExecutor).build().make(curatorFramework, ZKPaths.makePath((String)basePath, (String)nodeRole.toString()));
            try {
                this.cache.getListenable().addListener((client, event) -> this.handleChildEvent(event));
                this.cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }

        @Override
        public void close() throws IOException {
            CloseableUtils.closeBoth((Closeable)this.cache, this.cacheExecutor::shutdownNow);
        }

        @Override
        public Collection<DiscoveryDruidNode> getAllNodes() {
            boolean nodeViewInitialized;
            try {
                nodeViewInitialized = this.cacheInitialized.await(30L, TimeUnit.SECONDS);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                nodeViewInitialized = false;
            }
            if (!nodeViewInitialized) {
                log.info("Cache for node role [%s] not initialized yet; getAllNodes() might not return full information.", new Object[]{this.nodeRole.getJsonName()});
            }
            return this.unmodifiableNodes;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void registerListener(DruidNodeDiscovery.Listener listener) {
            Object object = this.lock;
            synchronized (object) {
                if (this.cacheInitialized.getCount() == 0L) {
                    this.safeSchedule(() -> {
                        listener.nodesAdded(this.unmodifiableNodes);
                        listener.nodeViewInitialized();
                    }, "Exception occured in nodesAdded([%s]) in listener [%s].", this.unmodifiableNodes, listener);
                }
                this.nodeListeners.add(listener);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void handleChildEvent(PathChildrenCacheEvent event) {
            Object object = this.lock;
            synchronized (object) {
                try {
                    switch (event.getType()) {
                        case CHILD_ADDED: {
                            this.childAdded(event);
                            break;
                        }
                        case CHILD_REMOVED: {
                            this.childRemoved(event);
                            break;
                        }
                        case INITIALIZED: {
                            this.cacheInitialized();
                            break;
                        }
                        default: {
                            log.warn("Ignored event type[%s] for node watcher of role[%s].", new Object[]{event.getType(), this.nodeRole.getJsonName()});
                            break;
                        }
                    }
                }
                catch (Exception ex) {
                    log.error((Throwable)ex, "Unknown error in node watcher of role[%s].", new Object[]{this.nodeRole.getJsonName()});
                }
            }
        }

        @GuardedBy(value="lock")
        void childAdded(PathChildrenCacheEvent event) throws IOException {
            byte[] data = this.getZkDataForNode(event.getData());
            if (data == null) {
                log.error("Failed to get data for path [%s]. Ignoring a child addition event.", new Object[]{event.getData().getPath()});
                return;
            }
            DiscoveryDruidNode druidNode = (DiscoveryDruidNode)this.jsonMapper.readValue(data, DiscoveryDruidNode.class);
            if (!this.nodeRole.equals((Object)druidNode.getNodeRole())) {
                log.error("Node[%s] of role[%s] addition ignored due to mismatched role (expected role[%s]).", new Object[]{druidNode.getDruidNode().getUriToUse(), druidNode.getNodeRole().getJsonName(), this.nodeRole.getJsonName()});
                return;
            }
            log.info("Node[%s] of role[%s] detected.", new Object[]{druidNode.getDruidNode().getUriToUse(), this.nodeRole.getJsonName()});
            this.addNode(druidNode);
        }

        @GuardedBy(value="lock")
        private void addNode(DiscoveryDruidNode druidNode) {
            DiscoveryDruidNode prev = this.nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
            if (prev == null) {
                if (this.cacheInitialized.getCount() == 0L) {
                    ImmutableList newNode = ImmutableList.of((Object)druidNode);
                    for (DruidNodeDiscovery.Listener listener : this.nodeListeners) {
                        this.safeSchedule(() -> NodeRoleWatcher.lambda$addNode$2(listener, (List)newNode), "Exception occured in nodeAdded(node=[%s]) in listener [%s].", druidNode.getDruidNode().getHostAndPortToUse(), listener);
                    }
                }
            } else {
                log.error("Node[%s] of role[%s] discovered but existed already [%s].", new Object[]{druidNode.getDruidNode().getUriToUse(), this.nodeRole.getJsonName(), prev});
            }
        }

        @GuardedBy(value="lock")
        private void childRemoved(PathChildrenCacheEvent event) throws IOException {
            byte[] data = event.getData().getData();
            if (data == null) {
                log.error("Failed to get data for path [%s]. Ignoring a child removal event.", new Object[]{event.getData().getPath()});
                return;
            }
            DiscoveryDruidNode druidNode = (DiscoveryDruidNode)this.jsonMapper.readValue(data, DiscoveryDruidNode.class);
            if (!this.nodeRole.equals((Object)druidNode.getNodeRole())) {
                log.error("Node[%s] of role[%s] removal ignored due to mismatched role (expected role[%s]).", new Object[]{druidNode.getDruidNode().getUriToUse(), druidNode.getNodeRole().getJsonName(), this.nodeRole.getJsonName()});
                return;
            }
            log.info("Node[%s] of role[%s] went offline.", new Object[]{druidNode.getDruidNode().getUriToUse(), this.nodeRole.getJsonName()});
            this.removeNode(druidNode);
        }

        @GuardedBy(value="lock")
        private void removeNode(DiscoveryDruidNode druidNode) {
            DiscoveryDruidNode prev = (DiscoveryDruidNode)this.nodes.remove(druidNode.getDruidNode().getHostAndPortToUse());
            if (prev == null) {
                log.error("Noticed disappearance of unknown druid node [%s] of role[%s].", new Object[]{druidNode.getDruidNode().getUriToUse(), druidNode.getNodeRole().getJsonName()});
                return;
            }
            if (this.cacheInitialized.getCount() == 0L) {
                ImmutableList nodeRemoved = ImmutableList.of((Object)druidNode);
                for (DruidNodeDiscovery.Listener listener : this.nodeListeners) {
                    this.safeSchedule(() -> NodeRoleWatcher.lambda$removeNode$3(listener, (List)nodeRemoved), "Exception occured in nodeRemoved(node[%s] of role[%s]) in listener [%s].", druidNode.getDruidNode().getUriToUse(), druidNode.getNodeRole().getJsonName(), listener);
                }
            }
        }

        @Nullable
        private byte[] getZkDataForNode(ChildData child) {
            try {
                return (byte[])((GetDataWatchBackgroundStatable)this.curatorFramework.getData().decompressed()).forPath(child.getPath());
            }
            catch (Exception ex) {
                log.error((Throwable)ex, "Exception while getting data for node %s", new Object[]{child.getPath()});
                return null;
            }
        }

        @GuardedBy(value="lock")
        private void cacheInitialized() {
            if (this.cacheInitialized.getCount() == 0L) {
                log.error("cache is already initialized. ignoring cache initialization event.", new Object[0]);
                return;
            }
            log.info("Node watcher of role[%s] is now initialized.", new Object[]{this.nodeRole.getJsonName()});
            for (DruidNodeDiscovery.Listener listener : this.nodeListeners) {
                this.safeSchedule(() -> {
                    listener.nodesAdded(this.unmodifiableNodes);
                    listener.nodeViewInitialized();
                }, "Exception occured in nodesAdded([%s]) in listener [%s].", this.unmodifiableNodes, listener);
            }
            this.cacheInitialized.countDown();
        }

        private void safeSchedule(Runnable runnable, String errMsgFormat, Object ... args) {
            this.listenerExecutor.submit(() -> {
                try {
                    runnable.run();
                }
                catch (Exception ex) {
                    log.error(errMsgFormat, args);
                }
            });
        }

        private static /* synthetic */ void lambda$removeNode$3(DruidNodeDiscovery.Listener listener, List nodeRemoved) {
            listener.nodesRemoved(nodeRemoved);
        }

        private static /* synthetic */ void lambda$addNode$2(DruidNodeDiscovery.Listener listener, List newNode) {
            listener.nodesAdded(newNode);
        }
    }
}

