/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ReplicationTests.class, MediumTests.class})
public class TestReplicationSource {
    /** JUnit class rule obtained from {@code HBaseClassTestRule.forClass} for this test class. */
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationSource.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSource.class);
    /** Testing utility for the source side; supplies the DFS cluster, configuration and mini cluster. */
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    /** Testing utility for the peer cluster started by {@code testServerShutdownRecoveredQueue}. */
    private static final HBaseTestingUtility TEST_UTIL_PEER = new HBaseTestingUtility();
    /** File system of the mini DFS cluster; assigned in {@code setUpBeforeClass}. */
    private static FileSystem FS;
    /** The "oldWALs" directory under the test root dir; assigned in {@code setUpBeforeClass}. */
    private static Path oldLogDir;
    /** The "WALs" directory under the test root dir; assigned in {@code setUpBeforeClass}. */
    private static Path logDir;
    /** The configuration object of {@code TEST_UTIL}; assigned in the static initializer. */
    private static Configuration conf;

    /**
     * Starts the mini DFS cluster, captures its file system, and makes sure neither the
     * {@code oldWALs} nor the {@code WALs} directory exists under the freshly created root dir.
     *
     * @throws Exception if the DFS cluster cannot be started or a file system call fails
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TEST_UTIL.startMiniDFSCluster(1);
        FS = TEST_UTIL.getDFSCluster().getFileSystem();
        Path root = TEST_UTIL.createRootDir();

        // Remove any leftover archive directory.
        oldLogDir = new Path(root, "oldWALs");
        if (FS.exists(oldLogDir)) {
            FS.delete(oldLogDir, true);
        }

        // Remove any leftover live WAL directory.
        logDir = new Path(root, "WALs");
        if (FS.exists(logDir)) {
            FS.delete(logDir, true);
        }
    }

    /**
     * Shuts down the peer HBase cluster, the source HBase cluster and the source DFS cluster.
     *
     * <p>The shutdowns are chained with {@code finally} so that a failure in an earlier step
     * does not prevent the later ones from being attempted.
     *
     * @throws Exception if any of the shutdown calls fails
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        try {
            TEST_UTIL_PEER.shutdownMiniHBaseCluster();
        } finally {
            try {
                TEST_UTIL.shutdownMiniHBaseCluster();
            } finally {
                TEST_UTIL.shutdownMiniDFSCluster();
            }
        }
    }

    /**
     * Checks that enqueuing a path named {@code a.1.meta} leaves the source's log-queue size
     * metric at 0, while enqueuing {@code a.1} raises it to 1.
     *
     * @throws IOException propagated from source initialisation or the mock region server
     */
    @Test
    public void testDefaultSkipsMetaWAL() throws IOException {
        Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
        conf.setInt("replication.source.maxretriesmultiplier", 1);

        // Peer config that names the do-nothing endpoint as the endpoint implementation.
        ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
        Mockito.when(peerConfig.getReplicationEndpointImpl())
            .thenReturn(DoNothingReplicationEndpoint.class.getName());

        ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
        Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
        Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
        Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);

        ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
        Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());

        String queueId = "qid";
        RegionServerServices rss =
            TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
        ReplicationSource rs = new ReplicationSource();
        rs.init(conf, null, manager, null, mockPeer, (Server)rss, queueId, null,
            p -> OptionalLong.empty(), new MetricsSource(queueId));
        try {
            rs.startup();
            Assert.assertTrue(rs.isSourceActive());
            Assert.assertEquals(0L, (long)rs.getSourceMetrics().getSizeOfLogQueue());
            // The ".meta" path must not be counted in the queue metric.
            rs.enqueueLog(new Path("a.1.meta"));
            Assert.assertEquals(0L, (long)rs.getSourceMetrics().getSizeOfLogQueue());
            // The plain path must be counted.
            rs.enqueueLog(new Path("a.1"));
            Assert.assertEquals(1L, (long)rs.getSourceMetrics().getSizeOfLogQueue());
        } finally {
            rs.terminate("Done");
            rss.stop("Done");
        }
    }

    /**
     * Checks the WAL entry filter installed by a started source: an entry keyed on table
     * {@code test} is returned unchanged (same instance), while an entry keyed on
     * {@code TableName.META_TABLE_NAME} is filtered to {@code null}.
     *
     * @throws IOException propagated from source initialisation or the mock region server
     */
    @Test
    public void testWALEntryFilter() throws IOException {
        UUID clusterId = UUID.randomUUID();
        Configuration conf = new Configuration(TEST_UTIL.getConfiguration());

        ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
        Mockito.when(peerConfig.getReplicationEndpointImpl())
            .thenReturn(DoNothingReplicationEndpoint.class.getName());

        ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
        Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
        Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
        Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);

        ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
        Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());

        String queueId = "qid";
        RegionServerServices rss =
            TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
        ReplicationSource rs = new ReplicationSource();
        rs.init(conf, null, manager, null, mockPeer, (Server)rss, queueId, clusterId,
            p -> OptionalLong.empty(), new MetricsSource(queueId));
        try {
            rs.startup();
            // The filter is not available immediately after startup(); poll until it is.
            TEST_UTIL.waitFor(30000L, () -> rs.getWalEntryFilter() != null);
            WALEntryFilter filter = rs.getWalEntryFilter();

            Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY)
                .setRow(HConstants.EMPTY_START_ROW)
                .setFamily(HConstants.CATALOG_FAMILY)
                .setType(Cell.Type.Put)
                .build();
            WALEdit edit = new WALEdit().add(cell);

            // Entry on a user table: expected back as the very same object.
            WAL.Entry userEntry = new WAL.Entry(
                new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.valueOf("test"), -1L, -1L, clusterId),
                edit);
            Assert.assertTrue(filter.filter(userEntry) == userEntry);

            // Entry on the meta table: expected to be dropped.
            WAL.Entry metaEntry = new WAL.Entry(
                new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1L, -1L, clusterId),
                edit);
            Assert.assertNull(filter.filter(metaEntry));
        } finally {
            rs.terminate("Done");
            rss.stop("Done");
        }
    }

    /**
     * Writes three entries to a WAL file under {@code logDir}, starts reading it, renames the
     * file into {@code oldLogDir} after the first entry, and checks that the already-open
     * reader still returns the remaining entries followed by {@code null}.
     *
     * <p>Writer and reader are managed with try-with-resources so they are closed even when
     * an assertion or I/O call fails part-way through.
     *
     * @throws Exception if a file system or WAL operation fails
     */
    @Test
    public void testLogMoving() throws Exception {
        Path logPath = new Path(logDir, "log");
        if (!FS.exists(logDir)) {
            FS.mkdirs(logDir);
        }
        if (!FS.exists(oldLogDir)) {
            FS.mkdirs(oldLogDir);
        }

        // Write three single-cell entries; the writer is closed before the reader is opened.
        try (WALProvider.Writer writer =
                WALFactory.createWALWriter(FS, logPath, TEST_UTIL.getConfiguration())) {
            for (int i = 0; i < 3; ++i) {
                byte[] b = Bytes.toBytes(Integer.toString(i));
                KeyValue kv = new KeyValue(b, b, b);
                WALEdit edit = new WALEdit();
                edit.add((Cell)kv);
                WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0L, 0L, HConstants.DEFAULT_CLUSTER_ID);
                writer.append(new WAL.Entry(key, edit));
                writer.sync(false);
            }
        }

        try (WAL.Reader reader =
                WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration())) {
            // First entry is read from the original location.
            Assert.assertNotNull(reader.next());

            // Move the file while the reader is still open.
            Path oldLogPath = new Path(oldLogDir, "log");
            FS.rename(logPath, oldLogPath);

            // Second and third entries must still be readable after the move.
            Assert.assertNotNull(reader.next());
            Assert.assertNotNull(reader.next());

            // Only three entries were written, so the fourth read hits end of file.
            Assert.assertNull(reader.next());
        }
    }

    /**
     * Checks that {@code terminate} on a source that was initialised but never started
     * completes within twice the configured {@code replication.source.sleepforretries}
     * (1000 ms when the key is absent).
     *
     * <p>The terminate call runs on a single-thread executor, which is shut down in a
     * {@code finally} block so its worker thread does not outlive the test.
     *
     * @throws Exception if initialisation fails or the wait times out
     */
    @Test
    public void testTerminateTimeout() throws Exception {
        ReplicationSource source = new ReplicationSource();
        DoNothingReplicationEndpoint replicationEndpoint = new DoNothingReplicationEndpoint();
        try {
            replicationEndpoint.start();
            ReplicationPeer mockPeer = (ReplicationPeer)Mockito.mock(ReplicationPeer.class);
            Mockito.when((Object)mockPeer.getPeerBandwidth()).thenReturn((Object)0L);
            Configuration testConf = HBaseConfiguration.create();
            testConf.setInt("replication.source.maxretriesmultiplier", 1);
            ReplicationSourceManager manager = (ReplicationSourceManager)Mockito.mock(ReplicationSourceManager.class);
            Mockito.when((Object)manager.getTotalBufferUsed()).thenReturn((Object)new AtomicLong());
            source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, p -> OptionalLong.empty(), null);
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                Future<?> future = executor.submit(() -> source.terminate("testing source termination"));
                long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000L);
                Waiter.waitFor((Configuration)testConf, (long)(sleepForRetries * 2L), future::isDone);
            } finally {
                // Release the worker thread; interrupts terminate() if it is still running.
                executor.shutdownNow();
            }
        } finally {
            replicationEndpoint.stop();
        }
    }

    /**
     * Starts a two-server source cluster and a one-server peer cluster, adds and then disables
     * a replication peer, and stops region servers one at a time while polling
     * {@code getOldSources()} on the surviving managers: 1 on server B after A stops, 0 on a
     * newly started server C, 2 on C after B stops, and 0 on C once the peer is re-enabled.
     *
     * <p>{@code hbase.regionserver.impl} is set to {@link ShutdownDelayRegionServer} for the
     * duration of the test and reset to {@code HRegionServer} afterwards.
     *
     * @throws Exception if cluster start-up, an admin call or one of the waits fails
     */
    @Test
    public void testServerShutdownRecoveredQueue() throws Exception {
        final String peerId = "TestPeer";
        final long waitMs = 20000L;
        try {
            conf.set("hbase.wal.provider", "defaultProvider");
            conf.setInt("replication.sleep.before.failover", 2000);
            conf.set("hbase.regionserver.impl", ShutdownDelayRegionServer.class.getName());
            MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
            TEST_UTIL_PEER.startMiniCluster(1);

            HRegionServer serverA = cluster.getRegionServer(0);
            ReplicationSourceManager managerA = serverA.getReplicationSourceService().getReplicationManager();
            HRegionServer serverB = cluster.getRegionServer(1);
            ReplicationSourceManager managerB = serverB.getReplicationSourceService().getReplicationManager();

            Admin admin = TEST_UTIL.getAdmin();
            admin.addReplicationPeer(peerId,
                ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
            // Both region servers must have picked up a source for the new peer.
            Waiter.waitFor(conf, waitMs,
                () -> !managerA.getSources().isEmpty() && !managerB.getSources().isEmpty());

            // With the peer disabled, stop A and wait for B to show one old source.
            admin.disableReplicationPeer(peerId);
            cluster.stopRegionServer(serverA.getServerName());
            Waiter.waitFor(conf, waitMs, () -> managerB.getOldSources().size() == 1);

            // Bring up a third server; it starts with no old sources.
            HRegionServer serverC = cluster.startRegionServer().getRegionServer();
            serverC.waitForServerOnline();
            Waiter.waitFor(conf, waitMs, () -> serverC.getReplicationSourceService() != null);
            ReplicationSourceManager managerC =
                ((Replication)serverC.getReplicationSourceService()).getReplicationManager();
            Assert.assertEquals(0L, (long)managerC.getOldSources().size());

            // Stop B and wait for C to show two old sources.
            cluster.stopRegionServer(serverB.getServerName());
            Waiter.waitFor(conf, waitMs, () -> managerC.getOldSources().size() == 2);

            // Re-enabling the peer must bring C's old-source count back to zero.
            admin.enableReplicationPeer(peerId);
            Waiter.waitFor(conf, waitMs, () -> managerC.getOldSources().size() == 0);
        } finally {
            conf.set("hbase.regionserver.impl", HRegionServer.class.getName());
        }
    }

    /**
     * Builds a {@code RecoveredReplicationSourceShipper} over mocks where the queue storage
     * reports WAL position 1001 for the live server and -1 for the dead server, and checks
     * that {@code getStartPosition()} returns 1001.
     *
     * @throws Exception if stubbing or shipper construction fails
     */
    @Test
    public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
        String walGroupId = "fake-wal-group-id";
        ServerName liveServer = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
        ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);

        PriorityBlockingQueue<Path> walQueue = new PriorityBlockingQueue<>();
        walQueue.put(new Path("/www/html/test"));

        Server server = Mockito.mock(Server.class);
        Mockito.when(server.getServerName()).thenReturn(liveServer);

        RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
        Mockito.when(source.getServer()).thenReturn(server);
        Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);

        // Positions differ by server so the assertion shows which one the shipper consulted.
        ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
        Mockito.when(storage.getWALPosition(Mockito.eq(liveServer), Mockito.any(), Mockito.any()))
            .thenReturn(1001L);
        Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
            .thenReturn(-1L);

        Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
        conf.setInt("replication.source.maxretriesmultiplier", -1);

        RecoveredReplicationSourceShipper shipper =
            new RecoveredReplicationSourceShipper(conf, walGroupId, walQueue, source, storage);
        Assert.assertEquals(1001L, shipper.getStartPosition());
    }

    /**
     * Initialises {@code rs} against mocked peer, peer-config and manager objects, using
     * {@code endpointName} as the replication endpoint implementation, and returns the mock
     * region server services the source was bound to.
     *
     * <p>Both static endpoint counters are reset here. {@link FlakyReplicationEndpoint} only
     * throws while its counter is 0, so without the reset the second test to use it would
     * never see the failure and the outcome would depend on test execution order.
     *
     * @param rs the source to initialise (not started by this method)
     * @param conf configuration handed to the source and returned by the mocked peer;
     *     {@code replication.source.maxretriesmultiplier} is set to 1 on it
     * @param endpointName fully qualified class name returned by
     *     {@code getReplicationEndpointImpl()} on the mocked peer config
     * @return the mock region server services passed to {@code rs.init}
     * @throws IOException propagated from mock region server creation or {@code rs.init}
     */
    private RegionServerServices setupForAbortTests(ReplicationSource rs, Configuration conf, String endpointName) throws IOException {
        conf.setInt("replication.source.maxretriesmultiplier", 1);
        ReplicationPeer mockPeer = (ReplicationPeer)Mockito.mock(ReplicationPeer.class);
        Mockito.when((Object)mockPeer.getConfiguration()).thenReturn((Object)conf);
        Mockito.when((Object)mockPeer.getPeerBandwidth()).thenReturn((Object)0L);
        ReplicationPeerConfig peerConfig = (ReplicationPeerConfig)Mockito.mock(ReplicationPeerConfig.class);
        // Start every abort test from a clean slate for both endpoint flavours.
        FaultyReplicationEndpoint.count = 0;
        FlakyReplicationEndpoint.count = 0;
        Mockito.when((Object)peerConfig.getReplicationEndpointImpl()).thenReturn((Object)endpointName);
        Mockito.when((Object)mockPeer.getPeerConfig()).thenReturn((Object)peerConfig);
        ReplicationSourceManager manager = (ReplicationSourceManager)Mockito.mock(ReplicationSourceManager.class);
        Mockito.when((Object)manager.getTotalBufferUsed()).thenReturn((Object)new AtomicLong());
        String queueId = "qid";
        RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName((String)"a.b.c,1,1"));
        rs.init(conf, null, manager, null, mockPeer, (Server)rss, queueId, null, p -> OptionalLong.empty(), new MetricsSource(queueId));
        return rss;
    }

    /**
     * With {@code replication.source.regionserver.abort} set to {@code false} and the
     * {@link FlakyReplicationEndpoint} in use, checks that the source is active after
     * {@code startup()} and that its log-queue metric ignores {@code a.1.meta} but counts
     * {@code a.1}.
     *
     * @throws IOException propagated from the shared abort-test setup
     */
    @Test
    public void testAbortFalseOnError() throws IOException {
        Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
        conf.setBoolean("replication.source.regionserver.abort", false);
        ReplicationSource rs = new ReplicationSource();
        String endpoint = FlakyReplicationEndpoint.class.getName();
        RegionServerServices rss = setupForAbortTests(rs, conf, endpoint);
        try {
            rs.startup();
            Assert.assertTrue(rs.isSourceActive());
            Assert.assertEquals(0L, (long)rs.getSourceMetrics().getSizeOfLogQueue());

            // Enqueuing the ".meta" path must leave the metric untouched.
            rs.enqueueLog(new Path("a.1.meta"));
            Assert.assertEquals(0L, (long)rs.getSourceMetrics().getSizeOfLogQueue());

            // Enqueuing the plain path must bump it to one.
            rs.enqueueLog(new Path("a.1"));
            Assert.assertEquals(1L, (long)rs.getSourceMetrics().getSizeOfLogQueue());
        } finally {
            rs.terminate("Done");
            rss.stop("Done");
        }
    }

    /**
     * Starts a source backed by {@link FaultyReplicationEndpoint}, whose {@code getPeerUUID()}
     * always throws, and passes as long as {@code startup()} returns to the calling thread.
     *
     * @throws IOException propagated from the shared abort-test setup
     */
    @Test
    public void testAbortFalseOnErrorDoesntBlockMainThread() throws IOException {
        // NOTE(review): despite the test name, "replication.source.regionserver.abort" is not
        // set to false here, so the source runs with whatever the default is - confirm intent.
        Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
        ReplicationSource rs = new ReplicationSource();
        String endpoint = FaultyReplicationEndpoint.class.getName();
        RegionServerServices rss = setupForAbortTests(rs, conf, endpoint);
        try {
            rs.startup();
            // Reaching this line is the whole check: startup() did not block.
            Assert.assertTrue(true);
        } finally {
            rs.terminate("Done");
            rss.stop("Done");
        }
    }

    /**
     * Without overriding {@code replication.source.regionserver.abort}, starts a source backed
     * by {@link FlakyReplicationEndpoint} and checks that, within one second each, the mock
     * region server reports aborted and the source stops being active.
     *
     * @throws IOException propagated from the shared abort-test setup
     */
    @Test
    public void testAbortTrueOnError() throws IOException {
        Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
        ReplicationSource rs = new ReplicationSource();
        String endpoint = FlakyReplicationEndpoint.class.getName();
        RegionServerServices rss = setupForAbortTests(rs, conf, endpoint);
        try {
            rs.startup();
            Assert.assertTrue(rs.isSourceActive());

            // The endpoint failure should abort the region server...
            Waiter.waitFor(conf, 1000L, rss::isAborted);
            Assert.assertTrue(rss.isAborted());

            // ...and the source should then become inactive.
            Waiter.waitFor(conf, 1000L, () -> !rs.isSourceActive());
            Assert.assertFalse(rs.isSourceActive());
        } finally {
            rs.terminate("Done");
            rss.stop("Done");
        }
    }

    // Point the shared static configuration at the one owned by TEST_UTIL, so that settings
    // made through "conf" are made on TEST_UTIL's configuration object itself.
    static {
        conf = TEST_UTIL.getConfiguration();
    }

    /**
     * Endpoint whose {@link #getPeerUUID()} fails on every call with a
     * {@link RuntimeException}.
     */
    public static class FaultyReplicationEndpoint
    extends DoNothingReplicationEndpoint {
        /** Static counter; this class itself never reads or increments it. */
        static int count = 0;

        /**
         * Always fails.
         *
         * @return never returns normally
         * @throws RuntimeException on every invocation
         */
        @Override
        public synchronized UUID getPeerUUID() {
            throw new RuntimeException();
        }
    }

    /**
     * Endpoint whose {@link #getPeerUUID()} throws while the static {@code count} is 0,
     * incrementing it as it does so, and delegates to the superclass once it is non-zero.
     */
    public static class FlakyReplicationEndpoint
    extends DoNothingReplicationEndpoint {
        /** Failure counter; a value of 0 arms the next {@code getPeerUUID()} call to throw. */
        static int count = 0;

        /**
         * Throws while {@code count} is 0, otherwise returns the superclass UUID.
         *
         * @return the UUID from {@code DoNothingReplicationEndpoint} when {@code count} is non-zero
         * @throws RuntimeException when {@code count} is 0 on entry
         */
        @Override
        public synchronized UUID getPeerUUID() {
            if (count != 0) {
                return super.getPeerUUID();
            }
            // Record the failure so the next call succeeds.
            count++;
            throw new RuntimeException();
        }
    }

    /**
     * Minimal endpoint for these tests: it stores the context on {@code init}, reports
     * started/stopped immediately, supplies no WAL entry filter and returns a fixed random
     * peer UUID.
     */
    public static class DoNothingReplicationEndpoint
    extends HBaseInterClusterReplicationEndpoint {
        /** Peer UUID generated once per instance. */
        private final UUID uuid = UUID.randomUUID();

        /** Stores the supplied context in {@code ctx} and does nothing else. */
        public void init(ReplicationEndpoint.Context context) throws IOException {
            ctx = context;
        }

        /** Signals "started" right away. */
        protected void doStart() {
            notifyStarted();
        }

        /** Signals "stopped" right away. */
        protected void doStop() {
            notifyStopped();
        }

        /** @return the UUID generated when this instance was created */
        public synchronized UUID getPeerUUID() {
            return uuid;
        }

        /** @return always {@code null}; this endpoint contributes no filter of its own */
        public WALEntryFilter getWALEntryfilter() {
            return null;
        }

        /** @return always {@code true} */
        public boolean canReplicateToSameCluster() {
            return true;
        }
    }

    /**
     * Region server that sleeps for two seconds before delegating to the superclass
     * {@code stopServiceThreads()}.
     */
    public static class ShutdownDelayRegionServer
    extends HRegionServer {
        /**
         * @param conf configuration passed straight to the {@code HRegionServer} constructor
         * @throws IOException if the superclass constructor fails
         */
        public ShutdownDelayRegionServer(Configuration conf) throws IOException {
            super(conf);
        }

        /**
         * Logs, sleeps for 2000 ms, then runs the superclass shutdown of service threads.
         *
         * <p>If the sleep is interrupted, the shutdown still proceeds; the thread's interrupt
         * status is restored only afterwards, so the interrupt is neither lost nor allowed to
         * cut the superclass shutdown short.
         */
        protected void stopServiceThreads() {
            LOG.info("Adding a delay to the regionserver shutdown");
            boolean interrupted = false;
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException ex) {
                interrupted = true;
                LOG.error("Interrupted while sleeping");
            }
            try {
                super.stopServiceThreads();
            } finally {
                if (interrupted) {
                    // Re-assert the interrupt that Thread.sleep cleared.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

