/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.stream;

import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscription;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStreamOperations;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.stream.ReadOffsetStrategy;
import org.springframework.data.redis.stream.StreamReceiver;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

class DefaultStreamReceiver<K, V extends Record<K, ?>>
implements StreamReceiver<K, V> {
    /** Logger obtained for the runtime class of this receiver. */
    private final Log logger = LogFactory.getLog(this.getClass());
    /** Reactive template built in the constructor from the connection factory and the configured serializers. */
    private final ReactiveRedisTemplate<K, ?> template;
    /** Stream operations obtained from {@code template}; created with the hash mapper when the options provide one. */
    private final ReactiveStreamOperations<K, Object, Object> streamOperations;
    /** Read options (count and block settings) derived once from the receiver options in the constructor. */
    private final StreamReadOptions readOptions;
    /** The options this receiver was constructed with. */
    private final StreamReceiver.StreamReceiverOptions<K, V> receiverOptions;

    /**
     * Creates a receiver that reads through a {@link ReactiveRedisTemplate} built from the given
     * connection factory and the serializers configured in {@code options}.
     *
     * <p>The read options are derived once: the batch size (when present) becomes the
     * {@code count} setting and a non-zero poll timeout becomes the {@code block} setting.
     *
     * @param connectionFactory factory handed to the {@link ReactiveRedisTemplate}.
     * @param options receiver options supplying serializers, batch size, poll timeout and the
     *     optional hash mapper.
     */
    DefaultStreamReceiver(ReactiveRedisConnectionFactory connectionFactory, StreamReceiver.StreamReceiverOptions<K, V> options) {
        this.receiverOptions = options;

        // The explicit type witness keeps the builder parameterized on K so that
        // key(SerializationPair<K>) type-checks without falling back to raw types.
        RedisSerializationContext<K, Object> serializationContext = RedisSerializationContext
                .<K, Object>newSerializationContext(options.getKeySerializer())
                .key(options.getKeySerializer())
                .hashKey(options.getHashKeySerializer())
                .hashValue(options.getHashValueSerializer())
                .build();

        StreamReadOptions readOptions = StreamReadOptions.empty();
        if (options.getBatchSize().isPresent()) {
            readOptions = readOptions.count(options.getBatchSize().getAsInt());
        }
        if (!options.getPollTimeout().isZero()) {
            readOptions = readOptions.block(options.getPollTimeout());
        }
        this.readOptions = readOptions;

        this.template = new ReactiveRedisTemplate<>(connectionFactory, serializationContext);
        if (options.getHashMapper() != null) {
            this.streamOperations = this.template.opsForStream(options.getHashMapper());
        } else {
            this.streamOperations = this.template.opsForStream();
        }
    }

    /**
     * Returns a {@link Flux} that reads records from the given stream offset without a consumer
     * group.
     *
     * <p>When the receiver options provide a hash mapper, records are read using the configured
     * target type; otherwise the plain read is used. The {@link PollState} and the
     * {@link StreamSubscription} are created inside {@link Flux#defer} / {@link Flux#create}, i.e.
     * when those callbacks run rather than when this method is called.
     *
     * @param streamOffset the stream key and the offset to start reading from.
     * @return a flux emitting the received records.
     */
    @Override
    @SuppressWarnings("unchecked")
    public Flux<V> receive(StreamOffset<K> streamOffset) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(String.format("receive(%s)", streamOffset));
        }

        // The read operations return Flux<? extends Record>; the receiver is parameterized with
        // the matching record type V, hence the unchecked narrowing.
        BiFunction<K, ReadOffset, Flux<V>> readFunction;
        if (this.receiverOptions.getHashMapper() != null) {
            readFunction = (key, readOffset) -> (Flux<V>) (Flux<?>) this.streamOperations.read(this.receiverOptions.getTargetType(), this.readOptions, StreamOffset.create(key, readOffset));
        } else {
            readFunction = (key, readOffset) -> (Flux<V>) (Flux<?>) this.streamOperations.read(this.readOptions, StreamOffset.create(key, readOffset));
        }

        return Flux.defer(() -> {
            PollState pollState = PollState.standalone(streamOffset.getOffset());
            return Flux.create((FluxSink<V> sink) -> new StreamSubscription(sink, streamOffset.getKey(), pollState, readFunction).arm());
        });
    }

    /**
     * Returns a {@link Flux} that reads records on behalf of the given consumer using read
     * options with {@code autoAcknowledge()} applied.
     *
     * @param consumer the consumer to read as.
     * @param streamOffset the stream key and the offset to start reading from.
     * @return a flux emitting the received records.
     */
    @Override
    @SuppressWarnings("unchecked")
    public Flux<V> receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(String.format("receiveAutoAck(%s, %s)", consumer, streamOffset));
        }

        // getConsumeReadFunction yields Flux<? extends Record<?, ?>>; narrow it to the record
        // type V this receiver is parameterized with (unchecked, via a wildcard cast).
        BiFunction<K, ReadOffset, Flux<V>> readFunction = (BiFunction<K, ReadOffset, Flux<V>>) (BiFunction<?, ?, ?>) this.getConsumeReadFunction(consumer, this.readOptions.autoAcknowledge());

        return Flux.defer(() -> {
            PollState pollState = PollState.consumer(consumer, streamOffset.getOffset());
            return Flux.create((FluxSink<V> sink) -> new StreamSubscription(sink, streamOffset.getKey(), pollState, readFunction).arm());
        });
    }

    /**
     * Returns a {@link Flux} that reads records on behalf of the given consumer using the
     * receiver's read options as configured (no auto-acknowledge is added here).
     *
     * @param consumer the consumer to read as.
     * @param streamOffset the stream key and the offset to start reading from.
     * @return a flux emitting the received records.
     */
    @Override
    @SuppressWarnings("unchecked")
    public Flux<V> receive(Consumer consumer, StreamOffset<K> streamOffset) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(String.format("receive(%s, %s)", consumer, streamOffset));
        }

        // getConsumeReadFunction yields Flux<? extends Record<?, ?>>; narrow it to the record
        // type V this receiver is parameterized with (unchecked, via a wildcard cast).
        BiFunction<K, ReadOffset, Flux<V>> readFunction = (BiFunction<K, ReadOffset, Flux<V>>) (BiFunction<?, ?, ?>) this.getConsumeReadFunction(consumer, this.readOptions);

        return Flux.defer(() -> {
            PollState pollState = PollState.consumer(consumer, streamOffset.getOffset());
            return Flux.create((FluxSink<V> sink) -> new StreamSubscription(sink, streamOffset.getKey(), pollState, readFunction).arm());
        });
    }

    /**
     * Builds the function used to poll the stream as a member of a consumer group.
     *
     * <p>Whether a hash mapper is configured is decided once, when this method is called. Without
     * a hash mapper the returned function performs the plain consumer read; with one it reads
     * using the target type taken from the receiver options on each invocation.
     *
     * @param consumer the consumer to read as.
     * @param readOptions the read options passed to every read.
     * @return a function mapping a stream key and read offset to the flux of records read.
     */
    private BiFunction<K, ReadOffset, Flux<? extends Record<?, ?>>> getConsumeReadFunction(Consumer consumer, StreamReadOptions readOptions) {
        final boolean withoutHashMapper = this.receiverOptions.getHashMapper() == null;
        if (withoutHashMapper) {
            return (stream, offset) -> this.streamOperations.read(consumer, readOptions, StreamOffset.create(stream, offset));
        }
        return (stream, offset) -> this.streamOperations.read(this.receiverOptions.getTargetType(), consumer, readOptions, StreamOffset.create(stream, offset));
    }

    /**
     * Mutable state of one polling subscription: outstanding demand, whether the subscription is
     * still active, whether a poll is currently scheduled, and the offset to read from next.
     * All state is held in atomic variables.
     */
    static class PollState {
        /** Number of items requested by the subscriber and not yet emitted. */
        private final AtomicLong requestsPending = new AtomicLong();
        /** {@code true} until {@link #cancel()} is called. */
        private final AtomicBoolean active = new AtomicBoolean(true);
        /** {@code true} while a poll has been activated and not yet marked completed. */
        private final AtomicBoolean scheduled = new AtomicBoolean(false);
        /** Strategy that computes the first and each subsequent read offset. */
        private final ReadOffsetStrategy readOffsetStrategy;
        /** The offset the next read starts from. */
        private final AtomicReference<ReadOffset> currentOffset;
        /** The consumer, if any; passed through to the read offset strategy. */
        private final Optional<Consumer> consumer;

        private PollState(Optional<Consumer> consumer, ReadOffsetStrategy readOffsetStrategy, ReadOffset currentOffset) {
            this.readOffsetStrategy = readOffsetStrategy;
            this.currentOffset = new AtomicReference<>(currentOffset);
            this.consumer = consumer;
        }

        /**
         * Creates the state for a read without a consumer.
         *
         * @param offset the offset requested by the caller.
         * @return state whose current offset is the strategy's first offset for {@code offset}.
         */
        static PollState standalone(ReadOffset offset) {
            ReadOffsetStrategy strategy = ReadOffsetStrategy.getStrategy(offset);
            return new PollState(Optional.empty(), strategy, strategy.getFirst(offset, Optional.empty()));
        }

        /**
         * Creates the state for a read on behalf of a consumer.
         *
         * @param consumer the consumer to read as.
         * @param offset the offset requested by the caller.
         * @return state whose current offset is the strategy's first offset for {@code offset}
         *     and {@code consumer}.
         */
        static PollState consumer(Consumer consumer, ReadOffset offset) {
            ReadOffsetStrategy strategy = ReadOffsetStrategy.getStrategy(offset);
            Optional<Consumer> optionalConsumer = Optional.of(consumer);
            return new PollState(optionalConsumer, strategy, strategy.getFirst(offset, optionalConsumer));
        }

        /** @return {@code true} as long as {@link #cancel()} has not been called. */
        public boolean isSubscriptionActive() {
            return this.active.get();
        }

        /** Marks the subscription as no longer active. */
        public void cancel() {
            this.active.set(false);
        }

        /**
         * Consumes one unit of demand if any is available.
         *
         * <p>The compare-and-set is retried when another thread changes the demand concurrently,
         * so {@code false} is returned only when there really is no demand. A single attempt
         * could fail merely because demand was being added at the same moment. Unbounded demand
         * ({@link Long#MAX_VALUE}) is never decremented.
         *
         * @return {@code true} if the caller may emit one item, {@code false} if there is no demand.
         */
        boolean decrementRequested() {
            while (true) {
                long demand = this.requestsPending.get();
                if (demand <= 0L) {
                    return false;
                }
                if (demand == Long.MAX_VALUE) {
                    return true;
                }
                if (this.requestsPending.compareAndSet(demand, demand - 1L)) {
                    return true;
                }
            }
        }

        /** Adds one unit of demand back. */
        void incrementRequested() {
            this.requestsPending.incrementAndGet();
        }

        /** @return the current outstanding demand. */
        public long getRequested() {
            return this.requestsPending.get();
        }

        /**
         * Atomically replaces the demand if it still equals {@code expect}.
         *
         * @param expect the demand value the caller observed.
         * @param update the new demand value.
         * @return {@code true} if the value was replaced.
         */
        boolean setRequested(long expect, long update) {
            return this.requestsPending.compareAndSet(expect, update);
        }

        /**
         * Tries to claim the right to start a poll.
         *
         * @return {@code true} if no poll was scheduled and this call marked one as scheduled.
         */
        boolean activateSchedule() {
            return this.scheduled.compareAndSet(false, true);
        }

        /** @return {@code true} while a poll is marked as scheduled. */
        public boolean isScheduled() {
            return this.scheduled.get();
        }

        /** Clears the scheduled flag if it is set. */
        void scheduleCompleted() {
            this.scheduled.compareAndSet(true, false);
        }

        /**
         * Advances the current read offset using the strategy's {@code getNext}.
         *
         * @param messageId id of the record that was just received.
         */
        void updateReadOffset(String messageId) {
            ReadOffset next = this.readOffsetStrategy.getNext(this.getCurrentReadOffset(), this.consumer, messageId);
            this.currentOffset.set(next);
        }

        /** @return the offset the next read starts from. */
        ReadOffset getCurrentReadOffset() {
            return this.currentOffset.get();
        }
    }

    class StreamSubscription {
        private final Queue<V> overflow = (Queue)Queues.small().get();
        private final FluxSink<V> sink;
        private final K key;
        private final PollState pollState;
        private final BiFunction<K, ReadOffset, Flux<V>> readFunction;

        void arm() {
            this.sink.onRequest(toAdd -> {
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] onRequest(%d)", this.key, toAdd));
                }
                if (this.pollState.isSubscriptionActive()) {
                    long u;
                    long r;
                    do {
                        if ((r = this.pollState.getRequested()) != Long.MAX_VALUE) continue;
                        this.scheduleIfRequired();
                        return;
                    } while (!this.pollState.setRequested(r, u = Operators.addCap((long)r, (long)toAdd)));
                    if (u > 0L) {
                        this.scheduleIfRequired();
                    }
                    return;
                }
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] onRequest(%d): Dropping, subscription canceled", this.key, toAdd));
                }
            });
            this.sink.onCancel(this.pollState::cancel);
        }

        private void scheduleIfRequired() {
            if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] scheduleIfRequired()", this.key));
            }
            if (this.pollState.isScheduled()) {
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] scheduleIfRequired(): Already scheduled", this.key));
                }
                return;
            }
            if (!this.pollState.isSubscriptionActive()) {
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] scheduleIfRequired(): Subscription cancelled", this.key));
                }
                return;
            }
            if (this.pollState.getRequested() > 0L && !this.overflow.isEmpty()) {
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.info((Object)String.format("[stream: %s] scheduleIfRequired(): Requested: %d Emit from buffer", this.key, this.pollState.getRequested()));
                }
                this.emitBuffer();
            }
            if (this.pollState.getRequested() == 0L) {
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] scheduleIfRequired(): Subscriber has no demand. Suspending subscription.", this.key));
                }
                return;
            }
            if (this.pollState.getRequested() <= 0L) {
                return;
            }
            if (this.pollState.activateSchedule()) {
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] scheduleIfRequired(): Activating subscription", this.key));
                }
                ReadOffset readOffset = this.pollState.getCurrentReadOffset();
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] scheduleIfRequired(): Activating subscription, offset %s", this.key, readOffset));
                }
                Flux poll = this.readFunction.apply(this.key, readOffset);
                poll.subscribe(this.getSubscriber());
            }
        }

        private CoreSubscriber<V> getSubscriber() {
            return new CoreSubscriber<V>(){

                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                public void onNext(V message) {
                    StreamSubscription.this.onStreamMessage(message);
                }

                public void onError(Throwable t) {
                    StreamSubscription.this.onStreamError(t);
                }

                public void onComplete() {
                    if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                        DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] onComplete()", StreamSubscription.this.key));
                    }
                    StreamSubscription.this.pollState.scheduleCompleted();
                    StreamSubscription.this.scheduleIfRequired();
                }

                public Context currentContext() {
                    return StreamSubscription.this.sink.currentContext();
                }
            };
        }

        private void onStreamMessage(V message) {
            if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] onStreamMessage(%s)", this.key, message));
            }
            this.pollState.updateReadOffset(message.getId().getValue());
            long requested = this.pollState.getRequested();
            if (requested > 0L) {
                if (requested == Long.MAX_VALUE) {
                    if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                        DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] onStreamMessage(%s): Emitting item, fast-path", this.key, message));
                    }
                    this.sink.next(message);
                } else if (this.pollState.decrementRequested()) {
                    if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                        DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] onStreamMessage(%s): Emitting item, slow-path", this.key, message));
                    }
                    this.sink.next(message);
                } else {
                    if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                        DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] onStreamMessage(%s): Buffering overflow", this.key, message));
                    }
                    this.overflow.add(message);
                }
            } else {
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] onStreamMessage(%s): Buffering overflow", this.key, message));
                }
                this.overflow.offer(message);
            }
        }

        private void onStreamError(Throwable t) {
            if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] onStreamError(%s)", this.key, t));
            }
            this.pollState.cancel();
            this.sink.error(t);
        }

        private void emitBuffer() {
            long demand;
            while (!this.overflow.isEmpty() && (demand = this.pollState.getRequested()) > 0L) {
                Record message;
                if (demand == Long.MAX_VALUE) {
                    message = (Record)this.overflow.poll();
                    if (message == null) {
                        if (!DefaultStreamReceiver.this.logger.isDebugEnabled()) break;
                        DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] emitBuffer(): emission missed", this.key));
                        break;
                    }
                    if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                        DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] emitBuffer(%s): Emitting item from buffer, fast-path", this.key, message));
                    }
                    this.sink.next((Object)message);
                    continue;
                }
                if (!this.pollState.setRequested(demand, demand - 1L)) continue;
                message = (Record)this.overflow.poll();
                if (message == null) {
                    if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                        DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] emitBuffer(): emission missed", this.key));
                    }
                    this.pollState.incrementRequested();
                    break;
                }
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug((Object)String.format("[stream: %s] emitBuffer(%s): Emitting item from buffer, slow-path", this.key, message));
                }
                this.sink.next((Object)message);
            }
        }

        public StreamSubscription(FluxSink<V> sink, K key, PollState pollState, BiFunction<K, ReadOffset, Flux<V>> readFunction) {
            this.sink = sink;
            this.key = key;
            this.pollState = pollState;
            this.readFunction = readFunction;
        }
    }
}

