/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketState;
import org.apache.flink.util.Preconditions;

/**
 * A {@link SimpleVersionedSerializer} for {@link BucketState}.
 *
 * <p>The serialized form starts with a magic number, followed by the version-1 payload:
 * the bucket id (written with its own version), the bucket path as a UTF string, the
 * creation time of the in-progress file, an optional resumable for the in-progress file,
 * and finally the committables grouped by checkpoint id.
 *
 * @param <BucketID> the type of the bucket identifier
 */
@Internal
class BucketStateSerializer<BucketID> implements SimpleVersionedSerializer<BucketState<BucketID>> {

    /** Marker written at the start of every serialized state (0x1E764B79). */
    private static final int MAGIC_NUMBER = 511069049;

    private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer;

    private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer;

    private final SimpleVersionedSerializer<BucketID> bucketIdSerializer;

    /**
     * Creates a serializer that delegates to the given component serializers.
     *
     * @param resumableSerializer serializer for the in-progress file's resumable; must not be null
     * @param commitableSerializer serializer for the committables; must not be null
     * @param bucketIdSerializer serializer for the bucket id; must not be null
     */
    BucketStateSerializer(
            SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer,
            SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer,
            SimpleVersionedSerializer<BucketID> bucketIdSerializer) {
        this.resumableSerializer = Preconditions.checkNotNull(resumableSerializer);
        this.commitableSerializer = Preconditions.checkNotNull(commitableSerializer);
        this.bucketIdSerializer = Preconditions.checkNotNull(bucketIdSerializer);
    }

    /** Returns the version of the format produced by {@link #serialize(BucketState)}. */
    @Override
    public int getVersion() {
        return 1;
    }

    /**
     * Serializes the given state as the magic number followed by the version-1 payload.
     *
     * @param state the bucket state to serialize
     * @return a copy of the bytes that were written
     * @throws IOException if writing any component fails
     */
    @Override
    public byte[] serialize(BucketState<BucketID> state) throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(256);
        out.writeInt(MAGIC_NUMBER);
        serializeV1(state, out);
        return out.getCopyOfBuffer();
    }

    /**
     * Restores a bucket state from bytes written by {@link #serialize(BucketState)}.
     *
     * @param version the version the bytes were written with; only version 1 is recognized
     * @param serialized the serialized state
     * @return the deserialized bucket state
     * @throws IOException if the version is not recognized, the magic number does not match,
     *     or reading any component fails
     */
    @Override
    public BucketState<BucketID> deserialize(int version, byte[] serialized) throws IOException {
        switch (version) {
            case 1: {
                DataInputDeserializer in = new DataInputDeserializer(serialized);
                validateMagicNumber(in);
                return deserializeV1(in);
            }
            default:
                throw new IOException("Unrecognized version or corrupt state: " + version);
        }
    }

    /**
     * Writes the version-1 payload (everything after the magic number) to {@code out}.
     *
     * @param state the bucket state to write
     * @param out the target output view
     * @throws IOException if writing fails
     */
    @VisibleForTesting
    void serializeV1(BucketState<BucketID> state, DataOutputView out) throws IOException {
        SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), out);
        out.writeUTF(state.getBucketPath().toString());
        out.writeLong(state.getInProgressFileCreationTime());

        // A boolean flag tells the reader whether a resumable for the in-progress file follows.
        if (state.hasInProgressResumableFile()) {
            RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
            out.writeBoolean(true);
            SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, resumable, out);
        } else {
            out.writeBoolean(false);
        }

        Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommitters =
                state.getCommittableFilesPerCheckpoint();

        // The committable serializer's version is written once up front instead of once per element.
        out.writeInt(commitableSerializer.getVersion());

        out.writeInt(pendingCommitters.size());
        for (Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry : pendingCommitters.entrySet()) {
            List<RecoverableWriter.CommitRecoverable> committables = entry.getValue();
            out.writeLong(entry.getKey());
            out.writeInt(committables.size());
            for (RecoverableWriter.CommitRecoverable committable : committables) {
                // Each committable is length-prefixed so the reader knows how many bytes to consume.
                byte[] serialized = commitableSerializer.serialize(committable);
                out.writeInt(serialized.length);
                out.write(serialized);
            }
        }
    }

    /**
     * Reads the version-1 payload (everything after the magic number) from {@code in}.
     *
     * @param in the input view positioned right after the magic number
     * @return the deserialized bucket state
     * @throws IOException if reading fails or a count/length field is negative
     */
    @VisibleForTesting
    BucketState<BucketID> deserializeV1(DataInputView in) throws IOException {
        BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, in);
        String bucketPathStr = in.readUTF();
        long creationTime = in.readLong();

        // Stays null when the writer recorded that there was no in-progress resumable.
        RecoverableWriter.ResumeRecoverable current = null;
        if (in.readBoolean()) {
            current = SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in);
        }

        // Version the committables were written with; passed to every deserialize call below.
        int committableVersion = in.readInt();
        int numCheckpoints = readNonNegativeInt(in, "number of checkpoints");

        Map<Long, List<RecoverableWriter.CommitRecoverable>> committablesPerCheckpoint =
                new HashMap<>(numCheckpoints);
        for (int i = 0; i < numCheckpoints; ++i) {
            long checkpointId = in.readLong();
            int numCommittables = readNonNegativeInt(in, "number of committables");

            List<RecoverableWriter.CommitRecoverable> committables = new ArrayList<>(numCommittables);
            for (int j = 0; j < numCommittables; ++j) {
                byte[] bytes = new byte[readNonNegativeInt(in, "committable length")];
                in.readFully(bytes);
                committables.add(commitableSerializer.deserialize(committableVersion, bytes));
            }
            committablesPerCheckpoint.put(checkpointId, committables);
        }

        return new BucketState<>(
                bucketId, new Path(bucketPathStr), creationTime, current, committablesPerCheckpoint);
    }

    /**
     * Reads an int and rejects negative values, so that corrupt input is reported as an
     * {@link IOException} rather than an unchecked exception from an array or collection
     * allocation.
     */
    private static int readNonNegativeInt(DataInputView in, String what) throws IOException {
        int value = in.readInt();
        if (value < 0) {
            throw new IOException("Corrupt data: negative " + what + ": " + value);
        }
        return value;
    }

    /**
     * Reads one int from {@code in} and checks that it equals {@link #MAGIC_NUMBER}.
     *
     * @throws IOException if the value read differs from the expected magic number
     */
    private static void validateMagicNumber(DataInputView in) throws IOException {
        int magicNumber = in.readInt();
        if (magicNumber != MAGIC_NUMBER) {
            throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
        }
    }
}

