/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs.implementation;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.Messages;
import com.azure.messaging.eventhubs.implementation.SynchronousReceiveWork;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

public class SynchronousEventSubscriber
extends BaseSubscriber<PartitionEvent> {
    private final Timer timer = new Timer();
    private final ClientLogger logger = new ClientLogger(SynchronousEventSubscriber.class);
    private final SynchronousReceiveWork work;
    private volatile Subscription subscription;

    public SynchronousEventSubscriber(SynchronousReceiveWork work) {
        this.work = Objects.requireNonNull(work, "'work' cannot be null.");
    }

    protected void hookOnSubscribe(Subscription subscription) {
        if (this.subscription == null) {
            this.subscription = subscription;
        }
        this.logger.info("Work: {}, Pending: {}, Scheduling receive timeout task.", new Object[]{this.work.getId(), this.work.getNumberOfEvents()});
        subscription.request((long)this.work.getNumberOfEvents());
        this.timer.schedule((TimerTask)new ReceiveTimeoutTask(this.work.getId(), this::dispose), this.work.getTimeout().toMillis());
    }

    protected void hookOnNext(PartitionEvent value) {
        this.work.next(value);
        if (this.work.isTerminal()) {
            this.logger.info("Work: {}. Completed. Closing Flux and cancelling subscription.", new Object[]{this.work.getId()});
            this.dispose();
        }
    }

    protected void hookOnComplete() {
        this.logger.info("Completed. No events to listen to.");
        this.dispose();
    }

    protected void hookOnError(Throwable throwable) {
        this.logger.error(Messages.ERROR_OCCURRED_IN_SUBSCRIBER_ERROR, new Object[]{throwable});
        this.work.error(throwable);
        this.dispose();
    }

    public void dispose() {
        this.work.complete();
        this.subscription.cancel();
        this.timer.cancel();
        super.dispose();
    }

    private static class ReceiveTimeoutTask
    extends TimerTask {
        private final ClientLogger logger = new ClientLogger(ReceiveTimeoutTask.class);
        private final long workId;
        private final Runnable onDispose;

        ReceiveTimeoutTask(long workId, Runnable onDispose) {
            this.workId = workId;
            this.onDispose = onDispose;
        }

        @Override
        public void run() {
            this.logger.info("Work: {}. Timeout encountered, disposing of subscriber.", new Object[]{this.workId});
            this.onDispose.run();
        }
    }
}

