/*
 * Decompiled with CFR 0.152.
 */
package org.apache.stratos.cartridge.agent.data.publisher.log;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
import org.apache.stratos.cartridge.agent.data.publisher.DataPublisherConfiguration;
import org.apache.stratos.cartridge.agent.data.publisher.exception.DataPublisherException;
import org.apache.stratos.cartridge.agent.data.publisher.log.Constants;
import org.apache.stratos.cartridge.agent.data.publisher.log.FileBasedLogPublisher;
import org.apache.stratos.cartridge.agent.data.publisher.log.LogPublisher;
import org.apache.stratos.cartridge.agent.util.CartridgeAgentUtils;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.AttributeType;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;

public class LogPublisherManager {
    private static final Log log = LogFactory.getLog(LogPublisherManager.class);
    private static DataPublisherConfiguration dataPublisherConfig = null;
    private static StreamDefinition streamDefinition = null;
    private static List<LogPublisher> fileBasedLogPublishers = new ArrayList<LogPublisher>();

    public void init(DataPublisherConfiguration dataPublisherConfig) throws DataPublisherException {
        LogPublisherManager.dataPublisherConfig = dataPublisherConfig;
        ArrayList<Integer> ports = new ArrayList<Integer>();
        ports.add(Integer.parseInt(dataPublisherConfig.getMonitoringServerPort()));
        ports.add(Integer.parseInt(dataPublisherConfig.getMonitoringServerSecurePort()));
        CartridgeAgentUtils.waitUntilPortsActive(dataPublisherConfig.getMonitoringServerIp(), ports);
        if (!CartridgeAgentUtils.checkPortsActive(dataPublisherConfig.getMonitoringServerIp(), ports)) {
            throw new DataPublisherException("Monitoring server not active, data publishing is aborted");
        }
        try {
            streamDefinition = new StreamDefinition(Constants.LOG_PUBLISHER_STREAM_PREFIX + CartridgeAgentConfiguration.getInstance().getClusterId(), Constants.LOG_PUBLISHER_STREAM_VERSION);
        }
        catch (MalformedStreamDefinitionException e) {
            throw new RuntimeException(e);
        }
        streamDefinition.setDescription("Apache Stratos Instance Log Publisher");
        ArrayList<Attribute> metaDataDefinition = new ArrayList<Attribute>();
        metaDataDefinition.add(new Attribute(Constants.MEMBER_ID, AttributeType.STRING));
        ArrayList<Attribute> payloadDataDefinition = new ArrayList<Attribute>();
        payloadDataDefinition.add(new Attribute(Constants.LOG_EVENT, AttributeType.STRING));
        streamDefinition.setMetaData(metaDataDefinition);
        streamDefinition.setPayloadData(payloadDataDefinition);
    }

    public void start(String filePath) throws DataPublisherException {
        File logFile = new File(filePath);
        if (!logFile.exists() || !logFile.canRead() || logFile.isDirectory()) {
            throw new DataPublisherException("Unable to read the file at path " + filePath);
        }
        FileBasedLogPublisher fileBasedLogPublisher = new FileBasedLogPublisher(dataPublisherConfig, streamDefinition, filePath, CartridgeAgentConfiguration.getInstance().getMemberId());
        fileBasedLogPublisher.initialize();
        ((LogPublisher)fileBasedLogPublisher).start();
        fileBasedLogPublishers.add(fileBasedLogPublisher);
    }

    public void stop() {
        if (dataPublisherConfig.isEnabled()) {
            for (LogPublisher fileBasedLogPublisher : fileBasedLogPublishers) {
                fileBasedLogPublisher.stop();
            }
        }
    }
}

