/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Dispatcher;
import io.nats.client.ErrorListener;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamOptions;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.PublishOptions;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.SubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.PublishAck;
import io.nats.client.impl.Headers;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsDispatcher;
import io.nats.client.impl.NatsJetStreamImplBase;
import io.nats.client.impl.NatsJetStreamSubscription;
import io.nats.client.impl.StreamNamesReader;
import io.nats.client.support.JsonUtils;
import io.nats.client.support.Validator;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class NatsJetStream
extends NatsJetStreamImplBase
implements JetStream {
    public NatsJetStream(NatsConnection connection, JetStreamOptions jsOptions) throws IOException {
        super(connection, jsOptions);
    }

    @Override
    public PublishAck publish(String subject, byte[] body) throws IOException, JetStreamApiException {
        return this.publishSyncInternal(subject, null, body, false, null);
    }

    @Override
    public PublishAck publish(String subject, byte[] body, PublishOptions options) throws IOException, JetStreamApiException {
        return this.publishSyncInternal(subject, null, body, options);
    }

    @Override
    public PublishAck publish(Message message) throws IOException, JetStreamApiException {
        Validator.validateNotNull(message, "Message");
        return this.publishSyncInternal(message.getSubject(), message.getHeaders(), message.getData(), message.isUtf8mode(), null);
    }

    @Override
    public PublishAck publish(Message message, PublishOptions options) throws IOException, JetStreamApiException {
        Validator.validateNotNull(message, "Message");
        return this.publishSyncInternal(message.getSubject(), message.getHeaders(), message.getData(), message.isUtf8mode(), options);
    }

    @Override
    public CompletableFuture<PublishAck> publishAsync(String subject, byte[] body) {
        return this.publishAsyncInternal(subject, null, body, null, null);
    }

    @Override
    public CompletableFuture<PublishAck> publishAsync(String subject, byte[] body, PublishOptions options) {
        return this.publishAsyncInternal(subject, null, body, options, null);
    }

    @Override
    public CompletableFuture<PublishAck> publishAsync(Message message) {
        Validator.validateNotNull(message, "Message");
        return this.publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), message.isUtf8mode(), null, null);
    }

    @Override
    public CompletableFuture<PublishAck> publishAsync(Message message, PublishOptions options) {
        Validator.validateNotNull(message, "Message");
        return this.publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), message.isUtf8mode(), options, null);
    }

    private PublishAck publishSyncInternal(String subject, Headers headers, byte[] data, PublishOptions options) throws IOException, JetStreamApiException {
        return this.publishSyncInternal(subject, headers, data, false, options);
    }

    @Deprecated
    private PublishAck publishSyncInternal(String subject, Headers headers, byte[] data, boolean utf8mode, PublishOptions options) throws IOException, JetStreamApiException {
        Headers merged = this.mergePublishOptions(headers, options);
        if (this.jso.isPublishNoAck()) {
            this.conn.publishInternal(subject, null, merged, data, utf8mode);
            return null;
        }
        Duration timeout = options == null ? this.jso.getRequestTimeout() : options.getStreamTimeout();
        Message resp = this.makeInternalRequestResponseRequired(subject, merged, data, utf8mode, timeout, false);
        return this.processPublishResponse(resp, options);
    }

    private CompletableFuture<PublishAck> publishAsyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, Duration knownTimeout) {
        return this.publishAsyncInternal(subject, headers, data, false, options, knownTimeout);
    }

    @Deprecated
    private CompletableFuture<PublishAck> publishAsyncInternal(String subject, Headers headers, byte[] data, boolean utf8mode, PublishOptions options, Duration knownTimeout) {
        Headers merged = this.mergePublishOptions(headers, options);
        if (this.jso.isPublishNoAck()) {
            this.conn.publishInternal(subject, null, merged, data, utf8mode);
            return null;
        }
        CompletableFuture<Message> future = this.conn.requestFutureInternal(subject, merged, data, utf8mode, knownTimeout, false);
        return future.thenCompose(resp -> {
            try {
                this.responseRequired((Message)resp);
                return CompletableFuture.completedFuture(this.processPublishResponse((Message)resp, options));
            }
            catch (JetStreamApiException | IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    private PublishAck processPublishResponse(Message resp, PublishOptions options) throws IOException, JetStreamApiException {
        String pubStream;
        if (resp.isStatusMessage()) {
            if (resp.getStatus().getCode() == 503) {
                throw new IOException("Error Publishing: No stream available.");
            }
            throw new IOException("Error Publishing: " + resp.getStatus().getMessage());
        }
        PublishAck ack = new PublishAck(resp);
        String ackStream = ack.getStream();
        String string = pubStream = options == null ? null : options.getStream();
        if (pubStream != null && !pubStream.equals(ackStream)) {
            throw new IOException("Expected ack from stream " + pubStream + ", received from: " + ackStream);
        }
        return ack;
    }

    private Headers mergePublishOptions(Headers headers, PublishOptions opts) {
        Headers merged;
        Headers headers2 = merged = headers == null ? null : new Headers(headers);
        if (opts != null) {
            merged = this.mergeNum(merged, "Nats-Expected-Last-Sequence", opts.getExpectedLastSequence());
            merged = this.mergeNum(merged, "Nats-Expected-Last-Subject-Sequence", opts.getExpectedLastSubjectSequence());
            merged = this.mergeString(merged, "Nats-Expected-Last-Msg-Id", opts.getExpectedLastMsgId());
            merged = this.mergeString(merged, "Nats-Expected-Stream", opts.getExpectedStream());
            merged = this.mergeString(merged, "Nats-Msg-Id", opts.getMessageId());
        }
        return merged;
    }

    private Headers mergeNum(Headers h, String key, long value) {
        return value > 0L ? this._mergeNum(h, key, Long.toString(value)) : h;
    }

    private Headers mergeString(Headers h, String key, String value) {
        return Validator.nullOrEmpty(value) ? h : this._mergeNum(h, key, value);
    }

    private Headers _mergeNum(Headers h, String key, String value) {
        if (h == null) {
            h = new Headers();
        }
        return h.add(key, value);
    }

    NatsJetStreamSubscription createSubscription(String subject, String queueName, NatsDispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions pushSubscribeOptions, PullSubscribeOptions pullSubscribeOptions) throws IOException, JetStreamApiException {
        NatsJetStreamSubscription sub;
        ConsumerConfiguration.Builder ccBuilder;
        String stream;
        SubscribeOptions so;
        boolean isPullMode;
        boolean bl = isPullMode = pullSubscribeOptions != null;
        if (isPullMode) {
            so = pullSubscribeOptions;
            stream = pullSubscribeOptions.getStream();
            ccBuilder = ConsumerConfiguration.builder(pullSubscribeOptions.getConsumerConfiguration());
            ccBuilder.deliverSubject(null);
        } else {
            so = pushSubscribeOptions == null ? PushSubscribeOptions.builder().build() : pushSubscribeOptions;
            stream = so.getStream();
            ccBuilder = ConsumerConfiguration.builder(so.getConsumerConfiguration());
            ccBuilder.maxPullWaiting(0L);
        }
        boolean bindMode = so.isBind();
        String durable = ccBuilder.getDurable();
        String inbox = ccBuilder.getDeliverSubject();
        String filterSubject = ccBuilder.getFilterSubject();
        boolean createConsumer = true;
        if (stream == null) {
            stream = this.lookupStreamBySubject(subject);
        }
        if (durable != null) {
            ConsumerInfo consumerInfo = this.lookupConsumerInfo(stream, durable);
            if (consumerInfo != null) {
                createConsumer = false;
                ConsumerConfiguration cc = consumerInfo.getConsumerConfiguration();
                String existingFilterSubject = cc.getFilterSubject();
                if (filterSubject != null && !filterSubject.equals(existingFilterSubject)) {
                    throw new IllegalArgumentException(String.format("Subject %s mismatches consumer configuration %s.", subject, filterSubject));
                }
                filterSubject = existingFilterSubject;
                inbox = cc.getDeliverSubject();
            } else if (bindMode) {
                throw new IllegalArgumentException("Consumer not found for durable. Required in direct mode.");
            }
        }
        if (inbox == null) {
            inbox = this.conn.createInbox();
        }
        if (dispatcher == null) {
            sub = (NatsJetStreamSubscription)this.conn.createSubscription(inbox, queueName, null, true);
        } else {
            MessageHandler mh = autoAck ? new AutoAckMessageHandler(this.conn, handler) : handler;
            sub = (NatsJetStreamSubscription)dispatcher.subscribeImpl(inbox, queueName, mh, true);
        }
        if (createConsumer) {
            ConsumerInfo ci;
            if (!isPullMode) {
                ccBuilder.deliverSubject(inbox);
            }
            ccBuilder.filterSubject(filterSubject == null ? subject : filterSubject);
            try {
                ci = this.addOrUpdateConsumerInternal(stream, ccBuilder.build());
            }
            catch (JetStreamApiException e) {
                if (dispatcher == null) {
                    sub.unsubscribe();
                } else {
                    dispatcher.unsubscribe(sub);
                }
                throw e;
            }
            sub.setupJetStream(this, ci.getName(), ci.getStreamName(), inbox, so);
        } else {
            sub.setupJetStream(this, durable, stream, inbox, so);
        }
        return sub;
    }

    @Override
    public JetStreamSubscription subscribe(String subject) throws IOException, JetStreamApiException {
        Validator.validateSubject(subject, true);
        return this.createSubscription(subject, null, null, null, false, null, null);
    }

    @Override
    public JetStreamSubscription subscribe(String subject, PushSubscribeOptions options) throws IOException, JetStreamApiException {
        Validator.validateSubject(subject, true);
        return this.createSubscription(subject, null, null, null, false, options, null);
    }

    @Override
    public JetStreamSubscription subscribe(String subject, String queue, PushSubscribeOptions options) throws IOException, JetStreamApiException {
        Validator.validateSubject(subject, true);
        queue = Validator.validateQueueName(queue, false);
        return this.createSubscription(subject, queue, null, null, false, options, null);
    }

    @Override
    public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck) throws IOException, JetStreamApiException {
        Validator.validateSubject(subject, true);
        Validator.validateNotNull(dispatcher, "Dispatcher");
        Validator.validateNotNull(handler, "Handler");
        return this.createSubscription(subject, null, (NatsDispatcher)dispatcher, handler, autoAck, null, null);
    }

    @Override
    public JetStreamSubscription subscribe(String subject, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
        Validator.validateSubject(subject, true);
        Validator.validateNotNull(dispatcher, "Dispatcher");
        Validator.validateNotNull(handler, "Handler");
        return this.createSubscription(subject, null, (NatsDispatcher)dispatcher, handler, autoAck, options, null);
    }

    @Override
    public JetStreamSubscription subscribe(String subject, String queue, Dispatcher dispatcher, MessageHandler handler, boolean autoAck, PushSubscribeOptions options) throws IOException, JetStreamApiException {
        Validator.validateSubject(subject, true);
        queue = Validator.validateQueueName(queue, false);
        Validator.validateNotNull(dispatcher, "Dispatcher");
        Validator.validateNotNull(handler, "Handler");
        return this.createSubscription(subject, queue, (NatsDispatcher)dispatcher, handler, autoAck, options, null);
    }

    @Override
    public JetStreamSubscription subscribe(String subject, PullSubscribeOptions options) throws IOException, JetStreamApiException {
        Validator.validateSubject(subject, true);
        Validator.validateNotNull(options, "Options");
        return this.createSubscription(subject, null, null, null, false, null, options);
    }

    ConsumerInfo lookupConsumerInfo(String stream, String consumer) throws IOException, JetStreamApiException {
        try {
            return this.getConsumerInfo(stream, consumer);
        }
        catch (JetStreamApiException e) {
            if (e.getApiErrorCode() == 10014 || e.getErrorCode() == 404 && e.getErrorDescription().contains("consumer")) {
                return null;
            }
            throw e;
        }
    }

    protected String lookupStreamBySubject(String subject) throws IOException, JetStreamApiException {
        byte[] body = JsonUtils.simpleMessageBody("subject", subject);
        StreamNamesReader snr = new StreamNamesReader();
        Message resp = this.makeRequestResponseRequired("STREAM.NAMES", body, this.jso.getRequestTimeout());
        snr.process(resp);
        if (snr.getStrings().size() != 1) {
            throw new IllegalStateException("No matching streams for subject: " + subject);
        }
        return snr.getStrings().get(0);
    }

    protected static class AutoAckMessageHandler
    implements MessageHandler {
        NatsConnection conn;
        MessageHandler userMH;

        AutoAckMessageHandler(NatsConnection conn, MessageHandler userMH) {
            this.conn = conn;
            this.userMH = userMH;
        }

        @Override
        public void onMessage(Message msg) throws InterruptedException {
            block3: {
                try {
                    this.userMH.onMessage(msg);
                    if (msg.isJetStream()) {
                        msg.ack();
                    }
                }
                catch (Exception e) {
                    ErrorListener el = this.conn.getOptions().getErrorListener();
                    if (el == null) break block3;
                    el.exceptionOccurred(this.conn, e);
                }
            }
        }
    }
}

