/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.server.DynamicBrokerConfig$;
import kafka.server.DynamicConfig$Broker$;
import kafka.server.KafkaConfig;
import kafka.utils.Implicits;
import org.apache.kafka.common.compress.GzipCompression;
import org.apache.kafka.common.compress.ZstdCompression;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.SslClientAuth;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.MetadataVersionValidator;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.metrics.MetricConfigs;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.zookeeper.client.ZKClientConfig;
import scala.;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.Map;
import scala.collection.Seq;
import scala.compat.java8.OptionConverters;
import scala.compat.java8.OptionConverters$;
import scala.jdk.CollectionConverters$;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;

public final class KafkaConfig$ {
    /**
     * Instance of this class created by the static field initializer and
     * published through the {@code MODULE$} field.
     *
     * <p>NOTE(review): the {@code $}-suffixed class name, the {@code MODULE$}
     * field name and the {@code scala.*} imports suggest this is the compiled
     * form of a Scala {@code object}, i.e. its singleton instance; confirm
     * against the original Scala source before relying on that.
     */
    public static final KafkaConfig$ MODULE$ = new KafkaConfig$();
    private static final ConfigDef configDef = new ConfigDef().define("zookeeper.connect", ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Specifies the ZooKeeper connection string in the form <code>hostname:port</code> where host and port are the host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the form <code>hostname1:port1,hostname2:port2,hostname3:port3</code>.\nThe server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. For example to give a chroot path of <code>/chroot/path</code> you would give the connection string as <code>hostname1:port1,hostname2:port2,hostname3:port3/chroot/path</code>.").define("zookeeper.session.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)18000), ConfigDef.Importance.HIGH, "Zookeeper session timeout").define("zookeeper.connection.timeout.ms", ConfigDef.Type.INT, null, ConfigDef.Importance.HIGH, "The max time that the client waits to establish a connection to ZooKeeper. 
If not set, the value in zookeeper.session.timeout.ms is used").define("zookeeper.set.acl", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)false), ConfigDef.Importance.HIGH, "Set client to use secure ACLs").define("zookeeper.max.in.flight.requests", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)10), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking.").define("zookeeper.ssl.client.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)false), ConfigDef.Importance.MEDIUM, ZkConfigs.ZK_SSL_CLIENT_ENABLE_DOC).define("zookeeper.clientCnxnSocket", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, ZkConfigs.ZK_CLIENT_CNXN_SOCKET_DOC).define("zookeeper.ssl.keystore.location", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_DOC).define("zookeeper.ssl.keystore.password", ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_DOC).define("zookeeper.ssl.keystore.type", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_TYPE_DOC).define("zookeeper.ssl.truststore.location", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_DOC).define("zookeeper.ssl.truststore.password", ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_DOC).define("zookeeper.ssl.truststore.type", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_DOC).define("zookeeper.ssl.protocol", ConfigDef.Type.STRING, (Object)"TLSv1.2", ConfigDef.Importance.LOW, ZkConfigs.ZK_SSL_PROTOCOL_DOC).define("zookeeper.ssl.enabled.protocols", ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, 
ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_DOC).define("zookeeper.ssl.cipher.suites", ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, ZkConfigs.ZK_SSL_CIPHER_SUITES_DOC).define("zookeeper.ssl.endpoint.identification.algorithm", ConfigDef.Type.STRING, (Object)"HTTPS", ConfigDef.Importance.LOW, ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC).define("zookeeper.ssl.crl.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)false), ConfigDef.Importance.LOW, ZkConfigs.ZK_SSL_CRL_ENABLE_DOC).define("zookeeper.ssl.ocsp.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)false), ConfigDef.Importance.LOW, ZkConfigs.ZK_SSL_OCSP_ENABLE_DOC).define("broker.id.generation.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)true), ConfigDef.Importance.MEDIUM, "Enable automatic broker id generation on the server. When enabled the value configured for reserved.broker.max.id should be reviewed.").define("reserved.broker.max.id", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "Max number that can be used for a broker.id").define("broker.id", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)-1), ConfigDef.Importance.HIGH, "The broker id for this server. If unset, a unique broker id will be generated.To avoid conflicts between ZooKeeper generated broker id's and user configured broker id's, generated broker ids start from reserved.broker.max.id + 1.").define("message.max.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0x10000C), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.HIGH, "The largest record batch size allowed by Kafka (after compression if compression is enabled). 
If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case.This can be set per topic with the topic level <code>max.message.bytes</code> config.").define("num.network.threads", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)3), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The number of threads that the server uses for receiving requests from the network and sending responses to the network. Noted: each listener (except for controller listener) creates its own thread pool.").define("num.io.threads", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)8), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The number of threads that the server uses for processing requests, which may include disk I/O").define("num.replica.alter.log.dirs.threads", ConfigDef.Type.INT, null, ConfigDef.Importance.HIGH, "The number of threads that can move replicas between log directories, which may include disk I/O").define("background.threads", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)10), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The number of threads to use for various background processing tasks").define("queued.max.requests", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)500), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The number of queued requests allowed for data-plane, before blocking the network 
threads").define("queued.max.request.bytes", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToInteger((int)-1), ConfigDef.Importance.MEDIUM, "The number of queued bytes allowed before no more requests are read").define("request.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)30000), ConfigDef.Importance.HIGH, "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.").define("socket.connection.setup.timeout.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)ServerConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS), ConfigDef.Importance.MEDIUM, "The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel. This value is the initial backoff value and will increase exponentially for each consecutive connection failure, up to the <code>socket.connection.setup.timeout.max.ms</code> value.").define("socket.connection.setup.timeout.max.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS), ConfigDef.Importance.MEDIUM, "The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. 
To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.").define("metadata.log.max.record.bytes.between.snapshots", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToInteger((int)0x1400000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "This is the maximum number of bytes in the log between the latest snapshot and the high-watermark needed before generating a new snapshot. The default value is 20971520. To generate snapshots based on the time elapsed, see the <code>metadata.log.max.snapshot.interval.ms</code> configuration. The Kafka node will generate a snapshot when either the maximum time interval is reached or the maximum bytes limit is reached.").define("metadata.log.max.snapshot.interval.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.HIGH, KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC).define("process.roles", ConfigDef.Type.LIST, Collections.emptyList(), (ConfigDef.Validator)ConfigDef.ValidList.in((String[])new String[]{"broker", "controller"}), ConfigDef.Importance.HIGH, "The roles that this process plays: 'broker', 'controller', or 'broker,controller' if it is both. This configuration is only applicable for clusters in KRaft (Kafka Raft) mode (instead of ZooKeeper). Leave this config undefined or empty for ZooKeeper clusters.").define("node.id", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)-1), null, ConfigDef.Importance.HIGH, "The node ID associated with the roles this process is playing when <code>process.roles</code> is non-empty. 
This is required configuration when running in KRaft mode.").define("initial.broker.registration.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)60000), null, ConfigDef.Importance.MEDIUM, "When initially registering with the controller quorum, the number of milliseconds to wait before declaring failure and exiting the broker process.").define("broker.heartbeat.interval.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)2000), null, ConfigDef.Importance.MEDIUM, "The length of time in milliseconds between broker heartbeats. Used when running in KRaft mode.").define("broker.session.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)9000), null, ConfigDef.Importance.MEDIUM, "The length of time in milliseconds that a broker lease lasts if no heartbeats are made. Used when running in KRaft mode.").define("controller.listener.names", ConfigDef.Type.STRING, null, null, ConfigDef.Importance.HIGH, "A comma-separated list of the names of the listeners used by the controller. This is required if running in KRaft mode. When communicating with the controller quorum, the broker will always use the first listener in this list.\n Note: The ZooKeeper-based controller should not set this configuration.").define("sasl.mechanism.controller.protocol", ConfigDef.Type.STRING, (Object)"GSSAPI", null, ConfigDef.Importance.HIGH, "SASL mechanism used for communication with controllers. Default is GSSAPI.").define("metadata.log.dir", ConfigDef.Type.STRING, null, null, ConfigDef.Importance.HIGH, "This configuration determines where we put the metadata log for clusters in KRaft mode. 
If it is not set, the metadata log is placed in the first log directory from log.dirs.").define("metadata.log.segment.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0x40000000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(12)), ConfigDef.Importance.HIGH, "The maximum size of a single metadata log file.").defineInternal("metadata.log.segment.min.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0x800000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(12)), ConfigDef.Importance.HIGH, "Override the minimum size for a single metadata log file. This should be used for testing only.").define("metadata.log.segment.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)604800000L), null, ConfigDef.Importance.HIGH, "The maximum time before a new metadata log file is rolled out (in milliseconds).").define("metadata.max.retention.bytes", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToInteger((int)0x6400000), null, ConfigDef.Importance.HIGH, "The maximum combined size of the metadata log and snapshots before deleting old snapshots and log files. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.").define("metadata.max.retention.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)604800000L), null, ConfigDef.Importance.HIGH, "The number of milliseconds to keep a metadata log file or snapshot before deleting it. Since at least one snapshot must exist before any logs can be deleted, this is a soft limit.").define("metadata.max.idle.interval.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)500), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.LOW, "This configuration controls how often the active controller should write no-op records to the metadata partition. If the value is 0, no-op records are not appended to the metadata partition. 
The default value is 500").defineInternal("server.max.startup.time.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)Long.MAX_VALUE), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "The maximum number of milliseconds we will wait for the server to come up. By default there is no limit. This should be used for testing only.").define("zookeeper.metadata.migration.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)false), ConfigDef.Importance.HIGH, "Enable ZK to KRaft migration").define("eligible.leader.replicas.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)false), ConfigDef.Importance.HIGH, "Enable the Eligible leader replicas").defineInternal("zookeeper.metadata.migration.min.batch.size", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)200), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "Soft minimum batch size to use when migrating metadata from ZooKeeper to KRaft").define("authorizer.class.name", ConfigDef.Type.STRING, (Object)"", (ConfigDef.Validator)new ConfigDef.NonNullValidator(), ConfigDef.Importance.LOW, ServerConfigs.AUTHORIZER_CLASS_NAME_DOC).define("early.start.listeners", ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "A comma-separated list of listener names which may be started before the authorizer has finished initialization. This is useful when the authorizer is dependent on the cluster itself for bootstrapping, as is the case for the StandardAuthorizer (which stores ACLs in the metadata log.) By default, all listeners included in controller.listener.names will also be early start listeners. 
A listener should not appear in this list if it accepts external traffic.").define("listeners", ConfigDef.Type.STRING, (Object)"PLAINTEXT://:9092", ConfigDef.Importance.HIGH, SocketServerConfigs.LISTENERS_DOC).define("advertised.listeners", ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SocketServerConfigs.ADVERTISED_LISTENERS_DOC).define("listener.security.protocol.map", ConfigDef.Type.STRING, (Object)SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT, ConfigDef.Importance.LOW, "Map between listener names and security protocols. This must be defined for the same security protocol to be usable in more than one port or IP. For example, internal and external traffic can be separated even if SSL is required for both. Concretely, the user could define listeners with names INTERNAL and EXTERNAL and this property as: <code>INTERNAL:SSL,EXTERNAL:SSL</code>. As shown, key and value are separated by a colon and map entries are separated by commas. Each listener name should only appear once in the map. Different security (SSL and SASL) settings can be configured for each listener by adding a normalised prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the INTERNAL listener, a config with name <code>listener.name.internal.ssl.keystore.location</code> would be set. If the config for the listener name is not set, the config will fallback to the generic config (i.e. <code>ssl.keystore.location</code>). 
Note that in KRaft a default mapping from the listener names defined by <code>controller.listener.names</code> to PLAINTEXT is assumed if no explicit mapping is provided and no other security protocol is in use.").define("control.plane.listener.name", ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_DOC).define("socket.send.buffer.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)102400), ConfigDef.Importance.HIGH, "The SO_SNDBUF buffer of the socket server sockets. If the value is -1, the OS default will be used.").define("socket.receive.buffer.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)102400), ConfigDef.Importance.HIGH, "The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used.").define("socket.request.max.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0x6400000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The maximum number of bytes in a socket request").define("socket.listen.backlog.size", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)50), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The maximum number of pending connections on the socket. 
In Linux, you may also need to configure <code>somaxconn</code> and <code>tcp_max_syn_backlog</code> kernel parameters accordingly to make the configuration takes effect.").define("max.connections.per.ip", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)Integer.MAX_VALUE), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, SocketServerConfigs.MAX_CONNECTIONS_PER_IP_DOC).define("max.connections.per.ip.overrides", ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.MEDIUM, "A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. An example value is \"hostName:100,127.0.0.1:200\"").define("max.connections", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)Integer.MAX_VALUE), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, SocketServerConfigs.MAX_CONNECTIONS_DOC).define("max.connection.creation.rate", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)Integer.MAX_VALUE), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_DOC).define("connections.max.idle.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)600000L), ConfigDef.Importance.MEDIUM, "Idle connections timeout: the server socket processor threads close the connections that idle more than this").define("connection.failed.authentication.delay.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)100), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.LOW, SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_DOC).define("broker.rack", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Rack of the broker. This will be used in rack aware replication assignment for fault tolerance. 
Examples: <code>RACK1</code>, <code>us-east-1d</code>").define("num.partitions", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The default number of log partitions per topic").define("log.dir", ConfigDef.Type.STRING, (Object)"/tmp/kafka-logs", ConfigDef.Importance.HIGH, "The directory in which the log data is kept (supplemental for log.dirs property)").define("log.dirs", ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "A comma-separated list of the directories where the log data is stored. If not set, the value in log.dir is used.").define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0x40000000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(14)), ConfigDef.Importance.HIGH, "The maximum size of a single log file").define(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, ConfigDef.Type.LONG, null, ConfigDef.Importance.HIGH, "The maximum time before a new log segment is rolled out (in milliseconds). If not set, the value in log.roll.hours is used").define("log.roll.hours", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)((int)TimeUnit.MILLISECONDS.toHours(604800000L))), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, ServerLogConfigs.LOG_ROLL_TIME_HOURS_DOC).define(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, ConfigDef.Type.LONG, null, ConfigDef.Importance.HIGH, "The maximum jitter to subtract from logRollTimeMillis (in milliseconds). 
If not set, the value in log.roll.jitter.hours is used").define("log.roll.jitter.hours", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)((int)TimeUnit.MILLISECONDS.toHours(0L))), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.HIGH, ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_DOC).define(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, ConfigDef.Type.LONG, null, ConfigDef.Importance.HIGH, "The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in log.retention.minutes is used. If set to -1, no time limit is applied.").define("log.retention.minutes", ConfigDef.Type.INT, null, ConfigDef.Importance.HIGH, ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_DOC).define("log.retention.hours", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)((int)TimeUnit.MILLISECONDS.toHours(604800000L))), ConfigDef.Importance.HIGH, ServerLogConfigs.LOG_RETENTION_TIME_HOURS_DOC).define(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)-1L), ConfigDef.Importance.HIGH, "The maximum size of the log before deleting it").define("log.retention.check.interval.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)300000L), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion").define(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG, ConfigDef.Type.LIST, (Object)"delete", (ConfigDef.Validator)ConfigDef.ValidList.in((String[])new String[]{"compact", "delete"}), ConfigDef.Importance.MEDIUM, "The default cleanup policy for segments beyond the retention window. A comma separated list of valid policies. 
Valid policies are: \"delete\" and \"compact\"").define("log.cleaner.threads", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "The number of background threads to use for log cleaning").define("log.cleaner.io.max.bytes.per.second", ConfigDef.Type.DOUBLE, (Object)BoxesRunTime.boxToDouble((double)Double.MAX_VALUE), ConfigDef.Importance.MEDIUM, "The log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average").define("log.cleaner.dedupe.buffer.size", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)0x8000000L), ConfigDef.Importance.MEDIUM, "The total memory used for log deduplication across all cleaner threads").define("log.cleaner.io.buffer.size", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)524288), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "The total memory used for log cleaner I/O buffers across all cleaner threads").define("log.cleaner.io.buffer.load.factor", ConfigDef.Type.DOUBLE, (Object)BoxesRunTime.boxToDouble((double)0.9), ConfigDef.Importance.MEDIUM, "Log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. 
A higher value will allow more log to be cleaned at once but will lead to more hash collisions").define("log.cleaner.backoff.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToInteger((int)15000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "The amount of time to sleep when there are no logs to clean").define(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP, ConfigDef.Type.DOUBLE, (Object)BoxesRunTime.boxToDouble((double)0.5), (ConfigDef.Validator)ConfigDef.Range.between((Number)Predef$.MODULE$.int2Integer(0), (Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_DOC).define("log.cleaner.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)true), ConfigDef.Importance.MEDIUM, "Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size.").define(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)86400000L), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "The amount of time to retain tombstone message markers for log compacted topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise  tombstones messages may be collected before a consumer completes their scan).").define(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP, ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)0L), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "The minimum time a message will remain uncompacted in the log. 
Only applicable for logs that are being compacted.").define(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP, ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)Long.MAX_VALUE), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted.").define(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG, ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0xA00000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(4)), ConfigDef.Importance.MEDIUM, "The maximum size in bytes of the offset index").define(ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_CONFIG, ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)4096), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "The interval with which we add an entry to the offset index.").define(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)Long.MAX_VALUE), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The number of messages accumulated on a log partition before messages are flushed to disk.").define(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)60000L), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.HIGH, "The amount of time to wait before deleting a file from the filesystem. If the value is 0 and there is no file to delete, the system will wait 1 millisecond. 
Low value will cause busy waiting").define("log.flush.scheduler.interval.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)Long.MAX_VALUE), ConfigDef.Importance.HIGH, "The frequency in ms that the log flusher checks whether any log needs to be flushed to disk").define(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, ConfigDef.Type.LONG, null, ConfigDef.Importance.HIGH, "The maximum time in ms that a message in any topic is kept in memory before flushed to disk. If not set, the value in log.flush.scheduler.interval.ms is used").define("log.flush.offset.checkpoint.interval.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)60000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.HIGH, "The frequency with which we update the persistent record of the last flush which acts as the log recovery point.").define("log.flush.start.offset.checkpoint.interval.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)60000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.HIGH, "The frequency with which we update the persistent record of log start offset").define(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG, ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)false), ConfigDef.Importance.MEDIUM, "Should pre allocate file when create new segment? 
If you are using Kafka on Windows, you probably need to set it to true.").define("num.recovery.threads.per.data.dir", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown").define("auto.create.topics.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)true), ConfigDef.Importance.HIGH, "Enable auto creation of topic on the server.").define(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "When a producer sets acks to \"all\" (or \"-1\"), <code>min.insync.replicas</code> specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either <code>NotEnoughReplicas</code> or <code>NotEnoughReplicasAfterAppend</code>).<br>When used together, <code>min.insync.replicas</code> and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set <code>min.insync.replicas</code> to 2, and produce with acks of \"all\". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.").define(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, ConfigDef.Type.STRING, (Object)ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DEFAULT, (ConfigDef.Validator)new MetadataVersionValidator(), ConfigDef.Importance.MEDIUM, "Specify the message format version the broker will use to append messages to the logs. The value should be a valid MetadataVersion. Some examples are: 0.8.2, 0.9.0.0, 0.10.0, check MetadataVersion for more details. 
By setting a particular message format version, the user is certifying that all the existing messages on disk are smaller or equal than the specified version. Setting this value incorrectly will cause consumers with older versions to break as they will receive messages with a format that they don't understand.").define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, ConfigDef.Type.STRING, (Object)"CreateTime", (ConfigDef.Validator)ConfigDef.ValidString.in((String[])new String[]{"CreateTime", "LogAppendTime"}), ConfigDef.Importance.MEDIUM, "Define whether the timestamp in the message is message create time or log append time. The value should be either <code>CreateTime</code> or <code>LogAppendTime</code>.").define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)Long.MAX_VALUE), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "[DEPRECATED] The maximum difference allowed between the timestamp when a broker receives a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling.").define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)Long.MAX_VALUE), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "This configuration sets the allowable timestamp difference between the broker's timestamp and the message timestamp. The message timestamp can be earlier than or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this configuration. 
If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.").define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)Long.MAX_VALUE), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "This configuration sets the allowable timestamp difference between the message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this configuration. If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime.").define("create.topic.policy.class.name", ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "The create topic policy class that should be used for validation. The class should implement the <code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface.").define("alter.config.policy.class.name", ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "The alter configs policy class that should be used for validation. The class should implement the <code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface.").define(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)true), ConfigDef.Importance.LOW, "This configuration controls whether down-conversion of message formats is enabled to satisfy consume requests. When set to <code>false</code>, broker will not perform down-conversion for consumers expecting an older message format. 
The broker responds with <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configurationdoes not apply to any message format conversion that might be required for replication to followers.").defineInternal("log.initial.task.delay.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)30000L), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.LOW, "The initial task delay in millisecond when initializing tasks in LogManager. This should be used for testing only.").define("log.dir.failure.timeout.ms", ConfigDef.Type.LONG, (Object)ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DEFAULT, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "If the broker is unable to successfully communicate to the controller that some log directory has failed for longer than this time, the broker will fail and shut down.").define("controller.socket.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)30000), ConfigDef.Importance.MEDIUM, "The socket timeout for controller-to-broker channels.").define("default.replication.factor", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1), ConfigDef.Importance.MEDIUM, "The default replication factors for automatically created topics.").define("replica.lag.time.max.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)30000L), ConfigDef.Importance.HIGH, "If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr").define("replica.socket.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)30000), ConfigDef.Importance.HIGH, "The socket timeout for network requests. 
Its value should be at least replica.fetch.wait.max.ms").define("replica.socket.receive.buffer.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)65536), ConfigDef.Importance.HIGH, "The socket receive buffer for network requests to the leader for replicating data").define("replica.fetch.max.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0x100000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "The number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum, if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. The maximum record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or <code>max.message.bytes</code> (topic config).").define("replica.fetch.wait.max.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)500), ConfigDef.Importance.HIGH, "The maximum wait time for each fetcher request issued by follower replicas. This value should always be less than the replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topics").define("replica.fetch.backoff.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "The amount of time to sleep when fetch partition error occurs.").define("replica.fetch.min.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1), ConfigDef.Importance.HIGH, "Minimum bytes expected for each fetch response. 
If not enough bytes, wait up to <code>replica.fetch.wait.max.ms</code> (broker config).").define("replica.fetch.response.max.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0xA00000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "Maximum bytes expected for the entire fetch response. Records are fetched in batches, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or <code>max.message.bytes</code> (topic config).").define("num.replica.fetchers", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1), ConfigDef.Importance.HIGH, "Number of fetcher threads used to replicate records from each source broker. The total number of fetchers on each broker is bound by <code>num.replica.fetchers</code> multiplied by the number of brokers in the cluster.Increasing this value can increase the degree of I/O parallelism in the follower and leader broker at the cost of higher CPU and memory utilization.").define("replica.high.watermark.checkpoint.interval.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)5000L), ConfigDef.Importance.HIGH, "The frequency with which the high watermark is saved out to disk").define("fetch.purgatory.purge.interval.requests", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1000), ConfigDef.Importance.MEDIUM, "The purge interval (in number of requests) of the fetch request purgatory").define("producer.purgatory.purge.interval.requests", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1000), ConfigDef.Importance.MEDIUM, "The purge interval (in number of requests) of the producer request 
purgatory").define("delete.records.purgatory.purge.interval.requests", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1), ConfigDef.Importance.MEDIUM, "The purge interval (in number of requests) of the delete records request purgatory").define("auto.leader.rebalance.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)true), ConfigDef.Importance.HIGH, ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_DOC).define("leader.imbalance.per.broker.percentage", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)10), ConfigDef.Importance.HIGH, "The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage.").define("leader.imbalance.check.interval.seconds", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToInteger((int)300), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The frequency with which the partition rebalance check is triggered by the controller").define(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)false), ConfigDef.Importance.HIGH, "Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss").define("security.inter.broker.protocol", ConfigDef.Type.STRING, (Object)ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_DEFAULT, (ConfigDef.Validator)ConfigDef.ValidString.in((String[])Utils.enumOptions(SecurityProtocol.class)), ConfigDef.Importance.MEDIUM, ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_DOC).define("inter.broker.protocol.version", ConfigDef.Type.STRING, (Object)ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_DEFAULT, (ConfigDef.Validator)new MetadataVersionValidator(), ConfigDef.Importance.MEDIUM, "Specify which version of the inter-broker protocol will be used.\n. 
This is typically bumped after all brokers were upgraded to a new version.\n Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check MetadataVersion for the full list.").define("inter.broker.listener.name", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Name of listener used for communication between brokers. If this is unset, the listener name is defined by security.inter.broker.protocolIt is an error to set this and security.inter.broker.protocol properties at the same time.").define("replica.selector.class", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "The fully qualified class name that implements ReplicaSelector. This is used by the broker to find the preferred read replica. By default, we use an implementation that returns the leader.").define("controlled.shutdown.max.retries", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)3), ConfigDef.Importance.MEDIUM, "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens").define("controlled.shutdown.retry.backoff.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToInteger((int)5000), ConfigDef.Importance.MEDIUM, "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying.").define("controlled.shutdown.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)true), ConfigDef.Importance.MEDIUM, "Enable controlled shutdown of the server.").define("group.min.session.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)6000), ConfigDef.Importance.MEDIUM, "The minimum allowed session timeout for registered consumers. 
Shorter timeouts result in quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources.").define("group.max.session.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1800000), ConfigDef.Importance.MEDIUM, "The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures.").define("group.initial.rebalance.delay.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)3000), ConfigDef.Importance.MEDIUM, "The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins.").define("group.max.size", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)Integer.MAX_VALUE), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The maximum number of consumers that a single consumer group can accommodate.").define("group.coordinator.rebalance.protocols", ConfigDef.Type.LIST, (Object)GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT, (ConfigDef.Validator)ConfigDef.ValidList.in((String[])Utils.enumOptions(Group.GroupType.class)), ConfigDef.Importance.MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC).define("group.coordinator.threads", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The number of threads used by the group coordinator.").define("group.coordinator.append.linger.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)10), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "The duration in 
milliseconds that the coordinator will wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated.").defineInternal("group.coordinator.new.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)false), null, ConfigDef.Importance.MEDIUM, "Enable the new group coordinator.").define("group.consumer.session.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)45000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The timeout to detect client failures when using the consumer group protocol.").define("group.consumer.min.session.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)45000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The minimum allowed session timeout for registered consumers.").define("group.consumer.max.session.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)60000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The maximum allowed session timeout for registered consumers.").define("group.consumer.heartbeat.interval.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)5000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The heartbeat interval given to the members of a consumer group.").define("group.consumer.min.heartbeat.interval.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)5000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The minimum heartbeat interval for registered consumers.").define("group.consumer.max.heartbeat.interval.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)15000), 
(ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The maximum heartbeat interval for registered consumers.").define("group.consumer.max.size", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)Integer.MAX_VALUE), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The maximum number of consumers that a single consumer group can accommodate. This value will only impact the new consumer coordinator. To configure the classic consumer coordinator check group.max.size instead.").define("group.consumer.assignors", ConfigDef.Type.LIST, (Object)GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, ConfigDef.Importance.MEDIUM, "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor.").define("group.consumer.migration.policy", ConfigDef.Type.STRING, (Object)GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, (ConfigDef.Validator)ConfigDef.CaseInsensitiveValidString.in((String[])Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), ConfigDef.Importance.MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_DOC).define("offset.metadata.max.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)4096), ConfigDef.Importance.HIGH, "The maximum size for a metadata entry associated with an offset commit.").define("offsets.load.buffer.size", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0x500000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "Batch size for reading from the offsets segments when loading offsets into the cache (soft-limit, overridden if records are too large).").define("offsets.topic.replication.factor", ConfigDef.Type.SHORT, (Object)BoxesRunTime.boxToShort((short)3), 
(ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.").define("offsets.topic.num.partitions", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)50), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The number of partitions for the offset commit topic (should not change after deployment).").define("offsets.topic.segment.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0x6400000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads.").define("offsets.topic.compression.codec", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id), ConfigDef.Importance.HIGH, "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits.").define("offsets.retention.minutes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)10080), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "For subscribed consumers, committed offset of a specific partition will be expired and discarded when 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); 2) this retention period has elapsed since the last time an offset is committed for the partition and the group is no longer subscribed to the corresponding topic. For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit. 
Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted without extra retention period; also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period.").define("offsets.retention.check.interval.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)600000L), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "Frequency at which to check for stale offsets").define("offsets.commit.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)5000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "Offset commit will be delayed until all replicas for the offsets topic receive the commit or this timeout is reached. This is similar to the producer request timeout.").define("offsets.commit.required.acks", ConfigDef.Type.SHORT, (Object)BoxesRunTime.boxToShort((short)-1), ConfigDef.Importance.HIGH, "DEPRECATED: The required acks before the commit can be accepted. In general, the default (-1) should not be overridden.").define("delete.topic.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)true), ConfigDef.Importance.HIGH, "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off").define(ServerConfigs.COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING, (Object)LogConfig.DEFAULT_COMPRESSION_TYPE, (ConfigDef.Validator)ConfigDef.ValidString.in((String[])((String[])CollectionConverters$.MODULE$.ListHasAsScala(BrokerCompressionType.names()).asScala().toSeq().toArray(ClassTag$.MODULE$.apply(String.class)))), ConfigDef.Importance.HIGH, "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). 
It additionally accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the original compression codec set by the producer.").define(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG, ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)-1), (ConfigDef.Validator)new GzipCompression.LevelValidator(), ConfigDef.Importance.MEDIUM, ServerConfigs.COMPRESSION_GZIP_LEVEL_DOC).define(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG, ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)9), (ConfigDef.Validator)ConfigDef.Range.between((Number)Predef$.MODULE$.int2Integer(1), (Number)Predef$.MODULE$.int2Integer(17)), ConfigDef.Importance.MEDIUM, ServerConfigs.COMPRESSION_LZ4_LEVEL_DOC).define(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG, ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)ZstdCompression.DEFAULT_LEVEL), (ConfigDef.Validator)ConfigDef.Range.between((Number)Predef$.MODULE$.int2Integer(ZstdCompression.MIN_LEVEL), (Number)Predef$.MODULE$.int2Integer(ZstdCompression.MAX_LEVEL)), ConfigDef.Importance.MEDIUM, ServerConfigs.COMPRESSION_ZSTD_LEVEL_DOC).define("transactional.id.expiration.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The time in ms that the transaction coordinator will wait without receiving any transaction status updates for the current transaction before expiring its transactional id. Transactional IDs will not expire while a the transaction is still ongoing.").define("transaction.max.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The maximum allowed timeout for transactions. 
If a client\u2019s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.").define("transaction.state.log.min.isr", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)2), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The minimum number of replicas that must acknowledge a write to transaction topic in order to be considered successful.").define("transaction.state.log.load.buffer.size", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0x500000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache (soft-limit, overridden if records are too large).").define("transaction.state.log.replication.factor", ConfigDef.Type.SHORT, (Object)BoxesRunTime.boxToShort((short)3), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The replication factor for the transaction topic (set higher to ensure availability). 
Internal topic creation will fail until the cluster size meets this replication factor requirement.").define("transaction.state.log.num.partitions", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)50), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The number of partitions for the transaction topic (should not change after deployment).").define("transaction.state.log.segment.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0x6400000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.HIGH, "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads").define("transaction.abort.timed.out.transaction.cleanup.interval.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The interval at which to rollback transactions that have timed out").define("transaction.remove.expired.transaction.cleanup.interval.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The interval at which to remove transactions that have expired due to <code>transactional.id.expiration.ms</code> passing").define("transaction.partition.verification.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)true), ConfigDef.Importance.LOW, "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition").define("producer.id.expiration.ms", 
ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)86400000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The time in ms that a topic partition leader will wait before expiring producer IDs. Producer IDs will not expire while a transaction associated to them is still ongoing. Note that producer IDs may expire sooner if the last write from the producer ID is deleted due to the topic's retention settings. Setting this value the same or higher than <code>delivery.timeout.ms</code> can help prevent expiration during retries and protect against message duplication, but the default should be reasonable for most use cases.").defineInternal("producer.id.expiration.check.interval.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)600000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The interval at which to remove producer IDs that have expired due to <code>producer.id.expiration.ms</code> passing.").define("max.incremental.fetch.session.cache.slots", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(0)), ConfigDef.Importance.MEDIUM, "The maximum number of total incremental fetch sessions that we will maintain. FetchSessionCache is sharded into 8 shards and the limit is equally divided among all shards. Sessions are allocated to each shard in round-robin. Only entries within a shard are considered eligible for eviction.").define("fetch.max.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0x3700000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1024)), ConfigDef.Importance.MEDIUM, "The maximum number of bytes we will return for a fetch request. 
Must be at least 1024.").define("max.request.partition.size.limit", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)2000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The maximum number of partitions can be served in one request.").define("metrics.num.samples", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)2), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The number of samples maintained to compute metrics.").define("metrics.sample.window.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToInteger((int)30000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The window of time a metrics sample is computed over.").define("metric.reporters", ConfigDef.Type.LIST, (Object)"", ConfigDef.Importance.LOW, "A list of classes to use as metrics reporters. Implementing the <code>org.apache.kafka.common.metrics.MetricsReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.").define("metrics.recording.level", ConfigDef.Type.STRING, (Object)MetricConfigs.METRIC_RECORDING_LEVEL_DEFAULT, ConfigDef.Importance.LOW, "The highest recording level for metrics.").define("auto.include.jmx.reporter", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)true), ConfigDef.Importance.LOW, "Deprecated. Whether to automatically include JmxReporter even if it's not listed in <code>metric.reporters</code>. 
This configuration will be removed in Kafka 4.0, users should instead include <code>org.apache.kafka.common.metrics.JmxReporter</code> in <code>metric.reporters</code> in order to enable the JmxReporter.").define("kafka.metrics.reporters", ConfigDef.Type.LIST, (Object)"", ConfigDef.Importance.LOW, "A list of classes to use as Yammer metrics custom reporters. The reporters should implement <code>kafka.metrics.KafkaMetricsReporter</code> trait. If a client wants to expose JMX operations on a custom reporter, the custom reporter needs to additionally implement an MBean trait that extends <code>kafka.metrics.KafkaMetricsReporterMBean</code> trait so that the registered MBean is compliant with the standard MBean convention.").define("kafka.metrics.polling.interval.secs", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)10), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The metrics polling interval (in seconds) which can be used inkafka.metrics.reporters implementations.").define("telemetry.max.bytes", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)0x100000), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The maximum size (after compression if compression is used) of telemetry metrics pushed from a client to the broker. 
The default value is 1048576 (1 MB).").define("quota.window.num", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)11), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The number of samples to retain in memory for client quotas").define("replication.quota.window.num", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)11), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The number of samples to retain in memory for replication quotas").define("alter.log.dirs.replication.quota.window.num", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)11), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The number of samples to retain in memory for alter log dirs replication quotas").define("controller.quota.window.num", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)11), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The number of samples to retain in memory for controller mutation quotas").define("quota.window.size.seconds", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The time span of each sample for client quotas").define("replication.quota.window.size.seconds", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The time span of each sample for replication quotas").define("alter.log.dirs.replication.quota.window.size.seconds", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The time span of each sample 
for alter log dirs replication quotas").define("controller.quota.window.size.seconds", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "The time span of each sample for controller mutations quotas").define("client.quota.callback.class", ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "The fully qualified name of a class that implements the ClientQuotaCallback interface, which is used to determine quota limits applied to client requests. By default, the &lt;user&gt; and &lt;client-id&gt; quotas that are stored in ZooKeeper are applied. For any given request, the most specific quota that matches the user principal of the session and the client-id of the request is applied.").define("connections.max.reauth.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)0L), ConfigDef.Importance.MEDIUM, "When explicitly set to a positive number (the default is 0, not a positive number), a session lifetime that will not exceed the configured value will be communicated to v2.2.0 or later clients when they authenticate. The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000").define("sasl.server.max.receive.size", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)524288), ConfigDef.Importance.MEDIUM, "The maximum receive size allowed before and during initial SASL authentication. Default receive size is 512KB. GSSAPI limits requests to 64K, but we allow upto 512KB by default for custom SASL mechanisms. 
In practice, PLAIN, SCRAM and OAUTH mechanisms can use much smaller limits.").define("security.providers", ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, "A list of configurable creator classes each returning a provider implementing security algorithms. These classes should implement the <code>org.apache.kafka.common.security.auth.SecurityProviderCreator</code> interface.").define("principal.builder.class", ConfigDef.Type.CLASS, (Object)BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DEFAULT, ConfigDef.Importance.MEDIUM, "The fully qualified name of a class that implements the KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during authorization. If no principal builder is defined, the default behavior depends on the security protocol in use. For SSL authentication,  the principal will be derived using the rules defined by <code>ssl.principal.mapping.rules</code> applied on the distinguished name from the client certificate if one is provided; otherwise, if client authentication is not required, the principal name will be ANONYMOUS. For SASL authentication, the principal will be derived using the rules defined by <code>sasl.kerberos.principal.to.local.rules</code> if GSSAPI is in use, and the SASL authentication ID for other mechanisms. For PLAINTEXT, the principal will be ANONYMOUS.").define("ssl.protocol", ConfigDef.Type.STRING, (Object)SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, "The SSL protocol used to generate the SSLContext. The default is 'TLSv1.3' when running with Java 11 or newer, 'TLSv1.2' otherwise. This value should be fine for most use cases. Allowed values in recent JVMs are 'TLSv1.2' and 'TLSv1.3'. 'TLS', 'TLSv1.1', 'SSL', 'SSLv2' and 'SSLv3' may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. With the default value for this config and 'ssl.enabled.protocols', clients will downgrade to 'TLSv1.2' if the server does not support 'TLSv1.3'. 
If this config is set to 'TLSv1.2', clients will not use 'TLSv1.3' even if it is one of the values in ssl.enabled.protocols and the server only supports 'TLSv1.3'.").define("ssl.provider", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.").define("ssl.enabled.protocols", ConfigDef.Type.LIST, (Object)SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, "The list of protocols enabled for SSL connections. The default is 'TLSv1.2,TLSv1.3' when running with Java 11 or newer, 'TLSv1.2' otherwise. With the default value for Java 11, clients and servers will prefer TLSv1.3 if both support it and fallback to TLSv1.2 otherwise (assuming both support at least TLSv1.2). This default should be fine for most cases. Also see the config documentation for `ssl.protocol`.").define("ssl.keystore.type", ConfigDef.Type.STRING, (Object)"JKS", ConfigDef.Importance.MEDIUM, "The file format of the key store file. This is optional for client. The values currently supported by the default `ssl.engine.factory.class` are [JKS, PKCS12, PEM].").define("ssl.keystore.location", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "The location of the key store file. This is optional for client and can be used for two-way authentication for client.").define("ssl.keystore.password", ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, "The store password for the key store file. This is optional for client and only needed if 'ssl.keystore.location' is configured. 
Key store password is not supported for PEM format.").define("ssl.key.password", ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, "The password of the private key in the key store file or the PEM key specified in 'ssl.keystore.key'.").define("ssl.keystore.key", ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, "Private key in the format specified by 'ssl.keystore.type'. Default SSL engine factory supports only PEM format with PKCS#8 keys. If the key is encrypted, key password must be specified using 'ssl.key.password'").define("ssl.keystore.certificate.chain", ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, "Certificate chain in the format specified by 'ssl.keystore.type'. Default SSL engine factory supports only PEM format with a list of X.509 certificates").define("ssl.truststore.type", ConfigDef.Type.STRING, (Object)"JKS", ConfigDef.Importance.MEDIUM, "The file format of the trust store file. The values currently supported by the default `ssl.engine.factory.class` are [JKS, PKCS12, PEM].").define("ssl.truststore.location", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "The location of the trust store file.").define("ssl.truststore.password", ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, "The password for the trust store file. If a password is not set, trust store file configured will still be used, but integrity checking is disabled. Trust store password is not supported for PEM format.").define("ssl.truststore.certificates", ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, "Trusted certificates in the format specified by 'ssl.truststore.type'. Default SSL engine factory supports only PEM format with X.509 certificates.").define("ssl.keymanager.algorithm", ConfigDef.Type.STRING, (Object)SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.MEDIUM, "The algorithm used by key manager factory for SSL connections. 
Default value is the key manager factory algorithm configured for the Java Virtual Machine.").define("ssl.trustmanager.algorithm", ConfigDef.Type.STRING, (Object)SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.MEDIUM, "The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.").define("ssl.endpoint.identification.algorithm", ConfigDef.Type.STRING, (Object)"https", ConfigDef.Importance.LOW, "The endpoint identification algorithm to validate server hostname using server certificate. ").define("ssl.secure.random.implementation", ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, "The SecureRandom PRNG implementation to use for SSL cryptography operations. ").define("ssl.client.auth", ConfigDef.Type.STRING, (Object)BrokerSecurityConfigs.SSL_CLIENT_AUTH_DEFAULT, (ConfigDef.Validator)ConfigDef.ValidString.in((String[])Utils.enumOptions(SslClientAuth.class)), ConfigDef.Importance.MEDIUM, "Configures kafka broker to request client authentication. The following settings are common:  <ul> <li><code>ssl.client.auth=required</code> If set to required client authentication is required. <li><code>ssl.client.auth=requested</code> This means client authentication is optional. unlike required, if this option is set client can choose not to provide authentication information about itself <li><code>ssl.client.auth=none</code> This means client authentication is not needed.</ul>").define("ssl.cipher.suites", ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, "A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. 
By default all the available cipher suites are supported.").define("ssl.principal.mapping.rules", ConfigDef.Type.STRING, (Object)"DEFAULT", ConfigDef.Importance.LOW, "A list of rules for mapping from distinguished name from the client certificate to short name. The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored. By default, distinguished name of the X.500 certificate will be the principal. For more details on the format please see <a href=\"#security_authz\"> security authorization and acls</a>. Note that this configuration is ignored if an extension of KafkaPrincipalBuilder is provided by the <code>principal.builder.class</code> configuration.").define("ssl.engine.factory.class", ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "The class of type org.apache.kafka.common.security.auth.SslEngineFactory to provide SSLEngine objects. Default value is org.apache.kafka.common.security.ssl.DefaultSslEngineFactory. Alternatively, setting this to org.apache.kafka.common.security.ssl.CommonNameLoggingSslEngineFactory will log the common name of expired SSL certificates used by clients to authenticate at any of the brokers with log level INFO. Note that this will cause a tiny delay during establishment of new connections from mTLS clients to brokers due to the extra code for examining the certificate chain provided by the client. 
Note further that the implementation uses a custom truststore based on the standard Java truststore and thus might be considered a security risk due to not being as mature as the standard one.").define("ssl.allow.dn.changes", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)false), ConfigDef.Importance.LOW, "Indicates whether changes to the certificate distinguished name should be allowed during a dynamic reconfiguration of certificates or not.").define("ssl.allow.san.changes", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)false), ConfigDef.Importance.LOW, "Indicates whether changes to the certificate subject alternative names should be allowed during a dynamic reconfiguration of certificates or not.").define("sasl.mechanism.inter.broker.protocol", ConfigDef.Type.STRING, (Object)"GSSAPI", ConfigDef.Importance.MEDIUM, "SASL mechanism used for inter-broker communication. Default is GSSAPI.").define("sasl.jaas.config", ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, "JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described <a href=\"https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html\">here</a>. The format for the value is: <code>loginModuleClass controlFlag (optionName=optionValue)*;</code>. For brokers, the config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required;").define("sasl.enabled.mechanisms", ConfigDef.Type.LIST, (Object)BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS, ConfigDef.Importance.MEDIUM, "The list of SASL mechanisms enabled in the Kafka server. The list may contain any mechanism for which a security provider is available. 
Only GSSAPI is enabled by default.").define("sasl.server.callback.handler.class", ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, "The fully qualified name of a SASL server callback handler class that implements the AuthenticateCallbackHandler interface. Server callback handlers must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler.").define("sasl.client.callback.handler.class", ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, "The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface.").define("sasl.login.class", ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, "The fully qualified name of a class that implements the Login interface. For brokers, login config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin").define("sasl.login.callback.handler.class", ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, "The fully qualified name of a SASL login callback handler class that implements the AuthenticateCallbackHandler interface. For brokers, login callback handler config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler").define("sasl.kerberos.service.name", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "The Kerberos principal name that Kafka runs as. 
This can be defined either in Kafka's JAAS config or in Kafka's config.").define("sasl.kerberos.kinit.cmd", ConfigDef.Type.STRING, (Object)"/usr/bin/kinit", ConfigDef.Importance.MEDIUM, "Kerberos kinit command path.").define("sasl.kerberos.ticket.renew.window.factor", ConfigDef.Type.DOUBLE, (Object)BoxesRunTime.boxToDouble((double)0.8), ConfigDef.Importance.MEDIUM, "Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached, at which time it will try to renew the ticket.").define("sasl.kerberos.ticket.renew.jitter", ConfigDef.Type.DOUBLE, (Object)BoxesRunTime.boxToDouble((double)0.05), ConfigDef.Importance.MEDIUM, "Percentage of random jitter added to the renewal time.").define("sasl.kerberos.min.time.before.relogin", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)60000L), ConfigDef.Importance.MEDIUM, "Login thread sleep time between refresh attempts.").define("sasl.kerberos.principal.to.local.rules", ConfigDef.Type.LIST, (Object)BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, ConfigDef.Importance.MEDIUM, "A list of rules for mapping from principal names to short names (typically operating system usernames). The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored. By default, principal names of the form <code>{username}/{hostname}@{REALM}</code> are mapped to <code>{username}</code>. For more details on the format please see <a href=\"#security_authz\"> security authorization and acls</a>. 
Note that this configuration is ignored if an extension of <code>KafkaPrincipalBuilder</code> is provided by the <code>principal.builder.class</code> configuration.").define("sasl.login.refresh.window.factor", ConfigDef.Type.DOUBLE, (Object)BoxesRunTime.boxToDouble((double)0.8), ConfigDef.Importance.MEDIUM, "Login refresh thread will sleep until the specified window factor relative to the credential's lifetime has been reached, at which time it will try to refresh the credential. Legal values are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used if no value is specified. Currently applies only to OAUTHBEARER.").define("sasl.login.refresh.window.jitter", ConfigDef.Type.DOUBLE, (Object)BoxesRunTime.boxToDouble((double)0.05), ConfigDef.Importance.MEDIUM, "The maximum amount of random jitter relative to the credential's lifetime that is added to the login refresh thread's sleep time. Legal values are between 0 and 0.25 (25%) inclusive; a default value of 0.05 (5%) is used if no value is specified. Currently applies only to OAUTHBEARER.").define("sasl.login.refresh.min.period.seconds", ConfigDef.Type.SHORT, (Object)BoxesRunTime.boxToShort((short)60), ConfigDef.Importance.MEDIUM, "The desired minimum time for the login refresh thread to wait before refreshing a credential, in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60 (1 minute) is used if no value is specified.  This value and  sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the remaining lifetime of a credential. Currently applies only to OAUTHBEARER.").define("sasl.login.refresh.buffer.seconds", ConfigDef.Type.SHORT, (Object)BoxesRunTime.boxToShort((short)300), ConfigDef.Importance.MEDIUM, "The amount of buffer time before credential expiration to maintain when refreshing a credential, in seconds. 
If a refresh would otherwise occur closer to expiration than the number of buffer seconds then the refresh will be moved up to maintain as much of the buffer time as possible. Legal values are between 0 and 3600 (1 hour); a default value of  300 (5 minutes) is used if no value is specified. This value and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds the remaining lifetime of a credential. Currently applies only to OAUTHBEARER.").define("sasl.login.connect.timeout.ms", ConfigDef.Type.INT, null, ConfigDef.Importance.LOW, "The (optional) value in milliseconds for the external authentication provider connection timeout. Currently applies only to OAUTHBEARER.").define("sasl.login.read.timeout.ms", ConfigDef.Type.INT, null, ConfigDef.Importance.LOW, "The (optional) value in milliseconds for the external authentication provider read timeout. Currently applies only to OAUTHBEARER.").define("sasl.login.retry.backoff.max.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)10000L), ConfigDef.Importance.LOW, "The (optional) value in milliseconds for the maximum wait between login attempts to the external authentication provider. Login uses an exponential backoff algorithm with an initial wait based on the sasl.login.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.login.retry.backoff.max.ms setting. Currently applies only to OAUTHBEARER.").define("sasl.login.retry.backoff.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)100L), ConfigDef.Importance.LOW, "The (optional) value in milliseconds for the initial wait between login attempts to the external authentication provider. Login uses an exponential backoff algorithm with an initial wait based on the sasl.login.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.login.retry.backoff.max.ms setting. 
Currently applies only to OAUTHBEARER.").define("sasl.oauthbearer.scope.claim.name", ConfigDef.Type.STRING, (Object)"scope", ConfigDef.Importance.LOW, "The OAuth claim for the scope is often named \"scope\", but this (optional) setting can provide a different name to use for the scope included in the JWT payload's claims if the OAuth/OIDC provider uses a different name for that claim.").define("sasl.oauthbearer.sub.claim.name", ConfigDef.Type.STRING, (Object)"sub", ConfigDef.Importance.LOW, "The OAuth claim for the subject is often named \"sub\", but this (optional) setting can provide a different name to use for the subject included in the JWT payload's claims if the OAuth/OIDC provider uses a different name for that claim.").define("sasl.oauthbearer.token.endpoint.url", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "The URL for the OAuth/OIDC identity provider. If the URL is HTTP(S)-based, it is the issuer's token endpoint URL to which requests will be made to login based on the configuration in sasl.jaas.config. If the URL is file-based, it specifies a file containing an access token (in JWT serialized form) issued by the OAuth/OIDC identity provider to use for authorization.").define("sasl.oauthbearer.jwks.endpoint.url", ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "The OAuth/OIDC provider URL from which the provider's <a href=\"https://datatracker.ietf.org/doc/html/rfc7517#section-5\">JWKS (JSON Web Key Set)</a> can be retrieved. The URL can be HTTP(S)-based or file-based. If the URL is HTTP(S)-based, the JWKS data will be retrieved from the OAuth/OIDC provider via the configured URL on broker startup. All then-current keys will be cached on the broker for incoming requests. If an authentication request is received for a JWT that includes a \"kid\" header claim value that isn't yet in the cache, the JWKS endpoint will be queried again on demand. 
However, the broker polls the URL every sasl.oauthbearer.jwks.endpoint.refresh.ms milliseconds to refresh the cache with any forthcoming keys before any JWT requests that include them are received. If the URL is file-based, the broker will load the JWKS file from a configured location on startup. In the event that the JWT includes a \"kid\" header value that isn't in the JWKS file, the broker will reject the JWT and authentication will fail.").define("sasl.oauthbearer.jwks.endpoint.refresh.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)3600000L), ConfigDef.Importance.LOW, "The (optional) value in milliseconds for the broker to wait between refreshing its JWKS (JSON Web Key Set) cache that contains the keys to verify the signature of the JWT.").define("sasl.oauthbearer.jwks.endpoint.retry.backoff.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)100L), ConfigDef.Importance.LOW, "The (optional) value in milliseconds for the initial wait between JWKS (JSON Web Key Set) retrieval attempts from the external authentication provider. JWKS retrieval uses an exponential backoff algorithm with an initial wait based on the sasl.oauthbearer.jwks.endpoint.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms setting.").define("sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)10000L), ConfigDef.Importance.LOW, "The (optional) value in milliseconds for the maximum wait between attempts to retrieve the JWKS (JSON Web Key Set) from the external authentication provider. 
JWKS retrieval uses an exponential backoff algorithm with an initial wait based on the sasl.oauthbearer.jwks.endpoint.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms setting.").define("sasl.oauthbearer.clock.skew.seconds", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)30), ConfigDef.Importance.LOW, "The (optional) value in seconds to allow for differences between the time of the OAuth/OIDC identity provider and the broker.").define("sasl.oauthbearer.expected.audience", ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, "The (optional) comma-delimited setting for the broker to use to verify that the JWT was issued for one of the expected audiences. The JWT will be inspected for the standard OAuth \"aud\" claim and if this value is set, the broker will match the value from JWT's \"aud\" claim  to see if there is an exact match. If there is no match, the broker will reject the JWT and authentication will fail.").define("sasl.oauthbearer.expected.issuer", ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, "The (optional) setting for the broker to use to verify that the JWT was created by the expected issuer. The JWT will be inspected for the standard OAuth \"iss\" claim and if this value is set, the broker will match it exactly against what is in the JWT's \"iss\" claim. If there is no match, the broker will reject the JWT and authentication will fail.").define("delegation.token.master.key", ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, "DEPRECATED: An alias for delegation.token.secret.key, which should be used instead of this config.").define("delegation.token.secret.key", ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, "Secret key to generate and verify delegation tokens. The same key must be configured across all the brokers.  If using Kafka with KRaft, the key must also be set across all controllers.  
If the key is not set or set to empty string, brokers will disable the delegation token support.").define("delegation.token.max.lifetime.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)604800000L), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The token has a maximum lifetime beyond which it cannot be renewed anymore. Default value 7 days.").define("delegation.token.expiry.time.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)86400000L), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.MEDIUM, "The token validity time in milliseconds before the token needs to be renewed. Default value 1 day.").define("delegation.token.expiry.check.interval.ms", ConfigDef.Type.LONG, (Object)BoxesRunTime.boxToLong((long)3600000L), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1)), ConfigDef.Importance.LOW, "Scan interval to remove expired delegation tokens.").define("password.encoder.secret", ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, "The secret used for encoding dynamically configured passwords for this broker.").define("password.encoder.old.secret", ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, "The old secret that was used for encoding dynamically configured passwords. This is required only when the secret is updated. If specified, all dynamically encoded passwords are decoded using this old secret and re-encoded using password.encoder.secret when broker starts up.").define("password.encoder.keyfactory.algorithm", ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, "The SecretKeyFactory algorithm used for encoding dynamically configured passwords. 
Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise.").define("password.encoder.cipher.algorithm", ConfigDef.Type.STRING, (Object)"AES/CBC/PKCS5Padding", ConfigDef.Importance.LOW, "The Cipher algorithm used for encoding dynamically configured passwords.").define("password.encoder.key.length", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)128), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(8)), ConfigDef.Importance.LOW, "The key length used for encoding dynamically configured passwords.").define("password.encoder.iterations", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)4096), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)Predef$.MODULE$.int2Integer(1024)), ConfigDef.Importance.LOW, "The iteration count used for encoding dynamically configured passwords.").define("controller.quorum.voters", ConfigDef.Type.LIST, (Object)QuorumConfig.DEFAULT_QUORUM_VOTERS, (ConfigDef.Validator)new QuorumConfig.ControllerQuorumVotersValidator(), ConfigDef.Importance.HIGH, "Map of id/endpoint information for the set of voters in a comma-separated list of <code>{id}@{host}:{port}</code> entries. For example: <code>1@localhost:9092,2@localhost:9093,3@localhost:9094</code>").defineInternal("controller.quorum.bootstrap.servers", ConfigDef.Type.LIST, (Object)QuorumConfig.DEFAULT_QUORUM_BOOTSTRAP_SERVERS, (ConfigDef.Validator)new QuorumConfig.ControllerQuorumBootstrapServersValidator(), ConfigDef.Importance.HIGH, "List of endpoints to use for bootstrapping the cluster metadata. The endpoints are specified in comma-separated list of <code>{host}:{port}</code> entries. 
For example: <code>localhost:9092,localhost:9093,localhost:9094</code>.").define("controller.quorum.election.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1000), null, ConfigDef.Importance.HIGH, "Maximum time in milliseconds to wait without being able to fetch from the leader before triggering a new election").define("controller.quorum.fetch.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)2000), null, ConfigDef.Importance.HIGH, "Maximum time without a successful fetch from the current leader before becoming a candidate and triggering an election for voters; Maximum time a leader can go without receiving valid fetch or fetchSnapshot request from a majority of the quorum before resigning.").define("controller.quorum.election.backoff.max.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)1000), null, ConfigDef.Importance.HIGH, "Maximum time in milliseconds before starting new elections. This is used in the binary exponential backoff mechanism that helps prevent gridlocked elections").define("controller.quorum.append.linger.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)25), null, ConfigDef.Importance.MEDIUM, "The duration in milliseconds that the leader will wait for writes to accumulate before flushing them to disk.").define("controller.quorum.request.timeout.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)2000), null, ConfigDef.Importance.MEDIUM, "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.").define("controller.quorum.retry.backoff.ms", ConfigDef.Type.INT, (Object)BoxesRunTime.boxToInteger((int)20), null, ConfigDef.Importance.LOW, "The amount of time to wait before attempting to retry a failed request to a given topic partition. 
This avoids repeatedly sending requests in a tight loop under some failure scenarios. This value is the initial backoff value and will increase exponentially for each failed request, up to the <code>retry.backoff.max.ms</code> value.").defineInternal("unstable.api.versions.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)false), ConfigDef.Importance.HIGH).defineInternal("unstable.feature.versions.enable", ConfigDef.Type.BOOLEAN, (Object)BoxesRunTime.boxToBoolean((boolean)false), ConfigDef.Importance.HIGH);

    static {
        // Register every config key declared by RemoteLogManagerConfig's definition
        // on this object's own definition.
        for (ConfigDef.ConfigKey remoteLogKey : RemoteLogManagerConfig.configDef().configKeys().values()) {
            MODULE$.configDef().define(remoteLogKey);
        }
    }

    /**
     * Prints the HTML rendering of this object's config definition to standard output.
     *
     * <p>Each config name is mapped to {@code "brokerconfigs_" + name} by the function handed to
     * {@code toHtml}; the dynamic update modes come from {@code DynamicBrokerConfig$}.
     *
     * @param args command-line arguments; not read
     */
    public void main(String[] args) {
        String html = this.configDef().toHtml(4, config -> "brokerconfigs_" + config, DynamicBrokerConfig$.MODULE$.dynamicConfigUpdateModes());
        System.out.println(html);
    }

    /**
     * Reads a ZooKeeper client property, addressed by its Kafka config name.
     *
     * <p>The Kafka name is translated through {@code ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP}
     * and the translated name is looked up in {@code clientConfig}.
     *
     * @param clientConfig  the ZooKeeper client configuration to read from
     * @param kafkaPropName the Kafka-side config name
     * @return the property value wrapped in an {@code Option}; empty when the lookup yields {@code null}
     */
    public Option<String> zooKeeperClientProperty(ZKClientConfig clientConfig, String kafkaPropName) {
        String zkPropName = (String)ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaPropName);
        return Option$.MODULE$.apply((Object)clientConfig.getProperty(zkPropName));
    }

    /**
     * Writes a Kafka config value into a ZooKeeper client configuration.
     *
     * <p>The target property name is looked up in
     * {@code ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP}. The stored value depends on the
     * Kafka config name:
     * <ul>
     *   <li>{@code zookeeper.ssl.endpoint.identification.algorithm}: {@code "true"} when the
     *       upper-cased value equals {@code "HTTPS"}, otherwise {@code "false"};</li>
     *   <li>{@code zookeeper.ssl.enabled.protocols} and {@code zookeeper.ssl.cipher.suites}:
     *       a {@link List} value is joined with commas, any other value uses {@code toString()};</li>
     *   <li>every other name: {@code kafkaPropValue.toString()}.</li>
     * </ul>
     *
     * @param clientConfig   the ZooKeeper client configuration to update
     * @param kafkaPropName  the Kafka-side config name
     * @param kafkaPropValue the value to convert and store; dereferenced without a null check
     */
    public void setZooKeeperClientProperty(ZKClientConfig clientConfig, String kafkaPropName, Object kafkaPropValue) {
        // Resolve the target name first, before the value is touched.
        String zkPropName = (String)ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaPropName);
        boolean isListValuedName = "zookeeper.ssl.enabled.protocols".equals(kafkaPropName)
                || "zookeeper.ssl.cipher.suites".equals(kafkaPropName);
        String zkPropValue;
        if ("zookeeper.ssl.endpoint.identification.algorithm".equals(kafkaPropName)) {
            // Stored as a boolean string rather than as the algorithm name itself.
            zkPropValue = Boolean.toString("HTTPS".equals(kafkaPropValue.toString().toUpperCase()));
        } else if (isListValuedName && kafkaPropValue instanceof List) {
            List values = (List)kafkaPropValue;
            zkPropValue = CollectionConverters$.MODULE$.ListHasAsScala(values).asScala().mkString(",");
        } else {
            zkPropValue = kafkaPropValue.toString();
        }
        clientConfig.setProperty(zkPropName, zkPropValue);
    }

    /**
     * Reports whether the given ZooKeeper client configuration carries all three settings
     * checked here: {@code zookeeper.ssl.client.enable} equal to {@code "true"}, and both
     * {@code zookeeper.clientCnxnSocket} and {@code zookeeper.ssl.keystore.location} present.
     *
     * @param zkClientConfig the ZooKeeper client configuration to inspect
     * @return {@code true} only when all three conditions hold
     */
    public boolean zkTlsClientAuthEnabled(ZKClientConfig zkClientConfig) {
        if (!this.zooKeeperClientProperty(zkClientConfig, "zookeeper.ssl.client.enable").contains((Object)"true")) {
            return false;
        }
        if (!this.zooKeeperClientProperty(zkClientConfig, "zookeeper.clientCnxnSocket").isDefined()) {
            return false;
        }
        return this.zooKeeperClientProperty(zkClientConfig, "zookeeper.ssl.keystore.location").isDefined();
    }

    /**
     * Returns the {@link ConfigDef} stored in this class's {@code configDef} field.
     *
     * @return the configuration definition object
     */
    public ConfigDef configDef() {
        return configDef;
    }

    /**
     * Returns the names known to the config definition, sorted with {@code Ordering.String}.
     *
     * @return the sorted config names as a Scala sequence
     */
    public Seq<String> configNames() {
        java.util.Set<String> definedNames = this.configDef().names();
        return (Seq)CollectionConverters$.MODULE$.SetHasAsScala(definedNames).asScala().toBuffer().sorted((Ordering)Ordering.String$.MODULE$);
    }

    /**
     * Returns the default values reported by the config definition, exposed through the Scala
     * collection converter.
     *
     * @return a Scala map view from config name to default value
     */
    public Map<String, ?> defaultValues() {
        java.util.Map<String, Object> javaDefaults = this.configDef().defaultValues();
        return CollectionConverters$.MODULE$.MapHasAsScala(javaDefaults).asScala();
    }

    /**
     * Returns the config keys held by the config definition, exposed through the Scala
     * collection converter.
     *
     * @return a Scala map view from config name to its {@code ConfigDef.ConfigKey}
     */
    public Map<String, ConfigDef.ConfigKey> configKeys() {
        java.util.Map<String, ConfigDef.ConfigKey> javaKeys = this.configDef().configKeys();
        return CollectionConverters$.MODULE$.MapHasAsScala(javaKeys).asScala();
    }

    /**
     * Builds a {@link KafkaConfig} from the given properties, passing {@code true} as the
     * constructor's second ({@code doLog}) argument.
     *
     * @param props the properties to build the config from
     * @return the newly constructed config
     */
    public KafkaConfig fromProps(Properties props) {
        return new KafkaConfig(props, true);
    }

    /**
     * Builds a {@link KafkaConfig} from the given properties.
     *
     * @param props the properties to build the config from
     * @param doLog forwarded unchanged to the {@code KafkaConfig} constructor
     *              (NOTE(review): presumably controls logging of the parsed config; confirm)
     * @return the newly constructed config
     */
    public KafkaConfig fromProps(Properties props, boolean doLog) {
        return new KafkaConfig(props, doLog);
    }

    /**
     * Builds a {@link KafkaConfig} from defaults plus overrides by delegating to the
     * three-argument overload with {@code doLog} set to {@code true}.
     *
     * @param defaults  the base properties
     * @param overrides the properties applied after {@code defaults}
     * @return the newly constructed config
     */
    public KafkaConfig fromProps(Properties defaults, Properties overrides) {
        return this.fromProps(defaults, overrides, true);
    }

    /**
     * Builds a {@link KafkaConfig} from a fresh {@link Properties} object into which
     * {@code defaults} and then {@code overrides} are added via {@code PropertiesOps.++=}.
     * Neither input object is passed to the constructor directly.
     *
     * @param defaults  the properties added first
     * @param overrides the properties added second
     *                  (NOTE(review): presumably these win on key collisions; depends on
     *                  {@code PropertiesOps.++=} semantics, confirm)
     * @param doLog     forwarded unchanged to the {@code KafkaConfig} constructor
     * @return the newly constructed config
     */
    public KafkaConfig fromProps(Properties defaults, Properties overrides, boolean doLog) {
        Properties merged = new Properties();
        // Order matters: defaults are added before overrides.
        for (Properties source : new Properties[]{defaults, overrides}) {
            new Implicits.PropertiesOps(merged).$plus$plus$eq(source);
        }
        return new KafkaConfig(merged, doLog);
    }

    /**
     * Builds a {@link KafkaConfig} from an arbitrary map of properties.
     *
     * @param props the map to build the config from
     * @param doLog forwarded unchanged to the {@code KafkaConfig} constructor
     * @return the newly constructed config
     */
    public KafkaConfig apply(java.util.Map<?, ?> props, boolean doLog) {
        return new KafkaConfig(props, doLog);
    }

    /**
     * Always returns {@code true}.
     *
     * <p>NOTE(review): the name matches the pattern the Scala compiler uses for default-argument
     * accessors, so this is presumably the default for the second parameter ({@code doLog}) of
     * {@code apply}; confirm against the Scala source.
     *
     * @return {@code true}
     */
    public boolean apply$default$2() {
        return true;
    }

    /**
     * Looks up the declared type of a config by its exact name in this object's config definition.
     *
     * @param name the config name to look up
     * @return the key's {@code type} wrapped in an {@code Option}; empty when the definition has
     *         no key under that name
     */
    private Option<ConfigDef.Type> typeOf(String name) {
        ConfigDef.ConfigKey definedKey = (ConfigDef.ConfigKey)this.configDef().configKeys().get(name);
        return Option$.MODULE$.apply(definedKey).map((Function1 & Serializable)configKey -> configKey.type);
    }

    /**
     * Resolves the type of a config name.
     *
     * <p>Resolution order: an exact match via {@code configTypeExact}; then a direct lookup via
     * {@code typeOf}; finally the first type found among the names returned by
     * {@code DynamicBrokerConfig$.brokerConfigSynonyms(configName, true)}.
     *
     * @param configName the config name to resolve
     * @return the resolved type, or an empty {@code Option} when no step yields one
     * @throws MatchError if {@code typeOf} returns something that is neither {@code Some} nor {@code None}
     */
    public Option<ConfigDef.Type> configType(String configName) {
        Option<ConfigDef.Type> exactMatch = this.configTypeExact(configName);
        if (exactMatch.isDefined()) {
            return exactMatch;
        }
        Option<ConfigDef.Type> directMatch = this.typeOf(configName);
        if (directMatch instanceof Some) {
            ConfigDef.Type foundType = (ConfigDef.Type)((Some)directMatch).value();
            return new Some((Object)foundType);
        }
        if (!None$.MODULE$.equals(directMatch)) {
            throw new MatchError(directMatch);
        }
        // Nothing under the given name: fall back to the first synonym that has a type.
        return DynamicBrokerConfig$.MODULE$.brokerConfigSynonyms(configName, true).flatMap((Function1 & Serializable)name -> MODULE$.typeOf((String)name)).headOption();
    }

    /**
     * Resolves the type of a config using only its exact name.
     *
     * <p>This object's config definition is consulted first (through {@code typeOf}); if it has no
     * entry, the definition returned by {@code DynamicConfig$Broker$.brokerConfigDef()} is tried.
     *
     * @param exactName the config name to look up verbatim
     * @return the type from the first definition that has the name, or {@code None} if neither does
     */
    private Option<ConfigDef.Type> configTypeExact(String exactName) {
        ConfigDef.Type staticType = (ConfigDef.Type)this.typeOf(exactName).orNull((.less.colon.less)$less$colon$less$.MODULE$.refl());
        if (staticType == null) {
            ConfigDef.ConfigKey dynamicKey = (ConfigDef.ConfigKey)DynamicConfig$Broker$.MODULE$.brokerConfigDef().configKeys().get(exactName);
            if (dynamicKey == null) {
                return None$.MODULE$;
            }
            return new Some((Object)dynamicKey.type);
        }
        return new Some((Object)staticType);
    }

    /**
     * Decides whether a config of the given (optional) type should be treated as sensitive.
     *
     * @param configType the config's type, if known
     * @return {@code true} when the type is unknown (empty) or is {@code ConfigDef.Type.PASSWORD}
     */
    public boolean maybeSensitive(Option<ConfigDef.Type> configType) {
        if (configType.isEmpty()) {
            // Unknown type: treated as sensitive.
            return true;
        }
        return configType.contains((Object)ConfigDef.Type.PASSWORD);
    }

    /**
     * Returns the form of a config value that may be written to logs.
     *
     * <p>Whether the value is hidden depends on the resource type:
     * <ul>
     *   <li>{@code BROKER}: decided by {@code maybeSensitive(configType(name))};</li>
     *   <li>{@code TOPIC}: decided by {@code maybeSensitive} on {@code LogConfig.configType(name)};</li>
     *   <li>{@code BROKER_LOGGER} and {@code CLIENT_METRICS}: never hidden;</li>
     *   <li>anything else, including {@code null}: always hidden.</li>
     * </ul>
     *
     * @param resourceType the kind of resource the config belongs to
     * @param name         the config name
     * @param value        the config value
     * @return {@code "[hidden]"} when the value is considered sensitive, otherwise {@code value}
     */
    public String loggableValue(ConfigResource.Type resourceType, String name, String value) {
        boolean sensitive;
        if (ConfigResource.Type.BROKER.equals(resourceType)) {
            sensitive = this.maybeSensitive(this.configType(name));
        } else if (ConfigResource.Type.TOPIC.equals(resourceType)) {
            sensitive = this.maybeSensitive((Option<ConfigDef.Type>)OptionConverters.RichOptionalGeneric$.MODULE$.asScala$extension(OptionConverters$.MODULE$.RichOptionalGeneric(LogConfig.configType((String)name))));
        } else if (ConfigResource.Type.BROKER_LOGGER.equals(resourceType) || ConfigResource.Type.CLIENT_METRICS.equals(resourceType)) {
            sensitive = false;
        } else {
            sensitive = true;
        }
        return sensitive ? "[hidden]" : value;
    }

    /**
     * Returns a copy of {@code input} in which {@code broker.id} and {@code node.id} mirror each
     * other when exactly one of them has a non-null value.
     *
     * <p>If both are set, or neither is, the copy is returned without changes. The input map is
     * never modified.
     *
     * @param input the original properties; must not be {@code null}
     * @return a new {@link HashMap} holding the input entries plus any filled-in synonym
     */
    public java.util.Map<Object, Object> populateSynonyms(java.util.Map<?, ?> input) {
        HashMap<Object, Object> merged = new HashMap<Object, Object>(input);
        boolean hasBrokerId = merged.get("broker.id") != null;
        boolean hasNodeId = merged.get("node.id") != null;
        if (hasBrokerId == hasNodeId) {
            // Both present or both absent: nothing to fill in.
            return merged;
        }
        if (hasNodeId) {
            merged.put("broker.id", merged.get("node.id"));
        } else {
            merged.put("node.id", merged.get("broker.id"));
        }
        return merged;
    }

    /** Private no-op constructor; instances cannot be created from outside this class. */
    private KafkaConfig$() {
    }
}

