/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.planner.fragment;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.planner.fragment.FragmentParallelizer;
import org.apache.drill.exec.planner.fragment.ParallelizationInfo;
import org.apache.drill.exec.planner.fragment.ParallelizationParameters;
import org.apache.drill.exec.planner.fragment.Stats;
import org.apache.drill.exec.planner.fragment.Wrapper;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Ordering;

/**
 * {@link FragmentParallelizer} that chooses a fragment's parallelization width from its cost and the
 * configured width limits, and then assigns endpoints to the fragment, filling a share of the slots
 * from the fragment's endpoint affinity map before falling back to the remaining active endpoints.
 */
public class SoftAffinityFragmentParallelizer implements FragmentParallelizer {
    public static final SoftAffinityFragmentParallelizer INSTANCE = new SoftAffinityFragmentParallelizer();

    /** Orders {@link EndpointAffinity} entries from the highest affinity value to the lowest. */
    private static final Ordering<EndpointAffinity> ENDPOINT_AFFINITY_ORDERING = Ordering.from(new Comparator<EndpointAffinity>() {

        @Override
        public int compare(EndpointAffinity o1, EndpointAffinity o2) {
            // Operands are swapped on purpose: this yields descending order.
            return Double.compare(o2.getAffinity(), o1.getAffinity());
        }
    });

    /**
     * Computes the width of the given fragment, stores it on the wrapper and assigns that many endpoints.
     *
     * <p>The width starts as {@code ceil(maxCost / sliceTarget)} and is then, in order: capped by the
     * fragment's max width and the global max width, capped by {@code maxWidthPerNode * activeEndpoints.size()},
     * raised to the fragment's min width, capped again by the fragment's max width, and finally raised to at
     * least one.
     *
     * @param fragmentWrapper fragment whose width and endpoint assignment are set
     * @param parameters source of the slice target, width limits and affinity factor
     * @param activeEndpoints endpoints that may be assigned to the fragment
     * @throws PhysicalOperatorSetupException if the resulting width is smaller than the number of endpoints
     *         whose assignment is required
     */
    @Override
    public void parallelizeFragment(Wrapper fragmentWrapper, ParallelizationParameters parameters, Collection<CoordinationProtos.DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
        Stats stats = fragmentWrapper.getStats();
        ParallelizationInfo parallelizationInfo = stats.getParallelizationInfo();

        // Cost-based starting point.
        int width = (int)Math.ceil(stats.getMaxCost() / (double)parameters.getSliceTarget());

        // Fragment-level and query-level upper bounds.
        width = Math.min(width, Math.min(parallelizationInfo.getMaxWidth(), parameters.getMaxGlobalWidth()));

        // Per-node upper bound scaled by the number of active endpoints. The product is computed in long
        // so that a very large per-node limit cannot wrap around to a negative int; the result is never
        // larger than the current width, so narrowing back to int is safe.
        long clusterWideLimit = (long)parameters.getMaxWidthPerNode() * (long)activeEndpoints.size();
        width = (int)Math.min((long)width, clusterWideLimit);

        // Operator-enforced lower bound, then the operator-enforced upper bound once more.
        width = Math.max(parallelizationInfo.getMinWidth(), width);
        width = Math.min(parallelizationInfo.getMaxWidth(), width);

        // Never go below a single slot.
        width = Math.max(1, width);

        fragmentWrapper.setWidth(width);
        List<CoordinationProtos.DrillbitEndpoint> assignedEndpoints = this.findEndpoints(activeEndpoints, parallelizationInfo.getEndpointAffinityMap(), fragmentWrapper.getWidth(), parameters);
        fragmentWrapper.assignEndpoints(assignedEndpoints);
    }

    /**
     * Builds the list of endpoints to assign to a fragment of the given width.
     *
     * <p>When the affinity map is non-empty, a number of "affined" slots is filled first by cycling over the
     * affinity entries in descending affinity order. Any remaining slots are filled by cycling over the
     * shuffled active endpoints that are not keys of the affinity map, or over the affinity map's keys when
     * there are no such endpoints.
     *
     * @param activeEndpoints endpoints that may be assigned
     * @param endpointAffinityMap affinity entries keyed by endpoint
     * @param width number of slots to fill
     * @param parameters source of the affinity factor
     * @return the endpoints chosen for the fragment; an endpoint may appear more than once
     * @throws PhysicalOperatorSetupException if {@code width} is smaller than the number of affinity entries
     *         that report {@code isAssignmentRequired()}
     */
    private List<CoordinationProtos.DrillbitEndpoint> findEndpoints(Collection<CoordinationProtos.DrillbitEndpoint> activeEndpoints, Map<CoordinationProtos.DrillbitEndpoint, EndpointAffinity> endpointAffinityMap, int width, ParallelizationParameters parameters) throws PhysicalOperatorSetupException {
        List<CoordinationProtos.DrillbitEndpoint> endpoints = Lists.newArrayList();

        if (endpointAffinityMap.size() > 0) {
            List<EndpointAffinity> sortedAffinityList = ENDPOINT_AFFINITY_ORDERING.immutableSortedCopy(endpointAffinityMap.values());

            // Count the leading run of entries whose assignment is required; counting stops at the first
            // entry that is not required.
            // NOTE(review): this presumes required entries sort first (the error message below describes
            // them as having +INFINITE affinity) - confirm against EndpointAffinity.
            int numRequiredNodes = 0;
            for (EndpointAffinity affinity : sortedAffinityList) {
                if (!affinity.isAssignmentRequired()) {
                    break;
                }
                ++numRequiredNodes;
            }
            if (width < numRequiredNodes) {
                throw new PhysicalOperatorSetupException("Can not parallelize the fragment as the parallelization width (" + width + ") is less than the number of mandatory nodes (" + numRequiredNodes + " nodes with +INFINITE affinity).");
            }

            // Slots reserved for endpoints with affinity: at least one, at least the number of required
            // nodes, and never more than the width.
            int affinedSlots = Math.max(1, (int)(Math.ceil(parameters.getAffinityFactor() * (double)width / (double)activeEndpoints.size()) * (double)sortedAffinityList.size()));
            affinedSlots = Math.max(affinedSlots, numRequiredNodes);
            affinedSlots = Math.min(affinedSlots, width);

            // Round-robin over the affinity entries, highest affinity first.
            Iterator<EndpointAffinity> affinedEPItr = Iterators.cycle(sortedAffinityList);
            while (endpoints.size() < affinedSlots) {
                endpoints.add(affinedEPItr.next().getEndpoint());
            }
        }

        if (endpoints.size() < width) {
            Set<CoordinationProtos.DrillbitEndpoint> endpointsWithAffinity = endpointAffinityMap.keySet();
            List<CoordinationProtos.DrillbitEndpoint> endpointsWithNoAffinity;
            if (endpointAffinityMap.size() > 0) {
                endpointsWithNoAffinity = Lists.newArrayList();
                for (CoordinationProtos.DrillbitEndpoint endpoint : activeEndpoints) {
                    if (!endpointsWithAffinity.contains(endpoint)) {
                        endpointsWithNoAffinity.add(endpoint);
                    }
                }
            } else {
                // A mutable copy is needed because the list is shuffled below.
                endpointsWithNoAffinity = Lists.newArrayList(activeEndpoints);
            }

            // Shuffling first makes the round-robin below start at a random endpoint.
            Collections.shuffle(endpointsWithNoAffinity, ThreadLocalRandom.current());

            // NOTE(review): if both collections are empty the cycling iterator has no elements and
            // next() throws NoSuchElementException - confirm callers never pass such input.
            Iterable<CoordinationProtos.DrillbitEndpoint> fillSource = endpointsWithNoAffinity.size() > 0 ? endpointsWithNoAffinity : endpointsWithAffinity;
            Iterator<CoordinationProtos.DrillbitEndpoint> otherEPItr = Iterators.cycle(fillSource);
            while (endpoints.size() < width) {
                endpoints.add(otherEPItr.next());
            }
        }
        return endpoints;
    }
}

