/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.security.token;

import java.time.Clock;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.security.token.DelegationTokenProvider;
import org.apache.flink.core.security.token.DelegationTokenReceiver;
import org.apache.flink.runtime.security.token.DelegationTokenContainer;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class DefaultDelegationTokenManager
implements DelegationTokenManager {
    private static final String PROVIDER_RECEIVER_INCONSISTENCY_ERROR = "There is an inconsistency between loaded delegation token providers and receivers. One must implement a DelegationTokenProvider and a DelegationTokenReceiver with the same service name and add them together to the classpath to make the system consistent. The mentioned classes are loaded with Java's service loader so the appropriate META-INF registration also needs to be created.";
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDelegationTokenManager.class);
    private final Configuration configuration;
    @Nullable
    private final PluginManager pluginManager;
    private final double tokensRenewalTimeRatio;
    private final long renewalRetryBackoffPeriod;
    @VisibleForTesting
    final Map<String, DelegationTokenProvider> delegationTokenProviders;
    private final DelegationTokenReceiverRepository delegationTokenReceiverRepository;
    @Nullable
    private final ScheduledExecutor scheduledExecutor;
    @Nullable
    private final ExecutorService ioExecutor;
    private final Object tokensUpdateFutureLock = new Object();
    @Nullable
    @GuardedBy(value="tokensUpdateFutureLock")
    private ScheduledFuture<?> tokensUpdateFuture;
    @Nullable
    private DelegationTokenManager.Listener listener;

    public DefaultDelegationTokenManager(Configuration configuration, @Nullable PluginManager pluginManager, @Nullable ScheduledExecutor scheduledExecutor, @Nullable ExecutorService ioExecutor) {
        this.configuration = (Configuration)Preconditions.checkNotNull((Object)configuration, (String)"Flink configuration must not be null");
        this.pluginManager = pluginManager;
        this.tokensRenewalTimeRatio = (Double)configuration.get(SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO);
        this.renewalRetryBackoffPeriod = ((Duration)configuration.get(SecurityOptions.DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF)).toMillis();
        this.delegationTokenProviders = this.loadProviders();
        this.delegationTokenReceiverRepository = new DelegationTokenReceiverRepository(configuration, pluginManager);
        this.scheduledExecutor = scheduledExecutor;
        this.ioExecutor = ioExecutor;
        DefaultDelegationTokenManager.checkProviderAndReceiverConsistency(this.delegationTokenProviders, this.delegationTokenReceiverRepository.delegationTokenReceivers);
    }

    private Map<String, DelegationTokenProvider> loadProviders() {
        LOG.info("Loading delegation token providers");
        HashMap<String, DelegationTokenProvider> providers = new HashMap<String, DelegationTokenProvider>();
        Consumer<DelegationTokenProvider> loadProvider = provider -> {
            try {
                if (DefaultDelegationTokenManager.isProviderEnabled(this.configuration, provider.serviceName())) {
                    provider.init(this.configuration);
                    LOG.info("Delegation token provider {} loaded and initialized", (Object)provider.serviceName());
                    Preconditions.checkState((!providers.containsKey(provider.serviceName()) ? 1 : 0) != 0, (String)"Delegation token provider with service name {} has multiple implementations", (Object[])new Object[]{provider.serviceName()});
                    providers.put(provider.serviceName(), (DelegationTokenProvider)provider);
                } else {
                    LOG.info("Delegation token provider {} is disabled so not loaded", (Object)provider.serviceName());
                }
            }
            catch (Exception | NoClassDefFoundError e) {
                LOG.error("Failed to initialize delegation token provider {}", (Object)provider.serviceName(), (Object)e);
                throw new FlinkRuntimeException(e);
            }
        };
        ServiceLoader.load(DelegationTokenProvider.class).iterator().forEachRemaining(loadProvider);
        if (this.pluginManager != null) {
            this.pluginManager.load(DelegationTokenProvider.class).forEachRemaining(loadProvider);
        }
        LOG.info("Delegation token providers loaded successfully");
        return providers;
    }

    static boolean isProviderEnabled(Configuration configuration, String serviceName) {
        return SecurityOptions.forProvider((Configuration)configuration, (String)serviceName).getBoolean(SecurityOptions.DELEGATION_TOKEN_PROVIDER_ENABLED);
    }

    @VisibleForTesting
    boolean isProviderLoaded(String serviceName) {
        return this.delegationTokenProviders.containsKey(serviceName);
    }

    @VisibleForTesting
    boolean isReceiverLoaded(String serviceName) {
        return this.delegationTokenReceiverRepository.isReceiverLoaded(serviceName);
    }

    @VisibleForTesting
    static void checkProviderAndReceiverConsistency(Map<String, DelegationTokenProvider> providers, Map<String, DelegationTokenReceiver> receivers) {
        LOG.info("Checking provider and receiver instances consistency");
        if (providers.size() != receivers.size()) {
            HashSet<String> missingReceiverServiceNames = new HashSet<String>(providers.keySet());
            missingReceiverServiceNames.removeAll(receivers.keySet());
            if (!missingReceiverServiceNames.isEmpty()) {
                throw new IllegalStateException("There is an inconsistency between loaded delegation token providers and receivers. One must implement a DelegationTokenProvider and a DelegationTokenReceiver with the same service name and add them together to the classpath to make the system consistent. The mentioned classes are loaded with Java's service loader so the appropriate META-INF registration also needs to be created. Missing receivers: " + String.join((CharSequence)",", missingReceiverServiceNames));
            }
            HashSet<String> missingProviderServiceNames = new HashSet<String>(receivers.keySet());
            missingProviderServiceNames.removeAll(providers.keySet());
            if (!missingProviderServiceNames.isEmpty()) {
                throw new IllegalStateException("There is an inconsistency between loaded delegation token providers and receivers. One must implement a DelegationTokenProvider and a DelegationTokenReceiver with the same service name and add them together to the classpath to make the system consistent. The mentioned classes are loaded with Java's service loader so the appropriate META-INF registration also needs to be created. Missing providers: " + String.join((CharSequence)",", missingProviderServiceNames));
            }
        }
        LOG.info("Provider and receiver instances are consistent");
    }

    @Override
    public void obtainDelegationTokens(DelegationTokenContainer container) throws Exception {
        LOG.info("Obtaining delegation tokens");
        this.obtainDelegationTokensAndGetNextRenewal(container);
        LOG.info("Delegation tokens obtained successfully");
    }

    protected Optional<Long> obtainDelegationTokensAndGetNextRenewal(DelegationTokenContainer container) {
        return this.delegationTokenProviders.values().stream().map(p -> {
            Optional nr = Optional.empty();
            try {
                if (p.delegationTokensRequired()) {
                    LOG.debug("Obtaining delegation token for service {}", (Object)p.serviceName());
                    DelegationTokenProvider.ObtainedDelegationTokens t = p.obtainDelegationTokens();
                    Preconditions.checkNotNull((Object)t, (String)"Obtained delegation tokens must not be null");
                    container.addToken(p.serviceName(), t.getTokens());
                    nr = t.getValidUntil();
                    LOG.debug("Obtained delegation token for service {} successfully", (Object)p.serviceName());
                } else {
                    LOG.debug("Service {} does not need to obtain delegation token", (Object)p.serviceName());
                }
            }
            catch (Exception e) {
                LOG.error("Failed to obtain delegation token for provider {}", (Object)p.serviceName(), (Object)e);
                throw new FlinkRuntimeException((Throwable)e);
            }
            return nr;
        }).flatMap(nr -> nr.map(Stream::of).orElseGet(Stream::empty)).min(Long::compare);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start(DelegationTokenManager.Listener listener) throws Exception {
        Preconditions.checkNotNull((Object)this.scheduledExecutor, (String)"Scheduled executor must not be null");
        Preconditions.checkNotNull((Object)this.ioExecutor, (String)"IO executor must not be null");
        this.listener = (DelegationTokenManager.Listener)Preconditions.checkNotNull((Object)listener, (String)"Listener must not be null");
        Object object = this.tokensUpdateFutureLock;
        synchronized (object) {
            Preconditions.checkState((this.tokensUpdateFuture == null ? 1 : 0) != 0, (Object)"Manager is already started");
        }
        this.startTokensUpdate();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void startTokensUpdate() {
        block12: {
            try {
                LOG.info("Starting tokens update task");
                DelegationTokenContainer container = new DelegationTokenContainer();
                Optional<Long> nextRenewal = this.obtainDelegationTokensAndGetNextRenewal(container);
                if (container.hasTokens()) {
                    this.delegationTokenReceiverRepository.onNewTokensObtained(container);
                    LOG.info("Notifying listener about new tokens");
                    Preconditions.checkNotNull((Object)this.listener, (String)"Listener must not be null");
                    this.listener.onNewTokensObtained(InstantiationUtil.serializeObject((Object)container));
                    LOG.info("Listener notified successfully");
                } else {
                    LOG.warn("No tokens obtained so skipping listener notification");
                }
                if (nextRenewal.isPresent()) {
                    long renewalDelay = this.calculateRenewalDelay(Clock.systemDefaultZone(), nextRenewal.get());
                    Object object = this.tokensUpdateFutureLock;
                    synchronized (object) {
                        this.tokensUpdateFuture = this.scheduledExecutor.schedule(() -> this.ioExecutor.execute(this::startTokensUpdate), renewalDelay, TimeUnit.MILLISECONDS);
                    }
                    LOG.info("Tokens update task started with {} ms delay", (Object)renewalDelay);
                    break block12;
                }
                LOG.warn("Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date");
            }
            catch (InterruptedException e) {
                LOG.debug("Interrupted", (Throwable)e);
            }
            catch (Exception e) {
                Object object = this.tokensUpdateFutureLock;
                synchronized (object) {
                    this.tokensUpdateFuture = this.scheduledExecutor.schedule(() -> this.ioExecutor.execute(this::startTokensUpdate), this.renewalRetryBackoffPeriod, TimeUnit.MILLISECONDS);
                }
                LOG.warn("Failed to update tokens, will try again in {} ms", (Object)this.renewalRetryBackoffPeriod, (Object)e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void stopTokensUpdate() {
        Object object = this.tokensUpdateFutureLock;
        synchronized (object) {
            if (this.tokensUpdateFuture != null) {
                this.tokensUpdateFuture.cancel(true);
                this.tokensUpdateFuture = null;
            }
        }
    }

    @VisibleForTesting
    long calculateRenewalDelay(Clock clock, long nextRenewal) {
        long now = clock.millis();
        long renewalDelay = Math.round(this.tokensRenewalTimeRatio * (double)(nextRenewal - now));
        LOG.debug("Calculated delay on renewal is {}, based on next renewal {} and the ratio {}, and current time {}", new Object[]{renewalDelay, nextRenewal, this.tokensRenewalTimeRatio, now});
        return renewalDelay;
    }

    @Override
    public void stop() {
        LOG.info("Stopping credential renewal");
        this.stopTokensUpdate();
        LOG.info("Stopped credential renewal");
    }
}

