/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.types.RowKind;

public class KafkaLogSerializationSchema
implements KafkaSerializationSchema<SinkRecord> {
    private static final long serialVersionUID = 1L;
    private final String topic;
    @Nullable
    private final SerializationSchema<RowData> primaryKeySerializer;
    private final SerializationSchema<RowData> valueSerializer;
    private final CoreOptions.LogChangelogMode changelogMode;

    public KafkaLogSerializationSchema(String topic, @Nullable SerializationSchema<RowData> primaryKeySerializer, SerializationSchema<RowData> valueSerializer, CoreOptions.LogChangelogMode changelogMode) {
        this.topic = topic;
        this.primaryKeySerializer = primaryKeySerializer;
        this.valueSerializer = valueSerializer;
        this.changelogMode = changelogMode;
        if (changelogMode == CoreOptions.LogChangelogMode.UPSERT && primaryKeySerializer == null) {
            throw new IllegalArgumentException("Can not use upsert changelog mode for non-pk table.");
        }
    }

    public void open(SerializationSchema.InitializationContext context) throws Exception {
        if (this.primaryKeySerializer != null) {
            this.primaryKeySerializer.open(context);
        }
        this.valueSerializer.open(context);
    }

    public ProducerRecord<byte[], byte[]> serialize(SinkRecord element, @Nullable Long timestamp) {
        RowKind kind = element.row().getRowKind();
        byte[] primaryKeyBytes = null;
        byte[] valueBytes = null;
        if (this.primaryKeySerializer != null) {
            primaryKeyBytes = this.primaryKeySerializer.serialize((Object)new FlinkRowData((InternalRow)element.primaryKey()));
            if (this.changelogMode == CoreOptions.LogChangelogMode.ALL || kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
                valueBytes = this.valueSerializer.serialize((Object)new FlinkRowData(element.row()));
            }
        } else {
            valueBytes = this.valueSerializer.serialize((Object)new FlinkRowData(element.row()));
        }
        Integer partition = element.bucket() < 0 ? null : Integer.valueOf(element.bucket());
        return new ProducerRecord(this.topic, partition, (Object)primaryKeyBytes, (Object)valueBytes);
    }
}

