/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.watermark;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.utils.JsonSerdeUtil;

/**
 * Factory that maps a CDC source object to the {@link CdcTimestampExtractor} able to read the
 * event timestamp out of that source's records.
 *
 * <p>Extractors are registered per source class and looked up by the <em>exact</em> runtime class
 * of the source passed to {@link #createExtractor(Object)}; subclasses of a registered source
 * class are not matched.
 */
public class CdcTimestampExtractorFactory
implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Registry of extractor suppliers keyed by the concrete source class. */
    private static final Map<Class<?>, Supplier<CdcTimestampExtractor>> extractorMap = new HashMap<>();

    static {
        extractorMap.put(MongoDBSource.class, MongoDBCdcTimestampExtractor::new);
        extractorMap.put(MySqlSource.class, MysqlCdcTimestampExtractor::new);
        extractorMap.put(PulsarSource.class, MessageQueueCdcTimestampExtractor::new);
        extractorMap.put(KafkaSource.class, MessageQueueCdcTimestampExtractor::new);
    }

    /**
     * Creates a new timestamp extractor for the given source.
     *
     * @param source the source object; its runtime class selects the extractor
     * @return a freshly created extractor registered for {@code source.getClass()}
     * @throws IllegalArgumentException if {@code source} is {@code null} or its class has no
     *     registered extractor
     */
    public static CdcTimestampExtractor createExtractor(Object source) {
        if (source == null) {
            // Fail with a descriptive error instead of a bare NullPointerException.
            throw new IllegalArgumentException("Unsupported source type: source must not be null");
        }
        Supplier<CdcTimestampExtractor> extractorSupplier = extractorMap.get(source.getClass());
        if (extractorSupplier != null) {
            return extractorSupplier.get();
        }
        throw new IllegalArgumentException("Unsupported source type: " + source.getClass().getName());
    }

    /** Extracts a timestamp from a {@link CdcSourceRecord}. */
    public interface CdcTimestampExtractor
    extends Serializable {
        /**
         * Reads the timestamp carried by the given record.
         *
         * @param record the CDC record whose value is inspected
         * @return the extracted timestamp (presumably epoch milliseconds -- confirm with callers)
         * @throws JsonProcessingException if the record's JSON value cannot be processed
         */
        long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException;
    }

    /** Extractor registered for {@code MySqlSource}; reads the {@code payload.ts_ms} field. */
    public static class MysqlCdcTimestampExtractor
    implements CdcTimestampExtractor {
        private static final long serialVersionUID = 1L;

        @Override
        public long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException {
            JsonNode value = (JsonNode) record.getValue();
            return (Long) JsonSerdeUtil.extractValue(value, Long.class, new String[] {"payload", "ts_ms"});
        }
    }

    /**
     * Extractor registered for {@code KafkaSource} and {@code PulsarSource}.
     *
     * <p>The JSON layout is detected from marker fields, checked in this order:
     *
     * <ol>
     *   <li>{@code mysqlType} present: returns {@code ts} as-is.
     *   <li>{@code pos} present: parses {@code op_ts} with pattern
     *       {@code yyyy-MM-dd HH:mm:ss.SSSSSS} and converts it to epoch milliseconds.
     *   <li>{@code xid} present: returns {@code ts * 1000}.
     *   <li>{@code payload.source.connector} present: returns {@code payload.ts_ms}.
     *   <li>{@code source.connector} present: returns {@code ts_ms}.
     * </ol>
     *
     * <p>NOTE(review): these markers look like Canal, OGG, Maxwell and Debezium (with and without
     * schema envelope) payloads respectively -- confirm against the supported format list.
     */
    public static class MessageQueueCdcTimestampExtractor
    implements CdcTimestampExtractor {
        private static final long serialVersionUID = 1L;

        /** Formatter for {@code op_ts}; immutable and thread-safe, so it is built only once. */
        private static final DateTimeFormatter OP_TS_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

        /**
         * {@inheritDoc}
         *
         * @throws RuntimeException if none of the known marker fields is present in the record
         */
        @Override
        public long extractTimestamp(CdcSourceRecord cdcSourceRecord) throws JsonProcessingException {
            JsonNode record = (JsonNode) cdcSourceRecord.getValue();
            if (JsonSerdeUtil.isNodeExists(record, new String[] {"mysqlType"})) {
                return (Long) JsonSerdeUtil.extractValue(record, Long.class, new String[] {"ts"});
            }
            if (JsonSerdeUtil.isNodeExists(record, new String[] {"pos"})) {
                String dateTimeString = (String) JsonSerdeUtil.extractValue(record, String.class, new String[] {"op_ts"});
                LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, OP_TS_FORMATTER);
                // NOTE(review): the zone-less op_ts is interpreted in the JVM's default time zone;
                // confirm that this matches the zone the producer writes op_ts in.
                return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
            }
            if (JsonSerdeUtil.isNodeExists(record, new String[] {"xid"})) {
                // Presumably "ts" is in seconds here and is scaled to milliseconds -- confirm.
                return (Long) JsonSerdeUtil.extractValue(record, Long.class, new String[] {"ts"}) * 1000L;
            }
            if (JsonSerdeUtil.isNodeExists(record, new String[] {"payload", "source", "connector"})) {
                return (Long) JsonSerdeUtil.extractValue(record, Long.class, new String[] {"payload", "ts_ms"});
            }
            if (JsonSerdeUtil.isNodeExists(record, new String[] {"source", "connector"})) {
                return (Long) JsonSerdeUtil.extractValue(record, Long.class, new String[] {"ts_ms"});
            }
            throw new RuntimeException(String.format("Failed to extract timestamp: The JSON format of the message queue is unsupported. Record details: %s", record));
        }
    }

    /** Extractor registered for {@code MongoDBSource}; reads the top-level {@code ts_ms} field. */
    public static class MongoDBCdcTimestampExtractor
    implements CdcTimestampExtractor {
        private static final long serialVersionUID = 1L;

        @Override
        public long extractTimestamp(CdcSourceRecord record) throws JsonProcessingException {
            JsonNode value = (JsonNode) record.getValue();
            return (Long) JsonSerdeUtil.extractValue(value, Long.class, new String[] {"ts_ms"});
        }
    }
}

