/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
import org.apache.flink.runtime.checkpoint.RescaleMappings;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateByteBuffer;
import org.apache.flink.runtime.checkpoint.channel.RecoveredChannelStateHandler;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.io.network.api.SubtaskConnectionDescriptor;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition;

class ResultSubpartitionRecoveredStateHandler
implements RecoveredChannelStateHandler<ResultSubpartitionInfo, BufferBuilder> {
    private final ResultPartitionWriter[] writers;
    private final boolean notifyAndBlockOnCompletion;
    private final InflightDataRescalingDescriptor channelMapping;
    private final Map<ResultSubpartitionInfo, List<CheckpointedResultSubpartition>> rescaledChannels = new HashMap<ResultSubpartitionInfo, List<CheckpointedResultSubpartition>>();
    private final Map<Integer, RescaleMappings> oldToNewMappings = new HashMap<Integer, RescaleMappings>();

    ResultSubpartitionRecoveredStateHandler(ResultPartitionWriter[] writers, boolean notifyAndBlockOnCompletion, InflightDataRescalingDescriptor channelMapping) {
        this.writers = writers;
        this.channelMapping = channelMapping;
        this.notifyAndBlockOnCompletion = notifyAndBlockOnCompletion;
    }

    @Override
    public RecoveredChannelStateHandler.BufferWithContext<BufferBuilder> getBuffer(ResultSubpartitionInfo subpartitionInfo) throws IOException, InterruptedException {
        List<CheckpointedResultSubpartition> channels = this.getMappedChannels(subpartitionInfo);
        BufferBuilder bufferBuilder = channels.get(0).requestBufferBuilderBlocking();
        return new RecoveredChannelStateHandler.BufferWithContext<BufferBuilder>(ChannelStateByteBuffer.wrap(bufferBuilder), bufferBuilder);
    }

    @Override
    public void recover(ResultSubpartitionInfo subpartitionInfo, int oldSubtaskIndex, RecoveredChannelStateHandler.BufferWithContext<BufferBuilder> bufferWithContext) throws IOException {
        try (BufferBuilder bufferBuilder = (BufferBuilder)bufferWithContext.context;
             BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumerFromBeginning();){
            bufferBuilder.finish();
            if (bufferConsumer.isDataAvailable()) {
                List<CheckpointedResultSubpartition> channels = this.getMappedChannels(subpartitionInfo);
                for (CheckpointedResultSubpartition channel : channels) {
                    SubtaskConnectionDescriptor channelSelector = new SubtaskConnectionDescriptor(subpartitionInfo.getSubPartitionIdx(), oldSubtaskIndex);
                    channel.addRecovered(EventSerializer.toBufferConsumer(channelSelector, false));
                    channel.addRecovered(bufferConsumer.copy());
                }
            }
        }
    }

    private CheckpointedResultSubpartition getSubpartition(int partitionIndex, int subPartitionIdx) {
        ResultPartitionWriter writer = this.writers[partitionIndex];
        if (!(writer instanceof CheckpointedResultPartition)) {
            throw new IllegalStateException("Cannot restore state to a non-checkpointable partition type: " + writer);
        }
        return ((CheckpointedResultPartition)((Object)writer)).getCheckpointedSubpartition(subPartitionIdx);
    }

    private List<CheckpointedResultSubpartition> getMappedChannels(ResultSubpartitionInfo subpartitionInfo) {
        return this.rescaledChannels.computeIfAbsent(subpartitionInfo, this::calculateMapping);
    }

    private List<CheckpointedResultSubpartition> calculateMapping(ResultSubpartitionInfo info) {
        RescaleMappings oldToNewMapping = this.oldToNewMappings.computeIfAbsent(info.getPartitionIdx(), idx -> this.channelMapping.getChannelMapping((int)idx).invert());
        List<CheckpointedResultSubpartition> subpartitions = Arrays.stream(oldToNewMapping.getMappedIndexes(info.getSubPartitionIdx())).mapToObj(newIndexes -> this.getSubpartition(info.getPartitionIdx(), newIndexes)).collect(Collectors.toList());
        if (subpartitions.isEmpty()) {
            throw new IllegalStateException("Recovered a buffer from old " + info + " that has no mapping in " + this.channelMapping.getChannelMapping(info.getPartitionIdx()));
        }
        return subpartitions;
    }

    @Override
    public void close() throws IOException {
        for (ResultPartitionWriter writer : this.writers) {
            if (!(writer instanceof CheckpointedResultPartition)) continue;
            ((CheckpointedResultPartition)((Object)writer)).finishReadRecoveredState(this.notifyAndBlockOnCompletion);
        }
    }
}

