/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Options;
import io.nats.client.StatisticsCollector;
import io.nats.client.impl.DataPort;
import io.nats.client.impl.MarkerMessage;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsMessage;
import io.nats.client.impl.WriterMessageQueue;
import io.nats.client.support.BuilderBase;
import io.nats.client.support.ByteArrayBuilder;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Runnable that drains the connection's outgoing message queues and writes
 * the serialized messages to a {@link DataPort}.
 *
 * <p>The writer owns two queues: {@code normalOutgoing} and
 * {@code reconnectOutgoing}. Which one {@link #run()} reads from is decided by
 * the current {@link Mode}: the normal queue in {@link Mode#Normal}, the
 * reconnect queue in every other mode.
 *
 * <p>Writes and flushes on the data port are serialized by {@code writerLock};
 * {@link #start(Future)} and {@link #stop()} coordinate through
 * {@code startStopLock}.
 */
class NatsConnectionWriter implements Runnable {
    /** Granularity passed to {@code BuilderBase.bufferAllocSize} when sizing the send buffer. */
    private static final int BUFFER_BLOCK_SIZE = 256;

    /** Timeout handed to {@code accumulate} when reading the normal queue. */
    private static final Duration OUTGOING_TIMEOUT = Duration.ofMinutes(2L);

    /** Timeout handed to {@code accumulate} when reading the reconnect queue. */
    private static final Duration RECONNECT_TIMEOUT = Duration.ofMillis(1L);

    private final NatsConnection connection;
    private final ReentrantLock writerLock;
    private Future<Boolean> stopped;
    private Future<DataPort> dataPortFuture;
    private DataPort dataPort;
    private final AtomicBoolean running;
    private final AtomicReference<Mode> mode;
    private final ReentrantLock startStopLock;
    private byte[] sendBuffer;
    private final AtomicInteger sendBufferLength;
    private final WriterMessageQueue normalOutgoing;
    private final WriterMessageQueue reconnectOutgoing;
    private final long reconnectBufferSize;

    /**
     * Creates a writer that is not yet running.
     *
     * @param connection   the owning connection; its options supply the buffer
     *                     size, reconnect buffer size and queue settings
     * @param sourceWriter a previous writer whose queues should be reused, or
     *                     {@code null} to create fresh queues. When non-null,
     *                     the writer starts in {@link Mode#Reconnect} and both
     *                     inherited queues are resumed.
     */
    NatsConnectionWriter(NatsConnection connection, NatsConnectionWriter sourceWriter) {
        this.connection = connection;
        this.writerLock = new ReentrantLock();
        this.running = new AtomicBoolean(false);
        this.mode = new AtomicReference<>(sourceWriter == null ? Mode.Normal : Mode.Reconnect);
        this.startStopLock = new ReentrantLock();

        // A writer that was never started reports itself as already stopped.
        this.stopped = CompletableFuture.completedFuture(Boolean.TRUE);

        Options options = connection.getOptions();
        int sbl = BuilderBase.bufferAllocSize(options.getBufferSize(), BUFFER_BLOCK_SIZE);
        this.sendBufferLength = new AtomicInteger(sbl);
        this.sendBuffer = new byte[sbl];
        this.reconnectBufferSize = options.getReconnectBufferSize();

        if (sourceWriter == null) {
            this.normalOutgoing = new WriterMessageQueue(
                    options.getMaxMessagesInOutgoingQueue(),
                    options.isDiscardMessagesWhenOutgoingQueueFull(),
                    options.getWriteQueuePushTimeout());
            this.reconnectOutgoing = new WriterMessageQueue(options.getWriteQueuePushTimeout());
        } else {
            // Share the previous writer's queues so already-queued messages carry over.
            this.normalOutgoing = sourceWriter.normalOutgoing;
            this.normalOutgoing.resume();
            this.reconnectOutgoing = sourceWriter.reconnectOutgoing;
            this.reconnectOutgoing.resume();
        }
    }

    /**
     * Marks the writer as running, resumes both queues and submits this
     * runnable to the connection's executor.
     *
     * @param dataPortFuture future that yields the data port to write to;
     *                       {@link #run()} blocks on it before writing
     */
    void start(Future<DataPort> dataPortFuture) {
        this.startStopLock.lock();
        try {
            this.dataPortFuture = dataPortFuture;
            this.running.set(true);
            this.normalOutgoing.resume();
            this.reconnectOutgoing.resume();
            this.stopped = this.connection.getExecutor().submit(this, Boolean.TRUE);
        }
        finally {
            this.startStopLock.unlock();
        }
    }

    /**
     * Requests the writer to stop. If it is running, the running flag is
     * cleared, both queues are paused and the normal queue is filtered.
     *
     * @return the future tracking completion of the writer task (an already
     *         completed future if the writer was never started)
     */
    Future<Boolean> stop() {
        if (this.running.get()) {
            this.running.set(false);
            this.startStopLock.lock();
            try {
                this.normalOutgoing.pause();
                this.reconnectOutgoing.pause();
                this.normalOutgoing.filter();
            }
            finally {
                this.startStopLock.unlock();
            }
        }
        return this.stopped;
    }

    /**
     * @return {@code true} while the running flag is set
     */
    boolean isRunning() {
        return this.running.get();
    }

    /**
     * Serializes a linked chain of messages (followed through {@code msg.next})
     * into the send buffer and writes it to the data port, holding
     * {@code writerLock} for the whole batch.
     *
     * <p>When the chain reaches {@code MarkerMessage.END_RECONNECT} the mode is
     * switched to {@link Mode#Normal} and processing of the chain stops; bytes
     * buffered so far are still written.
     *
     * <p>If a message does not fit in the remaining buffer space, the buffered
     * bytes are written out first; if it does not fit in an empty buffer, the
     * buffer is reallocated to hold it.
     *
     * <p>A message with {@code flushImmediatelyAfterPublish} set has the
     * buffered bytes (including its own) written to the data port before the
     * port is flushed, so the flush actually covers that message.
     *
     * @param msg      head of the message chain; may be {@code null}
     * @param dataPort destination for the serialized bytes
     * @param stats    collector notified of outgoing sizes and writes
     * @throws IOException if writing to or flushing the data port fails
     */
    void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector stats) throws IOException {
        this.writerLock.lock();
        try {
            int sendPosition = 0;
            int sbl = this.sendBufferLength.get();
            while (msg != null) {
                if (msg == MarkerMessage.END_RECONNECT) {
                    this.mode.set(Mode.Normal);
                    break;
                }
                long size = msg.getSizeInBytes();
                if ((long)sendPosition + size > (long)sbl) {
                    // Not enough room left: hand over what is buffered so far.
                    sendPosition = this.writePending(dataPort, stats, sendPosition);
                    if (size > (long)sbl) {
                        // A single message larger than the whole buffer: grow the buffer.
                        sbl = BuilderBase.bufferAllocSize((int)size, BUFFER_BLOCK_SIZE);
                        this.sendBufferLength.set(sbl);
                        this.sendBuffer = new byte[sbl];
                    }
                }

                // Protocol line followed by CRLF.
                ByteArrayBuilder bab = msg.getProtocolBab();
                int babLen = bab.length();
                System.arraycopy(bab.internalArray(), 0, this.sendBuffer, sendPosition, babLen);
                sendPosition += babLen;
                this.sendBuffer[sendPosition++] = 13;
                this.sendBuffer[sendPosition++] = 10;

                if (!msg.isProtocol()) {
                    // Non-protocol messages also carry headers and a payload, terminated by CRLF.
                    sendPosition += msg.copyNotEmptyHeaders(sendPosition, this.sendBuffer);
                    byte[] bytes = msg.getData();
                    if (bytes.length > 0) {
                        System.arraycopy(bytes, 0, this.sendBuffer, sendPosition, bytes.length);
                        sendPosition += bytes.length;
                    }
                    this.sendBuffer[sendPosition++] = 13;
                    this.sendBuffer[sendPosition++] = 10;
                }
                stats.incrementOut(size);

                if (msg.flushImmediatelyAfterPublish) {
                    // The message so far lives only in sendBuffer; it must reach the
                    // data port before flush() can have any effect on it.
                    sendPosition = this.writePending(dataPort, stats, sendPosition);
                    dataPort.flush();
                }
                msg = msg.next;
            }
            this.writePending(dataPort, stats, sendPosition);
        }
        finally {
            this.writerLock.unlock();
        }
    }

    /**
     * Writes the first {@code length} bytes of the send buffer to the data
     * port and registers the write, doing nothing when {@code length} is zero.
     *
     * @return the new send position, which is always {@code 0}
     */
    private int writePending(DataPort dataPort, StatisticsCollector stats, int length) throws IOException {
        if (length > 0) {
            dataPort.write(this.sendBuffer, length);
            stats.registerWrite(length);
        }
        return 0;
    }

    /**
     * Writer loop: waits for the data port, then repeatedly accumulates a
     * batch from the queue selected by the current mode and sends it, until
     * the running flag is cleared or the thread is interrupted.
     *
     * <p>An {@link IOException} or {@link BufferOverflowException} is reported
     * to the connection only if the writer is still marked as running. The
     * running flag is always cleared on exit.
     */
    @Override
    public void run() {
        try {
            this.dataPort = this.dataPortFuture.get();
            StatisticsCollector stats = this.connection.getStatisticsCollector();
            while (this.running.get() && !Thread.interrupted()) {
                NatsMessage msg;
                if (this.mode.get() == Mode.Normal) {
                    msg = this.normalOutgoing.accumulate(this.sendBufferLength.get(), 1000L, OUTGOING_TIMEOUT);
                } else {
                    msg = this.reconnectOutgoing.accumulate(this.sendBufferLength.get(), 1000L, RECONNECT_TIMEOUT);
                }
                if (msg != null) {
                    this.sendMessageBatch(msg, this.dataPort, stats);
                }
            }
        }
        catch (IOException | BufferOverflowException io) {
            // Failures after stop() has cleared the flag are not reported.
            if (this.running.get()) {
                this.connection.handleCommunicationIssue(io);
            }
        }
        catch (CancellationException | ExecutionException ignored) {
            // The data port future was cancelled or failed: there is nothing to
            // write to, so the writer simply exits.
        }
        catch (InterruptedException ex) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
        finally {
            this.running.set(false);
        }
    }

    /**
     * Clears the reconnect queue and switches to {@link Mode#Reconnect}.
     */
    void enterReconnectMode() {
        this.reconnectOutgoing.clear();
        this.mode.set(Mode.Reconnect);
    }

    /**
     * Queues the {@code END_RECONNECT} marker on the reconnect queue and
     * switches to {@link Mode#WaitingForEndReconnect}. The mode returns to
     * {@link Mode#Normal} when {@link #sendMessageBatch} encounters the marker.
     */
    void enterWaitingForEndReconnectMode() {
        this.reconnectOutgoing.queueMarkerMessage(MarkerMessage.END_RECONNECT);
        this.mode.set(Mode.WaitingForEndReconnect);
    }

    /**
     * Checks whether a message may be queued while reconnecting.
     *
     * @param msg the candidate message
     * @return {@code true} if the reconnect buffer size is negative, or if the
     *         normal queue's byte size plus this message's size stays strictly
     *         below the reconnect buffer size
     */
    boolean canQueueDuringReconnect(NatsMessage msg) {
        if (this.reconnectBufferSize < 0L) {
            return true;
        }
        return this.normalOutgoing.sizeInBytes() + msg.getSizeInBytes() < this.reconnectBufferSize;
    }

    /**
     * Pushes a message onto the normal queue.
     *
     * @param msg the message to queue
     * @return the result reported by the queue's {@code push}
     */
    boolean queue(NatsMessage msg) {
        return this.normalOutgoing.push(msg);
    }

    /**
     * Queues an internal message: onto the reconnect queue while in
     * {@link Mode#Reconnect}, otherwise onto the normal queue using the
     * two-argument {@code push(msg, true)}.
     *
     * @param msg the message to queue
     */
    void queueInternalMessage(NatsMessage msg) {
        if (this.mode.get() == Mode.Reconnect) {
            this.reconnectOutgoing.push(msg);
        } else {
            this.normalOutgoing.push(msg, true);
        }
    }

    /**
     * Best-effort flush of the data port under {@code writerLock}. Does
     * nothing when the writer is not running or the data port has not been
     * obtained yet; any failure while flushing is deliberately ignored.
     */
    void flushBuffer() {
        this.writerLock.lock();
        try {
            // dataPort is only assigned once run() has resolved the future.
            DataPort port = this.dataPort;
            if (port != null && this.running.get()) {
                port.flush();
            }
        }
        catch (Exception ignored) {
            // Best effort: a failed flush here is not reported by this method.
        }
        finally {
            this.writerLock.unlock();
        }
    }

    /**
     * @return the normal queue's {@code length()}, or {@code -1} if the queue
     *         reference is {@code null}
     */
    long outgoingPendingMessageCount() {
        this.writerLock.lock();
        try {
            return this.normalOutgoing == null ? -1L : this.normalOutgoing.length();
        }
        finally {
            this.writerLock.unlock();
        }
    }

    /**
     * @return the normal queue's {@code sizeInBytes()}, or {@code -1} if the
     *         queue reference is {@code null}
     */
    long outgoingPendingBytes() {
        this.writerLock.lock();
        try {
            return this.normalOutgoing == null ? -1L : this.normalOutgoing.sizeInBytes();
        }
        finally {
            this.writerLock.unlock();
        }
    }

    /** Selects which queue the writer loop reads from and where internal messages go. */
    enum Mode {
        /** The writer loop reads from the normal queue. */
        Normal,
        /** The writer loop reads from the reconnect queue; internal messages go there too. */
        Reconnect,
        /** The writer loop reads from the reconnect queue until the END_RECONNECT marker is reached. */
        WaitingForEndReconnect
    }
}

