/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.pubsub;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.core.util.ObjectUtils;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Reference;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.AssetNotFoundException;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.tree.ChronicleQueueView;
import net.openhft.chronicle.engine.tree.QueueView;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * A {@link Reference} over a queue: values are published through a
 * {@link ChronicleQueueView} under a fixed {@code name}, and read back through tailers.
 *
 * <p>Each registered {@link Subscriber} is driven by its own handler on the
 * {@link EventLoop} obtained from the asset's root. The handler polls a dedicated tailer and is
 * stopped through a per-subscriber {@link AtomicBoolean} flag kept in {@code subscribers}.
 *
 * @param <T> type of the name (key) under which messages are published
 * @param <M> type of the messages
 */
public class QueueReference<T, M>
implements Reference<M> {
    private final Class<M> eClass;
    @NotNull
    private final ChronicleQueueView<T, M> chronicleQueue;
    private final T name;
    @NotNull
    private final Asset asset;
    // Concurrent map: entries are added/removed by callers of (un)registerSubscriber and are
    // also removed from the event-loop thread when a subscriber reports itself invalid.
    private final Map<Subscriber<M>, AtomicBoolean> subscribers = new ConcurrentHashMap<>();
    private final EventLoop eventLoop;
    // Tailer used only by get(); subscribers each get their own tailer.
    @Nullable
    private final QueueView.Tailer<T, M> tailer;

    /**
     * Creates a reference bound to {@code name} on the given queue view.
     *
     * @param type           message class, later returned by {@link #getType()}
     * @param asset          asset used to look up the {@link EventLoop} (via its root) and, on
     *                       subscription, the {@link QueueView}
     * @param chronicleQueue queue view; must be a {@link ChronicleQueueView}
     * @param name           name under which {@link #set(Object)} publishes
     * @throws ClassCastException if {@code chronicleQueue} is not a {@link ChronicleQueueView}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public QueueReference(Class type, @NotNull Asset asset, QueueView<T, M> chronicleQueue, T name) {
        this.eClass = type;
        this.chronicleQueue = (ChronicleQueueView<T, M>) chronicleQueue;
        this.name = name;
        this.eventLoop = asset.root().acquireView(EventLoop.class);
        this.asset = asset;
        this.tailer = this.chronicleQueue.tailer();
    }

    /**
     * Creates a reference whose message type comes from {@code requestContext.type()} and whose
     * name is {@code requestContext.name()} converted to that type by
     * {@link ObjectUtils#convertTo}.
     *
     * @param requestContext source of the type and name
     * @param asset          asset this reference belongs to
     * @param queueView      queue view; must be a {@link ChronicleQueueView}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public QueueReference(@NotNull RequestContext requestContext, @NotNull Asset asset, QueueView<T, M> queueView) {
        // The converted name comes back untyped because a raw Class is passed in, so it has to
        // be cast to T explicitly.
        this(requestContext.type(), asset, queueView, (T) ObjectUtils.convertTo((Class) requestContext.type(), (Object) requestContext.name()));
    }

    /**
     * Publishes {@code event} under this reference's name.
     *
     * @param event the message to publish
     * @return the value returned by {@code publishAndIndex} on the underlying queue view
     */
    @Override
    public long set(@NotNull M event) {
        return this.chronicleQueue.publishAndIndex(this.name, event);
    }

    /**
     * Reads the next excerpt from this reference's own tailer.
     *
     * @return the message of the next excerpt, or {@code null} when the tailer is absent or has
     *         nothing to read
     */
    @Override
    @Nullable
    public M get() {
        // The field is declared @Nullable, so do not dereference it blindly.
        if (this.tailer == null) {
            return null;
        }
        QueueView.Excerpt<T, M> next = this.tailer.read();
        return next == null ? null : next.message();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }

    /**
     * Starts delivering queue messages to {@code subscriber} from an event-loop handler.
     *
     * <p>{@code bootstrap} and {@code throttlePeriodMs} are currently not used by this
     * implementation. Registering a subscriber that is already registered stops its previous
     * handler, so that the subscriber is always controlled by exactly one terminate flag.
     *
     * @param bootstrap        unused
     * @param throttlePeriodMs unused
     * @param subscriber       receiver of the messages
     * @throws AssetNotFoundException declared by the interface; propagated from the view lookup
     */
    @Override
    @SuppressWarnings("unchecked")
    public void registerSubscriber(boolean bootstrap, int throttlePeriodMs, @NotNull Subscriber<M> subscriber) throws AssetNotFoundException {
        // Acquire the tailer first: if the lookup fails, nothing has been recorded yet and
        // subscriberCount() is not left counting a subscriber that has no handler.
        ChronicleQueueView<T, M> view = (ChronicleQueueView<T, M>) this.asset.acquireView(QueueView.class);
        QueueView.Tailer<T, M> subscriberTailer = view.tailer();

        AtomicBoolean terminate = new AtomicBoolean();
        AtomicBoolean previous = this.subscribers.put(subscriber, terminate);
        if (previous != null) {
            // The old flag is no longer reachable through the map; stop its handler now,
            // otherwise unregisterSubscriber could never terminate it.
            previous.set(true);
        }

        this.eventLoop.addHandler(() -> {
            if (terminate.get()) {
                // Signals the event loop to drop this handler.
                throw new InvalidEventHandlerException();
            }
            QueueView.Excerpt<T, M> item = subscriberTailer.read();
            if (item == null || item.index() == -1L) {
                // Nothing available: report "no work done".
                return false;
            }
            try {
                subscriber.onMessage(item.message());
            }
            catch (InvalidSubscriberException e) {
                // The subscriber asked to be dropped: stop on the next pass and forget it,
                // but only if the map still points at this handler's own flag.
                terminate.set(true);
                this.subscribers.remove(subscriber, terminate);
            }
            return true;
        });
    }

    /**
     * Stops delivery to {@code subscriber}; a no-op if it is {@code null} or not registered.
     *
     * @param subscriber the subscriber to remove
     */
    @Override
    public void unregisterSubscriber(Subscriber subscriber) {
        if (subscriber == null) {
            // ConcurrentHashMap rejects null keys; a null subscriber is never registered.
            return;
        }
        AtomicBoolean terminator = this.subscribers.remove(subscriber);
        if (terminator != null) {
            terminator.set(true);
        }
    }

    /**
     * @return the number of subscribers currently held in the registration map
     */
    @Override
    public int subscriberCount() {
        return this.subscribers.size();
    }

    /**
     * @return the message class supplied at construction
     */
    @Override
    public Class getType() {
        return this.eClass;
    }
}

