/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.security.authorizer;

import io.confluent.security.authorizer.AccessRule;
import io.confluent.security.authorizer.Action;
import io.confluent.security.authorizer.AuthorizePolicy;
import io.confluent.security.authorizer.AuthorizeResult;
import io.confluent.security.authorizer.Authorizer;
import io.confluent.security.authorizer.ConfluentAuthorizerConfig;
import io.confluent.security.authorizer.Operation;
import io.confluent.security.authorizer.RequestContext;
import io.confluent.security.authorizer.ResourcePattern;
import io.confluent.security.authorizer.Scope;
import io.confluent.security.authorizer.provider.AccessRuleProvider;
import io.confluent.security.authorizer.provider.Auditable;
import io.confluent.security.authorizer.provider.AuthorizeRule;
import io.confluent.security.authorizer.provider.ConfluentAuthorizationEvent;
import io.confluent.security.authorizer.provider.GroupProvider;
import io.confluent.security.authorizer.provider.InvalidScopeException;
import io.confluent.security.authorizer.provider.MetadataProvider;
import io.confluent.security.authorizer.provider.Provider;
import io.confluent.security.authorizer.provider.ProviderFailedException;
import io.confluent.security.authorizer.utils.ThreadUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.SampledStat;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.audit.AuditEvent;
import org.apache.kafka.server.audit.AuditLogProvider;
import org.apache.kafka.server.audit.NoOpAuditLogProvider;
import org.apache.kafka.server.authorizer.internals.ConfluentAuthorizerServerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Authorizer that decides each {@link Action} by consulting, in order, the statically
 * configured super users, the groups returned by the optional {@link GroupProvider}, and the
 * rules returned by the configured {@link AccessRuleProvider}s.
 *
 * <p>Every decision is written to the authorizer log, forwarded to the
 * {@link AuditLogProvider} and counted in {@link AuthorizerMetrics}.
 */
public class EmbeddedAuthorizer
implements Authorizer {
    protected static final Logger log = LoggerFactory.getLogger("kafka.authorizer.logger");
    /** Providers instantiated by {@link #configureServerInfo}; these are the ones closed by {@link #close()}. */
    protected final Set<Provider> providersCreated = new HashSet<>();
    /** May be {@code null}; see {@link #configureServerInfo} and {@link #start}. */
    private GroupProvider groupProvider;
    private List<AccessRuleProvider> accessRuleProviders;
    /** Never {@code null} once {@link #configureProviders} has run (a no-op provider is substituted). */
    private AuditLogProvider auditLogProvider;
    protected ConfluentAuthorizerConfig authorizerConfig;
    /** May be {@code null}; see {@link #configureServerInfo} and {@link #start}. */
    private MetadataProvider metadataProvider;
    private boolean allowEveryoneIfNoAcl;
    private Set<KafkaPrincipal> superUsers = Collections.emptySet();
    protected Set<KafkaPrincipal> brokerUsers = Collections.emptySet();
    protected String interBrokerListener;
    private Duration initTimeout;
    /** Set to {@code true} once every provider's start future has completed. */
    private volatile boolean ready;
    private volatile String clusterId;
    private volatile Scope scope = Scope.ROOT_SCOPE;
    private AuthorizerMetrics authorizerMetrics;

    /**
     * Parses {@code configs} into a {@link ConfluentAuthorizerConfig} and caches the settings
     * that are consulted while authorizing.
     *
     * @param configs raw authorizer configuration
     */
    public void configure(Map<String, ?> configs) {
        this.authorizerConfig = new ConfluentAuthorizerConfig(configs);
        this.allowEveryoneIfNoAcl = this.authorizerConfig.allowEveryoneIfNoAcl;
        this.superUsers = this.authorizerConfig.superUsers;
        this.brokerUsers = this.authorizerConfig.brokerUsers;
    }

    /**
     * Derives the cluster scope and metrics from {@code serverInfo}, creates the providers
     * described by the authorizer config and wires them into this authorizer.
     *
     * @param serverInfo broker-supplied information (cluster id, endpoints, metrics, audit log provider)
     * @throws java.util.NoSuchElementException if the inter-broker endpoint has no listener name
     */
    public void configureServerInfo(ConfluentAuthorizerServerInfo serverInfo) {
        this.clusterId = serverInfo.clusterResource().clusterId();
        log.debug("Configuring scope for Kafka cluster with cluster id {}", this.clusterId);
        this.scope = Scope.kafkaClusterScope(this.clusterId);
        this.interBrokerListener = serverInfo.interBrokerEndpoint().listenerName().get();
        this.authorizerMetrics = new AuthorizerMetrics(serverInfo.metrics());

        ConfluentAuthorizerConfig.Providers providers = this.authorizerConfig.createProviders(this.clusterId);
        this.providersCreated.addAll(providers.accessRuleProviders);
        if (providers.groupProvider != null) {
            this.providersCreated.add(providers.groupProvider);
        }
        if (providers.metadataProvider != null) {
            this.providersCreated.add(providers.metadataProvider);
        }
        // Providers that can emit audit events are handed the broker's audit log provider.
        this.providersCreated.stream()
                .filter(provider -> provider instanceof Auditable)
                .forEach(provider -> ((Auditable) provider).auditLogProvider(serverInfo.auditLogProvider()));
        this.configureProviders(providers.accessRuleProviders, providers.groupProvider, providers.metadataProvider, serverInfo.auditLogProvider());
    }

    /**
     * Authorizes each action independently.
     *
     * @return one result per action, in the same order as {@code actions}
     */
    @Override
    public List<AuthorizeResult> authorize(RequestContext requestContext, List<Action> actions) {
        return actions.stream()
                .map(action -> this.authorize(requestContext, action))
                .collect(Collectors.toList());
    }

    /** Returns the configured group provider, or {@code null} if none was configured. */
    public GroupProvider groupProvider() {
        return this.groupProvider;
    }

    /**
     * Looks up an access rule provider by its {@link Provider#providerName() provider name}.
     *
     * @throws IllegalArgumentException if no configured provider has that name
     */
    public AccessRuleProvider accessRuleProvider(String providerName) {
        return this.accessRuleProviders.stream()
                .filter(p -> p.providerName().equals(providerName))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Access rule provider not found: " + providerName));
    }

    /** Returns the configured metadata provider, or {@code null} if none was configured. */
    public MetadataProvider metadataProvider() {
        return this.metadataProvider;
    }

    /** Returns the list of access rule providers passed to {@link #configureProviders}. */
    protected List<AccessRuleProvider> accessRuleProviders() {
        return this.accessRuleProviders;
    }

    /** Returns the audit log provider in use. */
    public AuditLogProvider auditLogProvider() {
        return this.auditLogProvider;
    }

    /**
     * Starts all configured providers, then marks the authorizer ready, runs {@code initTask}
     * asynchronously and finally starts the audit log provider.
     *
     * <p>If no provider reports that it uses metadata from this Kafka cluster, this method
     * blocks until start-up has completed (or failed/timed out) before returning.
     *
     * @param serverInfo                 broker-supplied information passed to each provider
     * @param interBrokerListenerConfigs configs passed to each provider and the audit log provider
     * @param initTask                   task run after all providers have started
     * @return a future that completes when start-up finishes, or completes exceptionally with a
     *         {@link TimeoutException} once the configured init timeout elapses
     */
    public CompletableFuture<Void> start(ConfluentAuthorizerServerInfo serverInfo, Map<String, ?> interBrokerListenerConfigs, Runnable initTask) {
        this.initTimeout = this.authorizerConfig.initTimeout;
        if (this.authorizerMetrics == null) {
            this.authorizerMetrics = new AuthorizerMetrics(serverInfo.metrics());
        }

        // A set is used so that a provider filling several roles is started only once.
        Set<Provider> allProviders = new HashSet<>();
        if (this.groupProvider != null) {
            allProviders.add(this.groupProvider);
        }
        allProviders.addAll(this.accessRuleProviders);
        if (this.metadataProvider != null) {
            allProviders.add(this.metadataProvider);
        }

        CompletableFuture<?>[] providerFutures = allProviders.stream()
                .map(provider -> provider.start(serverInfo, interBrokerListenerConfigs))
                .map(CompletionStage::toCompletableFuture)
                .toArray(CompletableFuture[]::new);
        CompletableFuture<Void> readyFuture = CompletableFuture.allOf(providerFutures)
                .thenAccept(unused -> {
                    this.ready = true;
                })
                .thenRunAsync(initTask)
                .thenAccept(unused -> this.auditLogProvider.start(interBrokerListenerConfigs));
        CompletableFuture<Void> future = this.futureOrTimeout(readyFuture, this.initTimeout);

        boolean usesMetadataFromThisKafkaCluster =
                (this.groupProvider != null && this.groupProvider.usesMetadataFromThisKafkaCluster())
                || (this.metadataProvider != null && this.metadataProvider.usesMetadataFromThisKafkaCluster())
                || this.accessRuleProviders.stream().anyMatch(Provider::usesMetadataFromThisKafkaCluster)
                || this.auditLogProvider.usesMetadataFromThisKafkaCluster();
        if (!usesMetadataFromThisKafkaCluster) {
            // Nothing depends on this cluster's own metadata, so wait here for start-up to finish.
            future.join();
        }
        return future;
    }

    /**
     * Installs the providers used for authorization. A {@code null} {@code auditLogProvider} is
     * replaced by {@link NoOpAuditLogProvider#INSTANCE}.
     */
    protected void configureProviders(List<AccessRuleProvider> accessRuleProviders, GroupProvider groupProvider, MetadataProvider metadataProvider, AuditLogProvider auditLogProvider) {
        this.accessRuleProviders = accessRuleProviders;
        this.groupProvider = groupProvider;
        this.metadataProvider = metadataProvider;
        this.auditLogProvider = auditLogProvider == null ? NoOpAuditLogProvider.INSTANCE : auditLogProvider;
    }

    /** Returns {@code true} once all providers started by {@link #start} have completed start-up. */
    protected boolean ready() {
        return this.ready;
    }

    /**
     * Returns whether {@code userOrGroupPrincipal} is one of the configured super users.
     * {@code sessionPrincipal} and {@code action} are not consulted by this implementation.
     */
    protected boolean isSuperUser(KafkaPrincipal sessionPrincipal, KafkaPrincipal userOrGroupPrincipal, Action action) {
        return this.superUsers.contains(userOrGroupPrincipal);
    }

    /**
     * Authorizes a single action, logs and audits the decision, and records metrics.
     *
     * @return {@code ALLOWED} or {@code DENIED} for a completed evaluation; {@code UNKNOWN_SCOPE},
     *         {@code AUTHORIZER_FAILED} or {@code UNKNOWN_ERROR} if evaluation threw
     */
    private AuthorizeResult authorize(RequestContext requestContext, Action action) {
        try {
            AuthorizePolicy authorizePolicy;
            KafkaPrincipal sessionPrincipal = requestContext.principal();
            String host = requestContext.clientAddress().getHostAddress();
            KafkaPrincipal userPrincipal = this.userPrincipal(sessionPrincipal);
            if (this.isSuperUser(sessionPrincipal, userPrincipal, action)) {
                log.debug("principal = {} is a super user, allowing operation without checking any providers.", userPrincipal);
                authorizePolicy = new AuthorizePolicy.SuperUser(AuthorizePolicy.PolicyType.SUPER_USER, userPrincipal);
            } else {
                // The group provider is optional elsewhere in this class; without one the
                // principal simply has no groups rather than failing with a NullPointerException.
                Set<KafkaPrincipal> groupPrincipals = this.groupProvider == null
                        ? Collections.<KafkaPrincipal>emptySet()
                        : this.groupProvider.groups(sessionPrincipal);
                Optional<KafkaPrincipal> superGroup = groupPrincipals.stream()
                        .filter(group -> this.isSuperUser(sessionPrincipal, group, action))
                        .findFirst();
                if (superGroup.isPresent()) {
                    log.debug("principal = {} belongs to super group {}, allowing operation without checking acls.", userPrincipal, superGroup.get());
                    authorizePolicy = new AuthorizePolicy.SuperUser(AuthorizePolicy.PolicyType.SUPER_GROUP, superGroup.get());
                } else {
                    authorizePolicy = this.authorize(sessionPrincipal, groupPrincipals, host, action);
                }
            }
            AuthorizeResult authorizeResult = authorizePolicy.policyType().accessGranted() ? AuthorizeResult.ALLOWED : AuthorizeResult.DENIED;
            try {
                this.logAuditMessage(this.scope, requestContext, action, authorizeResult, authorizePolicy);
            }
            catch (Exception e) {
                // A failure to audit must not change the authorization decision.
                log.error("Failed to log Audit message.\n  scope: {}\n  context: {}\n  principal: {}\n  action: {}\n  result: {}\n  policy: {}", this.scope, requestContext, requestContext.principal(), action, authorizeResult, authorizePolicy, e);
            }
            this.authorizerMetrics.recordAuthorizerMetrics(authorizeResult);
            return authorizeResult;
        }
        catch (InvalidScopeException e) {
            log.error("Authorizer failed with unknown scope: {}", action.scope(), e);
            return AuthorizeResult.UNKNOWN_SCOPE;
        }
        catch (ProviderFailedException e) {
            log.error("Authorization provider has failed", e);
            return AuthorizeResult.AUTHORIZER_FAILED;
        }
        catch (Throwable t) {
            log.error("Authorization failed with unexpected exception", t);
            return AuthorizeResult.UNKNOWN_ERROR;
        }
    }

    /**
     * Evaluates the access rule providers for a principal that is not a statically configured
     * super user.
     *
     * <p>Order of evaluation: provider-level super user, provider-level super group, rules from
     * providers that may deny, rules from the remaining providers, and finally the no-match
     * policy.
     */
    private AuthorizePolicy authorize(KafkaPrincipal sessionPrincipal, Set<KafkaPrincipal> groupPrincipals, String host, Action action) {
        Scope actionScope = action.scope();
        if (this.accessRuleProviders.stream().anyMatch(p -> p.isSuperUser(sessionPrincipal, actionScope))) {
            return new AuthorizePolicy.SuperUser(AuthorizePolicy.PolicyType.SUPER_USER, sessionPrincipal);
        }
        Optional<KafkaPrincipal> superGroup = groupPrincipals.stream()
                .filter(principal -> this.accessRuleProviders.stream().anyMatch(p -> p.isSuperUser(principal, actionScope)))
                .findAny();
        if (superGroup.isPresent()) {
            return new AuthorizePolicy.SuperUser(AuthorizePolicy.PolicyType.SUPER_GROUP, superGroup.get());
        }

        ResourcePattern resource = action.resourcePattern();
        Operation operation = action.operation();
        AuthorizeRule authorizeRule = new AuthorizeRule();

        // First pass: only providers that may produce deny rules.
        this.accessRuleProviders.stream()
                .filter(AccessRuleProvider::mayDeny)
                .forEach(p -> authorizeRule.add(p.findRule(sessionPrincipal, groupPrincipals, host, action)));
        Optional<AuthorizePolicy> authorizePolicy = this.authorizePolicy(operation, resource, host, authorizeRule);
        if (authorizePolicy.isPresent()) {
            return authorizePolicy.get();
        }

        // Second pass: the remaining providers, accumulated into the same rule holder.
        this.accessRuleProviders.stream()
                .filter(p -> !p.mayDeny())
                .forEach(p -> authorizeRule.add(p.findRule(sessionPrincipal, groupPrincipals, host, action)));
        authorizePolicy = this.authorizePolicy(operation, resource, host, authorizeRule);
        if (authorizePolicy.isPresent()) {
            return authorizePolicy.get();
        }
        return this.authorizePolicyWithNoMatchingRule(resource, authorizeRule);
    }

    /**
     * Closes every provider created by {@link #configureServerInfo}. Failures are not
     * propagated; the first one captured is logged.
     */
    @Override
    public void close() {
        log.debug("Closing embedded authorizer");
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        this.providersCreated.forEach(provider -> Utils.closeQuietly((AutoCloseable) provider, provider.providerName(), firstException));
        Throwable exception = firstException.getAndSet(null);
        if (exception != null) {
            log.error("Failed to close authorizer cleanly", exception);
        }
    }

    /** Returns the scope of this authorizer ({@link Scope#ROOT_SCOPE} until {@link #configureServerInfo} runs). */
    protected Scope scope() {
        return this.scope;
    }

    /** Replaces the authorizer metrics with sensors registered on {@code metrics}. */
    protected void setupAuthorizerMetrics(Metrics metrics) {
        this.authorizerMetrics = new AuthorizerMetrics(metrics);
    }

    /**
     * Picks the rule that decides the request: the deny rule if one was found, otherwise the
     * allow rule. Logs the chosen rule at debug level.
     *
     * @return the deciding rule, or empty if neither a deny nor an allow rule was found
     */
    private Optional<AuthorizePolicy> authorizePolicy(Operation op, ResourcePattern resource, String host, AuthorizeRule authorizeRule) {
        Optional<AccessRule> ruleOpt = authorizeRule.denyRule().isPresent() ? authorizeRule.denyRule() : authorizeRule.allowRule();
        ruleOpt.ifPresent(rule -> log.debug("operation = {} on resource = {} from host = {} is {} based on policy = {}", op, resource, host, rule.permissionType(), rule));
        // identity() only widens Optional<AccessRule> to Optional<AuthorizePolicy>.
        return ruleOpt.map(Function.identity());
    }

    /**
     * Returns the policy applied when no rule matched: governed by
     * {@code allowEveryoneIfNoAcl} when {@link AuthorizeRule#noResourceAcls()} is true,
     * otherwise {@link AuthorizePolicy#NO_MATCHING_RULE}.
     */
    private AuthorizePolicy authorizePolicyWithNoMatchingRule(ResourcePattern resource, AuthorizeRule authorizeRule) {
        if (authorizeRule.noResourceAcls()) {
            log.debug("No acl found for resource {}, authorized = {}", resource, this.allowEveryoneIfNoAcl);
            return this.allowEveryoneIfNoAcl ? AuthorizePolicy.ALLOW_ON_NO_RULE : AuthorizePolicy.DENY_ON_NO_RULE;
        }
        return AuthorizePolicy.NO_MATCHING_RULE;
    }

    /**
     * Returns {@code sessionPrincipal} itself when it is exactly a {@link KafkaPrincipal};
     * for any subclass, returns a plain {@link KafkaPrincipal} with the same type and name.
     */
    private KafkaPrincipal userPrincipal(KafkaPrincipal sessionPrincipal) {
        if (sessionPrincipal.getClass() == KafkaPrincipal.class) {
            return sessionPrincipal;
        }
        return new KafkaPrincipal(sessionPrincipal.getPrincipalType(), sessionPrincipal.getName());
    }

    /**
     * Builds the authorization event, writes it to the authorizer log and forwards it to the
     * audit log provider.
     */
    protected void logAuditMessage(Scope sourceScope, RequestContext requestContext, Action action, AuthorizeResult authorizeResult, AuthorizePolicy authorizePolicy) {
        ConfluentAuthorizationEvent auditEvent = new ConfluentAuthorizationEvent(sourceScope, requestContext, action, authorizeResult, authorizePolicy);
        this.logAuthorization(auditEvent);
        this.auditLogProvider.logEvent((AuditEvent) auditEvent);
    }

    /**
     * Writes the decision to the authorizer log. Allowed requests are logged at debug when the
     * action asks for it, denied requests at info when the action asks for it; everything else
     * is logged at trace.
     */
    private void logAuthorization(ConfluentAuthorizationEvent authZEvent) {
        String logMessage = "Principal = {} is {} Operation = {} from host = {} on resource = {}";
        KafkaPrincipal principal = authZEvent.requestContext().principal();
        String host = authZEvent.requestContext().clientAddress().getHostAddress();
        String operation = authZEvent.action().operation().name();
        String resource = SecurityUtils.toPascalCase(authZEvent.action().resourceType().name()) + ":" + authZEvent.action().resourcePattern().patternType() + ":" + authZEvent.action().resourceName();
        if (authZEvent.authorizeResult() == AuthorizeResult.ALLOWED) {
            if (authZEvent.action().logIfAllowed()) {
                log.debug(logMessage, principal, "Allowed", operation, host, resource);
            } else {
                log.trace(logMessage, principal, "Allowed", operation, host, resource);
            }
        } else if (authZEvent.action().logIfDenied()) {
            log.info(logMessage, principal, "Denied", operation, host, resource);
        } else {
            log.trace(logMessage, principal, "Denied", operation, host, resource);
        }
    }

    /**
     * Returns a future that completes when {@code readyFuture} completes, or completes
     * exceptionally with a {@link TimeoutException} if {@code timeout} elapses first.
     *
     * @param readyFuture future to wait for; returned as-is if it is already done
     * @param timeout     maximum time to wait, also reported in the timeout message
     */
    CompletableFuture<Void> futureOrTimeout(CompletableFuture<Void> readyFuture, Duration timeout) {
        if (readyFuture.isDone()) {
            return readyFuture;
        }
        CompletableFuture<Void> timeoutFuture = new CompletableFuture<>();
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("authorizer-%d", true));
        executor.schedule(() -> {
            // Report the timeout that was actually scheduled. Reading this.initTimeout here could
            // throw inside the executor if the field is unset, leaving the timeout future
            // forever incomplete.
            String errorMessage = String.format("Authorizer did not start up within the timeout %s=%d ms. This may be due to one or more brokers being down for longer than this interval. Please start all brokers in the cluster within '%s' of each other to ensure that all partitions required for authorizer start up are available within this timeout.", "confluent.authorizer.init.timeout.ms", timeout.toMillis(), "confluent.authorizer.init.timeout.ms");
            timeoutFuture.completeExceptionally(new TimeoutException(errorMessage));
        }, timeout.toMillis(), TimeUnit.MILLISECONDS);
        CompletableFuture<Void> firstCompleted = CompletableFuture.anyOf(readyFuture, timeoutFuture).thenApply(unused -> null);
        // Whichever side wins, the timer thread is no longer needed.
        return firstCompleted.whenComplete((unused, e) -> executor.shutdownNow());
    }

    /** Returns the {@link Metrics} registry the authorizer sensors are registered on. */
    protected Metrics metrics() {
        return this.authorizerMetrics.metrics();
    }

    /** Replaces the authorizer metrics with sensors on a new {@link Metrics} registry built from {@code time}. */
    void setupAuthorizerMetrics(Time time) {
        this.authorizerMetrics = new AuthorizerMetrics(time);
    }

    /** Returns the {@link Time} the metrics were created with, or {@code null} if they were created from an existing registry. */
    protected Time metricsTime() {
        return this.authorizerMetrics.metricsTime();
    }

    /**
     * Sensors counting authorization requests and their allowed/denied outcomes, each exposed
     * as a per-minute rate in the {@value #GROUP_NAME} metric group.
     */
    class AuthorizerMetrics {
        public static final String GROUP_NAME = "confluent-authorizer-metrics";
        public static final String AUTHORIZATION_REQUEST_RATE_MINUTE = "authorization-request-rate-per-minute";
        public static final String AUTHORIZATION_ALLOWED_RATE_MINUTE = "authorization-allowed-rate-per-minute";
        public static final String AUTHORIZATION_DENIED_RATE_MINUTE = "authorization-denied-rate-per-minute";
        private static final String AUTHORIZER_AUTHORIZATION_ALLOWED_SENSOR = "authorizer-authorization-allowed";
        private static final String AUTHORIZER_AUTHORIZATION_DENIED_SENSOR = "authorizer-authorization-denied";
        private static final String AUTHORIZER_AUTHORIZATION_REQUEST_SENSOR = "authorizer-authorization-request";
        /** Only set by the {@link Time}-based constructor; {@code null} otherwise. */
        private Time time;
        private Metrics metrics;
        private Sensor authorizationAllowedSensor;
        private Sensor authorizationDeniedSensor;
        private Sensor authorizationRequestSensor;

        /** Registers the sensors on an existing registry. */
        AuthorizerMetrics(Metrics metrics) {
            this.metrics = metrics;
            this.setupMetrics();
        }

        /** Creates a new registry from {@code time} and registers the sensors on it. */
        AuthorizerMetrics(Time time) {
            this.time = time;
            this.metrics = new Metrics(time);
            this.setupMetrics();
        }

        /**
         * Records one request, plus one allowed outcome when {@code result} is
         * {@code ALLOWED} and one denied outcome for any other result.
         */
        void recordAuthorizerMetrics(AuthorizeResult result) {
            this.authorizationRequestSensor.record();
            if (result == AuthorizeResult.ALLOWED) {
                this.authorizationAllowedSensor.record();
            } else {
                this.authorizationDeniedSensor.record();
            }
        }

        Metrics metrics() {
            return this.metrics;
        }

        Time metricsTime() {
            return this.time;
        }

        /** Creates the three sensors and attaches a per-minute rate metric to each. */
        void setupMetrics() {
            this.authorizationAllowedSensor = this.metrics.sensor(AUTHORIZER_AUTHORIZATION_ALLOWED_SENSOR);
            this.authorizationAllowedSensor.add(this.metrics.metricName(AUTHORIZATION_ALLOWED_RATE_MINUTE, GROUP_NAME, "The number of authorization allowed per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()));
            this.authorizationDeniedSensor = this.metrics.sensor(AUTHORIZER_AUTHORIZATION_DENIED_SENSOR);
            this.authorizationDeniedSensor.add(this.metrics.metricName(AUTHORIZATION_DENIED_RATE_MINUTE, GROUP_NAME, "The number of authorization denied per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()));
            this.authorizationRequestSensor = this.metrics.sensor(AUTHORIZER_AUTHORIZATION_REQUEST_SENSOR);
            this.authorizationRequestSensor.add(this.metrics.metricName(AUTHORIZATION_REQUEST_RATE_MINUTE, GROUP_NAME, "The number of authorization request per minute"), new Rate(TimeUnit.MINUTES, new WindowedCount()));
        }
    }
}

