/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.query.h2.twostep;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.CacheException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.h2.twostep.ReduceBlockList;
import org.apache.ignite.internal.processors.query.h2.twostep.ReduceResultPage;
import org.apache.ignite.internal.processors.query.h2.twostep.ReduceSourceKey;
import org.apache.ignite.internal.processors.query.h2.twostep.Reducer;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.SpanType;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.h2.index.Cursor;
import org.h2.result.Row;
import org.h2.result.SearchRow;
import org.h2.value.Value;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * Base implementation of {@link Reducer} that tracks the set of source nodes, marks the last
 * result page of every (node, segment) source and offers helpers for pulling pages out of a
 * blocking queue while watching for failed source nodes.
 *
 * <p>Subclasses decide how pages are stored ({@link #addPage0(ReduceResultPage)}) and how rows
 * are looked up ({@link #findInStream(SearchRow, SearchRow)} and
 * {@link #findAllFetched(List, SearchRow, SearchRow)}).
 */
public abstract class AbstractReducer implements Reducer {
    /** Name of the system property read into {@link #MAX_FETCH_SIZE}. */
    private static final String MAX_SIZE_PROP = "IGNITE_SQL_MERGE_TABLE_MAX_SIZE";

    /** Name of the system property read into {@link #prefetchSize}. */
    private static final String PREFETCH_SIZE_PROP = "IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE";

    /** How long a single queue poll waits before the liveness of source nodes is re-checked. */
    private static final long POLL_TIMEOUT_MS = 500L;

    /** Value of {@code IGNITE_SQL_MERGE_TABLE_MAX_SIZE} (default 10000). */
    static final int MAX_FETCH_SIZE = IgniteSystemProperties.getInteger(MAX_SIZE_PROP, 10000);

    /**
     * Value of {@code IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE} (default 1024). Passed to the
     * {@link ReduceBlockList} constructor; validated in the static initializer below.
     */
    static int prefetchSize = IgniteSystemProperties.getInteger(PREFETCH_SIZE_PROP, 1024);

    /** Updater used to lazily publish the {@link #lastPages} map exactly once. */
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<AbstractReducer, ConcurrentMap> LAST_PAGES_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractReducer.class, ConcurrentMap.class, "lastPages");

    static {
        if (!U.isPow2(prefetchSize)) {
            throw new IllegalArgumentException(PREFETCH_SIZE_PROP + " (" + prefetchSize
                + ") must be positive and a power of 2.");
        }

        if (prefetchSize >= MAX_FETCH_SIZE) {
            throw new IllegalArgumentException(PREFETCH_SIZE_PROP + " (" + prefetchSize
                + ") must be less than " + MAX_SIZE_PROP + " (" + MAX_FETCH_SIZE + ").");
        }
    }

    /** Kernal context; used for discovery (node liveness) and tracing. */
    private final GridKernalContext ctx;

    /**
     * Index of the last page for every (node, segment) source whose first page reported a
     * non-negative total row count. Created lazily by {@link #initLastPages}.
     */
    private volatile ConcurrentMap<ReduceSourceKey, Integer> lastPages;

    /** IDs of the source nodes; {@code null} until {@link #setSources(Map)} is called. */
    protected Set<UUID> srcNodes;

    /** Page size used to derive the last page index from the total row count. */
    private int pageSize;

    /** Rows fetched so far, stored in blocks of {@link #prefetchSize} elements. */
    protected final ReduceBlockList<Row> fetched;

    /** Last row of the most recently evicted block, or {@code null} if nothing was evicted. */
    private Row lastEvictedRow;

    /**
     * @param ctx Kernal context.
     */
    AbstractReducer(GridKernalContext ctx) {
        this.ctx = ctx;
        this.fetched = new ReduceBlockList<>(prefetchSize);
    }

    /**
     * Records the IDs of the given nodes as the sources of this reducer. May be called only
     * once (asserted).
     *
     * @param nodesToSegmentsCnt Source nodes mapped to their segment counts; only the keys are used.
     * @throws IllegalStateException If two of the given nodes share the same ID.
     */
    @Override
    public void setSources(Map<ClusterNode, Integer> nodesToSegmentsCnt) {
        assert srcNodes == null;

        srcNodes = new HashSet<>(nodesToSegmentsCnt.size());

        for (ClusterNode node : nodesToSegmentsCnt.keySet()) {
            if (!srcNodes.add(node.id())) {
                throw new IllegalStateException("Duplicate source node ID: " + node.id());
            }
        }
    }

    /**
     * @return The live set of source node IDs (not a copy); {@code null} before
     *      {@link #setSources(Map)}.
     */
    @Override
    public Set<UUID> sources() {
        return srcNodes;
    }

    /**
     * @param nodeId Node ID to look up.
     * @return {@code true} if the node is one of the sources.
     */
    @Override
    public boolean hasSource(UUID nodeId) {
        return srcNodes.contains(nodeId);
    }

    /**
     * @param pageSize Page size used when computing last page indexes.
     */
    @Override
    public void setPageSize(int pageSize) {
        this.pageSize = pageSize;
    }

    /**
     * Hands a failure marker page to {@link #addPage0(ReduceResultPage)}. The marker reports
     * {@code isFail() == true}; its {@code fetchNextPage()} rethrows {@code e} when it is not
     * {@code null} and otherwise delegates to the superclass implementation.
     *
     * @param nodeId Failed node ID; when {@code null} the first source node is used instead.
     * @param e Failure to rethrow on fetch, may be {@code null}.
     */
    @Override
    public void onFailure(UUID nodeId, final CacheException e) {
        UUID failedNodeId = nodeId != null ? nodeId : (UUID)F.first(srcNodes);

        addPage0(new ReduceResultPage(null, failedNodeId, null) {
            @Override
            public boolean isFail() {
                return true;
            }

            @Override
            public void fetchNextPage() {
                if (e != null) {
                    throw e;
                }

                super.fetchNextPage();
            }
        });
    }

    /**
     * Validates the request with {@link #checkBounds} and then searches either the fully
     * fetched rows or the stream, depending on {@code fetchedAll()}.
     *
     * @param first Lower bound, may be {@code null}.
     * @param last Upper bound, may be {@code null}.
     * @return Cursor over the matching rows.
     */
    @Override
    public final Cursor find(@Nullable SearchRow first, @Nullable SearchRow last) {
        checkBounds(lastEvictedRow, first, last);

        return fetchedAll() ? findAllFetched(fetched, first, last) : findInStream(first, last);
    }

    /**
     * Searches while pages are still arriving.
     *
     * @param first Lower bound, may be {@code null}.
     * @param last Upper bound, may be {@code null}.
     * @return Cursor over the matching rows.
     */
    protected abstract Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last);

    /**
     * Searches once all rows have been fetched.
     *
     * @param fetched Fetched rows.
     * @param first Lower bound, may be {@code null}.
     * @param last Upper bound, may be {@code null}.
     * @return Cursor over the matching rows.
     */
    protected abstract Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first,
        @Nullable SearchRow last);

    /**
     * Rejects the search if a block of rows has already been evicted. This base implementation
     * ignores the bounds themselves.
     *
     * @param lastEvictedRow Last evicted row, or {@code null} if nothing was evicted.
     * @param first Lower bound (unused here).
     * @param last Upper bound (unused here).
     * @throws IgniteException If {@code lastEvictedRow} is not {@code null}.
     */
    protected void checkBounds(Row lastEvictedRow, SearchRow first, SearchRow last) {
        if (lastEvictedRow != null) {
            throw new IgniteException("Fetched result set was too large. " + MAX_SIZE_PROP + "("
                + MAX_FETCH_SIZE + ") should be increased.");
        }
    }

    /**
     * Remembers the last row of an evicted block so that later searches can be checked
     * against it.
     *
     * @param evictedBlock Evicted block; asserted to hold exactly {@link #prefetchSize} rows.
     * @throws NullPointerException If the last row of the block is {@code null}.
     */
    protected void onBlockEvict(@NotNull List<Row> evictedBlock) {
        assert evictedBlock.size() == prefetchSize;

        lastEvictedRow = Objects.requireNonNull(last(evictedBlock));
    }

    /**
     * @param l List to read from.
     * @param <Z> Element type.
     * @return Last element of the list.
     * @throws IndexOutOfBoundsException If the list is empty.
     */
    static <Z> Z last(List<Z> l) {
        return l.get(l.size() - 1);
    }

    /**
     * Marks the page as last when appropriate and passes it to
     * {@link #addPage0(ReduceResultPage)}.
     *
     * @param page Received page.
     */
    @Override
    public void addPage(ReduceResultPage page) {
        markLastPage(page);
        addPage0(page);
    }

    /**
     * Stores the page in the subclass-specific structure.
     *
     * @param page Page to add (may be a failure marker page, see {@link #onFailure}).
     */
    protected abstract void addPage0(ReduceResultPage page);

    /**
     * Reports a failure for the first source node that discovery no longer considers alive;
     * stops after the first such node.
     */
    private void checkSourceNodesAlive() {
        for (UUID nodeId : srcNodes) {
            if (!ctx.discovery().alive(nodeId)) {
                onFailure(nodeId, null);

                return;
            }
        }
    }

    /**
     * Reports the given failure for every source node.
     *
     * @param e Failure to propagate.
     */
    public void fail(CacheException e) {
        for (UUID nodeId : srcNodes) {
            onFailure(nodeId, e);
        }
    }

    /**
     * Sets the "last" flag on the page when either the response itself says so or the page
     * index equals the last page index recorded for its (node, segment) source.
     *
     * @param page Received page.
     */
    private void markLastPage(ReduceResultPage page) {
        GridQueryNextPageResponse res = page.response();

        if (!res.last()) {
            UUID nodeId = page.source();

            initLastPages(nodeId, res);

            ConcurrentMap<ReduceSourceKey, Integer> lp = lastPages;

            // No source has reported its total row count yet.
            if (lp == null) {
                return;
            }

            Integer lastPage = lp.get(new ReduceSourceKey(nodeId, res.segmentId()));

            // This particular source did not report its total row count.
            if (lastPage == null) {
                return;
            }

            if (lastPage.intValue() != res.page()) {
                assert lastPage > res.page();

                return;
            }
        }

        page.setLast(true);
    }

    /**
     * On the first page (index 0) of a source that reports a non-negative total row count,
     * computes and records the index of that source's last page.
     *
     * @param nodeId Source node ID.
     * @param res Page response.
     * @throws IllegalStateException If a last page index was already recorded for this source.
     */
    private void initLastPages(UUID nodeId, GridQueryNextPageResponse res) {
        int allRows = res.allRows();

        if (allRows < 0 || res.page() != 0) {
            return;
        }

        ConcurrentMap<ReduceSourceKey, Integer> lp = lastPages;

        if (lp == null) {
            ConcurrentMap<ReduceSourceKey, Integer> created = new ConcurrentHashMap<>();

            // If the CAS is lost, another thread has already published the map: use that one.
            lp = LAST_PAGES_UPDATER.compareAndSet(this, null, created) ? created : lastPages;
        }

        assert pageSize > 0 : pageSize;

        int lastPage = allRows == 0 ? 0 : (allRows - 1) / pageSize;

        assert lastPage >= 0 : lastPage;

        if (lp.put(new ReduceSourceKey(nodeId, res.segmentId()), lastPage) != null) {
            throw new IllegalStateException("Last page index is already set [nodeId=" + nodeId
                + ", segmentId=" + res.segmentId() + ']');
        }
    }

    /**
     * Creates a page without a response that is flagged as last, attributed to the same source
     * as the given page.
     *
     * @param lastPage Real page to take the source from; must not itself be a dummy last page
     *      (asserted).
     * @return New page flagged as last.
     */
    protected final ReduceResultPage createDummyLastPage(ReduceResultPage lastPage) {
        assert !lastPage.isDummyLast();

        return new ReduceResultPage(ctx, lastPage.source(), null).setLast(true);
    }

    /**
     * Returns {@code iter} while it still has rows; otherwise takes the next page from the
     * queue, calls {@code fetchNextPage()} on it unless it is flagged as last, and returns the
     * iterator over that page's rows. The page switch runs inside an {@code SQL_PAGE_FETCH} span.
     *
     * @param queue Queue of received pages.
     * @param iter Current row iterator.
     * @return Iterator to continue with.
     */
    protected final Iterator<Value[]> pollNextIterator(Pollable<ReduceResultPage> queue, Iterator<Value[]> iter) {
        if (iter.hasNext()) {
            return iter;
        }

        try (MTC.TraceSurroundings ignored =
                 MTC.support((Span)ctx.tracing().create(SpanType.SQL_PAGE_FETCH, MTC.span()))) {
            ReduceResultPage page = takeNextPage(queue);

            if (!page.isLast()) {
                page.fetchNextPage();
            }

            Iterator<Value[]> rows = page.rows();

            MTC.span().addTag("sql.page.rows", () -> Integer.toString(page.rowsInPage()));

            assert rows.hasNext() || page.isDummyLast() || page.isFail();

            return rows;
        }
    }

    /**
     * Blocks until a page is available in the queue. After every unsuccessful poll the
     * liveness of the source nodes is checked, which may enqueue a failure marker page. Runs
     * inside an {@code SQL_PAGE_WAIT} span.
     *
     * @param queue Queue of received pages.
     * @return Next page, never {@code null}.
     * @throws CacheException If the waiting thread is interrupted; the interrupt status of the
     *      thread is restored before throwing.
     */
    private ReduceResultPage takeNextPage(Pollable<ReduceResultPage> queue) {
        try (MTC.TraceSurroundings ignored =
                 MTC.support((Span)ctx.tracing().create(SpanType.SQL_PAGE_WAIT, MTC.span()))) {
            while (true) {
                ReduceResultPage page;

                try {
                    page = queue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    // poll() cleared the flag; restore it so callers can still observe the interrupt.
                    Thread.currentThread().interrupt();

                    throw new CacheException("Query execution was interrupted.", e);
                }

                if (page != null) {
                    return page;
                }

                checkSourceNodesAlive();
            }
        }
    }

    /**
     * Minimal view of a blocking queue: only the timed poll operation.
     *
     * @param <E> Element type.
     */
    @FunctionalInterface
    static interface Pollable<E> {
        /**
         * @param timeout How long to wait for an element.
         * @param unit Unit of {@code timeout}.
         * @return Polled element; callers in this class treat {@code null} as "nothing arrived
         *      within the timeout".
         * @throws InterruptedException If interrupted while waiting.
         */
        public E poll(long timeout, TimeUnit unit) throws InterruptedException;
    }
}

