/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.failover;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailoverDuplicateTest
extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverDuplicateTest.class);
    private static final String QUEUE_NAME = "TestQueue";
    private static final String TRANSPORT_URI = "tcp://localhost:0";
    private String url;
    BrokerService broker;

    public void tearDown() throws Exception {
        this.stopBroker();
    }

    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
        this.broker = this.createBroker(deleteAllMessagesOnStartup);
        this.broker.start();
    }

    public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
        this.broker = this.createBroker(deleteAllMessagesOnStartup, bindAddress);
        this.broker.start();
    }

    public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
        return this.createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
    }

    public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
        this.broker = new BrokerService();
        this.broker.setUseJmx(false);
        this.broker.setAdvisorySupport(false);
        this.broker.addConnector(bindAddress);
        this.broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
        this.url = ((TransportConnector)this.broker.getTransportConnectors().get(0)).getConnectUri().toString();
        return this.broker;
    }

    public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
        factory.setAuditMaximumProducerNumber(2048);
        factory.setOptimizeAcknowledge(true);
    }

    public void testFailoverSendReplyLost() throws Exception {
        this.broker = this.createBroker(true);
        this.setDefaultPersistenceAdapter(this.broker);
        final CountDownLatch gotMessageLatch = new CountDownLatch(1);
        final CountDownLatch producersDone = new CountDownLatch(1);
        final AtomicBoolean first = new AtomicBoolean(false);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
                super.send(producerExchange, messageSend);
                if (first.compareAndSet(false, true)) {
                    producerExchange.getConnectionContext().setDontSendReponse(true);
                    Executors.newSingleThreadExecutor().execute(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                LOG.info("Waiting for recepit");
                                TestCase.assertTrue((String)"message received on time", (boolean)gotMessageLatch.await(60L, TimeUnit.SECONDS));
                                TestCase.assertTrue((String)"new producers done on time", (boolean)producersDone.await(120L, TimeUnit.SECONDS));
                                LOG.info("Stopping connection post send and receive and multiple producers");
                                producerExchange.getConnectionContext().getConnection().stop();
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        }});
        this.broker.start();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")?jms.watchTopicAdvisories=false");
        this.configureConnectionFactory(cf);
        Connection sendConnection = cf.createConnection();
        sendConnection.start();
        final Session sendSession = sendConnection.createSession(false, 1);
        final Queue destination = sendSession.createQueue(QUEUE_NAME);
        final AtomicInteger receivedCount = new AtomicInteger();
        MessageListener listener = new MessageListener(){

            public void onMessage(javax.jms.Message message) {
                gotMessageLatch.countDown();
                receivedCount.incrementAndGet();
            }
        };
        Session receiveSession = null;
        Connection receiveConnection = cf.createConnection();
        receiveConnection.start();
        receiveSession = receiveConnection.createSession(false, 1);
        receiveSession.createConsumer((Destination)destination).setMessageListener(listener);
        final CountDownLatch sendDoneLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable(){

            @Override
            public void run() {
                LOG.info("doing async send...");
                try {
                    FailoverDuplicateTest.this.produceMessage(sendSession, destination, "will resend", 1);
                }
                catch (JMSException e) {
                    LOG.error("got send exception: ", (Throwable)e);
                    TestCase.fail((String)("got unexpected send exception" + e));
                }
                sendDoneLatch.countDown();
                LOG.info("done async send");
            }
        });
        FailoverDuplicateTest.assertTrue((String)"one message got through on time", (boolean)gotMessageLatch.await(20L, TimeUnit.SECONDS));
        int numProducers = 1050;
        int numPerProducer = 2;
        int totalSent = 2101;
        for (int i = 0; i < 1050; ++i) {
            this.produceMessage(receiveSession, destination, "new producer " + i, 2);
            if (i != 1025) continue;
            LOG.info("count down producers done");
            producersDone.countDown();
        }
        FailoverDuplicateTest.assertTrue((String)"message sent complete through failover", (boolean)sendDoneLatch.await(30L, TimeUnit.SECONDS));
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("received count:" + receivedCount.get());
                return 2101 <= receivedCount.get();
            }
        });
        FailoverDuplicateTest.assertEquals((String)"we got all produced messages", (int)2101, (int)receivedCount.get());
        sendConnection.close();
        receiveConnection.close();
        FailoverDuplicateTest.assertEquals((String)"expect all messages are dequeued with one duplicate to dlq", (long)2103L, (long)((RegionBroker)this.broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("dequeues : " + ((RegionBroker)FailoverDuplicateTest.this.broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
                return 2102L <= ((RegionBroker)FailoverDuplicateTest.this.broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount();
            }
        });
        FailoverDuplicateTest.assertEquals((String)"dequeue correct, including duplicate dispatch poisoned", (long)2102L, (long)((RegionBroker)this.broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages with second restart..");
        this.broker = this.createBroker(false, this.url);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        sendConnection = cf.createConnection();
        sendConnection.start();
        Session session2 = sendConnection.createSession(false, 1);
        MessageConsumer consumer = session2.createConsumer((Destination)destination);
        javax.jms.Message msg = consumer.receive(1000L);
        if (msg == null) {
            msg = consumer.receive(5000L);
        }
        FailoverDuplicateTest.assertNull((String)("no messges left dangling but got: " + msg), (Object)msg);
        sendConnection.close();
    }

    private void produceMessage(Session producerSession, Queue destination, String text, int count) throws JMSException {
        MessageProducer producer = producerSession.createProducer((Destination)destination);
        for (int i = 0; i < count; ++i) {
            TextMessage message = producerSession.createTextMessage(text + ", count:" + i);
            producer.send((javax.jms.Message)message);
        }
        producer.close();
    }
}

