/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.server.internal;

import net.openhft.chronicle.core.annotation.UsedViaReflection;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.threads.EventHandler;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.HandlerPriority;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.engine.api.EngineReplication;
import net.openhft.chronicle.engine.api.pubsub.Replication;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.map.CMap2EngineReplicator;
import net.openhft.chronicle.engine.server.internal.EngineWireNetworkContext;
import net.openhft.chronicle.engine.tree.HostIdentifier;
import net.openhft.chronicle.network.cluster.AbstractSubHandler;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.WireOutPublisher;
import net.openhft.chronicle.wire.Demarshallable;
import net.openhft.chronicle.wire.ParameterizeWireKey;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MapReplicationHandler
extends AbstractSubHandler<EngineWireNetworkContext>
implements Demarshallable,
WriteMarshallable {
    private static final Logger LOG = LoggerFactory.getLogger(MapReplicationHandler.class);
    private final ThreadLocal<CMap2EngineReplicator.VanillaReplicatedEntry> vre = ThreadLocal.withInitial(CMap2EngineReplicator.VanillaReplicatedEntry::new);
    private Replication replication;
    private long timestamp;
    private byte localIdentifier;
    private volatile boolean closed;
    @NotNull
    private Class keyType;
    @NotNull
    private Class valueType;

    @UsedViaReflection
    private MapReplicationHandler(@NotNull WireIn wire) {
        this.timestamp = wire.read(() -> "timestamp").int64();
        this.keyType = wire.read(() -> "keyType").typeLiteral();
        this.valueType = wire.read(() -> "valueType").typeLiteral();
    }

    private MapReplicationHandler(long timestamp, @NotNull Class keyType, @NotNull Class valueType) {
        this.timestamp = timestamp;
        this.keyType = keyType;
        this.valueType = valueType;
    }

    @NotNull
    public static WriteMarshallable newMapReplicationHandler(long lastUpdateTime, @NotNull Class keyType, @NotNull Class valueType, String csp, long cid) {
        MapReplicationHandler h = new MapReplicationHandler(lastUpdateTime, keyType, valueType);
        return w -> w.writeDocument(true, d -> d.writeEventName((WireKey)CoreFields.csp).text(csp).writeEventName((WireKey)CoreFields.cid).int64(cid).writeEventName((WireKey)CoreFields.handler).typedMarshallable((WriteMarshallable)h));
    }

    public void writeMarshallable(@NotNull WireOut wire) {
        wire.write((CharSequence)"timestamp").int64(this.timestamp);
        wire.write((CharSequence)"keyType").typeLiteral(this.keyType);
        wire.write((CharSequence)"valueType").typeLiteral(this.valueType);
    }

    public void onRead(@NotNull WireIn inWire, @NotNull WireOut outWire) {
        StringBuilder eventName = Wires.acquireStringBuilder();
        ValueIn valueIn = inWire.readEventName(eventName);
        if (CoreFields.lastUpdateTime.contentEquals((CharSequence)eventName)) {
            long time = valueIn.int64();
            byte id = inWire.read(() -> "id").int8();
            this.replication.setLastModificationTime(id, time);
            return;
        }
        if (EventId.replicationEvent.contentEquals(eventName)) {
            CMap2EngineReplicator.VanillaReplicatedEntry entry = this.vre.get();
            entry.clear();
            valueIn.marshallable((ReadMarshallable)entry);
            this.replication.applyReplication(entry);
        }
    }

    public void onInitialize(@NotNull WireOut outWire) {
        if (this.isClosed()) {
            return;
        }
        Asset rootAsset = ((EngineWireNetworkContext)this.nc()).rootAsset();
        RequestContext requestContext = RequestContext.requestContext(this.csp());
        Asset asset = rootAsset.acquireAsset(requestContext.fullName());
        this.replication = asset.acquireView(Replication.class, RequestContext.requestContext(asset.fullName()).keyType(this.keyType).valueType(this.valueType));
        long lastUpdateTime = this.replication.lastModificationTime((byte)this.remoteIdentifier());
        WriteMarshallable writeMarshallable = MapReplicationHandler.newMapReplicationHandler(lastUpdateTime, this.keyType, this.valueType, this.csp(), this.cid());
        this.publish(writeMarshallable);
        HostIdentifier hostIdentifier = rootAsset.findOrCreateView(HostIdentifier.class);
        if (hostIdentifier != null) {
            this.localIdentifier = hostIdentifier.hostId();
        }
        EventLoop eventLoop = rootAsset.findOrCreateView(EventLoop.class);
        eventLoop.start();
        EngineReplication.ModificationIterator mi = this.replication.acquireModificationIterator((byte)this.remoteIdentifier());
        if (mi != null) {
            mi.dirtyEntries(this.timestamp);
        }
        if (mi == null) {
            return;
        }
        mi.setModificationNotifier(() -> ((EventLoop)eventLoop).unpause());
        if (!eventLoop.isAlive() && !eventLoop.isClosed()) {
            throw new IllegalStateException("the event loop is not yet running !");
        }
        eventLoop.addHandler(true, (EventHandler)new ReplicationEventHandler(mi, (byte)this.remoteIdentifier()));
    }

    public void close() {
        this.closed = true;
    }

    private class ReplicationEventHandler
    implements EventHandler,
    Closeable {
        private final EngineReplication.ModificationIterator mi;
        private final byte id;
        boolean hasSentLastUpdateTime;
        long lastUpdateTime;
        boolean hasLogged;
        int count;
        long startBufferFullTimeStamp;

        ReplicationEventHandler(EngineReplication.ModificationIterator mi, byte id) {
            this.mi = mi;
            this.id = id;
            this.lastUpdateTime = 0L;
            this.hasLogged = false;
            this.count = 0;
            this.startBufferFullTimeStamp = 0L;
        }

        @NotNull
        public HandlerPriority priority() {
            return HandlerPriority.REPLICATION;
        }

        public boolean action() throws InvalidEventHandlerException {
            if (MapReplicationHandler.this.closed || ((EngineWireNetworkContext)MapReplicationHandler.this.nc()).isClosed()) {
                throw new InvalidEventHandlerException();
            }
            WireOutPublisher publisher = ((EngineWireNetworkContext)MapReplicationHandler.this.nc()).wireOutPublisher();
            assert (!MapReplicationHandler.this.closed);
            if (publisher.isClosed()) {
                throw new InvalidEventHandlerException("publisher is closed");
            }
            if (!this.mi.hasNext()) {
                if (!this.hasSentLastUpdateTime && this.lastUpdateTime > 0L) {
                    publisher.put(null, w -> {
                        w.writeDocument(true, d -> d.write((WireKey)CoreFields.cid).int64(MapReplicationHandler.this.cid()));
                        w.writeDocument(false, d -> {
                            d.writeEventName((WireKey)CoreFields.lastUpdateTime).int64(this.lastUpdateTime);
                            d.write(() -> "id").int8(this.id);
                        });
                    });
                    this.hasSentLastUpdateTime = true;
                }
                return false;
            }
            this.mi.nextEntry(e -> publisher.put(null, w -> {
                assert (e.remoteIdentifier() != MapReplicationHandler.this.localIdentifier);
                long newlastUpdateTime = Math.max(this.lastUpdateTime, e.timestamp());
                if (newlastUpdateTime > this.lastUpdateTime) {
                    this.hasSentLastUpdateTime = false;
                    this.lastUpdateTime = newlastUpdateTime;
                }
                w.writeDocument(true, d -> d.write((WireKey)CoreFields.cid).int64(MapReplicationHandler.this.cid()));
                w.writeDocument(false, d -> {
                    d.writeEventName((WireKey)EventId.replicationEvent).marshallable((WriteMarshallable)e);
                    d.writeComment((CharSequence)("isAcceptor=" + ((EngineWireNetworkContext)MapReplicationHandler.this.nc()).isAcceptor()));
                });
            }));
            return true;
        }

        @NotNull
        public String toString() {
            return "ReplicationEventHandler{id=" + this.id + ",connectionClosed=" + ((EngineWireNetworkContext)MapReplicationHandler.this.nc()).isClosed() + '}';
        }

        public void close() {
            MapReplicationHandler.this.close();
        }
    }

    public static enum EventId implements ParameterizeWireKey
    {
        replicationEvent(new WireKey[0]),
        bootstrap(new WireKey[0]);

        private final WireKey[] params;

        @SafeVarargs
        private <P extends WireKey> EventId(P ... params) {
            this.params = params;
        }

        @NotNull
        public <P extends WireKey> P[] params() {
            return this.params;
        }
    }
}

