/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.resources.Resource;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingTaskManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingTaskManagerId;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocationResult;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocationStrategy;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceDeclaration;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceEventListener;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceReconcileResult;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusSyncer;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerInfo;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerTracker;
import org.apache.flink.runtime.rest.messages.taskmanager.SlotInfo;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FineGrainedSlotManager
implements SlotManager {
    private static final Logger LOG = LoggerFactory.getLogger(FineGrainedSlotManager.class);
    private final TaskManagerTracker taskManagerTracker;
    private final ResourceTracker resourceTracker;
    private final ResourceAllocationStrategy resourceAllocationStrategy;
    private final SlotStatusSyncer slotStatusSyncer;
    private final ScheduledExecutor scheduledExecutor;
    private final Time taskManagerTimeout;
    private final Duration requirementsCheckDelay;
    private final Duration declareNeededResourceDelay;
    private final SlotManagerMetricGroup slotManagerMetricGroup;
    private final Map<JobID, String> jobMasterTargetAddresses = new HashMap<JobID, String>();
    private final boolean waitResultConsumedBeforeRelease;
    private final CPUResource maxTotalCpu;
    private final MemorySize maxTotalMem;
    private boolean sendNotEnoughResourceNotifications = true;
    private final Set<JobID> unfulfillableJobs = new HashSet<JobID>();
    @Nullable
    private ResourceManagerId resourceManagerId;
    @Nullable
    private Executor mainThreadExecutor;
    @Nullable
    private ResourceAllocator resourceAllocator;
    @Nullable
    private ResourceEventListener resourceEventListener;
    @Nullable
    private ScheduledFuture<?> clusterReconciliationCheck;
    @Nullable
    private CompletableFuture<Void> requirementsCheckFuture;
    @Nullable
    private CompletableFuture<Void> declareNeededResourceFuture;
    @Nullable
    private BlockedTaskManagerChecker blockedTaskManagerChecker;
    private boolean started;

    public FineGrainedSlotManager(ScheduledExecutor scheduledExecutor, SlotManagerConfiguration slotManagerConfiguration, SlotManagerMetricGroup slotManagerMetricGroup, ResourceTracker resourceTracker, TaskManagerTracker taskManagerTracker, SlotStatusSyncer slotStatusSyncer, ResourceAllocationStrategy resourceAllocationStrategy) {
        this.scheduledExecutor = (ScheduledExecutor)Preconditions.checkNotNull((Object)scheduledExecutor);
        Preconditions.checkNotNull((Object)slotManagerConfiguration);
        this.taskManagerTimeout = slotManagerConfiguration.getTaskManagerTimeout();
        this.waitResultConsumedBeforeRelease = slotManagerConfiguration.isWaitResultConsumedBeforeRelease();
        this.requirementsCheckDelay = (Duration)Preconditions.checkNotNull((Object)slotManagerConfiguration.getRequirementCheckDelay());
        this.declareNeededResourceDelay = (Duration)Preconditions.checkNotNull((Object)slotManagerConfiguration.getDeclareNeededResourceDelay());
        this.slotManagerMetricGroup = (SlotManagerMetricGroup)Preconditions.checkNotNull((Object)slotManagerMetricGroup);
        this.resourceTracker = (ResourceTracker)Preconditions.checkNotNull((Object)resourceTracker);
        this.taskManagerTracker = (TaskManagerTracker)Preconditions.checkNotNull((Object)taskManagerTracker);
        this.slotStatusSyncer = (SlotStatusSyncer)Preconditions.checkNotNull((Object)slotStatusSyncer);
        this.resourceAllocationStrategy = (ResourceAllocationStrategy)Preconditions.checkNotNull((Object)resourceAllocationStrategy);
        this.maxTotalCpu = (CPUResource)Preconditions.checkNotNull((Object)slotManagerConfiguration.getMaxTotalCpu());
        this.maxTotalMem = (MemorySize)Preconditions.checkNotNull((Object)slotManagerConfiguration.getMaxTotalMem());
        this.resourceManagerId = null;
        this.resourceAllocator = null;
        this.resourceEventListener = null;
        this.mainThreadExecutor = null;
        this.clusterReconciliationCheck = null;
        this.requirementsCheckFuture = null;
        this.started = false;
    }

    @Override
    public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
        this.checkInit();
        this.sendNotEnoughResourceNotifications = failUnfulfillableRequest;
        if (failUnfulfillableRequest && !this.unfulfillableJobs.isEmpty()) {
            for (JobID jobId : this.unfulfillableJobs) {
                this.resourceEventListener.notEnoughResourceAvailable(jobId, this.resourceTracker.getAcquiredResources(jobId));
            }
        }
    }

    @Override
    public void triggerResourceRequirementsCheck() {
        this.checkResourceRequirementsWithDelay();
    }

    @Override
    public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceAllocator newResourceAllocator, ResourceEventListener newResourceEventListener, BlockedTaskManagerChecker newBlockedTaskManagerChecker) {
        LOG.info("Starting the slot manager.");
        this.resourceManagerId = (ResourceManagerId)((Object)Preconditions.checkNotNull((Object)((Object)newResourceManagerId)));
        this.mainThreadExecutor = (Executor)Preconditions.checkNotNull((Object)newMainThreadExecutor);
        this.resourceAllocator = (ResourceAllocator)Preconditions.checkNotNull((Object)newResourceAllocator);
        this.resourceEventListener = (ResourceEventListener)Preconditions.checkNotNull((Object)newResourceEventListener);
        this.slotStatusSyncer.initialize(this.taskManagerTracker, this.resourceTracker, this.resourceManagerId, this.mainThreadExecutor);
        this.blockedTaskManagerChecker = (BlockedTaskManagerChecker)Preconditions.checkNotNull((Object)newBlockedTaskManagerChecker);
        this.started = true;
        if (this.resourceAllocator.isSupported()) {
            this.clusterReconciliationCheck = this.scheduledExecutor.scheduleWithFixedDelay(() -> this.mainThreadExecutor.execute(this::checkClusterReconciliation), 0L, this.taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
        this.registerSlotManagerMetrics();
    }

    private void registerSlotManagerMetrics() {
        this.slotManagerMetricGroup.gauge("taskSlotsAvailable", () -> (long)this.getNumberFreeSlots());
        this.slotManagerMetricGroup.gauge("taskSlotsTotal", () -> (long)this.getNumberRegisteredSlots());
    }

    @Override
    public void suspend() {
        if (!this.started) {
            return;
        }
        LOG.info("Suspending the slot manager.");
        this.slotManagerMetricGroup.close();
        if (this.clusterReconciliationCheck != null) {
            this.clusterReconciliationCheck.cancel(false);
            this.clusterReconciliationCheck = null;
        }
        this.slotStatusSyncer.close();
        this.taskManagerTracker.clear();
        this.resourceTracker.clear();
        this.unfulfillableJobs.clear();
        this.resourceManagerId = null;
        this.resourceAllocator = null;
        this.resourceEventListener = null;
        this.started = false;
    }

    @Override
    public void close() throws Exception {
        LOG.info("Closing the slot manager.");
        this.suspend();
    }

    @Override
    public void clearResourceRequirements(JobID jobId) {
        this.maybeReclaimInactiveSlots(jobId);
        this.jobMasterTargetAddresses.remove(jobId);
        this.resourceTracker.notifyResourceRequirements(jobId, Collections.emptyList());
        if (this.resourceAllocator.isSupported()) {
            this.taskManagerTracker.clearPendingAllocationsOfJob(jobId);
            this.checkResourcesNeedReconcile();
            this.declareNeededResourcesWithDelay();
        }
    }

    private void maybeReclaimInactiveSlots(JobID jobId) {
        if (!this.resourceTracker.getAcquiredResources(jobId).isEmpty()) {
            this.slotStatusSyncer.freeInactiveSlots(jobId);
        }
    }

    @Override
    public void processResourceRequirements(ResourceRequirements resourceRequirements) {
        this.checkInit();
        if (resourceRequirements.getResourceRequirements().isEmpty() && this.resourceTracker.isRequirementEmpty(resourceRequirements.getJobId())) {
            return;
        }
        if (resourceRequirements.getResourceRequirements().isEmpty()) {
            LOG.info("Clearing resource requirements of job {}", (Object)resourceRequirements.getJobId());
            this.jobMasterTargetAddresses.remove(resourceRequirements.getJobId());
            if (this.resourceAllocator.isSupported()) {
                this.taskManagerTracker.clearPendingAllocationsOfJob(resourceRequirements.getJobId());
            }
        } else {
            LOG.info("Received resource requirements from job {}: {}", (Object)resourceRequirements.getJobId(), resourceRequirements.getResourceRequirements());
            this.jobMasterTargetAddresses.put(resourceRequirements.getJobId(), resourceRequirements.getTargetAddress());
        }
        this.resourceTracker.notifyResourceRequirements(resourceRequirements.getJobId(), resourceRequirements.getResourceRequirements());
        this.checkResourceRequirementsWithDelay();
    }

    @Override
    public SlotManager.RegistrationResult registerTaskManager(TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport, ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile) {
        Optional matchedPendingTaskManagerOptional;
        this.checkInit();
        LOG.info("Registering task executor {} under {} at the slot manager.", (Object)taskExecutorConnection.getResourceID(), (Object)taskExecutorConnection.getInstanceID());
        if (this.taskManagerTracker.getRegisteredTaskManager(taskExecutorConnection.getInstanceID()).isPresent()) {
            LOG.debug("Task executor {} was already registered.", (Object)taskExecutorConnection.getResourceID());
            this.reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
            return SlotManager.RegistrationResult.IGNORED;
        }
        Optional<Object> optional = matchedPendingTaskManagerOptional = initialSlotReport.hasAllocatedSlot() ? Optional.empty() : this.findMatchingPendingTaskManager(totalResourceProfile, defaultSlotResourceProfile);
        if (!matchedPendingTaskManagerOptional.isPresent() && this.isMaxTotalResourceExceededAfterAdding(totalResourceProfile)) {
            LOG.info("Can not register task manager {}. The max total resource limitation <{}, {}> is reached.", new Object[]{taskExecutorConnection.getResourceID(), this.maxTotalCpu, this.maxTotalMem.toHumanReadableString()});
            return SlotManager.RegistrationResult.REJECTED;
        }
        this.taskManagerTracker.addTaskManager(taskExecutorConnection, totalResourceProfile, defaultSlotResourceProfile);
        if (initialSlotReport.hasAllocatedSlot()) {
            this.slotStatusSyncer.reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
        }
        if (matchedPendingTaskManagerOptional.isPresent()) {
            PendingTaskManager pendingTaskManager = (PendingTaskManager)matchedPendingTaskManagerOptional.get();
            this.allocateSlotsForRegisteredPendingTaskManager(pendingTaskManager, taskExecutorConnection.getInstanceID());
            this.taskManagerTracker.removePendingTaskManager(pendingTaskManager.getPendingTaskManagerId());
            return SlotManager.RegistrationResult.SUCCESS;
        }
        this.checkResourceRequirementsWithDelay();
        return SlotManager.RegistrationResult.SUCCESS;
    }

    private void declareNeededResourcesWithDelay() {
        Preconditions.checkState((boolean)this.resourceAllocator.isSupported());
        if (this.declareNeededResourceDelay.toMillis() <= 0L) {
            this.declareNeededResources();
        } else if (this.declareNeededResourceFuture == null || this.declareNeededResourceFuture.isDone()) {
            this.declareNeededResourceFuture = new CompletableFuture();
            this.scheduledExecutor.schedule(() -> this.mainThreadExecutor.execute(() -> {
                this.declareNeededResources();
                ((CompletableFuture)Preconditions.checkNotNull(this.declareNeededResourceFuture)).complete(null);
            }), this.declareNeededResourceDelay.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    private void declareNeededResources() {
        Map<InstanceID, WorkerResourceSpec> unWantedTaskManagers = this.taskManagerTracker.getUnWantedTaskManager();
        Map unWantedTaskManagerBySpec = unWantedTaskManagers.entrySet().stream().collect(Collectors.groupingBy(Map.Entry::getValue, Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));
        Stream<WorkerResourceSpec> registeredTaskManagerStream = this.taskManagerTracker.getRegisteredTaskManagers().stream().filter(t -> !unWantedTaskManagers.containsKey((Object)t.getInstanceId())).map(t -> WorkerResourceSpec.fromTotalResourceProfile(t.getTotalResource(), t.getDefaultNumSlots()));
        Stream<WorkerResourceSpec> pendingTaskManagerStream = this.taskManagerTracker.getPendingTaskManagers().stream().map(t -> WorkerResourceSpec.fromTotalResourceProfile(t.getTotalResourceProfile(), t.getNumSlots()));
        Map requiredWorkers = Stream.concat(registeredTaskManagerStream, pendingTaskManagerStream).collect(Collectors.groupingBy(Function.identity(), Collectors.summingInt(e -> 1)));
        HashSet<WorkerResourceSpec> workerResourceSpecs = new HashSet<WorkerResourceSpec>(requiredWorkers.keySet());
        workerResourceSpecs.addAll(unWantedTaskManagerBySpec.keySet());
        ArrayList<ResourceDeclaration> resourceDeclarations = new ArrayList<ResourceDeclaration>();
        workerResourceSpecs.forEach(spec -> resourceDeclarations.add(new ResourceDeclaration((WorkerResourceSpec)spec, requiredWorkers.getOrDefault(spec, 0), unWantedTaskManagerBySpec.getOrDefault(spec, Collections.emptySet()))));
        this.resourceAllocator.declareResourceNeeded(resourceDeclarations);
    }

    private void allocateSlotsForRegisteredPendingTaskManager(PendingTaskManager pendingTaskManager, InstanceID instanceId) {
        Map<JobID, Map<InstanceID, ResourceCounter>> allocations = pendingTaskManager.getPendingSlotAllocationRecords().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.singletonMap(instanceId, e.getValue())));
        this.allocateSlotsAccordingTo(allocations);
    }

    private Optional<PendingTaskManager> findMatchingPendingTaskManager(ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile) {
        Collection<PendingTaskManager> matchedPendingTaskManagers = this.taskManagerTracker.getPendingTaskManagersByTotalAndDefaultSlotResourceProfile(totalResourceProfile, defaultSlotResourceProfile);
        Optional<PendingTaskManager> matchedPendingTaskManagerIdsWithAllocatedSlots = matchedPendingTaskManagers.stream().filter(pendingTaskManager -> !pendingTaskManager.getPendingSlotAllocationRecords().isEmpty()).findAny();
        if (matchedPendingTaskManagerIdsWithAllocatedSlots.isPresent()) {
            return matchedPendingTaskManagerIdsWithAllocatedSlots;
        }
        return matchedPendingTaskManagers.stream().findAny();
    }

    @Override
    public boolean unregisterTaskManager(InstanceID instanceId, Exception cause) {
        this.checkInit();
        LOG.info("Unregistering task executor {} from the slot manager.", (Object)instanceId);
        if (this.taskManagerTracker.getRegisteredTaskManager(instanceId).isPresent()) {
            HashSet<AllocationID> allocatedSlots = new HashSet<AllocationID>(this.taskManagerTracker.getRegisteredTaskManager(instanceId).get().getAllocatedSlots().keySet());
            for (AllocationID allocationId : allocatedSlots) {
                this.slotStatusSyncer.freeSlot(allocationId);
            }
            this.taskManagerTracker.removeTaskManager(instanceId);
            if (!allocatedSlots.isEmpty()) {
                this.checkResourceRequirementsWithDelay();
            }
            return true;
        }
        LOG.debug("There is no task executor registered with instance ID {}. Ignoring this message.", (Object)instanceId);
        return false;
    }

    @Override
    public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
        this.checkInit();
        LOG.debug("Received slot report from instance {}: {}.", (Object)instanceId, (Object)slotReport);
        if (this.taskManagerTracker.getRegisteredTaskManager(instanceId).isPresent()) {
            if (!this.slotStatusSyncer.reportSlotStatus(instanceId, slotReport)) {
                this.checkResourceRequirementsWithDelay();
            }
            return true;
        }
        LOG.debug("Received slot report for unknown task manager with instance id {}. Ignoring this report.", (Object)instanceId);
        return false;
    }

    @Override
    public void freeSlot(SlotID slotId, AllocationID allocationId) {
        this.checkInit();
        LOG.debug("Freeing slot {}.", (Object)allocationId);
        if (this.taskManagerTracker.getAllocatedOrPendingSlot(allocationId).isPresent()) {
            this.slotStatusSyncer.freeSlot(allocationId);
            this.checkResourceRequirementsWithDelay();
        } else {
            LOG.debug("Trying to free a slot {} which has not been allocated. Ignoring this message.", (Object)allocationId);
        }
    }

    private void checkResourceRequirementsWithDelay() {
        if (this.requirementsCheckDelay.toMillis() <= 0L) {
            this.checkResourceRequirements();
        } else if (this.requirementsCheckFuture == null || this.requirementsCheckFuture.isDone()) {
            this.requirementsCheckFuture = new CompletableFuture();
            this.scheduledExecutor.schedule(() -> this.mainThreadExecutor.execute(() -> {
                this.checkResourceRequirements();
                ((CompletableFuture)Preconditions.checkNotNull(this.requirementsCheckFuture)).complete(null);
            }), this.requirementsCheckDelay.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    private void checkResourceRequirements() {
        Set<PendingTaskManagerId> failAllocations;
        if (!this.started) {
            return;
        }
        Map<JobID, Collection<ResourceRequirement>> missingResources = this.resourceTracker.getMissingResources();
        if (missingResources.isEmpty()) {
            if (this.resourceAllocator.isSupported() && !this.taskManagerTracker.getPendingTaskManagers().isEmpty()) {
                this.taskManagerTracker.replaceAllPendingAllocations(Collections.emptyMap());
                this.checkResourcesNeedReconcile();
                this.declareNeededResourcesWithDelay();
            }
            return;
        }
        this.logMissingAndAvailableResource(missingResources);
        missingResources = missingResources.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new ArrayList((Collection)e.getValue())));
        ResourceAllocationResult result = this.resourceAllocationStrategy.tryFulfillRequirements(missingResources, this.taskManagerTracker, this::isBlockedTaskManager);
        this.allocateSlotsAccordingTo(result.getAllocationsOnRegisteredResources());
        if (this.resourceAllocator.isSupported()) {
            failAllocations = this.allocateTaskManagersAccordingTo(result.getPendingTaskManagersToAllocate());
            HashMap<PendingTaskManagerId, Map<JobID, ResourceCounter>> pendingResourceAllocationResult = new HashMap<PendingTaskManagerId, Map<JobID, ResourceCounter>>(result.getAllocationsOnPendingResources());
            pendingResourceAllocationResult.keySet().removeAll(failAllocations);
            this.taskManagerTracker.replaceAllPendingAllocations(pendingResourceAllocationResult);
        } else {
            failAllocations = result.getPendingTaskManagersToAllocate().stream().map(PendingTaskManager::getPendingTaskManagerId).collect(Collectors.toSet());
        }
        this.unfulfillableJobs.clear();
        this.unfulfillableJobs.addAll(result.getUnfulfillableJobs());
        for (PendingTaskManagerId pendingTaskManagerId : failAllocations) {
            this.unfulfillableJobs.addAll(result.getAllocationsOnPendingResources().get((Object)pendingTaskManagerId).keySet());
        }
        if (this.sendNotEnoughResourceNotifications) {
            for (JobID jobId : this.unfulfillableJobs) {
                LOG.warn("Could not fulfill resource requirements of job {}.", (Object)jobId);
                this.resourceEventListener.notEnoughResourceAvailable(jobId, this.resourceTracker.getAcquiredResources(jobId));
            }
        }
        if (this.resourceAllocator.isSupported()) {
            this.checkResourcesNeedReconcile();
            this.declareNeededResourcesWithDelay();
        }
    }

    private void logMissingAndAvailableResource(Map<JobID, Collection<ResourceRequirement>> missingResources) {
        StringJoiner lines = new StringJoiner(System.lineSeparator());
        lines.add("Matching resource requirements against available resources.");
        lines.add("Missing resources:");
        missingResources.forEach((jobId, resources) -> {
            lines.add("\t Job " + jobId);
            resources.forEach(resource -> lines.add(String.format("\t\t%s", resource)));
        });
        lines.add("Current resources:");
        if (this.taskManagerTracker.getRegisteredTaskManagers().isEmpty()) {
            lines.add("\t(none)");
        } else {
            for (TaskManagerInfo taskManagerInfo : this.taskManagerTracker.getRegisteredTaskManagers()) {
                ResourceID resourceId = taskManagerInfo.getTaskExecutorConnection().getResourceID();
                lines.add("\tTaskManager " + resourceId);
                lines.add("\t\tAvailable: " + taskManagerInfo.getAvailableResource());
                lines.add("\t\tTotal:     " + taskManagerInfo.getTotalResource());
            }
        }
        LOG.info(lines.toString());
    }

    private void allocateSlotsAccordingTo(Map<JobID, Map<InstanceID, ResourceCounter>> result) {
        ArrayList<CompletableFuture<Void>> allocationFutures = new ArrayList<CompletableFuture<Void>>();
        for (Map.Entry<JobID, Map<InstanceID, ResourceCounter>> jobEntry : result.entrySet()) {
            JobID jobID = jobEntry.getKey();
            for (Map.Entry<InstanceID, ResourceCounter> tmEntry : jobEntry.getValue().entrySet()) {
                InstanceID instanceID = tmEntry.getKey();
                for (Map.Entry<ResourceProfile, Integer> slotEntry : tmEntry.getValue().getResourcesWithCount()) {
                    for (int i = 0; i < slotEntry.getValue(); ++i) {
                        allocationFutures.add(this.slotStatusSyncer.allocateSlot(instanceID, jobID, this.jobMasterTargetAddresses.get(jobID), slotEntry.getKey()));
                    }
                }
            }
        }
        FutureUtils.combineAll(allocationFutures).whenCompleteAsync((s, t) -> {
            if (t != null) {
                this.checkResourceRequirementsWithDelay();
            }
        }, this.mainThreadExecutor);
    }

    private Set<PendingTaskManagerId> allocateTaskManagersAccordingTo(List<PendingTaskManager> pendingTaskManagers) {
        Preconditions.checkState((boolean)this.resourceAllocator.isSupported());
        HashSet<PendingTaskManagerId> failedAllocations = new HashSet<PendingTaskManagerId>();
        for (PendingTaskManager pendingTaskManager : pendingTaskManagers) {
            if (this.allocateResource(pendingTaskManager)) continue;
            failedAllocations.add(pendingTaskManager.getPendingTaskManagerId());
        }
        return failedAllocations;
    }

    @Override
    public int getNumberRegisteredSlots() {
        return this.taskManagerTracker.getNumberRegisteredSlots();
    }

    @Override
    public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
        return this.taskManagerTracker.getNumberRegisteredSlotsOf(instanceId);
    }

    @Override
    public int getNumberFreeSlots() {
        return this.taskManagerTracker.getNumberFreeSlots();
    }

    @Override
    public int getNumberFreeSlotsOf(InstanceID instanceId) {
        return this.taskManagerTracker.getNumberFreeSlotsOf(instanceId);
    }

    @Override
    public ResourceProfile getRegisteredResource() {
        return this.taskManagerTracker.getRegisteredResource();
    }

    @Override
    public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
        return this.taskManagerTracker.getRegisteredResourceOf(instanceID);
    }

    @Override
    public ResourceProfile getFreeResource() {
        return this.taskManagerTracker.getFreeResource();
    }

    @Override
    public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
        return this.taskManagerTracker.getFreeResourceOf(instanceID);
    }

    @Override
    public Collection<SlotInfo> getAllocatedSlotsOf(InstanceID instanceID) {
        return ((Collection)this.taskManagerTracker.getRegisteredTaskManager(instanceID).map(TaskManagerInfo::getAllocatedSlots).map(Map::values).orElse(Collections.emptyList())).stream().map(slot -> new SlotInfo(slot.getJobId(), slot.getResourceProfile())).collect(Collectors.toList());
    }

    private void checkClusterReconciliation() {
        if (this.checkResourcesNeedReconcile()) {
            this.declareNeededResourcesWithDelay();
        }
    }

    private boolean checkResourcesNeedReconcile() {
        ResourceReconcileResult reconcileResult = this.resourceAllocationStrategy.tryReconcileClusterResources(this.taskManagerTracker);
        reconcileResult.getPendingTaskManagersToRelease().stream().map(PendingTaskManager::getPendingTaskManagerId).forEach(this.taskManagerTracker::removePendingTaskManager);
        for (TaskManagerInfo taskManagerToRelease : reconcileResult.getTaskManagersToRelease()) {
            if (this.waitResultConsumedBeforeRelease) {
                this.releaseIdleTaskExecutorIfPossible(taskManagerToRelease);
                continue;
            }
            this.releaseIdleTaskExecutor(taskManagerToRelease.getInstanceId());
        }
        reconcileResult.getPendingTaskManagersToAllocate().forEach(this::allocateResource);
        return reconcileResult.needReconcile();
    }

    private void releaseIdleTaskExecutorIfPossible(TaskManagerInfo taskManagerInfo) {
        long idleSince = taskManagerInfo.getIdleSince();
        taskManagerInfo.getTaskExecutorConnection().getTaskExecutorGateway().canBeReleased().thenAcceptAsync(canBeReleased -> {
            boolean stillIdle;
            boolean bl = stillIdle = idleSince == taskManagerInfo.getIdleSince();
            if (stillIdle && canBeReleased.booleanValue()) {
                this.releaseIdleTaskExecutor(taskManagerInfo.getInstanceId());
                this.declareNeededResourcesWithDelay();
            }
        }, this.mainThreadExecutor);
    }

    private void releaseIdleTaskExecutor(InstanceID taskManagerToRelease) {
        Preconditions.checkState((boolean)this.resourceAllocator.isSupported());
        this.taskManagerTracker.addUnWantedTaskManager(taskManagerToRelease);
    }

    private boolean allocateResource(PendingTaskManager pendingTaskManager) {
        Preconditions.checkState((boolean)this.resourceAllocator.isSupported());
        if (this.isMaxTotalResourceExceededAfterAdding(pendingTaskManager.getTotalResourceProfile())) {
            LOG.info("Could not allocate {}. Max total resource limitation <{}, {}> is reached.", new Object[]{pendingTaskManager, this.maxTotalCpu, this.maxTotalMem.toHumanReadableString()});
            return false;
        }
        this.taskManagerTracker.addPendingTaskManager(pendingTaskManager);
        return true;
    }

    @VisibleForTesting
    public long getTaskManagerIdleSince(InstanceID instanceId) {
        return this.taskManagerTracker.getRegisteredTaskManager(instanceId).map(TaskManagerInfo::getIdleSince).orElse(0L);
    }

    private void checkInit() {
        Preconditions.checkState((boolean)this.started, (Object)"The slot manager has not been started.");
        Preconditions.checkNotNull((Object)((Object)this.resourceManagerId));
        Preconditions.checkNotNull((Object)this.mainThreadExecutor);
        Preconditions.checkNotNull((Object)this.resourceAllocator);
        Preconditions.checkNotNull((Object)this.resourceEventListener);
    }

    private boolean isMaxTotalResourceExceededAfterAdding(ResourceProfile newResource) {
        ResourceProfile totalResourceAfterAdding = newResource.merge(this.taskManagerTracker.getRegisteredResource()).merge(this.taskManagerTracker.getPendingResource());
        return totalResourceAfterAdding.getCpuCores().compareTo((Resource)this.maxTotalCpu) > 0 || totalResourceAfterAdding.getTotalMemory().compareTo(this.maxTotalMem) > 0;
    }

    private boolean isBlockedTaskManager(ResourceID resourceID) {
        Preconditions.checkNotNull((Object)this.blockedTaskManagerChecker);
        return this.blockedTaskManagerChecker.isBlockedTaskManager(resourceID);
    }
}

