/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.ClientQuotaControlManager;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ConfigurationControlManager;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.controller.ControllerMetrics;
import org.apache.kafka.controller.ControllerPurgatory;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.ControllerResultAndOffset;
import org.apache.kafka.controller.DeferredEvent;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.ProducerIdControlManager;
import org.apache.kafka.controller.ReplicaPlacer;
import org.apache.kafka.controller.ReplicationControlManager;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.controller.SnapshotGenerator;
import org.apache.kafka.controller.StripedReplicaPlacer;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;

public final class QuorumController
implements Controller {
    private static final String ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX = "The active controller appears to be node ";
    private static final String GENERATE_SNAPSHOT = "generateSnapshot";
    private static final int MAX_BATCHES_PER_GENERATE_CALL = 10;
    static final String MAYBE_FENCE_REPLICAS = "maybeFenceReplicas";
    private final LogContext logContext;
    private final Logger log;
    private final int nodeId;
    private final KafkaEventQueue queue;
    private final Time time;
    private final ControllerMetrics controllerMetrics;
    private final SnapshotRegistry snapshotRegistry;
    private final ControllerPurgatory purgatory;
    private final ConfigurationControlManager configurationControl;
    private final ClientQuotaControlManager clientQuotaControlManager;
    private final ClusterControlManager clusterControl;
    private final FeatureControlManager featureControl;
    private final ProducerIdControlManager producerIdControlManager;
    private final ReplicationControlManager replicationControl;
    private final SnapshotGeneratorManager snapshotGeneratorManager = new SnapshotGeneratorManager();
    private final RaftClient<ApiMessageAndVersion> raftClient;
    private QuorumMetaLogListener metaLogListener;
    private volatile int curClaimEpoch;
    private long lastCommittedOffset = -1L;
    private int lastCommittedEpoch = -1;
    private long lastCommittedTimestamp = -1L;
    private long writeOffset;
    private final long snapshotMaxNewRecordBytes;
    private long newBytesSinceLastSnapshot = 0L;

    private NotControllerException newNotControllerException() {
        OptionalInt latestController = this.raftClient.leaderAndEpoch().leaderId();
        if (latestController.isPresent()) {
            return new NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX + latestController.getAsInt());
        }
        return new NotControllerException("No controller appears to be active.");
    }

    public static int exceptionToApparentController(NotControllerException e) {
        if (e.getMessage().startsWith(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX)) {
            return Integer.parseInt(e.getMessage().substring(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX.length()));
        }
        return -1;
    }

    private void handleEventEnd(String name, long startProcessingTimeNs) {
        long endProcessingTime = this.time.nanoseconds();
        long deltaNs = endProcessingTime - startProcessingTimeNs;
        this.log.debug("Processed {} in {} us", (Object)name, (Object)TimeUnit.MICROSECONDS.convert(deltaNs, TimeUnit.NANOSECONDS));
        this.controllerMetrics.updateEventQueueProcessingTime(TimeUnit.NANOSECONDS.toMillis(deltaNs));
    }

    private Throwable handleEventException(String name, Optional<Long> startProcessingTimeNs, Throwable exception) {
        if (!startProcessingTimeNs.isPresent()) {
            this.log.info("unable to start processing {} because of {}.", (Object)name, (Object)exception.getClass().getSimpleName());
            if (exception instanceof ApiException) {
                return exception;
            }
            return new UnknownServerException(exception);
        }
        long endProcessingTime = this.time.nanoseconds();
        long deltaNs = endProcessingTime - startProcessingTimeNs.get();
        long deltaUs = TimeUnit.MICROSECONDS.convert(deltaNs, TimeUnit.NANOSECONDS);
        if (exception instanceof ApiException) {
            this.log.info("{}: failed with {} in {} us", new Object[]{name, exception.getClass().getSimpleName(), deltaUs});
            return exception;
        }
        this.log.warn("{}: failed with unknown server exception {} at epoch {} in {} us.  Reverting to last committed offset {}.", new Object[]{this, exception.getClass().getSimpleName(), this.curClaimEpoch, deltaUs, this.lastCommittedOffset, exception});
        this.raftClient.resign(this.curClaimEpoch);
        this.renounce();
        return new UnknownServerException(exception);
    }

    private void appendControlEvent(String name, Runnable handler) {
        ControlEvent event = new ControlEvent(name, handler);
        this.queue.append((EventQueue.Event)event);
    }

    ReplicationControlManager replicationControl() {
        return this.replicationControl;
    }

    <T> CompletableFuture<T> appendReadEvent(String name, Supplier<T> handler) {
        ControllerReadEvent<T> event = new ControllerReadEvent<T>(name, handler);
        this.queue.append(event);
        return event.future();
    }

    <T> CompletableFuture<T> appendReadEvent(String name, long deadlineNs, Supplier<T> handler) {
        ControllerReadEvent<T> event = new ControllerReadEvent<T>(name, handler);
        this.queue.appendWithDeadline(deadlineNs, event);
        return event.future();
    }

    private <T> CompletableFuture<T> appendWriteEvent(String name, long deadlineNs, ControllerWriteOperation<T> op) {
        ControllerWriteEvent<T> event = new ControllerWriteEvent<T>(name, op);
        this.queue.appendWithDeadline(deadlineNs, event);
        return event.future();
    }

    private <T> CompletableFuture<T> appendWriteEvent(String name, ControllerWriteOperation<T> op) {
        ControllerWriteEvent<T> event = new ControllerWriteEvent<T>(name, op);
        this.queue.append(event);
        return event.future();
    }

    private void renounce() {
        this.curClaimEpoch = -1;
        this.controllerMetrics.setActive(false);
        this.purgatory.failAll((Exception)this.newNotControllerException());
        if (this.snapshotRegistry.hasSnapshot(this.lastCommittedOffset)) {
            this.snapshotRegistry.revertToSnapshot(this.lastCommittedOffset);
        } else {
            this.resetState();
            this.raftClient.unregister((RaftClient.Listener)this.metaLogListener);
            this.metaLogListener = new QuorumMetaLogListener();
            this.raftClient.register((RaftClient.Listener)this.metaLogListener);
        }
        this.writeOffset = -1L;
        this.clusterControl.deactivate();
        this.cancelMaybeFenceReplicas();
    }

    private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs, ControllerWriteOperation<T> op) {
        ControllerWriteEvent<T> event = new ControllerWriteEvent<T>(name, op);
        this.queue.scheduleDeferred(name, (Function)new EventQueue.EarliestDeadlineFunction(deadlineNs), event);
        ((ControllerWriteEvent)event).future.exceptionally(e -> {
            if (e instanceof UnknownServerException && e.getCause() != null && e.getCause() instanceof RejectedExecutionException) {
                this.log.error("Cancelling deferred write event {} because the event queue is now closed.", (Object)name);
                return null;
            }
            if (e instanceof NotControllerException) {
                this.log.debug("Cancelling deferred write event {} because this controller is no longer active.", (Object)name);
                return null;
            }
            this.log.error("Unexpected exception while executing deferred write event {}. Rescheduling for a minute from now.", (Object)name, e);
            this.scheduleDeferredWriteEvent(name, deadlineNs + TimeUnit.NANOSECONDS.convert(1L, TimeUnit.MINUTES), op);
            return null;
        });
    }

    private void rescheduleMaybeFenceStaleBrokers() {
        long nextCheckTimeNs = this.clusterControl.heartbeatManager().nextCheckTimeNs();
        if (nextCheckTimeNs == Long.MAX_VALUE) {
            this.cancelMaybeFenceReplicas();
            return;
        }
        this.scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, () -> {
            ControllerResult<Void> result = this.replicationControl.maybeFenceOneStaleBroker();
            this.rescheduleMaybeFenceStaleBrokers();
            return result;
        });
    }

    private void cancelMaybeFenceReplicas() {
        this.queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
    }

    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
        try {
            MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
            switch (type) {
                case REGISTER_BROKER_RECORD: {
                    this.clusterControl.replay((RegisterBrokerRecord)message);
                    break;
                }
                case UNREGISTER_BROKER_RECORD: {
                    this.clusterControl.replay((UnregisterBrokerRecord)message);
                    break;
                }
                case TOPIC_RECORD: {
                    this.replicationControl.replay((TopicRecord)message);
                    break;
                }
                case PARTITION_RECORD: {
                    this.replicationControl.replay((PartitionRecord)message);
                    break;
                }
                case CONFIG_RECORD: {
                    this.configurationControl.replay((ConfigRecord)message);
                    break;
                }
                case PARTITION_CHANGE_RECORD: {
                    this.replicationControl.replay((PartitionChangeRecord)message);
                    break;
                }
                case FENCE_BROKER_RECORD: {
                    this.clusterControl.replay((FenceBrokerRecord)message);
                    break;
                }
                case UNFENCE_BROKER_RECORD: {
                    this.clusterControl.replay((UnfenceBrokerRecord)message);
                    break;
                }
                case REMOVE_TOPIC_RECORD: {
                    this.replicationControl.replay((RemoveTopicRecord)message);
                    break;
                }
                case FEATURE_LEVEL_RECORD: {
                    this.featureControl.replay((FeatureLevelRecord)message);
                    break;
                }
                case CLIENT_QUOTA_RECORD: {
                    this.clientQuotaControlManager.replay((ClientQuotaRecord)message);
                    break;
                }
                case PRODUCER_IDS_RECORD: {
                    this.producerIdControlManager.replay((ProducerIdsRecord)message);
                    break;
                }
                default: {
                    throw new RuntimeException("Unhandled record type " + (Object)((Object)type));
                }
            }
        }
        catch (Exception e) {
            if (snapshotId.isPresent()) {
                this.log.error("Error replaying record {} from snapshot {} at last offset {}.", new Object[]{message.toString(), snapshotId.get(), offset, e});
            }
            this.log.error("Error replaying record {} at last offset {}.", new Object[]{message.toString(), offset, e});
        }
    }

    private void maybeGenerateSnapshot(long batchSizeInBytes) {
        this.newBytesSinceLastSnapshot += batchSizeInBytes;
        if (this.newBytesSinceLastSnapshot >= this.snapshotMaxNewRecordBytes && this.snapshotGeneratorManager.generator == null) {
            boolean isActiveController;
            boolean bl = isActiveController = this.curClaimEpoch != -1;
            if (!isActiveController) {
                this.snapshotRegistry.getOrCreateSnapshot(this.lastCommittedOffset);
            }
            this.log.info("Generating a snapshot that includes (epoch={}, offset={}) after {} committed bytes since the last snapshot.", new Object[]{this.lastCommittedEpoch, this.lastCommittedOffset, this.newBytesSinceLastSnapshot});
            this.snapshotGeneratorManager.createSnapshotGenerator(this.lastCommittedOffset, this.lastCommittedEpoch, this.lastCommittedTimestamp);
            this.newBytesSinceLastSnapshot = 0L;
        }
    }

    private void resetState() {
        this.snapshotGeneratorManager.cancel();
        this.snapshotRegistry.reset();
        this.newBytesSinceLastSnapshot = 0L;
        this.lastCommittedOffset = -1L;
        this.lastCommittedEpoch = -1;
        this.lastCommittedTimestamp = -1L;
    }

    private QuorumController(LogContext logContext, int nodeId, KafkaEventQueue queue, Time time, Map<ConfigResource.Type, ConfigDef> configDefs, RaftClient<ApiMessageAndVersion> raftClient, Map<String, VersionRange> supportedFeatures, short defaultReplicationFactor, int defaultNumPartitions, ReplicaPlacer replicaPlacer, long snapshotMaxNewRecordBytes, long sessionTimeoutNs, ControllerMetrics controllerMetrics) {
        this.logContext = logContext;
        this.log = logContext.logger(QuorumController.class);
        this.nodeId = nodeId;
        this.queue = queue;
        this.time = time;
        this.controllerMetrics = controllerMetrics;
        this.snapshotRegistry = new SnapshotRegistry(logContext);
        this.purgatory = new ControllerPurgatory();
        this.configurationControl = new ConfigurationControlManager(logContext, this.snapshotRegistry, configDefs);
        this.clientQuotaControlManager = new ClientQuotaControlManager(this.snapshotRegistry);
        this.clusterControl = new ClusterControlManager(logContext, time, this.snapshotRegistry, sessionTimeoutNs, replicaPlacer);
        this.featureControl = new FeatureControlManager(supportedFeatures, this.snapshotRegistry);
        this.producerIdControlManager = new ProducerIdControlManager(this.clusterControl, this.snapshotRegistry);
        this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
        this.replicationControl = new ReplicationControlManager(this.snapshotRegistry, logContext, defaultReplicationFactor, defaultNumPartitions, this.configurationControl, this.clusterControl, controllerMetrics);
        this.raftClient = raftClient;
        this.metaLogListener = new QuorumMetaLogListener();
        this.curClaimEpoch = -1;
        this.writeOffset = -1L;
        this.resetState();
        this.raftClient.register((RaftClient.Listener)this.metaLogListener);
    }

    @Override
    public CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
        if (request.topics().isEmpty()) {
            return CompletableFuture.completedFuture(new AlterIsrResponseData());
        }
        return this.appendWriteEvent("alterIsr", () -> this.replicationControl.alterIsr(request));
    }

    @Override
    public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
        if (request.topics().isEmpty()) {
            return CompletableFuture.completedFuture(new CreateTopicsResponseData());
        }
        return this.appendWriteEvent("createTopics", this.time.nanoseconds() + TimeUnit.NANOSECONDS.convert(request.timeoutMs(), TimeUnit.MILLISECONDS), () -> this.replicationControl.createTopics(request));
    }

    @Override
    public CompletableFuture<Void> unregisterBroker(int brokerId) {
        return this.appendWriteEvent("unregisterBroker", () -> this.replicationControl.unregisterBroker(brokerId));
    }

    @Override
    public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(long deadlineNs, Collection<String> names) {
        if (names.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        return this.appendReadEvent("findTopicIds", deadlineNs, () -> this.replicationControl.findTopicIds(this.lastCommittedOffset, names));
    }

    @Override
    public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(long deadlineNs, Collection<Uuid> ids) {
        if (ids.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        return this.appendReadEvent("findTopicNames", deadlineNs, () -> this.replicationControl.findTopicNames(this.lastCommittedOffset, ids));
    }

    @Override
    public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(long deadlineNs, Collection<Uuid> ids) {
        if (ids.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        return this.appendWriteEvent("deleteTopics", deadlineNs, () -> this.replicationControl.deleteTopics(ids));
    }

    @Override
    public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>> describeConfigs(Map<ConfigResource, Collection<String>> resources) {
        return this.appendReadEvent("describeConfigs", () -> this.configurationControl.describeConfigs(this.lastCommittedOffset, resources));
    }

    @Override
    public CompletableFuture<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
        if (request.topicPartitions() != null && request.topicPartitions().isEmpty()) {
            return CompletableFuture.completedFuture(new ElectLeadersResponseData());
        }
        return this.appendWriteEvent("electLeaders", this.time.nanoseconds() + TimeUnit.NANOSECONDS.convert(request.timeoutMs(), TimeUnit.MILLISECONDS), () -> this.replicationControl.electLeaders(request));
    }

    @Override
    public CompletableFuture<FeatureMapAndEpoch> finalizedFeatures() {
        return this.appendReadEvent("getFinalizedFeatures", () -> this.featureControl.finalizedFeatures(this.lastCommittedOffset));
    }

    @Override
    public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges, boolean validateOnly) {
        if (configChanges.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        return this.appendWriteEvent("incrementalAlterConfigs", () -> {
            ControllerResult<Map<ConfigResource, ApiError>> result = this.configurationControl.incrementalAlterConfigs(configChanges);
            if (validateOnly) {
                return result.withoutRecords();
            }
            return result;
        });
    }

    @Override
    public CompletableFuture<AlterPartitionReassignmentsResponseData> alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
        if (request.topics().isEmpty()) {
            return CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData());
        }
        return this.appendWriteEvent("alterPartitionReassignments", this.time.nanoseconds() + TimeUnit.NANOSECONDS.convert(request.timeoutMs(), TimeUnit.MILLISECONDS), () -> this.replicationControl.alterPartitionReassignments(request));
    }

    @Override
    public CompletableFuture<ListPartitionReassignmentsResponseData> listPartitionReassignments(ListPartitionReassignmentsRequestData request) {
        if (request.topics() != null && request.topics().isEmpty()) {
            return CompletableFuture.completedFuture(new ListPartitionReassignmentsResponseData().setErrorMessage(null));
        }
        return this.appendReadEvent("listPartitionReassignments", this.time.nanoseconds() + TimeUnit.NANOSECONDS.convert(request.timeoutMs(), TimeUnit.MILLISECONDS), () -> this.replicationControl.listPartitionReassignments(request.topics()));
    }

    @Override
    public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly) {
        if (newConfigs.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        return this.appendWriteEvent("legacyAlterConfigs", () -> {
            ControllerResult<Map<ConfigResource, ApiError>> result = this.configurationControl.legacyAlterConfigs(newConfigs);
            if (validateOnly) {
                return result.withoutRecords();
            }
            return result;
        });
    }

    @Override
    public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(final BrokerHeartbeatRequestData request) {
        return this.appendWriteEvent("processBrokerHeartbeat", new ControllerWriteOperation<BrokerHeartbeatReply>(){
            private final int brokerId;
            private boolean inControlledShutdown;
            {
                this.brokerId = request.brokerId();
                this.inControlledShutdown = false;
            }

            @Override
            public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
                ControllerResult<BrokerHeartbeatReply> result = QuorumController.this.replicationControl.processBrokerHeartbeat(request, QuorumController.this.lastCommittedOffset);
                this.inControlledShutdown = result.response().inControlledShutdown();
                QuorumController.this.rescheduleMaybeFenceStaleBrokers();
                return result;
            }

            @Override
            public void processBatchEndOffset(long offset) {
                if (this.inControlledShutdown) {
                    QuorumController.this.clusterControl.heartbeatManager().updateControlledShutdownOffset(this.brokerId, offset);
                }
            }
        });
    }

    @Override
    public CompletableFuture<BrokerRegistrationReply> registerBroker(BrokerRegistrationRequestData request) {
        return this.appendWriteEvent("registerBroker", () -> {
            ControllerResult<BrokerRegistrationReply> result = this.clusterControl.registerBroker(request, this.writeOffset + 1L, this.featureControl.finalizedFeatures(Long.MAX_VALUE));
            this.rescheduleMaybeFenceStaleBrokers();
            return result;
        });
    }

    @Override
    public CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly) {
        if (quotaAlterations.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        return this.appendWriteEvent("alterClientQuotas", () -> {
            ControllerResult<Map<ClientQuotaEntity, ApiError>> result = this.clientQuotaControlManager.alterClientQuotas(quotaAlterations);
            if (validateOnly) {
                return result.withoutRecords();
            }
            return result;
        });
    }

    @Override
    public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(AllocateProducerIdsRequestData request) {
        return this.appendWriteEvent("allocateProducerIds", () -> this.producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch())).thenApply(result -> new AllocateProducerIdsResponseData().setProducerIdStart(result.producerIdStart()).setProducerIdLen(result.producerIdLen()));
    }

    @Override
    public CompletableFuture<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> createPartitions(long deadlineNs, List<CreatePartitionsRequestData.CreatePartitionsTopic> topics) {
        if (topics.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        return this.appendWriteEvent("createPartitions", deadlineNs, () -> this.replicationControl.createPartitions(topics));
    }

    @Override
    public CompletableFuture<Long> beginWritingSnapshot() {
        CompletableFuture<Long> future = new CompletableFuture<Long>();
        this.appendControlEvent("beginWritingSnapshot", () -> {
            if (this.snapshotGeneratorManager.generator == null) {
                this.snapshotGeneratorManager.createSnapshotGenerator(this.lastCommittedOffset, this.lastCommittedEpoch, this.lastCommittedTimestamp);
            }
            future.complete(this.snapshotGeneratorManager.generator.lastContainedLogOffset());
        });
        return future;
    }

    @Override
    public CompletableFuture<Void> waitForReadyBrokers(int minBrokers) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.appendControlEvent("waitForReadyBrokers", () -> this.clusterControl.addReadyBrokersFuture(future, minBrokers));
        return future;
    }

    @Override
    public void beginShutdown() {
        this.queue.beginShutdown("QuorumController#beginShutdown");
    }

    public int nodeId() {
        return this.nodeId;
    }

    @Override
    public int curClaimEpoch() {
        return this.curClaimEpoch;
    }

    @Override
    public void close() throws InterruptedException {
        this.queue.close();
        this.controllerMetrics.close();
    }

    CountDownLatch pause() {
        CountDownLatch latch = new CountDownLatch(1);
        this.appendControlEvent("pause", () -> {
            try {
                latch.await();
            }
            catch (InterruptedException e) {
                this.log.info("Interrupted while waiting for unpause.", (Throwable)e);
            }
        });
        return latch;
    }

    Time time() {
        return this.time;
    }

    class QuorumMetaLogListener
    implements RaftClient.Listener<ApiMessageAndVersion> {
        QuorumMetaLogListener() {
        }

        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
            this.appendRaftEvent("handleCommit[baseOffset=" + reader.baseOffset() + "]", () -> {
                try {
                    boolean isActiveController = QuorumController.this.curClaimEpoch != -1;
                    long processedRecordsSize = 0L;
                    while (reader.hasNext()) {
                        Batch batch = (Batch)reader.next();
                        long offset = batch.lastOffset();
                        int epoch = batch.epoch();
                        List messages = batch.records();
                        if (isActiveController) {
                            QuorumController.this.log.debug("Completing purgatory items up to offset {} and epoch {}.", (Object)offset, (Object)epoch);
                            QuorumController.this.purgatory.completeUpTo(offset);
                            QuorumController.this.snapshotRegistry.deleteSnapshotsUpTo(QuorumController.this.snapshotGeneratorManager.snapshotLastOffsetFromLog().orElse(offset));
                        } else {
                            if (QuorumController.this.log.isDebugEnabled()) {
                                if (QuorumController.this.log.isTraceEnabled()) {
                                    QuorumController.this.log.trace("Replaying commits from the active node up to offset {} and epoch {}: {}.", new Object[]{offset, epoch, messages.stream().map(ApiMessageAndVersion::toString).collect(Collectors.joining(", "))});
                                } else {
                                    QuorumController.this.log.debug("Replaying commits from the active node up to offset {} and epoch {}.", (Object)offset, (Object)epoch);
                                }
                            }
                            for (ApiMessageAndVersion messageAndVersion : messages) {
                                QuorumController.this.replay(messageAndVersion.message(), Optional.empty(), offset);
                            }
                        }
                        QuorumController.this.lastCommittedOffset = offset;
                        QuorumController.this.lastCommittedEpoch = epoch;
                        QuorumController.this.lastCommittedTimestamp = batch.appendTimestamp();
                        processedRecordsSize += (long)batch.sizeInBytes();
                    }
                    QuorumController.this.maybeGenerateSnapshot(processedRecordsSize);
                }
                finally {
                    reader.close();
                }
            });
        }

        public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
            this.appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
                try {
                    boolean isActiveController;
                    boolean bl = isActiveController = QuorumController.this.curClaimEpoch != -1;
                    if (isActiveController) {
                        throw new IllegalStateException(String.format("Asked to load snasphot (%s) when it is the active controller (%s)", reader.snapshotId(), QuorumController.this.curClaimEpoch));
                    }
                    QuorumController.this.log.info("Starting to replay snapshot ({}), from last commit offset ({}) and epoch ({})", new Object[]{reader.snapshotId(), QuorumController.this.lastCommittedOffset, QuorumController.this.lastCommittedEpoch});
                    QuorumController.this.resetState();
                    while (reader.hasNext()) {
                        Batch batch = reader.next();
                        long offset = batch.lastOffset();
                        List messages = batch.records();
                        if (QuorumController.this.log.isDebugEnabled()) {
                            if (QuorumController.this.log.isTraceEnabled()) {
                                QuorumController.this.log.trace("Replaying snapshot ({}) batch with last offset of {}: {}", new Object[]{reader.snapshotId(), offset, messages.stream().map(ApiMessageAndVersion::toString).collect(Collectors.joining(", "))});
                            } else {
                                QuorumController.this.log.debug("Replaying snapshot ({}) batch with last offset of {}", (Object)reader.snapshotId(), (Object)offset);
                            }
                        }
                        for (ApiMessageAndVersion messageAndVersion : messages) {
                            QuorumController.this.replay(messageAndVersion.message(), Optional.of(reader.snapshotId()), offset);
                        }
                    }
                    QuorumController.this.lastCommittedOffset = reader.lastContainedLogOffset();
                    QuorumController.this.lastCommittedEpoch = reader.lastContainedLogEpoch();
                    QuorumController.this.lastCommittedTimestamp = reader.lastContainedLogTimestamp();
                    QuorumController.this.snapshotRegistry.getOrCreateSnapshot(QuorumController.this.lastCommittedOffset);
                }
                finally {
                    reader.close();
                }
            });
        }

        public void handleLeaderChange(LeaderAndEpoch newLeader) {
            if (newLeader.isLeader(QuorumController.this.nodeId)) {
                int newEpoch = newLeader.epoch();
                this.appendRaftEvent("handleLeaderChange[" + newEpoch + "]", () -> {
                    int curEpoch = QuorumController.this.curClaimEpoch;
                    if (curEpoch != -1) {
                        throw new RuntimeException("Tried to claim controller epoch " + newEpoch + ", but we never renounced controller epoch " + curEpoch);
                    }
                    QuorumController.this.log.info("Becoming the active controller at epoch {}, committed offset {} and committed epoch {}.", new Object[]{newEpoch, QuorumController.this.lastCommittedOffset, QuorumController.this.lastCommittedEpoch});
                    QuorumController.this.curClaimEpoch = newEpoch;
                    QuorumController.this.controllerMetrics.setActive(true);
                    QuorumController.this.writeOffset = QuorumController.this.lastCommittedOffset;
                    QuorumController.this.clusterControl.activate();
                    QuorumController.this.snapshotRegistry.getOrCreateSnapshot(QuorumController.this.lastCommittedOffset);
                });
            } else if (QuorumController.this.curClaimEpoch != -1) {
                this.appendRaftEvent("handleRenounce[" + QuorumController.this.curClaimEpoch + "]", () -> {
                    QuorumController.this.log.warn("Renouncing the leadership at oldEpoch {} due to a metadata log event. Reverting to last committed offset {}.", (Object)QuorumController.this.curClaimEpoch, (Object)QuorumController.this.lastCommittedOffset);
                    QuorumController.this.renounce();
                });
            }
        }

        public void beginShutdown() {
            QuorumController.this.queue.beginShutdown("MetaLogManager.Listener");
        }

        private void appendRaftEvent(String name, Runnable runnable) {
            QuorumController.this.appendControlEvent(name, () -> {
                if (this != QuorumController.this.metaLogListener) {
                    QuorumController.this.log.debug("Ignoring {} raft event from an old registration", (Object)name);
                } else {
                    runnable.run();
                }
            });
        }
    }

    class ControllerWriteEvent<T>
    implements EventQueue.Event,
    DeferredEvent {
        private final String name;
        private final CompletableFuture<T> future;
        private final ControllerWriteOperation<T> op;
        private final long eventCreatedTimeNs;
        private Optional<Long> startProcessingTimeNs;
        private ControllerResultAndOffset<T> resultAndOffset;

        ControllerWriteEvent(String name, ControllerWriteOperation<T> op) {
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
            this.startProcessingTimeNs = Optional.empty();
            this.name = name;
            this.future = new CompletableFuture();
            this.op = op;
            this.resultAndOffset = null;
        }

        CompletableFuture<T> future() {
            return this.future;
        }

        public void run() throws Exception {
            long now = QuorumController.this.time.nanoseconds();
            QuorumController.this.controllerMetrics.updateEventQueueTime(TimeUnit.NANOSECONDS.toMillis(now - this.eventCreatedTimeNs));
            int controllerEpoch = QuorumController.this.curClaimEpoch;
            if (controllerEpoch == -1) {
                throw QuorumController.this.newNotControllerException();
            }
            this.startProcessingTimeNs = Optional.of(now);
            ControllerResult<T> result = this.op.generateRecordsAndResult();
            if (result.records().isEmpty()) {
                this.op.processBatchEndOffset(QuorumController.this.writeOffset);
                Optional<Long> maybeOffset = QuorumController.this.purgatory.highestPendingOffset();
                if (!maybeOffset.isPresent()) {
                    this.resultAndOffset = ControllerResultAndOffset.of(-1L, result);
                    QuorumController.this.log.debug("Completing read-only operation {} immediately because the purgatory is empty.", (Object)this);
                    this.complete(null);
                    return;
                }
                this.resultAndOffset = ControllerResultAndOffset.of(maybeOffset.get(), result);
                QuorumController.this.log.debug("Read-only operation {} will be completed when the log reaches offset {}", (Object)this, (Object)this.resultAndOffset.offset());
            } else {
                long offset = result.isAtomic() ? QuorumController.this.raftClient.scheduleAtomicAppend(controllerEpoch, result.records()) : QuorumController.this.raftClient.scheduleAppend(controllerEpoch, result.records());
                this.op.processBatchEndOffset(offset);
                QuorumController.this.writeOffset = offset;
                this.resultAndOffset = ControllerResultAndOffset.of(offset, result);
                for (ApiMessageAndVersion message : result.records()) {
                    QuorumController.this.replay(message.message(), Optional.empty(), offset);
                }
                QuorumController.this.snapshotRegistry.getOrCreateSnapshot(offset);
                QuorumController.this.log.debug("Read-write operation {} will be completed when the log reaches offset {}.", (Object)this, (Object)this.resultAndOffset.offset());
            }
            QuorumController.this.purgatory.add(this.resultAndOffset.offset(), this);
        }

        public void handleException(Throwable exception) {
            this.complete(exception);
        }

        @Override
        public void complete(Throwable exception) {
            if (exception == null) {
                QuorumController.this.handleEventEnd(this.toString(), this.startProcessingTimeNs.get());
                this.future.complete(this.resultAndOffset.response());
            } else {
                this.future.completeExceptionally(QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, exception));
            }
        }

        public String toString() {
            return this.name + "(" + System.identityHashCode(this) + ")";
        }
    }

    static interface ControllerWriteOperation<T> {
        public ControllerResult<T> generateRecordsAndResult() throws Exception;

        default public void processBatchEndOffset(long offset) {
        }
    }

    class ControllerReadEvent<T>
    implements EventQueue.Event {
        private final String name;
        private final CompletableFuture<T> future;
        private final Supplier<T> handler;
        private final long eventCreatedTimeNs;
        private Optional<Long> startProcessingTimeNs;

        ControllerReadEvent(String name, Supplier<T> handler) {
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
            this.startProcessingTimeNs = Optional.empty();
            this.name = name;
            this.future = new CompletableFuture();
            this.handler = handler;
        }

        CompletableFuture<T> future() {
            return this.future;
        }

        public void run() throws Exception {
            long now = QuorumController.this.time.nanoseconds();
            QuorumController.this.controllerMetrics.updateEventQueueTime(TimeUnit.NANOSECONDS.toMillis(now - this.eventCreatedTimeNs));
            this.startProcessingTimeNs = Optional.of(now);
            T value = this.handler.get();
            QuorumController.this.handleEventEnd(this.toString(), this.startProcessingTimeNs.get());
            this.future.complete(value);
        }

        public void handleException(Throwable exception) {
            this.future.completeExceptionally(QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, exception));
        }

        public String toString() {
            return this.name + "(" + System.identityHashCode(this) + ")";
        }
    }

    class SnapshotGeneratorManager
    implements Runnable {
        private final ExponentialBackoff exponentialBackoff = new ExponentialBackoff(10L, 2, 5000L, 0.0);
        private SnapshotGenerator generator = null;

        SnapshotGeneratorManager() {
        }

        void createSnapshotGenerator(long committedOffset, int committedEpoch, long committedTimestamp) {
            if (this.generator != null) {
                throw new RuntimeException("Snapshot generator already exists.");
            }
            if (!QuorumController.this.snapshotRegistry.hasSnapshot(committedOffset)) {
                throw new RuntimeException(String.format("Cannot generate a snapshot at committed offset %s because it does not exists in the snapshot registry.", committedOffset));
            }
            Optional writer = QuorumController.this.raftClient.createSnapshot(committedOffset, committedEpoch, committedTimestamp);
            if (writer.isPresent()) {
                this.generator = new SnapshotGenerator(QuorumController.this.logContext, (SnapshotWriter<ApiMessageAndVersion>)((SnapshotWriter)writer.get()), 10, this.exponentialBackoff, Arrays.asList(new SnapshotGenerator.Section("features", QuorumController.this.featureControl.iterator(committedOffset)), new SnapshotGenerator.Section("cluster", QuorumController.this.clusterControl.iterator(committedOffset)), new SnapshotGenerator.Section("replication", QuorumController.this.replicationControl.iterator(committedOffset)), new SnapshotGenerator.Section("configuration", QuorumController.this.configurationControl.iterator(committedOffset)), new SnapshotGenerator.Section("clientQuotas", QuorumController.this.clientQuotaControlManager.iterator(committedOffset)), new SnapshotGenerator.Section("producerIds", QuorumController.this.producerIdControlManager.iterator(committedOffset))));
                this.reschedule(0L);
            } else {
                QuorumController.this.log.info("Skipping generation of snapshot for committed offset {} and epoch {} since it already exists", (Object)committedOffset, (Object)committedEpoch);
            }
        }

        void cancel() {
            if (this.generator == null) {
                return;
            }
            QuorumController.this.log.error("Cancelling snapshot {}", (Object)this.generator.lastContainedLogOffset());
            this.generator.writer().close();
            this.generator = null;
            QuorumController.this.snapshotRegistry.deleteSnapshotsUpTo(QuorumController.this.lastCommittedOffset);
            QuorumController.this.queue.cancelDeferred(QuorumController.GENERATE_SNAPSHOT);
        }

        void reschedule(long delayNs) {
            ControlEvent event = new ControlEvent(QuorumController.GENERATE_SNAPSHOT, this);
            QuorumController.this.queue.scheduleDeferred(event.name, (Function)new EventQueue.EarliestDeadlineFunction(QuorumController.this.time.nanoseconds() + delayNs), (EventQueue.Event)event);
        }

        @Override
        public void run() {
            OptionalLong nextDelay;
            if (this.generator == null) {
                QuorumController.this.log.debug("No snapshot is in progress.");
                return;
            }
            try {
                nextDelay = this.generator.generateBatches();
            }
            catch (Exception e) {
                QuorumController.this.log.error("Error while generating snapshot {}", (Object)this.generator.lastContainedLogOffset(), (Object)e);
                this.generator.writer().close();
                this.generator = null;
                return;
            }
            if (!nextDelay.isPresent()) {
                QuorumController.this.log.info("Finished generating snapshot {}.", (Object)this.generator.lastContainedLogOffset());
                this.generator.writer().close();
                this.generator = null;
                QuorumController.this.snapshotRegistry.deleteSnapshotsUpTo(QuorumController.this.lastCommittedOffset);
                return;
            }
            this.reschedule(nextDelay.getAsLong());
        }

        OptionalLong snapshotLastOffsetFromLog() {
            if (this.generator == null) {
                return OptionalLong.empty();
            }
            return OptionalLong.of(this.generator.lastContainedLogOffset());
        }
    }

    class ControlEvent
    implements EventQueue.Event {
        private final String name;
        private final Runnable handler;
        private final long eventCreatedTimeNs;
        private Optional<Long> startProcessingTimeNs;

        ControlEvent(String name, Runnable handler) {
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
            this.startProcessingTimeNs = Optional.empty();
            this.name = name;
            this.handler = handler;
        }

        public void run() throws Exception {
            long now = QuorumController.this.time.nanoseconds();
            QuorumController.this.controllerMetrics.updateEventQueueTime(TimeUnit.NANOSECONDS.toMillis(now - this.eventCreatedTimeNs));
            this.startProcessingTimeNs = Optional.of(now);
            QuorumController.this.log.debug("Executing {}.", (Object)this);
            this.handler.run();
            QuorumController.this.handleEventEnd(this.toString(), this.startProcessingTimeNs.get());
        }

        public void handleException(Throwable exception) {
            QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, exception);
        }

        public String toString() {
            return this.name;
        }
    }

    public static class Builder {
        private final int nodeId;
        private Time time = Time.SYSTEM;
        private String threadNamePrefix = null;
        private LogContext logContext = null;
        private Map<ConfigResource.Type, ConfigDef> configDefs = Collections.emptyMap();
        private RaftClient<ApiMessageAndVersion> raftClient = null;
        private Map<String, VersionRange> supportedFeatures = Collections.emptyMap();
        private short defaultReplicationFactor = (short)3;
        private int defaultNumPartitions = 1;
        private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
        private long snapshotMaxNewRecordBytes = Long.MAX_VALUE;
        private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18L, TimeUnit.SECONDS);
        private ControllerMetrics controllerMetrics = null;

        public Builder(int nodeId) {
            this.nodeId = nodeId;
        }

        public Builder setTime(Time time) {
            this.time = time;
            return this;
        }

        public Builder setThreadNamePrefix(String threadNamePrefix) {
            this.threadNamePrefix = threadNamePrefix;
            return this;
        }

        public Builder setLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        public Builder setConfigDefs(Map<ConfigResource.Type, ConfigDef> configDefs) {
            this.configDefs = configDefs;
            return this;
        }

        public Builder setRaftClient(RaftClient<ApiMessageAndVersion> logManager) {
            this.raftClient = logManager;
            return this;
        }

        public Builder setSupportedFeatures(Map<String, VersionRange> supportedFeatures) {
            this.supportedFeatures = supportedFeatures;
            return this;
        }

        public Builder setDefaultReplicationFactor(short defaultReplicationFactor) {
            this.defaultReplicationFactor = defaultReplicationFactor;
            return this;
        }

        public Builder setDefaultNumPartitions(int defaultNumPartitions) {
            this.defaultNumPartitions = defaultNumPartitions;
            return this;
        }

        public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
            this.replicaPlacer = replicaPlacer;
            return this;
        }

        public Builder setSnapshotMaxNewRecordBytes(long value) {
            this.snapshotMaxNewRecordBytes = value;
            return this;
        }

        public Builder setSessionTimeoutNs(long sessionTimeoutNs) {
            this.sessionTimeoutNs = sessionTimeoutNs;
            return this;
        }

        public Builder setMetrics(ControllerMetrics controllerMetrics) {
            this.controllerMetrics = controllerMetrics;
            return this;
        }

        public QuorumController build() throws Exception {
            if (this.raftClient == null) {
                throw new RuntimeException("You must set a raft client.");
            }
            if (this.threadNamePrefix == null) {
                this.threadNamePrefix = String.format("Node%d_", this.nodeId);
            }
            if (this.logContext == null) {
                this.logContext = new LogContext(String.format("[Controller %d] ", this.nodeId));
            }
            if (this.controllerMetrics == null) {
                this.controllerMetrics = (ControllerMetrics)Class.forName("org.apache.kafka.controller.MockControllerMetrics").getConstructor(new Class[0]).newInstance(new Object[0]);
            }
            KafkaEventQueue queue = null;
            try {
                queue = new KafkaEventQueue(this.time, this.logContext, this.threadNamePrefix);
                return new QuorumController(this.logContext, this.nodeId, queue, this.time, this.configDefs, this.raftClient, this.supportedFeatures, this.defaultReplicationFactor, this.defaultNumPartitions, this.replicaPlacer, this.snapshotMaxNewRecordBytes, this.sessionTimeoutNs, this.controllerMetrics);
            }
            catch (Exception e) {
                Utils.closeQuietly(queue, (String)"event queue");
                throw e;
            }
        }
    }
}

