/*
 * Decompiled with CFR 0.152.
 */
package org.apache.stanbol.enhancer.jobmanager.event.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.clerezza.rdf.core.NonLiteral;
import org.apache.clerezza.rdf.core.TripleCollection;
import org.apache.clerezza.rdf.core.UriRef;
import org.apache.stanbol.enhancer.jobmanager.event.impl.EnhancementJob;
import org.apache.stanbol.enhancer.jobmanager.event.impl.EventJobManagerImpl;
import org.apache.stanbol.enhancer.servicesapi.EngineException;
import org.apache.stanbol.enhancer.servicesapi.EnhancementEngine;
import org.apache.stanbol.enhancer.servicesapi.EnhancementEngineManager;
import org.apache.stanbol.enhancer.servicesapi.helper.ExecutionPlanHelper;
import org.apache.stanbol.enhancer.servicesapi.helper.execution.ChainExecution;
import org.apache.stanbol.enhancer.servicesapi.helper.execution.Execution;
import org.apache.stanbol.enhancer.servicesapi.helper.execution.ExecutionMetadata;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EnhancementJobHandler
implements EventHandler {
    /** Used to look up an {@link EnhancementEngine} by the engine name read from the execution plan. */
    private EnhancementEngineManager engineManager;
    /** OSGi event service used to post one event per executable execution node. */
    private EventAdmin eventAdmin;
    /** Logger shared by this handler and its nested classes. */
    private static Logger log = LoggerFactory.getLogger(EnhancementJobHandler.class);
    /**
     * Registered jobs mapped to their observer, in registration order.
     * Modified under the write lock of {@link #processingLock}; set to
     * {@code null} by {@link #close()}.
     */
    private Map<EnhancementJob, EnhancementJobObserver> processingJobs;
    /** Lock protecting access to {@link #processingJobs}. */
    private final ReadWriteLock processingLock = new ReentrantReadWriteLock();
    /** Daemon thread that periodically logs the currently registered jobs. */
    private Thread observerDaemon;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public EnhancementJobHandler(EventAdmin eventAdmin, EnhancementEngineManager engineManager) {
        if (eventAdmin == null) {
            throw new IllegalArgumentException("The parsed EventAdmin service MUST NOT be NULL!");
        }
        if (engineManager == null) {
            throw new IllegalArgumentException("The parsed EnhancementEngineManager MUST NOT be NULL!");
        }
        this.eventAdmin = eventAdmin;
        this.engineManager = engineManager;
        this.processingLock.writeLock().lock();
        try {
            this.processingJobs = new LinkedHashMap<EnhancementJob, EnhancementJobObserver>();
        }
        finally {
            this.processingLock.writeLock().unlock();
        }
        this.observerDaemon = new Thread(new EnhancementJobObserverDaemon());
        this.observerDaemon.setName("Event Job Manager Observer Daemon");
        this.observerDaemon.setDaemon(true);
        this.observerDaemon.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Deactivates this handler: notifies the monitors of all registered
     * observers, drops the job registry (which makes {@link #register}
     * return {@code null} from then on) and wakes the observer daemon so it
     * can notice the shutdown without waiting for its next polling interval.
     * <p>
     * Calling this method more than once is safe; later calls only log.
     */
    public void close() {
        log.info("deactivate {}", this.getClass().getName());
        Lock writeLock = this.processingLock.writeLock();
        writeLock.lock();
        try {
            // null means close() already ran; nothing left to notify.
            if (this.processingJobs != null) {
                for (EnhancementJobObserver observer : this.processingJobs.values()) {
                    synchronized (observer) {
                        observer.notifyAll();
                    }
                }
                this.processingJobs = null;
            }
        } finally {
            writeLock.unlock();
        }
        Thread daemon = this.observerDaemon;
        this.observerDaemon = null;
        if (daemon != null) {
            // The daemon sleeps between polls; the interrupt lets it re-check
            // the (now null) registry immediately and terminate.
            daemon.interrupt();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers an enhancement job and, if it was not registered before,
     * starts its processing by scheduling the first executable nodes.
     *
     * @param enhancementJob the job to register
     * @return the observer for the job (the existing one if the job was
     *         already registered), or {@code null} if the job is
     *         {@code null} or this handler has been closed
     */
    public EnhancementJobObserver register(EnhancementJob enhancementJob) {
        boolean init;
        EnhancementJobObserver observer;
        this.processingLock.writeLock().lock();
        try {
            if (enhancementJob == null || this.processingJobs == null) {
                return null;
            }
            observer = this.processingJobs.get(enhancementJob);
            if (observer == null) {
                observer = new EnhancementJobObserver(enhancementJob);
                if (log.isDebugEnabled()) {
                    EnhancementJobHandler.logJobInfo(log, enhancementJob, "Add EnhancementJob:", log.isTraceEnabled());
                }
                this.processingJobs.put(enhancementJob, observer);
                init = true;
            } else {
                // The URI is passed as a logging argument so the placeholder is actually filled.
                log.warn("Request to register an EnhancementJob for an ContentItem {} that is already registered",
                        enhancementJob.getContentItem().getUri());
                init = false;
            }
        } finally {
            this.processingLock.writeLock().unlock();
        }
        if (init) {
            // Take the observer's permit before any execution is scheduled;
            // finish(..) releases it again.
            observer.acquire();
            enhancementJob.startProcessing();
            log.trace("++ w: {}", "init execution");
            enhancementJob.getLock().writeLock().lock();
            try {
                log.trace(">> w: {}", "init execution");
                if (!this.executeNextNodes(enhancementJob)) {
                    String message = "Unable to start Execution of " + enhancementJob.getContentItem().getUri();
                    log.warn(message);
                    EnhancementJobHandler.logJobInfo(log, enhancementJob, null, true);
                    log.warn("finishing job ...");
                    this.finish(enhancementJob);
                }
            } finally {
                log.trace("<< w: {}", "init execution");
                enhancementJob.getLock().writeLock().unlock();
            }
        }
        return observer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles an execution event: runs the engine for the execution node
     * carried by the event and afterwards either finishes the job or
     * schedules the next executable nodes.
     * <p>
     * Events that do not carry both a job and an execution node are logged
     * and ignored.
     *
     * @param event the event; the job and execution node are read from its
     *        {@code stanbol.enhancer.jobmanager.event.job} and
     *        {@code stanbol.enhancer.jobmanager.event.execution} properties
     */
    public void handleEvent(Event event) {
        EnhancementJob job = (EnhancementJob)event.getProperty("stanbol.enhancer.jobmanager.event.job");
        NonLiteral execution = (NonLiteral)event.getProperty("stanbol.enhancer.jobmanager.event.execution");
        if (job == null || execution == null) {
            log.warn("Unable to process EnhancementEvent where EnhancementJob {} or Execution node {} is null -> ignore", job, execution);
            // Nothing can be processed (or marked as failed) without both values.
            return;
        }
        try {
            this.processEvent(job, execution);
        } catch (Throwable t) {
            // Anything escaping processEvent is recorded on the job so that
            // the state check below sees the job as failed.
            String message = String.format("Unexpected Exception while processing ContentItem %s with EnhancementJobManager: %s", job.getContentItem().getUri(), EventJobManagerImpl.class);
            job.setFailed(execution, null, new IllegalStateException(message, t));
            log.error(message, t);
        }
        log.trace("++ w: {}", "check for next Executions");
        job.getLock().writeLock().lock();
        try {
            log.trace(">> w: {}", "check for next Executions");
            if (job.isFinished()) {
                this.finish(job);
            } else if (!job.isFailed()) {
                if (!this.executeNextNodes(job) && job.getRunning().isEmpty()) {
                    log.warn("Unexpected state in the Execution of ContentItem {}: Job is not finished AND no executions are running AND no further execution could be started! -> finishing this job :(",
                            job.getContentItem().getUri());
                    this.finish(job);
                }
            } else if (log.isInfoEnabled()) {
                // Failed, but not finished: report which engines are still running.
                List<String> running = new ArrayList<String>(3);
                for (NonLiteral runningNode : job.getRunning()) {
                    running.add(ExecutionPlanHelper.getEngine((TripleCollection)job.getExecutionPlan(), (NonLiteral)job.getExecutionNode(runningNode)));
                }
                log.info("Job {} failed, but {} still running!", job.getContentItem().getUri(), running);
            }
        } finally {
            log.trace("<< w: {}", "check for next Executions");
            job.getLock().writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the engine referenced by the given execution node against the
     * content item of the job and records the outcome on the job.
     * <p>
     * The value returned by {@code canEnhance} selects the mode: {@code 1}
     * runs the engine while holding the job's write lock, {@code 2} runs it
     * without that lock, any other value marks the execution as completed
     * without running the engine (or as failed if {@code canEnhance} threw).
     * An unknown engine marks the execution as failed.
     *
     * @param job the job the execution belongs to
     * @param execution the execution node to process
     */
    private void processEvent(EnhancementJob job, NonLiteral execution) {
        String engineName = ExecutionPlanHelper.getEngine((TripleCollection)job.getExecutionPlan(), (NonLiteral)job.getExecutionNode(execution));
        EnhancementEngine engine = this.engineManager.getEngine(engineName);
        if (engine == null) {
            job.setFailed(execution, null, null);
            return;
        }

        EngineException canEnhanceError = null;
        int engineState;
        try {
            engineState = engine.canEnhance(job.getContentItem());
        } catch (EngineException e) {
            canEnhanceError = e;
            log.warn("Unable to check if engine '" + engineName + "'(type: " + engine.getClass() + ") can enhance ContentItem '" + job.getContentItem().getUri() + "'!", e);
            engineState = 0;
        }

        switch (engineState) {
            case 1:
                // Run while holding the job's write lock.
                log.trace("++ w: {}: {}", "start sync execution", engine.getName());
                job.getLock().writeLock().lock();
                log.trace(">> w: {}: {}", "start sync execution", engine.getName());
                try {
                    engine.computeEnhancements(job.getContentItem());
                    job.setCompleted(execution);
                } catch (EngineException e) {
                    log.warn(e.getMessage(), e);
                    job.setFailed(execution, engine, e);
                } catch (RuntimeException e) {
                    log.warn(e.getMessage(), e);
                    job.setFailed(execution, engine, e);
                } finally {
                    log.trace("<< w: {}: {}", "finished sync execution", engine.getName());
                    job.getLock().writeLock().unlock();
                }
                break;
            case 2:
                // Run without taking the job's write lock.
                try {
                    log.trace("++ n: start async execution of Engine {}", engine.getName());
                    engine.computeEnhancements(job.getContentItem());
                    log.trace("++ n: finished async execution of Engine {}", engine.getName());
                    job.setCompleted(execution);
                } catch (EngineException e) {
                    log.warn(e.getMessage(), e);
                    job.setFailed(execution, engine, e);
                } catch (RuntimeException e) {
                    log.warn(e.getMessage(), e);
                    job.setFailed(execution, engine, e);
                }
                break;
            default:
                // Engine is not run: failed if the check itself threw, completed otherwise.
                if (canEnhanceError != null) {
                    job.setFailed(execution, engine, canEnhanceError);
                } else {
                    job.setCompleted(execution);
                }
                break;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the job from the registry and releases its observer so that a
     * caller blocked in {@code waitForCompletion} can continue.
     * <p>
     * If the job is not registered, or the registry has already been dropped
     * by {@link #close()}, only a warning is logged.
     *
     * @param job the job to finish
     */
    private void finish(EnhancementJob job) {
        EnhancementJobObserver observer;
        this.processingLock.writeLock().lock();
        try {
            // close() sets the registry to null; events may still arrive afterwards.
            observer = this.processingJobs != null ? this.processingJobs.remove(job) : null;
        } finally {
            this.processingLock.writeLock().unlock();
        }
        if (observer == null) {
            log.warn("EnhancementJob for ContentItem {} is not registered with {}. Will not send notification!", job.getContentItem().getUri(), this.getClass().getName());
            return;
        }
        try {
            if (log.isDebugEnabled()) {
                EnhancementJobHandler.logJobInfo(log, job, "Finished EnhancementJob:", log.isTraceEnabled());
            }
            log.trace("++ n: finished processing ContentItem {} with Chain {}", job.getContentItem().getUri(), job.getChainName());
        } finally {
            // Always release, even if logging fails, so waiters are not left blocked.
            observer.release();
        }
    }

    /**
     * Marks every currently executable node of the job as running and posts
     * one event per node to the {@link EventAdmin}.
     *
     * @param job the job whose executable nodes should be scheduled
     * @return {@code true} if at least one execution was scheduled
     */
    protected boolean executeNextNodes(EnhancementJob job) {
        boolean startedExecution = false;
        for (NonLiteral executable : job.getExecutable()) {
            if (log.isTraceEnabled()) {
                log.trace("PREPARE execution of Engine {}", ExecutionPlanHelper.getEngine((TripleCollection)job.getExecutionPlan(), (NonLiteral)job.getExecutionNode(executable)));
            }
            // The properties hold values of different types (the job and the
            // execution node), so the dictionary must be keyed to Object.
            Dictionary<String, Object> properties = new Hashtable<String, Object>();
            properties.put("stanbol.enhancer.jobmanager.event.job", job);
            properties.put("stanbol.enhancer.jobmanager.event.execution", executable);
            job.setRunning(executable);
            if (log.isTraceEnabled()) {
                log.trace("SHEDULE execution of Engine {}", ExecutionPlanHelper.getEngine((TripleCollection)job.getExecutionPlan(), (NonLiteral)job.getExecutionNode(executable)));
            }
            this.eventAdmin.postEvent(new Event("stanbol/enhancer/jobmanager/event/topic", properties));
            startedExecution = true;
        }
        return startedExecution;
    }

    /**
     * Logs, on INFO level, the duration of the chain execution of the given
     * job followed by the duration of every engine execution (ordered by
     * start time) and the resulting concurrency factor, i.e. the sum of the
     * engine durations divided by the chain duration.
     * <p>
     * Runtime exceptions raised while collecting the data are logged as a
     * warning and not propagated.
     *
     * @param logger the logger to write to; nothing is done unless INFO is enabled
     * @param job the job to report on
     */
    protected static void logExecutionTimes(Logger logger, EnhancementJob job) {
        if (!logger.isInfoEnabled()) {
            return;
        }
        try {
            ExecutionMetadata em = ExecutionMetadata.parseFrom((TripleCollection)job.getExecutionMetadata(), (UriRef)job.getContentItem().getUri());
            ChainExecution ce = em.getChainExecution();
            long cd = ce.getDuration();
            logger.info("Executed Chain {} in {}ms", ce.getChainName(), cd);
            logger.info(" > ContentItem: {}", job.getContentItem().getUri().getUnicodeString());
            List<Execution> ees = new ArrayList<Execution>(em.getEngineExecutions().values());
            Collections.sort(ees, new Comparator<Execution>(){

                @Override
                public int compare(Execution e1, Execution e2) {
                    return e1.getStarted().compareTo(e2.getStarted());
                }
            });
            long eds = 0L;
            for (Execution ee : ees) {
                long ed = ee.getDuration();
                eds += ed;
                // A chain duration of 0ms gives no meaningful share; report 0%.
                int edp = cd > 0L ? Math.round((float)(ed * 100L) / (float)cd) : 0;
                logger.info(" - {} in {}ms ({}%)", new Object[]{ee.getExecutionNode().getEngineName(), ed, edp});
            }
            if (cd > 0L) {
                // Floating point division: integer division would truncate the
                // factor (e.g. 1.8 -> 1) and make the percentage meaningless.
                float cf = (float)eds / (float)cd;
                int cfp = Math.round((cf - 1.0f) * 100.0f);
                logger.info(" > concurrency: {} ({}%)", Float.valueOf(cf), cfp);
            }
        } catch (RuntimeException e) {
            log.warn("Exception while logging ExecutionTimes for Chain: '" + job.getChainName() + " and ContentItem " + job.getContentItem().getUri() + " to Logger " + logger.getName(), e);
        }
    }

    /**
     * Writes a multi-line summary of the job to the given logger on INFO
     * level: finished flag, state, chain name, content item URI, optionally
     * the engines of all completed / running / executable nodes, and the
     * error message and exception if the job reports them.
     *
     * @param log the logger to write to; nothing is logged unless INFO is enabled
     * @param job the job to describe
     * @param header optional first line, skipped when {@code null}
     * @param logExecutions whether the individual executions are listed
     */
    protected static void logJobInfo(Logger log, EnhancementJob job, String header, boolean logExecutions) {
        if (log.isInfoEnabled()) {
            if (header != null) {
                log.info(header);
            }
            log.info("   finished:     {}", job.isFinished());
            String state;
            if (job.isFailed()) {
                state = "failed";
            } else {
                state = "processing";
            }
            log.info("   state:        {}", state);
            log.info("   chain:        {}", job.getChainName());
            log.info("   content-item: {}", job.getContentItem().getUri());
            if (logExecutions) {
                log.info("  executions:");
                for (NonLiteral done : job.getCompleted()) {
                    String engineName = ExecutionPlanHelper.getEngine((TripleCollection)job.getExecutionMetadata(), (NonLiteral)job.getExecutionNode(done));
                    log.info("    - {} completed", engineName);
                }
                for (NonLiteral active : job.getRunning()) {
                    String engineName = ExecutionPlanHelper.getEngine((TripleCollection)job.getExecutionMetadata(), (NonLiteral)job.getExecutionNode(active));
                    log.info("    - {} running", engineName);
                }
                for (NonLiteral pending : job.getExecutable()) {
                    String engineName = ExecutionPlanHelper.getEngine((TripleCollection)job.getExecutionMetadata(), (NonLiteral)job.getExecutionNode(pending));
                    log.info("    - {} executeable", engineName);
                }
            }
            if (job.getErrorMessage() != null) {
                log.info("Error Message: {}", (Object)job.getErrorMessage());
            }
            if (job.getError() != null) {
                log.info("Reported Exception:", (Throwable)job.getError());
            }
        }
    }

    /**
     * Background task that, every ten seconds, takes a snapshot of the
     * registered jobs and logs them on DEBUG level. The loop ends once the
     * handler's job registry has been set to {@code null}.
     */
    private class EnhancementJobObserverDaemon
    implements Runnable {
        private Logger observerLog = LoggerFactory.getLogger(EnhancementJobObserverDaemon.class);

        private EnhancementJobObserverDaemon() {
        }

        @Override
        public void run() {
            this.observerLog.debug(" ... init EnhancementJobObserver");
            while (EnhancementJobHandler.this.processingJobs != null) {
                try {
                    Thread.sleep(10000L);
                } catch (InterruptedException e) {
                    // An interrupt is treated as a wake-up call: fall through,
                    // take a snapshot and let the loop condition decide whether
                    // to stop. The flag is deliberately not restored, because
                    // that would turn the next sleep into a busy loop.
                }
                List<EnhancementJob> jobs;
                Lock readLock = EnhancementJobHandler.this.processingLock.readLock();
                readLock.lock();
                try {
                    // Copy the keys so the jobs can be logged without holding the registry lock.
                    Map<EnhancementJob, EnhancementJobObserver> registered = EnhancementJobHandler.this.processingJobs;
                    if (registered != null) {
                        jobs = new ArrayList<EnhancementJob>(registered.keySet());
                    } else {
                        jobs = Collections.<EnhancementJob>emptyList();
                    }
                } finally {
                    readLock.unlock();
                }
                if (jobs.isEmpty()) {
                    log.debug(" -- No active Enhancement Jobs");
                    continue;
                }
                this.observerLog.debug(" -- {} active Enhancement Jobs", jobs.size());
                if (!this.observerLog.isDebugEnabled()) {
                    continue;
                }
                for (EnhancementJob job : jobs) {
                    // Read-lock the job so its state is not modified while it is logged.
                    Lock jobLock = job.getLock().readLock();
                    jobLock.lock();
                    try {
                        EnhancementJobHandler.logJobInfo(this.observerLog, job, null, true);
                    } finally {
                        jobLock.unlock();
                    }
                }
            }
        }
    }

    public class EnhancementJobObserver {
        private static final int MIN_WAIT_TIME = 500;
        private final EnhancementJob enhancementJob;
        private final Semaphore semaphore;

        private EnhancementJobObserver(EnhancementJob job) {
            if (job == null) {
                throw new IllegalArgumentException("The parsed EnhancementJob MUST NOT be NULL!");
            }
            this.enhancementJob = job;
            this.semaphore = new Semaphore(1);
        }

        protected void acquire() {
            try {
                this.semaphore.acquire();
            }
            catch (InterruptedException e) {
                log.warn("Interrupted while acquireing Semaphore for EnhancementJob " + this.enhancementJob + "!", (Throwable)e);
            }
        }

        protected void release() {
            this.semaphore.release();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean hasCompleted() {
            this.enhancementJob.getLock().readLock().lock();
            try {
                boolean bl = this.enhancementJob.isFinished();
                return bl;
            }
            finally {
                this.enhancementJob.getLock().readLock().unlock();
            }
        }

        public boolean waitForCompletion(int maxEnhancementJobWaitTime) {
            boolean finished = false;
            if (this.semaphore.availablePermits() < 1) {
                try {
                    finished = this.semaphore.tryAcquire(1, Math.max(500, maxEnhancementJobWaitTime), TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    finished = false;
                }
            } else if (!this.hasCompleted()) {
                log.error("Unexpected {} permit(s) (expected = 0) available for Semaphore of  EnhancementJob of ContentItem {}. Please report this on dev@stanbol.apache.org and/or the Apache Stanbol Issue Tracker.", (Object)this.semaphore.availablePermits(), (Object)this.enhancementJob.getContentItem().getUri());
                finished = false;
            } else {
                finished = true;
            }
            return finished;
        }
    }
}

