/*
 * Decompiled with CFR 0.152.
 */
package io.nats.examples.jetstream.simple;

import io.nats.client.Connection;
import io.nats.client.ConsumerContext;
import io.nats.client.IterableConsumer;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.Message;
import io.nats.client.MessageConsumer;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.StreamContext;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.examples.jetstream.NatsJsUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Example in which several consumer instances share one durable JetStream consumer.
 *
 * <p>The program (re)creates a stream, registers a durable consumer on it, publishes
 * {@link #MESSAGE_COUNT} messages and then starts {@link #CONSUMER_COUNT} holders that all
 * obtain a context for that same durable consumer. Holders with an even id receive messages
 * through a handler callback; holders with an odd id poll an {@link IterableConsumer} on
 * their own thread. When every message has been counted, or the wait times out, each holder
 * is stopped and reports how many messages it handled, and the stream is deleted.
 */
public class QueueExample {
    private static final String STREAM = "q-stream";
    private static final String SUBJECT = "q-subject";
    private static final String CONSUMER_NAME = "q-consumer";
    private static final int MESSAGE_COUNT = 10000;
    private static final int CONSUMER_COUNT = 6;

    /** Maximum time, in seconds, that {@code main} waits for all messages to be handled. */
    private static final long COMPLETION_TIMEOUT_SECONDS = 20L;

    /** Maximum time, in seconds, that an iterable holder waits for its worker thread to end. */
    private static final long WORKER_SHUTDOWN_SECONDS = 2L;

    /** Argument passed to {@code nextMessage}; presumably a wait in milliseconds - confirm. */
    private static final long NEXT_MESSAGE_WAIT = 1000L;

    /** Server URL used by {@link #main(String[])}; left mutable so callers can override it. */
    public static String SERVER = "nats://localhost:4222";

    /**
     * Runs the example against {@link #SERVER}.
     *
     * <p>Connection, API and interruption failures are reported on {@code System.err} and end
     * the example. A failure while publishing or while creating a handler/iterable holder is
     * rethrown as a {@link RuntimeException}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Options options = Options.builder().server(SERVER).build();
        try (Connection nc = Nats.connect(options)) {
            JetStream js = nc.jetStream();
            NatsJsUtils.createOrReplaceStream(nc.jetStreamManagement(), STREAM, SUBJECT);

            // Get the stream context and make sure the shared durable consumer exists.
            StreamContext streamContext;
            try {
                streamContext = nc.getStreamContext(STREAM);
                streamContext.createOrUpdateConsumer(
                    ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
            }
            catch (JetStreamApiException | IOException e) {
                // Nothing useful can be done without the consumer. The enclosing
                // try-with-resources closes the connection on return.
                System.err.println("Unable to set up stream/consumer: " + e);
                return;
            }

            // Publish everything up front so the consumers have a backlog to share.
            for (int x = 1; x <= MESSAGE_COUNT; ++x) {
                try {
                    // Same charset the consumers use to decode the payload.
                    js.publish(SUBJECT, ("message-" + x).getBytes(StandardCharsets.US_ASCII));
                }
                catch (JetStreamApiException | IOException e) {
                    throw new RuntimeException(e);
                }
            }

            int readCount = MESSAGE_COUNT / CONSUMER_COUNT;
            System.out.println("Each of the " + CONSUMER_COUNT
                + " queue subscriptions will read about " + readCount + " messages.");

            // One count per published message; every holder counts down the same latch.
            CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
            ArrayList<ConsumerHolder> holders = new ArrayList<>(CONSUMER_COUNT);
            for (int id = 1; id <= CONSUMER_COUNT; ++id) {
                try {
                    // Alternate between the two consumption styles.
                    if (id % 2 == 0) {
                        holders.add(new HandlerConsumerHolder(id, streamContext, latch));
                    }
                    else {
                        holders.add(new IterableConsumerHolder(id, streamContext, latch));
                    }
                }
                catch (JetStreamApiException e) {
                    throw new RuntimeException(e);
                }
            }

            if (!latch.await(COMPLETION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                System.err.println("Timed out with " + latch.getCount()
                    + " of " + MESSAGE_COUNT + " messages still unhandled.");
            }

            for (ConsumerHolder holder : holders) {
                holder.stop();
                holder.report();
            }
            System.out.println();

            nc.jetStreamManagement().deleteStream(STREAM);
        }
        catch (InterruptedException e) {
            // Restore the flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while running the example: " + e);
        }
        catch (JetStreamApiException | IOException e) {
            System.err.println("Example failed: " + e);
        }
    }

    /**
     * Common state for one consuming instance: its id, a context for the shared durable
     * consumer, a per-instance received counter and the latch shared by all instances.
     */
    static abstract class ConsumerHolder {
        final int id;
        final ConsumerContext consumerContext;
        final AtomicInteger thisReceived;
        final CountDownLatch latch;

        /**
         * @param id    number used to identify this instance in the output
         * @param sc    stream context from which the consumer context is obtained
         * @param latch latch counted down once per message handled
         * @throws JetStreamApiException if obtaining the consumer context fails at the API level
         * @throws IOException           if obtaining the consumer context fails with an I/O error
         */
        public ConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws JetStreamApiException, IOException {
            this.id = id;
            this.thisReceived = new AtomicInteger();
            this.latch = latch;
            this.consumerContext = sc.getConsumerContext(QueueExample.CONSUMER_NAME);
        }

        /**
         * Stops this instance from consuming.
         *
         * @throws InterruptedException if the calling thread is interrupted while stopping
         */
        public abstract void stop() throws InterruptedException;

        /** Prints how many messages this instance has handled so far. */
        public void report() {
            System.out.printf("Instance # %d handled %d messages.\n", this.id, this.thisReceived.get());
        }
    }

    /**
     * Holder that polls an {@link IterableConsumer} on a dedicated thread until the shared
     * latch reaches zero or {@link #stop()} is called.
     */
    static class IterableConsumerHolder
    extends ConsumerHolder {
        final IterableConsumer iterableConsumer;
        final Thread t;
        final CountDownLatch finished = new CountDownLatch(1);

        /** Set by {@link #stop()} so the worker exits even if the latch never reaches zero. */
        private volatile boolean stopRequested;

        /**
         * Creates the iterable consumer and starts the polling thread.
         *
         * @param id    number used to identify this instance in the output
         * @param sc    stream context from which the consumer context is obtained
         * @param latch latch counted down once per message handled
         * @throws JetStreamApiException if the consumer context or iterable consumer cannot be created
         * @throws IOException           if the consumer context or iterable consumer cannot be created
         */
        public IterableConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws JetStreamApiException, IOException {
            super(id, sc, latch);
            this.iterableConsumer = this.consumerContext.iterate();
            this.t = new Thread(() -> {
                try {
                    while (!this.stopRequested && latch.getCount() > 0L) {
                        Message msg = this.iterableConsumer.nextMessage(NEXT_MESSAGE_WAIT);
                        if (msg == null) {
                            // Nothing arrived this round; re-check the exit conditions.
                            continue;
                        }
                        // Use the value returned by the increment so the printed number is
                        // exactly the one assigned to this message.
                        int received = this.thisReceived.incrementAndGet();
                        latch.countDown();
                        String data = new String(msg.getData(), StandardCharsets.US_ASCII);
                        System.out.printf("Iterable # %d message # %d %s\n", id, received, data);
                        msg.ack();
                    }
                }
                catch (InterruptedException e) {
                    // Treat interruption as a request to end the worker.
                    Thread.currentThread().interrupt();
                }
                catch (Exception e) {
                    // Report and end this worker instead of retrying a failing consumer in a
                    // tight loop.
                    System.err.println("Iterable # " + id + " stopping after error: " + e);
                }
                finally {
                    this.finished.countDown();
                }
            });
            this.t.start();
        }

        /**
         * Asks the polling thread to exit and waits up to {@code WORKER_SHUTDOWN_SECONDS}
         * seconds for it to do so.
         */
        @Override
        public void stop() throws InterruptedException {
            this.stopRequested = true;
            this.finished.await(WORKER_SHUTDOWN_SECONDS, TimeUnit.SECONDS);
        }
    }

    /**
     * Holder that receives messages through a handler passed to
     * {@code ConsumerContext.consume}.
     */
    static class HandlerConsumerHolder
    extends ConsumerHolder {
        final MessageConsumer messageConsumer;

        /**
         * Registers the message handler.
         *
         * @param id    number used to identify this instance in the output
         * @param sc    stream context from which the consumer context is obtained
         * @param latch latch counted down once per message handled
         * @throws JetStreamApiException if the consumer context or message consumer cannot be created
         * @throws IOException           if the consumer context or message consumer cannot be created
         */
        public HandlerConsumerHolder(int id, StreamContext sc, CountDownLatch latch) throws JetStreamApiException, IOException {
            super(id, sc, latch);
            this.messageConsumer = this.consumerContext.consume(msg -> {
                // Use the value returned by the increment so the printed number is exactly
                // the one assigned to this message.
                int received = this.thisReceived.incrementAndGet();
                latch.countDown();
                String data = new String(msg.getData(), StandardCharsets.US_ASCII);
                System.out.printf("Handler  # %d message # %d %s\n", id, received, data);
                msg.ack();
            });
        }

        /** Stops the underlying message consumer. */
        @Override
        public void stop() throws InterruptedException {
            this.messageConsumer.stop();
        }
    }
}

