/*
 * Decompiled with CFR 0.152.
 */
package io.nats.examples.autobench;

import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.Subscription;
import io.nats.examples.autobench.ThrottledBenchmark;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class PubSubBenchmark
extends ThrottledBenchmark {
    public PubSubBenchmark(String name, long messageCount, long messageSize) {
        super(name, messageCount, messageSize);
    }

    @Override
    void executeWithLimiter(Options connectOptions) throws InterruptedException {
        byte[] payload = this.createPayload();
        String subject = this.getSubject();
        CompletableFuture<Object> go = new CompletableFuture<Object>();
        CompletableFuture<Void> subReady = new CompletableFuture<Void>();
        CompletableFuture<Void> pubReady = new CompletableFuture<Void>();
        CompletableFuture<Void> subDone = new CompletableFuture<Void>();
        CompletableFuture<Void> pubDone = new CompletableFuture<Void>();
        Thread subThread = new Thread(() -> {
            try {
                int count = 0;
                Connection subConnect = Nats.connect(connectOptions);
                if (subConnect.getStatus() != Connection.Status.CONNECTED) {
                    throw new Exception("Unable to connect");
                }
                try {
                    Subscription sub = subConnect.subscribe(subject);
                    this.defaultFlush(subConnect);
                    subReady.complete(null);
                    while ((long)count < this.getMessageCount()) {
                        Message msg = sub.nextMessage(Duration.ofSeconds(5L));
                        if (msg != null) {
                            ++count;
                            continue;
                        }
                        throw new Exception("No messages within timeout.");
                    }
                    subDone.complete(null);
                }
                catch (Exception exp) {
                    this.setException(exp);
                }
                finally {
                    subConnect.close();
                }
            }
            catch (Exception ex) {
                subReady.cancel(true);
                this.setException(ex);
            }
            finally {
                subDone.complete(null);
            }
        }, "PubSub Test - Subscriber");
        subThread.start();
        Thread pubThread = new Thread(() -> {
            try {
                Connection pubConnect = Nats.connect(connectOptions);
                if (pubConnect.getStatus() != Connection.Status.CONNECTED) {
                    throw new Exception("Unable to connect");
                }
                try {
                    pubReady.complete(null);
                    go.get();
                    int i = 0;
                    while ((long)i < this.getMessageCount()) {
                        pubConnect.publish(subject, payload);
                        this.adjustAndSleep(pubConnect);
                        ++i;
                    }
                    try {
                        pubConnect.flush(Duration.ofSeconds(5L));
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                    pubDone.complete(null);
                }
                finally {
                    pubConnect.close();
                }
            }
            catch (Exception ex) {
                pubReady.cancel(true);
                this.setException(ex);
                this.pubFailed();
            }
            finally {
                pubDone.complete(null);
            }
        }, "PubSub Test - Publisher");
        pubThread.start();
        this.getFutureSafely(subReady);
        this.getFutureSafely(pubReady);
        if (this.getException() != null) {
            go.complete(null);
            return;
        }
        this.startTiming();
        go.complete(null);
        this.getFutureSafely(pubDone);
        this.getFutureSafely(subDone);
        this.endTiming();
        pubThread.join();
        subThread.join();
    }
}

