/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.benchmark;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskmanager.InputGateWithMetrics;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.streaming.runtime.io.benchmark.SerializingLongReceiver;
import org.apache.flink.streaming.runtime.io.benchmark.SingleInputGateBenchmarkFactory;
import org.apache.flink.util.ExceptionUtils;

/**
 * Wires a sending and a receiving {@link NettyShuffleEnvironment} together so that result
 * partition writers and a {@link SerializingLongReceiver} can be created against them.
 *
 * <p>Usage order as implied by the fields each method reads: call one of the {@code setUp}
 * overloads first, then {@link #createResultPartitionWriter(int)} and {@link #createReceiver()},
 * and finally {@link #tearDown()}.
 *
 * @param <T> record type parameter; it is not referenced anywhere inside this class
 */
public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
    /** Local host address, resolved once during class initialisation. */
    private static final InetAddress LOCAL_ADDRESS = resolveLocalAddress();

    /** Resource id handed to the gate factory and used as producer location in local mode. */
    private final ResourceID location = ResourceID.generate();
    protected final JobID jobId = new JobID();
    protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
    protected NettyShuffleEnvironment senderEnv;
    protected NettyShuffleEnvironment receiverEnv;
    protected int channels;
    protected boolean localMode;
    /** One partition id per writer; allocated and filled by {@code setUp}. */
    protected ResultPartitionID[] partitionIds;
    /** Value returned by {@code senderEnv.start()}; used to build the sender's location. */
    private int dataPort;
    private SingleInputGateFactory gateFactory;

    /**
     * Convenience overload of
     * {@link #setUp(int, int, boolean, int, int, Configuration)} that uses a freshly created,
     * empty {@link Configuration}.
     *
     * @param writers number of partition ids (and therefore input gates) to prepare
     * @param channels number of subpartitions per result partition / channels per gate
     * @param localMode whether local shuffle descriptors are built instead of remote ones
     * @param senderBufferPoolSize network buffers for the sender, or {@code -1} for the default
     * @param receiverBufferPoolSize network buffers for the receiver, or {@code -1} for the default
     * @throws Exception if creating or starting a shuffle environment fails
     */
    public void setUp(int writers, int channels, boolean localMode, int senderBufferPoolSize, int receiverBufferPoolSize) throws Exception {
        setUp(writers, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize, new Configuration());
    }

    /**
     * Creates and starts the sender and receiver shuffle environments, builds the input gate
     * factory on top of the receiver environment and generates one partition id per writer.
     *
     * <p>A buffer pool size of {@code -1} is replaced by
     * {@code max(2048, writers * channels * 4)}. When {@code localMode} is set and both resolved
     * pool sizes are equal, the receiver reuses the sender's environment instance instead of
     * getting its own.
     *
     * @param writers number of partition ids (and therefore input gates) to prepare
     * @param channels number of subpartitions per result partition / channels per gate
     * @param localMode whether local shuffle descriptors are built instead of remote ones
     * @param senderBufferPoolSize network buffers for the sender, or {@code -1} for the default
     * @param receiverBufferPoolSize network buffers for the receiver, or {@code -1} for the default
     * @param config configuration used to derive the Netty settings of both environments
     * @throws Exception if creating or starting a shuffle environment fails
     */
    public void setUp(int writers, int channels, boolean localMode, int senderBufferPoolSize, int receiverBufferPoolSize, Configuration config) throws Exception {
        this.localMode = localMode;
        this.channels = channels;
        partitionIds = new ResultPartitionID[writers];

        final int senderBuffers = bufferPoolSizeOrDefault(senderBufferPoolSize, writers, channels);
        final int receiverBuffers = bufferPoolSizeOrDefault(receiverBufferPoolSize, writers, channels);

        senderEnv = createShuffleEnvironment(senderBuffers, config);
        dataPort = senderEnv.start();

        final boolean shareEnvironment = localMode && senderBuffers == receiverBuffers;
        if (shareEnvironment) {
            receiverEnv = senderEnv;
        } else {
            receiverEnv = createShuffleEnvironment(receiverBuffers, config);
            receiverEnv.start();
        }

        gateFactory = new SingleInputGateBenchmarkFactory(
                location,
                receiverEnv.getConfiguration(),
                receiverEnv.getConnectionManager(),
                receiverEnv.getResultPartitionManager(),
                new TaskEventDispatcher(),
                receiverEnv.getNetworkBufferPool());

        generatePartitionIds();
    }

    /**
     * Closes the sender environment and then the receiver environment.
     *
     * <p>Each field is dereferenced inside the action given to
     * {@link ExceptionUtils#suppressExceptions}, not in this method, so whatever goes wrong while
     * closing one environment is raised inside that helper and the second close is still
     * attempted. Deliberately lambdas rather than method references: a method reference would
     * evaluate the field before the helper is entered.
     */
    public void tearDown() {
        ExceptionUtils.suppressExceptions(() -> senderEnv.close());
        ExceptionUtils.suppressExceptions(() -> receiverEnv.close());
    }

    /**
     * Builds the input gate(s) that read from the sender and returns a started
     * {@link SerializingLongReceiver} wrapping them.
     *
     * @return the receiver, after {@code start()} has been called on it
     * @throws Exception if the input gates cannot be created or set up
     */
    public SerializingLongReceiver createReceiver() throws Exception {
        final TaskManagerLocation senderLocation =
                new TaskManagerLocation(ResourceID.generate(), LOCAL_ADDRESS, dataPort);
        final InputGate inputGate = createInputGate(senderLocation);

        // Second constructor argument: channels per gate times the number of writers.
        final int totalChannels = channels * partitionIds.length;
        final SerializingLongReceiver receiver = new SerializingLongReceiver(inputGate, totalChannels);
        receiver.start();
        return receiver;
    }

    /**
     * Builds a {@code PIPELINED_BOUNDED} result partition for the given writer on the sender
     * environment and calls {@code setup()} on it before returning.
     *
     * @param partitionIndex index into {@link #partitionIds} selecting the partition id to use
     * @return the partition, already set up
     * @throws Exception if building or setting up the partition fails
     */
    public ResultPartitionWriter createResultPartitionWriter(int partitionIndex) throws Exception {
        final ResultPartition partition = new ResultPartitionBuilder()
                .setResultPartitionId(partitionIds[partitionIndex])
                .setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
                .setNumberOfSubpartitions(channels)
                .setResultPartitionManager(senderEnv.getResultPartitionManager())
                .setupBufferPoolFactoryFromNettyShuffleEnvironment(senderEnv)
                .build();
        partition.setup();
        return partition;
    }

    /** Fills every slot of {@link #partitionIds} with a newly created {@link ResultPartitionID}. */
    private void generatePartitionIds() throws Exception {
        int slot = 0;
        while (slot < partitionIds.length) {
            partitionIds[slot] = new ResultPartitionID();
            slot++;
        }
    }

    /**
     * Returns {@code requested} unchanged unless it is {@code -1}, in which case the default
     * {@code max(2048, writers * channels * 4)} is returned.
     */
    private static int bufferPoolSizeOrDefault(int requested, int writers, int channels) {
        return requested == -1 ? Math.max(2048, writers * channels * 4) : requested;
    }

    /**
     * Builds (but does not start) a shuffle environment with {@code bufferPoolSize} network
     * buffers, using a {@link NettyConfig} on {@link #LOCAL_ADDRESS} whose page size and slot count
     * are read from {@code config}.
     */
    private NettyShuffleEnvironment createShuffleEnvironment(int bufferPoolSize, Configuration config) throws Exception {
        final int pageSize = ConfigurationParserUtils.getPageSize(config);
        final int slots = ConfigurationParserUtils.getSlot(config);
        // NOTE(review): the literal 0 is presumably "pick any free port" - the value actually
        // used later is whatever start() returns; confirm against NettyConfig.
        final NettyConfig nettyConfig = new NettyConfig(LOCAL_ADDRESS, 0, pageSize, slots, config);

        return new NettyShuffleEnvironmentBuilder()
                .setNumNetworkBuffers(bufferPoolSize)
                .setNettyConfig(nettyConfig)
                .build();
    }

    /**
     * Creates and sets up one gate per partition id. A single gate is returned as is; several
     * gates are combined into a {@link UnionInputGate}.
     */
    private InputGate createInputGate(TaskManagerLocation senderLocation) throws Exception {
        final IndexedInputGate[] gates = new IndexedInputGate[partitionIds.length];
        for (int i = 0; i < gates.length; i++) {
            final InputGateDeploymentDescriptor descriptor =
                    createInputGateDeploymentDescriptor(senderLocation, i, location);
            final IndexedInputGate gate = createInputGateWithMetrics(gateFactory, descriptor, i);
            gate.setup();
            gates[i] = gate;
        }

        if (gates.length <= 1) {
            return gates[0];
        }
        return new UnionInputGate(gates);
    }

    /**
     * Builds the deployment descriptor for gate {@code gateIndex}: one shuffle descriptor per
     * channel, all pointing at the partition id stored at that index.
     */
    private InputGateDeploymentDescriptor createInputGateDeploymentDescriptor(TaskManagerLocation senderLocation, int gateIndex, ResourceID localLocation) throws IOException {
        final ResultPartitionID partitionId = partitionIds[gateIndex];
        final ShuffleDescriptor[] shuffleDescriptors = new ShuffleDescriptor[channels];
        for (int channel = 0; channel < channels; channel++) {
            shuffleDescriptors[channel] =
                    createShuffleDescriptor(localMode, partitionId, localLocation, senderLocation, channel);
        }

        return new InputGateDeploymentDescriptor(
                dataSetID,
                ResultPartitionType.PIPELINED_BOUNDED,
                0,
                shuffleDescriptors);
    }

    /**
     * Creates a single input gate through {@code gateFactory}, registered under an unregistered
     * task metric group, and wraps it in an {@link InputGateWithMetrics} backed by a fresh
     * {@link SimpleCounter}.
     */
    private IndexedInputGate createInputGateWithMetrics(SingleInputGateFactory gateFactory, InputGateDeploymentDescriptor gateDescriptor, int gateIndex) {
        final TaskMetricGroup metrics = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
        final String ownerName = "receiving task[" + gateIndex + "]";

        final SingleInputGate gate = gateFactory.create(
                receiverEnv.createShuffleIOOwnerContext(ownerName, metrics.executionId(), metrics),
                gateIndex,
                gateDescriptor,
                SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER,
                InputChannelTestUtils.newUnregisteredInputChannelMetrics());

        return new InputGateWithMetrics(gate, new SimpleCounter());
    }

    /**
     * Builds the shuffle descriptor for one channel. In local mode the producer location is set
     * to {@code location} and a local descriptor is built; otherwise a remote one is built.
     */
    private static ShuffleDescriptor createShuffleDescriptor(boolean localMode, ResultPartitionID resultPartitionID, ResourceID location, TaskManagerLocation senderLocation, int connectionIndex) {
        final NettyShuffleDescriptorBuilder descriptorBuilder = NettyShuffleDescriptorBuilder.newBuilder()
                .setId(resultPartitionID)
                .setProducerInfoFromTaskManagerLocation(senderLocation)
                .setConnectionIndex(connectionIndex);

        if (localMode) {
            return descriptorBuilder.setProducerLocation(location).buildLocal();
        }
        return descriptorBuilder.buildRemote();
    }

    /**
     * Looks up the local host address for {@link #LOCAL_ADDRESS}.
     *
     * @throws Error wrapping the {@link UnknownHostException} if the lookup fails, which aborts
     *     class initialisation
     */
    private static InetAddress resolveLocalAddress() {
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            throw new Error(e);
        }
    }
}

