/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.pubsub;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.core.util.SerializableBiFunction;
import net.openhft.chronicle.core.util.SerializableFunction;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Reference;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.AssetNotFoundException;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.server.internal.ReferenceHandler;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.AsyncSubscription;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.ParameterizeWireKey;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side {@link Reference} whose operations are forwarded over a {@link TcpChannelHub}
 * to the remote asset identified by a URI of the form
 * {@code <fullName>?view=reference[&messageType=<alias>]}.
 *
 * <p>Subscriptions registered through this instance are tracked in a map from the subscriber
 * to the transaction id (tid) of its asynchronous subscription, so they can later be
 * unregistered.
 *
 * @param <E> type of the value held by the reference
 */
public class RemoteReference<E>
extends AbstractStatelessClient<ReferenceHandler.EventId>
implements Reference<E> {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteReference.class);
    private final Class<E> messageClass;
    /** Maps each registered subscriber to the tid of its subscription on the hub. */
    private final Map<Object, Long> subscribersToTid = new ConcurrentHashMap<>();

    /**
     * Creates a remote reference using the {@link TcpChannelHub} view of the given asset, the
     * message type of the request context and the full name of the asset.
     *
     * @param requestContext supplies the message type
     * @param asset          supplies the hub view and the full asset name
     */
    public RemoteReference(@NotNull RequestContext requestContext, @NotNull Asset asset) {
        this(asset.findView(TcpChannelHub.class), requestContext.messageType(), asset.fullName());
    }

    /**
     * Creates a remote reference bound to the given hub.
     *
     * @param hub          connection used for all remote calls
     * @param messageClass class of the values held by the reference
     * @param fullName     full name of the remote asset
     * @throws AssetNotFoundException declared for callers; not thrown directly by this constructor
     */
    public RemoteReference(@NotNull TcpChannelHub hub, Class<E> messageClass, String fullName) throws AssetNotFoundException {
        super(hub, 0L, RemoteReference.toUri(fullName, messageClass));
        this.messageClass = messageClass;
    }

    /**
     * Builds the connection URI: {@code fullName?view=reference}, followed by
     * {@code &messageType=<alias>} unless the message class is {@code String}.
     */
    private static String toUri(String fullName, Class messageClass) {
        StringBuilder uri = new StringBuilder();
        uri.append(fullName).append("?view=reference");
        if (messageClass != String.class) {
            uri.append("&messageType=").append(ClassAliasPool.CLASS_ALIASES.nameFor(messageClass));
        }
        return uri.toString();
    }

    /**
     * Sends a {@code set} event carrying the given value.
     *
     * @param event the new value; must not be {@code null}
     * @return always {@code 0}
     * @throws NullPointerException if {@code event} is {@code null}
     */
    @Override
    public long set(E event) {
        this.checkEvent(event);
        this.sendEventAsync((WireKey)ReferenceHandler.EventId.set, valueOut -> valueOut.object(event), true);
        return 0L;
    }

    /**
     * Requests the current value via the {@code get} event.
     *
     * @return the reply typed as the message class
     */
    @Override
    @Nullable
    public E get() {
        return (E)this.proxyReturnTypedObject(ReferenceHandler.EventId.get, null, this.messageClass);
    }

    /**
     * Issues the {@code getAndSet} event with the given value.
     *
     * @param e value passed as the event argument
     * @return the reply typed as the message class
     */
    @Override
    @Nullable
    public E getAndSet(E e) {
        return (E)this.proxyReturnTypedObject(ReferenceHandler.EventId.getAndSet, null, this.messageClass, new Object[]{e});
    }

    /** Sends the {@code remove} event without any arguments. */
    @Override
    public void remove() {
        this.sendEventAsync((WireKey)ReferenceHandler.EventId.remove, null, true);
    }

    /**
     * Issues the {@code getAndRemove} event.
     *
     * @return the reply typed as the message class
     */
    @Override
    @Nullable
    public E getAndRemove() {
        return (E)this.proxyReturnTypedObject(ReferenceHandler.EventId.getAndRemove, null, this.messageClass);
    }

    /**
     * Unregisters a subscriber previously passed to
     * {@link #registerSubscriber(boolean, int, Subscriber)}.
     *
     * <p>If the subscriber is unknown, a debug message is logged and nothing else happens.
     * Otherwise the subscription is excluded from re-subscription on reconnect; then, if the hub
     * is closed, it is unsubscribed locally, else an {@code unregisterSubscriber} event carrying
     * the tid is sent.
     *
     * @param subscriber the subscriber to unregister
     */
    @Override
    public void unregisterSubscriber(Subscriber subscriber) {
        Long tid = this.subscribersToTid.get(subscriber);
        if (tid == null) {
            Jvm.debug().on(this.getClass(), "No subscriber to unsubscribe");
            return;
        }
        this.hub.preventSubscribeUponReconnect(tid.longValue());
        if (!this.hub.isOpen()) {
            this.hub.unsubscribe(tid.longValue());
            // Nothing is sent on this path, so no end-of-subscription reply can clean up
            // the mapping later; drop it here to avoid keeping a stale tid.
            this.subscribersToTid.remove(subscriber);
            return;
        }
        this.sendEventAsync((WireKey)ReferenceHandler.EventId.unregisterSubscriber, valueOut -> valueOut.int64(tid.longValue()), false);
    }

    /**
     * @return the result of the {@code countSubscribers} event
     */
    @Override
    public int subscriberCount() {
        return this.proxyReturnInt((WireKey)ReferenceHandler.EventId.countSubscribers);
    }

    /**
     * Registers a subscriber by subscribing an asynchronous subscription on the hub.
     *
     * <p>The subscription's URI is this client's {@code csp} with {@code &bootstrap=} and
     * {@code &throttlePeriodMs=} appended. Incoming {@code reply} events are decoded and passed
     * to the subscriber; an {@code onEndOfSubscription} event notifies the subscriber, forgets
     * its tid and unsubscribes that tid from the hub.
     *
     * @param bootstrap        value appended as the {@code bootstrap} query parameter
     * @param throttlePeriodMs value appended as the {@code throttlePeriodMs} query parameter
     * @param subscriber       receiver of messages and of the end-of-subscription notification
     * @throws IllegalStateException if the current thread holds the hub's out-bytes lock
     * @throws AssetNotFoundException declared for callers; not thrown directly by this method
     */
    @Override
    public void registerSubscriber(boolean bootstrap, int throttlePeriodMs, final @NotNull Subscriber subscriber) throws AssetNotFoundException {
        if (this.hub.outBytesLock().isHeldByCurrentThread()) {
            // Message text is kept unchanged for compatibility with existing callers/logs.
            throw new IllegalStateException("Cannot view map while debugging");
        }
        AbstractAsyncSubscription asyncSubscription = new AbstractAsyncSubscription(this.hub, this.csp + "&bootstrap=" + bootstrap + "&throttlePeriodMs=" + throttlePeriodMs, "Remote Ref registerSubscriber"){

            @Override
            public void onSubscribe(@NotNull WireOut wireOut) {
                // Remember the tid so unregisterSubscriber can find this subscription.
                RemoteReference.this.subscribersToTid.put(subscriber, this.tid());
                wireOut.writeEventName((WireKey)ReferenceHandler.EventId.registerSubscriber).text("");
            }

            @Override
            public void onConsumer(@NotNull WireIn w) {
                w.readDocument(null, d -> {
                    StringBuilder eventname = Wires.acquireStringBuilder();
                    ValueIn valueIn = d.readEventName(eventname);
                    if (ReferenceHandler.EventId.onEndOfSubscription.contentEquals(eventname)) {
                        subscriber.onEndOfSubscription();
                        // The map is keyed by the subscriber (see onSubscribe), not by this
                        // subscription object, so the subscriber must be used as the key.
                        RemoteReference.this.subscribersToTid.remove(subscriber);
                        RemoteReference.this.hub.unsubscribe(this.tid());
                    } else if (CoreFields.reply.contentEquals((CharSequence)eventname)) {
                        valueIn.marshallable(m -> {
                            E message = m.read(() -> "message").object(RemoteReference.this.messageClass);
                            RemoteReference.this.onEvent(message, subscriber);
                        });
                    }
                });
            }
        };
        this.hub.subscribe((AsyncSubscription)asyncSubscription);
    }

    /**
     * @return the message class this reference was created with
     */
    @Override
    public Class getType() {
        return this.messageClass;
    }

    /**
     * Delivers a message to a subscriber. {@code null} messages are dropped; if the subscriber
     * throws {@link InvalidSubscriberException} it is unregistered.
     *
     * @param message    the decoded message, possibly {@code null}
     * @param subscriber the receiver
     */
    void onEvent(@Nullable E message, @NotNull Subscriber<E> subscriber) {
        if (message == null) {
            return;
        }
        try {
            subscriber.onMessage(message);
        }
        catch (InvalidSubscriberException noLongerValid) {
            // The subscriber has declared itself invalid: stop delivering to it.
            this.unregisterSubscriber(subscriber);
        }
    }

    /**
     * Rejects {@code null} values.
     *
     * @throws NullPointerException if {@code key} is {@code null}
     */
    private void checkEvent(@Nullable Object key) {
        if (key == null) {
            throw new NullPointerException("event can not be null");
        }
    }

    /**
     * Single-argument convenience form: adapts {@code function} to a bi-function that ignores
     * its second argument and delegates to
     * {@link #applyTo(SerializableBiFunction, Object)} with a {@code null} argument.
     */
    @Override
    @Nullable
    public <R> R applyTo(@NotNull SerializableFunction<E, R> function) {
        // The intersection cast keeps the adapter lambda serializable.
        return this.applyTo((SerializableBiFunction<E, Object, R> & Serializable)(x, ignored) -> function.apply(x), null);
    }

    /**
     * Single-argument convenience form: adapts {@code updateFunction} to a bi-function that
     * ignores its second argument and delegates to
     * {@link #asyncUpdate(SerializableBiFunction, Object)} with a {@code null} argument.
     */
    @Override
    public void asyncUpdate(@NotNull SerializableFunction<E, E> updateFunction) {
        this.asyncUpdate((SerializableBiFunction<E, Object, E> & Serializable)(x, ignored) -> updateFunction.apply(x), null);
    }

    /**
     * Single-argument convenience form: adapts both functions to bi-functions that ignore their
     * second argument and delegates to
     * {@link #syncUpdate(SerializableBiFunction, Object, SerializableBiFunction, Object)} with
     * {@code null} arguments.
     */
    @Override
    @Nullable
    public <R> R syncUpdate(@NotNull SerializableFunction<E, E> updateFunction, @NotNull SerializableFunction<E, R> returnFunction) {
        return this.syncUpdate((SerializableBiFunction<E, Object, E> & Serializable)(x, ignored) -> updateFunction.apply(x), null, (SerializableBiFunction<E, Object, R> & Serializable)(x, ignored) -> returnFunction.apply(x), null);
    }

    /**
     * Issues the {@code applyTo2} event with the function and its argument as parameters.
     *
     * @param function function sent as the first event parameter
     * @param argument value sent as the second event parameter
     * @return the reply, read as an {@code Object} and cast to {@code R}
     */
    @Override
    @Nullable
    public <T, R> R applyTo(@NotNull SerializableBiFunction<E, T, R> function, T argument) {
        return (R)super.proxyReturnTypedObject((ParameterizeWireKey)ReferenceHandler.EventId.applyTo2, null, Object.class, new Object[]{function, argument});
    }

    /**
     * Sends the {@code update2} event with the update function and its argument as parameters.
     *
     * @param updateFunction function sent as the first event parameter
     * @param argument       value sent as the second event parameter
     */
    @Override
    public <T> void asyncUpdate(@NotNull SerializableBiFunction<E, T, E> updateFunction, T argument) {
        this.sendEventAsync((WireKey)ReferenceHandler.EventId.update2, RemoteReference.toParameters((ParameterizeWireKey)ReferenceHandler.EventId.update2, (Object[])new Object[]{updateFunction, argument}), true);
    }

    /**
     * Issues the {@code update4} event with both functions and both arguments as parameters.
     *
     * @param updateFunction function sent as the first event parameter
     * @param updateArgument value sent as the second event parameter
     * @param returnFunction function sent as the third event parameter
     * @param returnArgument value sent as the fourth event parameter
     * @return the reply, read as an {@code Object} and cast to {@code R}
     */
    @Override
    @Nullable
    public <UT, RT, R> R syncUpdate(@NotNull SerializableBiFunction<E, UT, E> updateFunction, @Nullable UT updateArgument, @NotNull SerializableBiFunction<E, RT, R> returnFunction, @Nullable RT returnArgument) {
        return (R)this.proxyReturnTypedObject(ReferenceHandler.EventId.update4, null, Object.class, new Object[]{updateFunction, updateArgument, returnFunction, returnArgument});
    }
}

