/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq;

import java.net.URI;
import java.util.Enumeration;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that messages sent with a time-to-live are no longer returned by a
 * {@link QueueBrowser} once that time-to-live has elapsed.
 *
 * <p>Every test runs against a freshly started, non-persistent embedded broker that is
 * reached through a {@code vm://localhost} connector and stopped again afterwards.
 */
public class JmsQueueBrowserExpirationTest {
    /** Number of messages placed on the queue by {@link #sendTestMessages()}. */
    private static final int MESSAGES_TO_SEND = 50;
    /** Time-to-live, in milliseconds, applied to every message the tests send. */
    private static final long TTL = 1000L;
    /** Pause, in milliseconds, between two consecutive browse attempts. */
    private static final long POLL_INTERVAL_MS = 100L;
    /** Extra wait, in milliseconds, added on top of {@link #TTL} before re-counting. */
    private static final long EXPIRY_MARGIN_MS = 1000L;
    private static final Logger LOG = LoggerFactory.getLogger(JmsQueueBrowserExpirationTest.class);

    private BrokerService broker;
    private URI connectUri;
    private ActiveMQConnectionFactory factory;
    private final ActiveMQQueue queue = new ActiveMQQueue("TEST");

    /**
     * Starts a non-persistent broker and builds a connection factory for its connector.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        TransportConnector connector = this.broker.addConnector("vm://localhost");
        this.broker.deleteAllMessages();
        this.broker.start();
        this.broker.waitUntilStarted();
        this.connectUri = connector.getConnectUri();
        this.factory = new ActiveMQConnectionFactory(this.connectUri);
    }

    /**
     * Stops the broker created by {@link #startBroker()}, if there is one.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    /**
     * Sends {@link #MESSAGES_TO_SEND} messages, asserts that a browser sees all of them, and
     * then keeps browsing until the browser reports none. The JUnit timeout bounds the
     * polling loop, so the test fails if the messages never disappear.
     *
     * @throws JMSException if any JMS operation fails
     * @throws InterruptedException if the thread is interrupted while waiting between polls
     */
    @Test(timeout = 10000L)
    public void testBrowsingExpiration() throws JMSException, InterruptedException {
        this.sendTestMessages();
        Connection browserConnection = this.factory.createConnection();
        try {
            browserConnection.start();
            int browsed = this.browse(this.queue, browserConnection);
            Assert.assertEquals(MESSAGES_TO_SEND, browsed);
            long begin = System.nanoTime();
            while (browsed != 0) {
                Thread.sleep(POLL_INTERVAL_MS);
                browsed = this.browse(this.queue, browserConnection);
                long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
                LOG.info("[{}ms] found {}", elapsedMs, browsed);
            }
            LOG.info("Finished");
        } finally {
            // Release the connection even when the assertion above fails.
            browserConnection.close();
        }
    }

    /**
     * Sends one message with a time-to-live of {@link #TTL}, asserts that it is browsable
     * right away, waits for the time-to-live plus {@link #EXPIRY_MARGIN_MS}, and asserts
     * that the browser then reports no messages.
     *
     * @throws Exception if any JMS operation fails or the wait is interrupted
     */
    @Test(timeout = 10000L)
    public void testDoNotReceiveExpiredMessage() throws Exception {
        Connection connection = this.factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue producerQueue = session.createQueue("MyTestQueue");
            MessageProducer producer = session.createProducer(producerQueue);
            producer.setTimeToLive(TTL);
            TextMessage message = session.createTextMessage("Test message");
            producer.send(producerQueue, message);

            Assert.assertEquals(1, this.getMessageCount(producerQueue, session));

            Thread.sleep(TTL + EXPIRY_MARGIN_MS);

            Assert.assertEquals(0, this.getMessageCount(producerQueue, session));
            producer.close();
            session.close();
        } finally {
            // Release the connection even when an assertion above fails.
            connection.close();
        }
    }

    /**
     * Counts the messages a new browser on {@code destination} enumerates.
     *
     * @param destination the queue to browse
     * @param session the session used to create the browser; it is left open
     * @return the number of elements returned by the browser's enumeration
     * @throws Exception if creating, reading or closing the browser fails
     */
    private int getMessageCount(Queue destination, Session session) throws Exception {
        QueueBrowser browser = session.createBrowser(destination);
        try {
            int result = 0;
            Enumeration<?> enumeration = browser.getEnumeration();
            while (enumeration.hasMoreElements()) {
                enumeration.nextElement();
                result++;
            }
            return result;
        } finally {
            browser.close();
        }
    }

    /**
     * Browses {@code queue} on a new session of {@code connection}, logging each message
     * text at debug level, and closes the browser and the session before returning.
     *
     * @param queue the queue to browse
     * @param connection the connection on which the temporary session is created
     * @return the number of messages the browser enumerated
     * @throws JMSException if any JMS operation fails
     */
    private int browse(ActiveMQQueue queue, Connection connection) throws JMSException {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            QueueBrowser browser = session.createBrowser(queue);
            try {
                int browsed = 0;
                Enumeration<?> enumeration = browser.getEnumeration();
                while (enumeration.hasMoreElements()) {
                    TextMessage m = (TextMessage) enumeration.nextElement();
                    browsed++;
                    LOG.debug("B[{}]: {}", browsed, m.getText());
                }
                return browsed;
            } finally {
                browser.close();
            }
        } finally {
            session.close();
        }
    }

    /**
     * Sends {@link #MESSAGES_TO_SEND} non-persistent text messages to the test queue, each
     * with a time-to-live of {@link #TTL}, over a dedicated connection that is closed
     * before returning.
     *
     * @throws JMSException if any JMS operation fails
     */
    protected void sendTestMessages() throws JMSException {
        Connection prodConnection = this.factory.createConnection();
        try {
            prodConnection.start();
            Session prodSession = prodConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = prodSession.createProducer(this.queue);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            producer.setTimeToLive(TTL);
            for (int i = 1; i <= MESSAGES_TO_SEND; ++i) {
                String msgStr = "Message: " + i;
                producer.send(prodSession.createTextMessage(msgStr));
                LOG.info("P&C: {}", msgStr);
            }
            prodSession.close();
        } finally {
            // The original closed only the session, leaving the producer connection open.
            prodConnection.close();
        }
    }
}

