/*
 * Decompiled with CFR 0.152.
 */
package io.nats.examples.jetstream;

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.PublishAck;
import io.nats.examples.ExampleArgs;
import io.nats.examples.ExampleUtils;
import io.nats.examples.jetstream.NatsJsUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Example: several worker threads share one durable pull consumer.
 *
 * <p>{@link #main(String[])} creates a stream and a durable consumer, starts
 * {@code subCount} {@link JsPullSubWorker} threads that each hold their own pull
 * subscription bound to that durable, publishes {@code msgCount} messages via
 * {@link JsPublisher}, waits for everything to finish, prints a per-worker
 * report and finally deletes the stream.
 */
public class NatsJsPullSubMultipleWorkers {
    // Split across lines for readability only; the concatenation is a compile-time
    // constant whose value is identical to the original single-line literal.
    static final String usageString =
            "\nUsage: java -cp <classpath> io.nats.examples.jetstream.NatsJsPullSubMultipleWorkers [-s server] [-strm stream] [-sub subject] [-dur durable] [-mcnt msgCount] [-scnt subCount]\n"
            + "\nDefault Values:\n"
            + "   [-strm stream]   psmw-stream\n"
            + "   [-sub subject]   psmw-subject\n"
            + "   [-dur durable]   psmw-durable\n"
            + "   [-mcnt msgCount] 100\n"
            + "   [-scnt subCount] 5\n"
            + "\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n"
            + "\nSet the environment variable NATS_NKEY to use challenge response authentication by setting a file containing your private key.\n"
            + "\nSet the environment variable NATS_CREDS to use JWT/NKey authentication by setting a file containing your user creds.\n"
            + "\nUse the URL in the -s server parameter for user/pass/token authentication.\n";

    /**
     * Runs the example end to end.
     *
     * <p>Any failure is printed to {@code System.err}; nothing is rethrown. If the
     * calling thread is interrupted while connecting, flushing or joining, its
     * interrupt status is restored before returning.
     *
     * @param args command line arguments, see {@link #usageString}
     */
    public static void main(String[] args) {
        // The title previously said "Push Subscribe", but this example uses pull subscriptions.
        ExampleArgs exArgs = ExampleArgs.builder("Pull Subscribe, Durable Consumer, Shared Processing", args, usageString)
                .defaultStream("psmw-stream")
                .defaultSubject("psmw-subject")
                .defaultDurable("psmw-durable")
                .defaultMsgCount(100)
                .defaultSubCount(5)
                .build();

        try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions(exArgs.server, true))) {
            JetStreamManagement jsm = nc.jetStreamManagement();
            NatsJsUtils.createStreamExitWhenExists(jsm, exArgs.stream, exArgs.subject);

            // The durable consumer is created up front so every worker can bind to it.
            ConsumerConfiguration cc = ConsumerConfiguration.builder()
                    .durable(exArgs.durable)
                    .ackPolicy(AckPolicy.All)
                    .build();
            jsm.addOrUpdateConsumer(exArgs.stream, cc);

            JetStream js = nc.jetStream();
            System.out.println();

            PullSubscribeOptions pso = PullSubscribeOptions.fastBind(exArgs.stream, exArgs.durable);

            // Shared counter: every worker stops once this reaches msgCount.
            AtomicInteger allReceived = new AtomicInteger();
            List<JsPullSubWorker> subscribers = new ArrayList<>();
            List<Thread> subThreads = new ArrayList<>();
            for (int id = 1; id <= exArgs.subCount; ++id) {
                JetStreamSubscription sub = js.subscribe(exArgs.subject, pso);
                JsPullSubWorker worker = new JsPullSubWorker(id, exArgs, js, sub, allReceived);
                subscribers.add(worker);
                Thread t = new Thread(worker);
                subThreads.add(t);
                t.start();
            }
            nc.flush(Duration.ofSeconds(1L));

            // Publish on a separate thread, then wait for the publisher and all workers.
            Thread pubThread = new Thread(new JsPublisher(js, exArgs));
            pubThread.start();
            pubThread.join();
            for (Thread t : subThreads) {
                t.join();
            }

            // Safe to read per-worker counters here: join() has completed for every worker.
            for (JsPullSubWorker worker : subscribers) {
                worker.report();
            }
            System.out.println();

            jsm.deleteStream(exArgs.stream);
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            System.err.println(e);
        }
        catch (RuntimeException | JetStreamApiException | IOException | TimeoutException e) {
            System.err.println(e);
        }
    }

    /**
     * Worker that repeatedly fetches batches from its pull subscription until the
     * shared {@code allReceived} counter reaches {@code exArgs.msgCount}.
     */
    static class JsPullSubWorker
    implements Runnable {
        /** Batch size used for the first fetch of each outer-loop pass. */
        private static final int INITIAL_BATCH_SIZE = 5;
        /** Batch size used for follow-up fetches while messages keep arriving. */
        private static final int FOLLOW_UP_BATCH_SIZE = 10;
        /** Second argument handed to {@code fetch}; the max wait in milliseconds per the nats.java API. */
        private static final long FETCH_WAIT_MILLIS = 500L;

        int id;
        int thisReceived;
        List<String> datas;
        ExampleArgs exArgs;
        JetStream js;
        JetStreamSubscription sub;
        AtomicInteger allReceived;

        /**
         * @param id          number used to label this worker's output
         * @param exArgs      example arguments; {@code msgCount} is the stop threshold
         * @param js          JetStream context (stored, not used by {@link #run()})
         * @param sub         the pull subscription this worker fetches from
         * @param allReceived counter shared by all workers
         */
        public JsPullSubWorker(int id, ExampleArgs exArgs, JetStream js, JetStreamSubscription sub, AtomicInteger allReceived) {
            this.id = id;
            this.thisReceived = 0;
            this.datas = new ArrayList<>();
            this.exArgs = exArgs;
            this.js = js;
            this.sub = sub;
            this.allReceived = allReceived;
        }

        /** Prints how many messages this worker handled. */
        public void report() {
            System.out.printf("Sub # %d handled %d messages.\n", this.id, this.thisReceived);
        }

        /**
         * Fetches and processes batches until the shared counter reaches the
         * expected message count, acknowledging the last message of each batch.
         */
        @Override
        public void run() {
            while (this.allReceived.get() < this.exArgs.msgCount) {
                List<Message> messages = this.sub.fetch(INITIAL_BATCH_SIZE, FETCH_WAIT_MILLIS);
                // Keep draining while fetches return data; an empty result falls back
                // to the outer loop, which re-checks the shared counter.
                while (messages != null && !messages.isEmpty()) {
                    for (Message msg : messages) {
                        ++this.thisReceived;
                        this.allReceived.incrementAndGet();
                        String data = new String(msg.getData(), StandardCharsets.US_ASCII);
                        this.datas.add(data);
                        System.out.printf("QS # %d message # %d %s\n", this.id, this.thisReceived, data);
                    }
                    // Only the last message of the batch is acked; the consumer is
                    // configured with AckPolicy.All in main().
                    messages.get(messages.size() - 1).ack();
                    messages = this.sub.fetch(FOLLOW_UP_BATCH_SIZE, FETCH_WAIT_MILLIS);
                }
            }
            System.out.printf("QS # %d completed.\n", this.id);
        }
    }

    /**
     * Publishes {@code exArgs.msgCount} messages with payload {@code "Data # <n>"}
     * to {@code exArgs.subject}.
     */
    static class JsPublisher
    implements Runnable {
        JetStream js;
        ExampleArgs exArgs;

        /**
         * @param js     JetStream context used for publishing
         * @param exArgs example arguments supplying the subject and message count
         */
        public JsPublisher(JetStream js, ExampleArgs exArgs) {
            this.js = js;
            this.exArgs = exArgs;
        }

        /**
         * Publishes the messages in order. On a publish failure the stack trace is
         * printed and the JVM exits with status -1: the workers loop until
         * {@code msgCount} messages have been counted, so carrying on after a
         * failed publish could leave them waiting indefinitely.
         */
        @Override
        public void run() {
            for (int x = 1; x <= this.exArgs.msgCount; ++x) {
                try {
                    this.js.publish(this.exArgs.subject, ("Data # " + x).getBytes(StandardCharsets.US_ASCII));
                }
                catch (JetStreamApiException | IOException e) {
                    e.printStackTrace();
                    System.exit(-1);
                }
            }
        }
    }
}

