/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ConsumerInterceptors;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.HttpLookupService;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.ReaderBuilderImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.io.netty.util.HashedWheelTimer;
import org.apache.pulsar.shade.io.netty.util.Timer;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarClientImpl
implements PulsarClient {
    private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);
    private final ClientConfigurationData conf;
    private LookupService lookup;
    private final ConnectionPool cnxPool;
    private final Timer timer;
    private final ExecutorProvider externalExecutorProvider;
    private AtomicReference<State> state = new AtomicReference();
    private final IdentityHashMap<ProducerBase<?>, Boolean> producers;
    private final IdentityHashMap<ConsumerBase<?>, Boolean> consumers;
    private final AtomicLong producerIdGenerator = new AtomicLong();
    private final AtomicLong consumerIdGenerator = new AtomicLong();
    private final AtomicLong requestIdGenerator = new AtomicLong();
    private final EventLoopGroup eventLoopGroup;

    @Deprecated
    public PulsarClientImpl(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
        this(conf.setServiceUrl(serviceUrl).getConfigurationData().clone());
    }

    @Deprecated
    public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
        this(conf.setServiceUrl(serviceUrl).getConfigurationData().clone(), eventLoopGroup);
    }

    @Deprecated
    public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool) throws PulsarClientException {
        this(conf.setServiceUrl(serviceUrl).getConfigurationData().clone(), eventLoopGroup, cnxPool);
    }

    public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
        this(conf, PulsarClientImpl.getEventLoopGroup(conf));
    }

    public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
        this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup));
    }

    public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool) throws PulsarClientException {
        if (conf == null || StringUtils.isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
            throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
        }
        this.eventLoopGroup = eventLoopGroup;
        this.conf = conf;
        conf.getAuthentication().start();
        this.cnxPool = cnxPool;
        this.externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), PulsarClientImpl.getThreadFactory("pulsar-external-listener"));
        this.lookup = conf.getServiceUrl().startsWith("http") ? new HttpLookupService(conf, eventLoopGroup) : new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls(), this.externalExecutorProvider.getExecutor());
        this.timer = new HashedWheelTimer(PulsarClientImpl.getThreadFactory("pulsar-timer"), 1L, TimeUnit.MILLISECONDS);
        this.producers = Maps.newIdentityHashMap();
        this.consumers = Maps.newIdentityHashMap();
        this.state.set(State.Open);
    }

    public ClientConfigurationData getConfiguration() {
        return this.conf;
    }

    @Override
    public ProducerBuilder<byte[]> newProducer() {
        return new ProducerBuilderImpl<byte[]>(this, Schema.BYTES);
    }

    @Override
    public <T> ProducerBuilder<T> newProducer(Schema<T> schema) {
        return new ProducerBuilderImpl<T>(this, schema);
    }

    @Override
    public ConsumerBuilder<byte[]> newConsumer() {
        return new ConsumerBuilderImpl<byte[]>(this, Schema.BYTES);
    }

    @Override
    public <T> ConsumerBuilder<T> newConsumer(Schema<T> schema) {
        return new ConsumerBuilderImpl<T>(this, schema);
    }

    @Override
    public ReaderBuilder<byte[]> newReader() {
        return new ReaderBuilderImpl<byte[]>(this, Schema.BYTES);
    }

    @Override
    public <T> ReaderBuilder<T> newReader(Schema<T> schema) {
        return new ReaderBuilderImpl<T>(this, schema);
    }

    @Override
    public Producer<byte[]> createProducer(String topic) throws PulsarClientException {
        try {
            ProducerConfigurationData conf = new ProducerConfigurationData();
            conf.setTopicName(topic);
            return this.createProducerAsync(conf).get();
        }
        catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof PulsarClientException) {
                throw (PulsarClientException)t;
            }
            throw new PulsarClientException(t);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        }
    }

    @Override
    public Producer<byte[]> createProducer(String topic, ProducerConfiguration conf) throws PulsarClientException {
        if (conf == null) {
            throw new PulsarClientException.InvalidConfigurationException("Invalid null configuration object");
        }
        try {
            ProducerConfigurationData confData = conf.getProducerConfigurationData().clone();
            confData.setTopicName(topic);
            return this.createProducerAsync(confData).get();
        }
        catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof PulsarClientException) {
                throw (PulsarClientException)t;
            }
            throw new PulsarClientException(t);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        }
    }

    @Override
    public CompletableFuture<Producer<byte[]>> createProducerAsync(String topic) {
        ProducerConfigurationData conf = new ProducerConfigurationData();
        conf.setTopicName(topic);
        return this.createProducerAsync(conf);
    }

    @Override
    public CompletableFuture<Producer<byte[]>> createProducerAsync(String topic, ProducerConfiguration conf) {
        ProducerConfigurationData confData = conf.getProducerConfigurationData().clone();
        confData.setTopicName(topic);
        return this.createProducerAsync(confData);
    }

    public CompletableFuture<Producer<byte[]>> createProducerAsync(ProducerConfigurationData conf) {
        return this.createProducerAsync(conf, Schema.BYTES, null);
    }

    public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData conf, Schema<T> schema) {
        return this.createProducerAsync(conf, schema, null);
    }

    public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData conf, Schema<T> schema, ProducerInterceptors<T> interceptors) {
        if (conf == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
        }
        if (schema instanceof AutoConsumeSchema) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("AutoConsumeSchema is only used by consumers to detect schemas automatically"));
        }
        if (this.state.get() != State.Open) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed : state = " + (Object)((Object)this.state.get())));
        }
        String topic = conf.getTopicName();
        if (!TopicName.isValid(topic)) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
        }
        if (schema instanceof AutoProduceBytesSchema) {
            AutoProduceBytesSchema autoProduceBytesSchema = (AutoProduceBytesSchema)schema;
            return this.lookup.getSchema(TopicName.get(conf.getTopicName())).thenCompose(schemaInfoOptional -> {
                if (schemaInfoOptional.isPresent()) {
                    autoProduceBytesSchema.setSchema(Schema.getSchema((SchemaInfo)((SchemaInfo)schemaInfoOptional.get())));
                } else {
                    autoProduceBytesSchema.setSchema(Schema.BYTES);
                }
                return this.createProducerAsync(topic, conf, schema, interceptors);
            });
        }
        return this.createProducerAsync(topic, conf, schema, interceptors);
    }

    private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic, ProducerConfigurationData conf, Schema<T> schema, ProducerInterceptors<T> interceptors) {
        CompletableFuture producerCreatedFuture = new CompletableFuture();
        ((CompletableFuture)this.getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received topic metadata. partitions: {}", (Object)topic, (Object)metadata.partitions);
            }
            ProducerBase producer = metadata.partitions > 1 ? new PartitionedProducerImpl(this, topic, conf, metadata.partitions, producerCreatedFuture, schema, interceptors) : new ProducerImpl(this, topic, conf, producerCreatedFuture, -1, schema, interceptors);
            IdentityHashMap<ProducerBase<?>, Boolean> identityHashMap = this.producers;
            synchronized (identityHashMap) {
                this.producers.put(producer, Boolean.TRUE);
            }
        })).exceptionally(ex -> {
            log.warn("[{}] Failed to get partitioned topic metadata: {}", (Object)topic, (Object)ex.getMessage());
            producerCreatedFuture.completeExceptionally((Throwable)ex);
            return null;
        });
        return producerCreatedFuture;
    }

    @Override
    public Consumer<byte[]> subscribe(String topic, String subscription) throws PulsarClientException {
        return this.subscribe(topic, subscription, new ConsumerConfiguration());
    }

    @Override
    public Consumer<byte[]> subscribe(String topic, String subscription, ConsumerConfiguration conf) throws PulsarClientException {
        try {
            return this.subscribeAsync(topic, subscription, conf).get();
        }
        catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof PulsarClientException) {
                throw (PulsarClientException)t;
            }
            throw new PulsarClientException(t);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        }
    }

    @Override
    public CompletableFuture<Consumer<byte[]>> subscribeAsync(String topic, String subscription) {
        ConsumerConfigurationData<byte[]> conf = new ConsumerConfigurationData<byte[]>();
        conf.getTopicNames().add(topic);
        conf.setSubscriptionName(subscription);
        return this.subscribeAsync(conf);
    }

    @Override
    public CompletableFuture<Consumer<byte[]>> subscribeAsync(String topic, String subscription, ConsumerConfiguration conf) {
        if (conf == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Invalid null configuration"));
        }
        Object confData = conf.getConfigurationData().clone();
        ((ConsumerConfigurationData)confData).getTopicNames().add(topic);
        ((ConsumerConfigurationData)confData).setSubscriptionName(subscription);
        return this.subscribeAsync((ConsumerConfigurationData<byte[]>)confData);
    }

    public CompletableFuture<Consumer<byte[]>> subscribeAsync(ConsumerConfigurationData<byte[]> conf) {
        return this.subscribeAsync(conf, Schema.BYTES, null);
    }

    public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
        if (this.state.get() != State.Open) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
        }
        if (conf == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
        }
        if (!conf.getTopicNames().stream().allMatch(TopicName::isValid)) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
        }
        if (StringUtils.isBlank(conf.getSubscriptionName())) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
        }
        if (conf.isReadCompacted() && (!conf.getTopicNames().stream().allMatch(topic -> TopicName.get(topic).getDomain() == TopicDomain.persistent) || conf.getSubscriptionType() != SubscriptionType.Exclusive && conf.getSubscriptionType() != SubscriptionType.Failover)) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Read compacted can only be used with exclusive of failover persistent subscriptions"));
        }
        if (conf.getConsumerEventListener() != null && conf.getSubscriptionType() != SubscriptionType.Failover) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Active consumer listener is only supported for failover subscription"));
        }
        if (conf.getTopicsPattern() != null) {
            if (!conf.getTopicNames().isEmpty()) {
                return FutureUtil.failedFuture(new IllegalArgumentException("Topic names list must be null when use topicsPattern"));
            }
            return this.patternTopicSubscribeAsync(conf, schema, interceptors);
        }
        if (conf.getTopicNames().size() == 1) {
            return this.singleTopicSubscribeAsync(conf, schema, interceptors);
        }
        return this.multiTopicSubscribeAsync(conf, schema, interceptors);
    }

    private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
        if (schema instanceof AutoConsumeSchema) {
            AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema)schema;
            return this.lookup.getSchema(TopicName.get(conf.getSingleTopic())).thenCompose(schemaInfoOptional -> {
                if (schemaInfoOptional.isPresent() && ((SchemaInfo)schemaInfoOptional.get()).getType() == SchemaType.AVRO) {
                    GenericSchema genericSchema = GenericSchema.of((SchemaInfo)((SchemaInfo)schemaInfoOptional.get()));
                    log.info("Auto detected schema for topic {} : {}", (Object)conf.getSingleTopic(), (Object)new String(((SchemaInfo)schemaInfoOptional.get()).getSchema(), StandardCharsets.UTF_8));
                    autoConsumeSchema.setSchema((Schema)genericSchema);
                    return this.doSingleTopicSubscribeAsync(conf, schema, interceptors);
                }
                return FutureUtil.failedFuture(new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas"));
            });
        }
        return this.doSingleTopicSubscribeAsync(conf, schema, interceptors);
    }

    private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
        CompletableFuture consumerSubscribedFuture = new CompletableFuture();
        String topic = conf.getSingleTopic();
        ((CompletableFuture)this.getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received topic metadata. partitions: {}", (Object)topic, (Object)metadata.partitions);
            }
            ExecutorService listenerThread = this.externalExecutorProvider.getExecutor();
            ConsumerBase consumer = metadata.partitions > 1 ? MultiTopicsConsumerImpl.createPartitionedConsumer(this, conf, listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors) : new ConsumerImpl(this, topic, conf, listenerThread, -1, consumerSubscribedFuture, schema, interceptors);
            IdentityHashMap<ConsumerBase<?>, Boolean> identityHashMap = this.consumers;
            synchronized (identityHashMap) {
                this.consumers.put(consumer, Boolean.TRUE);
            }
        })).exceptionally(ex -> {
            log.warn("[{}] Failed to get partitioned topic metadata", (Object)topic, ex);
            consumerSubscribedFuture.completeExceptionally((Throwable)ex);
            return null;
        });
        return consumerSubscribedFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
        CompletableFuture consumerSubscribedFuture = new CompletableFuture();
        MultiTopicsConsumerImpl<T> consumer = new MultiTopicsConsumerImpl<T>(this, conf, this.externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors);
        IdentityHashMap<ConsumerBase<?>, Boolean> identityHashMap = this.consumers;
        synchronized (identityHashMap) {
            this.consumers.put(consumer, Boolean.TRUE);
        }
        return consumerSubscribedFuture;
    }

    public CompletableFuture<Consumer<byte[]>> patternTopicSubscribeAsync(ConsumerConfigurationData<byte[]> conf) {
        return this.patternTopicSubscribeAsync(conf, Schema.BYTES, null);
    }

    private <T> CompletableFuture<Consumer<T>> patternTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors interceptors) {
        String regex = conf.getTopicsPattern().pattern();
        PulsarApi.CommandGetTopicsOfNamespace.Mode subscriptionMode = conf.getSubscriptionTopicsMode();
        TopicName destination = TopicName.get(regex);
        NamespaceName namespaceName = destination.getNamespaceObject();
        CompletableFuture consumerSubscribedFuture = new CompletableFuture();
        ((CompletableFuture)this.lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode).thenAccept(topics -> {
            if (log.isDebugEnabled()) {
                log.debug("Get topics under namespace {}, topics.size: {}", (Object)namespaceName.toString(), (Object)topics.size());
                topics.forEach(topicName -> log.debug("Get topics under namespace {}, topic: {}", (Object)namespaceName.toString(), topicName));
            }
            List<String> topicsList = PulsarClientImpl.topicsPatternFilter(topics, conf.getTopicsPattern());
            conf.getTopicNames().addAll(topicsList);
            PatternMultiTopicsConsumerImpl consumer = new PatternMultiTopicsConsumerImpl(conf.getTopicsPattern(), this, conf, this.externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, subscriptionMode, interceptors);
            IdentityHashMap<ConsumerBase<?>, Boolean> identityHashMap = this.consumers;
            synchronized (identityHashMap) {
                this.consumers.put(consumer, Boolean.TRUE);
            }
        })).exceptionally(ex -> {
            log.warn("[{}] Failed to get topics under namespace", (Object)namespaceName);
            consumerSubscribedFuture.completeExceptionally((Throwable)ex);
            return null;
        });
        return consumerSubscribedFuture;
    }

    public static List<String> topicsPatternFilter(List<String> original, Pattern topicsPattern) {
        Pattern shortenedTopicsPattern = topicsPattern.toString().contains("://") ? Pattern.compile(topicsPattern.toString().split("\\:\\/\\/")[1]) : topicsPattern;
        return original.stream().map(TopicName::get).map(TopicName::toString).filter(topic -> shortenedTopicsPattern.matcher(topic.split("\\:\\/\\/")[1]).matches()).collect(Collectors.toList());
    }

    @Override
    public Reader<byte[]> createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) throws PulsarClientException {
        try {
            return this.createReaderAsync(topic, startMessageId, conf).get();
        }
        catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof PulsarClientException) {
                throw (PulsarClientException)t;
            }
            throw new PulsarClientException(t);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        }
    }

    @Override
    public CompletableFuture<Reader<byte[]>> createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf) {
        Object confData = conf.getReaderConfigurationData().clone();
        ((ReaderConfigurationData)confData).setTopicName(topic);
        ((ReaderConfigurationData)confData).setStartMessageId(startMessageId);
        return this.createReaderAsync((ReaderConfigurationData<byte[]>)confData);
    }

    public CompletableFuture<Reader<byte[]>> createReaderAsync(ReaderConfigurationData<byte[]> conf) {
        return this.createReaderAsync(conf, Schema.BYTES);
    }

    public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
        if (schema instanceof AutoConsumeSchema) {
            AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema)schema;
            return this.lookup.getSchema(TopicName.get(conf.getTopicName())).thenCompose(schemaInfoOptional -> {
                if (schemaInfoOptional.isPresent() && ((SchemaInfo)schemaInfoOptional.get()).getType() == SchemaType.AVRO) {
                    GenericSchema genericSchema = GenericSchema.of((SchemaInfo)((SchemaInfo)schemaInfoOptional.get()));
                    log.info("Auto detected schema for topic {} : {}", (Object)conf.getTopicName(), (Object)new String(((SchemaInfo)schemaInfoOptional.get()).getSchema(), StandardCharsets.UTF_8));
                    autoConsumeSchema.setSchema((Schema)genericSchema);
                    return this.doCreateReaderAsync(conf, schema);
                }
                return FutureUtil.failedFuture(new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas"));
            });
        }
        return this.doCreateReaderAsync(conf, schema);
    }

    <T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
        if (this.state.get() != State.Open) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
        }
        if (conf == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
        }
        String topic = conf.getTopicName();
        if (!TopicName.isValid(topic)) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
        }
        if (conf.getStartMessageId() == null) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Invalid startMessageId"));
        }
        CompletableFuture readerFuture = new CompletableFuture();
        ((CompletableFuture)this.getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received topic metadata. partitions: {}", (Object)topic, (Object)metadata.partitions);
            }
            if (metadata.partitions > 1) {
                readerFuture.completeExceptionally(new PulsarClientException("Topic reader cannot be created on a partitioned topic"));
                return;
            }
            CompletableFuture consumerSubscribedFuture = new CompletableFuture();
            ExecutorService listenerThread = this.externalExecutorProvider.getExecutor();
            ReaderImpl reader = new ReaderImpl(this, conf, listenerThread, consumerSubscribedFuture, schema);
            IdentityHashMap<ConsumerBase<?>, Boolean> identityHashMap = this.consumers;
            synchronized (identityHashMap) {
                this.consumers.put(reader.getConsumer(), Boolean.TRUE);
            }
            ((CompletableFuture)consumerSubscribedFuture.thenRun(() -> readerFuture.complete(reader))).exceptionally(ex -> {
                log.warn("[{}] Failed to get create topic reader", (Object)topic, ex);
                readerFuture.completeExceptionally((Throwable)ex);
                return null;
            });
        })).exceptionally(ex -> {
            log.warn("[{}] Failed to get partitioned topic metadata", (Object)topic, ex);
            readerFuture.completeExceptionally((Throwable)ex);
            return null;
        });
        return readerFuture;
    }

    public CompletableFuture<Optional<SchemaInfo>> getSchema(String topic) {
        TopicName topicName;
        try {
            topicName = TopicName.get(topic);
        }
        catch (Throwable t) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name: " + topic));
        }
        return this.lookup.getSchema(topicName);
    }

    @Override
    public void close() throws PulsarClientException {
        try {
            this.closeAsync().get();
        }
        catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof PulsarClientException) {
                throw (PulsarClientException)t;
            }
            throw new PulsarClientException(t);
        }
        catch (InterruptedException e) {
            throw new PulsarClientException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        log.info("Client closing. URL: {}", (Object)this.lookup.getServiceUrl());
        if (!this.state.compareAndSet(State.Open, State.Closing)) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
        }
        CompletableFuture<Void> closeFuture = new CompletableFuture<Void>();
        ArrayList futures = Lists.newArrayList();
        IdentityHashMap<HandlerState, Boolean> identityHashMap = this.producers;
        synchronized (identityHashMap) {
            ArrayList<ProducerBase<?>> producersToClose = Lists.newArrayList(this.producers.keySet());
            producersToClose.forEach(p -> futures.add(p.closeAsync()));
        }
        identityHashMap = this.consumers;
        synchronized (identityHashMap) {
            ArrayList<ConsumerBase<?>> consumersToClose = Lists.newArrayList(this.consumers.keySet());
            consumersToClose.forEach(c -> futures.add(c.closeAsync()));
        }
        ((CompletableFuture)FutureUtil.waitForAll(futures).thenRun(() -> {
            try {
                this.shutdown();
                closeFuture.complete(null);
                this.state.set(State.Closed);
            }
            catch (PulsarClientException e) {
                closeFuture.completeExceptionally(e);
            }
        })).exceptionally(exception -> {
            closeFuture.completeExceptionally((Throwable)exception);
            return null;
        });
        return closeFuture;
    }

    @Override
    public void shutdown() throws PulsarClientException {
        try {
            this.lookup.close();
            this.cnxPool.close();
            this.timer.stop();
            this.externalExecutorProvider.shutdownNow();
            this.conf.getAuthentication().close();
        }
        catch (Throwable t) {
            log.warn("Failed to shutdown Pulsar client", t);
            throw new PulsarClientException(t);
        }
    }

    @Override
    public synchronized void updateServiceUrl(String serviceUrl) throws PulsarClientException {
        log.info("Updating service URL to {}", (Object)serviceUrl);
        this.conf.setServiceUrl(serviceUrl);
        this.lookup.updateServiceUrl(serviceUrl);
        this.cnxPool.closeAllConnections();
    }

    protected CompletableFuture<ClientCnx> getConnection(String topic) {
        TopicName topicName = TopicName.get(topic);
        return this.lookup.getBroker(topicName).thenCompose(pair -> this.cnxPool.getConnection((InetSocketAddress)pair.getLeft(), (InetSocketAddress)pair.getRight()));
    }

    public Timer timer() {
        return this.timer;
    }

    ExecutorProvider externalExecutorProvider() {
        return this.externalExecutorProvider;
    }

    long newProducerId() {
        return this.producerIdGenerator.getAndIncrement();
    }

    long newConsumerId() {
        return this.consumerIdGenerator.getAndIncrement();
    }

    public long newRequestId() {
        return this.requestIdGenerator.getAndIncrement();
    }

    public ConnectionPool getCnxPool() {
        return this.cnxPool;
    }

    public EventLoopGroup eventLoopGroup() {
        return this.eventLoopGroup;
    }

    public LookupService getLookup() {
        return this.lookup;
    }

    public void reloadLookUp() throws PulsarClientException {
        this.lookup = this.conf.getServiceUrl().startsWith("http") ? new HttpLookupService(this.conf, this.eventLoopGroup) : new BinaryProtoLookupService(this, this.conf.getServiceUrl(), this.conf.isUseTls(), this.externalExecutorProvider.getExecutor());
    }

    public CompletableFuture<Integer> getNumberOfPartitions(String topic) {
        return this.getPartitionedTopicMetadata(topic).thenApply(metadata -> metadata.partitions);
    }

    public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic) {
        CompletableFuture<PartitionedTopicMetadata> metadataFuture;
        try {
            TopicName topicName = TopicName.get(topic);
            metadataFuture = this.lookup.getPartitionedTopicMetadata(topicName);
        }
        catch (IllegalArgumentException e) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage()));
        }
        return metadataFuture;
    }

    private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {
        ThreadFactory threadFactory = PulsarClientImpl.getThreadFactory("pulsar-client-io");
        return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
    }

    private static ThreadFactory getThreadFactory(String poolName) {
        return new DefaultThreadFactory(poolName, Thread.currentThread().isDaemon());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void cleanupProducer(ProducerBase<?> producer) {
        IdentityHashMap<ProducerBase<?>, Boolean> identityHashMap = this.producers;
        synchronized (identityHashMap) {
            this.producers.remove(producer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void cleanupConsumer(ConsumerBase<?> consumer) {
        IdentityHashMap<ConsumerBase<?>, Boolean> identityHashMap = this.consumers;
        synchronized (identityHashMap) {
            this.consumers.remove(consumer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    int producersCount() {
        IdentityHashMap<ProducerBase<?>, Boolean> identityHashMap = this.producers;
        synchronized (identityHashMap) {
            return this.producers.size();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    int consumersCount() {
        IdentityHashMap<ConsumerBase<?>, Boolean> identityHashMap = this.consumers;
        synchronized (identityHashMap) {
            return this.consumers.size();
        }
    }

    static enum State {
        Open,
        Closing,
        Closed;

    }
}

