/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.kahadb;

import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KahaDBOffsetRecoveryListenerTest {
    protected BrokerService brokerService = null;
    protected KahaDBStore kaha = null;

    @Before
    public void beforeEach() throws Exception {
    }

    @After
    public void afterEach() {
        this.brokerService = null;
        this.kaha = null;
    }

    protected BrokerService createBroker(KahaDBStore kaha) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setUseJmx(false);
        broker.setPersistenceAdapter((PersistenceAdapter)kaha);
        broker.start();
        broker.waitUntilStarted(10000L);
        return broker;
    }

    private KahaDBStore createStore(boolean delete) throws IOException {
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(new File("target/activemq-data/kahadb-recovery-tests"));
        if (delete) {
            kaha.deleteAllMessages();
        }
        return kaha;
    }

    protected void runOffsetTest(int sendCount, int expectedMessageCount, int recoverOffset, int recoverCount, int expectedRecoverCount, int expectedRecoverIndex, String queueName) throws Exception {
        this.kaha = this.createStore(true);
        this.kaha.setJournalMaxFileLength(102400);
        this.brokerService = this.createBroker(this.kaha);
        this.sendMessages(sendCount, queueName);
        this.brokerService.stop();
        this.brokerService.waitUntilStopped();
        TestMessageRecoveryListener testMessageRecoveryListener = new TestMessageRecoveryListener();
        this.kaha = this.createStore(false);
        this.kaha.start();
        MessageStore messageStore = this.kaha.createQueueMessageStore(new ActiveMQQueue(queueName));
        messageStore.start();
        Assert.assertEquals((Object)expectedMessageCount, (Object)messageStore.getMessageCount());
        messageStore.recoverNextMessages(recoverOffset, recoverCount, (MessageRecoveryListener)testMessageRecoveryListener);
        messageStore.stop();
        this.kaha.stop();
        Assert.assertEquals((Object)expectedRecoverCount, (Object)testMessageRecoveryListener.getRecoveredMessages().size());
        if (expectedRecoverIndex >= 0) {
            Assert.assertEquals((Object)expectedRecoverIndex, (Object)((Integer)testMessageRecoveryListener.getRecoveredMessages().get(0).getProperty("index")));
        }
        this.brokerService = this.createBroker(this.kaha);
        Assert.assertEquals((long)sendCount, (long)this.receiveMessages(queueName));
    }

    @Test
    public void testOffsetZero() throws Exception {
        this.runOffsetTest(1000, 1000, 0, 1, 1, 0, "TEST.OFFSET.ZERO");
    }

    @Test
    public void testOffsetOne() throws Exception {
        this.runOffsetTest(1000, 1000, 1, 1, 1, 1, "TEST.OFFSET.ONE");
    }

    @Test
    public void testOffsetLastMinusOne() throws Exception {
        this.runOffsetTest(1000, 1000, 999, 1, 1, 999, "TEST.OFFSET.LASTMINUSONE");
    }

    @Test
    public void testOffsetLast() throws Exception {
        this.runOffsetTest(1000, 1000, 1000, 1, 0, -1, "TEST.OFFSET.LAST");
    }

    @Test
    public void testOffsetBeyondQueueSizeNoError() throws Exception {
        this.runOffsetTest(1000, 1000, 10000, 1, 0, -1, "TEST.OFFSET.BEYOND");
    }

    @Test
    public void testOffsetEmptyQueue() throws Exception {
        this.runOffsetTest(0, 0, 10000, 1, 0, -1, "TEST.OFFSET.EMPTY");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendMessages(int count, String queueName) throws JMSException {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
        cf.setUseAsyncSend(true);
        cf.setProducerWindowSize(1024);
        cf.setWatchTopicAdvisories(false);
        try (Connection connection = cf.createConnection();){
            Session session = connection.createSession(false, 1);
            MessageProducer producer = session.createProducer((Destination)new ActiveMQQueue(queueName));
            for (int i = 0; i < count; ++i) {
                TextMessage textMessage = session.createTextMessage(this.createContent(i));
                textMessage.setIntProperty("index", i);
                producer.send((javax.jms.Message)textMessage);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int receiveMessages(String queueName) throws JMSException {
        int rc = 0;
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
        try (Connection connection = cf.createConnection();){
            connection.start();
            Session session = connection.createSession(false, 1);
            MessageConsumer messageConsumer = session.createConsumer((Destination)new ActiveMQQueue(queueName));
            while (messageConsumer.receive(1000L) != null) {
                ++rc;
            }
            int n = rc;
            return n;
        }
    }

    private String createContent(int i) {
        StringBuilder sb = new StringBuilder(i + ":");
        while (sb.length() < 1024) {
            sb.append("*");
        }
        return sb.toString();
    }

    static class TestMessageRecoveryListener
    implements MessageRecoveryListener {
        List<MessageId> recoveredMessageIds = new LinkedList<MessageId>();
        List<Message> recoveredMessages = new LinkedList<Message>();

        TestMessageRecoveryListener() {
        }

        public boolean hasSpace() {
            return true;
        }

        public boolean isDuplicate(MessageId messageId) {
            return this.recoveredMessageIds.contains(messageId);
        }

        public boolean recoverMessage(Message message) throws Exception {
            if (this.recoveredMessages.contains(message)) {
                return false;
            }
            return this.recoveredMessages.add(message);
        }

        public boolean recoverMessageReference(MessageId messageId) throws Exception {
            if (this.recoveredMessageIds.contains(messageId)) {
                return false;
            }
            return this.recoveredMessageIds.add(messageId);
        }

        public List<MessageId> getRecoveredMessageIds() {
            return this.recoveredMessageIds;
        }

        public List<Message> getRecoveredMessages() {
            return this.recoveredMessages;
        }
    }
}

