/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.File;
import java.io.FilenameFilter;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.management.ObjectName;
import javax.management.QueryExp;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.leveldb.LevelDBStore;
import org.apache.activemq.leveldb.LevelDBStoreViewMBean;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ4677Test {
    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4677Test.class);
    private static BrokerService brokerService;
    @Rule
    public TestName name = new TestName();
    private File dataDirFile;

    @Before
    public void setUp() throws Exception {
        this.dataDirFile = new File("target/LevelDBCleanupTest");
        brokerService = new BrokerService();
        brokerService.setBrokerName("LevelDBBroker");
        brokerService.setPersistent(true);
        brokerService.setUseJmx(true);
        brokerService.setAdvisorySupport(false);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setDataDirectoryFile(this.dataDirFile);
        LevelDBStore persistenceFactory = new LevelDBStore();
        persistenceFactory.setDirectory(this.dataDirFile);
        brokerService.setPersistenceAdapter((PersistenceAdapter)persistenceFactory);
        brokerService.start();
        brokerService.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        brokerService.stop();
        brokerService.waitUntilStopped();
    }

    @Test
    public void testSendAndReceiveAllMessages() throws Exception {
        int i;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://LevelDBBroker");
        Connection connection = connectionFactory.createConnection();
        connection.setClientID(this.getClass().getName());
        connection.start();
        final Session session = connection.createSession(true, 1);
        Queue destination = session.createQueue(this.name.toString());
        MessageProducer producer = session.createProducer((Destination)destination);
        producer.setDeliveryMode(2);
        final LevelDBStoreViewMBean levelDBView = this.getLevelDBStoreMBean();
        Assert.assertNotNull((Object)levelDBView);
        levelDBView.compact();
        int SIZE = 10240;
        int MSG_COUNT = 30000;
        final CountDownLatch done = new CountDownLatch(30000);
        byte[] buffer = new byte[10240];
        for (i = 0; i < 10240; ++i) {
            buffer[i] = -128;
        }
        for (i = 0; i < 30000; ++i) {
            BytesMessage message = session.createBytesMessage();
            message.writeBytes(buffer);
            producer.send((Message)message);
            if (i % 1000 != 0) continue;
            LOG.info("Sent message #{}", (Object)i);
            session.commit();
        }
        session.commit();
        LOG.info("Finished sending all messages.");
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                if (done.getCount() % 1000L == 0L) {
                    try {
                        LOG.info("Received message #{}", (Object)(30000L - done.getCount()));
                        session.commit();
                    }
                    catch (JMSException jMSException) {
                        // empty catch block
                    }
                }
                done.countDown();
            }
        });
        done.await(15L, TimeUnit.MINUTES);
        session.commit();
        LOG.info("Finished receiving all messages.");
        Assert.assertTrue((String)"Should < 3 logfiles left.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                levelDBView.compact();
                return AMQ4677Test.this.countLogFiles() < 3L;
            }
        }, (long)TimeUnit.MINUTES.toMillis(5L), (long)((int)TimeUnit.SECONDS.toMillis(30L))));
        levelDBView.compact();
        LOG.info("Current number of logs {}", (Object)this.countLogFiles());
    }

    protected long countLogFiles() {
        String[] logFiles = this.dataDirFile.list(new FilenameFilter(){

            @Override
            public boolean accept(File dir, String name) {
                return name.endsWith("log");
            }
        });
        LOG.info("Current number of logs {}", (Object)logFiles.length);
        return logFiles.length;
    }

    protected LevelDBStoreViewMBean getLevelDBStoreMBean() throws Exception {
        ObjectName levelDbViewMBeanQuery = new ObjectName("org.apache.activemq:type=Broker,brokerName=LevelDBBroker,service=PersistenceAdapter,instanceName=LevelDB*");
        Set names = brokerService.getManagementContext().queryNames(null, (QueryExp)levelDbViewMBeanQuery);
        if (names.isEmpty() || names.size() > 1) {
            throw new IllegalStateException("Can't find levelDB store name.");
        }
        LevelDBStoreViewMBean proxy = (LevelDBStoreViewMBean)brokerService.getManagementContext().newProxyInstance((ObjectName)names.iterator().next(), LevelDBStoreViewMBean.class, true);
        return proxy;
    }
}

