/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.map;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.threads.EventHandler;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.HandlerPriority;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.engine.api.EngineReplication;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.map.CMap2EngineReplicator;
import net.openhft.chronicle.engine.map.ChronicleMapKeyValueStore;
import net.openhft.chronicle.engine.map.replication.Bootstrap;
import net.openhft.chronicle.engine.server.internal.MapWireHandler;
import net.openhft.chronicle.engine.server.internal.ReplicationHandler2;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AbstractAsyncTemporarySubscription;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.AsyncSubscription;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side replication endpoint built on a {@link TcpChannelHub}.
 *
 * <p>{@link #bootstrap} sends an {@code identifier} event over the hub and, once the reply with
 * the remote identifier arrives, opens a second (temporary) subscription that exchanges
 * {@link Bootstrap} messages, applies incoming replication events to the local
 * {@link EngineReplication}, and registers a {@link RepEventHandler} on the event loop to push
 * local modifications to the remote side.
 */
class ReplicationHub extends AbstractStatelessClient {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationHub.class);

    /** Receive latency (in milliseconds) above which an incoming entry is reported at INFO. */
    private static final long SLOW_ENTRY_THRESHOLD_MS = 100L;

    /** Per-thread reusable entry that incoming replication events are unmarshalled into. */
    final ThreadLocal<CMap2EngineReplicator.VanillaReplicatedEntry> vre =
            ThreadLocal.withInitial(CMap2EngineReplicator.VanillaReplicatedEntry::new);
    @NotNull
    private final EventLoop eventLoop;
    @NotNull
    private final AtomicBoolean isClosed;
    @NotNull
    private final Function<Bytes, Wire> wireType;

    /**
     * @param context   request context; its full name and key/value types form the URI passed to
     *                  the super constructor
     * @param hub       channel hub used for all subscriptions and for sending bytes
     * @param eventLoop event loop the publishing handler is added to
     * @param isClosed  flag polled by the publishing handler; once {@code true} the handler
     *                  retires itself
     * @param wireType  factory producing the {@link Wire} the publishing handler writes with
     */
    public ReplicationHub(@NotNull RequestContext context,
                          @NotNull TcpChannelHub hub,
                          @NotNull EventLoop eventLoop,
                          @NotNull AtomicBoolean isClosed,
                          @NotNull Function<Bytes, Wire> wireType) {
        super(hub, 0L, ReplicationHub.toUri(context));
        this.eventLoop = eventLoop;
        this.isClosed = isClosed;
        this.wireType = wireType;
    }

    /**
     * Builds the replication view URI for the given context. The key and value types are only
     * appended when they differ from {@code String}.
     */
    private static String toUri(@NotNull RequestContext context) {
        StringBuilder uri = new StringBuilder(context.fullName()).append("?view=Replication");
        if (context.keyType() != String.class) {
            uri.append("&keyType=").append(context.keyType().getName());
        }
        if (context.valueType() != String.class) {
            uri.append("&valueType=").append(context.valueType().getName());
        }
        return uri.toString();
    }

    /**
     * Subscribes on the hub, sends the {@code identifier} event and, when the reply arrives,
     * continues with {@link #onConnected} using the remote identifier carried in that reply.
     *
     * @param replication      replication instance handed on to {@link #onConnected}
     * @param localIdentifier  identifier of this node
     * @param remoteIdentifier identifier supplied by the caller; only used in log/debug text, the
     *                         identifier actually used afterwards is the one read from the reply
     */
    public void bootstrap(final @NotNull EngineReplication replication,
                          final byte localIdentifier,
                          final byte remoteIdentifier) {
        this.hub.subscribe(new AbstractAsyncSubscription(this.hub, this.csp, localIdentifier, "ReplicationHub bootstrap") {

            @Override
            public void onSubscribe(@NotNull WireOut wireOut) {
                if (LOG.isDebugEnabled()) {
                    Jvm.debug().on(getClass(), "onSubscribe - localIdentifier=" + localIdentifier + ",remoteIdentifier=" + remoteIdentifier);
                }
                wireOut.writeEventName((WireKey) ReplicationHandler2.EventId.identifier)
                        .marshallable(WriteMarshallable.EMPTY)
                        .writeComment((CharSequence) (this.toString() + ", tcpChannelHub={" + ReplicationHub.this.hub.toString() + "}"));
            }

            @Override
            public void onConsumer(@NotNull WireIn inWire) {
                if (Jvm.isDebug()) {
                    LOG.info("client : bootstrap");
                }
                inWire.readDocument(null, d -> {
                    // the reply tells us which remote node we are actually talking to
                    byte repliedIdentifier = d.read((WireKey) ReplicationHandler2.EventId.identifierReply).int8();
                    ReplicationHub.this.onConnected(localIdentifier, repliedIdentifier, replication);
                });
            }

            @NotNull
            @Override
            public String toString() {
                return "bootstrap {localIdentifier=" + localIdentifier + " ,remoteIdentifier=" + remoteIdentifier + "}";
            }
        });
    }

    /**
     * Opens the temporary subscription that carries the actual replication traffic.
     *
     * <p>On subscribe a {@link Bootstrap} holding the local identifier and the last modification
     * time known for the remote node is sent. Incoming documents are dispatched on their event
     * name: {@code bootstrap} starts publishing, {@code replicationEvent} is applied to
     * {@code replication}, and {@code lastUpdateTime} updates the stored last modification time.
     */
    private void onConnected(byte localIdentifier,
                             final byte remoteIdentifier,
                             final @NotNull EngineReplication replication) {
        final EngineReplication.ModificationIterator mi = replication.acquireModificationIterator(remoteIdentifier);
        assert mi != null;

        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.lastUpdatedTime(replication.lastModificationTime(remoteIdentifier));
        bootstrap.identifier(localIdentifier);

        this.hub.subscribe(new AbstractAsyncTemporarySubscription(this.hub, this.csp, localIdentifier, "replication onConnected") {
            /** Number of slow entries reported so far; drives the periodic blank log line. */
            private int count;

            @Override
            public void onSubscribe(@NotNull WireOut wireOut) {
                wireOut.writeEventName((WireKey) MapWireHandler.EventId.bootstrap)
                        .typedMarshallable((WriteMarshallable) bootstrap);
            }

            @Override
            public void onConsumer(@NotNull WireIn inWire) {
                if (Jvm.isDebug()) {
                    LOG.info("client : onConsumer - publishing updates");
                }
                inWire.readDocument(null, d -> {
                    StringBuilder eventName = Wires.acquireStringBuilder();
                    ValueIn valueIn = d.readEventName(eventName);

                    if (ReplicationHandler2.EventId.bootstrap.contentEquals(eventName)) {
                        Bootstrap remoteBootstrap = (Bootstrap) valueIn.typedMarshallable();
                        try {
                            ReplicationHub.this.publish(mi, remoteBootstrap, remoteIdentifier);
                        } catch (RuntimeException e) {
                            // best effort: a failure to start publishing is reported, not propagated
                            Jvm.warn().on(getClass(), e);
                        }
                    } else if (ReplicationHandler2.EventId.replicationEvent.contentEquals(eventName)) {
                        CMap2EngineReplicator.VanillaReplicatedEntry replicatedEntry = ReplicationHub.this.vre.get();
                        valueIn.marshallable((ReadMarshallable) replicatedEntry);
                        if (LOG.isInfoEnabled()) {
                            long delay = System.currentTimeMillis() - replicatedEntry.timestamp();
                            if (delay > SLOW_ENTRY_THRESHOLD_MS) {
                                LOG.info("Rcv Clt latency=" + delay + "ms\t");
                                if (this.count++ % 10 == 0) {
                                    LOG.info("");
                                }
                            }
                        }
                        replication.applyReplication(replicatedEntry);
                    } else if (CoreFields.lastUpdateTime.contentEquals((CharSequence) eventName)) {
                        if (Jvm.isDebug()) {
                            Jvm.debug().on(getClass(), "server : received lastUpdateTime");
                        }
                        long time = valueIn.int64();
                        byte id = d.read(() -> "id").int8();
                        replication.setLastModificationTime(id, time);
                    }
                });
            }
        });
    }

    /**
     * Starts pushing local modifications to the remote node: wires the iterator's modification
     * notifier to un-pause the event loop, registers a {@link RepEventHandler}, and then marks
     * entries dirty from the remote's last updated time.
     *
     * @param mi               modification iterator for the remote node
     * @param remote           bootstrap message received from the remote node
     * @param remoteIdentifier identifier of the remote node
     */
    void publish(@NotNull EngineReplication.ModificationIterator mi, @NotNull Bootstrap remote, byte remoteIdentifier) {
        mi.setModificationNotifier(() -> this.eventLoop.unpause());
        this.eventLoop.addHandler(true, new RepEventHandler(this.hub, mi, remoteIdentifier));
        mi.dirtyEntries(remote.lastUpdatedTime());
    }

    /**
     * Event-loop handler that drains a {@link EngineReplication.ModificationIterator} and sends
     * each entry over the hub. When the iterator runs dry it sends the highest entry timestamp
     * seen so far once, as a {@code lastUpdateTime} event.
     *
     * <p>NOTE(review): the elastic buffer allocated in the constructor is never explicitly
     * released in this class — confirm whether it should be released when the handler retires.
     */
    private class RepEventHandler implements EventHandler, Consumer<EngineReplication.ReplicationEntry> {
        final Bytes bytes;
        final Wire wire;
        private final TcpChannelHub hub;
        private final EngineReplication.ModificationIterator mi;
        private final byte remoteIdentifier;
        /** {@code true} once the current {@link #lastUpdateTime} has been written out. */
        boolean hasSentLastUpdateTime;
        /** Highest entry timestamp accepted so far; {@code 0} until the first entry. */
        long lastUpdateTime;

        public RepEventHandler(TcpChannelHub hub, EngineReplication.ModificationIterator mi, byte remoteIdentifier) {
            this.hub = hub;
            this.mi = mi;
            this.remoteIdentifier = remoteIdentifier;
            this.bytes = Bytes.elasticByteBuffer();
            this.wire = ReplicationHub.this.wireType.apply(this.bytes);
            this.hasSentLastUpdateTime = false;
            this.lastUpdateTime = 0L;
        }

        /**
         * Performs one unit of publishing work.
         *
         * @return {@code true} if bytes were sent on this invocation, {@code false} otherwise
         * @throws InvalidEventHandlerException once the owning hub has been flagged closed, so
         *                                      the event loop drops this handler
         */
        @Override
        public boolean action() throws InvalidEventHandlerException {
            // Check for closure first so the handler always retires, even if the hub's
            // out-buffer is stuck locked or non-empty.
            if (ReplicationHub.this.isClosed.get()) {
                throw new InvalidEventHandlerException();
            }
            // only write when the hub's outbound buffer is free and fully drained
            if (this.hub.isOutBytesLocked() || !this.hub.isOutBytesEmpty()) {
                return false;
            }

            this.bytes.clear();
            boolean lastUpdateTimePending = !this.hasSentLastUpdateTime && this.lastUpdateTime > 0L;
            if (!this.mi.hasNext() && lastUpdateTimePending) {
                this.wire.writeNotCompleteDocument(false, out -> {
                    out.writeEventName((WireKey) CoreFields.lastUpdateTime).int64(this.lastUpdateTime);
                    out.write(() -> "id").int8(this.remoteIdentifier);
                });
                this.hasSentLastUpdateTime = true;
            } else {
                // delivers at most one entry to accept(...), which serialises it into bytes
                this.mi.nextEntry(this);
            }
            return flush();
        }

        /** Sends whatever has been written to {@link #bytes}; returns whether anything was sent. */
        private boolean flush() {
            if (this.bytes.readRemaining() > 0L) {
                ReplicationHub.this.sendBytes(this.bytes, false);
                return true;
            }
            return false;
        }

        /**
         * Serialises a single replication entry into the outgoing wire and tracks the highest
         * timestamp seen, re-arming the {@code lastUpdateTime} notification when it advances.
         */
        @Override
        public void accept(@NotNull EngineReplication.ReplicationEntry e) {
            long timestamp = e.timestamp();
            if (timestamp > this.lastUpdateTime) {
                this.hasSentLastUpdateTime = false;
                this.lastUpdateTime = timestamp;
            }
            if (Jvm.isDebug() && LOG.isDebugEnabled()) {
                long delay = System.currentTimeMillis() - e.timestamp();
                Jvm.debug().on(this.getClass(), "*****\t\t\t\tSENT : CLIENT :replicatedEntry latency=" + delay + "ms");
            }
            this.wire.writeNotCompleteDocument(false, wireOut ->
                    wireOut.writeEventName((WireKey) ReplicationHandler2.EventId.replicationEvent)
                            .typedMarshallable((WriteMarshallable) e));
        }

        @NotNull
        @Override
        public HandlerPriority priority() {
            return HandlerPriority.REPLICATION;
        }
    }
}

