/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hama.graph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.graph.AbstractAggregator;
import org.apache.hama.graph.Aggregator;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;
import org.apache.hama.util.KeyValuePair;

/**
 * BSP task that runs a vertex-centric graph computation.
 *
 * <p>{@link #setup} loads the vertices assigned to this peer and runs a first
 * compute pass, {@link #bsp} repeats compute passes until no peer reports an
 * active vertex (or the configured iteration limit is exceeded), and
 * {@link #cleanup} writes every vertex id together with its final value.
 *
 * <p>The peer with index 0 acts as the master task: every peer sends it a
 * control map (active vertex count and local aggregator values) at the end of
 * a pass, and the master sends the combined result back to all peers.
 *
 * @param <V> vertex id type
 * @param <E> edge value type
 * @param <M> vertex value and message type
 */
public final class GraphJobRunner<V extends Writable, E extends Writable, M extends Writable>
        extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
    static final Log LOG = LogFactory.getLog(GraphJobRunner.class);

    // Reserved keys of the control maps exchanged between the peers and the master task.
    private static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
    private static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
    private static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
    private static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);

    public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
    public static final String GRAPH_REPAIR = "hama.graph.repair";

    private Configuration conf;
    private Combiner<M> combiner;
    private Partitioner<V, M> partitioner;

    // Aggregator state. All of these arrays stay null unless
    // "hama.graph.aggregator.class" is configured; they share the same indexing.
    private Aggregator<M, Vertex<V, E, M>>[] aggregators;
    private Writable[] globalAggregatorResult;
    private IntWritable[] globalAggregatorIncrement;
    private boolean[] isAbstractAggregator;
    private String[] aggregatorClassNames;
    private Text[] aggregatorValueFlag;
    private Text[] aggregatorIncrementFlag;
    // Only allocated on the master task.
    private Aggregator<M, Vertex<V, E, M>>[] masterAggregator;

    private final Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
    private String masterTask;
    private boolean updated = true;
    private int globalUpdateCounts = 0;
    private long numberVertices;
    private int maxIteration = -1;
    private long iteration;
    private Class<V> vertexIdClass;
    private Class<M> vertexValueClass;
    private Class<E> edgeValueClass;
    private Class<Vertex<V, E, M>> vertexClass;

    /**
     * Reads the job configuration, instantiates partitioner, optional combiner
     * and aggregators, loads the vertices and runs the first compute pass, in
     * which every vertex receives its own value as the only message.
     *
     * @param peer the peer this task runs on
     */
    @Override
    @SuppressWarnings("unchecked") // classes and instances come from the configuration via reflection
    public final void setup(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        this.conf = peer.getConfiguration();
        this.masterTask = peer.getPeerName(0);

        this.vertexIdClass = (Class<V>) this.conf.getClass("hama.graph.vertex.id.class", Text.class, Writable.class);
        this.vertexValueClass = (Class<M>) this.conf.getClass("hama.graph.vertex.value.class", IntWritable.class, Writable.class);
        this.edgeValueClass = (Class<E>) this.conf.getClass("hama.graph.vertex.edge.value.class", IntWritable.class, Writable.class);
        this.vertexClass = (Class<Vertex<V, E, M>>) this.conf.getClass("hama.graph.vertex.class", Vertex.class);

        // GraphJobMessage needs the concrete classes; they are published through its static fields.
        GraphJobMessage.VERTEX_ID_CLASS = this.vertexIdClass;
        GraphJobMessage.VERTEX_VALUE_CLASS = this.vertexValueClass;
        GraphJobMessage.VERTEX_CLASS = this.vertexClass;
        GraphJobMessage.EDGE_VALUE_CLASS = this.edgeValueClass;

        boolean repairNeeded = this.conf.getBoolean(GRAPH_REPAIR, false);
        boolean runtimePartitioning = this.conf.getBoolean("hama.graph.runtime.partitioning", true);

        this.partitioner = (Partitioner<V, M>) ReflectionUtils.newInstance(this.conf.getClass("bsp.input.partitioner.class", HashPartitioner.class), this.conf);

        // The Combiner interface itself is the "nothing configured" marker.
        Class<?> combinerClass = this.conf.getClass(MESSAGE_COMBINER_CLASS, Combiner.class);
        if (!combinerClass.equals(Combiner.class)) {
            LOG.debug("Combiner class: " + this.conf.get(MESSAGE_COMBINER_CLASS));
            this.combiner = (Combiner<M>) ReflectionUtils.newInstance(combinerClass, this.conf);
        }

        String aggregatorClasses = this.conf.get("hama.graph.aggregator.class");
        if (aggregatorClasses != null) {
            LOG.debug("Aggregator classes: " + aggregatorClasses);
            this.setupAggregators(aggregatorClasses, this.isMasterTask(peer));
        }

        VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils.newInstance(this.conf.getClass("hama.graph.input.reader.class", VertexInputReader.class), this.conf);
        this.loadVertices(peer, repairNeeded, runtimePartitioning, this.partitioner, reader);

        // Widen before multiplying so that large graphs do not overflow int arithmetic.
        this.numberVertices = (long) this.vertices.size() * peer.getNumPeers();

        for (Vertex<V, E, M> vertex : this.vertices.values()) {
            LinkedList<M> initialMessages = new LinkedList<M>();
            initialMessages.add(vertex.getValue());
            M lastValue = vertex.getValue();
            vertex.compute(initialMessages.iterator());
            this.aggregateVertex(vertex, lastValue);
        }
        // The first pass always reports a count of 1 to the master.
        this.runAggregators(peer, 1);
        ++this.iteration;
    }

    /**
     * Runs compute passes until the master signals that no vertex was active
     * in the previous pass, or until the iteration counter exceeds the value
     * of "hama.graph.max.iteration" (values &lt;= 0 mean "no limit").
     *
     * @param peer the peer this task runs on
     */
    @Override
    public final void bsp(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        this.maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration", -1);

        while (this.updated && (this.maxIteration <= 0 || this.iteration <= (long) this.maxIteration)) {
            this.globalUpdateCounts = 0;
            peer.sync();

            // Vertex messages are grouped by target id; control maps update
            // globalUpdateCounts, updated and the master aggregators as a side effect.
            Map<V, LinkedList<M>> messages = this.parseMessages(peer);

            if (this.isMasterTask(peer) && this.iteration > 1L) {
                this.publishMasterUpdates(peer);
            }

            // With aggregators an extra sync makes the master's combined values
            // available on every peer within the same pass.
            if (this.aggregators != null && this.iteration > 1L) {
                peer.sync();
                MapWritable updatedValues = peer.getCurrentMessage().getMap();
                for (int i = 0; i < this.aggregators.length; ++i) {
                    this.globalAggregatorResult[i] = updatedValues.get(this.aggregatorValueFlag[i]);
                    this.globalAggregatorIncrement[i] = (IntWritable) updatedValues.get(this.aggregatorIncrementFlag[i]);
                }
                IntWritable count = (IntWritable) updatedValues.get(FLAG_MESSAGE_COUNTS);
                if (count != null && count.get() == Integer.MIN_VALUE) {
                    this.updated = false;
                    break;
                }
            }

            int activeVertices = 0;
            for (Vertex<V, E, M> vertex : this.vertices.values()) {
                LinkedList<M> msgs = messages.get(vertex.getVertexID());
                // An incoming message wakes up a halted vertex.
                if (vertex.isHalted() && msgs != null) {
                    vertex.setActive();
                }
                if (msgs == null) {
                    msgs = new LinkedList<M>();
                }
                if (vertex.isHalted()) {
                    continue;
                }
                if (this.combiner != null) {
                    M combined = this.combiner.combine(msgs);
                    msgs = new LinkedList<M>();
                    msgs.add(combined);
                }
                M lastValue = vertex.getValue();
                vertex.compute(msgs.iterator());
                this.aggregateVertex(vertex, lastValue);
                if (!vertex.isHalted()) {
                    ++activeVertices;
                }
            }
            this.runAggregators(peer, activeVertices);
            ++this.iteration;
        }
    }

    /**
     * Allocates the aggregator bookkeeping arrays and instantiates one
     * aggregator per configured class name (plus a master-side instance when
     * this peer is the master task).
     *
     * @param aggregatorClasses class names separated by ';'
     * @param master whether this peer is the master task
     */
    @SuppressWarnings("unchecked") // generic array creation
    private void setupAggregators(String aggregatorClasses, boolean master) {
        this.aggregatorClassNames = aggregatorClasses.split(";");
        int count = this.aggregatorClassNames.length;
        this.aggregators = new Aggregator[count];
        this.globalAggregatorResult = new Writable[count];
        this.globalAggregatorIncrement = new IntWritable[count];
        this.isAbstractAggregator = new boolean[count];
        this.aggregatorValueFlag = new Text[count];
        this.aggregatorIncrementFlag = new Text[count];
        if (master) {
            this.masterAggregator = new Aggregator[count];
        }
        for (int i = 0; i < count; ++i) {
            this.aggregators[i] = this.getNewAggregator(this.aggregatorClassNames[i]);
            this.aggregatorValueFlag[i] = new Text(S_FLAG_AGGREGATOR_VALUE + ";" + i);
            this.aggregatorIncrementFlag[i] = new Text(S_FLAG_AGGREGATOR_INCREMENT + ";" + i);
            this.isAbstractAggregator[i] = this.aggregators[i] instanceof AbstractAggregator;
            if (master) {
                this.masterAggregator[i] = this.getNewAggregator(this.aggregatorClassNames[i]);
            }
        }
    }

    /**
     * Feeds one computed vertex into every local aggregator. Does nothing when
     * no aggregators are configured.
     *
     * @param vertex the vertex that was just computed
     * @param lastValue the vertex value before the compute call
     */
    private void aggregateVertex(Vertex<V, E, M> vertex, M lastValue) {
        if (this.aggregators == null) {
            return;
        }
        for (int i = 0; i < this.aggregators.length; ++i) {
            Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
            aggregator.aggregate(vertex, vertex.getValue());
            if (this.isAbstractAggregator[i]) {
                AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
                intern.aggregate(vertex, lastValue, vertex.getValue());
                intern.aggregateInternal();
            }
        }
    }

    /**
     * Master only: builds the control map for the finished pass and sends it
     * to every peer. The map carries the stop marker
     * ({@code Integer.MIN_VALUE}) when no peer reported an active vertex,
     * otherwise the values of the master aggregators (if any).
     */
    private void publishMasterUpdates(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        MapWritable updatedCnt = new MapWritable();
        if (this.globalUpdateCounts == 0) {
            updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE));
        } else if (this.aggregators != null) {
            for (int i = 0; i < this.masterAggregator.length; ++i) {
                Writable lastAggregatedValue = this.masterAggregator[i].getValue();
                if (this.isAbstractAggregator[i]) {
                    AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) this.masterAggregator[i];
                    // Finalize exactly once and reuse the result.
                    Writable finalized = intern.finalizeAggregation();
                    if (finalized != null) {
                        lastAggregatedValue = finalized;
                    }
                    updatedCnt.put(this.aggregatorIncrementFlag[i], intern.getTimesAggregated());
                }
                updatedCnt.put(this.aggregatorValueFlag[i], lastAggregatedValue);
            }
        }
        for (String peerName : peer.getAllPeerNames()) {
            peer.send(peerName, new GraphJobMessage(updatedCnt));
        }
    }

    /**
     * Sends this peer's active vertex count and local aggregator values to the
     * master task, then replaces all aggregators (and, on the master, the
     * master aggregators) with fresh instances for the next pass.
     *
     * @param peer the peer this task runs on
     * @param activeVertices number of vertices reported as active for this pass
     */
    private void runAggregators(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer, int activeVertices) throws IOException {
        MapWritable updatedCnt = new MapWritable();
        updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(activeVertices));
        if (this.aggregators != null) {
            for (int i = 0; i < this.aggregators.length; ++i) {
                updatedCnt.put(this.aggregatorValueFlag[i], this.aggregators[i].getValue());
                if (this.isAbstractAggregator[i]) {
                    updatedCnt.put(this.aggregatorIncrementFlag[i], ((AbstractAggregator<M, Vertex<V, E, M>>) this.aggregators[i]).getTimesAggregated());
                }
            }
            boolean master = this.isMasterTask(peer);
            for (int i = 0; i < this.aggregators.length; ++i) {
                this.aggregators[i] = this.getNewAggregator(this.aggregatorClassNames[i]);
                if (master) {
                    this.masterAggregator[i] = this.getNewAggregator(this.aggregatorClassNames[i]);
                }
            }
        }
        peer.send(this.masterTask, new GraphJobMessage(updatedCnt));
    }

    /**
     * Drains the peer's message queue. Vertex messages are collected per
     * target vertex id and returned. Map messages are control maps: their
     * entries update {@code updated}, {@code globalUpdateCounts} and the
     * master aggregators.
     *
     * @return received vertex messages grouped by target vertex id
     * @throws UnsupportedOperationException if a message is neither a vertex
     *         message nor a map message
     */
    @SuppressWarnings("unchecked") // message payloads are untyped Writables
    private Map<V, LinkedList<M>> parseMessages(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        Map<V, LinkedList<M>> msgMap = new HashMap<V, LinkedList<M>>();
        GraphJobMessage msg;
        while ((msg = peer.getCurrentMessage()) != null) {
            if (msg.isVertexMessage()) {
                V vertexID = (V) msg.getVertexId();
                M value = (M) msg.getVertexValue();
                LinkedList<M> msgs = msgMap.get(vertexID);
                if (msgs == null) {
                    msgs = new LinkedList<M>();
                    msgMap.put(vertexID, msgs);
                }
                msgs.add(value);
            } else if (msg.isMapMessage()) {
                for (Map.Entry<Writable, Writable> e : msg.getMap().entrySet()) {
                    Text flag = (Text) e.getKey();
                    if (FLAG_MESSAGE_COUNTS.equals(flag)) {
                        int count = ((IntWritable) e.getValue()).get();
                        if (count == Integer.MIN_VALUE) {
                            // Stop marker sent by the master.
                            this.updated = false;
                        } else {
                            this.globalUpdateCounts += count;
                        }
                    } else if (this.aggregators != null && flag.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
                        int index = aggregatorIndex(flag);
                        this.masterAggregator[index].aggregate(null, (M) e.getValue());
                    } else if (this.aggregators != null && flag.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
                        int index = aggregatorIndex(flag);
                        if (this.isAbstractAggregator[index]) {
                            ((AbstractAggregator<M, Vertex<V, E, M>>) this.masterAggregator[index]).addTimesAggregated(((IntWritable) e.getValue()).get());
                        }
                    }
                }
            } else {
                throw new UnsupportedOperationException("Unknown message type? " + msg);
            }
        }
        return msgMap;
    }

    /** Extracts the aggregator index from a flag of the form {@code "<prefix>;<index>"}. */
    private static int aggregatorIndex(Text flag) {
        return Integer.parseInt(flag.toString().split(";")[1]);
    }

    /**
     * Reads the peer's input through the given reader and fills
     * {@code vertices}. With runtime partitioning each parsed vertex is sent
     * to the peer chosen by the partitioner; otherwise it is kept locally.
     * When repair is requested, every edge destination is announced to its
     * peer and vertices that are referenced but missing are created there.
     *
     * @param peer the peer this task runs on
     * @param repairNeeded whether missing destination vertices should be created
     * @param runtimePartitioning whether parsed vertices are redistributed
     * @param partitioner maps a vertex id to a peer index
     * @param reader turns input records into vertices
     */
    @SuppressWarnings("unchecked") // E-to-M cast for the partitioner and untyped message payloads
    private void loadVertices(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer, boolean repairNeeded, boolean runtimePartitioning, Partitioner<V, M> partitioner, VertexInputReader<Writable, Writable, V, E, M> reader) throws IOException, SyncException, InterruptedException {
        LOG.debug("vertex class: " + this.vertexClass);
        boolean selfReference = this.conf.getBoolean("hama.graph.self.ref", false);
        Vertex<V, E, M> vertex = this.newBoundVertex(peer);
        KeyValuePair<Writable, Writable> next;
        int lines = 0;
        while ((next = peer.readNext()) != null) {
            // The reader may need several records to complete one vertex.
            boolean vertexFinished = reader.parseVertex(next.getKey(), next.getValue(), vertex);
            if (!vertexFinished) {
                continue;
            }
            if (vertex.getEdges() == null) {
                vertex.setEdges(new ArrayList<Edge<V, E>>(0));
            }
            if (selfReference) {
                vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), peer.getPeerName(), null));
            }
            if (runtimePartitioning) {
                int partition = partitioner.getPartition(vertex.getVertexID(), vertex.getValue(), peer.getNumPeers());
                for (Edge<V, E> edge : vertex.getEdges()) {
                    int edgePartition = partitioner.getPartition(edge.getDestinationVertexID(), (M) edge.getValue(), peer.getNumPeers());
                    edge.destinationPeerName = peer.getPeerName(edgePartition);
                }
                peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
            } else {
                vertex.setup(this.conf);
                this.vertices.put(vertex.getVertexID(), vertex);
            }
            vertex = this.newBoundVertex(peer);

            // Every 100000 finished vertices the pending messages are exchanged and consumed.
            // NOTE(review): the number of these syncs depends on the local input size;
            // confirm that peers with differently sized splits cannot get out of step.
            if (++lines % 100000 == 0) {
                peer.sync();
                this.receivePartitionedVertices(peer);
            }
        }
        if (runtimePartitioning) {
            peer.sync();
            this.receivePartitionedVertices(peer);
        }
        LOG.info("Loading finished at " + peer.getSuperstepCount() + " steps.");

        if (repairNeeded) {
            LOG.debug("Starting repair of this graph!");
            for (Vertex<V, E, M> entry : this.vertices.values()) {
                for (Edge<V, E> e : entry.getEdges()) {
                    peer.send(e.getDestinationPeerName(), new GraphJobMessage((Writable) e.getDestinationVertexID()));
                }
            }
            peer.sync();
            GraphJobMessage msg;
            while ((msg = peer.getCurrentMessage()) != null) {
                V vertexName = (V) msg.getVertexId();
                if (this.vertices.containsKey(vertexName)) {
                    continue;
                }
                Vertex<V, E, M> newVertex = this.newBoundVertex(peer);
                newVertex.setVertexID(vertexName);
                if (selfReference) {
                    int partition = partitioner.getPartition(newVertex.getVertexID(), newVertex.getValue(), peer.getNumPeers());
                    String target = peer.getPeerName(partition);
                    newVertex.setEdges(Collections.singletonList(new Edge<V, E>(newVertex.getVertexID(), target, null)));
                } else {
                    newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
                }
                newVertex.setup(this.conf);
                this.vertices.put(vertexName, newVertex);
            }
        }
    }

    /** Creates a vertex of the configured class and attaches the peer and this runner to it. */
    private Vertex<V, E, M> newBoundVertex(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
        Vertex<V, E, M> vertex = GraphJobRunner.newVertexInstance(this.vertexClass, this.conf);
        vertex.setPeer(peer);
        vertex.runner = this;
        return vertex;
    }

    /**
     * Consumes all pending messages as vertices sent during partitioning:
     * each one is attached to this peer and runner, set up and stored locally.
     */
    @SuppressWarnings("unchecked") // GraphJobMessage carries the vertex with wildcard type arguments
    private void receivePartitionedVertices(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        GraphJobMessage msg;
        while ((msg = peer.getCurrentMessage()) != null) {
            Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
            messagedVertex.setPeer(peer);
            messagedVertex.runner = this;
            messagedVertex.setup(this.conf);
            this.vertices.put(messagedVertex.getVertexID(), messagedVertex);
        }
    }

    /**
     * Instantiates the given vertex class through {@link ReflectionUtils}.
     *
     * @param vertexClass the vertex implementation to instantiate
     * @param conf configuration handed to the new instance
     * @return a new vertex instance
     */
    @SuppressWarnings("unchecked") // the configured class is trusted to be a matching Vertex subtype
    public static <V extends Writable, E extends Writable, M extends Writable> Vertex<V, E, M> newVertexInstance(Class<?> vertexClass, Configuration conf) {
        return (Vertex<V, E, M>) ReflectionUtils.newInstance(vertexClass, conf);
    }

    /**
     * Writes the id and the final value of every local vertex to the peer's output.
     *
     * @param peer the peer this task runs on
     */
    @Override
    public final void cleanup(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        for (Vertex<V, E, M> vertex : this.vertices.values()) {
            peer.write(vertex.getVertexID(), vertex.getValue());
        }
    }

    /**
     * Instantiates the aggregator with the given class name.
     *
     * @throws IllegalArgumentException if the class cannot be found; the
     *         original {@link ClassNotFoundException} is attached as cause
     */
    @SuppressWarnings("unchecked") // the configured class is trusted to be a matching Aggregator
    private Aggregator<M, Vertex<V, E, M>> getNewAggregator(String clsName) {
        try {
            return (Aggregator<M, Vertex<V, E, M>>) ReflectionUtils.newInstance(this.conf.getClassByName(clsName), this.conf);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Aggregator class " + clsName + " could not be found or instantiated!", e);
        }
    }

    /** Returns whether the given peer is the one chosen as master task in {@link #setup}. */
    private boolean isMasterTask(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
        return peer.getPeerName().equals(this.masterTask);
    }

    /**
     * Returns the vertex count computed in {@link #setup}: the number of
     * vertices held by this peer multiplied by the number of peers.
     */
    public final long getNumberVertices() {
        return this.numberVertices;
    }

    /** Returns the current value of the iteration counter. */
    public final long getNumberIterations() {
        return this.iteration;
    }

    /** Returns the iteration limit read in {@link #bsp}; -1 until then. */
    public final int getMaxIteration() {
        return this.maxIteration;
    }

    /** Returns the partitioner instantiated in {@link #setup}. */
    public Partitioner<V, M> getPartitioner() {
        return this.partitioner;
    }

    /**
     * Returns the aggregated value last received from the master for the
     * aggregator at the given index. The backing array is only allocated when
     * aggregators are configured.
     */
    public final Writable getLastAggregatedValue(int index) {
        return this.globalAggregatorResult[index];
    }

    /**
     * Returns the aggregation count last received from the master for the
     * aggregator at the given index. The backing array is only allocated when
     * aggregators are configured.
     */
    public final IntWritable getNumLastAggregatedVertices(int index) {
        return this.globalAggregatorIncrement[index];
    }
}

