/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.planner.fragment;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.util.function.CheckedConsumer;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.cost.NodeResource;
import org.apache.drill.exec.planner.fragment.MemoryCalculator;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.planner.fragment.Wrapper;
import org.apache.drill.exec.proto.CoordinationProtos;

/**
 * A {@link SimpleParallelizer} that, when the plan does not already carry memory
 * figures, computes a memory value per operator and per drillbit endpoint and
 * serves those values through {@link #getMemory()}.
 *
 * <p>The values are produced by {@link #adjustMemory} using a {@link MemoryCalculator}
 * for every fragment; endpoints whose accumulated {@link NodeResource} memory exceeds
 * the node limit have their operators scaled down proportionally.
 */
public class QueueQueryParallelizer
extends SimpleParallelizer {
    /**
     * Node memory limit handed to {@link #ensureOperatorMemoryWithinLimits}.
     * NOTE(review): the value is hard-coded and its units are not visible in this
     * file; presumably it should come from configuration — TODO confirm.
     */
    private static final int NODE_MEMORY_LIMIT = 10;

    /** {@code true} when the plan already carries memory, making {@link #adjustMemory} a no-op. */
    private final boolean planHasMemory;
    private final QueryContext queryContext;
    /** Memory per operator for each endpoint, filled in by {@link #adjustMemory}. */
    private final Map<CoordinationProtos.DrillbitEndpoint, Map<PhysicalOperator, Long>> operators;

    /**
     * @param memoryPlanning whether the plan already has memory assigned to its operators
     * @param queryContext   context passed to the superclass and to each {@link MemoryCalculator}
     */
    public QueueQueryParallelizer(boolean memoryPlanning, QueryContext queryContext) {
        super(queryContext);
        this.planHasMemory = memoryPlanning;
        this.queryContext = queryContext;
        this.operators = new HashMap<>();
    }

    /**
     * Returns a function that yields the memory for an operator on an endpoint.
     *
     * <p>When the plan has memory, the function returns {@code operator.getMaxAllocation()}.
     * Otherwise it looks the operator up in the values recorded by {@link #adjustMemory};
     * the result is {@code null} when nothing was recorded for that operator, and the
     * lookup throws {@link NullPointerException} when the endpoint itself was never recorded.
     *
     * @return the memory lookup function
     */
    @Override
    public BiFunction<CoordinationProtos.DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
        return (endpoint, operator) -> {
            if (!this.planHasMemory) {
                return this.operators.get(endpoint).get(operator);
            }
            return operator.getMaxAllocation();
        };
    }

    /**
     * Computes the memory for the buffered operators of every fragment reachable from
     * {@code roots} and records it per endpoint. Does nothing when the plan already
     * has memory.
     *
     * @param planningSet     planning set handed to each {@link MemoryCalculator}
     * @param roots           root fragments to traverse
     * @param activeEndpoints endpoints for which memory is tracked
     * @throws PhysicalOperatorSetupException declared by the overridden method
     */
    @Override
    public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots, Collection<CoordinationProtos.DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
        if (this.planHasMemory) {
            return;
        }
        Map<CoordinationProtos.DrillbitEndpoint, NodeResource> totalNodeResources = activeEndpoints.stream().collect(Collectors.toMap(endpoint -> endpoint, endpoint -> NodeResource.create()));
        Map<CoordinationProtos.DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> bufferedOperators = activeEndpoints.stream().collect(Collectors.toMap(endpoint -> endpoint, endpoint -> new ArrayList<>()));
        for (Wrapper root : roots) {
            this.traverse(root, CheckedConsumer.throwingConsumerWrapper((Wrapper fragment) -> {
                // A fresh calculator per fragment: only this fragment's buffered operators are collected below.
                MemoryCalculator calculator = new MemoryCalculator(planningSet, this.queryContext);
                fragment.getNode().getRoot().accept(calculator, fragment);
                NodeResource.merge(totalNodeResources, fragment.getResourceMap());
                for (Map.Entry<CoordinationProtos.DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> entry : bufferedOperators.entrySet()) {
                    entry.getValue().addAll(calculator.getBufferedOperators(entry.getKey()));
                }
            }));
        }
        Map<CoordinationProtos.DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> memoryAdjustedOperators = this.ensureOperatorMemoryWithinLimits(bufferedOperators, totalNodeResources, NODE_MEMORY_LIMIT);
        for (Map.Entry<CoordinationProtos.DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> entry : memoryAdjustedOperators.entrySet()) {
            // The same operator may appear more than once for an endpoint; its values are summed.
            Map<PhysicalOperator, Long> memoryPerOperator = entry.getValue().stream().collect(Collectors.toMap(Pair::getLeft, Pair::getRight, Long::sum));
            this.operators.put(entry.getKey(), memoryPerOperator);
        }
    }

    /**
     * Returns the operator memory for every endpoint in {@code memoryPerOperator}.
     * Endpoints whose {@link NodeResource} memory is above {@code nodeLimit} get their
     * operators scaled down; all other endpoints keep their operators unchanged.
     *
     * @param memoryPerOperator operator/memory pairs per endpoint
     * @param nodeResourceMap   accumulated resources per endpoint; must contain every
     *                          endpoint present in {@code memoryPerOperator}
     * @param nodeLimit         memory limit applied to each endpoint
     * @return a new map holding one entry for every endpoint of {@code memoryPerOperator}
     */
    private Map<CoordinationProtos.DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> ensureOperatorMemoryWithinLimits(Map<CoordinationProtos.DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> memoryPerOperator, Map<CoordinationProtos.DrillbitEndpoint, NodeResource> nodeResourceMap, int nodeLimit) {
        Map<CoordinationProtos.DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> allDrillbits = new HashMap<>();
        for (Map.Entry<CoordinationProtos.DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> entry : memoryPerOperator.entrySet()) {
            CoordinationProtos.DrillbitEndpoint endpoint = entry.getKey();
            List<Pair<PhysicalOperator, Long>> requested = entry.getValue();
            boolean aboveLimit = nodeResourceMap.get(endpoint).getMemory() > (long) nodeLimit;
            // Endpoints within the limit must pass through untouched rather than be replaced by an empty list.
            allDrillbits.put(endpoint, aboveLimit ? scaleToLimit(requested, nodeLimit) : requested);
        }
        return allDrillbits;
    }

    /**
     * Scales each operator's memory to {@code ceil(memory / totalMemory * nodeLimit)},
     * where {@code totalMemory} is the sum over all given operators.
     *
     * @param operatorMemory operator/memory pairs of a single endpoint
     * @param nodeLimit      memory limit to distribute across the operators
     * @return a new list with the scaled values; an unscaled copy when the total is zero
     */
    private static List<Pair<PhysicalOperator, Long>> scaleToLimit(List<Pair<PhysicalOperator, Long>> operatorMemory, int nodeLimit) {
        long totalMemory = operatorMemory.stream().mapToLong(Pair::getValue).sum();
        if (totalMemory == 0L) {
            // Nothing to distribute proportionally; also avoids dividing by zero.
            return new ArrayList<>(operatorMemory);
        }
        List<Pair<PhysicalOperator, Long>> scaled = new ArrayList<>(operatorMemory.size());
        for (Pair<PhysicalOperator, Long> pair : operatorMemory) {
            // Divide in floating point: long / long would truncate the ratio to 0 before the ceil.
            long share = (long) Math.ceil((double) pair.getValue() / totalMemory * nodeLimit);
            scaled.add(Pair.of(pair.getKey(), Long.valueOf(share)));
        }
        return scaled;
    }
}

