/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.region.cursors;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.AbstractStoreStatTestSupport;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractPendingMessageCursorTest
extends AbstractStoreStatTestSupport {
    /** Logger for this test base class; accessible to subclasses. */
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractPendingMessageCursorTest.class);
    /** Broker under test; assigned in {@link #setUpBroker(boolean)} and stopped in {@link #stopBroker()}. */
    protected BrokerService broker;
    /** Connect URI of the connector named "tcp", read after the broker has started. */
    protected URI brokerConnectURI;
    /** Name of the queue used by the queue helper methods. */
    protected String defaultQueueName = "test.queue";
    /** Name of the topic used by the topic and durable-subscription helper methods. */
    protected String defaultTopicName = "test.topic";
    /** Size argument handed to the message-creation and queue-publishing helpers. */
    protected static int maxMessageSize = 1000;
    /** Value applied to the default destination policy's prioritized-messages setting. */
    protected final boolean prioritizedMessages;
    /** When {@code true} the queue tests are skipped via {@code Assume}; never assigned in this class. */
    protected boolean enableSubscriptionStatistics;
    /** JUnit rule constructed with a 60-second timeout. */
    @Rule
    public Timeout globalTimeout = new Timeout(60L, TimeUnit.SECONDS);

    /**
     * Creates the test base.
     *
     * @param prioritizedMessages value later applied to the broker's default
     *        destination policy in {@link #setUpBroker(boolean)}
     */
    public AbstractPendingMessageCursorTest(boolean prioritizedMessages) {
        this.prioritizedMessages = prioritizedMessages;
    }

    /**
     * Creates and starts the broker before each test by delegating to
     * {@link #setUpBroker(boolean)} with {@code true}.
     *
     * @throws Exception if broker set-up fails
     */
    @Before
    public void startBroker() throws Exception {
        setUpBroker(true);
    }

    /**
     * Builds, configures and starts the broker used by the tests.
     *
     * <p>The broker gets the persistence chosen by {@link #initPersistence(BrokerService)},
     * a TCP connector named "tcp" bound to an ephemeral port, and a default
     * destination policy with a topic / durable-topic prefetch of 100 and the
     * prioritized-messages setting from {@link #isPrioritizedMessages()}.
     * After start-up the connector's connect URI is stored in {@link #brokerConnectURI}.
     *
     * @param clearDataDir NOTE(review): currently not read by this method — confirm
     *        whether subclasses are expected to honour it
     * @throws Exception if the broker cannot be configured or started
     */
    protected void setUpBroker(boolean clearDataDir) throws Exception {
        final String connectorName = "tcp";

        broker = new BrokerService();
        initPersistence(broker);

        // Port 0 lets the OS pick a free port; the real URI is read back after start-up.
        TransportConnector tcpConnector = broker.addConnector(new TransportConnector());
        tcpConnector.setUri(new URI("tcp://0.0.0.0:0"));
        tcpConnector.setName(connectorName);

        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setTopicPrefetch(100);
        defaultPolicy.setDurableTopicPrefetch(100);
        defaultPolicy.setPrioritizedMessages(isPrioritizedMessages());

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);
        broker.setDestinationPolicy(policyMap);

        broker.start();
        broker.waitUntilStarted();

        brokerConnectURI = broker.getConnectorByName(connectorName).getConnectUri();
    }

    /**
     * Stops the broker after each test and waits until it has fully stopped.
     *
     * <p>JUnit runs {@code @After} methods even when a {@code @Before} method
     * failed, so the broker may never have been created; in that case there is
     * nothing to stop and the method returns instead of throwing a
     * {@code NullPointerException} on top of the original failure.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void stopBroker() throws Exception {
        if (this.broker == null) {
            return;
        }
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    /**
     * @return the broker instance held by this test (may be {@code null} before set-up)
     */
    @Override
    protected BrokerService getBroker() {
        return broker;
    }

    /**
     * @return the connect URI captured from the "tcp" connector during broker set-up
     */
    @Override
    protected URI getBrokerConnectURI() {
        return brokerConnectURI;
    }

    /**
     * Configures the persistence of the given broker; called from
     * {@link #setUpBroker(boolean)} right after the broker is created and
     * before it is started.
     *
     * @param var1 the newly created, not yet started broker
     * @throws IOException if the persistence set-up fails
     */
    protected abstract void initPersistence(BrokerService var1) throws IOException;

    /**
     * @return the prioritized-messages flag supplied to the constructor
     */
    protected boolean isPrioritizedMessages() {
        return prioritizedMessages;
    }

    /**
     * Publishes 200 messages to the default queue and checks that both the
     * pending cursor and the message store report 200 messages whose total
     * size exceeds the published size.
     * Skipped when {@code enableSubscriptionStatistics} is set.
     */
    @Test
    public void testQueueMessageSize() throws Exception {
        Assume.assumeFalse(enableSubscriptionStatistics);

        AtomicLong publishedBytes = new AtomicLong();
        Queue queue = publishTestQueueMessages(200, publishedBytes);

        long expectedMinimum = publishedBytes.get();
        verifyPendingStats(queue, 200, expectedMinimum);
        verifyStoreStats((Destination) queue, 200, expectedMinimum);
    }

    /**
     * Publishes 200 messages, browses the queue, and checks that the pending
     * and store statistics still report all 200 messages afterwards.
     * Skipped when {@code enableSubscriptionStatistics} is set.
     */
    @Test
    public void testQueueBrowserMessageSize() throws Exception {
        Assume.assumeFalse(enableSubscriptionStatistics);

        AtomicLong publishedBytes = new AtomicLong();
        Queue queue = publishTestQueueMessages(200, publishedBytes);
        browseTestQueueMessages(queue.getName());

        long expectedMinimum = publishedBytes.get();
        verifyPendingStats(queue, 200, expectedMinimum);
        verifyStoreStats((Destination) queue, 200, expectedMinimum);
    }

    /**
     * Publishes 200 messages with delivery mode 1 (JMS {@code NON_PERSISTENT})
     * and checks the pending statistics only; the store is not inspected.
     * Skipped when {@code enableSubscriptionStatistics} is set.
     */
    @Test
    public void testQueueMessageSizeNonPersistent() throws Exception {
        Assume.assumeFalse(enableSubscriptionStatistics);

        AtomicLong publishedBytes = new AtomicLong();
        // 1 == javax.jms.DeliveryMode.NON_PERSISTENT
        Queue queue = publishTestQueueMessages(200, 1, publishedBytes);

        verifyPendingStats(queue, 200, publishedBytes.get());
    }

    /**
     * Publishes 100 messages with delivery mode 2 (JMS {@code PERSISTENT}) and
     * 100 with delivery mode 1 (JMS {@code NON_PERSISTENT}) to the same queue.
     * The pending cursor must account for all 200 messages, the store only for
     * the 100 persistent ones.
     * Skipped when {@code enableSubscriptionStatistics} is set.
     */
    @Test
    public void testQueueMessageSizePersistentAndNonPersistent() throws Exception {
        Assume.assumeFalse(enableSubscriptionStatistics);

        AtomicLong nonPersistentBytes = new AtomicLong();
        AtomicLong persistentBytes = new AtomicLong();

        // 2 == javax.jms.DeliveryMode.PERSISTENT, 1 == javax.jms.DeliveryMode.NON_PERSISTENT
        publishTestQueueMessages(100, 2, persistentBytes);
        Queue queue = publishTestQueueMessages(100, 1, nonPersistentBytes);

        long combinedBytes = persistentBytes.get() + nonPersistentBytes.get();
        verifyPendingStats(queue, 200, combinedBytes);
        verifyStoreStats((Destination) queue, 100, persistentBytes.get());
    }

    /**
     * Publishes 200 messages, consumes the queue's messages, and checks that
     * pending and store statistics both drop to zero.
     * Skipped when {@code enableSubscriptionStatistics} is set.
     */
    @Test
    public void testQueueMessageSizeAfterConsumption() throws Exception {
        Assume.assumeFalse(enableSubscriptionStatistics);

        AtomicLong publishedBytes = new AtomicLong();
        Queue queue = publishTestQueueMessages(200, publishedBytes);
        verifyPendingStats(queue, 200, publishedBytes.get());

        consumeTestQueueMessages();

        verifyPendingStats(queue, 0, 0L);
        verifyStoreStats((Destination) queue, 0, 0L);
    }

    /**
     * Same as the persistent after-consumption test but publishing with
     * delivery mode 1 (JMS {@code NON_PERSISTENT}): after consuming, pending
     * and store statistics must both be zero.
     * Skipped when {@code enableSubscriptionStatistics} is set.
     */
    @Test
    public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception {
        Assume.assumeFalse(enableSubscriptionStatistics);

        AtomicLong publishedBytes = new AtomicLong();
        // 1 == javax.jms.DeliveryMode.NON_PERSISTENT
        Queue queue = publishTestQueueMessages(200, 1, publishedBytes);
        verifyPendingStats(queue, 200, publishedBytes.get());

        consumeTestQueueMessages();

        verifyPendingStats(queue, 0, 0L);
        verifyStoreStats((Destination) queue, 0, 0L);
    }

    /**
     * Subscribes a non-durable consumer to the default topic, publishes 200
     * messages and checks the subscription's pending statistics before and
     * after consuming them.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed
     * assertion does not leave it open for later tests.
     */
    @Test
    public void testTopicMessageSize() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer((javax.jms.Destination)new ActiveMQTopic(this.defaultTopicName));
            Topic dest = this.publishTestTopicMessages(200, publishedMessageSize);
            // 100 of the 200 messages are expected to remain pending
            // (presumably the other 100 fill the topic prefetch configured in setUpBroker).
            this.verifyPendingStats(dest, 100, 10000L);
            this.consumeTestMessages(consumer, 200);
            this.verifyPendingStats(dest, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /**
     * Subscribes a non-durable consumer to the default topic, publishes 200
     * messages with delivery mode 1 (JMS {@code NON_PERSISTENT}) and checks the
     * subscription's pending statistics before and after consuming them.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed
     * assertion does not leave it open for later tests.
     */
    @Test
    public void testTopicNonPersistentMessageSize() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer((javax.jms.Destination)new ActiveMQTopic(this.defaultTopicName));
            // 1 == javax.jms.DeliveryMode.NON_PERSISTENT
            Topic dest = this.publishTestTopicMessages(200, 1, publishedMessageSize);
            // 100 of the 200 messages are expected to remain pending
            // (presumably the other 100 fill the topic prefetch configured in setUpBroker).
            this.verifyPendingStats(dest, 100, 10000L);
            this.consumeTestMessages(consumer, 200);
            this.verifyPendingStats(dest, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /**
     * Subscribes a non-durable consumer to the default topic, publishes 100
     * messages with delivery mode 1 (JMS {@code NON_PERSISTENT}) followed by
     * 100 with delivery mode 2 (JMS {@code PERSISTENT}), and checks the
     * subscription's pending statistics before and after consuming all 200.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed
     * assertion does not leave it open for later tests.
     */
    @Test
    public void testTopicPersistentAndNonPersistentMessageSize() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer((javax.jms.Destination)new ActiveMQTopic(this.defaultTopicName));
            // 1 == javax.jms.DeliveryMode.NON_PERSISTENT, 2 == javax.jms.DeliveryMode.PERSISTENT
            this.publishTestTopicMessages(100, 1, publishedMessageSize);
            Topic dest = this.publishTestTopicMessages(100, 2, publishedMessageSize);
            // 100 of the 200 messages are expected to remain pending
            // (presumably the other 100 fill the topic prefetch configured in setUpBroker).
            this.verifyPendingStats(dest, 100, 10000L);
            this.consumeTestMessages(consumer, 200);
            this.verifyPendingStats(dest, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /**
     * Publishes 200 messages (delivery mode 2, JMS {@code PERSISTENT}) for a
     * single durable subscription "sub1", checks that the subscription's
     * pending statistics and the store statistics agree, then consumes all
     * messages and checks that both drop to zero.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed
     * assertion does not leave it open for later tests.
     */
    @Test
    public void testMessageSizeOneDurable() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();
            SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
            // 2 == javax.jms.DeliveryMode.PERSISTENT
            Topic dest = this.publishTestMessagesDurable(connection, new String[]{"sub1"}, 200, publishedMessageSize, 2);
            this.verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
            this.verifyStoreStats((Destination)dest, 200, publishedMessageSize.get());
            // With a single durable subscriber the pending size must equal the store's total size.
            Assert.assertEquals((long)((DurableTopicSubscription)dest.getDurableTopicSubs().get(subKey)).getPendingMessageSize(), (long)dest.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
            this.consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
            this.verifyPendingStats(dest, subKey, 0, 0L);
            this.verifyStoreStats((Destination)dest, 0, 0L);
        } finally {
            connection.close();
        }
    }

    /**
     * Publishes 200 messages (delivery mode 2, JMS {@code PERSISTENT}) for a
     * single durable subscription "sub1", consumes 50 of them and checks that
     * 150 remain in both the pending cursor and the store.
     *
     * <p>The connection is closed in a {@code finally} block so that a failed
     * assertion does not leave it open for later tests.
     */
    @Test
    public void testMessageSizeOneDurablePartialConsumption() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();
            SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
            // 2 == javax.jms.DeliveryMode.PERSISTENT
            Topic dest = this.publishTestMessagesDurable(connection, new String[]{"sub1"}, 200, publishedMessageSize, 2);
            this.verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
            this.verifyStoreStats((Destination)dest, 200, publishedMessageSize.get());
            // NOTE(review): the counter is passed to the consume helper, presumably so it
            // can be reduced by the consumed bytes — confirm in the superclass.
            this.consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
            this.verifyPendingStats(dest, subKey, 150, publishedMessageSize.get());
            this.verifyStoreStats((Destination)dest, 150, publishedMessageSize.get());
        } finally {
            connection.close();
        }
    }

    /**
     * Publishes 200 messages (delivery mode 2, JMS {@code PERSISTENT}) for two
     * durable subscriptions, consumes everything on "sub1" and checks that
     * "sub1" is empty while "sub2" and the store still hold all 200 messages.
     *
     * <p>The connection is closed (not merely stopped) in a {@code finally}
     * block: {@code stop()} only pauses delivery and leaves the connection and
     * its resources allocated.
     */
    @Test
    public void testMessageSizeTwoDurables() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        try {
            connection.setClientID("clientId");
            connection.start();
            // 2 == javax.jms.DeliveryMode.PERSISTENT
            Topic dest = this.publishTestMessagesDurable(connection, new String[]{"sub1", "sub2"}, 200, publishedMessageSize, 2);
            SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
            this.verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
            this.consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
            SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
            this.verifyPendingStats(dest, subKey, 0, 0L);
            this.verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get());
            this.verifyStoreStats((Destination)dest, 200, publishedMessageSize.get());
        } finally {
            connection.close();
        }
    }

    /**
     * Convenience overload that passes {@code count} and {@code minimumSize}
     * for both the pending and the store arguments of the five-argument
     * overload.
     *
     * @param queue       queue whose pending statistics are checked
     * @param count       expected number of pending messages
     * @param minimumSize size the pending message size must exceed when {@code count > 0}
     */
    protected void verifyPendingStats(Queue queue, int count, long minimumSize) throws Exception {
        verifyPendingStats(queue, count, minimumSize, count, minimumSize);
    }

    /**
     * Waits until the queue reports {@code count} pending messages, then checks
     * the queue's pending message size via {@link #verifySize}.
     *
     * @param queue            queue whose pending statistics are checked
     * @param count            expected number of pending messages
     * @param minimumSize      size the pending message size must exceed when {@code count > 0}
     * @param storeCount       not used by this implementation
     * @param minimumStoreSize not used by this implementation
     */
    protected void verifyPendingStats(final Queue queue, final int count, long minimumSize, int storeCount, long minimumStoreSize) throws Exception {
        final long expectedCount = count;
        Wait.Condition pendingCountReached = new Wait.Condition(){
            @Override
            public boolean isSatisified() throws Exception {
                return queue.getPendingMessageCount() == expectedCount;
            }
        };
        Assert.assertTrue(Wait.waitFor(pendingCountReached));

        MessageSizeCalculator pendingSize = new MessageSizeCalculator(){
            @Override
            public long getMessageSize() throws Exception {
                return queue.getPendingMessageSize();
            }
        };
        verifySize(count, pendingSize, minimumSize);
    }

    /**
     * Checks the pending statistics of the topic's first consumer, which is
     * cast to {@link TopicSubscription}: waits until its pending queue size
     * equals {@code count}, then checks its pending message size via
     * {@link #verifySize}.
     *
     * @param topic       topic whose first consumer is inspected
     * @param count       expected pending queue size
     * @param minimumSize size the pending message size must exceed when {@code count > 0}
     */
    protected void verifyPendingStats(Topic topic, final int count, long minimumSize) throws Exception {
        final TopicSubscription subscription = (TopicSubscription) topic.getConsumers().get(0);

        Wait.Condition pendingCountReached = new Wait.Condition(){
            @Override
            public boolean isSatisified() throws Exception {
                return subscription.getPendingQueueSize() == count;
            }
        };
        Assert.assertTrue(Wait.waitFor(pendingCountReached));

        MessageSizeCalculator pendingSize = new MessageSizeCalculator(){
            @Override
            public long getMessageSize() throws Exception {
                return subscription.getPendingMessageSize();
            }
        };
        verifySize(count, pendingSize, minimumSize);
    }

    /**
     * Checks the pending statistics of the durable subscription registered on
     * the topic under {@code subKey}: waits until its pending queue size equals
     * {@code count}, then checks its pending message size via
     * {@link #verifySize}.
     *
     * @param topic       topic holding the durable subscription
     * @param subKey      key used to look up the durable subscription
     * @param count       expected pending queue size
     * @param minimumSize size the pending message size must exceed when {@code count > 0}
     */
    protected void verifyPendingStats(Topic topic, SubscriptionKey subKey, final int count, long minimumSize) throws Exception {
        final DurableTopicSubscription durableSub = (DurableTopicSubscription) topic.getDurableTopicSubs().get(subKey);

        Wait.Condition pendingCountReached = new Wait.Condition(){
            @Override
            public boolean isSatisified() throws Exception {
                return durableSub.getPendingQueueSize() == count;
            }
        };
        Assert.assertTrue(Wait.waitFor(pendingCountReached));

        MessageSizeCalculator pendingSize = new MessageSizeCalculator(){
            @Override
            public long getMessageSize() throws Exception {
                return durableSub.getPendingMessageSize();
            }
        };
        verifySize(count, pendingSize, minimumSize);
    }

    /**
     * Waits until the destination's message store reports {@code storeCount}
     * messages, then checks the store's message size via {@link #verifySize}.
     *
     * @param dest             destination whose message store is inspected
     * @param storeCount       expected number of stored messages
     * @param minimumStoreSize size the store's message size must exceed when {@code storeCount > 0}
     */
    protected void verifyStoreStats(Destination dest, final int storeCount, long minimumStoreSize) throws Exception {
        final MessageStore store = dest.getMessageStore();

        Wait.Condition storeCountReached = new Wait.Condition(){
            @Override
            public boolean isSatisified() throws Exception {
                return store.getMessageCount() == storeCount;
            }
        };
        Assert.assertTrue(Wait.waitFor(storeCountReached));

        MessageSizeCalculator storedSize = new MessageSizeCalculator(){
            @Override
            public long getMessageSize() throws Exception {
                return store.getMessageSize();
            }
        };
        verifySize(storeCount, storedSize, minimumStoreSize);
    }

    /**
     * Waits for the size reported by {@code messageSizeCalculator} to reach the
     * expected state and asserts that it does.
     *
     * <p>When {@code count} is positive the size must become strictly greater
     * than {@code minimumSize}; otherwise it must become exactly zero.
     *
     * @param count                 expected message count; only its sign is used
     * @param messageSizeCalculator supplier of the current size
     * @param minimumSize           exclusive lower bound used when {@code count > 0}
     */
    protected void verifySize(int count, final MessageSizeCalculator messageSizeCalculator, final long minimumSize) throws Exception {
        final boolean expectMessages = count > 0;
        Wait.Condition sizeAsExpected = new Wait.Condition(){
            @Override
            public boolean isSatisified() throws Exception {
                long currentSize = messageSizeCalculator.getMessageSize();
                if (expectMessages) {
                    return currentSize > minimumSize;
                }
                return currentSize == 0L;
            }
        };
        Assert.assertTrue(Wait.waitFor(sizeAsExpected));
    }

    /**
     * Receives {@code size} messages from the consumer, using the default
     * topic name for the destination lookup.
     *
     * @param consumer consumer to receive from
     * @param size     number of messages to receive
     * @return the broker-side destination of the default topic
     */
    protected Destination consumeTestMessages(MessageConsumer consumer, int size) throws Exception {
        return consumeTestMessages(consumer, size, defaultTopicName);
    }

    /**
     * Looks up the broker-side destination for {@code topicName} and then calls
     * {@code consumer.receive()} {@code size} times. The no-argument
     * {@code receive()} carries no timeout; the received messages are discarded.
     *
     * @param consumer  consumer to receive from
     * @param size      number of {@code receive()} calls to make
     * @param topicName name of the topic whose destination is returned
     * @return the broker-side destination for {@code topicName}
     */
    protected Destination consumeTestMessages(MessageConsumer consumer, int size, String topicName) throws Exception {
        Destination topicDestination = broker.getDestination((ActiveMQDestination) new ActiveMQTopic(topicName));
        int remaining = size;
        while (remaining > 0) {
            consumer.receive();
            remaining--;
        }
        return topicDestination;
    }

    /**
     * Consumes messages for a durable subscription on the default topic by
     * delegating to the five-argument overload (not defined in this file).
     *
     * @param connection           connection to consume on
     * @param sub                  durable subscription name
     * @param size                 number of messages to consume
     * @param publishedMessageSize size counter forwarded to the overload
     * @return whatever the five-argument overload returns
     */
    protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, AtomicLong publishedMessageSize) throws Exception {
        return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize);
    }

    /**
     * Publishes messages for the given durable subscriptions on the default
     * topic by delegating to the nine-argument overload (not defined in this
     * file) with an expected size of {@code 0}, the superclass' default
     * message size, and {@code false} for the remaining boolean option.
     *
     * @param connection           connection to publish on
     * @param subNames             durable subscription names
     * @param publishSize          number of messages to publish
     * @param publishedMessageSize size counter forwarded to the overload
     * @param deliveryMode         JMS delivery mode forwarded to the overload
     * @return the topic returned by the overload
     */
    protected Topic publishTestMessagesDurable(Connection connection, String[] subNames, int publishSize, AtomicLong publishedMessageSize, int deliveryMode) throws Exception {
        return publishTestMessagesDurable(
                connection,
                subNames,
                defaultTopicName,
                publishSize,
                0,
                AbstractStoreStatTestSupport.defaultMessageSize,
                publishedMessageSize,
                false,
                deliveryMode);
    }

    /**
     * Publishes {@code publishSize} messages to the default topic using
     * delivery mode 2 (JMS {@code PERSISTENT}).
     *
     * @param publishSize          number of messages to publish
     * @param publishedMessageSize size counter forwarded to the three-argument overload
     * @return the broker-side topic
     */
    protected Topic publishTestTopicMessages(int publishSize, AtomicLong publishedMessageSize) throws Exception {
        // 2 == javax.jms.DeliveryMode.PERSISTENT
        return publishTestTopicMessages(publishSize, 2, publishedMessageSize);
    }

    /**
     * Publishes {@code publishSize} messages to the default topic over a
     * dedicated connection (client id "clientId2") and closes that connection
     * afterwards.
     *
     * <p>The whole use of the connection is inside the {@code try} block so
     * that it is closed even when setting the client id, starting the
     * connection or creating the session fails.
     *
     * @param publishSize          number of messages to publish
     * @param deliveryMode         JMS delivery mode set on the producer
     * @param publishedMessageSize size counter handed to {@code createMessage}
     * @return the broker-side topic, looked up before publishing
     * @throws Exception if any JMS or broker call fails
     */
    protected Topic publishTestTopicMessages(int publishSize, int deliveryMode, AtomicLong publishedMessageSize) throws Exception {
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        try {
            connection.setClientID("clientId2");
            connection.start();
            ActiveMQTopic activeMqTopic = new ActiveMQTopic(this.defaultTopicName);
            Topic dest = (Topic)this.broker.getDestination((ActiveMQDestination)activeMqTopic);
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            javax.jms.Topic topic = session.createTopic(this.defaultTopicName);
            MessageProducer prod = session.createProducer((javax.jms.Destination)topic);
            prod.setDeliveryMode(deliveryMode);
            for (int i = 0; i < publishSize; ++i) {
                prod.send((Message)this.createMessage(i, session, maxMessageSize, publishedMessageSize));
            }
            return dest;
        }
        finally {
            connection.close();
        }
    }

    /**
     * Publishes {@code count} messages to the default queue using delivery
     * mode 2 (JMS {@code PERSISTENT}) and {@code maxMessageSize} as the size
     * argument.
     *
     * @param count                number of messages to publish
     * @param publishedMessageSize size counter forwarded to the five-argument overload
     * @return the queue returned by the five-argument overload (not defined in this file)
     */
    protected Queue publishTestQueueMessages(int count, AtomicLong publishedMessageSize) throws Exception {
        // 2 == javax.jms.DeliveryMode.PERSISTENT
        return publishTestQueueMessages(count, defaultQueueName, 2, maxMessageSize, publishedMessageSize);
    }

    /**
     * Publishes {@code count} messages to the default queue with the given
     * delivery mode and {@code maxMessageSize} as the size argument.
     *
     * @param count                number of messages to publish
     * @param deliveryMode         JMS delivery mode forwarded to the overload
     * @param publishedMessageSize size counter forwarded to the five-argument overload
     * @return the queue returned by the five-argument overload (not defined in this file)
     */
    protected Queue publishTestQueueMessages(int count, int deliveryMode, AtomicLong publishedMessageSize) throws Exception {
        return publishTestQueueMessages(count, defaultQueueName, deliveryMode, maxMessageSize, publishedMessageSize);
    }

    /**
     * Consumes the messages of the default queue by delegating to the
     * one-argument overload (not defined in this file).
     *
     * @return the destination returned by that overload
     */
    protected Destination consumeTestQueueMessages() throws Exception {
        return consumeTestQueueMessages(defaultQueueName);
    }

    protected static interface MessageSizeCalculator {
        public long getMessageSize() throws Exception;
    }
}

