/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import com.google.common.base.Supplier;
import com.google.common.primitives.Ints;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.PmemVolumeManager;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestCacheByPmemMappableBlockLoader;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

public class TestPmemCacheRecovery {
    protected static final Logger LOG = LoggerFactory.getLogger(TestCacheByPmemMappableBlockLoader.class);
    protected static final long CACHE_AMOUNT = 65536L;
    protected static final long BLOCK_SIZE = 4096L;
    private static Configuration conf;
    private static MiniDFSCluster cluster;
    private static DistributedFileSystem fs;
    private static DataNode dn;
    private static FsDatasetCache cacheManager;
    private static String blockPoolId;
    private static ReadWriteLock lock;
    private static NativeIO.POSIX.CacheManipulator prevCacheManipulator;
    private static DataNodeFaultInjector oldInjector;
    private static final String PMEM_DIR_0;
    private static final String PMEM_DIR_1;

    @BeforeClass
    public static void setUpClass() throws Exception {
        oldInjector = DataNodeFaultInjector.get();
        DataNodeFaultInjector.set((DataNodeFaultInjector)new DataNodeFaultInjector(){

            public void startOfferService() throws Exception {
                lock.readLock().lock();
            }

            public void endOfferService() throws Exception {
                lock.readLock().unlock();
            }
        });
    }

    @AfterClass
    public static void tearDownClass() throws Exception {
        DataNodeFaultInjector.set((DataNodeFaultInjector)oldInjector);
    }

    @Before
    public void setUp() throws Exception {
        conf = new HdfsConfiguration();
        conf.setBoolean("dfs.datanode.pmem.cache.recovery", true);
        conf.setLong("dfs.namenode.path.based.cache.refresh.interval.ms", 100L);
        conf.setLong("dfs.cachereport.intervalMsec", 500L);
        conf.setLong("dfs.blocksize", 4096L);
        conf.setLong("dfs.heartbeat.interval", 1L);
        conf.setInt("dfs.datanode.fsdatasetcache.max.threads.per.volume", 10);
        new File(PMEM_DIR_0).getAbsoluteFile().mkdir();
        new File(PMEM_DIR_1).getAbsoluteFile().mkdir();
        conf.set("dfs.datanode.pmem.cache.dirs", PMEM_DIR_0 + "," + PMEM_DIR_1);
        prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
        NativeIO.POSIX.setCacheManipulator((NativeIO.POSIX.CacheManipulator)new NativeIO.POSIX.NoMlockCacheManipulator());
        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        cluster.waitActive();
        fs = cluster.getFileSystem();
        dn = cluster.getDataNodes().get(0);
        cacheManager = ((FsDatasetImpl)TestPmemCacheRecovery.dn.getFSDataset()).cacheManager;
    }

    @After
    public void tearDown() throws Exception {
        if (fs != null) {
            fs.close();
            fs = null;
        }
        if (cluster != null) {
            cluster.shutdown();
            cluster = null;
        }
        NativeIO.POSIX.setCacheManipulator((NativeIO.POSIX.CacheManipulator)prevCacheManipulator);
    }

    protected static void restartCluster() throws Exception {
        conf = new HdfsConfiguration();
        conf.setBoolean("dfs.datanode.pmem.cache.recovery", true);
        conf.setLong("dfs.namenode.path.based.cache.refresh.interval.ms", 100L);
        conf.setLong("dfs.cachereport.intervalMsec", 500L);
        conf.setLong("dfs.blocksize", 4096L);
        conf.setLong("dfs.heartbeat.interval", 1L);
        conf.setInt("dfs.datanode.fsdatasetcache.max.threads.per.volume", 10);
        conf.set("dfs.datanode.pmem.cache.dirs", PMEM_DIR_0 + "," + PMEM_DIR_1);
        prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
        NativeIO.POSIX.setCacheManipulator((NativeIO.POSIX.CacheManipulator)new NativeIO.POSIX.NoMlockCacheManipulator());
        FsDatasetImpl.setBlockPoolId((String)blockPoolId);
        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
        cluster.waitActive();
        fs = cluster.getFileSystem();
        dn = cluster.getDataNodes().get(0);
        cacheManager = ((FsDatasetImpl)TestPmemCacheRecovery.dn.getFSDataset()).cacheManager;
    }

    protected static void shutdownCluster() {
        if (cluster != null) {
            cluster.shutdown();
            cluster = null;
        }
        PmemVolumeManager.reset();
    }

    public List<ExtendedBlockId> getExtendedBlockId(Path filePath, long fileLen) throws IOException {
        HdfsBlockLocation[] locs;
        ArrayList<ExtendedBlockId> keys = new ArrayList<ExtendedBlockId>();
        for (HdfsBlockLocation loc : locs = (HdfsBlockLocation[])fs.getFileBlockLocations(filePath, 0L, fileLen)) {
            long bkid = loc.getLocatedBlock().getBlock().getBlockId();
            String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
            keys.add(new ExtendedBlockId(bkid, bpid));
        }
        return keys;
    }

    @Test(timeout=60000L)
    public void testCacheRecovery() throws Exception {
        String expectPath;
        String fileName;
        Path path;
        String cachePath;
        final int cacheBlocksNum = Ints.checkedCast((long)16L);
        BlockReaderTestUtil.enableHdfsCachingTracing();
        Assert.assertEquals((long)0L, (long)0L);
        Path testFile = new Path("/testFile");
        long testFileLen = (long)cacheBlocksNum * 4096L;
        DFSTestUtil.createFile((FileSystem)fs, testFile, testFileLen, (short)1, 48879L);
        List<ExtendedBlockId> blockKeys = this.getExtendedBlockId(testFile, testFileLen);
        fs.addCachePool(new CachePoolInfo("testPool"));
        long cacheDirectiveId = fs.addCacheDirective(new CacheDirectiveInfo.Builder().setPool("testPool").setPath(testFile).setReplication(Short.valueOf((short)1)).build());
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            public Boolean get() {
                MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics((String)dn.getMetrics().name());
                long blocksCached = MetricsAsserts.getLongCounter((String)"BlocksCached", (MetricsRecordBuilder)dnMetrics);
                if (blocksCached != (long)cacheBlocksNum) {
                    LOG.info("waiting for " + cacheBlocksNum + " blocks to be cached. Right now " + blocksCached + " blocks are cached.");
                    return false;
                }
                LOG.info(cacheBlocksNum + " blocks are now cached.");
                return true;
            }
        }, (long)1000L, (long)30000L);
        Assert.assertEquals((long)65536L, (long)cacheManager.getCacheUsed());
        Map blockKeyToVolume = PmemVolumeManager.getInstance().getBlockKeyToVolume();
        Assert.assertEquals((long)blockKeyToVolume.size(), (long)cacheBlocksNum);
        Assert.assertTrue((boolean)blockKeyToVolume.keySet().containsAll(blockKeys));
        for (ExtendedBlockId key : blockKeys) {
            if (blockPoolId.isEmpty()) {
                blockPoolId = key.getBlockPoolId();
            }
            cachePath = cacheManager.getReplicaCachePath(key.getBlockPoolId(), key.getBlockId());
            Assert.assertNotNull((Object)cachePath);
            path = new Path(cachePath);
            fileName = path.getName();
            if (cachePath.startsWith(PMEM_DIR_0)) {
                expectPath = PmemVolumeManager.getRealPmemDir((String)PMEM_DIR_0) + "/" + key.getBlockPoolId();
                Assert.assertTrue((boolean)path.toString().startsWith(expectPath));
                Assert.assertTrue((key.getBlockId() == Long.parseLong(fileName) ? 1 : 0) != 0);
                continue;
            }
            if (cachePath.startsWith(PMEM_DIR_1)) {
                expectPath = PmemVolumeManager.getRealPmemDir((String)PMEM_DIR_1) + "/" + key.getBlockPoolId();
                Assert.assertTrue((boolean)path.toString().startsWith(expectPath));
                Assert.assertTrue((key.getBlockId() == Long.parseLong(fileName) ? 1 : 0) != 0);
                continue;
            }
            Assert.fail((String)("The cache path is not the expected one: " + cachePath));
        }
        TestPmemCacheRecovery.shutdownCluster();
        TestPmemCacheRecovery.restartCluster();
        Assert.assertEquals((long)65536L, (long)cacheManager.getCacheUsed());
        blockKeyToVolume = PmemVolumeManager.getInstance().getBlockKeyToVolume();
        Assert.assertEquals((long)blockKeyToVolume.size(), (long)cacheBlocksNum);
        Assert.assertTrue((boolean)blockKeyToVolume.keySet().containsAll(blockKeys));
        for (ExtendedBlockId key : blockKeys) {
            cachePath = cacheManager.getReplicaCachePath(key.getBlockPoolId(), key.getBlockId());
            Assert.assertNotNull((Object)cachePath);
            path = new Path(cachePath);
            fileName = path.getName();
            if (cachePath.startsWith(PMEM_DIR_0)) {
                expectPath = PmemVolumeManager.getRealPmemDir((String)PMEM_DIR_0) + "/" + key.getBlockPoolId();
                Assert.assertTrue((boolean)path.toString().startsWith(expectPath));
                Assert.assertTrue((key.getBlockId() == Long.parseLong(fileName) ? 1 : 0) != 0);
                continue;
            }
            if (cachePath.startsWith(PMEM_DIR_1)) {
                expectPath = PmemVolumeManager.getRealPmemDir((String)PMEM_DIR_1) + "/" + key.getBlockPoolId();
                Assert.assertTrue((boolean)path.toString().startsWith(expectPath));
                Assert.assertTrue((key.getBlockId() == Long.parseLong(fileName) ? 1 : 0) != 0);
                continue;
            }
            Assert.fail((String)("The cache path is not the expected one: " + cachePath));
        }
        for (ExtendedBlockId key : blockKeys) {
            cacheManager.uncacheBlock(blockPoolId, key.getBlockId());
        }
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            public Boolean get() {
                MetricsRecordBuilder dnMetrics = MetricsAsserts.getMetrics((String)dn.getMetrics().name());
                long blocksUncached = MetricsAsserts.getLongCounter((String)"BlocksUncached", (MetricsRecordBuilder)dnMetrics);
                if (blocksUncached != (long)cacheBlocksNum) {
                    LOG.info("waiting for " + cacheBlocksNum + " blocks to be uncached. Right now " + blocksUncached + " blocks are uncached.");
                    return false;
                }
                LOG.info(cacheBlocksNum + " blocks have been uncached.");
                return true;
            }
        }, (long)1000L, (long)30000L);
        Assert.assertEquals((long)0L, (long)cacheManager.getCacheUsed());
        Assert.assertEquals((long)blockKeyToVolume.size(), (long)0L);
    }

    static {
        cluster = null;
        blockPoolId = "";
        lock = new ReentrantReadWriteLock(true);
        PMEM_DIR_0 = MiniDFSCluster.getBaseDirectory() + "pmem0";
        PMEM_DIR_1 = MiniDFSCluster.getBaseDirectory() + "pmem1";
        GenericTestUtils.setLogLevel((Logger)LoggerFactory.getLogger(FsDatasetCache.class), (Level)Level.DEBUG);
    }
}

