/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

public final class WindowedSubscriber<T>
extends BaseSubscriber<T> {
    private static final String WORK_ID_KEY = "workId";
    private static final String UPSTREAM_REQUESTED_KEY = "requested";
    private static final String DIFFERENCE_KEY = "difference";
    private final Map<String, Object> loggingContext;
    private final String terminatedMessage;
    private final Duration nextItemTimout;
    private final Consumer<T> releaser;
    private final Function<Flux<T>, Flux<T>> windowDecorator;
    private final boolean cleanCloseStreamingWindowOnTerminate;
    private final ClientLogger logger;
    private final AtomicInteger idGenerator = new AtomicInteger();
    private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque();
    private final ConcurrentLinkedQueue<WindowWork<T>> workQueue = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedDeque<WindowWork<T>> timedOutOrCanceledWorkQueue = new ConcurrentLinkedDeque();
    private volatile Subscription s;
    private static final AtomicReferenceFieldUpdater<WindowedSubscriber, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(WindowedSubscriber.class, Subscription.class, "s");
    private volatile long requested;
    private static final AtomicLongFieldUpdater<WindowedSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(WindowedSubscriber.class, "requested");
    private volatile int wip;
    private static final AtomicIntegerFieldUpdater<WindowedSubscriber> WIP = AtomicIntegerFieldUpdater.newUpdater(WindowedSubscriber.class, "wip");
    private volatile boolean done;
    private volatile Throwable error;
    private static final AtomicReferenceFieldUpdater<WindowedSubscriber, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(WindowedSubscriber.class, Throwable.class, "error");

    public WindowedSubscriber(Map<String, Object> loggingContext, String terminatedMessage, WindowedSubscriberOptions<T> options) {
        this.loggingContext = Objects.requireNonNull(loggingContext, "'loggingContext' cannot be null.");
        Objects.requireNonNull(terminatedMessage, "'terminatedMessage' cannot be null.");
        this.terminatedMessage = terminatedMessage + " (Reason: %s)";
        Objects.requireNonNull(options, "'options' cannot be null.");
        this.nextItemTimout = ((WindowedSubscriberOptions)options).getNextItemTimout();
        this.releaser = ((WindowedSubscriberOptions)options).getReleaser();
        this.windowDecorator = ((WindowedSubscriberOptions)options).getWindowDecorator();
        this.cleanCloseStreamingWindowOnTerminate = ((WindowedSubscriberOptions)options).shouldCleanCloseStreamingWindowOnTerminate();
        this.logger = new ClientLogger(WindowedSubscriber.class, loggingContext);
    }

    public IterableStream<T> enqueueRequest(int windowSize, Duration windowTimeout) {
        EnqueueResult<T> r = this.enqueueRequestImpl(windowSize, windowTimeout);
        return r.getWindowIterable();
    }

    EnqueueResult<T> enqueueRequestImpl(int windowSize, Duration windowTimeout) {
        if (windowSize < 1) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("'windowSize' must be strictly positive."));
        }
        if (Objects.isNull(windowTimeout)) {
            throw this.logger.logExceptionAsError((RuntimeException)new NullPointerException("'windowTimeout' cannot be null."));
        }
        if (windowTimeout.isNegative() || windowTimeout.isZero()) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("'windowTimeout' period must be strictly positive."));
        }
        long workId = this.idGenerator.getAndIncrement();
        WindowWork w = new WindowWork(this, workId, windowSize, windowTimeout);
        if (this.isDoneOrCanceled()) {
            w.terminate(WorkTerminalState.PARENT_TERMINAL);
            return new EnqueueResult(w, w.windowFlux(false));
        }
        this.workQueue.add(w);
        this.drain();
        return new EnqueueResult(w, w.windowFlux(true));
    }

    protected void hookOnSubscribe(Subscription s) {
        if (Operators.setOnce(S, (Object)((Object)this), (Subscription)s)) {
            this.drain();
        }
    }

    protected void hookOnNext(T item) {
        if (this.done) {
            Operators.onNextDropped(item, (Context)super.currentContext());
            return;
        }
        if (this.s == Operators.cancelledSubscription()) {
            Operators.onDiscard(item, (Context)super.currentContext());
            return;
        }
        if (this.queue.offer(item)) {
            this.drain();
        } else {
            IllegalStateException e = Exceptions.failWithOverflow((String)"Queue is full: Reactive Streams source doesn't respect backpressure");
            Operators.onOperatorError((Subscription)this, (Throwable)e, (Context)super.currentContext());
            Operators.onDiscard(item, (Context)super.currentContext());
            if (ERROR.compareAndSet(this, null, e)) {
                this.done = true;
                this.drain();
            } else {
                this.done = true;
                Operators.onErrorDropped((Throwable)e, (Context)super.currentContext());
            }
        }
    }

    protected void hookOnError(Throwable t) {
        if (this.done) {
            Operators.onErrorDropped((Throwable)t, (Context)super.currentContext());
            return;
        }
        RuntimeException e = new RuntimeException(String.format(this.terminatedMessage, "upstream-error"), t);
        if (ERROR.compareAndSet(this, null, e)) {
            this.done = true;
            this.drain();
        } else {
            this.done = true;
            Operators.onErrorDropped((Throwable)t, (Context)super.currentContext());
        }
    }

    protected void hookOnComplete() {
        if (this.done) {
            return;
        }
        this.done = true;
        this.drain();
    }

    protected void hookOnCancel() {
        if (Operators.terminate(S, (Object)((Object)this))) {
            Operators.onDiscardQueueWithClear(this.queue, (Context)this.currentContext(), null);
            this.drain();
        }
    }

    private void drain() {
        if (WIP.getAndIncrement(this) != 0) {
            return;
        }
        this.drainLoop();
    }

    private void drainLoop() {
        int missed = 1;
        while (true) {
            WindowWork<T> w0;
            if (this.isDoneOrCanceled()) {
                WindowWork<T> w1;
                if (this.cleanCloseStreamingWindowOnTerminate && (w0 = this.workQueue.peek()) != null && ((WindowWork)w0).isStreaming()) {
                    this.workQueue.poll();
                    ((WindowWork)w0).terminate(WorkTerminalState.PARENT_TERMINAL_CLEAN_CLOSE);
                }
                while ((w1 = this.workQueue.poll()) != null) {
                    ((WindowWork)w1).terminate(WorkTerminalState.PARENT_TERMINAL);
                }
            } else {
                T item;
                long r;
                boolean hasWork;
                while ((w0 = this.timedOutOrCanceledWorkQueue.poll()) != null) {
                    if (!this.workQueue.remove(w0)) continue;
                    if (w0.hasTimedOut()) {
                        ((WindowWork)w0).terminate(WorkTerminalState.TIMED_OUT);
                        continue;
                    }
                    if (w0.isCanceled()) {
                        ((WindowWork)w0).terminate(WorkTerminalState.CANCELED);
                        continue;
                    }
                    throw ((WindowWork)w0).getLogger().log((RuntimeException)new IllegalStateException("work with unexpected state in timeout-cancel queue."));
                }
                WindowWork<T> w = this.workQueue.peek();
                boolean bl = hasWork = w != null;
                if (hasWork && this.s != null) {
                    this.initWorkOnce(w);
                    r = this.requested;
                    if (r != 0L) {
                        long emitted;
                        EmitNextResult emitterLoopResult = EmitNextResult.OK;
                        for (emitted = 0L; emitted != r && (item = this.queue.poll()) != null && !this.isDoneOrCanceled(); ++emitted) {
                            emitterLoopResult = ((WindowWork)w).tryEmitNext(item);
                            if (emitterLoopResult == EmitNextResult.OK) continue;
                            this.queue.addFirst(item);
                            break;
                        }
                        if (emitted != 0L && r != Long.MAX_VALUE) {
                            REQUESTED.addAndGet(this, -emitted);
                        }
                        if (w.hasReceivedDemanded()) {
                            this.workQueue.poll();
                            ((WindowWork)w).terminate(WorkTerminalState.RECEIVED_DEMANDED);
                            continue;
                        }
                        if (emitterLoopResult == EmitNextResult.CONSUMER_ERROR) {
                            this.workQueue.poll();
                            ((WindowWork)w).terminate(WorkTerminalState.CONSUMER_ERROR);
                            continue;
                        }
                        if (emitterLoopResult == EmitNextResult.SINK_ERROR) {
                            this.workQueue.poll();
                            ((WindowWork)w).terminate(WorkTerminalState.SINK_ERROR);
                            continue;
                        }
                    }
                }
                if (!hasWork && this.releaser != null && (r = this.requested) != 0L) {
                    long released;
                    for (released = 0L; released != r; ++released) {
                        boolean workArrived;
                        boolean bl2 = workArrived = !this.workQueue.isEmpty();
                        if (workArrived || (item = this.queue.poll()) == null || this.isDoneOrCanceled()) break;
                        try {
                            this.releaser.accept(item);
                            continue;
                        }
                        catch (Throwable e) {
                            this.logger.atError().log("Unexpected: 'releaser' thrown error.", new Object[]{e});
                        }
                    }
                    if (released != 0L && r != Long.MAX_VALUE) {
                        REQUESTED.addAndGet(this, -released);
                    }
                }
            }
            if ((missed = WIP.addAndGet(this, -missed)) == 0) break;
        }
    }

    private boolean isDoneOrCanceled() {
        return this.done || this.s == Operators.cancelledSubscription();
    }

    private void initWorkOnce(WindowWork<T> w) {
        if (!((WindowWork)w).init()) {
            return;
        }
        long requested = REQUESTED.get(this);
        long workDemand = ((WindowWork)w).getDemand();
        long difference = workDemand - requested;
        LoggingEventBuilder logger = ((WindowWork)w).getLogger().addKeyValue(UPSTREAM_REQUESTED_KEY, requested).addKeyValue(DIFFERENCE_KEY, difference);
        if (difference > 0L) {
            Operators.addCap(REQUESTED, (Object)((Object)this), (long)difference);
            logger.log("Initialized: request-upstream:true.");
            this.s.request(difference);
        } else {
            logger.log("Initialized: request-upstream:false.");
        }
    }

    private void postTimedOutOrCanceledWork(WindowWork<T> w) {
        this.timedOutOrCanceledWorkQueue.add(w);
        this.drain();
    }

    private Throwable getTerminalError() {
        assert (this.isDoneOrCanceled());
        if (this.done) {
            Throwable e = this.error;
            return e != null ? e : new RuntimeException(String.format(this.terminatedMessage, "upstream-completion"));
        }
        return new RuntimeException(String.format(this.terminatedMessage, "downstream-cancel"));
    }

    static final class WindowWork<T> {
        private static final String DEMAND_KEY = "demand";
        private static final String PENDING_KEY = "pending";
        public static final String SIGNAL_TYPE_KEY = "signalType";
        public static final String EMIT_RESULT_KEY = "emitResult";
        private static final String TERMINATING_WORK = "Terminating the work.";
        private final AtomicBoolean isInitialized = new AtomicBoolean(false);
        private final AtomicBoolean isCanceled = new AtomicBoolean(false);
        private final AtomicBoolean isTerminated = new AtomicBoolean(false);
        private final AtomicReference<TimeoutReason> timeoutReason = new AtomicReference<Object>(null);
        private final AtomicReference<Throwable> consumerError = new AtomicReference<Object>(null);
        private final ClientLogger logger;
        private final WindowedSubscriber<T> parent;
        private final int demand;
        private final Duration timeout;
        private final Sinks.Many<T> sink;
        private final AtomicInteger pending;
        private final Disposable.Composite timers;

        private WindowWork(WindowedSubscriber<T> parent, long id, int demand, Duration timeout) {
            this.logger = WindowWork.createLogger(((WindowedSubscriber)parent).loggingContext, id, demand);
            this.parent = parent;
            this.demand = demand;
            this.pending = new AtomicInteger(demand);
            this.timeout = timeout;
            this.sink = WindowWork.createSink();
            this.timers = Disposables.composite();
        }

        boolean isCanceled() {
            return this.isCanceled.get();
        }

        boolean hasReceivedDemanded() {
            return this.pending.get() <= 0;
        }

        boolean hasTimedOut() {
            return this.timeoutReason.get() != null;
        }

        int getPending() {
            return this.pending.get();
        }

        private long getDemand() {
            return this.demand;
        }

        private boolean hasConsumerError() {
            return this.consumerError.get() != null;
        }

        private boolean isStreaming() {
            int pending = this.getPending();
            return pending > 0 && pending < this.demand;
        }

        private boolean init() {
            if (this.isInitialized.getAndSet(true)) {
                return false;
            }
            this.timers.add(this.beginTimeoutTimer());
            this.timers.add(this.beginNextItemTimeoutTimer());
            return true;
        }

        private Flux<T> windowFlux(boolean drainOnCancel) {
            Flux flux;
            Function decorator = ((WindowedSubscriber)this.parent).windowDecorator;
            Flux flux2 = flux = decorator != null ? (Flux)decorator.apply(this.sink.asFlux()) : this.sink.asFlux();
            if (drainOnCancel) {
                return flux.doFinally(s -> {
                    if (s == SignalType.CANCEL) {
                        this.isCanceled.set(true);
                        WindowWork w = this;
                        Schedulers.boundedElastic().schedule(() -> ((WindowedSubscriber)this.parent).postTimedOutOrCanceledWork(w));
                    }
                });
            }
            return flux;
        }

        private EmitNextResult tryEmitNext(T item) {
            Sinks.EmitResult emitResult;
            int c = this.pending.getAndDecrement();
            if (c <= 0) {
                if (c < 0) {
                    this.withPendingKey(this.logger.atWarning()).log("Unexpected emit-next attempt when no more demand.");
                }
                return EmitNextResult.RECEIVED_DEMANDED;
            }
            try {
                emitResult = this.sink.tryEmitNext(item);
            }
            catch (Throwable e) {
                this.consumerError.set(e);
                this.withPendingKey(this.logger.atError()).log("Unexpected consumer error occurred while emitting.", new Object[]{e});
                return EmitNextResult.CONSUMER_ERROR;
            }
            if (emitResult == Sinks.EmitResult.OK) {
                return EmitNextResult.OK;
            }
            this.withPendingKey(this.logger.atError()).addKeyValue(EMIT_RESULT_KEY, (Object)emitResult).log("Could not emit-next.");
            return EmitNextResult.SINK_ERROR;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void terminate(WorkTerminalState terminalState) {
            block20: {
                if (this.isTerminated.getAndSet(true)) {
                    return;
                }
                try {
                    this.timers.dispose();
                    if (terminalState != WorkTerminalState.SINK_ERROR) break block20;
                }
                catch (Throwable throwable) {
                    if (terminalState == WorkTerminalState.SINK_ERROR) {
                        this.withPendingKey(this.logger.atWarning()).addKeyValue("reason", "sink-error").log(TERMINATING_WORK);
                        return;
                    }
                    if (terminalState == WorkTerminalState.CANCELED) {
                        this.assertCondition(this.isCanceled(), terminalState);
                        this.withPendingKey(this.logger.atWarning()).addKeyValue("reason", "sink-canceled").log(TERMINATING_WORK);
                        return;
                    }
                    if (terminalState == WorkTerminalState.RECEIVED_DEMANDED) {
                        this.assertCondition(this.hasReceivedDemanded(), terminalState);
                        this.withPendingKey(this.logger.atVerbose()).log(TERMINATING_WORK);
                        this.closeWindow();
                        return;
                    }
                    if (terminalState == WorkTerminalState.CONSUMER_ERROR) {
                        this.assertCondition(this.hasConsumerError(), terminalState);
                        Throwable e = this.consumerError.get();
                        this.withPendingKey(this.logger.atWarning()).log(e.getMessage(), new Object[]{e});
                        this.closeWindow(e);
                        return;
                    }
                    if (terminalState == WorkTerminalState.TIMED_OUT) {
                        this.assertCondition(this.hasTimedOut(), terminalState);
                        TimeoutReason reason = this.timeoutReason.get();
                        Throwable e = reason.getError();
                        if (e != null) {
                            this.withPendingKey(this.logger.atWarning()).addKeyValue("reason", reason.getMessage()).log(TERMINATING_WORK, new Object[]{e});
                            this.closeWindow(e);
                        } else {
                            this.withPendingKey(this.logger.atVerbose()).addKeyValue("reason", reason.getMessage()).log(TERMINATING_WORK);
                            this.closeWindow();
                        }
                        return;
                    }
                    if (terminalState == WorkTerminalState.PARENT_TERMINAL) {
                        this.assertCondition(((WindowedSubscriber)this.parent).isDoneOrCanceled(), terminalState);
                        Throwable e = ((WindowedSubscriber)this.parent).getTerminalError();
                        this.withPendingKey(this.logger.atWarning()).log(e.getMessage(), new Object[]{e});
                        this.closeWindow(e);
                        return;
                    }
                    if (terminalState == WorkTerminalState.PARENT_TERMINAL_CLEAN_CLOSE) {
                        this.assertCondition(((WindowedSubscriber)this.parent).isDoneOrCanceled() && this.isStreaming(), terminalState);
                        this.withPendingKey(this.logger.atWarning()).addKeyValue("reason", "terminal-clean-close").log(TERMINATING_WORK);
                        this.closeWindow();
                        return;
                    }
                    throw throwable;
                }
                this.withPendingKey(this.logger.atWarning()).addKeyValue("reason", "sink-error").log(TERMINATING_WORK);
                return;
            }
            if (terminalState == WorkTerminalState.CANCELED) {
                this.assertCondition(this.isCanceled(), terminalState);
                this.withPendingKey(this.logger.atWarning()).addKeyValue("reason", "sink-canceled").log(TERMINATING_WORK);
                return;
            }
            if (terminalState == WorkTerminalState.RECEIVED_DEMANDED) {
                this.assertCondition(this.hasReceivedDemanded(), terminalState);
                this.withPendingKey(this.logger.atVerbose()).log(TERMINATING_WORK);
                this.closeWindow();
                return;
            }
            if (terminalState == WorkTerminalState.CONSUMER_ERROR) {
                this.assertCondition(this.hasConsumerError(), terminalState);
                Throwable e = this.consumerError.get();
                this.withPendingKey(this.logger.atWarning()).log(e.getMessage(), new Object[]{e});
                this.closeWindow(e);
                return;
            }
            if (terminalState == WorkTerminalState.TIMED_OUT) {
                this.assertCondition(this.hasTimedOut(), terminalState);
                TimeoutReason reason = this.timeoutReason.get();
                Throwable e = reason.getError();
                if (e != null) {
                    this.withPendingKey(this.logger.atWarning()).addKeyValue("reason", reason.getMessage()).log(TERMINATING_WORK, new Object[]{e});
                    this.closeWindow(e);
                } else {
                    this.withPendingKey(this.logger.atVerbose()).addKeyValue("reason", reason.getMessage()).log(TERMINATING_WORK);
                    this.closeWindow();
                }
                return;
            }
            if (terminalState == WorkTerminalState.PARENT_TERMINAL) {
                this.assertCondition(((WindowedSubscriber)this.parent).isDoneOrCanceled(), terminalState);
                Throwable e = ((WindowedSubscriber)this.parent).getTerminalError();
                this.withPendingKey(this.logger.atWarning()).log(e.getMessage(), new Object[]{e});
                this.closeWindow(e);
                return;
            }
            if (terminalState == WorkTerminalState.PARENT_TERMINAL_CLEAN_CLOSE) {
                this.assertCondition(((WindowedSubscriber)this.parent).isDoneOrCanceled() && this.isStreaming(), terminalState);
                this.withPendingKey(this.logger.atWarning()).addKeyValue("reason", "terminal-clean-close").log(TERMINATING_WORK);
                this.closeWindow();
                return;
            }
            throw this.logger.atError().log((RuntimeException)new IllegalStateException("Unknown work terminal state." + (Object)((Object)terminalState)));
        }

        private Disposable beginTimeoutTimer() {
            Disposable disposable = Mono.delay((Duration)this.timeout).publishOn(Schedulers.boundedElastic()).subscribe(__ -> this.onTimeout(TimeoutReason.TIMEOUT), e -> this.onTimeout(TimeoutReason.timeoutErrored(e)));
            return disposable;
        }

        private Disposable beginNextItemTimeoutTimer() {
            Duration nextItemTimout = ((WindowedSubscriber)this.parent).nextItemTimout;
            if (nextItemTimout == null) {
                return Disposables.disposed();
            }
            Flux nextItemTimer = this.sink.asFlux().map(__ -> Mono.delay((Duration)nextItemTimout));
            Disposable disposable = Flux.switchOnNext((Publisher)nextItemTimer).publishOn(Schedulers.boundedElastic()).subscribe(__ -> this.onTimeout(TimeoutReason.TIMEOUT_NEXT_ITEM), e -> this.onTimeout(TimeoutReason.timeoutNextItemErrored(e)));
            return disposable;
        }

        private void onTimeout(TimeoutReason reason) {
            if (this.timeoutReason.compareAndSet(null, reason)) {
                WindowWork w = this;
                ((WindowedSubscriber)this.parent).postTimedOutOrCanceledWork(w);
            }
        }

        private void assertCondition(boolean condition, WorkTerminalState terminalState) {
            if (condition) {
                return;
            }
            String message = String.format("Illegal invocation of terminate(%s).", new Object[]{terminalState});
            throw this.withPendingKey(this.logger.atError()).log((RuntimeException)new IllegalStateException(message));
        }

        private void closeWindow() {
            this.sink.emitComplete((signalType, emitResult) -> {
                this.logger.atError().addKeyValue(SIGNAL_TYPE_KEY, (Object)signalType).addKeyValue(EMIT_RESULT_KEY, (Object)emitResult).log("Could not close window.");
                return false;
            });
        }

        private void closeWindow(Throwable e) {
            this.sink.emitError(e, (signalType, emitResult) -> {
                this.logger.atError().addKeyValue(SIGNAL_TYPE_KEY, (Object)signalType).addKeyValue(EMIT_RESULT_KEY, (Object)emitResult).log("Could not closed window with error.");
                return false;
            });
        }

        private LoggingEventBuilder withPendingKey(LoggingEventBuilder logger) {
            return logger.addKeyValue(PENDING_KEY, (long)this.pending.get());
        }

        private LoggingEventBuilder getLogger() {
            return this.withPendingKey(this.logger.atVerbose());
        }

        private static ClientLogger createLogger(Map<String, Object> parentLogContext, long id, int demand) {
            HashMap<String, Object> loggingContext = new HashMap<String, Object>(parentLogContext.size() + 5);
            loggingContext.putAll(parentLogContext);
            loggingContext.put(WindowedSubscriber.WORK_ID_KEY, id);
            loggingContext.put(DEMAND_KEY, demand);
            return new ClientLogger(WindowWork.class, loggingContext);
        }

        private static <T> Sinks.Many<T> createSink() {
            return Sinks.many().replay().all();
        }

        private static final class TimeoutReason {
            static final TimeoutReason TIMEOUT = new TimeoutReason("Timeout occurred.", null);
            static final TimeoutReason TIMEOUT_NEXT_ITEM = new TimeoutReason("Timeout between the messages occurred.", null);
            private final String message;
            private final Throwable error;

            static TimeoutReason timeoutErrored(Throwable error) {
                return new TimeoutReason("Error while scheduling or waiting for timeout.", error);
            }

            static TimeoutReason timeoutNextItemErrored(Throwable error) {
                return new TimeoutReason("Error while scheduling or waiting for timeout between the messages.", error);
            }

            private TimeoutReason(String message, Throwable error) {
                this.message = message;
                this.error = error;
            }

            String getMessage() {
                return this.message;
            }

            Throwable getError() {
                return this.error;
            }
        }
    }

    public static final class WindowedSubscriberOptions<T> {
        private Consumer<T> releaser = null;
        private Duration nextItemTimout = null;
        private Function<Flux<T>, Flux<T>> windowDecorator = null;
        private boolean cleanCloseStreamingWindowOnTerminate = false;

        private Consumer<T> getReleaser() {
            return this.releaser;
        }

        private Duration getNextItemTimout() {
            return this.nextItemTimout;
        }

        private Function<Flux<T>, Flux<T>> getWindowDecorator() {
            return this.windowDecorator;
        }

        private boolean shouldCleanCloseStreamingWindowOnTerminate() {
            return this.cleanCloseStreamingWindowOnTerminate;
        }

        public WindowedSubscriberOptions<T> setReleaser(Consumer<T> releaser) {
            this.releaser = Objects.requireNonNull(releaser, "'releaser' cannot be null.");
            return this;
        }

        public WindowedSubscriberOptions<T> setNextItemTimeout(Duration nextItemTimout) {
            this.nextItemTimout = Objects.requireNonNull(nextItemTimout, "'nextItemTimout' cannot be null.");
            return this;
        }

        public WindowedSubscriberOptions<T> setWindowDecorator(Function<Flux<T>, Flux<T>> windowDecorator) {
            this.windowDecorator = Objects.requireNonNull(windowDecorator, "'windowDecorator' cannot be null.");
            return this;
        }

        public WindowedSubscriberOptions<T> cleanCloseStreamingWindowOnTerminate() {
            this.cleanCloseStreamingWindowOnTerminate = true;
            return this;
        }
    }

    static final class EnqueueResult<T> {
        private final WindowWork<T> work;
        private final Flux<T> windowFlux;
        private final IterableStream<T> windowIterable;

        private EnqueueResult(WindowWork<T> work, Flux<T> windowFlux) {
            this.work = work;
            this.windowFlux = windowFlux;
            this.windowIterable = new IterableStream(windowFlux);
        }

        WindowWork<T> getInnerWork() {
            return this.work;
        }

        Flux<T> getWindowFlux() {
            return this.windowFlux;
        }

        IterableStream<T> getWindowIterable() {
            return this.windowIterable;
        }
    }

    private static enum WorkTerminalState {
        PARENT_TERMINAL,
        PARENT_TERMINAL_CLEAN_CLOSE,
        TIMED_OUT,
        CONSUMER_ERROR,
        RECEIVED_DEMANDED,
        SINK_ERROR,
        CANCELED;

    }

    private static enum EmitNextResult {
        OK,
        CONSUMER_ERROR,
        RECEIVED_DEMANDED,
        SINK_ERROR;

    }
}

