/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.throughput.BufferDebloatConfiguration;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;

public class NettyShuffleEnvironmentTest
extends TestLogger {
    private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
    private static FileChannelManager fileChannelManager;
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @BeforeClass
    public static void setUp() {
        fileChannelManager = new FileChannelManagerImpl(new String[]{tempDir}, "testing");
    }

    @AfterClass
    public static void shutdown() throws Exception {
        fileChannelManager.close();
    }

    @Test
    public void testRegisterTaskWithLimitedBuffers() throws Exception {
        int bufferCount = 18 + 10 * (Integer)NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
        this.testRegisterTaskWithLimitedBuffers(bufferCount);
    }

    @Test
    public void testRegisterTaskWithInsufficientBuffers() throws Exception {
        int bufferCount = 10 + 10 * (Integer)NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1;
        this.expectedException.expect(IOException.class);
        this.expectedException.expectMessage("Insufficient number of network buffers");
        this.testRegisterTaskWithLimitedBuffers(bufferCount);
    }

    @Test
    public void testSlowIODoesNotBlockRelease() throws Exception {
        final BlockerSync sync = new BlockerSync();
        ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager(){

            public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
                sync.blockNonInterruptible();
                super.releasePartition(partitionId, cause);
            }
        };
        NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().setResultPartitionManager(blockingResultPartitionManager).setIoExecutor(Executors.newFixedThreadPool(1)).build();
        shuffleEnvironment.releasePartitionsLocally(Collections.singleton(new ResultPartitionID()));
        sync.awaitBlocker();
        sync.releaseBlocker();
    }

    @Test
    public void testRegisteringDebloatingMetrics() throws IOException {
        ConcurrentHashMap<String, Metric> metrics = new ConcurrentHashMap<String, Metric>();
        TaskMetricGroup taskMetricGroup = NettyShuffleEnvironmentTest.createTaskMetricGroup(metrics);
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, (Object)true);
        NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().setDebloatConfig(BufferDebloatConfiguration.fromConfiguration((ReadableConfig)config)).build();
        shuffleEnvironment.createInputGates(shuffleEnvironment.createShuffleIOOwnerContext("test", ExecutionGraphTestUtils.createExecutionAttemptId(), (MetricGroup)taskMetricGroup), (dsid, id, consumer) -> {}, Arrays.asList(new InputGateDeploymentDescriptor(new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, new ShuffleDescriptor[]{new NettyShuffleDescriptorBuilder().buildRemote()}), new InputGateDeploymentDescriptor(new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 1, new ShuffleDescriptor[]{new NettyShuffleDescriptorBuilder().buildRemote()})));
        for (int i = 0; i < 2; ++i) {
            Assert.assertEquals((long)((MemorySize)TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(), (long)((Integer)((Gauge)this.getDebloatingMetric(metrics, i, "debloatedBufferSize")).getValue()).intValue());
            Assert.assertEquals((long)0L, (long)((Long)((Gauge)this.getDebloatingMetric(metrics, i, "estimatedTimeToConsumeBuffersMs")).getValue()));
        }
    }

    private Metric getDebloatingMetric(Map<String, Metric> metrics, int i, String metricName) {
        String inputScope = "taskmanager.job.task.Shuffle.Netty.Input";
        return metrics.get("taskmanager.job.task.Shuffle.Netty.Input." + i + "." + metricName);
    }

    private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
        NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(bufferPoolSize).build();
        ConnectionManager connManager = InputChannelTestUtils.createDummyConnectionManager();
        int channels = 2;
        int rp4Channels = 4;
        int floatingBuffers = network.getConfiguration().floatingNetworkBuffersPerGate();
        int exclusiveBuffers = network.getConfiguration().networkBuffersPerChannel();
        int expectedBuffers = channels * exclusiveBuffers + floatingBuffers;
        int expectedRp4Buffers = rp4Channels * exclusiveBuffers + floatingBuffers;
        ResultPartition rp1 = PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED, channels);
        ResultPartition rp2 = PartitionTestUtils.createPartition(network, fileChannelManager, ResultPartitionType.BLOCKING, channels);
        ResultPartition rp3 = PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, channels);
        ResultPartition rp4 = PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, rp4Channels);
        ResultPartition[] resultPartitions = new ResultPartition[]{rp1, rp2, rp3, rp4};
        SingleInputGate ig1 = this.createSingleInputGate(network, ResultPartitionType.PIPELINED, channels);
        SingleInputGate ig2 = this.createSingleInputGate(network, ResultPartitionType.BLOCKING, channels);
        SingleInputGate ig3 = this.createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, channels);
        SingleInputGate ig4 = this.createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, rp4Channels);
        InputChannel[] ic1 = new InputChannel[channels];
        InputChannel[] ic2 = new InputChannel[channels];
        InputChannel[] ic3 = new InputChannel[channels];
        InputChannel[] ic4 = new InputChannel[rp4Channels];
        SingleInputGate[] inputGates = new SingleInputGate[]{ig1, ig2, ig3, ig4};
        ic4[0] = NettyShuffleEnvironmentTest.createRemoteInputChannel(ig4, 0, rp1, connManager);
        ic4[1] = NettyShuffleEnvironmentTest.createRemoteInputChannel(ig4, 0, rp2, connManager);
        ic4[2] = NettyShuffleEnvironmentTest.createRemoteInputChannel(ig4, 0, rp3, connManager);
        ic4[3] = NettyShuffleEnvironmentTest.createRemoteInputChannel(ig4, 0, rp4, connManager);
        ig4.setInputChannels(ic4);
        ic1[0] = NettyShuffleEnvironmentTest.createRemoteInputChannel(ig1, 1, rp1, connManager);
        ic1[1] = NettyShuffleEnvironmentTest.createRemoteInputChannel(ig1, 1, rp4, connManager);
        ig1.setInputChannels(ic1);
        ic2[0] = NettyShuffleEnvironmentTest.createRemoteInputChannel(ig2, 1, rp2, connManager);
        ic2[1] = NettyShuffleEnvironmentTest.createRemoteInputChannel(ig2, 2, rp4, connManager);
        ig2.setInputChannels(ic2);
        ic3[0] = NettyShuffleEnvironmentTest.createRemoteInputChannel(ig3, 1, rp3, connManager);
        ic3[1] = NettyShuffleEnvironmentTest.createRemoteInputChannel(ig3, 3, rp4, connManager);
        ig3.setInputChannels(ic3);
        Task.setupPartitionsAndGates((ResultPartitionWriter[])resultPartitions, (InputGate[])inputGates);
        Assert.assertEquals((long)Integer.MAX_VALUE, (long)rp1.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals((long)Integer.MAX_VALUE, (long)rp2.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals((long)expectedBuffers, (long)rp3.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals((long)expectedRp4Buffers, (long)rp4.getBufferPool().getMaxNumberOfMemorySegments());
        for (ResultPartition resultPartition : resultPartitions) {
            Assert.assertEquals((long)(resultPartition.getNumberOfSubpartitions() + 1), (long)resultPartition.getBufferPool().getNumberOfRequiredMemorySegments());
            Assert.assertEquals((long)(resultPartition.getNumberOfSubpartitions() + 1), (long)resultPartition.getBufferPool().getNumBuffers());
        }
        Assert.assertEquals((long)1L, (long)ig1.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals((long)1L, (long)ig2.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals((long)1L, (long)ig3.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals((long)1L, (long)ig4.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals((long)floatingBuffers, (long)ig1.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals((long)floatingBuffers, (long)ig2.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals((long)floatingBuffers, (long)ig3.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals((long)floatingBuffers, (long)ig4.getBufferPool().getMaxNumberOfMemorySegments());
        ((SingleInputGate)Mockito.verify((Object)ig1, (VerificationMode)Mockito.times((int)1))).setupChannels();
        ((SingleInputGate)Mockito.verify((Object)ig2, (VerificationMode)Mockito.times((int)1))).setupChannels();
        ((SingleInputGate)Mockito.verify((Object)ig3, (VerificationMode)Mockito.times((int)1))).setupChannels();
        ((SingleInputGate)Mockito.verify((Object)ig4, (VerificationMode)Mockito.times((int)1))).setupChannels();
        for (ResultPartition resultPartition : resultPartitions) {
            resultPartition.release();
        }
        for (ResultPartition resultPartition : inputGates) {
            resultPartition.close();
        }
        network.close();
    }

    private SingleInputGate createSingleInputGate(NettyShuffleEnvironment network, ResultPartitionType partitionType, int numberOfChannels) {
        return (SingleInputGate)PowerMockito.spy((Object)new SingleInputGateBuilder().setNumberOfChannels(numberOfChannels).setResultPartitionType(partitionType).setupBufferPoolFactory(network).build());
    }

    private static RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, int channelIndex, ResultPartition resultPartition, ConnectionManager connManager) {
        return InputChannelBuilder.newBuilder().setChannelIndex(channelIndex).setPartitionId(resultPartition.getPartitionId()).setConnectionManager(connManager).buildRemoteChannel(inputGate);
    }

    private static TaskMetricGroup createTaskMetricGroup(Map<String, Metric> metrics) {
        return TaskManagerMetricGroup.createTaskManagerMetricGroup((MetricRegistry)new TestMetricRegistry(metrics), (String)"localhost", (ResourceID)ResourceID.generate()).addJob(new JobID(), "jobName").addTask(ExecutionGraphTestUtils.createExecutionAttemptId(), "test");
    }

    private static class TestMetricRegistry
    extends NoOpMetricRegistry {
        private final Map<String, Metric> metrics;

        TestMetricRegistry(Map<String, Metric> metrics) {
            this.metrics = metrics;
        }

        public void register(Metric metric, String metricName, AbstractMetricGroup group) {
            this.metrics.put(group.getLogicalScope(CharacterFilter.NO_OP_FILTER) + "." + metricName, metric);
        }
    }
}

