/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.api.query;

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.core.time.SystemTimeProvider;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.engine.api.pubsub.ConsumingSubscriber;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.query.IndexQuery;
import net.openhft.chronicle.engine.api.query.IndexQueueView;
import net.openhft.chronicle.engine.api.query.IndexedValue;
import net.openhft.chronicle.engine.api.query.TypeToString;
import net.openhft.chronicle.engine.api.query.VanillaObjectCacheFactory;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.tree.ChronicleQueueView;
import net.openhft.chronicle.engine.tree.QueueView;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.impl.RollingChronicleQueue;
import net.openhft.chronicle.wire.DefaultValueIn;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.KeyedMarshallable;
import net.openhft.chronicle.wire.Marshallable;
import net.openhft.chronicle.wire.MessageHistory;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link IndexQueueView} that keeps, for every event name read from a Chronicle Queue,
 * the most recent value seen for each key, and lets subscribers first receive a snapshot of
 * those values (bootstrap) and then follow the queue through their own tailer.
 *
 * <p>The index is filled by a handler that the constructor registers on the asset's
 * {@link EventLoop}. Only values that implement {@link KeyedMarshallable} are indexed.
 *
 * @param <V> the type of value held in the index
 */
public class VanillaIndexQueueView<V extends Marshallable>
implements IndexQueueView<ConsumingSubscriber<IndexedValue<V>>, V> {
    private static final Logger LOG = LoggerFactory.getLogger(VanillaIndexQueueView.class);
    /** Number of seconds between two throughput log lines. */
    private static final long THROUGHPUT_LOG_INTERVAL_SECONDS = 10L;
    private final ChronicleQueue chronicleQueue;
    /** event name -> (key -> latest indexed value for that key). */
    private final Map<String, ConcurrentMap<Object, IndexedValue<V>>> multiMap = new ConcurrentHashMap<>();
    // NOTE(review): the per-subscription flag stored here is set by unregisterSubscriber()/close()
    // but is never read anywhere in this class; confirm whether value() was meant to check it.
    private final Map<Subscriber<IndexedValue<V>>, AtomicBoolean> activeSubscriptions = new ConcurrentHashMap<>();
    private final AtomicBoolean isClosed = new AtomicBoolean();
    /** Guards the paired update of {@link #multiMap} and {@link #lastIndexRead}. */
    private final Object lastIndexLock = new Object();
    /** Per-thread holder that {@link #value} refills and returns for every live message. */
    private final ThreadLocal<IndexedValue<V>> indexedValue = ThreadLocal.withInitial(IndexedValue::new);
    // NOTE(review): annotated @Nullable (result of findView) yet dereferenced without a null check
    // in the indexing handler and in value(); confirm the view is always present.
    @Nullable
    private final TypeToString typeToString;
    @NotNull
    private final Asset asset;
    /** Queue index of the last value written into {@link #multiMap}. */
    private volatile long lastIndexRead = 0L;
    /** Second (epoch based) at which the throughput counter was last reset. */
    private long lastSecond = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
    /** Values indexed since the last reset; divided by the interval when it is logged. */
    private long messagesReadPerSecond = 0L;
    /** Interns key bytes so that the same key content always maps to one stored copy. */
    @NotNull
    private final ConcurrentMap<Bytes, BytesStore> bytesToKey = new ConcurrentHashMap<Bytes, BytesStore>();

    /**
     * Creates the view and registers the indexing handler on the asset's event loop.
     *
     * @param context   the request context (not used by this constructor)
     * @param asset     the asset this view belongs to; supplies the {@link EventLoop} and,
     *                  through its root, the {@link TypeToString} view
     * @param queueView the queue view to index; must be a {@link ChronicleQueueView}
     */
    public VanillaIndexQueueView(@NotNull RequestContext context, @NotNull Asset asset, @NotNull QueueView<?, V> queueView) {
        this.asset = asset;
        EventLoop eventLoop = asset.acquireView(EventLoop.class);
        ChronicleQueueView chronicleQueueView = (ChronicleQueueView)queueView;
        this.chronicleQueue = chronicleQueueView.chronicleQueue();
        ExcerptTailer tailer = this.chronicleQueue.createTailer();
        AtomicBoolean hasMovedToStart = new AtomicBoolean();
        this.typeToString = asset.root().findView(TypeToString.class);
        eventLoop.addHandler(() -> this.indexNextDocument(tailer, hasMovedToStart));
    }

    /**
     * One iteration of the indexing handler: positions the tailer on first use, logs
     * throughput when due, then reads at most one document and indexes its keyed values.
     *
     * @return {@code true} if a document was processed, {@code false} if there was nothing
     *         to read, the tailer could not be positioned, or a value failed to unmarshal
     * @throws InvalidEventHandlerException once this view has been closed
     */
    private boolean indexNextDocument(@NotNull ExcerptTailer tailer, @NotNull AtomicBoolean hasMovedToStart) throws InvalidEventHandlerException {
        if (!hasMovedToStart.get()) {
            boolean moved = this.moveToStartOfCurrentCycle(tailer);
            hasMovedToStart.set(moved);
            if (!moved) {
                return false;
            }
        }
        this.logThroughputIfDue();
        if (this.isClosed.get()) {
            throw new InvalidEventHandlerException();
        }
        try (DocumentContext dc = tailer.readingDocument()) {
            if (!dc.isPresent()) {
                return false;
            }
            return this.indexDocumentFields(dc);
        }
    }

    /** Moves the tailer to sequence 0 of the queue's current cycle; returns whether it succeeded. */
    private boolean moveToStartOfCurrentCycle(@NotNull ExcerptTailer tailer) {
        RollingChronicleQueue rollingQueue = (RollingChronicleQueue)this.chronicleQueue;
        int cycle = rollingQueue.cycle();
        long startOfCurrentCycle = rollingQueue.rollCycle().toIndex(cycle, 0L);
        return tailer.moveToIndex(startOfCurrentCycle);
    }

    /** Logs the average number of indexed values per second once per interval and resets the counter. */
    private void logThroughputIfDue() {
        long currentSecond = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
        if (currentSecond >= this.lastSecond + THROUGHPUT_LOG_INTERVAL_SECONDS) {
            this.lastSecond = currentSecond;
            LOG.info("messages read per second={}", this.messagesReadPerSecond / THROUGHPUT_LOG_INTERVAL_SECONDS);
            this.messagesReadPerSecond = 0L;
        }
    }

    /**
     * Reads every field of the given document and stores each keyed value in the index.
     *
     * @return {@code true} when the document was consumed (including when a "history" field or
     *         a padding/read error ended processing early), {@code false} when a value could
     *         not be unmarshalled
     */
    private boolean indexDocumentFields(@NotNull DocumentContext dc) {
        long start = dc.wire().bytes().readPosition();
        while (true) {
            try {
                dc.wire().consumePadding();
                if (dc.wire().bytes().readRemaining() == 0L) {
                    return true;
                }
            }
            catch (RuntimeException e) {
                // Dump the document (starting 4 bytes before the payload) to help diagnose it.
                Jvm.warn().on(this.getClass(), Wires.fromSizePrefixedBlobs((Bytes)dc.wire().bytes(), (long)(start - 4L)), (Throwable)e);
                return true;
            }
            StringBuilder sb = Wires.acquireStringBuilder();
            ValueIn read = dc.wire().read(sb);
            if ("history".contentEquals(sb)) {
                read.marshallable((ReadMarshallable)MessageHistory.get());
                return true;
            }
            if (sb.length() == 0) {
                continue;
            }
            Class<? extends Marshallable> type = this.typeToString.toType(sb);
            if (type == null) {
                continue;
            }
            @SuppressWarnings("unchecked") // the instance is created from the type resolved for this field
            V v = (V)VanillaObjectCacheFactory.INSTANCE.get().apply(type);
            long readPosition = dc.wire().bytes().readPosition();
            try {
                read.marshallable((ReadMarshallable)v);
            }
            catch (Exception e) {
                String msg = dc.wire().bytes().toHexString(readPosition, dc.wire().bytes().readLimit() - readPosition);
                LOG.error("Error passing " + v.getClass().getSimpleName() + " bytes:\n" + msg, (Throwable)e);
                // The caller's try-with-resources closes the document.
                return false;
            }
            if (!(v instanceof KeyedMarshallable)) {
                continue;
            }
            Bytes bytes = Wires.acquireBytes();
            ((KeyedMarshallable)v).writeKey(bytes);
            BytesStore k = this.bytesToKey.computeIfAbsent(bytes, Bytes::copy);
            if (k == null) {
                continue;
            }
            ++this.messagesReadPerSecond;
            String eventName = sb.toString();
            long index = dc.index();
            synchronized (this.lastIndexLock) {
                this.multiMap.computeIfAbsent(eventName, e -> new ConcurrentHashMap<>()).compute(k, (k1, vOld) -> {
                    if (vOld == null) {
                        // v comes from an object cache, so store a copy rather than the instance itself.
                        return new IndexedValue<>(Wires.deepCopy(v), index);
                    }
                    Wires.copyTo((Object)v, vOld.v());
                    vOld.index(index);
                    return vOld;
                });
                this.lastIndexRead = index;
            }
        }
    }

    /**
     * Registers a subscriber for the given query. The requested start index is resolved
     * ({@code -1} and {@code 0} are special values, see below) and clamped to the range
     * {@code [queue start, queue end]} before the subscriber's tailer is positioned on it.
     *
     * @param sub               the subscriber that will receive a supplier of values
     * @param vanillaIndexQuery the query supplying event name, filter, start index and bootstrap flag
     */
    @Override
    public void registerSubscriber(@NotNull ConsumingSubscriber<IndexedValue<V>> sub, @NotNull IndexQuery<V> vanillaIndexQuery) {
        ExcerptTailer tailer = this.chronicleQueue.createTailer();
        long start = tailer.toStart().index();
        long endIndex = tailer.toEnd().index();
        long requested = vanillaIndexQuery.fromIndex();
        if (requested == -1L) {
            // NOTE(review): the result of rollCycle.current(..) is fed to toCycle(..) as if it were
            // an index; confirm this is intended (the constructor uses chronicleQueue.cycle() directly).
            RollingChronicleQueue rollingQueue = (RollingChronicleQueue)this.chronicleQueue;
            RollCycle rollCycle = rollingQueue.rollCycle();
            int currentIndex = rollCycle.current((TimeProvider)SystemTimeProvider.INSTANCE, 0L);
            int cycle = rollCycle.toCycle((long)currentIndex);
            requested = rollCycle.toIndex(cycle, 0L);
        } else if (requested == 0L) {
            // 0 means "start from the current end of the queue".
            requested = endIndex;
        }
        long fromIndex = Math.max(Math.min(requested, endIndex), start);
        boolean success = tailer.moveToIndex(fromIndex);
        assert (success || fromIndex == endIndex) : "fromIndex=" + Long.toHexString(fromIndex) + ", start=" + Long.toHexString(start) + ",end=" + Long.toHexString(endIndex);
        // After clamping, fromIndex can only exceed endIndex when start itself is beyond endIndex.
        if (fromIndex <= endIndex) {
            this.registerSubscriber(sub, vanillaIndexQuery, tailer, fromIndex);
            return;
        }
        this.ensureAllDataIsLoadedBeforeRegistingSubsribe(sub, vanillaIndexQuery, tailer, endIndex, fromIndex);
    }

    /**
     * Defers the registration to a handler on the root asset's event loop, which calls
     * {@link #endOfTailCheckedRegisterSubscriber} on each iteration.
     */
    private void ensureAllDataIsLoadedBeforeRegistingSubsribe(@NotNull ConsumingSubscriber<IndexedValue<V>> sub, @NotNull IndexQuery<V> vanillaIndexQuery, @NotNull ExcerptTailer tailer, long endIndex, long fromIndex) {
        EventLoop eventLoop = this.asset.root().getView(EventLoop.class);
        eventLoop.addHandler(() -> this.endOfTailCheckedRegisterSubscriber(sub, vanillaIndexQuery, tailer, endIndex, fromIndex));
    }

    /**
     * Event-loop step for a deferred registration: registers the subscriber and then removes
     * itself from the loop by throwing, unless {@code lastIndexRead} is already past
     * {@code endIndex}, in which case it returns {@code false} and stays registered.
     *
     * <p>NOTE(review): as written the handler never registers once {@code lastIndexRead} has
     * passed {@code endIndex}; confirm the comparison direction is the intended one.
     *
     * @throws InvalidEventHandlerException after the subscriber has been registered
     */
    private boolean endOfTailCheckedRegisterSubscriber(@NotNull ConsumingSubscriber<IndexedValue<V>> sub, @NotNull IndexQuery<V> vanillaIndexQuery, @NotNull ExcerptTailer tailer, long endIndex, long fromIndex) throws InvalidEventHandlerException {
        if (this.lastIndexRead > endIndex) {
            return false;
        }
        this.registerSubscriber(sub, vanillaIndexQuery, tailer, fromIndex);
        throw new InvalidEventHandlerException();
    }

    /**
     * Records the subscription and hands the subscriber a supplier that first yields the
     * bootstrap snapshot (if the query asks for one) and then values read through the tailer.
     * If creating or adding the supplier fails with a {@link RuntimeException}, the subscriber
     * is told the subscription has ended and the failure is logged.
     */
    private void registerSubscriber(@NotNull ConsumingSubscriber<IndexedValue<V>> sub, @NotNull IndexQuery<V> vanillaIndexQuery, @NotNull ExcerptTailer tailer, long fromIndex) {
        this.activeSubscriptions.put(sub, new AtomicBoolean());
        String eventName = vanillaIndexQuery.eventName();
        Predicate<V> filter = vanillaIndexQuery.filter();
        ConcurrentMap<Object, IndexedValue<V>> map = this.multiMap.computeIfAbsent(eventName, k -> new ConcurrentHashMap<>());
        // The snapshot only covers values indexed before fromIndex; anything at or after it
        // is delivered by the tailer, which starts at fromIndex.
        Iterator<IndexedValue<V>> iterator = vanillaIndexQuery.bootstrap()
                ? map.values().stream().filter(i -> i.index() < fromIndex && filter.test(i.v())).iterator()
                : Collections.<IndexedValue<V>>emptyIterator();
        try {
            Supplier<Marshallable> supplier = this.excerptConsumer(vanillaIndexQuery, tailer, iterator, fromIndex);
            sub.addSupplier(supplier);
        }
        catch (RuntimeException e) {
            sub.onEndOfSubscription();
            Jvm.warn().on(this.getClass(), "Error registering subscription", (Throwable)e);
        }
    }

    /** Wraps {@link #value} in a supplier bound to the given query, tailer and snapshot iterator. */
    @NotNull
    private Supplier<Marshallable> excerptConsumer(@NotNull IndexQuery<V> vanillaIndexQuery, @NotNull ExcerptTailer tailer, @NotNull Iterator<IndexedValue<V>> iterator, long fromIndex) {
        return () -> this.value(vanillaIndexQuery, tailer, iterator, fromIndex);
    }

    /**
     * Produces the next value for a subscription: the next snapshot entry while the iterator
     * has any, otherwise the next matching message read through the tailer.
     *
     * <p>The returned {@link IndexedValue} for a live message is the per-thread instance held
     * in {@link #indexedValue}, so it is overwritten by the next call on the same thread.
     *
     * @param vanillaIndexQuery supplies the event name and the filter
     * @param tailer            the subscription's tailer
     * @param iterator          remaining snapshot entries
     * @param from              messages with an index below this are ignored
     * @return the next value, or {@code null} when no document is available, the document is
     *         before {@code from}, the event name has no known type, the document has no field
     *         with that name, or the value is rejected by the filter
     */
    @Nullable
    private Marshallable value(@NotNull IndexQuery<V> vanillaIndexQuery, @NotNull ExcerptTailer tailer, @NotNull Iterator<IndexedValue<V>> iterator, long from) {
        if (iterator.hasNext()) {
            IndexedValue<V> bootstrapped = iterator.next();
            bootstrapped.timePublished(System.currentTimeMillis());
            bootstrapped.maxIndex(this.lastIndexRead);
            return bootstrapped;
        }
        String eventName = vanillaIndexQuery.eventName();
        Predicate<V> filter = vanillaIndexQuery.filter();
        if (this.isClosed.get()) {
            throw Jvm.rethrow((Throwable)new InvalidEventHandlerException("shutdown"));
        }
        try (DocumentContext dc = tailer.readingDocument()) {
            try {
                if (!dc.isPresent()) {
                    return null;
                }
                if (LOG.isDebugEnabled()) {
                    Jvm.debug().on(this.getClass(), "processing the following message=" + Wires.fromSizePrefixedBlobs((DocumentContext)dc));
                }
                if (from > dc.index()) {
                    return null;
                }
                Class<? extends Marshallable> type = this.typeToString.toType(eventName);
                if (type == null) {
                    return null;
                }
                ValueIn valueIn = dc.wire().read(eventName);
                if (valueIn instanceof DefaultValueIn) {
                    // No field with this event name in the document.
                    return null;
                }
                @SuppressWarnings("unchecked") // the instance is created from the type resolved for eventName
                V v = (V)VanillaObjectCacheFactory.INSTANCE.get().apply(type);
                valueIn.marshallable((ReadMarshallable)v);
                if (!filter.test(v)) {
                    return null;
                }
                IndexedValue<V> result = this.indexedValue.get();
                long index = dc.index();
                result.index(index);
                result.v(v);
                result.timePublished(System.currentTimeMillis());
                result.maxIndex(Math.max(index, this.lastIndexRead));
                return result;
            }
            finally {
                // Read and skip whatever fields are left in the document before it is closed.
                if (dc.isPresent()) {
                    while (dc.wire().hasMore()) {
                        dc.wire().read().skipValue();
                    }
                }
            }
        }
    }

    /**
     * Removes the subscription and marks its flag as closed.
     *
     * @param listener the subscriber to remove; unknown subscribers are ignored
     */
    @Override
    public void unregisterSubscriber(@NotNull ConsumingSubscriber<IndexedValue<V>> listener) {
        AtomicBoolean subscriptionClosed = this.activeSubscriptions.remove(listener);
        if (subscriptionClosed != null) {
            subscriptionClosed.set(true);
        }
    }

    /**
     * Marks the view and every active subscription as closed, then closes the underlying queue.
     * Afterwards the indexing handler and the subscription suppliers throw
     * {@link InvalidEventHandlerException} on their next invocation.
     */
    public void close() {
        this.isClosed.set(true);
        this.activeSubscriptions.values().forEach(flag -> flag.set(true));
        this.chronicleQueue.close();
    }
}

