/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.ConsumeOptions;
import io.nats.client.ConsumerContext;
import io.nats.client.Dispatcher;
import io.nats.client.FetchConsumeOptions;
import io.nats.client.FetchConsumer;
import io.nats.client.IterableConsumer;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamStatusCheckedException;
import io.nats.client.JetStreamStatusException;
import io.nats.client.Message;
import io.nats.client.MessageConsumer;
import io.nats.client.MessageHandler;
import io.nats.client.PullRequestOptions;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.impl.NatsFetchConsumer;
import io.nats.client.impl.NatsIterableConsumer;
import io.nats.client.impl.NatsJetStream;
import io.nats.client.impl.NatsJetStreamPullSubscription;
import io.nats.client.impl.NatsMessageConsumer;
import io.nats.client.impl.NatsStreamContext;
import io.nats.client.support.Validator;
import java.io.IOException;
import java.time.Duration;

public class NatsConsumerContext
implements ConsumerContext {
    private final NatsStreamContext streamContext;
    private final NatsJetStream js;
    private final PullSubscribeOptions bindPso;
    private ConsumerInfo lastConsumerInfo;

    NatsConsumerContext(NatsStreamContext streamContext, ConsumerInfo ci) throws IOException {
        this.streamContext = streamContext;
        this.js = new NatsJetStream(streamContext.jsm.conn, streamContext.jsm.jso);
        this.bindPso = PullSubscribeOptions.bind(streamContext.streamName, ci.getName());
        this.lastConsumerInfo = ci;
    }

    @Override
    public String getConsumerName() {
        return this.lastConsumerInfo.getName();
    }

    @Override
    public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException {
        this.lastConsumerInfo = this.streamContext.jsm.getConsumerInfo(this.streamContext.streamName, this.lastConsumerInfo.getName());
        return this.lastConsumerInfo;
    }

    @Override
    public ConsumerInfo getCachedConsumerInfo() {
        return this.lastConsumerInfo;
    }

    @Override
    public Message next() throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
        return new NextSub(30000L).next();
    }

    @Override
    public Message next(Duration maxWait) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
        if (maxWait == null) {
            return new NextSub(30000L).next();
        }
        return this.next(maxWait.toMillis());
    }

    @Override
    public Message next(long maxWaitMillis) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
        if (maxWaitMillis < 1000L) {
            throw new IllegalArgumentException("Max wait must be at least 1000 milliseconds.");
        }
        return new NextSub(maxWaitMillis).next();
    }

    @Override
    public FetchConsumer fetchMessages(int maxMessages) throws IOException, JetStreamApiException {
        return this.fetch(FetchConsumeOptions.builder().maxMessages(maxMessages).build());
    }

    @Override
    public FetchConsumer fetchBytes(int maxBytes) throws IOException, JetStreamApiException {
        return this.fetch(FetchConsumeOptions.builder().maxBytes(maxBytes).build());
    }

    @Override
    public FetchConsumer fetch(FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException {
        Validator.required(fetchConsumeOptions, "Fetch Consume Options");
        return new NatsFetchConsumer(new SubscriptionMaker(), fetchConsumeOptions);
    }

    @Override
    public IterableConsumer consume() throws IOException, JetStreamApiException {
        return new NatsIterableConsumer(new SubscriptionMaker(), ConsumeOptions.DEFAULT_CONSUME_OPTIONS);
    }

    @Override
    public IterableConsumer consume(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException {
        Validator.required(consumeOptions, "Consume Options");
        return new NatsIterableConsumer(new SubscriptionMaker(), consumeOptions);
    }

    @Override
    public MessageConsumer consume(MessageHandler handler) throws IOException, JetStreamApiException {
        Validator.required(handler, "Message Handler");
        return new NatsMessageConsumer(new SubscriptionMaker(), handler, ConsumeOptions.DEFAULT_CONSUME_OPTIONS);
    }

    @Override
    public MessageConsumer consume(MessageHandler handler, ConsumeOptions consumeOptions) throws IOException, JetStreamApiException {
        Validator.required(handler, "Message Handler");
        Validator.required(consumeOptions, "Consume Options");
        return new NatsMessageConsumer(new SubscriptionMaker(), handler, consumeOptions);
    }

    class SubscriptionMaker {
        Dispatcher dispatcher;

        SubscriptionMaker() {
        }

        public NatsJetStreamPullSubscription makeSubscription(MessageHandler messageHandler) throws IOException, JetStreamApiException {
            if (messageHandler == null) {
                return (NatsJetStreamPullSubscription)NatsConsumerContext.this.js.subscribe(null, NatsConsumerContext.this.bindPso);
            }
            this.dispatcher = ((NatsConsumerContext)NatsConsumerContext.this).js.conn.createDispatcher();
            return (NatsJetStreamPullSubscription)NatsConsumerContext.this.js.subscribe(null, this.dispatcher, messageHandler, NatsConsumerContext.this.bindPso);
        }
    }

    class NextSub {
        private final long maxWaitMillis;
        private final NatsJetStreamPullSubscription sub;

        public NextSub(long maxWaitMillis) throws JetStreamApiException, IOException {
            this.sub = new SubscriptionMaker().makeSubscription(null);
            this.maxWaitMillis = maxWaitMillis;
            this.sub._pull(PullRequestOptions.builder(1).expiresIn(maxWaitMillis - 10L).build(), false, null);
        }

        Message next() throws JetStreamStatusCheckedException, InterruptedException {
            try {
                Message message = this.sub.nextMessage(this.maxWaitMillis);
                return message;
            }
            catch (JetStreamStatusException e) {
                throw new JetStreamStatusCheckedException(e);
            }
            finally {
                try {
                    this.sub.unsubscribe();
                }
                catch (Exception exception) {}
            }
        }
    }
}

