/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.channel.jdbc.impl;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Enumeration;
import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;
import org.apache.commons.dbcp.ConnectionFactory;
import org.apache.commons.dbcp.DriverManagerConnectionFactory;
import org.apache.commons.dbcp.PoolableConnectionFactory;
import org.apache.commons.dbcp.PoolingDataSource;
import org.apache.commons.pool.KeyedObjectPoolFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.impl.GenericKeyedObjectPoolFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.channel.jdbc.DatabaseType;
import org.apache.flume.channel.jdbc.JdbcChannelException;
import org.apache.flume.channel.jdbc.JdbcChannelProvider;
import org.apache.flume.channel.jdbc.TransactionIsolation;
import org.apache.flume.channel.jdbc.impl.JdbcTransactionFactory;
import org.apache.flume.channel.jdbc.impl.JdbcTransactionImpl;
import org.apache.flume.channel.jdbc.impl.PersistableEvent;
import org.apache.flume.channel.jdbc.impl.SchemaHandler;
import org.apache.flume.channel.jdbc.impl.SchemaHandlerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcChannelProviderImpl
implements JdbcChannelProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcChannelProviderImpl.class);
    private static final String EMBEDDED_DERBY_DRIVER_CLASSNAME = "org.apache.derby.jdbc.EmbeddedDriver";
    private static final String DEFAULT_DRIVER_CLASSNAME = "org.apache.derby.jdbc.EmbeddedDriver";
    private static final String DEFAULT_USERNAME = "sa";
    private static final String DEFAULT_PASSWORD = "";
    private static final String DEFAULT_DBTYPE = "DERBY";
    private GenericObjectPool connectionPool;
    private KeyedObjectPoolFactory statementPool;
    private DataSource dataSource;
    private DatabaseType databaseType;
    private SchemaHandler schemaHandler;
    private JdbcTransactionFactory txFactory;
    private String connectUrl;
    private String driverClassName;
    private long maxCapacity = 0L;

    @Override
    public void initialize(Context context) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Initializing JDBC Channel provider with props: " + context);
        }
        this.initializeSystemProperties(context);
        this.initializeDataSource(context);
        this.initializeSchema(context);
        this.initializeChannelState(context);
    }

    private void initializeSystemProperties(Context context) {
        Map sysProps = context.getSubProperties("org.apache.flume.channel.jdbc.sysprop.");
        for (String key : sysProps.keySet()) {
            Object object = sysProps.get(key);
            String value = DEFAULT_PASSWORD;
            if (object != null) {
                value = object.toString();
            }
            System.setProperty(key, value);
        }
    }

    private void initializeChannelState(Context context) {
        String maxCapacityStr = context.getString("org.apache.flume.channel.jdbc.maximum.capacity", "0");
        long maxCapacitySpecified = 0L;
        try {
            maxCapacitySpecified = Long.parseLong(maxCapacityStr);
        }
        catch (NumberFormatException nfe) {
            LOGGER.warn("Invalid value specified for maximum channel capacity: " + maxCapacityStr, (Throwable)nfe);
        }
        if (maxCapacitySpecified > 0L) {
            this.maxCapacity = maxCapacitySpecified;
            LOGGER.debug("Maximum channel capacity: " + this.maxCapacity);
        } else {
            LOGGER.warn("JDBC channel will operate without a capacity limit.");
        }
    }

    private void initializeSchema(Context context) {
        String createSchemaFlag = context.getString("org.apache.flume.channel.jdbc.create.schema", "true");
        boolean createSchema = Boolean.valueOf(createSchemaFlag);
        LOGGER.debug("Create schema flag set to: " + createSchema);
        this.schemaHandler = SchemaHandlerFactory.getHandler(this.databaseType, this.dataSource);
        if (!this.schemaHandler.schemaExists()) {
            if (!createSchema) {
                throw new JdbcChannelException("Schema does not exist and auto-generation is disabled. Please enable auto-generation of schema and try again.");
            }
            String createIndexFlag = context.getString("org.apache.flume.channel.jdbc.create.index", "true");
            boolean createIndex = Boolean.valueOf(createIndexFlag);
            if (!createIndex) {
                LOGGER.info("Index creation is disabled, indexes will not be created.");
            }
            this.schemaHandler.createSchemaObjects(createIndex);
        }
        this.schemaHandler.validateSchema();
    }

    @Override
    public void close() {
        try {
            this.connectionPool.close();
        }
        catch (Exception ex) {
            throw new JdbcChannelException("Unable to close connection pool", ex);
        }
        if (this.databaseType.equals((Object)DatabaseType.DERBY) && this.driverClassName.equals("org.apache.derby.jdbc.EmbeddedDriver")) {
            if (this.connectUrl.startsWith("jdbc:derby:")) {
                int index = this.connectUrl.indexOf(";");
                String baseUrl = null;
                baseUrl = index != -1 ? this.connectUrl.substring(0, index + 1) : this.connectUrl + ";";
                String shutDownUrl = baseUrl + "shutdown=true";
                LOGGER.debug("Attempting to shutdown embedded Derby using URL: " + shutDownUrl);
                try {
                    DriverManager.getConnection(shutDownUrl);
                }
                catch (SQLException ex) {
                    if (ex.getErrorCode() != 45000) {
                        throw new JdbcChannelException("Unable to shutdown embedded Derby: " + shutDownUrl + " Error Code: " + ex.getErrorCode(), ex);
                    }
                    LOGGER.info("Embedded Derby shutdown raised SQL STATE 45000 as expected.");
                }
            } else {
                LOGGER.warn("Even though embedded Derby drvier was loaded, the connect URL is of an unexpected form: " + this.connectUrl + ". Therfore no " + "attempt will be made to shutdown embedded Derby instance.");
            }
        }
        this.dataSource = null;
        this.txFactory = null;
        this.schemaHandler = null;
    }

    @Override
    public void persistEvent(String channel, Event event) {
        PersistableEvent persistableEvent = new PersistableEvent(channel, event);
        JdbcTransactionImpl tx = null;
        try {
            long currentSize;
            tx = this.getTransaction();
            tx.begin();
            Connection conn = tx.getConnection();
            if (this.maxCapacity > 0L && (currentSize = this.schemaHandler.getChannelSize(conn)) >= this.maxCapacity) {
                throw new JdbcChannelException("Channel capacity reached: maxCapacity: " + this.maxCapacity + ", currentSize: " + currentSize);
            }
            this.schemaHandler.storeEvent(persistableEvent, tx.getConnection());
            tx.commit();
        }
        catch (Exception ex) {
            tx.rollback();
            throw new JdbcChannelException("Failed to persist event", ex);
        }
        finally {
            if (tx != null) {
                tx.close();
            }
        }
    }

    @Override
    public Event removeEvent(String channelName) {
        PersistableEvent result = null;
        JdbcTransactionImpl tx = null;
        try {
            tx = this.getTransaction();
            tx.begin();
            result = this.schemaHandler.fetchAndDeleteEvent(channelName, tx.getConnection());
            tx.commit();
        }
        catch (Exception ex) {
            tx.rollback();
            throw new JdbcChannelException("Failed to persist event", ex);
        }
        finally {
            if (tx != null) {
                tx.close();
            }
        }
        return result;
    }

    @Override
    public JdbcTransactionImpl getTransaction() {
        return (JdbcTransactionImpl)this.txFactory.get();
    }

    private void initializeDataSource(Context context) {
        Object oldPass;
        Object oldUser;
        this.driverClassName = context.getString("org.apache.flume.channel.jdbc.driver.class");
        this.connectUrl = context.getString("org.apache.flume.channel.jdbc.driver.url");
        String userName = context.getString("org.apache.flume.channel.jdbc.db.username");
        String password = context.getString("org.apache.flume.channel.jdbc.db.password");
        String jdbcPropertiesFile = context.getString("org.apache.flume.channel.jdbc.connection.properties.file");
        String dbTypeName = context.getString("org.apache.flume.channel.jdbc.db.type");
        if (this.connectUrl == null || this.connectUrl.trim().length() == 0) {
            LOGGER.warn("No connection URL specified. Using embedded derby database instance.");
            this.driverClassName = "org.apache.derby.jdbc.EmbeddedDriver";
            userName = DEFAULT_USERNAME;
            password = DEFAULT_PASSWORD;
            dbTypeName = DEFAULT_DBTYPE;
            String homePath = System.getProperty("user.home").replace('\\', '/');
            String defaultDbDir = homePath + "/.flume/jdbc-channel";
            File dbDir = new File(defaultDbDir);
            String canonicalDbDirPath = null;
            try {
                canonicalDbDirPath = dbDir.getCanonicalPath();
            }
            catch (IOException ex) {
                throw new JdbcChannelException("Unable to find canonical path of dir: " + defaultDbDir, ex);
            }
            if (!dbDir.exists() && !dbDir.mkdirs()) {
                throw new JdbcChannelException("unable to create directory: " + canonicalDbDirPath);
            }
            this.connectUrl = "jdbc:derby:" + canonicalDbDirPath + "/db;create=true";
            jdbcPropertiesFile = null;
            LOGGER.warn("Overriding values for - driver: " + this.driverClassName + ", user: " + userName + "connectUrl: " + this.connectUrl + ", jdbc properties file: " + jdbcPropertiesFile + ", dbtype: " + dbTypeName);
        }
        this.databaseType = DatabaseType.getByName(dbTypeName);
        switch (this.databaseType) {
            case DERBY: 
            case MYSQL: {
                break;
            }
            default: {
                throw new JdbcChannelException("Database " + (Object)((Object)this.databaseType) + " not supported at this time");
            }
        }
        if (this.driverClassName == null || this.driverClassName.trim().length() == 0) {
            throw new JdbcChannelException("No jdbc driver specified");
        }
        try {
            Class.forName(this.driverClassName);
        }
        catch (ClassNotFoundException ex) {
            throw new JdbcChannelException("Unable to load driver: " + this.driverClassName, ex);
        }
        Properties jdbcProps = new Properties();
        if (jdbcPropertiesFile != null && jdbcPropertiesFile.trim().length() > 0) {
            File jdbcPropsFile = new File(jdbcPropertiesFile.trim());
            if (!jdbcPropsFile.exists()) {
                throw new JdbcChannelException("Jdbc properties file does not exist: " + jdbcPropertiesFile);
            }
            FileInputStream inStream = null;
            try {
                inStream = new FileInputStream(jdbcPropsFile);
                jdbcProps.load(inStream);
            }
            catch (IOException ex) {
                throw new JdbcChannelException("Unable to load jdbc properties from file: " + jdbcPropertiesFile, ex);
            }
            finally {
                if (inStream != null) {
                    try {
                        ((InputStream)inStream).close();
                    }
                    catch (IOException ex) {
                        LOGGER.error("Unable to close file: " + jdbcPropertiesFile, (Throwable)ex);
                    }
                }
            }
        }
        if (userName != null && (oldUser = jdbcProps.put("user", userName)) != null) {
            LOGGER.warn("Overriding user from: " + oldUser + " to: " + userName);
        }
        if (password != null && (oldPass = jdbcProps.put("password", password)) != null) {
            LOGGER.warn("Overriding password from the jdbc properties with  the one specified explicitly.");
        }
        if (LOGGER.isDebugEnabled()) {
            StringBuilder sb = new StringBuilder("JDBC Properties {");
            boolean first = true;
            Enumeration<?> propertyKeys = jdbcProps.propertyNames();
            while (propertyKeys.hasMoreElements()) {
                if (first) {
                    first = false;
                } else {
                    sb.append(", ");
                }
                String key = (String)propertyKeys.nextElement();
                sb.append(key).append("=");
                if (key.equalsIgnoreCase("password")) {
                    sb.append("*******");
                    continue;
                }
                sb.append(jdbcProps.get(key));
            }
            sb.append("}");
            LOGGER.debug(sb.toString());
        }
        String txIsolation = context.getString("org.apache.flume.channel.jdbc.transaction.isolation", TransactionIsolation.READ_COMMITTED.getName());
        TransactionIsolation txIsolationLevel = TransactionIsolation.getByName(txIsolation);
        LOGGER.debug("Transaction isolation will be set to: " + (Object)((Object)txIsolationLevel));
        DriverManagerConnectionFactory connFactory = new DriverManagerConnectionFactory(this.connectUrl, jdbcProps);
        this.connectionPool = new GenericObjectPool();
        String maxActiveConnections = context.getString("org.apache.flume.channel.jdbc.maximum.connections", "10");
        int maxActive = 10;
        if (maxActiveConnections != null && maxActiveConnections.length() > 0) {
            try {
                maxActive = Integer.parseInt(maxActiveConnections);
            }
            catch (NumberFormatException nfe) {
                LOGGER.warn("Max active connections has invalid value: " + maxActiveConnections + ", Using default: " + maxActive);
            }
        }
        LOGGER.debug("Max active connections for the pool: " + maxActive);
        this.connectionPool.setMaxActive(maxActive);
        this.statementPool = new GenericKeyedObjectPoolFactory(null);
        new PoolableConnectionFactory((ConnectionFactory)connFactory, (ObjectPool)this.connectionPool, this.statementPool, this.databaseType.getValidationQuery(), false, false, txIsolationLevel.getCode());
        this.dataSource = new PoolingDataSource((ObjectPool)this.connectionPool);
        this.txFactory = new JdbcTransactionFactory(this.dataSource);
    }
}

