/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.DeclareResourceRequirementServiceConnectionManager;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclareResourceRequirementServiceConnectionManager;
import org.apache.flink.runtime.jobmaster.slotpool.NoOpDeclareResourceRequirementServiceConnectionManager;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeclarativeSlotPoolService
implements SlotPoolService {
    private final JobID jobId;
    private final Time rpcTimeout;
    private final DeclarativeSlotPool declarativeSlotPool;
    private final Clock clock;
    private final Set<ResourceID> registeredTaskManagers;
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    private DeclareResourceRequirementServiceConnectionManager resourceRequirementServiceConnectionManager = NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
    @Nullable
    private JobMasterId jobMasterId;
    @Nullable
    private String jobManagerAddress;
    private State state = State.CREATED;

    public DeclarativeSlotPoolService(JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory, Clock clock, Time idleSlotTimeout, Time rpcTimeout) {
        this.jobId = jobId;
        this.clock = clock;
        this.rpcTimeout = rpcTimeout;
        this.registeredTaskManagers = new HashSet<ResourceID>();
        this.declarativeSlotPool = declarativeSlotPoolFactory.create(jobId, this::declareResourceRequirements, idleSlotTimeout, rpcTimeout);
    }

    protected DeclarativeSlotPool getDeclarativeSlotPool() {
        return this.declarativeSlotPool;
    }

    protected long getRelativeTimeMillis() {
        return this.clock.relativeTimeMillis();
    }

    @Override
    public <T> Optional<T> castInto(Class<T> clazz) {
        if (clazz.isAssignableFrom(this.declarativeSlotPool.getClass())) {
            return Optional.of(clazz.cast(this.declarativeSlotPool));
        }
        return Optional.empty();
    }

    @Override
    public final void start(JobMasterId jobMasterId, String address, ComponentMainThreadExecutor mainThreadExecutor) throws Exception {
        Preconditions.checkState((this.state == State.CREATED ? 1 : 0) != 0, (Object)"The DeclarativeSlotPoolService can only be started once.");
        this.jobMasterId = (JobMasterId)((Object)Preconditions.checkNotNull((Object)((Object)jobMasterId)));
        this.jobManagerAddress = (String)Preconditions.checkNotNull((Object)address);
        this.resourceRequirementServiceConnectionManager = DefaultDeclareResourceRequirementServiceConnectionManager.create(mainThreadExecutor);
        this.onStart(mainThreadExecutor);
        this.state = State.STARTED;
    }

    protected void onStart(ComponentMainThreadExecutor componentMainThreadExecutor) {
    }

    protected void assertHasBeenStarted() {
        Preconditions.checkState((this.state == State.STARTED ? 1 : 0) != 0, (Object)"The DeclarativeSlotPoolService has to be started.");
    }

    @Override
    public final void close() {
        if (this.state != State.CLOSED) {
            this.onClose();
            this.resourceRequirementServiceConnectionManager.close();
            this.resourceRequirementServiceConnectionManager = NoOpDeclareResourceRequirementServiceConnectionManager.INSTANCE;
            this.releaseAllTaskManagers((Exception)new FlinkException("The DeclarativeSlotPoolService is being closed."));
            this.state = State.CLOSED;
        }
    }

    protected void onClose() {
    }

    @Override
    public Collection<SlotOffer> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> offers) {
        this.assertHasBeenStarted();
        if (!this.isTaskManagerRegistered(taskManagerLocation.getResourceID())) {
            this.log.debug("Ignoring offered slots from unknown task manager {}.", (Object)taskManagerLocation.getResourceID());
            return Collections.emptyList();
        }
        return this.declarativeSlotPool.offerSlots(offers, taskManagerLocation, taskManagerGateway, this.clock.relativeTimeMillis());
    }

    boolean isTaskManagerRegistered(ResourceID taskManagerId) {
        return this.registeredTaskManagers.contains(taskManagerId);
    }

    @Override
    public Optional<ResourceID> failAllocation(@Nullable ResourceID taskManagerId, AllocationID allocationId, Exception cause) {
        this.assertHasBeenStarted();
        Preconditions.checkNotNull((Object)((Object)allocationId));
        Preconditions.checkNotNull((Object)taskManagerId, (String)"This slot pool only supports failAllocation calls coming from the TaskExecutor.");
        ResourceCounter previouslyFulfilledRequirements = this.declarativeSlotPool.releaseSlot(allocationId, cause);
        this.onFailAllocation(previouslyFulfilledRequirements);
        if (this.declarativeSlotPool.containsSlots(taskManagerId)) {
            return Optional.empty();
        }
        return Optional.of(taskManagerId);
    }

    protected void onFailAllocation(ResourceCounter previouslyFulfilledRequirements) {
    }

    @Override
    public boolean registerTaskManager(ResourceID taskManagerId) {
        this.assertHasBeenStarted();
        this.log.debug("Register new TaskExecutor {}.", (Object)taskManagerId);
        return this.registeredTaskManagers.add(taskManagerId);
    }

    @Override
    public boolean releaseTaskManager(ResourceID taskManagerId, Exception cause) {
        this.assertHasBeenStarted();
        if (this.registeredTaskManagers.remove(taskManagerId)) {
            this.internalReleaseTaskManager(taskManagerId, cause);
            return true;
        }
        return false;
    }

    private void releaseAllTaskManagers(Exception cause) {
        for (ResourceID registeredTaskManager : this.registeredTaskManagers) {
            this.internalReleaseTaskManager(registeredTaskManager, cause);
        }
        this.registeredTaskManagers.clear();
    }

    private void internalReleaseTaskManager(ResourceID taskManagerId, Exception cause) {
        this.assertHasBeenStarted();
        ResourceCounter previouslyFulfilledRequirement = this.declarativeSlotPool.releaseSlots(taskManagerId, cause);
        this.onReleaseTaskManager(previouslyFulfilledRequirement);
    }

    protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequirement) {
    }

    @Override
    public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
        this.assertHasBeenStarted();
        this.resourceRequirementServiceConnectionManager.connect(resourceRequirements -> resourceManagerGateway.declareRequiredResources(this.jobMasterId, resourceRequirements, this.rpcTimeout));
        this.declareResourceRequirements(this.declarativeSlotPool.getResourceRequirements());
    }

    private void declareResourceRequirements(Collection<ResourceRequirement> resourceRequirements) {
        this.assertHasBeenStarted();
        this.resourceRequirementServiceConnectionManager.declareResourceRequirements(ResourceRequirements.create(this.jobId, this.jobManagerAddress, resourceRequirements));
    }

    @Override
    public void disconnectResourceManager() {
        this.assertHasBeenStarted();
        this.resourceRequirementServiceConnectionManager.disconnect();
    }

    @Override
    public AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) {
        this.assertHasBeenStarted();
        ArrayList<AllocatedSlotInfo> allocatedSlotInfos = new ArrayList<AllocatedSlotInfo>();
        for (SlotInfo slotInfo : this.declarativeSlotPool.getAllSlotsInformation()) {
            if (!slotInfo.getTaskManagerLocation().getResourceID().equals(taskManagerId)) continue;
            allocatedSlotInfos.add(new AllocatedSlotInfo(slotInfo.getPhysicalSlotNumber(), slotInfo.getAllocationId()));
        }
        return new AllocatedSlotReport(this.jobId, allocatedSlotInfos);
    }

    protected String getSlotServiceStatus() {
        return String.format("Registered TMs: %d, registered slots: %d free slots: %d", this.registeredTaskManagers.size(), this.declarativeSlotPool.getAllSlotsInformation().size(), this.declarativeSlotPool.getFreeSlotsInformation().size());
    }

    private static enum State {
        CREATED,
        STARTED,
        CLOSED;

    }
}

