/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.usecases.MessageGroupNewConsumerTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises closing of message groups with two competing consumers.
 *
 * <p>A producer sends {@value #GROUP_COUNT} groups of {@value #MESSAGES_PER_GROUP}
 * messages each to a queue; every {@value #CLOSE_EVERY}th message of a group carries
 * {@code JMSXGroupSeq = -1}, which this test treats as the "close group" marker.
 * Once all messages are sent, two consumers drain the queue and
 * {@link #checkMessage} tracks, per consumer, which groups were seen and which are
 * currently closed. The test then asserts that every message was received, that every
 * group seen by a consumer ended up closed, and that no bookkeeping errors were counted.
 */
public class MessageGroupCloseTest
extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(MessageGroupCloseTest.class);
    private static final String CONNECTION_URI = "vm://localhost?broker.persistent=false&broker.useJmx=false&jms.prefetchPolicy.all=1";

    /** Number of distinct message groups the producer sends. */
    private static final int GROUP_COUNT = 10;
    /** Number of messages sent per group. */
    private static final int MESSAGES_PER_GROUP = 10;
    /** Every n-th message within a group is sent with sequence -1 (close marker). */
    private static final int CLOSE_EVERY = 5;
    /** How long a consumer waits for the next message before it considers the queue drained. */
    private static final long RECEIVE_TIMEOUT_MS = 5000L;

    private Connection connection;
    /** Released by the producer when it is done (successfully or not) so the consumers may start. */
    private final CountDownLatch latchMessagesCreated = new CountDownLatch(1);
    /** Written only by the producer thread; read by the test thread after {@code join()}. */
    private int messagesSent;
    private final AtomicInteger messagesRecvd1 = new AtomicInteger();
    private final AtomicInteger messagesRecvd2 = new AtomicInteger();
    private int messageGroupCount;
    // The error counters are bumped from both consumer threads via checkMessage(),
    // so they must be updated atomically.
    private final AtomicInteger errorCountFirstForConsumer = new AtomicInteger();
    private final AtomicInteger errorCountWrongConsumerClose = new AtomicInteger();
    private final AtomicInteger errorCountDuplicateClose = new AtomicInteger();
    private final Map<String, Integer> messageGroups1 = new HashMap<String, Integer>();
    private final Map<String, Integer> messageGroups2 = new HashMap<String, Integer>();
    private final Set<String> closedGroups1 = new HashSet<String>();
    private final Set<String> closedGroups2 = new HashSet<String>();
    /** First exception raised in any worker thread, or {@code null} if none failed. */
    private volatile Throwable firstFailure;

    /**
     * Runs one producer and two consumers against a single queue and verifies the
     * message-group bookkeeping collected by {@link #checkMessage}.
     *
     * @throws JMSException if the connection cannot be created, started or closed
     * @throws InterruptedException if the test thread is interrupted while joining workers
     */
    public void testNewConsumer() throws JMSException, InterruptedException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(CONNECTION_URI);
        this.connection = factory.createConnection();
        try {
            this.connection.start();
            final String queueName = getClass().getSimpleName();
            Thread producerThread = new Thread(){

                @Override
                public void run() {
                    MessageGroupCloseTest.this.produce(queueName);
                }
            };
            Thread consumerThread1 = new Thread(){

                @Override
                public void run() {
                    MessageGroupCloseTest.this.consume(1, queueName, MessageGroupCloseTest.this.messagesRecvd1,
                            MessageGroupCloseTest.this.messageGroups1, MessageGroupCloseTest.this.closedGroups1);
                }
            };
            Thread consumerThread2 = new Thread(){

                @Override
                public void run() {
                    MessageGroupCloseTest.this.consume(2, queueName, MessageGroupCloseTest.this.messagesRecvd2,
                            MessageGroupCloseTest.this.messageGroups2, MessageGroupCloseTest.this.closedGroups2);
                }
            };
            // Consumers are started first but block on the latch until the producer is done.
            consumerThread2.start();
            consumerThread1.start();
            producerThread.start();
            producerThread.join();
            consumerThread1.join();
            consumerThread2.join();
        }
        finally {
            // Always release the connection (and with it any session a failed worker
            // left open), even if a join() above was interrupted.
            this.connection.close();
        }
        // A worker exception used to be logged only; surface it so the test cannot pass by accident.
        assertNull("a producer or consumer thread failed: " + this.firstFailure, this.firstFailure);
        assertEquals("consumers should get all the messages", this.messagesSent, this.messagesRecvd1.get() + this.messagesRecvd2.get());
        assertEquals("not all message groups closed for consumer 1", this.messageGroups1.size(), this.closedGroups1.size());
        assertEquals("not all message groups closed for consumer 2", this.messageGroups2.size(), this.closedGroups2.size());
        assertTrue("producer failed to send any messages", this.messagesSent > 0);
        assertEquals("JMSXGroupFirstForConsumer not set", 0, this.errorCountFirstForConsumer.get());
        assertEquals("wrong consumer got close message", 0, this.errorCountWrongConsumerClose.get());
        assertEquals("consumer got duplicate close message", 0, this.errorCountDuplicateClose.get());
    }

    /**
     * Producer body: sends all groups in a transacted session, committing after every
     * message, then releases {@link #latchMessagesCreated}.
     *
     * @param queueName name of the queue to send to
     */
    private void produce(String queueName) {
        try {
            Session session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = session.createQueue(queueName);
            MessageProducer prod = session.createProducer(queue);
            for (int group = 0; group < GROUP_COUNT; ++group) {
                for (int j = 0; j < MESSAGES_PER_GROUP; ++j) {
                    int position = j + 1;
                    // Every CLOSE_EVERY-th message is the close marker (-1) instead of its position.
                    int seq = position % CLOSE_EVERY == 0 ? -1 : position;
                    Message message = generateMessage(session, Integer.toString(group), seq);
                    prod.send(message);
                    session.commit();
                    ++this.messagesSent;
                    LOG.info("Sent message: group={}, seq={}", group, seq);
                }
                if (group % 100 == 0) {
                    LOG.info("Sent messages: group={}", group);
                }
                setMessageGroupCount(getMessageGroupCount() + 1);
            }
            LOG.info("{} messages sent", this.messagesSent);
            prod.close();
            session.close();
        }
        catch (Exception e) {
            recordFailure("Producer", e);
        }
        finally {
            // Release the consumers even when sending failed; otherwise they would
            // wait on the latch forever and the test would hang instead of failing.
            this.latchMessagesCreated.countDown();
        }
    }

    /**
     * Consumer body: waits for the producer to finish, then receives from the queue in a
     * transacted session until no message arrives within {@link #RECEIVE_TIMEOUT_MS},
     * validating each message with {@link #checkMessage} and committing after each one.
     *
     * @param index         1-based consumer number, used in log output
     * @param queueName     name of the queue to receive from
     * @param received      counter incremented once per committed message
     * @param messageGroups per-consumer map of group id to number of messages seen
     * @param closedGroups  per-consumer set of groups currently considered closed
     */
    private void consume(int index, String queueName, AtomicInteger received,
            Map<String, Integer> messageGroups, Set<String> closedGroups) {
        String consumerId = "Con" + index;
        try {
            this.latchMessagesCreated.await();
            LOG.info("starting consumer{}", index);
            Session session = this.connection.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = session.createQueue(queueName);
            MessageConsumer consumer = session.createConsumer(queue);
            Message message;
            while ((message = consumer.receive(RECEIVE_TIMEOUT_MS)) != null) {
                LOG.info("{}: got message {}", consumerId, formatMessage(message));
                checkMessage(message, consumerId, messageGroups, closedGroups);
                session.commit();
                int count = received.incrementAndGet();
                if (count % 100 == 0) {
                    LOG.info("{}: got messages count={}", consumerId, count);
                }
            }
            LOG.info("{}: total messages={}", consumerId, received.get());
            LOG.info("{}: total message groups={}", consumerId, messageGroups.size());
            consumer.close();
            session.close();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            recordFailure("Consumer " + index, e);
        }
        catch (Exception e) {
            recordFailure("Consumer " + index, e);
        }
    }

    /**
     * Logs a worker-thread failure and remembers the first one so the test can fail on it.
     *
     * @param who   short label of the failing worker, e.g. {@code "Producer"}
     * @param cause the exception that terminated the worker
     */
    private synchronized void recordFailure(String who, Throwable cause) {
        LOG.error(who + " failed", cause);
        if (this.firstFailure == null) {
            this.firstFailure = cause;
        }
    }

    /**
     * Creates a text message tagged with the given message group and sequence.
     *
     * @param session session used to create the message
     * @param groupId value for the {@code JMSXGroupID} property
     * @param seq     value for the {@code JMSXGroupSeq} property
     * @return the populated message
     * @throws JMSException if the message cannot be created or populated
     */
    public Message generateMessage(Session session, String groupId, int seq) throws JMSException {
        TextMessage m = session.createTextMessage();
        m.setJMSType("TEST_MESSAGE");
        m.setStringProperty("JMSXGroupID", groupId);
        m.setIntProperty("JMSXGroupSeq", seq);
        m.setText("<?xml?><testMessage/>");
        return m;
    }

    /**
     * Renders a message's group id and sequence for logging.
     *
     * @param m the message to describe
     * @return {@code "group=<id>, seq=<n>"}, or the exception's simple class name and
     *         message if the properties cannot be read
     */
    public String formatMessage(Message m) {
        try {
            return "group=" + m.getStringProperty("JMSXGroupID") + ", seq=" + m.getIntProperty("JMSXGroupSeq");
        }
        catch (Exception e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    /**
     * Updates one consumer's group bookkeeping for a received message and counts
     * protocol violations in the shared error counters.
     *
     * @param m             the received message
     * @param consumerId    label of the receiving consumer, used in log output
     * @param messageGroups that consumer's map of group id to messages seen so far
     * @param closedGroups  that consumer's set of groups currently considered closed
     * @throws JMSException if a message property cannot be read
     */
    public void checkMessage(Message m, String consumerId, Map<String, Integer> messageGroups, Set<String> closedGroups) throws JMSException {
        String groupId = m.getStringProperty("JMSXGroupID");
        int seq = m.getIntProperty("JMSXGroupSeq");
        boolean closeMarker = seq == -1;
        Integer count = messageGroups.get(groupId);
        if (count == null) {
            // First message this consumer has ever seen for the group.
            verifyFirstForConsumer(m, consumerId, groupId, seq);
            if (closeMarker) {
                // The very first message seen for a group must not be its close marker.
                closedGroups.add(groupId);
                LOG.info(consumerId + ": wrong consumer got close message for group=" + groupId);
                this.errorCountWrongConsumerClose.incrementAndGet();
            }
            messageGroups.put(groupId, 1);
            return;
        }
        if (closedGroups.remove(groupId)) {
            // The group was closed for this consumer and is being seen again, so this
            // message is expected to be flagged as first-for-consumer once more.
            verifyFirstForConsumer(m, consumerId, groupId, seq);
            if (closeMarker) {
                LOG.info(consumerId + ": consumer got duplicate close message for group=" + groupId);
                this.errorCountDuplicateClose.incrementAndGet();
            }
        }
        if (closeMarker) {
            closedGroups.add(groupId);
        }
        messageGroups.put(groupId, count + 1);
    }

    /**
     * Counts an error when the message does not carry a true
     * {@code JMSXGroupFirstForConsumer} property.
     */
    private void verifyFirstForConsumer(Message m, String consumerId, String groupId, int seq) throws JMSException {
        if (!m.propertyExists("JMSXGroupFirstForConsumer") || !m.getBooleanProperty("JMSXGroupFirstForConsumer")) {
            LOG.info(consumerId + ": JMSXGroupFirstForConsumer not set for group=" + groupId + ", seq=" + seq);
            this.errorCountFirstForConsumer.incrementAndGet();
        }
    }

    /**
     * @return the number of groups the producer has finished sending
     */
    public int getMessageGroupCount() {
        return this.messageGroupCount;
    }

    /**
     * @param messageGroupCount the new count of completed groups
     */
    public void setMessageGroupCount(int messageGroupCount) {
        this.messageGroupCount = messageGroupCount;
    }
}

