/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.cluster.Address;
import com.hazelcast.internal.nio.BufferObjectDataInput;
import com.hazelcast.internal.nio.BufferObjectDataOutput;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.internal.nio.Packet;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.jet.impl.JobExecutionService;
import com.hazelcast.jet.impl.execution.ExecutionContext;
import com.hazelcast.jet.impl.execution.ReceiverTasklet;
import com.hazelcast.jet.impl.execution.SenderTasklet;
import com.hazelcast.jet.impl.serialization.MemoryReader;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ImdgUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class Networking {
    private static final int PACKET_HEADER_SIZE = 16;
    private static final int FLOW_PACKET_INITIAL_SIZE = 128;
    private static final byte[] EMPTY_BYTES = new byte[0];
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final JobExecutionService jobExecutionService;
    private final ScheduledFuture<?> flowControlSender;
    private final MemoryReader memoryReader;
    private int lastFlowPacketSize;

    Networking(NodeEngine nodeEngine, JobExecutionService jobExecutionService, int flowControlPeriodMs) {
        this.nodeEngine = (NodeEngineImpl)nodeEngine;
        this.logger = nodeEngine.getLogger(this.getClass());
        this.jobExecutionService = jobExecutionService;
        this.flowControlSender = nodeEngine.getExecutionService().scheduleWithRepetition(this::broadcastFlowControlPacket, 0L, flowControlPeriodMs, TimeUnit.MILLISECONDS);
        this.memoryReader = MemoryReader.create(((InternalSerializationService)nodeEngine.getSerializationService()).getByteOrder());
        this.lastFlowPacketSize = 128;
    }

    void shutdown() {
        this.flowControlSender.cancel(false);
    }

    void handle(Packet packet) throws IOException {
        if (packet.isFlagRaised(2)) {
            this.handleFlowControlPacket(packet.getConn().getEndPoint(), packet.toByteArray());
        } else {
            this.handleStreamPacket(packet);
        }
    }

    private void handleStreamPacket(Packet packet) {
        byte[] payload = packet.toByteArray();
        int offset = 0;
        long executionId = this.memoryReader.readLong(payload, offset);
        int vertexId = this.memoryReader.readInt(payload, offset += 8);
        int ordinal = this.memoryReader.readInt(payload, offset += 4);
        ExecutionContext executionContext = this.jobExecutionService.getExecutionContext(executionId);
        executionContext.handlePacket(vertexId, ordinal, packet.getConn().getEndPoint(), payload, offset += 4);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public static byte[] createStreamPacketHeader(NodeEngine nodeEngine, long executionId, int destinationVertexId, int ordinal) {
        try (BufferObjectDataOutput output = ImdgUtil.createObjectDataOutput(nodeEngine, 16);){
            output.writeLong(executionId);
            output.writeInt(destinationVertexId);
            output.writeInt(ordinal);
            byte[] byArray = output.toByteArray();
            return byArray;
        }
        catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    private void broadcastFlowControlPacket() {
        try {
            ImdgUtil.getRemoteMembers(this.nodeEngine).forEach(member -> Util.uncheckRun(() -> {
                byte[] packetBuf = this.createFlowControlPacket((Address)member);
                if (packetBuf.length == 0) {
                    return;
                }
                Connection conn = ImdgUtil.getMemberConnection(this.nodeEngine, member);
                if (conn != null) {
                    conn.write(new Packet(packetBuf).setPacketType(Packet.Type.JET).raiseFlags(18));
                }
            }));
        }
        catch (Throwable t) {
            this.logger.severe("Flow-control packet broadcast failed", t);
        }
    }

    private byte[] createFlowControlPacket(Address member) throws IOException {
        try (BufferObjectDataOutput output = ImdgUtil.createObjectDataOutput(this.nodeEngine, this.lastFlowPacketSize);){
            boolean[] hasData = new boolean[]{false};
            Map<Long, ExecutionContext> executionContexts = this.jobExecutionService.getExecutionContextsFor(member);
            output.writeInt(executionContexts.size());
            executionContexts.forEach((execId, exeCtx) -> Util.uncheckRun(() -> {
                output.writeLong((long)execId);
                output.writeInt(exeCtx.receiverMap().values().stream().mapToInt(Map::size).sum());
                exeCtx.receiverMap().forEach((vertexId, ordinalToSenderToTasklet) -> ordinalToSenderToTasklet.forEach((ordinal, senderToTasklet) -> Util.uncheckRun(() -> {
                    output.writeInt((int)vertexId);
                    output.writeInt((int)ordinal);
                    output.writeInt(((ReceiverTasklet)senderToTasklet.get(member)).updateAndGetSendSeqLimitCompressed());
                    hasData[0] = true;
                })));
            }));
            if (hasData[0]) {
                byte[] payload = output.toByteArray();
                this.lastFlowPacketSize = payload.length;
                byte[] byArray = payload;
                return byArray;
            }
            byte[] byArray = EMPTY_BYTES;
            return byArray;
        }
    }

    private void handleFlowControlPacket(Address fromAddr, byte[] packet) throws IOException {
        try (BufferObjectDataInput input = ImdgUtil.createObjectDataInput(this.nodeEngine, packet);){
            int executionCtxCount = input.readInt();
            for (int j = 0; j < executionCtxCount; ++j) {
                long executionId = input.readLong();
                Map<Integer, Map<Integer, Map<Address, SenderTasklet>>> senderMap = this.jobExecutionService.getSenderMap(executionId);
                if (senderMap == null) {
                    this.logMissingExeCtx(executionId);
                    continue;
                }
                int flowCtlMsgCount = input.readInt();
                for (int k = 0; k < flowCtlMsgCount; ++k) {
                    int destVertexId = input.readInt();
                    int destOrdinal = input.readInt();
                    int sendSeqLimitCompressed = input.readInt();
                    SenderTasklet t = Optional.ofNullable(senderMap.get(destVertexId)).map(ordinalMap -> (Map)ordinalMap.get(destOrdinal)).map(addrMap -> (SenderTasklet)addrMap.get(fromAddr)).orElse(null);
                    if (t == null) {
                        this.logMissingSenderTasklet(destVertexId, destOrdinal);
                        return;
                    }
                    t.setSendSeqLimitCompressed(sendSeqLimitCompressed);
                }
            }
        }
    }

    private void logMissingExeCtx(long executionId) {
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Ignoring flow control message applying to non-existent execution context " + com.hazelcast.jet.Util.idToString(executionId));
        }
    }

    private void logMissingSenderTasklet(int destVertexId, int destOrdinal) {
        if (this.logger.isFinestEnabled()) {
            this.logger.finest(String.format("Ignoring flow control message applying to non-existent sender tasklet (%d, %d)", destVertexId, destOrdinal));
        }
    }
}

