/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.virtualhost.plugins;

import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.plugins.ConfiguredQueueBindingListener;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostHouseKeepingPlugin;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages;
import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;

/**
 * House-keeping plugin that, on every {@link #execute()} run, inspects the queues
 * collected by a {@link ConfiguredQueueBindingListener} and applies the configured
 * {@link SlowConsumerPolicyPlugin} to each queue whose thresholds have been reached.
 *
 * <p>An instance must be configured via {@link #configure(ConfigurationPlugin)} before
 * {@link #execute()}, {@link #getDelay()} or {@link #getTimeUnit()} are used;
 * {@link SlowConsumerFactory#newInstance(VirtualHost)} does this.
 */
public class SlowConsumerDetection
extends VirtualHostHouseKeepingPlugin {
    private SlowConsumerDetectionConfiguration _config;
    private ConfiguredQueueBindingListener _listener;

    /**
     * Creates an unconfigured plugin for the given virtual host.
     *
     * @param vhost the virtual host this plugin is attached to
     */
    public SlowConsumerDetection(VirtualHost vhost) {
        super(vhost);
    }

    /**
     * Stores the plugin configuration and registers a fresh binding listener on
     * every exchange currently present in the virtual host's exchange registry.
     *
     * <p>Only exchanges known to the registry at the time of this call are covered
     * by this method.
     *
     * @param config the plugin configuration; must be a
     *               {@link SlowConsumerDetectionConfiguration}
     * @throws ClassCastException if {@code config} is of a different type
     */
    public void configure(ConfigurationPlugin config) {
        this._config = (SlowConsumerDetectionConfiguration)config;
        this._listener = new ConfiguredQueueBindingListener(this.getVirtualHost().getName());
        ExchangeRegistry exchangeRegistry = this.getVirtualHost().getExchangeRegistry();
        for (AMQShortString exchangeName : exchangeRegistry.getExchangeNames()) {
            exchangeRegistry.getExchange(exchangeName).addBindingListener(this._listener);
        }
    }

    /**
     * Runs one detection pass over the listener's queue cache.
     *
     * <p>For each queue the per-queue configuration is looked up; when a threshold
     * has been reached the queue's policy is performed. A failure on one queue is
     * logged and does not stop the remaining queues from being checked.
     */
    public void execute() {
        CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING());
        Set<AMQQueue> cache = this._listener.getQueueCache();
        for (AMQQueue q : cache) {
            CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName()));
            try {
                SlowConsumerDetectionQueueConfiguration config = (SlowConsumerDetectionQueueConfiguration)q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
                if (!this.checkQueueStatus(q, config)) {
                    continue;
                }
                SlowConsumerPolicyPlugin policy = config.getPolicy();
                if (policy == null) {
                    this.getLogger().warn((Object)("No slow consumer policy for queue " + q.getName()));
                    continue;
                }
                policy.performPolicy(q);
            }
            catch (Exception e) {
                // Deliberately broad: one misbehaving queue must not abort the whole pass.
                this.getLogger().error((Object)("Exception in SlowConsumersDetection for queue: " + q.getName()), (Throwable)e);
            }
        }
        CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE());
    }

    /**
     * @return the delay taken from the plugin configuration
     */
    public long getDelay() {
        return this._config.getDelay();
    }

    /**
     * @return the time unit taken from the plugin configuration
     */
    public TimeUnit getTimeUnit() {
        return this._config.getTimeUnit();
    }

    /**
     * Decides whether the queue has reached any of its configured thresholds.
     *
     * <p>The checks are evaluated in order (message count, queue depth, message age)
     * and stop at the first one that is reached. A threshold of {@code 0} disables
     * the corresponding check.
     *
     * @param q      the queue to inspect
     * @param config the queue's slow-consumer configuration, or {@code null} if none
     * @return {@code true} if a threshold has been reached; {@code false} otherwise,
     *         including when {@code config} is {@code null}
     */
    private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config) {
        if (config == null) {
            return false;
        }
        if (this.getLogger().isInfoEnabled()) {
            this.getLogger().info((Object)("Retrieved Queue(" + q.getName() + ") Config:" + config));
        }
        // Sample the count once so every check (and the debug output) sees the same value.
        int count = q.getMessageCount();
        boolean slow = this.exceedsMessageCount(config, count)
                || this.exceedsDepth(q, config)
                || this.exceedsMessageAge(q, config, count);
        if (!slow) {
            return false;
        }
        if (this.getLogger().isDebugEnabled()) {
            this.getLogger().debug((Object)("Detected Slow Consumer on Queue(" + q.getName() + ")"));
            this.getLogger().debug((Object)("Queue Count:" + count + ":" + config.getMessageCount()));
            this.getLogger().debug((Object)("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth()));
            this.getLogger().debug((Object)("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge()));
        }
        return true;
    }

    /** Returns whether the message-count threshold is enabled and has been reached. */
    private boolean exceedsMessageCount(SlowConsumerDetectionQueueConfiguration config, int count) {
        long limit = config.getMessageCount();
        return limit != 0L && (long)count >= limit;
    }

    /** Returns whether the queue-depth threshold is enabled and has been reached. */
    private boolean exceedsDepth(AMQQueue q, SlowConsumerDetectionQueueConfiguration config) {
        long limit = config.getDepth();
        return limit != 0L && q.getQueueDepth() >= limit;
    }

    /**
     * Returns whether the message-age threshold is enabled, the queue was non-empty
     * when sampled, and the oldest message has been queued for at least the
     * configured age.
     */
    private boolean exceedsMessageAge(AMQQueue q, SlowConsumerDetectionQueueConfiguration config, int count) {
        long maxAge = config.getMessageAge();
        if (maxAge == 0L || count <= 0) {
            return false;
        }
        // Compare elapsed age, not the raw arrival value, against the age threshold.
        // NOTE(review): assumes getOldestMessageArrivalTime() is a System.currentTimeMillis()-style
        // timestamp and getMessageAge() is a duration in milliseconds -- confirm against AMQQueue
        // and the queue configuration.
        long oldestAge = System.currentTimeMillis() - q.getOldestMessageArrivalTime();
        return oldestAge >= maxAge;
    }

    /**
     * Factory that builds a configured {@link SlowConsumerDetection} for a virtual host.
     */
    public static class SlowConsumerFactory
    implements VirtualHostPluginFactory {
        /**
         * Creates and configures the plugin for the given virtual host.
         *
         * @param vhost the virtual host to create the plugin for
         * @return the configured plugin, or {@code null} when the virtual host has no
         *         {@link SlowConsumerDetectionConfiguration}
         */
        public SlowConsumerDetection newInstance(VirtualHost vhost) {
            SlowConsumerDetectionConfiguration config = (SlowConsumerDetectionConfiguration)vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName());
            if (config == null) {
                return null;
            }
            SlowConsumerDetection plugin = new SlowConsumerDetection(vhost);
            plugin.configure(config);
            return plugin;
        }
    }
}

