/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.table.FileStoreTable;

/**
 * Write operator for {@link CdcRecord}s.
 *
 * <p>Each incoming record is converted to a {@link GenericRow} against the fields of the table's
 * current schema and then handed to the underlying writer. When the conversion yields no row, the
 * operator keeps reloading the latest table schema and retrying, sleeping
 * {@link #RETRY_SLEEP_TIME} between attempts, until the conversion succeeds.
 */
public class CdcRecordStoreWriteOperator
extends TableWriteOperator<CdcRecord> {
    private static final long serialVersionUID = 1L;

    /**
     * Table option {@code cdc.retry-sleep-time}: how long to sleep between two attempts to convert
     * a record after reloading the schema. Defaults to 500 milliseconds.
     */
    public static final ConfigOption<Duration> RETRY_SLEEP_TIME = ConfigOptions.key("cdc.retry-sleep-time").durationType().defaultValue(Duration.ofMillis(500L));

    /** Value of {@link #RETRY_SLEEP_TIME} in milliseconds, resolved once at construction time. */
    private final long retrySleepMillis;

    /**
     * Creates the operator and reads {@link #RETRY_SLEEP_TIME} from the table's core options.
     *
     * @param table table to write to
     * @param storeSinkWriteProvider provider passed through to the parent operator
     * @param initialCommitUser commit user passed through to the parent operator
     */
    public CdcRecordStoreWriteOperator(FileStoreTable table, StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser) {
        super(table, storeSinkWriteProvider, initialCommitUser);
        this.retrySleepMillis = table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
    }

    /**
     * Replaces the table with a copy carrying the latest schema, then delegates to the parent
     * initialization.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        // Refresh first so the parent initialization sees the up-to-date table.
        this.table = this.table.copyWithLatestSchema();
        super.initializeState(context);
    }

    /** Always returns {@code false} for this operator. */
    @Override
    protected boolean containLogSystem() {
        return false;
    }

    /**
     * Converts the record carried by {@code element} to a {@link GenericRow} and writes it.
     *
     * <p>If the record cannot be converted with the current schema fields, the latest schema is
     * reloaded and the conversion retried in a loop (sleeping between attempts) with no upper
     * bound on the number of retries. Once a conversion succeeds after a reload, the refreshed
     * table is passed to the writer before the row is written.
     *
     * @param element stream record wrapping the {@link CdcRecord} to write
     * @throws IOException if the underlying write fails; the original exception is the cause
     * @throws InterruptedException if the thread is interrupted while sleeping between retries
     */
    @Override
    public void processElement(StreamRecord<CdcRecord> element) throws Exception {
        CdcRecord record = element.getValue();
        Optional<GenericRow> optionalConverted = CdcRecordUtils.toGenericRow(record, this.table.schema().fields());
        if (!optionalConverted.isPresent()) {
            // NOTE(review): presumably the record refers to a schema change that has not been
            // committed yet, so poll for the newest schema until it matches - confirm upstream.
            while (true) {
                this.table = this.table.copyWithLatestSchema();
                optionalConverted = CdcRecordUtils.toGenericRow(record, this.table.schema().fields());
                if (optionalConverted.isPresent()) {
                    break;
                }
                Thread.sleep(this.retrySleepMillis);
            }
            // The table changed at least once above; hand the refreshed table to the writer.
            this.write.replace(this.table);
        }
        try {
            this.write.write(optionalConverted.get());
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }
}

