/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api;

import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.event.task.AbstractEvent;
import org.apache.flink.runtime.io.network.Buffer;
import org.apache.flink.runtime.io.network.api.BufferWriter;
import org.apache.flink.runtime.io.network.api.ChannelSelector;
import org.apache.flink.runtime.io.network.api.RoundRobinChannelSelector;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

public class RecordWriter<T extends IOReadableWritable>
extends BufferWriter {
    private final BufferProvider bufferPool;
    private final ChannelSelector<T> channelSelector;
    private int numChannels;
    private RecordSerializer<T>[] serializers;

    public RecordWriter(AbstractInvokable invokable) {
        this(invokable, new RoundRobinChannelSelector());
    }

    public RecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector) {
        super(invokable);
        this.bufferPool = invokable.getEnvironment().getOutputBufferProvider();
        this.channelSelector = channelSelector;
    }

    public void initializeSerializers() {
        this.numChannels = this.outputGate.getNumChannels();
        this.serializers = new RecordSerializer[this.numChannels];
        for (int i = 0; i < this.numChannels; ++i) {
            this.serializers[i] = new SpanningRecordSerializer();
        }
    }

    public void emit(T record) throws IOException, InterruptedException {
        for (int targetChannel : this.channelSelector.selectChannels(record, this.numChannels)) {
            RecordSerializer<T> serializer = this.serializers[targetChannel];
            RecordSerializer.SerializationResult result = serializer.addRecord(record);
            while (result.isFullBuffer()) {
                Buffer buffer = serializer.getCurrentBuffer();
                if (buffer != null) {
                    this.sendBuffer(buffer, targetChannel);
                }
                buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
                result = serializer.setNextBuffer(buffer);
            }
        }
    }

    public void flush() throws IOException, InterruptedException {
        for (int targetChannel = 0; targetChannel < this.numChannels; ++targetChannel) {
            RecordSerializer<T> serializer = this.serializers[targetChannel];
            Buffer buffer = serializer.getCurrentBuffer();
            if (buffer != null) {
                this.sendBuffer(buffer, targetChannel);
            }
            serializer.clear();
        }
    }

    @Override
    public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
        for (int targetChannel = 0; targetChannel < this.numChannels; ++targetChannel) {
            RecordSerializer<T> serializer = this.serializers[targetChannel];
            Buffer buffer = serializer.getCurrentBuffer();
            if (buffer == null) {
                super.sendEvent(event, targetChannel);
                continue;
            }
            super.sendBufferAndEvent(buffer, event, targetChannel);
            buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
            serializer.setNextBuffer(buffer);
        }
    }

    @Override
    public void sendEndOfSuperstep() throws IOException, InterruptedException {
        for (int targetChannel = 0; targetChannel < this.numChannels; ++targetChannel) {
            RecordSerializer<T> serializer = this.serializers[targetChannel];
            Buffer buffer = serializer.getCurrentBuffer();
            if (buffer == null) {
                super.sendEvent(EndOfSuperstepEvent.INSTANCE, targetChannel);
                continue;
            }
            super.sendBufferAndEvent(buffer, EndOfSuperstepEvent.INSTANCE, targetChannel);
            buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
            serializer.setNextBuffer(buffer);
        }
    }
}

