/*
 * Decompiled with CFR 0.152.
 */
package io.nats.examples.jetstream;

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.api.PublishAck;
import io.nats.client.impl.NatsMessage;
import io.nats.examples.ExampleArgs;
import io.nats.examples.ExampleUtils;
import io.nats.examples.jetstream.NatsJsUtils;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class NatsJsPubAsync2 {
    static final String usageString = "\nUsage: java -cp <classpath> NatsJsPubAsync2 [-s server] [-strm stream] [-sub subject] [-mcnt msgCount] [-m messageWords+] [-r headerKey:headerValue]*\n\nDefault Values:\n   [-strm] example-stream\n   [-sub]  example-subject\n   [-mcnt] 10\n   [-m] hello\n\nRun Notes:\n   - msg_count < 1 is the same as 1\n   - headers are optional\n\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n\nSet the environment variable NATS_NKEY to use challenge response authentication by setting a file containing your private key.\n\nSet the environment variable NATS_CREDS to use JWT/NKey authentication by setting a file containing your user creds.\n\nUse the URL in the -s server parameter for user/pass/token authentication.\n";

    public static void main(String[] args) {
        ExampleArgs exArgs = ExampleArgs.builder("Publish Async 2", args, usageString).defaultStream("example-stream").defaultSubject("example-subject").defaultMessage("hello").defaultMsgCount(10).build();
        String hdrNote = exArgs.hasHeaders() ? ", with " + exArgs.headers.size() + " header(s)" : "";
        System.out.printf("\nPublishing to %s%s. Server is %s\n\n", exArgs.subject, hdrNote, exArgs.server);
        try (Connection nc = Nats.connect(ExampleUtils.createExampleOptions(exArgs.server));){
            class Helper {
                Message msg;
                CompletableFuture<PublishAck> future;

                Helper() {
                }
            }
            JetStream js = nc.jetStream();
            NatsJsUtils.createStreamOrUpdateSubjects(nc, exArgs.stream, exArgs.subject);
            int stop = exArgs.msgCount < 2 ? 2 : exArgs.msgCount + 1;
            CountDownLatch ackLatch = new CountDownLatch(stop - 1);
            LinkedBlockingQueue queue = new LinkedBlockingQueue();
            LinkedBlockingQueue redo = new LinkedBlockingQueue();
            new Thread(() -> {
                while (ackLatch.getCount() > 0L) {
                    Helper h;
                    try {
                        h = (Helper)queue.take();
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    try {
                        if (h.future.isDone()) {
                            PublishAck pa = h.future.get();
                            System.out.println("Pub ack received " + pa);
                            ackLatch.countDown();
                            continue;
                        }
                        queue.add(h);
                    }
                    catch (InterruptedException e) {
                        redo.add(h);
                        ackLatch.countDown();
                    }
                    catch (ExecutionException e) {
                        redo.add(h);
                        ackLatch.countDown();
                    }
                }
            }).start();
            new Thread(() -> {
                for (int x = 1; x < stop; ++x) {
                    String data = exArgs.msgCount < 2 ? exArgs.message : exArgs.message + "-" + x;
                    NatsMessage msg = NatsMessage.builder().subject(exArgs.subject).headers(exArgs.headers).data(data, StandardCharsets.UTF_8).build();
                    System.out.printf("Publishing message %s on subject %s.\n", data, exArgs.subject);
                    Helper h = new Helper();
                    h.msg = msg;
                    h.future = js.publishAsync(msg);
                    queue.add(h);
                }
                while (ackLatch.getCount() > 0L) {
                    try {
                        Helper h = (Helper)redo.poll(10L, TimeUnit.MILLISECONDS);
                        if (h == null) continue;
                        System.out.printf("RE publishing message %s.\n", new String(h.msg.getData()));
                        h.future = js.publishAsync(h.msg);
                        queue.add(h);
                    }
                    catch (InterruptedException interruptedException) {}
                }
            }).start();
            ackLatch.await();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

