/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.receiver.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.internals.AckMode;
import reactor.kafka.receiver.internals.AtmostOnceOffsets;
import reactor.kafka.receiver.internals.CommittableBatch;
import reactor.kafka.receiver.internals.SeekablePartition;

class ConsumerEventLoop<K, V>
implements Sinks.EmitFailureHandler {
    private static final Logger log = LoggerFactory.getLogger((String)ConsumerEventLoop.class.getName());
    final AtomicBoolean isActive = new AtomicBoolean(true);
    final AtmostOnceOffsets atmostOnceOffsets;
    final PollEvent pollEvent;
    final AckMode ackMode;
    final ReceiverOptions<K, V> receiverOptions;
    final Scheduler eventScheduler;
    final CommitEvent commitEvent = new CommitEvent();
    final Predicate<Throwable> isRetriableException;
    final Set<TopicPartition> pausedByUser = new HashSet<TopicPartition>();
    private final Disposable periodicCommitDisposable;
    Consumer<K, V> consumer;
    final Sinks.Many<ConsumerRecords<K, V>> sink;
    final AtomicBoolean awaitingTransaction;
    volatile long requested;
    static final AtomicLongFieldUpdater<ConsumerEventLoop> REQUESTED = AtomicLongFieldUpdater.newUpdater(ConsumerEventLoop.class, "requested");

    ConsumerEventLoop(AckMode ackMode, AtmostOnceOffsets atmostOnceOffsets, ReceiverOptions<K, V> receiverOptions, Scheduler eventScheduler, Consumer<K, V> consumer, Predicate<Throwable> isRetriableException, Sinks.Many<ConsumerRecords<K, V>> sink, AtomicBoolean awaitingTransaction) {
        this.ackMode = ackMode;
        this.atmostOnceOffsets = atmostOnceOffsets;
        this.receiverOptions = receiverOptions;
        this.eventScheduler = eventScheduler;
        this.consumer = consumer;
        this.isRetriableException = isRetriableException;
        this.sink = sink;
        this.awaitingTransaction = awaitingTransaction;
        this.pollEvent = new PollEvent();
        this.commitEvent.commitBatch.outOfOrderCommits = receiverOptions.maxDeferredCommits() > 0;
        eventScheduler.schedule((Runnable)new SubscribeEvent());
        Duration commitInterval = receiverOptions.commitInterval();
        if (!commitInterval.isZero()) {
            switch (ackMode) {
                case AUTO_ACK: 
                case MANUAL_ACK: {
                    this.periodicCommitDisposable = Schedulers.parallel().schedulePeriodically(this.commitEvent::scheduleIfRequired, commitInterval.toMillis(), commitInterval.toMillis(), TimeUnit.MILLISECONDS);
                    break;
                }
                default: {
                    this.periodicCommitDisposable = Disposables.disposed();
                    break;
                }
            }
        } else {
            this.periodicCommitDisposable = Disposables.disposed();
        }
    }

    void paused(Collection<TopicPartition> paused) {
        this.pausedByUser.addAll(paused);
    }

    void resumed(Collection<TopicPartition> resumed) {
        this.pausedByUser.removeAll(resumed);
    }

    void onRequest(long toAdd) {
        if (log.isDebugEnabled()) {
            log.debug("onRequest.toAdd {}, paused {}", (Object)toAdd, (Object)this.pollEvent.isPaused());
        }
        Operators.addCap(REQUESTED, (Object)this, (long)toAdd);
        if (this.pollEvent.isPaused()) {
            this.consumer.wakeup();
        }
        this.pollEvent.schedule();
    }

    private void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.debug("onPartitionsRevoked {}", partitions);
        if (!partitions.isEmpty()) {
            if (this.ackMode != AckMode.ATMOST_ONCE) {
                this.commitEvent.runIfRequired(true);
                long maxDelayRebalance = this.receiverOptions.maxDelayRebalance().toMillis();
                if (this.isActive.get() && maxDelayRebalance > 0L) {
                    long interval = this.receiverOptions.commitIntervalDuringDelay();
                    int inPipeline = this.commitEvent.commitBatch.getInPipeline();
                    if (inPipeline > 0 || this.awaitingTransaction.get()) {
                        long end = maxDelayRebalance + System.currentTimeMillis();
                        do {
                            try {
                                log.debug("Rebalancing; waiting for {} records in pipeline", (Object)inPipeline);
                                Thread.sleep(interval);
                                this.commitEvent.runIfRequired(true);
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                break;
                            }
                            inPipeline = this.commitEvent.commitBatch.getInPipeline();
                        } while (this.isActive.get() && (inPipeline > 0 || this.awaitingTransaction.get()) && System.currentTimeMillis() < end);
                    }
                }
            }
            for (java.util.function.Consumer<Collection<ReceiverPartition>> onRevoke : this.receiverOptions.revokeListeners()) {
                onRevoke.accept(this.toSeekable(partitions));
            }
        }
    }

    private Collection<ReceiverPartition> toSeekable(Collection<TopicPartition> partitions) {
        ArrayList<ReceiverPartition> seekableList = new ArrayList<ReceiverPartition>(partitions.size());
        for (TopicPartition partition : partitions) {
            seekableList.add(new SeekablePartition(this.consumer, partition));
        }
        return seekableList;
    }

    Mono<Void> stop() {
        return Mono.defer(() -> {
            log.debug("dispose {}", (Object)this.isActive);
            if (!this.isActive.compareAndSet(true, false)) {
                return Mono.empty();
            }
            this.periodicCommitDisposable.dispose();
            if (this.consumer == null) {
                return Mono.empty();
            }
            this.consumer.wakeup();
            return (Mono)Mono.fromRunnable((Runnable)new CloseEvent(this.receiverOptions.closeTimeout())).as(flux -> flux.subscribeOn(this.eventScheduler));
        }).onErrorResume(e -> {
            log.warn("Cancel exception: " + e);
            return Mono.empty();
        });
    }

    public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult result) {
        if (!this.isActive.get()) {
            return false;
        }
        return result == Sinks.EmitResult.FAIL_NON_SERIALIZED;
    }

    private class CloseEvent
    implements Runnable {
        private final long closeEndTimeMillis;

        CloseEvent(Duration timeout) {
            this.closeEndTimeMillis = System.currentTimeMillis() + timeout.toMillis();
        }

        @Override
        public void run() {
            block9: {
                try {
                    if (ConsumerEventLoop.this.consumer == null) break block9;
                    Collection<TopicPartition> manualAssignment = ConsumerEventLoop.this.receiverOptions.assignment();
                    if (manualAssignment != null && !manualAssignment.isEmpty()) {
                        ConsumerEventLoop.this.onPartitionsRevoked(manualAssignment);
                    }
                    int attempts = 3;
                    for (int i = 0; i < attempts; ++i) {
                        try {
                            long timeoutMillis;
                            boolean forceCommit = true;
                            if (ConsumerEventLoop.this.ackMode == AckMode.ATMOST_ONCE) {
                                forceCommit = ConsumerEventLoop.this.atmostOnceOffsets.undoCommitAhead(ConsumerEventLoop.this.commitEvent.commitBatch);
                            }
                            if (ConsumerEventLoop.this.ackMode != AckMode.EXACTLY_ONCE) {
                                ConsumerEventLoop.this.commitEvent.runIfRequired(forceCommit);
                                ConsumerEventLoop.this.commitEvent.waitFor(this.closeEndTimeMillis);
                            }
                            if ((timeoutMillis = this.closeEndTimeMillis - System.currentTimeMillis()) < 0L) {
                                timeoutMillis = 0L;
                            }
                            ConsumerEventLoop.this.consumer.close(Duration.ofMillis(timeoutMillis));
                            ConsumerEventLoop.this.consumer = null;
                            break;
                        }
                        catch (WakeupException e) {
                            if (i != attempts - 1) continue;
                            throw e;
                        }
                    }
                }
                catch (Exception e) {
                    log.error("Unexpected exception during close", (Throwable)e);
                    ConsumerEventLoop.this.sink.emitError((Throwable)e, (Sinks.EmitFailureHandler)ConsumerEventLoop.this);
                }
            }
        }
    }

    class CommitEvent
    implements Runnable {
        final CommittableBatch commitBatch = new CommittableBatch();
        private final AtomicBoolean isPending = new AtomicBoolean();
        private final AtomicInteger inProgress = new AtomicInteger();
        private final AtomicInteger consecutiveCommitFailures = new AtomicInteger();
        private final AtomicBoolean retrying = new AtomicBoolean();

        CommitEvent() {
        }

        @Override
        public void run() {
            block13: {
                if (!this.isPending.compareAndSet(true, false)) {
                    return;
                }
                CommittableBatch.CommitArgs commitArgs = this.commitBatch.getAndClearOffsets();
                try {
                    if (commitArgs == null) break block13;
                    if (!commitArgs.offsets().isEmpty()) {
                        switch (ConsumerEventLoop.this.ackMode) {
                            case ATMOST_ONCE: {
                                if (log.isDebugEnabled()) {
                                    log.debug("Sync committing: " + commitArgs.offsets());
                                }
                                ConsumerEventLoop.this.consumer.commitSync(commitArgs.offsets());
                                this.handleSuccess(commitArgs, commitArgs.offsets());
                                ConsumerEventLoop.this.atmostOnceOffsets.onCommit(commitArgs.offsets());
                                break;
                            }
                            case EXACTLY_ONCE: {
                                break;
                            }
                            case AUTO_ACK: 
                            case MANUAL_ACK: {
                                this.inProgress.incrementAndGet();
                                try {
                                    if (log.isDebugEnabled()) {
                                        log.debug("Async committing: " + commitArgs.offsets());
                                    }
                                    ConsumerEventLoop.this.consumer.commitAsync(commitArgs.offsets(), (offsets, exception) -> {
                                        this.inProgress.decrementAndGet();
                                        if (exception == null) {
                                            this.handleSuccess(commitArgs, offsets);
                                        } else {
                                            this.handleFailure(commitArgs, exception);
                                        }
                                    });
                                }
                                catch (Throwable e) {
                                    this.inProgress.decrementAndGet();
                                    throw e;
                                }
                                ConsumerEventLoop.this.pollEvent.schedule();
                            }
                        }
                        break block13;
                    }
                    this.handleSuccess(commitArgs, commitArgs.offsets());
                }
                catch (Exception e) {
                    log.error("Unexpected exception", (Throwable)e);
                    this.handleFailure(commitArgs, e);
                }
            }
        }

        void runIfRequired(boolean force) {
            if (force) {
                this.isPending.set(true);
            }
            if (!this.retrying.get() && this.isPending.get()) {
                this.run();
            }
        }

        private void handleSuccess(CommittableBatch.CommitArgs commitArgs, Map<TopicPartition, OffsetAndMetadata> offsets) {
            if (!offsets.isEmpty()) {
                this.consecutiveCommitFailures.set(0);
            }
            this.pollTaskAfterRetry();
            if (commitArgs.callbackEmitters() != null) {
                for (MonoSink<Void> emitter : commitArgs.callbackEmitters()) {
                    emitter.success();
                }
            }
        }

        private void handleFailure(CommittableBatch.CommitArgs commitArgs, Exception exception) {
            boolean mayRetry;
            log.warn("Commit failed", (Throwable)exception);
            boolean bl = mayRetry = ConsumerEventLoop.this.isRetriableException.test(exception) && ConsumerEventLoop.this.consumer != null && this.consecutiveCommitFailures.incrementAndGet() < ConsumerEventLoop.this.receiverOptions.maxCommitAttempts();
            if (!mayRetry) {
                log.debug("Cannot retry");
                this.pollTaskAfterRetry();
                List<MonoSink<Void>> callbackEmitters = commitArgs.callbackEmitters();
                if (callbackEmitters != null && !callbackEmitters.isEmpty()) {
                    this.isPending.set(false);
                    this.commitBatch.restoreOffsets(commitArgs, false);
                    for (MonoSink<Void> emitter : callbackEmitters) {
                        emitter.error((Throwable)exception);
                    }
                } else {
                    ConsumerEventLoop.this.sink.emitError((Throwable)exception, (Sinks.EmitFailureHandler)ConsumerEventLoop.this);
                }
            } else {
                this.commitBatch.restoreOffsets(commitArgs, true);
                log.warn("Commit failed with exception" + exception + ", retries remaining " + (ConsumerEventLoop.this.receiverOptions.maxCommitAttempts() - this.consecutiveCommitFailures.get()));
                this.isPending.set(true);
                this.retrying.set(true);
                ConsumerEventLoop.this.pollEvent.schedule();
                ConsumerEventLoop.this.eventScheduler.schedule((Runnable)this, ConsumerEventLoop.this.receiverOptions.commitRetryInterval().toMillis(), TimeUnit.MILLISECONDS);
            }
        }

        private void pollTaskAfterRetry() {
            if (log.isTraceEnabled()) {
                log.trace("after retry " + this.retrying.get());
            }
            if (this.retrying.getAndSet(false)) {
                ConsumerEventLoop.this.pollEvent.schedule();
            }
        }

        void scheduleIfRequired() {
            if (ConsumerEventLoop.this.isActive.get() && !this.retrying.get() && this.isPending.compareAndSet(false, true)) {
                ConsumerEventLoop.this.eventScheduler.schedule((Runnable)this);
            }
        }

        private void waitFor(long endTimeMillis) {
            while (this.inProgress.get() > 0 && endTimeMillis - System.currentTimeMillis() > 0L) {
                ConsumerEventLoop.this.consumer.poll(Duration.ofMillis(1L));
            }
        }
    }

    class PollEvent
    implements Runnable {
        private final Duration pollTimeout;
        private final AtomicBoolean pausedByUs;
        private final AtomicBoolean scheduled;
        private final long maxDeferredCommits;
        private final CommittableBatch commitBatch;

        PollEvent() {
            this.pollTimeout = ConsumerEventLoop.this.receiverOptions.pollTimeout();
            this.pausedByUs = new AtomicBoolean();
            this.scheduled = new AtomicBoolean();
            this.maxDeferredCommits = ConsumerEventLoop.this.receiverOptions.maxDeferredCommits();
            this.commitBatch = ConsumerEventLoop.this.commitEvent.commitBatch;
        }

        @Override
        public void run() {
            block19: {
                try {
                    ConsumerRecords records;
                    boolean pauseForDeferred;
                    this.scheduled.set(false);
                    if (!ConsumerEventLoop.this.isActive.get()) break block19;
                    ConsumerEventLoop.this.commitEvent.runIfRequired(false);
                    long r = ConsumerEventLoop.this.requested;
                    boolean bl = pauseForDeferred = this.maxDeferredCommits > 0L && (long)this.commitBatch.deferredCount() >= this.maxDeferredCommits;
                    if (pauseForDeferred || ConsumerEventLoop.this.commitEvent.retrying.get()) {
                        r = 0L;
                    }
                    if (r > 0L) {
                        if (!ConsumerEventLoop.this.awaitingTransaction.get()) {
                            if (this.pausedByUs.getAndSet(false)) {
                                HashSet toResume = new HashSet(ConsumerEventLoop.this.consumer.assignment());
                                toResume.removeAll(ConsumerEventLoop.this.pausedByUser);
                                ConsumerEventLoop.this.consumer.resume(toResume);
                                if (log.isDebugEnabled()) {
                                    log.debug("Resumed partitions: " + toResume);
                                }
                            }
                        } else if (this.checkAndSetPausedByUs()) {
                            ConsumerEventLoop.this.consumer.pause((Collection)ConsumerEventLoop.this.consumer.assignment());
                            log.debug("Paused - awaiting transaction");
                        }
                    } else if (this.checkAndSetPausedByUs()) {
                        ConsumerEventLoop.this.consumer.pause((Collection)ConsumerEventLoop.this.consumer.assignment());
                        if (pauseForDeferred) {
                            log.debug("Paused - too many deferred commits");
                        } else if (ConsumerEventLoop.this.commitEvent.retrying.get()) {
                            log.debug("Paused - commits are retrying");
                        } else {
                            log.debug("Paused - back pressure");
                        }
                    }
                    try {
                        records = ConsumerEventLoop.this.consumer.poll(this.pollTimeout);
                    }
                    catch (WakeupException e) {
                        log.debug("Consumer woken");
                        records = ConsumerRecords.empty();
                    }
                    if (ConsumerEventLoop.this.isActive.get()) {
                        this.schedule();
                    }
                    if (!records.isEmpty()) {
                        this.commitBatch.addUncommitted(records);
                        Operators.produced(REQUESTED, (Object)ConsumerEventLoop.this, (long)1L);
                        log.debug("Emitting {} records, requested now {}", (Object)records.count(), (Object)r);
                        ConsumerEventLoop.this.sink.emitNext((Object)records, (Sinks.EmitFailureHandler)ConsumerEventLoop.this);
                    }
                }
                catch (Exception e) {
                    if (!ConsumerEventLoop.this.isActive.get()) break block19;
                    log.error("Unexpected exception", (Throwable)e);
                    ConsumerEventLoop.this.sink.emitError((Throwable)e, (Sinks.EmitFailureHandler)ConsumerEventLoop.this);
                }
            }
        }

        private boolean checkAndSetPausedByUs() {
            boolean pausedNow;
            boolean bl = pausedNow = !this.pausedByUs.getAndSet(true);
            if (pausedNow && ConsumerEventLoop.this.requested > 0L && !ConsumerEventLoop.this.commitEvent.retrying.get()) {
                ConsumerEventLoop.this.consumer.wakeup();
            }
            return pausedNow;
        }

        void schedule() {
            if (!this.scheduled.getAndSet(true)) {
                ConsumerEventLoop.this.eventScheduler.schedule((Runnable)this);
            }
        }

        boolean isPaused() {
            return this.pausedByUs.get();
        }
    }

    class SubscribeEvent
    implements Runnable {
        SubscribeEvent() {
        }

        @Override
        public void run() {
            block2: {
                try {
                    ConsumerEventLoop.this.receiverOptions.subscriber(new ConsumerRebalanceListener(){

                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            log.debug("onPartitionsAssigned {}", partitions);
                            boolean repausedAll = false;
                            if (!partitions.isEmpty() && ConsumerEventLoop.this.pollEvent.pausedByUs.get()) {
                                log.debug("Rebalance during back pressure, re-pausing new assignments");
                                ConsumerEventLoop.this.consumer.pause(partitions);
                                repausedAll = true;
                            }
                            if (!ConsumerEventLoop.this.pausedByUser.isEmpty()) {
                                ArrayList toRepause = new ArrayList();
                                Iterator<TopicPartition> iterator = ConsumerEventLoop.this.pausedByUser.iterator();
                                while (iterator.hasNext()) {
                                    TopicPartition next = iterator.next();
                                    if (partitions.contains(next)) {
                                        toRepause.add(next);
                                        continue;
                                    }
                                    iterator.remove();
                                }
                                if (!repausedAll && !toRepause.isEmpty()) {
                                    ConsumerEventLoop.this.consumer.pause((Collection)toRepause);
                                }
                            }
                            for (java.util.function.Consumer consumer : ConsumerEventLoop.this.receiverOptions.assignListeners()) {
                                consumer.accept(ConsumerEventLoop.this.toSeekable(partitions));
                            }
                            if (log.isTraceEnabled()) {
                                try {
                                    ArrayList positions = new ArrayList();
                                    partitions.forEach(part -> positions.add(String.format("%s pos: %d", part, ConsumerEventLoop.this.consumer.position(part, Duration.ofSeconds(5L)))));
                                    log.trace("positions: {}, committed: {}", positions, (Object)ConsumerEventLoop.this.consumer.committed(new HashSet<TopicPartition>(partitions), Duration.ofSeconds(5L)));
                                }
                                catch (Exception ex) {
                                    log.error("Failed to get positions or committed", (Throwable)ex);
                                }
                            }
                        }

                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                            ConsumerEventLoop.this.onPartitionsRevoked(partitions);
                            ConsumerEventLoop.this.pollEvent.commitBatch.partitionsRevoked(partitions);
                        }
                    }).accept(ConsumerEventLoop.this.consumer);
                }
                catch (Exception e) {
                    if (!ConsumerEventLoop.this.isActive.get()) break block2;
                    log.error("Unexpected exception", (Throwable)e);
                    ConsumerEventLoop.this.sink.emitError((Throwable)e, (Sinks.EmitFailureHandler)ConsumerEventLoop.this);
                }
            }
        }
    }
}

