/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.connector.sink2;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

@Internal
public class CommittableMessageSerializer<CommT>
implements SimpleVersionedSerializer<CommittableMessage<CommT>> {
    @VisibleForTesting
    static final int VERSION = 1;
    private static final int COMMITTABLE = 1;
    private static final int SUMMARY = 2;
    private static final long EOI = Long.MAX_VALUE;
    private final SimpleVersionedSerializer<CommT> committableSerializer;

    public CommittableMessageSerializer(SimpleVersionedSerializer<CommT> committableSerializer) {
        this.committableSerializer = (SimpleVersionedSerializer)Preconditions.checkNotNull(committableSerializer);
    }

    public int getVersion() {
        return 1;
    }

    public byte[] serialize(CommittableMessage<CommT> obj) throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(256);
        if (obj instanceof CommittableWithLineage) {
            out.writeByte(1);
            SimpleVersionedSerialization.writeVersionAndSerialize(this.committableSerializer, ((CommittableWithLineage)obj).getCommittable(), (DataOutputView)out);
            this.writeCheckpointId(out, obj);
            out.writeInt(obj.getSubtaskId());
        } else if (obj instanceof CommittableSummary) {
            out.writeByte(2);
            out.writeInt(obj.getSubtaskId());
            CommittableSummary committableSummary = (CommittableSummary)obj;
            out.writeInt(committableSummary.getNumberOfSubtasks());
            this.writeCheckpointId(out, obj);
            out.writeInt(committableSummary.getNumberOfCommittables());
            out.writeInt(committableSummary.getNumberOfPendingCommittables());
            out.writeInt(committableSummary.getNumberOfFailedCommittables());
        } else {
            throw new IllegalArgumentException("Unknown message: " + obj.getClass());
        }
        return out.getCopyOfBuffer();
    }

    public CommittableMessage<CommT> deserialize(int version, byte[] serialized) throws IOException {
        DataInputDeserializer in = new DataInputDeserializer(serialized);
        byte messageType = in.readByte();
        switch (messageType) {
            case 1: {
                return new CommittableWithLineage<Object>(SimpleVersionedSerialization.readVersionAndDeSerialize(this.committableSerializer, (DataInputView)in), this.readCheckpointId(in), in.readInt());
            }
            case 2: {
                return new CommittableSummary(in.readInt(), in.readInt(), this.readCheckpointId(in), in.readInt(), in.readInt(), in.readInt());
            }
        }
        throw new IllegalStateException("Unexpected message type " + messageType + " in " + StringUtils.byteToHexString((byte[])serialized));
    }

    private void writeCheckpointId(DataOutputSerializer out, CommittableMessage<CommT> obj) throws IOException {
        out.writeLong(obj.getCheckpointId().orElse(Long.MAX_VALUE));
    }

    private Long readCheckpointId(DataInputDeserializer in) throws IOException {
        long checkpointId = in.readLong();
        return checkpointId == Long.MAX_VALUE ? null : Long.valueOf(checkpointId);
    }
}

