/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceDiedException;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SetQueue;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailablilityListener;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
import org.apache.flink.runtime.jobmanager.scheduler.SubSlot;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Assigns {@link ScheduledUnit}s to slots on the registered {@link Instance}s.
 *
 * <p>The scheduler keeps the set of all registered instances, a queue of the instances that
 * currently have free resources, and a queue of tasks waiting for a slot. All of that state is
 * guarded by {@link #globalLock}. Instances report freed slots through
 * {@link #newSlotAvailable(Instance)}; such notifications are buffered in
 * {@link #newlyAvailableInstances} and processed by {@link #handleNewSlot()}, either on the
 * executor handed to the constructor or, if there is none, on the notifying thread.
 */
public class Scheduler implements InstanceListener, SlotAvailablilityListener {

    static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);

    /** Guards all mutable scheduler state (instance collections, task queue, counters). */
    private final Object globalLock = new Object();

    /** Executor that runs slot notifications asynchronously; {@code null} means "run inline". */
    private final ExecutorService executor;

    /** All instances registered with the scheduler. */
    private final Set<Instance> allInstances = new HashSet<Instance>();

    /** Instances that are believed to still have free resources. */
    private final Queue<Instance> instancesWithAvailableResources = new SetQueue<Instance>();

    /** Tasks that could not be placed yet and wait for a slot to become available. */
    private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>();

    /** Instances that announced a free slot and have not been processed yet. */
    private final BlockingQueue<Instance> newlyAvailableInstances;

    /** Number of assignments made with locality {@link Locality#UNCONSTRAINED}. */
    private int unconstrainedAssignments;

    /** Number of assignments made with locality {@link Locality#LOCAL}. */
    private int localizedAssignments;

    /** Number of assignments made with locality {@link Locality#NON_LOCAL}. */
    private int nonLocalizedAssignments;

    /**
     * Creates a scheduler without an executor; slot notifications are handled on the thread that
     * delivers them.
     */
    public Scheduler() {
        this(null);
    }

    /**
     * Creates a scheduler.
     *
     * @param executorService executor used to process slot notifications, or {@code null} to
     *                        process them synchronously on the notifying thread
     */
    public Scheduler(ExecutorService executorService) {
        this.executor = executorService;
        this.newlyAvailableInstances = new LinkedBlockingQueue<Instance>();
    }

    /**
     * Detaches from all registered instances, cancels and releases their slots, and clears the
     * instance collections and the queue of waiting tasks.
     */
    public void shutdown() {
        synchronized (this.globalLock) {
            for (Instance instance : this.allInstances) {
                instance.removeSlotListener();
                instance.cancelAndReleaseAllSlots();
            }
            this.allInstances.clear();
            this.instancesWithAvailableResources.clear();
            this.taskQueue.clear();
        }
    }

    /**
     * Sums the available slots over all instances in the "has available resources" queue.
     *
     * @return the number of currently available slots
     */
    public int getNumberOfAvailableSlots() {
        int count = 0;
        synchronized (this.globalLock) {
            for (Instance instance : this.instancesWithAvailableResources) {
                count += instance.getNumberOfAvailableSlots();
            }
        }
        return count;
    }

    /**
     * Sums the total slots over all registered instances that report themselves as alive.
     *
     * @return the total number of slots on live instances
     */
    public int getTotalNumberOfSlots() {
        int count = 0;
        synchronized (this.globalLock) {
            for (Instance instance : this.allInstances) {
                if (instance.isAlive()) {
                    count += instance.getTotalNumberOfSlots();
                }
            }
        }
        return count;
    }

    /**
     * Schedules the task right away and fails if no slot can be found.
     *
     * @param task the unit to schedule; must not be {@code null}
     * @return the slot the task was assigned to
     * @throws NoResourceAvailableException if no suitable slot is available
     * @throws IllegalArgumentException if {@code task} is {@code null}
     */
    public AllocatedSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException {
        Object ret = this.scheduleTask(task, false);
        if (ret instanceof AllocatedSlot) {
            return (AllocatedSlot) ret;
        }
        throw new RuntimeException("Immediate scheduling produced an unexpected result: " + ret);
    }

    /**
     * Schedules the task, queueing it if no slot is available at the moment.
     *
     * @param task the unit to schedule; must not be {@code null} and must not belong to a slot
     *             sharing group
     * @return a future that is already completed if a slot was found immediately, or that is
     *         completed later from {@link #handleNewSlot()}
     * @throws NoResourceAvailableException declared by the underlying scheduling routine
     * @throws IllegalArgumentException if {@code task} is {@code null} or has a slot sharing group
     */
    public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws NoResourceAvailableException {
        Object ret = this.scheduleTask(task, true);
        if (ret instanceof AllocatedSlot) {
            return new SlotAllocationFuture((AllocatedSlot) ret);
        }
        if (ret instanceof SlotAllocationFuture) {
            return (SlotAllocationFuture) ret;
        }
        throw new RuntimeException("Queued scheduling produced an unexpected result: " + ret);
    }

    /**
     * Core scheduling routine shared by the immediate and the queued entry points.
     *
     * @param task              the unit to schedule
     * @param queueIfNoResource whether to enqueue the task instead of failing when no slot is free
     * @return a slot ({@link AllocatedSlot} or {@link SubSlot}) or, when queued, a
     *         {@link SlotAllocationFuture}
     * @throws NoResourceAvailableException if no slot is available and queueing was not requested,
     *                                      or a co-location constraint cannot be met
     */
    private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
        if (task == null) {
            throw new IllegalArgumentException();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Scheduling task " + task);
        }
        ExecutionVertex vertex = task.getTaskToExecute().getVertex();

        synchronized (this.globalLock) {
            // 1) Tasks in a slot sharing group are placed into shared slots.
            SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
            if (sharingUnit != null) {
                if (queueIfNoResource) {
                    throw new IllegalArgumentException("A task with a vertex sharing group was scheduled in a queued fashion.");
                }
                SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
                CoLocationConstraint constraint = task.getLocationConstraint();

                // Ask the group for a slot, honoring the co-location constraint if there is one.
                SubSlot slotFromGroup = constraint == null
                        ? assignment.getSlotForTask(vertex)
                        : assignment.getSlotForTask(vertex, constraint);
                AllocatedSlot newSlot = null;

                try {
                    // Everything below must release acquired slots if something unexpected fails.

                    // A group slot that is not NON_LOCAL is taken as is.
                    if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) {
                        this.updateLocalityCounters(slotFromGroup.getLocality());
                        return slotFromGroup;
                    }

                    // An assigned constraint pins the location; otherwise use the vertex preferences.
                    Iterable<Instance> locations = constraint == null || constraint.isUnassigned()
                            ? vertex.getPreferredLocations()
                            : Collections.singleton(constraint.getLocation());
                    newSlot = this.getFreeSlotForTask(vertex, locations);

                    SubSlot toUse;
                    if (newSlot == null) {
                        if (slotFromGroup == null) {
                            // Neither the group nor the free instances can host the task.
                            if (constraint == null || constraint.isUnassigned()) {
                                throw new NoResourceAvailableException(this.getNumberOfAvailableInstances(), this.getTotalNumberOfSlots());
                            }
                            throw new NoResourceAvailableException("Could not allocate a slot on instance " + constraint.getLocation() + ", as required by the co-location constraint.");
                        }
                        // Only the non-local group slot is available.
                        toUse = slotFromGroup;
                    } else if (slotFromGroup == null || newSlot.getLocality() == Locality.LOCAL) {
                        // The fresh slot is preferable; give back the group slot if we hold one.
                        if (slotFromGroup != null) {
                            slotFromGroup.releaseSlot();
                        }
                        toUse = constraint == null
                                ? assignment.addNewSlotWithTask(newSlot, vertex)
                                : assignment.addNewSlotWithTask(newSlot, vertex, constraint);
                    } else {
                        // Both exist and the fresh one is not local: keep the group slot.
                        newSlot.releaseSlot();
                        toUse = slotFromGroup;
                    }

                    // Bind an unassigned constraint; an assigned one only accepts a local slot.
                    if (constraint != null) {
                        if (constraint.isUnassigned() || toUse.getLocality() == Locality.LOCAL) {
                            constraint.setSharedSlot(toUse.getSharedSlot());
                        } else {
                            throw new NoResourceAvailableException("Could not allocate a slot on instance " + constraint.getLocation() + ", as required by the co-location constraint.");
                        }
                    }
                    this.updateLocalityCounters(toUse.getLocality());
                    return toUse;
                }
                catch (NoResourceAvailableException e) {
                    // Expected failure mode; passed on without releasing anything here.
                    throw e;
                }
                catch (Throwable t) {
                    if (slotFromGroup != null) {
                        slotFromGroup.releaseSlot();
                    }
                    if (newSlot != null) {
                        newSlot.releaseSlot();
                    }
                    // NOTE(review): rethrow(...) is presumably always-throwing; if it ever returned,
                    // control would continue with the non-shared scheduling below - confirm.
                    ExceptionUtils.rethrow(t, "An error occurred while allocating a slot in a sharing group");
                }
            }

            // 2) Plain scheduling without sharing.
            AllocatedSlot slot = this.getFreeSlotForTask(vertex, vertex.getPreferredLocations());
            if (slot != null) {
                this.updateLocalityCounters(slot.getLocality());
                return slot;
            }
            if (queueIfNoResource) {
                SlotAllocationFuture future = new SlotAllocationFuture();
                this.taskQueue.add(new QueuedTask(task, future));
                return future;
            }
            throw new NoResourceAvailableException(this.getNumberOfAvailableInstances(), this.getTotalNumberOfSlots());
        }
    }

    /**
     * Tries to allocate a slot for the vertex, preferring the requested locations.
     *
     * <p>This method does not take {@link #globalLock} itself although it mutates the guarded
     * collections; within this class it is only called while the lock is held.
     *
     * @param vertex             the vertex to find a slot for
     * @param requestedLocations preferred instances, may be {@code null} or empty
     * @return a slot with its locality set, or {@code null} if no instance has a free slot
     */
    protected AllocatedSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations) {
        while (true) {
            // Refill from pending slot notifications before giving up.
            if (this.instancesWithAvailableResources.isEmpty()) {
                Instance queuedInstance = this.newlyAvailableInstances.poll();
                if (queuedInstance == null) {
                    return null;
                }
                this.instancesWithAvailableResources.add(queuedInstance);
            }

            Iterator<Instance> locations = requestedLocations == null ? null : requestedLocations.iterator();
            Instance instanceToUse = null;
            Locality locality = Locality.UNCONSTRAINED;

            if (locations != null && locations.hasNext()) {
                // Take the first preferred instance that is in the available queue.
                while (locations.hasNext()) {
                    Instance location = locations.next();
                    if (location != null && this.instancesWithAvailableResources.remove(location)) {
                        instanceToUse = location;
                        locality = Locality.LOCAL;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + location);
                        }
                        break;
                    }
                }
                // None of the preferred instances is available: fall back to any instance.
                if (instanceToUse == null) {
                    instanceToUse = this.instancesWithAvailableResources.poll();
                    locality = Locality.NON_LOCAL;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
                    }
                }
            } else {
                instanceToUse = this.instancesWithAvailableResources.poll();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
                }
            }

            try {
                AllocatedSlot slot = instanceToUse.allocateSlot(vertex.getJobId());
                // The instance was taken out of the queue above; put it back while it has capacity.
                if (instanceToUse.hasResourcesAvailable()) {
                    this.instancesWithAvailableResources.add(instanceToUse);
                }
                if (slot != null) {
                    slot.setLocality(locality);
                    return slot;
                }
                // No slot obtained from this instance: retry with the remaining ones.
            }
            catch (InstanceDiedException e) {
                // The instance died in the meantime: forget it and retry.
                this.allInstances.remove(instanceToUse);
                this.instancesWithAvailableResources.remove(instanceToUse);
            }
        }
    }

    /**
     * Records that the instance has a free slot and triggers {@link #handleNewSlot()}, on the
     * executor if one was configured, otherwise on the calling thread.
     *
     * @param instance the instance that has a slot available
     */
    @Override
    public void newSlotAvailable(Instance instance) {
        this.newlyAvailableInstances.add(instance);
        if (this.executor != null) {
            this.executor.execute(new Runnable() {

                @Override
                public void run() {
                    Scheduler.this.handleNewSlot();
                }
            });
        } else {
            this.handleNewSlot();
        }
    }

    /**
     * Processes one pending slot notification: gives the slot to the first waiting task if there
     * is one, otherwise marks the instance as having available resources.
     */
    private void handleNewSlot() {
        synchronized (this.globalLock) {
            Instance instance = this.newlyAvailableInstances.poll();
            if (instance == null || !instance.hasResourcesAvailable()) {
                // Nothing pending any more, or the resources are already gone.
                return;
            }

            QueuedTask queued = this.taskQueue.peek();
            if (queued == null) {
                // Nobody is waiting: just remember that the instance has capacity.
                this.instancesWithAvailableResources.add(instance);
                return;
            }

            ScheduledUnit task = queued.getTask();
            ExecutionVertex vertex = task.getTaskToExecute().getVertex();
            try {
                AllocatedSlot newSlot = instance.allocateSlot(vertex.getJobId());
                if (newSlot == null) {
                    // Allocation did not succeed; the task stays at the head of the queue.
                    return;
                }
                // Success: the task leaves the queue and its future (if any) gets the slot.
                this.taskQueue.poll();
                if (queued.getFuture() == null) {
                    return;
                }
                try {
                    queued.getFuture().setSlot(newSlot);
                    return;
                }
                catch (Throwable t) {
                    LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), t);
                    task.getTaskToExecute().fail(t);
                }
            }
            catch (InstanceDiedException e) {
                this.allInstances.remove(instance);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Instance " + instance + " was marked dead asynchronously.");
                }
                return;
            }

            // Only reached after a failed hand-over to the future: the instance is registered as
            // available again. NOTE(review): confirm this re-registration is intended.
            this.instancesWithAvailableResources.add(instance);
        }
    }

    /**
     * Increments the assignment counter that matches the given locality.
     *
     * @param locality the locality of the slot that was just assigned
     */
    private void updateLocalityCounters(Locality locality) {
        switch (locality) {
            case UNCONSTRAINED: {
                ++this.unconstrainedAssignments;
                break;
            }
            case LOCAL: {
                ++this.localizedAssignments;
                break;
            }
            case NON_LOCAL: {
                ++this.nonLocalizedAssignments;
                break;
            }
            default: {
                throw new RuntimeException(locality.name());
            }
        }
    }

    /**
     * Registers a new instance and announces each of its free slots.
     *
     * <p>If the scheduler cannot attach itself as the instance's slot listener, the registration
     * is rolled back and the instance is not used for scheduling.
     *
     * @param instance the instance to register; must be alive and have at least one free slot
     * @throws IllegalArgumentException if the instance is {@code null}, has no free slots, is not
     *                                  alive, or is already registered
     */
    @Override
    public void newInstanceAvailable(Instance instance) {
        if (instance == null) {
            throw new IllegalArgumentException();
        }
        if (instance.getNumberOfAvailableSlots() <= 0) {
            throw new IllegalArgumentException("The given instance has no resources.");
        }
        if (!instance.isAlive()) {
            throw new IllegalArgumentException("The instance is not alive.");
        }

        synchronized (this.globalLock) {
            if (!this.allInstances.add(instance)) {
                throw new IllegalArgumentException("The instance is already contained.");
            }
            try {
                instance.setSlotAvailabilityListener(this);
            }
            catch (IllegalStateException e) {
                // Without the listener the scheduler would never learn about released slots, so
                // the instance must not be handed out for scheduling either.
                this.allInstances.remove(instance);
                LOG.error("Scheduler could not attach to the instance as a listener.", e);
                return;
            }

            this.instancesWithAvailableResources.add(instance);

            // Read the slot count once: without an executor the notifications are handled inline
            // and may allocate slots to queued tasks, which would shrink a re-evaluated loop bound
            // and leave some slots unannounced.
            final int numSlots = instance.getNumberOfAvailableSlots();
            for (int i = 0; i < numSlots; ++i) {
                this.newSlotAvailable(instance);
            }
        }
    }

    /**
     * Marks the instance as dead and removes it from the scheduler's instance collections.
     *
     * @param instance the instance that died
     * @throws IllegalArgumentException if the instance is {@code null}
     */
    @Override
    public void instanceDied(Instance instance) {
        if (instance == null) {
            throw new IllegalArgumentException();
        }
        instance.markDead();
        synchronized (this.globalLock) {
            this.allInstances.remove(instance);
            this.instancesWithAvailableResources.remove(instance);
        }
    }

    /**
     * @return the number of instances currently registered with the scheduler
     */
    public int getNumberOfAvailableInstances() {
        synchronized (this.globalLock) {
            return this.allInstances.size();
        }
    }

    /**
     * @return the number of instances in the "has available resources" queue
     */
    public int getNumberOfInstancesWithAvailableSlots() {
        synchronized (this.globalLock) {
            return this.instancesWithAvailableResources.size();
        }
    }

    /**
     * @return how many assignments were counted as {@link Locality#UNCONSTRAINED}
     */
    public int getNumberOfUnconstrainedAssignments() {
        return this.unconstrainedAssignments;
    }

    /**
     * @return how many assignments were counted as {@link Locality#LOCAL}
     */
    public int getNumberOfLocalizedAssignments() {
        return this.localizedAssignments;
    }

    /**
     * @return how many assignments were counted as {@link Locality#NON_LOCAL}
     */
    public int getNumberOfNonLocalizedAssignments() {
        return this.nonLocalizedAssignments;
    }

    /**
     * A task waiting for a slot, paired with the future that receives the slot once allocated.
     */
    private static final class QueuedTask {
        private final ScheduledUnit task;
        private final SlotAllocationFuture future;

        public QueuedTask(ScheduledUnit task, SlotAllocationFuture future) {
            this.task = task;
            this.future = future;
        }

        public ScheduledUnit getTask() {
            return this.task;
        }

        public SlotAllocationFuture getFuture() {
            return this.future;
        }
    }
}

