/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Message;
import io.nats.client.impl.NatsMessage;
import io.nats.client.support.NatsConstants;
import java.time.Duration;
import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

class MessageQueue {
    protected static final int STOPPED = 0;
    protected static final int RUNNING = 1;
    protected static final int DRAINING = 2;
    protected static final String POISON = "_poison";
    protected final AtomicLong length;
    protected final AtomicLong sizeInBytes;
    protected final AtomicInteger running;
    protected final boolean singleReaderMode;
    protected final LinkedBlockingQueue<NatsMessage> queue;
    protected final Lock editLock;
    protected final int maxMessagesInOutgoingQueue;
    protected final boolean discardWhenFull;
    protected final long offerLockMillis;
    protected final long offerTimeoutMillis;
    protected final Duration requestCleanupInterval;
    protected static final NatsMessage POISON_PILL = new NatsMessage("_poison", null, NatsConstants.EMPTY_BODY);

    MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval) {
        this(singleReaderMode, -1, false, requestCleanupInterval, null);
    }

    MessageQueue(boolean singleReaderMode, Duration requestCleanupInterval, MessageQueue source) {
        this(singleReaderMode, -1, false, requestCleanupInterval, source);
    }

    MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval) {
        this(singleReaderMode, maxMessagesInOutgoingQueue, discardWhenFull, requestCleanupInterval, null);
    }

    MessageQueue(boolean singleReaderMode, int maxMessagesInOutgoingQueue, boolean discardWhenFull, Duration requestCleanupInterval, MessageQueue source) {
        this.maxMessagesInOutgoingQueue = maxMessagesInOutgoingQueue;
        this.queue = maxMessagesInOutgoingQueue > 0 ? new LinkedBlockingQueue(maxMessagesInOutgoingQueue) : new LinkedBlockingQueue();
        this.discardWhenFull = discardWhenFull;
        this.running = new AtomicInteger(1);
        this.sizeInBytes = new AtomicLong(0L);
        this.length = new AtomicLong(0L);
        this.offerLockMillis = requestCleanupInterval.toMillis();
        this.offerTimeoutMillis = Math.max(1L, requestCleanupInterval.toMillis() * 95L / 100L);
        this.editLock = new ReentrantLock();
        this.singleReaderMode = singleReaderMode;
        this.requestCleanupInterval = requestCleanupInterval;
        if (source != null) {
            source.drainTo(this);
        }
    }

    void drainTo(MessageQueue target) {
        this.editLock.lock();
        try {
            this.queue.drainTo(target.queue);
            target.length.set(this.queue.size());
        }
        finally {
            this.editLock.unlock();
        }
    }

    boolean isSingleReaderMode() {
        return this.singleReaderMode;
    }

    boolean isRunning() {
        return this.running.get() != 0;
    }

    boolean isDraining() {
        return this.running.get() == 2;
    }

    void pause() {
        this.running.set(0);
        this.poisonTheQueue();
    }

    void resume() {
        this.running.set(1);
    }

    void drain() {
        this.running.set(2);
        this.poisonTheQueue();
    }

    boolean isDrained() {
        return this.running.get() == 2 && this.length() == 0L;
    }

    boolean push(NatsMessage msg) {
        return this.push(msg, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean push(NatsMessage msg, boolean internal) {
        long start = System.currentTimeMillis();
        try {
            if (!this.editLock.tryLock(this.offerLockMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Output queue is full " + this.queue.size());
            }
            if (!internal && this.discardWhenFull) {
                boolean bl = this.queue.offer(msg);
                return bl;
            }
            long timeoutLeft = Math.max(100L, this.offerTimeoutMillis - (System.currentTimeMillis() - start));
            if (!this.queue.offer(msg, timeoutLeft, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Output queue is full " + this.queue.size());
            }
            this.sizeInBytes.getAndAdd(msg.getSizeInBytes());
            this.length.incrementAndGet();
            boolean bl = true;
            return bl;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            boolean bl = false;
            return bl;
        }
        finally {
            this.editLock.unlock();
        }
    }

    void poisonTheQueue() {
        try {
            this.queue.add(POISON_PILL);
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    NatsMessage poll(Duration timeout) throws InterruptedException {
        NatsMessage msg = null;
        if (timeout == null || this.isDraining()) {
            msg = this.queue.poll();
        } else {
            long nanos = timeout.toNanos();
            if (nanos != 0L) {
                msg = this.queue.poll(nanos, TimeUnit.NANOSECONDS);
            } else {
                while (this.isRunning() && (msg = this.queue.poll(100L, TimeUnit.DAYS)) == null) {
                }
            }
        }
        return msg == null || this.isPoison(msg) ? null : msg;
    }

    private boolean isPoison(Message msg) {
        return msg == POISON_PILL;
    }

    NatsMessage pop(Duration timeout) throws InterruptedException {
        if (!this.isRunning()) {
            return null;
        }
        NatsMessage msg = this.poll(timeout);
        if (msg == null) {
            return null;
        }
        this.sizeInBytes.getAndAdd(-msg.getSizeInBytes());
        this.length.decrementAndGet();
        return msg;
    }

    NatsMessage accumulate(long maxBytesToAccumulate, long maxMessagesToAccumulate, Duration timeout) throws InterruptedException {
        NatsMessage next;
        if (!this.singleReaderMode) {
            throw new IllegalStateException("Accumulate is only supported in single reader mode.");
        }
        if (!this.isRunning()) {
            return null;
        }
        NatsMessage msg = this.poll(timeout);
        if (msg == null) {
            return null;
        }
        long size = msg.getSizeInBytes();
        if (maxMessagesToAccumulate <= 1L || size >= maxBytesToAccumulate) {
            this.sizeInBytes.addAndGet(-size);
            this.length.decrementAndGet();
            return msg;
        }
        long count = 1L;
        NatsMessage cursor = msg;
        while ((next = this.queue.peek()) != null && !this.isPoison(next)) {
            long s = next.getSizeInBytes();
            if (maxBytesToAccumulate >= 0L && size + s >= maxBytesToAccumulate) break;
            size += s;
            this.queue.poll();
            cursor.next = next;
            if (next.flushImmediatelyAfterPublish || ++count == maxMessagesToAccumulate) break;
            cursor = cursor.next;
        }
        this.sizeInBytes.addAndGet(-size);
        this.length.addAndGet(-count);
        return msg;
    }

    NatsMessage popNow() throws InterruptedException {
        return this.pop(null);
    }

    long length() {
        return this.length.get();
    }

    long sizeInBytes() {
        return this.sizeInBytes.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void filter(Predicate<NatsMessage> p) {
        this.editLock.lock();
        try {
            if (this.isRunning()) {
                throw new IllegalStateException("Filter is only supported when the queue is paused");
            }
            ArrayList<NatsMessage> newQueue = new ArrayList<NatsMessage>();
            NatsMessage cursor = this.queue.poll();
            while (cursor != null) {
                if (!p.test(cursor)) {
                    newQueue.add(cursor);
                } else {
                    this.sizeInBytes.addAndGet(-cursor.getSizeInBytes());
                    this.length.decrementAndGet();
                }
                cursor = this.queue.poll();
            }
            this.queue.addAll(newQueue);
        }
        finally {
            this.editLock.unlock();
        }
    }
}

