/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.kahadb;

import java.io.File;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class KahaDBDurableMessageRecoveryTest {
    @Rule
    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
    private BrokerService broker;
    private URI brokerConnectURI;
    private boolean recoverIndex;
    private boolean enableSubscriptionStats;

    @Parameterized.Parameters(name="recoverIndex={0},enableSubscriptionStats={1}")
    public static Collection<Object[]> data() {
        return Arrays.asList({false, false}, {false, true}, {true, false}, {true, true});
    }

    @Before
    public void setUpBroker() throws Exception {
        this.startBroker(false);
    }

    @After
    public void stopBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    public KahaDBDurableMessageRecoveryTest(boolean recoverIndex, boolean enableSubscriptionStats) {
        this.recoverIndex = recoverIndex;
        this.enableSubscriptionStats = enableSubscriptionStats;
    }

    protected void startBroker(boolean recoverIndex) throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(true);
        this.broker.setDataDirectoryFile(this.dataFileDir.getRoot());
        TransportConnector connector = this.broker.addConnector(new TransportConnector());
        connector.setUri(new URI("tcp://0.0.0.0:0"));
        connector.setName("tcp");
        this.configurePersistence(this.broker, recoverIndex);
        this.broker.start();
        this.broker.waitUntilStarted();
        this.brokerConnectURI = this.broker.getConnectorByName("tcp").getConnectUri();
    }

    protected void configurePersistence(BrokerService brokerService, boolean forceRecoverIndex) throws Exception {
        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter();
        adapter.setForceRecoverIndex(forceRecoverIndex);
        adapter.setEnableSubscriptionStatistics(this.enableSubscriptionStats);
        adapter.setJournalMaxFileLength(20480);
    }

    protected void restartBroker(boolean deleteIndex) throws Exception {
        this.stopBroker();
        this.startBroker(deleteIndex);
    }

    protected Session getSession(int ackMode) throws Exception {
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        connection.setClientID("clientId1");
        connection.start();
        Session session = connection.createSession(false, ackMode);
        return session;
    }

    @Test
    public void durableRecoveryIndividualAcknowledge() throws Exception {
        TextMessage received;
        int i;
        String testTopic = "test.topic";
        Session session = this.getSession(4);
        ActiveMQTopic topic = (ActiveMQTopic)session.createTopic(testTopic);
        MessageProducer producer = session.createProducer((Destination)topic);
        TopicSubscriber subscriber = session.createDurableSubscriber((javax.jms.Topic)topic, "sub1");
        for (i = 1; i <= 10; ++i) {
            producer.send((javax.jms.Message)session.createTextMessage("msg: " + i));
        }
        producer.close();
        Assert.assertTrue((boolean)Wait.waitFor(() -> 10L == this.getPendingMessageCount(topic, "clientId1", "sub1"), (long)3000L, (long)500L));
        for (i = 1; i <= 10; ++i) {
            received = (TextMessage)subscriber.receive(1000L);
            Assert.assertNotNull((Object)received);
            if (i != 5) continue;
            received.acknowledge();
        }
        Assert.assertTrue((boolean)Wait.waitFor(() -> 9L == this.getPendingMessageCount(topic, "clientId1", "sub1"), (long)3000L, (long)500L));
        subscriber.close();
        this.restartBroker(this.recoverIndex);
        Assert.assertTrue((boolean)Wait.waitFor(() -> 9L == this.getPendingMessageCount(topic, "clientId1", "sub1"), (long)3000L, (long)500L));
        session = this.getSession(1);
        subscriber = session.createDurableSubscriber((javax.jms.Topic)topic, "sub1");
        for (i = 1; i <= 4; ++i) {
            received = (TextMessage)subscriber.receive(1000L);
            Assert.assertNotNull((Object)received);
            Assert.assertEquals((Object)("msg: " + i), (Object)received.getText());
        }
        for (i = 6; i <= 10; ++i) {
            received = (TextMessage)subscriber.receive(1000L);
            Assert.assertNotNull((Object)received);
            Assert.assertEquals((Object)("msg: " + i), (Object)received.getText());
        }
        subscriber.close();
        Assert.assertTrue((boolean)Wait.waitFor(() -> 0L == this.getPendingMessageCount(topic, "clientId1", "sub1"), (long)3000L, (long)500L));
    }

    @Test
    public void multipleDurableRecoveryIndividualAcknowledge() throws Exception {
        TextMessage received;
        int i;
        int i2;
        String testTopic = "test.topic";
        Session session = this.getSession(4);
        ActiveMQTopic topic = (ActiveMQTopic)session.createTopic(testTopic);
        MessageProducer producer = session.createProducer((Destination)topic);
        TopicSubscriber subscriber1 = session.createDurableSubscriber((javax.jms.Topic)topic, "sub1");
        TopicSubscriber subscriber2 = session.createDurableSubscriber((javax.jms.Topic)topic, "sub2");
        for (i2 = 1; i2 <= 10; ++i2) {
            producer.send((javax.jms.Message)session.createTextMessage("msg: " + i2));
        }
        producer.close();
        Assert.assertTrue((boolean)Wait.waitFor(() -> 10L == this.getPendingMessageCount(topic, "clientId1", "sub1"), (long)3000L, (long)500L));
        Assert.assertTrue((boolean)Wait.waitFor(() -> 10L == this.getPendingMessageCount(topic, "clientId1", "sub2"), (long)3000L, (long)500L));
        for (i2 = 1; i2 <= 10; ++i2) {
            TextMessage received2 = (TextMessage)subscriber1.receive(1000L);
            Assert.assertNotNull((Object)received2);
            if (i2 != 3 && i2 != 7) continue;
            received2.acknowledge();
        }
        Assert.assertTrue((boolean)Wait.waitFor(() -> 8L == this.getPendingMessageCount(topic, "clientId1", "sub1"), (long)3000L, (long)500L));
        Assert.assertTrue((boolean)Wait.waitFor(() -> 10L == this.getPendingMessageCount(topic, "clientId1", "sub2"), (long)3000L, (long)500L));
        long sub1PendingSizeBeforeRestart = this.getPendingMessageSize(topic, "clientId1", "sub1");
        long sub2PendingSizeBeforeRestart = this.getPendingMessageSize(topic, "clientId1", "sub2");
        Assert.assertTrue((sub1PendingSizeBeforeRestart > 0L ? 1 : 0) != 0);
        Assert.assertTrue((sub2PendingSizeBeforeRestart > 0L ? 1 : 0) != 0);
        Assert.assertTrue((sub1PendingSizeBeforeRestart < sub2PendingSizeBeforeRestart ? 1 : 0) != 0);
        subscriber1.close();
        subscriber2.close();
        this.restartBroker(this.recoverIndex);
        Assert.assertTrue((boolean)Wait.waitFor(() -> 8L == this.getPendingMessageCount(topic, "clientId1", "sub1"), (long)3000L, (long)500L));
        Assert.assertTrue((boolean)Wait.waitFor(() -> 10L == this.getPendingMessageCount(topic, "clientId1", "sub2"), (long)3000L, (long)500L));
        Assert.assertEquals((long)sub1PendingSizeBeforeRestart, (long)this.getPendingMessageSize(topic, "clientId1", "sub1"));
        Assert.assertEquals((long)sub2PendingSizeBeforeRestart, (long)this.getPendingMessageSize(topic, "clientId1", "sub2"));
        session = this.getSession(1);
        subscriber1 = session.createDurableSubscriber((javax.jms.Topic)topic, "sub1");
        subscriber2 = session.createDurableSubscriber((javax.jms.Topic)topic, "sub2");
        for (i = 1; i <= 2; ++i) {
            received = (TextMessage)subscriber1.receive(1000L);
            Assert.assertNotNull((Object)received);
            Assert.assertEquals((Object)("msg: " + i), (Object)received.getText());
        }
        for (i = 4; i <= 6; ++i) {
            received = (TextMessage)subscriber1.receive(1000L);
            Assert.assertNotNull((Object)received);
            Assert.assertEquals((Object)("msg: " + i), (Object)received.getText());
        }
        for (i = 8; i <= 10; ++i) {
            received = (TextMessage)subscriber1.receive(1000L);
            Assert.assertNotNull((Object)received);
            Assert.assertEquals((Object)("msg: " + i), (Object)received.getText());
        }
        for (i = 1; i <= 10; ++i) {
            received = (TextMessage)subscriber2.receive(1000L);
            Assert.assertNotNull((Object)received);
            Assert.assertEquals((Object)("msg: " + i), (Object)received.getText());
        }
        subscriber1.close();
        subscriber2.close();
        Assert.assertTrue((boolean)Wait.waitFor(() -> 0L == this.getPendingMessageCount(topic, "clientId1", "sub1"), (long)3000L, (long)500L));
        Assert.assertTrue((boolean)Wait.waitFor(() -> 0L == this.getPendingMessageCount(topic, "clientId1", "sub2"), (long)3000L, (long)500L));
    }

    @Test
    public void multipleDurableTestRecoverSubscription() throws Exception {
        int i;
        String testTopic = "test.topic";
        Session session = this.getSession(4);
        ActiveMQTopic topic = (ActiveMQTopic)session.createTopic(testTopic);
        MessageProducer producer = session.createProducer((Destination)topic);
        TopicSubscriber subscriber1 = session.createDurableSubscriber((javax.jms.Topic)topic, "sub1");
        TopicSubscriber subscriber2 = session.createDurableSubscriber((javax.jms.Topic)topic, "sub2");
        for (i = 1; i <= 10; ++i) {
            producer.send((javax.jms.Message)session.createTextMessage("msg: " + i));
        }
        producer.close();
        for (i = 1; i <= 10; ++i) {
            TextMessage received = (TextMessage)subscriber1.receive(1000L);
            Assert.assertNotNull((Object)received);
            if (i != 3 && i != 7) continue;
            received.acknowledge();
        }
        Assert.assertTrue((boolean)Wait.waitFor(() -> 8L == this.getPendingMessageCount(topic, "clientId1", "sub1"), (long)3000L, (long)500L));
        Assert.assertTrue((boolean)Wait.waitFor(() -> 10L == this.getPendingMessageCount(topic, "clientId1", "sub2"), (long)3000L, (long)500L));
        subscriber1.close();
        subscriber2.close();
        this.restartBroker(this.recoverIndex);
        Topic brokerTopic = (Topic)this.broker.getDestination((ActiveMQDestination)topic);
        TopicMessageStore store = (TopicMessageStore)brokerTopic.getMessageStore();
        final AtomicInteger sub1Recovered = new AtomicInteger();
        final AtomicInteger sub2Recovered = new AtomicInteger();
        store.recoverSubscription("clientId1", "sub1", new MessageRecoveryListener(){

            public boolean recoverMessageReference(MessageId ref) throws Exception {
                return false;
            }

            public boolean recoverMessage(Message message) throws Exception {
                TextMessage textMessage = (TextMessage)message;
                if (textMessage.getText().equals("msg: 3") || textMessage.getText().equals("msg: 7")) {
                    throw new IllegalStateException("Got wrong message: " + textMessage.getText());
                }
                sub1Recovered.incrementAndGet();
                return true;
            }

            public boolean isDuplicate(MessageId ref) {
                return false;
            }

            public boolean hasSpace() {
                return true;
            }
        });
        store.recoverSubscription("clientId1", "sub2", new MessageRecoveryListener(){

            public boolean recoverMessageReference(MessageId ref) throws Exception {
                return false;
            }

            public boolean recoverMessage(Message message) throws Exception {
                sub2Recovered.incrementAndGet();
                return true;
            }

            public boolean isDuplicate(MessageId ref) {
                return false;
            }

            public boolean hasSpace() {
                return true;
            }
        });
        Assert.assertEquals((long)8L, (long)sub1Recovered.get());
        Assert.assertEquals((long)10L, (long)sub2Recovered.get());
    }

    protected long getPendingMessageCount(ActiveMQTopic topic, String clientId, String subId) throws Exception {
        Topic brokerTopic = (Topic)this.broker.getDestination((ActiveMQDestination)topic);
        TopicMessageStore store = (TopicMessageStore)brokerTopic.getMessageStore();
        return store.getMessageCount(clientId, subId);
    }

    protected long getPendingMessageSize(ActiveMQTopic topic, String clientId, String subId) throws Exception {
        Topic brokerTopic = (Topic)this.broker.getDestination((ActiveMQDestination)topic);
        TopicMessageStore store = (TopicMessageStore)brokerTopic.getMessageStore();
        return store.getMessageSize(clientId, subId);
    }
}

