/*
 * Decompiled with CFR 0.152.
 */
package kafka.test.api;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.test.junit.ClusterTestExtensions;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.ClientQuotaEntity;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.junit.jupiter.api.extension.ExtendWith;

@ClusterTestDefaults(controllers=3, types={Type.KRAFT}, serverProperties={@ClusterConfigProperty(id=3000, key="client.quota.callback.class", value="kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"), @ClusterConfigProperty(id=3001, key="client.quota.callback.class", value="kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"), @ClusterConfigProperty(id=3002, key="client.quota.callback.class", value="kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback")})
@ExtendWith(value={ClusterTestExtensions.class})
public class CustomQuotaCallbackTest {
    private final ClusterInstance cluster;

    public CustomQuotaCallbackTest(ClusterInstance clusterInstance) {
        this.cluster = clusterInstance;
    }

    @ClusterTest
    public void testCustomQuotaCallbackWithControllerServer() throws InterruptedException {
        try (Admin admin = this.cluster.admin(Map.of());){
            admin.createTopics(List.of(new NewTopic("topic", 1, 1)));
            TestUtils.waitForCondition(() -> CustomQuotaCallback.COUNTERS.size() == 3 && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0), (String)"The CustomQuotaCallback not triggered in all controllers. ");
            CustomQuotaCallback.COUNTERS.clear();
            admin.deleteTopics(List.of("topic"));
            TestUtils.waitForCondition(() -> CustomQuotaCallback.COUNTERS.size() == 3 && CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0), (String)"The CustomQuotaCallback not triggered in all controllers. ");
        }
    }

    public static class CustomQuotaCallback
    implements ClientQuotaCallback {
        public static final Map<String, AtomicInteger> COUNTERS = new ConcurrentHashMap<String, AtomicInteger>();
        private String nodeId;

        public Map<String, String> quotaMetricTags(ClientQuotaType quotaType, KafkaPrincipal principal, String clientId) {
            return Map.of();
        }

        public Double quotaLimit(ClientQuotaType quotaType, Map<String, String> metricTags) {
            return Double.MAX_VALUE;
        }

        public void updateQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity, double newValue) {
        }

        public void removeQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity) {
        }

        public boolean quotaResetRequired(ClientQuotaType quotaType) {
            return true;
        }

        public boolean updateClusterMetadata(Cluster cluster) {
            COUNTERS.computeIfAbsent(this.nodeId, k -> new AtomicInteger()).incrementAndGet();
            return true;
        }

        public void close() {
        }

        public void configure(Map<String, ?> configs) {
            this.nodeId = (String)configs.get("node.id");
        }
    }
}

