/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.types.logical.RowType;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.flink.DataCatalogTable;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkFileIOLoader;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.SystemCatalogTable;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.flink.source.SystemTableSource;
import org.apache.paimon.flink.source.table.PushedRichTableSource;
import org.apache.paimon.flink.source.table.PushedTableSource;
import org.apache.paimon.flink.source.table.RichTableSource;
import org.apache.paimon.lineage.LineageMeta;
import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.lineage.TableLineageEntity;
import org.apache.paimon.lineage.TableLineageEntityImpl;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractFlinkTableFactory
implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkTableFactory.class);

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        boolean isStreamingMode;
        CatalogTable origin = context.getCatalogTable().getOrigin();
        boolean bl = isStreamingMode = context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        if (origin instanceof SystemCatalogTable) {
            return new PushedTableSource(new SystemTableSource(((SystemCatalogTable)origin).table(), isStreamingMode, context.getObjectIdentifier()));
        }
        Table table = AbstractFlinkTableFactory.buildPaimonTable(context);
        if (table instanceof FileStoreTable) {
            this.storeTableLineage(((FileStoreTable)table).catalogEnvironment().lineageMetaFactory(), context, (entity, lineageFactory) -> {
                try (LineageMeta lineage = lineageFactory.create(() -> Options.fromMap(table.options()));){
                    lineage.saveSourceTableLineage((TableLineageEntity)entity);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
        DataTableSource source = new DataTableSource(context.getObjectIdentifier(), table, isStreamingMode, context, AbstractFlinkTableFactory.createOptionalLogStoreFactory(context).orElse(null));
        return new Options(table.options()).get(FlinkConnectorOptions.SCAN_PUSH_DOWN) != false ? new PushedRichTableSource(source) : new RichTableSource(source);
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        Table table = AbstractFlinkTableFactory.buildPaimonTable(context);
        if (table instanceof FileStoreTable) {
            this.storeTableLineage(((FileStoreTable)table).catalogEnvironment().lineageMetaFactory(), context, (entity, lineageFactory) -> {
                try (LineageMeta lineage = lineageFactory.create(() -> Options.fromMap(table.options()));){
                    lineage.saveSinkTableLineage((TableLineageEntity)entity);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
        return new FlinkTableSink(context.getObjectIdentifier(), table, context, AbstractFlinkTableFactory.createOptionalLogStoreFactory(context).orElse(null));
    }

    private void storeTableLineage(@Nullable LineageMetaFactory lineageMetaFactory, DynamicTableFactory.Context context, BiConsumer<TableLineageEntity, LineageMetaFactory> tableLineage) {
        if (lineageMetaFactory != null) {
            String pipelineName = (String)context.getConfiguration().get(PipelineOptions.NAME);
            if (pipelineName == null) {
                throw new ValidationException("Cannot get pipeline name for lineage meta.");
            }
            tableLineage.accept(new TableLineageEntityImpl(context.getObjectIdentifier().getDatabaseName(), context.getObjectIdentifier().getObjectName(), pipelineName, Timestamp.fromEpochMillis(System.currentTimeMillis())), lineageMetaFactory);
        }
    }

    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet();
    }

    public static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(DynamicTableFactory.Context context) {
        return AbstractFlinkTableFactory.createOptionalLogStoreFactory(context.getClassLoader(), context.getCatalogTable().getOptions());
    }

    static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(ClassLoader classLoader, Map<String, String> options) {
        Options configOptions = new Options();
        options.forEach(configOptions::setString);
        if (configOptions.get(FlinkConnectorOptions.LOG_SYSTEM).equalsIgnoreCase("none")) {
            AbstractFlinkTableFactory.validateFileStoreContinuous(configOptions);
            return Optional.empty();
        }
        if (configOptions.get(CoreOptions.SCAN_MODE) == CoreOptions.StartupMode.FROM_SNAPSHOT || configOptions.get(CoreOptions.SCAN_MODE) == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
            throw new ValidationException(String.format("Log system does not support %s and %s scan mode", CoreOptions.StartupMode.FROM_SNAPSHOT, CoreOptions.StartupMode.FROM_SNAPSHOT_FULL));
        }
        return Optional.of(LogStoreTableFactory.discoverLogStoreFactory(classLoader, configOptions.get(FlinkConnectorOptions.LOG_SYSTEM)));
    }

    private static void validateFileStoreContinuous(Options options) {
        CoreOptions.LogChangelogMode changelogMode = options.get(CoreOptions.LOG_CHANGELOG_MODE);
        CoreOptions.StreamingReadMode streamingReadMode = options.get(CoreOptions.STREAMING_READ_MODE);
        if (changelogMode == CoreOptions.LogChangelogMode.UPSERT) {
            throw new ValidationException("File store continuous reading does not support upsert changelog mode.");
        }
        CoreOptions.LogConsistency consistency = options.get(CoreOptions.LOG_CONSISTENCY);
        if (consistency == CoreOptions.LogConsistency.EVENTUAL) {
            throw new ValidationException("File store continuous reading does not support eventual consistency mode.");
        }
        if (streamingReadMode == CoreOptions.StreamingReadMode.LOG) {
            throw new ValidationException("File store continuous reading does not support the log streaming read mode.");
        }
    }

    static CatalogContext createCatalogContext(DynamicTableFactory.Context context) {
        return CatalogContext.create(Options.fromMap(context.getCatalogTable().getOptions()), new FlinkFileIOLoader());
    }

    static Table buildPaimonTable(DynamicTableFactory.Context context) {
        FileStoreTable table;
        CatalogTable origin = context.getCatalogTable().getOrigin();
        Map<String, String> dynamicOptions = AbstractFlinkTableFactory.getDynamicTableConfigOptions(context);
        dynamicOptions.forEach((key, value) -> {
            if (origin.getOptions().get(key) == null || !((String)origin.getOptions().get(key)).equals(value)) {
                SchemaManager.checkAlterTableOption(key);
            }
        });
        HashMap<String, String> newOptions = new HashMap<String, String>();
        newOptions.putAll(origin.getOptions());
        newOptions.putAll(dynamicOptions);
        if (origin instanceof DataCatalogTable) {
            FileStoreTable fileStoreTable = (FileStoreTable)((DataCatalogTable)origin).table();
            table = fileStoreTable.copyWithoutTimeTravel(newOptions);
        } else {
            table = FileStoreTableFactory.create(AbstractFlinkTableFactory.createCatalogContext(context)).copyWithoutTimeTravel(newOptions);
        }
        Schema schema = FlinkCatalog.fromCatalogTable((CatalogTable)context.getCatalogTable());
        RowType rowType = LogicalTypeConversion.toLogicalType(schema.rowType());
        List<String> partitionKeys = schema.partitionKeys();
        List<String> primaryKeys = schema.primaryKeys();
        Preconditions.checkArgument(AbstractFlinkTableFactory.schemaEquals(LogicalTypeConversion.toLogicalType(table.rowType()), rowType), "Flink schema and store schema are not the same, store schema is %s, Flink schema is %s", table.rowType(), rowType);
        Preconditions.checkArgument(table.partitionKeys().equals(partitionKeys), "Flink partitionKeys and store partitionKeys are not the same, store partitionKeys is %s, Flink partitionKeys is %s", table.partitionKeys(), partitionKeys);
        Preconditions.checkArgument(table.primaryKeys().equals(primaryKeys), "Flink primaryKeys and store primaryKeys are not the same, store primaryKeys is %s, Flink primaryKeys is %s", table.primaryKeys(), primaryKeys);
        return table;
    }

    @VisibleForTesting
    static boolean schemaEquals(RowType rowType1, RowType rowType2) {
        List fieldList1 = rowType1.getFields();
        List fieldList2 = rowType2.getFields();
        if (fieldList1.size() != fieldList2.size()) {
            return false;
        }
        for (int i = 0; i < fieldList1.size(); ++i) {
            RowType.RowField f1 = (RowType.RowField)fieldList1.get(i);
            RowType.RowField f2 = (RowType.RowField)fieldList2.get(i);
            if (f1.getName().equals(f2.getName()) && f1.getType().equals((Object)f2.getType())) continue;
            return false;
        }
        return true;
    }

    static Map<String, String> getDynamicTableConfigOptions(DynamicTableFactory.Context context) {
        Map conf;
        HashMap<String, String> optionsFromTableConfig = new HashMap<String, String>();
        ReadableConfig config = context.getConfiguration();
        if (config instanceof Configuration) {
            conf = ((Configuration)config).toMap();
        } else if (config instanceof TableConfig) {
            conf = ((TableConfig)config).getConfiguration().toMap();
        } else {
            throw new IllegalArgumentException("Unexpected config: " + config.getClass());
        }
        String template = String.format("(%s)\\.(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)", "paimon", context.getObjectIdentifier().getCatalogName(), context.getObjectIdentifier().getDatabaseName(), context.getObjectIdentifier().getObjectName());
        Pattern pattern = Pattern.compile(template);
        conf.keySet().forEach(key -> {
            Matcher matcher;
            if (key.startsWith("paimon") && (matcher = pattern.matcher((CharSequence)key)).find()) {
                optionsFromTableConfig.put(matcher.group(5), (String)conf.get(key));
            }
        });
        if (!optionsFromTableConfig.isEmpty()) {
            LOG.info("Loading dynamic table options for {} in table config: {}", (Object)context.getObjectIdentifier().getObjectName(), optionsFromTableConfig);
        }
        return optionsFromTableConfig;
    }
}

