/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.reactive.client.internal.adapter;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.reactive.client.api.MessageSendResult;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSendingException;
import org.apache.pulsar.reactive.client.internal.adapter.ProducerCache;
import org.apache.pulsar.reactive.client.internal.adapter.PulsarFutureAdapter;
import org.apache.pulsar.reactive.client.internal.adapter.ReactiveProducerAdapter;
import org.apache.pulsar.reactive.client.internal.adapter.ReactiveProducerAdapterFactory;
import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class AdaptedReactiveMessageSender<T>
implements ReactiveMessageSender<T> {
    private final Schema<T> schema;
    private final ReactiveMessageSenderSpec senderSpec;
    private final int maxConcurrency;
    private final ReactiveProducerAdapterFactory reactiveProducerAdapterFactory;
    private final ProducerCache producerCache;
    private final Supplier<PublisherTransformer> producerActionTransformer;
    private final Object producerActionTransformerKey;
    private final boolean stopOnError;

    AdaptedReactiveMessageSender(Schema<T> schema, ReactiveMessageSenderSpec senderSpec, int maxConcurrency, ReactiveProducerAdapterFactory reactiveProducerAdapterFactory, ProducerCache producerCache, Supplier<PublisherTransformer> producerActionTransformer, Object producerActionTransformerKey, boolean stopOnError) {
        this.schema = schema;
        this.senderSpec = senderSpec;
        this.maxConcurrency = maxConcurrency;
        this.reactiveProducerAdapterFactory = reactiveProducerAdapterFactory;
        this.producerCache = producerCache;
        this.producerActionTransformer = producerActionTransformer;
        this.producerActionTransformerKey = producerActionTransformerKey;
        this.stopOnError = stopOnError;
    }

    ReactiveProducerAdapter<T> createReactiveProducerAdapter() {
        return this.reactiveProducerAdapterFactory.create(pulsarClient -> {
            ProducerBuilder producerBuilder = pulsarClient.newProducer(this.schema);
            this.configureProducerBuilder(producerBuilder);
            return producerBuilder;
        }, this.producerCache, this.producerActionTransformer, this.producerActionTransformerKey);
    }

    private void configureProducerBuilder(ProducerBuilder<T> producerBuilder) {
        if (this.senderSpec.getTopicName() != null) {
            producerBuilder.topic(this.senderSpec.getTopicName());
        }
        if (this.senderSpec.getProducerName() != null) {
            producerBuilder.producerName(this.senderSpec.getProducerName());
        }
        if (this.senderSpec.getSendTimeout() != null) {
            producerBuilder.sendTimeout((int)(this.senderSpec.getSendTimeout().toMillis() / 1000L), TimeUnit.SECONDS);
        }
        if (this.senderSpec.getMaxPendingMessages() != null) {
            producerBuilder.maxPendingMessages(this.senderSpec.getMaxPendingMessages().intValue());
        }
        if (this.senderSpec.getMaxPendingMessagesAcrossPartitions() != null) {
            producerBuilder.maxPendingMessagesAcrossPartitions(this.senderSpec.getMaxPendingMessagesAcrossPartitions().intValue());
        }
        if (this.senderSpec.getMessageRoutingMode() != null) {
            producerBuilder.messageRoutingMode(this.senderSpec.getMessageRoutingMode());
        }
        if (this.senderSpec.getHashingScheme() != null) {
            producerBuilder.hashingScheme(this.senderSpec.getHashingScheme());
        }
        if (this.senderSpec.getCryptoFailureAction() != null) {
            producerBuilder.cryptoFailureAction(this.senderSpec.getCryptoFailureAction());
        }
        if (this.senderSpec.getMessageRouter() != null) {
            producerBuilder.messageRouter(this.senderSpec.getMessageRouter());
        }
        if (this.senderSpec.getBatchingMaxPublishDelay() != null) {
            producerBuilder.batchingMaxPublishDelay(this.senderSpec.getBatchingMaxPublishDelay().toNanos(), TimeUnit.NANOSECONDS);
        }
        if (this.senderSpec.getRoundRobinRouterBatchingPartitionSwitchFrequency() != null) {
            producerBuilder.roundRobinRouterBatchingPartitionSwitchFrequency(this.senderSpec.getRoundRobinRouterBatchingPartitionSwitchFrequency().intValue());
        }
        if (this.senderSpec.getBatchingMaxMessages() != null) {
            producerBuilder.batchingMaxMessages(this.senderSpec.getBatchingMaxMessages().intValue());
        }
        if (this.senderSpec.getBatchingMaxBytes() != null) {
            producerBuilder.batchingMaxBytes(this.senderSpec.getBatchingMaxBytes().intValue());
        }
        if (this.senderSpec.getBatchingEnabled() != null) {
            producerBuilder.enableBatching(this.senderSpec.getBatchingEnabled().booleanValue());
        }
        if (this.senderSpec.getBatcherBuilder() != null) {
            producerBuilder.batcherBuilder(this.senderSpec.getBatcherBuilder());
        }
        if (this.senderSpec.getChunkingEnabled() != null) {
            producerBuilder.enableChunking(this.senderSpec.getChunkingEnabled().booleanValue());
        }
        if (this.senderSpec.getCryptoKeyReader() != null) {
            producerBuilder.cryptoKeyReader(this.senderSpec.getCryptoKeyReader());
        }
        if (this.senderSpec.getEncryptionKeys() != null && !this.senderSpec.getEncryptionKeys().isEmpty()) {
            this.senderSpec.getEncryptionKeys().forEach(arg_0 -> producerBuilder.addEncryptionKey(arg_0));
        }
        if (this.senderSpec.getCompressionType() != null) {
            producerBuilder.compressionType(this.senderSpec.getCompressionType());
        }
        if (this.senderSpec.getInitialSequenceId() != null) {
            producerBuilder.initialSequenceId(this.senderSpec.getInitialSequenceId().longValue());
        }
        if (this.senderSpec.getAutoUpdatePartitions() != null) {
            producerBuilder.autoUpdatePartitions(this.senderSpec.getAutoUpdatePartitions().booleanValue());
        }
        if (this.senderSpec.getAutoUpdatePartitionsInterval() != null) {
            producerBuilder.autoUpdatePartitionsInterval((int)(this.senderSpec.getAutoUpdatePartitionsInterval().toMillis() / 1000L), TimeUnit.SECONDS);
        }
        if (this.senderSpec.getMultiSchema() != null) {
            producerBuilder.enableMultiSchema(this.senderSpec.getMultiSchema().booleanValue());
        }
        if (this.senderSpec.getAccessMode() != null) {
            producerBuilder.accessMode(this.senderSpec.getAccessMode());
        }
        if (this.senderSpec.getLazyStartPartitionedProducers() != null) {
            producerBuilder.enableLazyStartPartitionedProducers(this.senderSpec.getLazyStartPartitionedProducers().booleanValue());
        }
        if (this.senderSpec.getProperties() != null && !this.senderSpec.getProperties().isEmpty()) {
            producerBuilder.properties(Collections.unmodifiableMap(new LinkedHashMap(this.senderSpec.getProperties())));
        }
    }

    public Mono<MessageId> sendOne(MessageSpec<T> messageSpec) {
        return this.createReactiveProducerAdapter().usingProducer((producer, transformer) -> this.createMessageMono(messageSpec, (Producer<T>)producer, (PublisherTransformer)transformer));
    }

    private Mono<MessageSendResult<T>> createMessageSendResult(MessageSpec<T> messageSpec, Producer<T> producer, PublisherTransformer transformer) {
        Mono result = this.createMessageMono(messageSpec, producer, transformer).map(messageId -> new MessageSendResult(messageId, messageSpec, null));
        if (this.stopOnError) {
            return result.onErrorResume(throwable -> Mono.error((Throwable)new ReactiveMessageSendingException(throwable, messageSpec)));
        }
        return result.onErrorResume(throwable -> Mono.just((Object)new MessageSendResult(null, messageSpec, throwable)));
    }

    private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, Producer<T> producer, PublisherTransformer transformer) {
        return PulsarFutureAdapter.adaptPulsarFuture(() -> {
            TypedMessageBuilder typedMessageBuilder = producer.newMessage();
            ((InternalMessageSpec)messageSpec).configure(typedMessageBuilder);
            return typedMessageBuilder.sendAsync();
        }).transform(arg_0 -> ((PublisherTransformer)transformer).transform(arg_0));
    }

    public Flux<MessageSendResult<T>> sendMany(Publisher<MessageSpec<T>> messageSpecs) {
        return this.createReactiveProducerAdapter().usingProducerMany((producer, transformer) -> Flux.from((Publisher)messageSpecs).flatMapSequential(messageSpec -> this.createMessageSendResult((MessageSpec<T>)messageSpec, (Producer<T>)producer, (PublisherTransformer)transformer), this.maxConcurrency));
    }
}

