/*
 * Decompiled with CFR 0.152.
 */
package io.nats.examples.jetstream;

import io.nats.client.Connection;
import io.nats.client.ErrorListener;
import io.nats.client.JetStream;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.impl.NatsMessage;
import io.nats.examples.ExampleArgs;
import io.nats.examples.ExampleUtils;
import io.nats.examples.jetstream.NatsJsUtils;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Example program: subscribes to a JetStream subject with a push consumer that has flow
 * control enabled, publishes a burst of messages, then reads and acknowledges them while
 * counting the flow control notifications delivered to an {@link ErrorListener}.
 *
 * <p>The stream is created at startup and deleted at the end of a normal run. If any step
 * throws, the stack trace is printed and the remaining steps (including the stream
 * deletion) are skipped; the connection itself is still closed by try-with-resources.
 */
public class NatsJsPushSubFlowControl {
    static final String usageString = "\nUsage: java -cp <classpath> io.nats.examples.jetstream.NatsJsPushSubFlowControl [-s server]\n\nDefault Values:\n   [-strm] fc-stream\n   [-sub]  fc-subject\n\nUse tls:// or opentls:// to require tls, via the Default SSLContext\n\nSet the environment variable NATS_NKEY to use challenge response authentication by setting a file containing your private key.\n\nSet the environment variable NATS_CREDS to use JWT/NKey authentication by setting a file containing your user creds.\n\nUse the URL in the -s server parameter for user/pass/token authentication.\n";

    /** Number of messages published before the subscriber starts reading. */
    private static final int PUBLISH_COUNT = 1000;

    /** Size, in bytes, of the payload carried by every published message. */
    private static final int PAYLOAD_BYTES = 8192;

    /**
     * Runs the example end to end against the server named in the arguments.
     *
     * @param args command-line arguments, parsed by {@link ExampleArgs}; the stream and
     *             subject default to {@code fc-stream} and {@code fc-subject}
     */
    public static void main(String[] args) {
        ExampleArgs exampleArgs = ExampleArgs.builder("Push Subscribe Using Flow Control", args, usageString)
            .defaultStream("fc-stream")
            .defaultSubject("fc-subject")
            .build();

        // Shared with the listener so the total can be reported at the end of the run.
        final AtomicInteger flowControlCount = new AtomicInteger();
        ErrorListener listener = flowControlReporter(flowControlCount);

        try (Connection connection = Nats.connect(ExampleUtils.createExampleOptions(exampleArgs.server, false, listener, null))) {
            JetStreamManagement management = connection.jetStreamManagement();

            // Helper defined elsewhere; going by its name it is expected to end the program
            // when the stream already exists -- confirm against NatsJsUtils.
            NatsJsUtils.createStreamExitWhenExists(management, exampleArgs.stream, exampleArgs.subject);

            JetStream jetStream = connection.jetStream();

            // NOTE(review): 500L is presumably the idle-heartbeat interval in milliseconds
            // that accompanies flow control -- confirm against ConsumerConfiguration.Builder.
            ConsumerConfiguration consumerConfig = ConsumerConfiguration.builder().flowControl(500L).build();
            PushSubscribeOptions.Builder optionsBuilder =
                (PushSubscribeOptions.Builder) PushSubscribeOptions.builder().configuration(consumerConfig);
            PushSubscribeOptions subscribeOptions = optionsBuilder.build();

            JetStreamSubscription subscription = jetStream.subscribe(exampleArgs.subject, subscribeOptions);
            connection.flush(Duration.ofSeconds(5L));

            // Publish the whole burst before reading anything; every message carries the
            // same zero-filled payload.
            byte[] payload = new byte[PAYLOAD_BYTES];
            for (int published = 0; published < PUBLISH_COUNT; published++) {
                Message outgoing = NatsMessage.builder().subject(exampleArgs.subject).data(payload).build();
                jetStream.publish(outgoing);
            }

            // Pause before reading -- presumably so the published messages back up on the
            // subscription and flow control is exercised.
            ExampleUtils.sleep(1000L);

            // Read and ack until a read returns null. The first read waits up to one second;
            // later reads pass 1000L (presumably milliseconds -- confirm).
            int received = 0;
            for (Message incoming = subscription.nextMessage(Duration.ofSeconds(1L));
                 incoming != null;
                 incoming = subscription.nextMessage(1000L)) {
                incoming.ack();
                received++;
            }

            System.out.println("\n" + received + " message(s) were received.");
            System.out.println(flowControlCount.get() + " flow control message(s) were processed.\n");

            // Clean up: drop the subscription, flush, then remove the example stream.
            subscription.unsubscribe();
            connection.flush(Duration.ofSeconds(5L));
            management.deleteStream(exampleArgs.stream);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds an {@link ErrorListener} that prints one line per flow control notification
     * and increments {@code counter} each time.
     *
     * @param counter running total of flow control notifications; incremented by the listener
     * @return a listener overriding only {@code flowControlProcessed}
     */
    private static ErrorListener flowControlReporter(final AtomicInteger counter) {
        return new ErrorListener() {
            @Override
            public void flowControlProcessed(Connection conn, JetStreamSubscription sub, String subject, ErrorListener.FlowControlSource source) {
                System.out.printf("Flow Control Processed (%d), Connection: %d, Subject: %s, Source: %s\n",
                    counter.incrementAndGet(), conn.getServerInfo().getClientId(), subject, source);
            }
        };
    }
}

