/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ProducerStatsRecorderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
import org.apache.pulsar.client.impl.SinglePartitionMessageRouterImpl;
import org.apache.pulsar.client.impl.TopicMetadataImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionedProducerImpl<T>
extends ProducerBase<T> {
    private List<ProducerImpl<T>> producers;
    private MessageRouter routerPolicy;
    private final ProducerStatsRecorderImpl stats;
    private final TopicMetadata topicMetadata;
    private static final Logger log = LoggerFactory.getLogger(PartitionedProducerImpl.class);

    public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions, CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema) {
        super(client, topic, conf, producerCreatedFuture, schema);
        this.producers = Lists.newArrayListWithCapacity(numPartitions);
        this.topicMetadata = new TopicMetadataImpl(numPartitions);
        this.routerPolicy = this.getMessageRouter();
        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0L ? new ProducerStatsRecorderImpl() : null;
        int maxPendingMessages = Math.min(conf.getMaxPendingMessages(), conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
        conf.setMaxPendingMessages(maxPendingMessages);
        this.start();
    }

    private MessageRouter getMessageRouter() {
        MessageRouter messageRouter;
        MessageRoutingMode messageRouteMode = this.conf.getMessageRoutingMode();
        MessageRouter customMessageRouter = this.conf.getCustomMessageRouter();
        switch (messageRouteMode) {
            case CustomPartition: {
                Preconditions.checkNotNull(customMessageRouter);
                messageRouter = customMessageRouter;
                break;
            }
            case RoundRobinPartition: {
                messageRouter = new RoundRobinPartitionMessageRouterImpl(this.conf.getHashingScheme(), ThreadLocalRandom.current().nextInt(this.topicMetadata.numPartitions()), this.conf.isBatchingEnabled(), TimeUnit.MICROSECONDS.toMillis(this.conf.getBatchingMaxPublishDelayMicros()));
                break;
            }
            default: {
                messageRouter = new SinglePartitionMessageRouterImpl(ThreadLocalRandom.current().nextInt(this.topicMetadata.numPartitions()), this.conf.getHashingScheme());
            }
        }
        return messageRouter;
    }

    @Override
    public String getProducerName() {
        return this.producers.get(0).getProducerName();
    }

    @Override
    public long getLastSequenceId() {
        return this.producers.stream().map(Producer::getLastSequenceId).mapToLong(Long::longValue).max().orElse(-1L);
    }

    private void start() {
        AtomicReference createFail = new AtomicReference();
        AtomicInteger completed = new AtomicInteger();
        for (int partitionIndex = 0; partitionIndex < this.topicMetadata.numPartitions(); ++partitionIndex) {
            String partitionName = TopicName.get(this.topic).getPartition(partitionIndex).toString();
            ProducerImpl producer = new ProducerImpl(this.client, partitionName, this.conf, new CompletableFuture(), partitionIndex, this.schema);
            this.producers.add(producer);
            producer.producerCreatedFuture().handle((prod, createException) -> {
                if (createException != null) {
                    this.setState(HandlerState.State.Failed);
                    createFail.compareAndSet(null, createException);
                }
                if (completed.incrementAndGet() == this.topicMetadata.numPartitions()) {
                    if (createFail.get() == null) {
                        this.setState(HandlerState.State.Ready);
                        this.producerCreatedFuture().complete(this);
                        log.info("[{}] Created partitioned producer", (Object)this.topic);
                    } else {
                        this.closeAsync().handle((ok, closeException) -> {
                            this.producerCreatedFuture().completeExceptionally((Throwable)createFail.get());
                            this.client.cleanupProducer(this);
                            return null;
                        });
                        log.error("[{}] Could not create partitioned producer.", (Object)this.topic, (Object)((Throwable)createFail.get()).getCause());
                    }
                }
                return null;
            });
        }
    }

    @Override
    CompletableFuture<MessageId> internalSendAsync(Message<T> message) {
        switch (this.getState()) {
            case Ready: 
            case Connecting: {
                break;
            }
            case Closing: 
            case Closed: {
                return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Producer already closed"));
            }
            case Terminated: {
                return FutureUtil.failedFuture(new PulsarClientException.TopicTerminatedException("Topic was terminated"));
            }
            case Failed: 
            case Uninitialized: {
                return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
            }
        }
        int partition = this.routerPolicy.choosePartition(message, this.topicMetadata);
        Preconditions.checkArgument(partition >= 0 && partition < this.topicMetadata.numPartitions(), "Illegal partition index chosen by the message routing policy: " + partition);
        return this.producers.get(partition).internalSendAsync(message);
    }

    @Override
    public CompletableFuture<Void> flushAsync() {
        List<CompletableFuture> flushFutures = this.producers.stream().map(ProducerImpl::flushAsync).collect(Collectors.toList());
        return CompletableFuture.allOf(flushFutures.toArray(new CompletableFuture[flushFutures.size()]));
    }

    @Override
    void triggerFlush() {
        this.producers.forEach(ProducerImpl::triggerFlush);
    }

    @Override
    public boolean isConnected() {
        return this.producers.stream().allMatch(ProducerImpl::isConnected);
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
            return CompletableFuture.completedFuture(null);
        }
        this.setState(HandlerState.State.Closing);
        AtomicReference closeFail = new AtomicReference();
        AtomicInteger completed = new AtomicInteger(this.topicMetadata.numPartitions());
        CompletableFuture<Void> closeFuture = new CompletableFuture<Void>();
        for (Producer producer : this.producers) {
            if (producer == null) continue;
            producer.closeAsync().handle((closed, ex) -> {
                if (ex != null) {
                    closeFail.compareAndSet(null, ex);
                }
                if (completed.decrementAndGet() == 0) {
                    if (closeFail.get() == null) {
                        this.setState(HandlerState.State.Closed);
                        closeFuture.complete(null);
                        log.info("[{}] Closed Partitioned Producer", (Object)this.topic);
                        this.client.cleanupProducer(this);
                    } else {
                        this.setState(HandlerState.State.Failed);
                        closeFuture.completeExceptionally((Throwable)closeFail.get());
                        log.error("[{}] Could not close Partitioned Producer", (Object)this.topic, (Object)((Throwable)closeFail.get()).getCause());
                    }
                }
                return null;
            });
        }
        return closeFuture;
    }

    @Override
    public synchronized ProducerStatsRecorderImpl getStats() {
        if (this.stats == null) {
            return null;
        }
        this.stats.reset();
        for (int i = 0; i < this.topicMetadata.numPartitions(); ++i) {
            this.stats.updateCumulativeStats(this.producers.get(i).getStats());
        }
        return this.stats;
    }

    @Override
    String getHandlerName() {
        return "partition-producer";
    }
}

