/*
 * Decompiled with CFR 0.152.
 */
package org.apache.airavata.gfac.monitor.impl.pull.qstat;

import com.google.common.eventbus.EventBus;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.airavata.common.logger.AiravataLogger;
import org.apache.airavata.common.logger.AiravataLoggerFactory;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.cpi.GFac;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
import org.apache.airavata.gfac.monitor.HostMonitorData;
import org.apache.airavata.gfac.monitor.UserMonitorData;
import org.apache.airavata.gfac.monitor.core.PullMonitor;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.gfac.monitor.impl.pull.qstat.ResourceConnection;
import org.apache.airavata.gfac.monitor.impl.push.amqp.SimpleJobFinishConsumer;
import org.apache.airavata.gfac.monitor.util.CommonUtils;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
import org.apache.airavata.model.messaging.event.JobIdentifier;
import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
import org.apache.airavata.model.workspace.experiment.JobState;

public class HPCPullMonitor
extends PullMonitor {
    private static final AiravataLogger logger = AiravataLoggerFactory.getLogger(HPCPullMonitor.class);
    public static final int FAILED_COUNT = 1;
    private BlockingQueue<UserMonitorData> queue;
    private boolean startPulling = false;
    private Map<String, ResourceConnection> connections;
    private MonitorPublisher publisher;
    private LinkedBlockingQueue<String> cancelJobList;
    private List<String> completedJobsFromPush;
    private GFac gfac;
    private AuthenticationInfo authenticationInfo;

    public HPCPullMonitor() {
        this.connections = new HashMap<String, ResourceConnection>();
        this.queue = new LinkedBlockingDeque<UserMonitorData>();
        this.publisher = new MonitorPublisher(new EventBus());
        this.cancelJobList = new LinkedBlockingQueue();
        this.completedJobsFromPush = new ArrayList<String>();
        new SimpleJobFinishConsumer(this.completedJobsFromPush).listen();
    }

    public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) {
        this.connections = new HashMap<String, ResourceConnection>();
        this.queue = new LinkedBlockingDeque<UserMonitorData>();
        this.publisher = monitorPublisher;
        this.authenticationInfo = authInfo;
        this.cancelJobList = new LinkedBlockingQueue();
        this.completedJobsFromPush = new ArrayList<String>();
        new SimpleJobFinishConsumer(this.completedJobsFromPush).listen();
    }

    public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
        this.queue = queue;
        this.publisher = publisher;
        this.connections = new HashMap<String, ResourceConnection>();
        this.cancelJobList = new LinkedBlockingQueue();
        this.completedJobsFromPush = new ArrayList<String>();
        new SimpleJobFinishConsumer(this.completedJobsFromPush).listen();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        this.startPulling = true;
        while (this.startPulling && !ServerSettings.isStopAllThreads()) {
            try {
                BlockingQueue<UserMonitorData> blockingQueue = this.queue;
                synchronized (blockingQueue) {
                    if (this.queue.size() > 0) {
                        this.startPulling();
                    }
                }
                Thread.sleep(10000L);
            }
            catch (Exception e) {
                logger.error(e.getMessage(), (Throwable)e);
            }
        }
        for (String next : this.connections.keySet()) {
            ResourceConnection resourceConnection = this.connections.get(next);
            try {
                resourceConnection.getCluster().disconnect();
            }
            catch (SSHApiException e) {
                e.printStackTrace();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean startPulling() throws AiravataMonitorException {
        UserMonitorData take = null;
        JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
        MonitorID currentMonitorID = null;
        try {
            take = this.queue.take();
            List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
            ListIterator<HostMonitorData> hostIterator = hostMonitorData.listIterator();
            while (hostIterator.hasNext()) {
                HostMonitorData iHostMonitorData = hostIterator.next();
                if (iHostMonitorData.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
                    MonitorID iMonitorID;
                    ListIterator<String> iterator;
                    Object iMonitorID2;
                    String hostName = iHostMonitorData.getComputeResourceDescription().getHostName();
                    ResourceConnection connection = null;
                    if (this.connections.containsKey(hostName)) {
                        if (!this.connections.get(hostName).isConnected()) {
                            connection = new ResourceConnection(iHostMonitorData, this.getAuthenticationInfo());
                            this.connections.put(hostName, connection);
                        } else {
                            logger.debug("We already have this connection so not going to create one");
                            connection = this.connections.get(hostName);
                        }
                    } else {
                        connection = new ResourceConnection(iHostMonitorData, this.getAuthenticationInfo());
                        this.connections.put(hostName, connection);
                    }
                    List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
                    Iterator<String> iterator1 = this.cancelJobList.iterator();
                    ListIterator<MonitorID> monitorIDListIterator = monitorID.listIterator();
                    while (monitorIDListIterator.hasNext()) {
                        iMonitorID2 = monitorIDListIterator.next();
                        while (iterator1.hasNext()) {
                            String cancelMId = iterator1.next();
                            if (!cancelMId.equals(iMonitorID2.getExperimentID() + "+" + iMonitorID2.getTaskID())) continue;
                            iMonitorID2.setStatus(JobState.CANCELED);
                            CommonUtils.removeMonitorFromQueue(take, iMonitorID2);
                            logger.debugId(cancelMId, "Found a match in cancel monitor queue, hence moved to the completed job queue, experiment {}, task {} , job {}", new Object[]{iMonitorID2.getExperimentID(), iMonitorID2.getTaskID(), iMonitorID2.getJobID()});
                            logger.info("Job cancelled: marking the Job as ************CANCELLED************ experiment {}, task {}, job name {} .", new Object[]{iMonitorID2.getExperimentID(), iMonitorID2.getTaskID(), iMonitorID2.getJobName()});
                            this.sendNotification((MonitorID)iMonitorID2);
                            monitorIDListIterator.remove();
                            GFacThreadPoolExecutor.getFixedThreadPool().submit((Runnable)new OutHandlerWorker(this.gfac, iMonitorID2, this.publisher));
                            break;
                        }
                        iterator1 = this.cancelJobList.iterator();
                    }
                    iMonitorID2 = this.completedJobsFromPush;
                    synchronized (iMonitorID2) {
                        iterator = this.completedJobsFromPush.listIterator();
                        block18: while (iterator.hasNext()) {
                            String completeId = iterator.next();
                            monitorIDListIterator = monitorID.listIterator();
                            while (monitorIDListIterator.hasNext()) {
                                MonitorID iMonitorID3 = monitorIDListIterator.next();
                                if (!completeId.equals(iMonitorID3.getUserName() + "," + iMonitorID3.getJobName())) continue;
                                logger.info("This job is finished because push notification came with <username,jobName> " + completeId);
                                iMonitorID3.setStatus(JobState.COMPLETE);
                                CommonUtils.removeMonitorFromQueue(take, iMonitorID3);
                                logger.debugId(completeId, "Push notification updated job {} status to {}. experiment {} , task {}.", new Object[]{iMonitorID3.getJobID(), JobState.COMPLETE.toString(), iMonitorID3.getExperimentID(), iMonitorID3.getTaskID()});
                                logger.info("AMQP message recieved: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", new Object[]{iMonitorID3.getExperimentID(), iMonitorID3.getTaskID(), iMonitorID3.getJobName()});
                                iterator.remove();
                                this.sendNotification(iMonitorID3);
                                GFacThreadPoolExecutor.getFixedThreadPool().submit((Runnable)new OutHandlerWorker(this.gfac, iMonitorID3, this.publisher));
                                continue block18;
                            }
                        }
                    }
                    monitorID = iHostMonitorData.getMonitorIDs();
                    Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
                    iterator = monitorID.listIterator();
                    while (iterator.hasNext()) {
                        currentMonitorID = iMonitorID = (MonitorID)iterator.next();
                        if (!JobState.CANCELED.equals((Object)iMonitorID.getStatus()) && !JobState.COMPLETE.equals((Object)iMonitorID.getStatus())) {
                            iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName()));
                        } else if (JobState.COMPLETE.equals((Object)iMonitorID.getStatus())) {
                            logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, task {}", new Object[]{iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()});
                            CommonUtils.removeMonitorFromQueue(take, iMonitorID);
                            logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", new Object[]{iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()});
                            GFacThreadPoolExecutor.getFixedThreadPool().submit((Runnable)new OutHandlerWorker(this.gfac, iMonitorID, this.publisher));
                        }
                        iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName()));
                        iMonitorID.setLastMonitored(new Timestamp(new Date().getTime()));
                        this.sendNotification(iMonitorID);
                        iMonitorID.setLastMonitored(new Timestamp(new Date().getTime()));
                    }
                    iterator = monitorID.listIterator();
                    while (iterator.hasNext()) {
                        iMonitorID = (MonitorID)iterator.next();
                        if (iMonitorID.getFailedCount() > 1) {
                            List stdOut;
                            block41: {
                                iMonitorID.setLastMonitored(new Timestamp(new Date().getTime()));
                                String outputDir = iMonitorID.getJobExecutionContext().getOutputDir();
                                stdOut = null;
                                try {
                                    stdOut = connection.getCluster().listDirectory(outputDir);
                                }
                                catch (SSHApiException e) {
                                    if (!e.getMessage().contains("No such file or directory")) break block41;
                                    logger.error("We know this  job is already attempted to run out-handlers");
                                }
                            }
                            if (stdOut != null && stdOut.size() > 0 && !((String)stdOut.get(0)).isEmpty()) {
                                iMonitorID.setStatus(JobState.COMPLETE);
                                logger.errorId(iMonitorID.getJobID(), "Job monitoring failed {} times,  Experiment {} , task {}", new Object[]{iMonitorID.getFailedCount(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()});
                                logger.info("Listing directory came as complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", new Object[]{iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()});
                                this.sendNotification(iMonitorID);
                                CommonUtils.removeMonitorFromQueue(take, iMonitorID);
                                GFacThreadPoolExecutor.getFixedThreadPool().submit((Runnable)new OutHandlerWorker(this.gfac, iMonitorID, this.publisher));
                                continue;
                            }
                            iMonitorID.setFailedCount(0);
                            continue;
                        }
                        iMonitorID.setLastMonitored(new Timestamp(new Date().getTime()));
                    }
                    continue;
                }
                logger.debug("Qstat Monitor doesn't handle non-gsissh hosts , host {}", (Object)iHostMonitorData.getComputeResourceDescription().getHostName());
            }
            ListIterator<HostMonitorData> iterator1 = take.getHostMonitorData().listIterator();
            while (iterator1.hasNext()) {
                HostMonitorData iHostMonitorID = iterator1.next();
                if (iHostMonitorID.getMonitorIDs().size() != 0) continue;
                iterator1.remove();
                logger.debug("Removed host {} from monitoring queue", (Object)iHostMonitorID.getComputeResourceDescription().getHostName());
            }
            if (take.getHostMonitorData().size() != 0) {
                this.queue.put(take);
            }
        }
        catch (InterruptedException e) {
            if (!this.queue.contains(take)) {
                try {
                    this.queue.put(take);
                }
                catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID());
            throw new AiravataMonitorException(e);
        }
        catch (SSHApiException e) {
            logger.error(e.getMessage());
            if (e.getMessage().contains("Unknown Job Id Error")) {
                jobStatus.setState(JobState.UNKNOWN);
                JobIdentifier jobIdentifier = new JobIdentifier("UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN");
                if (currentMonitorID != null) {
                    jobIdentifier.setExperimentId(currentMonitorID.getExperimentID());
                    jobIdentifier.setTaskId(currentMonitorID.getTaskID());
                    jobIdentifier.setWorkflowNodeId(currentMonitorID.getWorkflowNodeID());
                    jobIdentifier.setJobId(currentMonitorID.getJobID());
                    jobIdentifier.setGatewayId(currentMonitorID.getJobExecutionContext().getGatewayID());
                }
                jobStatus.setJobIdentity(jobIdentifier);
                this.publisher.publish((Object)jobStatus);
            } else if (e.getMessage().contains("illegally formed job identifier")) {
                logger.error("Wrong job ID is given so dropping the job from monitoring system");
            } else if (!this.queue.contains(take)) {
                try {
                    this.queue.put(take);
                }
                catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            throw new AiravataMonitorException("Error retrieving the job status", e);
        }
        catch (Exception e) {
            try {
                this.queue.put(take);
            }
            catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            throw new AiravataMonitorException("Error retrieving the job status", e);
        }
        return true;
    }

    private void sendNotification(MonitorID iMonitorID) {
        JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
        JobIdentifier jobIdentity = new JobIdentifier(iMonitorID.getJobID(), iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), iMonitorID.getExperimentID(), iMonitorID.getJobExecutionContext().getGatewayID());
        jobStatus.setJobIdentity(jobIdentity);
        jobStatus.setState(iMonitorID.getStatus());
        logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status change request, experiment {} , task {}", (Object)jobStatus.getJobIdentity().getExperimentId(), (Object)jobStatus.getJobIdentity().getTaskId());
        this.publisher.publish((Object)jobStatus);
    }

    @Override
    public boolean stopPulling() {
        this.startPulling = false;
        return true;
    }

    @Override
    public MonitorPublisher getPublisher() {
        return this.publisher;
    }

    @Override
    public void setPublisher(MonitorPublisher publisher) {
        this.publisher = publisher;
    }

    public BlockingQueue<UserMonitorData> getQueue() {
        return this.queue;
    }

    public void setQueue(BlockingQueue<UserMonitorData> queue) {
        this.queue = queue;
    }

    public boolean authenticate() {
        return false;
    }

    public Map<String, ResourceConnection> getConnections() {
        return this.connections;
    }

    public boolean isStartPulling() {
        return this.startPulling;
    }

    public void setConnections(Map<String, ResourceConnection> connections) {
        this.connections = connections;
    }

    public void setStartPulling(boolean startPulling) {
        this.startPulling = startPulling;
    }

    public GFac getGfac() {
        return this.gfac;
    }

    public void setGfac(GFac gfac) {
        this.gfac = gfac;
    }

    public AuthenticationInfo getAuthenticationInfo() {
        return this.authenticationInfo;
    }

    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
        this.authenticationInfo = authenticationInfo;
    }

    public LinkedBlockingQueue<String> getCancelJobList() {
        return this.cancelJobList;
    }

    public void setCancelJobList(LinkedBlockingQueue<String> cancelJobList) {
        this.cancelJobList = cancelJobList;
    }
}

