/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.jdbc;

import java.io.IOException;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.jdbc.JdbcCatalogLock;
import org.apache.paimon.jdbc.JdbcCatalogLockContext;
import org.apache.paimon.jdbc.JdbcCatalogLockFactory;
import org.apache.paimon.jdbc.JdbcClientPool;
import org.apache.paimon.jdbc.JdbcUtils;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Catalog whose database and table registry lives in a JDBC database.
 *
 * <p>Table registrations are rows in {@code paimon_tables}; database properties are rows in
 * {@code paimon_database_properties}. Every row is scoped by {@code catalogKey}. Table schemas
 * are read and written through {@link SchemaManager} at the table's data location.
 */
public class JdbcCatalog extends AbstractCatalog {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalog.class);
    public static final String PROPERTY_PREFIX = "jdbc.";
    /** Marker property written on database creation; stripped again when properties are loaded. */
    private static final String DATABASE_EXISTS_PROPERTY = "exists";
    /** Database property key holding the database location. */
    private static final String LOCATION_PROPERTY = "location";

    private final JdbcClientPool connections;
    private final String catalogKey;
    private final Options options;
    private final String warehouse;

    /**
     * Creates the catalog, opens the JDBC client pool and creates the catalog tables if they are
     * missing.
     *
     * @param fileIO file system access passed to the parent catalog
     * @param catalogKey key that scopes every row this catalog reads or writes
     * @param options catalog options; must not be {@code null}
     * @param warehouse value returned by {@link #warehouse()}
     * @throws RuntimeException if the catalog tables cannot be initialized or the thread is
     *     interrupted while doing so
     */
    protected JdbcCatalog(FileIO fileIO, String catalogKey, Options options, String warehouse) {
        super(fileIO, options);
        Preconditions.checkNotNull(options, "Invalid catalog properties: null");
        this.catalogKey = catalogKey;
        this.options = options;
        this.warehouse = warehouse;
        this.connections = new JdbcClientPool((int)options.get(CatalogOptions.CLIENT_POOL_SIZE), options.get(CatalogOptions.URI.key()), options.toMap());
        try {
            this.initializeCatalogTablesIfNeed();
        } catch (SQLException e) {
            throw new RuntimeException("Cannot initialize JDBC catalog", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted in call to initialize", e);
        }
    }

    /** Returns the JDBC client pool used by this catalog. */
    @VisibleForTesting
    public JdbcClientPool getConnections() {
        return this.connections;
    }

    /**
     * Creates {@code paimon_tables} and {@code paimon_database_properties} when they are absent,
     * plus the distributed lock table when locking is enabled.
     *
     * @throws SQLException if a metadata lookup or DDL statement fails
     * @throws InterruptedException if the thread is interrupted while using the connection pool
     */
    private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedException {
        String uri = this.options.get(CatalogOptions.URI.key());
        Preconditions.checkNotNull(uri, "JDBC connection URI is required");
        this.createTableIfAbsent("paimon_tables", "CREATE TABLE paimon_tables(catalog_key VARCHAR(255) NOT NULL,database_name VARCHAR(255) NOT NULL,table_name VARCHAR(255) NOT NULL, PRIMARY KEY (catalog_key, database_name, table_name))");
        this.createTableIfAbsent("paimon_database_properties", "CREATE TABLE paimon_database_properties(catalog_key VARCHAR(255) NOT NULL,database_name VARCHAR(255) NOT NULL,property_key VARCHAR(255),property_value VARCHAR(1000),PRIMARY KEY (catalog_key, database_name, property_key))");
        if (this.lockEnabled()) {
            JdbcUtils.createDistributedLockTable(this.connections, this.options);
        }
    }

    /**
     * Runs {@code createSql} unless JDBC metadata already reports a table named
     * {@code tableName}. Both the metadata result set and the DDL statement are closed before
     * returning.
     */
    private void createTableIfAbsent(String tableName, String createSql) throws SQLException, InterruptedException {
        this.connections.run(conn -> {
            DatabaseMetaData dbMeta = conn.getMetaData();
            try (ResultSet existing = dbMeta.getTables(null, null, tableName, null)) {
                if (existing.next()) {
                    return true;
                }
            }
            try (PreparedStatement create = conn.prepareStatement(createSql)) {
                return create.execute();
            }
        });
    }

    @Override
    public String warehouse() {
        return this.warehouse;
    }

    /**
     * Lists the databases known to this catalog key.
     *
     * <p>Names come from both {@code paimon_tables} and {@code paimon_database_properties}. A
     * database present in both is reported once, in first-seen order.
     *
     * @return a new mutable list of distinct database names
     */
    @Override
    public List<String> listDatabases() {
        Set<String> names = new LinkedHashSet<>();
        names.addAll(this.fetch(row -> row.getString("database_name"), "SELECT DISTINCT database_name FROM paimon_tables WHERE catalog_key = ?", this.catalogKey));
        names.addAll(this.fetch(row -> row.getString("database_name"), "SELECT DISTINCT database_name FROM paimon_database_properties WHERE catalog_key = ?", this.catalogKey));
        return new ArrayList<>(names);
    }

    @Override
    protected boolean databaseExistsImpl(String databaseName) {
        return JdbcUtils.databaseExists(this.connections, this.catalogKey, databaseName);
    }

    /**
     * Loads the stored properties of a database, without the internal existence marker.
     *
     * @param databaseName database to look up
     * @return an immutable copy of the properties, always containing a {@code location} entry
     * @throws RuntimeException if the database does not exist
     */
    @Override
    protected Map<String, String> loadDatabasePropertiesImpl(String databaseName) {
        // fetchProperties performs the existence check and throws the same
        // "Database does not exist" RuntimeException, so it is not repeated here.
        Map<String, String> properties = new HashMap<>(this.fetchProperties(databaseName));
        if (!properties.containsKey(LOCATION_PROPERTY)) {
            // NOTE(review): createDatabaseImpl stores Path.toString() under this key, while this
            // fallback uses getName() (presumably only the last path component) - confirm which
            // form consumers expect.
            properties.put(LOCATION_PROPERTY, this.newDatabasePath(databaseName).getName());
        }
        properties.remove(DATABASE_EXISTS_PROPERTY);
        return ImmutableMap.copyOf(properties);
    }

    /**
     * Registers a database by inserting its properties.
     *
     * <p>An existence marker is always stored. A {@code location} entry is added from
     * {@code newDatabasePath(name)} when the caller supplied none.
     *
     * @param name database name
     * @param properties caller-supplied properties; may be {@code null} or empty
     * @throws RuntimeException if the database already exists
     */
    @Override
    protected void createDatabaseImpl(String name, Map<String, String> properties) {
        if (this.databaseExists(name)) {
            throw new RuntimeException(String.format("Database already exists: %s", name));
        }
        Map<String, String> createProps = new HashMap<>();
        createProps.put(DATABASE_EXISTS_PROPERTY, "true");
        if (properties != null && !properties.isEmpty()) {
            createProps.putAll(properties);
        }
        if (!createProps.containsKey(LOCATION_PROPERTY)) {
            Path databasePath = this.newDatabasePath(name);
            createProps.put(LOCATION_PROPERTY, databasePath.toString());
        }
        JdbcUtils.insertProperties(this.connections, this.catalogKey, name, createProps);
    }

    /**
     * Removes the table registrations and then the property rows of a database. The two deletes
     * are issued as separate statements.
     */
    @Override
    protected void dropDatabaseImpl(String name) {
        JdbcUtils.execute(this.connections, "DELETE FROM  paimon_tables WHERE catalog_key = ? AND database_name = ?", this.catalogKey, name);
        JdbcUtils.execute(this.connections, "DELETE FROM paimon_database_properties WHERE catalog_key = ? AND database_name = ?", this.catalogKey, name);
    }

    /**
     * Lists the tables registered under a database.
     *
     * @throws RuntimeException if the database does not exist
     */
    @Override
    protected List<String> listTablesImpl(String databaseName) {
        if (!this.databaseExists(databaseName)) {
            throw new RuntimeException(String.format("Database does not exist: %s", databaseName));
        }
        return this.fetch(row -> row.getString("table_name"), "SELECT * FROM paimon_tables WHERE catalog_key = ? AND database_name = ?", this.catalogKey, databaseName);
    }

    /**
     * Unregisters a table and then deletes its data directory if present.
     *
     * <p>If no registry row was deleted the drop is skipped. A failure while deleting the
     * directory is logged and not rethrown.
     *
     * @throws RuntimeException if the registry delete fails
     */
    @Override
    protected void dropTableImpl(Identifier identifier) {
        try {
            int deletedRecords = JdbcUtils.execute(this.connections, "DELETE FROM paimon_tables WHERE catalog_key = ? AND database_name = ? AND table_name = ? ", this.catalogKey, identifier.getDatabaseName(), identifier.getObjectName());
            if (deletedRecords == 0) {
                LOG.info("Skipping drop, table does not exist: {}", identifier);
                return;
            }
            Path path = this.getDataTableLocation(identifier);
            try {
                if (this.fileIO.exists(path)) {
                    this.fileIO.deleteDirectoryQuietly(path);
                }
            } catch (Exception ex) {
                // Best effort: the registry row is already gone, so only report the leftover files.
                LOG.error("Delete directory[{}] fail for table {}", path, identifier, ex);
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to drop table " + identifier.getFullName(), e);
        }
    }

    /**
     * Writes the initial schema for a table and then registers the table in
     * {@code paimon_tables}.
     *
     * <p>If the insert does not report exactly one row, the table directory is deleted and the
     * call fails.
     *
     * @throws RuntimeException if schema creation or registration fails, or the thread is
     *     interrupted
     */
    @Override
    protected void createTableImpl(Identifier identifier, Schema schema) {
        try {
            this.getSchemaManager(identifier).createTable(schema);
            Path path = this.getDataTableLocation(identifier);
            int insertRecord = this.connections.run(conn -> {
                try (PreparedStatement sql = conn.prepareStatement("INSERT INTO paimon_tables (catalog_key, database_name, table_name)  VALUES (?,?,?)")) {
                    sql.setString(1, this.catalogKey);
                    sql.setString(2, identifier.getDatabaseName());
                    sql.setString(3, identifier.getObjectName());
                    return sql.executeUpdate();
                }
            });
            if (insertRecord != 1) {
                try {
                    this.fileIO.deleteDirectoryQuietly(path);
                } catch (Exception ee) {
                    LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee);
                }
                throw new RuntimeException(String.format("Failed to create table %s in catalog %s", identifier.getFullName(), this.catalogKey));
            }
            LOG.debug("Successfully committed to new table: {}", identifier);
        } catch (InterruptedException e) {
            // Same wrapping as any other failure, but keep the interrupt flag set.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
        }
    }

    /**
     * Renames a table in the registry and then, if the source location holds at least one
     * schema, renames the table directory to the new location.
     *
     * @throws RuntimeException if the registry update or the directory rename fails
     */
    @Override
    protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
        try {
            JdbcUtils.updateTable(this.connections, this.catalogKey, fromTable, toTable);
            Path fromPath = this.getDataTableLocation(fromTable);
            if (new SchemaManager(this.fileIO, fromPath).listAllIds().size() > 0) {
                Path toPath = this.getDataTableLocation(toTable);
                try {
                    this.fileIO.rename(fromPath, toPath);
                } catch (IOException e) {
                    throw new RuntimeException("Failed to rename changes of table " + toTable.getFullName() + " to underlying files.", e);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e);
        }
    }

    /**
     * Commits schema changes to an existing table.
     *
     * @throws Catalog.TableNotExistException if the table is not registered in this catalog
     */
    @Override
    protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes) throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException {
        if (!this.tableExists(identifier)) {
            // Use the declared exception, matching getDataTableSchema, instead of a bare RuntimeException.
            throw new Catalog.TableNotExistException(identifier);
        }
        SchemaManager schemaManager = this.getSchemaManager(identifier);
        schemaManager.commitChanges(changes);
    }

    /**
     * Returns the latest schema stored at the table's data location.
     *
     * @throws Catalog.TableNotExistException if the table is not registered in this catalog
     * @throws RuntimeException if the table is registered but no schema is found at its location
     */
    @Override
    protected TableSchema getDataTableSchema(Identifier identifier) throws Catalog.TableNotExistException {
        if (!this.tableExists(identifier)) {
            throw new Catalog.TableNotExistException(identifier);
        }
        Path tableLocation = this.getDataTableLocation(identifier);
        return new SchemaManager(this.fileIO, tableLocation).latest().orElseThrow(() -> new RuntimeException("There is no paimon table in " + tableLocation));
    }

    /**
     * Checks whether a table exists. System tables are delegated to the parent catalog; all
     * other tables are looked up in the JDBC registry.
     */
    @Override
    public boolean tableExists(Identifier identifier) {
        if (this.isSystemTable(identifier)) {
            return super.tableExists(identifier);
        }
        return JdbcUtils.tableExists(this.connections, this.catalogKey, identifier.getDatabaseName(), identifier.getObjectName());
    }

    @Override
    public boolean caseSensitive() {
        return false;
    }

    @Override
    public Optional<CatalogLockFactory> defaultLockFactory() {
        return Optional.of(new JdbcCatalogLockFactory());
    }

    @Override
    public Optional<CatalogLockContext> lockContext() {
        return Optional.of(new JdbcCatalogLockContext(this.connections, this.catalogKey, this.options));
    }

    /**
     * Builds the lock used for schema operations on a table: an empty lock when locking is
     * disabled, otherwise a JDBC-backed catalog lock configured from the catalog options.
     */
    private Lock lock(Identifier identifier) {
        if (!this.lockEnabled()) {
            return new Lock.EmptyLock();
        }
        JdbcCatalogLock lock = new JdbcCatalogLock(this.connections, this.catalogKey, JdbcCatalogLock.checkMaxSleep(this.options.toMap()), JdbcCatalogLock.acquireTimeout(this.options.toMap()));
        return Lock.fromCatalog(lock, identifier);
    }

    /** Closes the JDBC client pool unless it is already closed. */
    @Override
    public void close() throws Exception {
        if (!this.connections.isClosed()) {
            this.connections.close();
        }
    }

    /** Creates a schema manager for the table's data location, guarded by {@link #lock}. */
    private SchemaManager getSchemaManager(Identifier identifier) {
        return new SchemaManager(this.fileIO, this.getDataTableLocation(identifier)).withLock(this.lock(identifier));
    }

    /**
     * Reads all property rows of a database, including the internal existence marker.
     *
     * @return an immutable map of property key to value
     * @throws RuntimeException if the database does not exist
     */
    private Map<String, String> fetchProperties(String databaseName) {
        if (!this.databaseExists(databaseName)) {
            throw new RuntimeException(String.format("Database does not exist: %s", databaseName));
        }
        List<Map.Entry<String, String>> entries = this.fetch(row -> new AbstractMap.SimpleImmutableEntry<>(row.getString("property_key"), row.getString("property_value")), "SELECT *  FROM paimon_database_properties WHERE catalog_key = ? AND database_name = ? ", this.catalogKey, databaseName);
        return ImmutableMap.<String, String>builder().putAll(entries).build();
    }

    /**
     * Runs a parameterized query and maps every result row with {@code toRow}.
     *
     * @param toRow converts the current row of the result set into a result element
     * @param sql query text using {@code ?} placeholders
     * @param args string values bound to the placeholders in order
     * @return the mapped rows in result-set order
     * @throws RuntimeException if the query fails or the thread is interrupted
     */
    private <R> List<R> fetch(RowProducer<R> toRow, String sql, String ... args) {
        try {
            return this.connections.run(conn -> {
                List<R> result = new ArrayList<>();
                try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
                    for (int pos = 0; pos < args.length; ++pos) {
                        // JDBC parameter indexes are 1-based.
                        preparedStatement.setString(pos + 1, args[pos]);
                    }
                    try (ResultSet rs = preparedStatement.executeQuery()) {
                        while (rs.next()) {
                            result.add(toRow.apply(rs));
                        }
                    }
                }
                return result;
            });
        } catch (SQLException e) {
            throw new RuntimeException(String.format("Failed to execute query: %s", sql), e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted in SQL query", e);
        }
    }

    /** Maps the current row of a {@link ResultSet} to a value; may throw {@link SQLException}. */
    @FunctionalInterface
    static interface RowProducer<R> {
        public R apply(ResultSet row) throws SQLException;
    }
}

