/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.cloudpubsub.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiService;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.internal.AckSetTracker;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.common.flogger.GoogleLogger;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class AckSetTrackerImpl
extends ProxyService
implements AckSetTracker {
    private static final GoogleLogger LOGGER = GoogleLogger.forEnclosingClass();
    @GuardedBy(value="this")
    private final Committer committer;
    @GuardedBy(value="this")
    private final Deque<Receipt> receipts = new ArrayDeque<Receipt>();
    @GuardedBy(value="this")
    private final PriorityQueue<Offset> acks = new PriorityQueue();
    @GuardedBy(value="this")
    private long generation = 0L;
    @GuardedBy(value="this")
    private boolean shutdown = false;

    public AckSetTrackerImpl(Committer committer) throws ApiException {
        super(new ApiService[0]);
        this.committer = committer;
        this.addServices(committer);
    }

    @Override
    public synchronized Runnable track(SequencedMessage message) throws CheckedApiException {
        CheckedApiPreconditions.checkArgument(this.receipts.isEmpty() || this.receipts.peekLast().offset.value() < message.offset().value());
        Receipt receipt = new Receipt(message.offset(), this.generation, this);
        this.receipts.addLast(receipt);
        return receipt::onAck;
    }

    @Override
    public synchronized void waitUntilCommitted() throws CheckedApiException {
        ++this.generation;
        this.receipts.clear();
        this.acks.clear();
        this.committer.waitUntilEmpty();
    }

    private synchronized void onAck(Offset offset, long generation) {
        if (this.shutdown) {
            ((GoogleLogger.Api)LOGGER.atFine()).log("Dropping ack after tracker shutdown.");
            return;
        }
        if (generation != this.generation) {
            ((GoogleLogger.Api)LOGGER.atFine()).log("Dropping ack from wrong generation (admin seek occurred).");
            return;
        }
        this.acks.add(offset);
        Optional<Object> prefixAckedOffset = Optional.empty();
        while (!this.receipts.isEmpty() && !this.acks.isEmpty() && this.receipts.peekFirst().offset.value() == this.acks.peek().value()) {
            prefixAckedOffset = Optional.of((Offset)this.acks.remove());
            this.receipts.removeFirst();
        }
        if (prefixAckedOffset.isPresent()) {
            ApiFuture<Void> future = this.committer.commitOffset(Offset.of(((Offset)prefixAckedOffset.get()).value() + 1L));
            ExtractStatus.addFailureHandler(future, x$0 -> this.onPermanentError((CheckedApiException)x$0));
        }
    }

    @Override
    protected void start() throws CheckedApiException {
    }

    @Override
    protected synchronized void stop() throws CheckedApiException {
        this.shutdown = true;
    }

    @Override
    protected void handlePermanentError(CheckedApiException error) {
    }

    private static class Receipt {
        final Offset offset;
        final long generation;
        private final AtomicBoolean wasAcked = new AtomicBoolean();
        private final AckSetTrackerImpl tracker;

        Receipt(Offset offset, long generation, AckSetTrackerImpl tracker) {
            this.offset = offset;
            this.generation = generation;
            this.tracker = tracker;
        }

        void onAck() throws ApiException {
            if (this.wasAcked.getAndSet(true)) {
                CheckedApiException e = new CheckedApiException("Duplicate acks are not allowed.", StatusCode.Code.FAILED_PRECONDITION);
                this.tracker.onPermanentError(e);
                throw e.underlying;
            }
            this.tracker.onAck(this.offset, this.generation);
        }
    }
}

