/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.balancer;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.server.balancer.BalancerParameters;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests that run the HDFS {@link Balancer} on a mini cluster configured with
 * {@link NetworkTopologyWithNodeGroup} and
 * {@link BlockPlacementPolicyWithNodeGroup}.
 *
 * <p>Each test starts its own {@link MiniDFSClusterWithNodeGroup}, writes a
 * single file, optionally adds an empty datanode, runs the balancer and then
 * checks the outcome. The cluster is always shut down in a {@code finally}
 * block.
 */
public class TestBalancerWithNodeGroup {
    private static final Logger LOG = LoggerFactory.getLogger("org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
    /** Simulated capacity, in bytes, of every datanode started by these tests. */
    private static final long CAPACITY = 5000L;
    private static final String RACK0 = "/rack0";
    private static final String RACK1 = "/rack1";
    private static final String NODEGROUP0 = "/nodegroup0";
    private static final String NODEGROUP1 = "/nodegroup1";
    private static final String NODEGROUP2 = "/nodegroup2";
    private static final String fileName = "/tmp.txt";
    private static final Path filePath = new Path(fileName);
    /** Pause, in milliseconds, between two polls of the namenode. */
    private static final long POLL_INTERVAL_MS = 100L;
    MiniDFSClusterWithNodeGroup cluster;
    ClientProtocol client;
    /** Maximum time, in milliseconds, the polling helpers wait before failing. */
    static final long TIMEOUT = 40000L;
    /** Allowed relative deviation of reported total/used space from the expected value. */
    static final double CAPACITY_ALLOWED_VARIANCE = 0.005;
    /** Allowed absolute deviation of a datanode's utilization from the cluster average. */
    static final double BALANCE_ALLOWED_VARIANCE = 0.11;
    static final int DEFAULT_BLOCK_SIZE = 100;

    /**
     * Builds the configuration shared by all tests: the common balancer test
     * settings from {@link TestBalancer#initConf}, a small block size, and the
     * node-group aware topology and block placement policy.
     *
     * @return a freshly created configuration
     */
    static Configuration createConf() {
        Configuration conf = new HdfsConfiguration();
        TestBalancer.initConf(conf);
        conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
        conf.setBoolean("dfs.use.dfs.network.topology", false);
        conf.set("net.topology.impl", NetworkTopologyWithNodeGroup.class.getName());
        conf.set("dfs.block.replicator.classname", BlockPlacementPolicyWithNodeGroup.class.getName());
        return conf;
    }

    /**
     * Computes the wall-clock instant after which a polling loop must give up.
     *
     * @param timeout timeout in milliseconds; a non-positive value means "never"
     * @return the deadline as a {@link System#currentTimeMillis()} value
     */
    private static long deadlineFromNow(long timeout) {
        return timeout <= 0L ? Long.MAX_VALUE : System.currentTimeMillis() + timeout;
    }

    /**
     * Polls {@code client.getStats()} until the reported total space (index 0)
     * and used space (index 1) are both within
     * {@link #CAPACITY_ALLOWED_VARIANCE} of the expected values.
     *
     * @param expectedUsedSpace expected used space across the cluster
     * @param expectedTotalSpace expected total capacity of the cluster
     * @throws IOException if querying the namenode fails
     * @throws TimeoutException if the values do not converge within {@link #TIMEOUT} ms
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
            throws IOException, TimeoutException, InterruptedException {
        long timeout = TIMEOUT;
        long failtime = deadlineFromNow(timeout);
        while (true) {
            long[] status = client.getStats();
            double totalSpaceVariance = Math.abs((double) status[0] - expectedTotalSpace) / expectedTotalSpace;
            double usedSpaceVariance = Math.abs((double) status[1] - expectedUsedSpace) / expectedUsedSpace;
            if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE) {
                return;
            }
            if (System.currentTimeMillis() > failtime) {
                throw new TimeoutException("Cluster failed to reached expected values of totalSpace (current: " + status[0] + ", expected: " + expectedTotalSpace + "), or usedSpace (current: " + status[1] + ", expected: " + expectedUsedSpace + "), in more than " + timeout + " msec.");
            }
            // Let an interrupt (e.g. from the JUnit timeout) abort the wait
            // instead of silently swallowing it and polling on.
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Polls the datanode report until every datanode's utilization
     * (dfsUsed / capacity) is within {@link #BALANCE_ALLOWED_VARIANCE} of the
     * cluster-wide average utilization.
     *
     * @param totalUsedSpace used space across the cluster
     * @param totalCapacity total capacity of the cluster
     * @throws IOException if querying the namenode fails
     * @throws TimeoutException if a datanode is still out of balance after {@link #TIMEOUT} ms
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void waitForBalancer(long totalUsedSpace, long totalCapacity)
            throws IOException, TimeoutException, InterruptedException {
        long failtime = deadlineFromNow(TIMEOUT);
        double avgUtilization = (double) totalUsedSpace / totalCapacity;
        boolean balanced;
        do {
            DatanodeInfo[] datanodeReport = client.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
            Assert.assertEquals(datanodeReport.length, cluster.getDataNodes().size());
            balanced = true;
            for (DatanodeInfo datanode : datanodeReport) {
                double nodeUtilization = (double) datanode.getDfsUsed() / datanode.getCapacity();
                if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
                    balanced = false;
                    if (System.currentTimeMillis() > failtime) {
                        throw new TimeoutException("Rebalancing expected avg utilization to become " + avgUtilization + ", but on datanode " + datanode + " it remains at " + nodeUtilization + " after more than " + TIMEOUT + " msec.");
                    }
                    // Propagate interrupts rather than swallowing them.
                    Thread.sleep(POLL_INTERVAL_MS);
                    // First unbalanced node found: fetch a fresh report and re-check.
                    break;
                }
            }
        } while (!balanced);
    }

    /**
     * Waits for the cluster to report the expected space figures, then runs
     * the balancer once with {@link BalancerParameters#DEFAULT}.
     *
     * @return the balancer's exit code
     */
    private int invokeBalancer(Configuration conf, long totalUsedSpace, long totalCapacity) throws Exception {
        waitForHeartBeat(totalUsedSpace, totalCapacity);
        Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
        return Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
    }

    /**
     * Runs the balancer, requires it to exit with {@link ExitStatus#SUCCESS},
     * and then waits until the datanodes are balanced.
     *
     * @param conf cluster configuration
     * @param totalUsedSpace used space across the cluster
     * @param totalCapacity total capacity of the cluster
     */
    private void runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity) throws Exception {
        int r = invokeBalancer(conf, totalUsedSpace, totalCapacity);
        Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
        waitForHeartBeat(totalUsedSpace, totalCapacity);
        LOG.info("Rebalancing with default factor.");
        waitForBalancer(totalUsedSpace, totalCapacity);
    }

    /**
     * Runs the balancer and accepts either {@link ExitStatus#SUCCESS} or
     * {@link ExitStatus#NO_MOVE_PROGRESS} as the exit code. Unlike
     * {@link #runBalancer}, it does not wait for the datanodes to become
     * balanced afterwards.
     *
     * @param conf cluster configuration
     * @param totalUsedSpace used space across the cluster
     * @param totalCapacity total capacity of the cluster
     */
    private void runBalancerCanFinish(Configuration conf, long totalUsedSpace, long totalCapacity) throws Exception {
        int r = invokeBalancer(conf, totalUsedSpace, totalCapacity);
        Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() || r == ExitStatus.NO_MOVE_PROGRESS.getExitCode());
        waitForHeartBeat(totalUsedSpace, totalCapacity);
        LOG.info("Rebalancing with default factor.");
    }

    /**
     * Collects the blocks that have at least one replica on the given rack.
     *
     * @param blks located blocks to inspect
     * @param rack rack path, compared against the first half of each replica's network location
     * @return the set of blocks with a replica on {@code rack}
     */
    private Set<ExtendedBlock> getBlocksOnRack(List<LocatedBlock> blks, String rack) {
        Set<ExtendedBlock> ret = new HashSet<>();
        for (LocatedBlock blk : blks) {
            for (DatanodeInfo di : blk.getLocations()) {
                if (rack.equals(NetworkTopology.getFirstHalf(di.getNetworkLocation()))) {
                    ret.add(blk.getBlock());
                    // One replica on the rack is enough; move on to the next block.
                    break;
                }
            }
        }
        return ret;
    }

    /**
     * Creates the node-group aware mini cluster and stores it in {@link #cluster}.
     * One datanode is started per entry of {@code capacities}.
     */
    private void startCluster(Configuration conf, long[] capacities, String[] racks, String[] nodeGroups)
            throws IOException {
        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
                .numDataNodes(capacities.length)
                .racks(racks)
                .simulatedCapacities(capacities);
        MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
        cluster = new MiniDFSClusterWithNodeGroup(builder);
    }

    /**
     * Waits for the cluster to become active and creates the
     * {@link ClientProtocol} proxy stored in {@link #client}.
     */
    private void connectClient(Configuration conf) throws IOException {
        cluster.waitActive();
        client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
    }

    /** Starts one additional datanode with the given rack, node group and capacity. */
    private void addDataNode(Configuration conf, String rack, String nodeGroup, long capacity) throws IOException {
        cluster.startDataNodes(conf, 1, true, null, new String[]{rack}, new long[]{capacity}, new String[]{nodeGroup});
    }

    /**
     * Starts two datanodes on different racks, writes a file replicated to
     * both, adds an empty datanode on {@code RACK1}, runs the balancer, and
     * verifies that the set of blocks with a replica on {@code RACK0} is
     * unchanged.
     */
    @Test(timeout=60000L)
    public void testBalancerWithRackLocality() throws Exception {
        Configuration conf = createConf();
        long[] capacities = new long[]{CAPACITY, CAPACITY};
        String[] racks = new String[]{RACK0, RACK1};
        String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1};
        int numOfDatanodes = capacities.length;
        Assert.assertEquals(numOfDatanodes, racks.length);
        startCluster(conf, capacities, racks, nodeGroups);
        try {
            connectClient(conf);
            long totalCapacity = TestBalancer.sum(capacities);
            // Fill the cluster to 30% of its capacity.
            long totalUsedSpace = totalCapacity * 3L / 10L;
            long length = totalUsedSpace / numOfDatanodes;
            TestBalancer.createFile(cluster, filePath, length, (short) numOfDatanodes, 0);
            LocatedBlocks lbs = client.getBlockLocations(filePath.toUri().getPath(), 0L, length);
            Set<ExtendedBlock> before = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);

            long newCapacity = CAPACITY;
            addDataNode(conf, RACK1, NODEGROUP2, newCapacity);
            totalCapacity += newCapacity;
            runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);

            lbs = client.getBlockLocations(filePath.toUri().getPath(), 0L, length);
            Set<ExtendedBlock> after = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);
            Assert.assertEquals(before, after);
        }
        finally {
            cluster.shutdown();
        }
    }

    /**
     * Starts four datanodes spread over two racks and three node groups,
     * fills the cluster to 20% with a two-replica file, adds an empty datanode
     * in {@code RACK1}/{@code NODEGROUP2}, and expects the balancer to succeed
     * and the cluster to end up balanced.
     */
    @Test(timeout=60000L)
    public void testBalancerWithNodeGroup() throws Exception {
        Configuration conf = createConf();
        long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
        String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
        String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
        int numOfDatanodes = capacities.length;
        Assert.assertEquals(numOfDatanodes, racks.length);
        Assert.assertEquals(numOfDatanodes, nodeGroups.length);
        startCluster(conf, capacities, racks, nodeGroups);
        try {
            connectClient(conf);
            long totalCapacity = TestBalancer.sum(capacities);
            // Fill the cluster to 20% of its capacity.
            long totalUsedSpace = totalCapacity * 2L / 10L;
            int replication = numOfDatanodes / 2;
            TestBalancer.createFile(cluster, filePath, totalUsedSpace / replication, (short) replication, 0);

            long newCapacity = CAPACITY;
            addDataNode(conf, RACK1, NODEGROUP2, newCapacity);
            totalCapacity += newCapacity;
            runBalancer(conf, totalUsedSpace, totalCapacity);
        }
        finally {
            cluster.shutdown();
        }
    }

    /**
     * Starts four datanodes spread over two racks and three node groups,
     * fills the cluster to 60% with a three-replica file, and runs the
     * balancer without adding any node. The balancer may finish with either
     * {@code SUCCESS} or {@code NO_MOVE_PROGRESS}.
     */
    @Test(timeout=60000L)
    public void testBalancerEndInNoMoveProgress() throws Exception {
        Configuration conf = createConf();
        long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
        String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
        String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
        int numOfDatanodes = capacities.length;
        Assert.assertEquals(numOfDatanodes, racks.length);
        Assert.assertEquals(numOfDatanodes, nodeGroups.length);
        startCluster(conf, capacities, racks, nodeGroups);
        try {
            connectClient(conf);
            long totalCapacity = TestBalancer.sum(capacities);
            // Fill the cluster to 60% of its capacity.
            long totalUsedSpace = totalCapacity * 6L / 10L;
            TestBalancer.createFile(cluster, filePath, totalUsedSpace / 3L, (short) 3, 0);
            runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
        }
        finally {
            cluster.shutdown();
        }
    }

    static {
        TestBalancer.initTestSetup();
    }
}

