/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hama.graph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.graph.AggregationRunner;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;
import org.apache.hama.util.KeyValuePair;

public final class GraphJobRunner<V extends Writable, E extends Writable, M extends Writable>
extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
    /** Logger for this runner. */
    private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
    // Keys used inside map-typed GraphJobMessages. parseMessages() compares the
    // message-count key for equality and treats the two aggregator strings as
    // key prefixes.
    public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
    public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
    public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
    /** Text form of the message-count key ("hama.0"); written by doMasterUpdates(), read by parseMessages(). */
    public static final Text FLAG_MESSAGE_COUNTS = new Text("hama.0");
    /** Configuration key naming the message combiner class; read in setupFields(). */
    public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
    /** Configuration key (boolean, default false) that makes loadVertices() call repair(). */
    public static final String GRAPH_REPAIR = "hama.graph.repair";
    /** Job configuration, taken from the peer in setupFields(). */
    private Configuration conf;
    /** Optional message combiner; stays null unless a combiner class is configured. */
    private Combiner<M> combiner;
    /** Partitioner used by loadVertices() to pick the target peer of each vertex. */
    private Partitioner<V, M> partitioner;
    /** Vertices held by this peer. */
    private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, M>>();
    /** Loop flag of bsp(); parseMessages() clears it when the Integer.MIN_VALUE sentinel arrives. */
    private boolean updated = true;
    /** Reset at the start of every bsp() iteration and accumulated by parseMessages(). */
    private int globalUpdateCounts = 0;
    /** Sum of the vertex counts reported by all peers; see countGlobalVertexCount(). */
    private long numberVertices = 0L;
    /** Value of "hama.graph.max.iteration"; bsp() applies no limit when it is <= 0. */
    private int maxIteration = -1;
    /** Superstep counter, incremented at the end of doInitialSuperstep() and doSuperstep(). */
    private long iteration;
    // Classes resolved from the configuration in setupFields() and also published
    // to the static fields of GraphJobMessage there.
    private Class<V> vertexIdClass;
    private Class<M> vertexValueClass;
    private Class<E> edgeValueClass;
    private Class<Vertex<V, E, M>> vertexClass;
    /** Aggregator handling; created and set up in setupFields(). */
    private AggregationRunner<V, E, M> aggregationRunner;
    /** The peer passed to setup(); exposed through getPeer(). */
    private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;

    /**
     * Prepares this runner before the superstep loop starts: reads the
     * configuration, loads the vertices, determines the global vertex count
     * and runs the initial superstep, in that order.
     *
     * @param peer the BSP peer this task runs on
     * @throws IOException if reading input or messaging fails
     * @throws SyncException if a barrier synchronization fails
     * @throws InterruptedException if the task is interrupted while synchronizing
     */
    @Override
    public final void setup(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        this.setupFields(peer);
        this.loadVertices(peer);
        this.countGlobalVertexCount(peer);
        this.doInitialSuperstep(peer);
    }

    /**
     * Runs the superstep loop. The loop continues while {@code updated} is
     * set and either no iteration limit is configured
     * ({@code maxIteration <= 0}) or the limit has not been exceeded. It also
     * ends as soon as the aggregation runner's
     * {@code receiveAggregatedValues} returns {@code false}.
     *
     * @param peer the BSP peer this task runs on
     * @throws IOException if messaging fails
     * @throws SyncException if a barrier synchronization fails
     * @throws InterruptedException if the task is interrupted while synchronizing
     */
    @Override
    public final void bsp(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        while (this.updated && (this.maxIteration <= 0 || this.iteration <= (long) this.maxIteration)) {
            this.globalUpdateCounts = 0;
            peer.sync();
            // Messages delivered by the barrier: per-vertex messages are returned,
            // map messages update the runner/aggregator state as a side effect.
            final Map<V, List<M>> messages = this.parseMessages(peer);
            this.doMasterUpdates(peer);
            if (!this.aggregationRunner.receiveAggregatedValues(peer, this.iteration)) {
                break;
            }
            this.doSuperstep(messages, peer);
            // Only the master task maintains the iteration counter.
            if (GraphJobRunner.isMasterTask(peer)) {
                peer.getCounter(GraphJobCounter.ITERATIONS).increment(1L);
            }
        }
    }

    /**
     * Writes every vertex held by this peer to the job output as an
     * (vertex id, vertex value) pair.
     *
     * @param peer the BSP peer whose output is written to
     * @throws IOException if writing an output record fails
     */
    @Override
    public final void cleanup(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        for (Vertex<V, E, M> vertex : this.vertices) {
            peer.write(vertex.getVertexID(), vertex.getValue());
        }
    }

    /**
     * On the master task, and only after the first iteration, broadcasts a map
     * message to every peer. When no updates were counted in this iteration
     * the map carries the {@code Integer.MIN_VALUE} sentinel under
     * {@link #FLAG_MESSAGE_COUNTS}; otherwise the aggregation runner fills the
     * map via {@code doMasterAggregation}.
     *
     * @param peer the BSP peer used to send the broadcast
     * @throws IOException if sending a message fails
     */
    private void doMasterUpdates(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        if (!GraphJobRunner.isMasterTask(peer) || this.iteration <= 1L) {
            return;
        }
        final MapWritable masterUpdate = new MapWritable();
        if (this.globalUpdateCounts != 0) {
            this.aggregationRunner.doMasterAggregation(masterUpdate);
        } else {
            // Sentinel that parseMessages() turns into updated = false.
            masterUpdate.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE));
        }
        for (String target : peer.getAllPeerNames()) {
            peer.send(target, new GraphJobMessage(masterUpdate));
        }
    }

    /**
     * Executes one superstep over all local vertices. A halted vertex that
     * received messages is re-activated; vertices that are still halted are
     * skipped. For every other vertex the messages are optionally combined,
     * {@code compute} is invoked and the vertex is handed to the aggregation
     * runner together with its value from before the computation. Finally the
     * number of vertices that did not halt is reported through
     * {@code sendAggregatorValues} and the iteration counter is advanced.
     *
     * @param messages incoming messages of this superstep, keyed by vertex id
     * @param peer the BSP peer used for sending aggregator values
     * @throws IOException if computing a vertex or sending messages fails
     */
    private void doSuperstep(Map<V, List<M>> messages, BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        int activeVertices = 0;
        for (Vertex<V, E, M> vertex : this.vertices) {
            List<M> msgs = messages.get(vertex.getVertexID());
            if (vertex.isHalted() && msgs != null) {
                vertex.setActive();
            }
            if (msgs == null) {
                msgs = Collections.emptyList();
            }
            if (vertex.isHalted()) {
                continue;
            }
            if (this.combiner != null) {
                // Note: the combiner also runs when msgs is empty, so compute()
                // always sees exactly one (combined) message in that case.
                M combined = this.combiner.combine(msgs);
                msgs = new ArrayList<M>();
                msgs.add(combined);
            }
            M lastValue = vertex.getValue();
            vertex.compute(msgs.iterator());
            this.aggregationRunner.aggregateVertex(lastValue, vertex);
            if (!vertex.isHalted()) {
                ++activeVertices;
            }
        }
        this.aggregationRunner.sendAggregatorValues(peer, activeVertices);
        ++this.iteration;
    }

    /**
     * Runs the very first superstep: every local vertex computes once with its
     * own current value as the only message, and is then passed to the
     * aggregation runner together with the value it had before computing.
     * Afterwards aggregator values are sent with an active-vertex count of 1
     * and the iteration counter is advanced.
     *
     * @param peer the BSP peer used for sending aggregator values
     * @throws IOException if computing a vertex or sending messages fails
     */
    private void doInitialSuperstep(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        for (Vertex<V, E, M> current : this.vertices) {
            final List<M> ownValueAsMessage = Collections.singletonList(current.getValue());
            final M valueBeforeCompute = current.getValue();
            current.compute(ownValueAsMessage.iterator());
            this.aggregationRunner.aggregateVertex(valueBeforeCompute, current);
        }

        this.aggregationRunner.sendAggregatorValues(peer, 1);
        this.iteration++;
    }

    /**
     * Initializes this runner's fields from the peer's configuration: the
     * iteration limit, the vertex id / vertex value / edge value / vertex
     * classes (which are also published to the static fields of
     * {@code GraphJobMessage}), the partitioner, the optional combiner and
     * the aggregation runner.
     *
     * @param peer the BSP peer whose configuration is read
     */
    @SuppressWarnings("unchecked")
    private void setupFields(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
        this.peer = peer;
        this.conf = peer.getConfiguration();
        this.maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration", -1);

        // The configured classes are only known at runtime, hence the unchecked casts.
        this.vertexIdClass = (Class<V>) this.conf.getClass("hama.graph.vertex.id.class", Text.class, Writable.class);
        this.vertexValueClass = (Class<M>) this.conf.getClass("hama.graph.vertex.value.class", IntWritable.class, Writable.class);
        this.edgeValueClass = (Class<E>) this.conf.getClass("hama.graph.vertex.edge.value.class", IntWritable.class, Writable.class);
        this.vertexClass = (Class<Vertex<V, E, M>>) this.conf.getClass("hama.graph.vertex.class", Vertex.class);

        GraphJobMessage.VERTEX_ID_CLASS = this.vertexIdClass;
        GraphJobMessage.VERTEX_VALUE_CLASS = this.vertexValueClass;
        GraphJobMessage.VERTEX_CLASS = this.vertexClass;
        GraphJobMessage.EDGE_VALUE_CLASS = this.edgeValueClass;

        final Class<?> partitionerClass = this.conf.getClass("bsp.input.partitioner.class", HashPartitioner.class);
        this.partitioner = (Partitioner<V, M>) ReflectionUtils.newInstance(partitionerClass, this.conf);

        // Combiner.class itself is the "nothing configured" default.
        final Class<?> combinerClass = this.conf.getClass(MESSAGE_COMBINER_CLASS, Combiner.class);
        if (!combinerClass.equals(Combiner.class)) {
            LOG.debug("Combiner class: " + this.conf.get(MESSAGE_COMBINER_CLASS));
            this.combiner = (Combiner<M>) ReflectionUtils.newInstance(combinerClass, this.conf);
        }

        this.aggregationRunner = new AggregationRunner<V, E, M>();
        this.aggregationRunner.setupAggregators(peer);
    }

    /**
     * Reads this peer's input split and turns the records into vertices using
     * the configured {@code VertexInputReader}.
     *
     * <p>With runtime partitioning enabled ("hama.graph.runtime.partitioning",
     * default true) each finished vertex is sent to the peer chosen by the
     * partitioner, and received vertices are collected after intermediate
     * barriers (at most {@code partitioningSteps - 1} of them, spaced by
     * {@code interval} input positions) and after a final barrier. Without
     * runtime partitioning every parsed vertex is kept locally. If
     * "hama.graph.self.ref" is set, a self-loop edge is added to each parsed
     * vertex. If {@link #GRAPH_REPAIR} is set, {@code repair} runs afterwards.
     *
     * @param peer the BSP peer providing input, messaging and barriers
     * @throws IOException if reading, parsing or messaging fails; a parse
     *         failure is wrapped with the original exception as cause
     * @throws SyncException if a barrier synchronization fails
     * @throws InterruptedException if the task is interrupted while synchronizing
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void loadVertices(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        VertexInputReader reader = (VertexInputReader) ReflectionUtils.newInstance(this.conf.getClass("hama.graph.input.reader.class", VertexInputReader.class), this.conf);
        boolean repairNeeded = this.conf.getBoolean(GRAPH_REPAIR, false);
        boolean runtimePartitioning = this.conf.getBoolean("hama.graph.runtime.partitioning", true);
        long splitSize = peer.getSplitSize();
        int partitioningSteps = this.partitionMultiSteps(peer, splitSize);
        long interval = splitSize / (long) partitioningSteps;
        boolean selfReference = this.conf.getBoolean("hama.graph.self.ref", false);
        LOG.debug("vertex class: " + this.vertexClass);

        Vertex<V, E, M> vertex = GraphJobRunner.newVertexInstance(this.vertexClass, this.conf);
        vertex.runner = this;
        long startPos = peer.getPos();
        if (startPos == 0L) {
            startPos = 1L;
        }
        KeyValuePair<Writable, Writable> next;
        int steps = 1;
        while ((next = peer.readNext()) != null) {
            boolean vertexFinished;
            try {
                vertexFinished = reader.parseVertex(next.getKey(), next.getValue(), vertex);
            } catch (Exception e) {
                // Keep the original exception as cause so its stack trace is not lost.
                throw new IOException("exception occured during parsing vertex!" + e.toString(), e);
            }
            // The reader may need several records before a vertex is complete.
            if (!vertexFinished) {
                continue;
            }
            if (vertex.getEdges() == null) {
                if (selfReference) {
                    // Install a mutable list: the self-loop is appended by addEdge()
                    // right below. An immutable singleton list here would make that
                    // call fail or add the self-loop a second time.
                    // NOTE(review): assumes Vertex.addEdge appends to the list given
                    // to setEdges - confirm against Vertex.
                    vertex.setEdges(new ArrayList<Edge<V, E>>(1));
                } else {
                    vertex.setEdges(Collections.<Edge<V, E>>emptyList());
                }
            }
            if (selfReference) {
                vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
            }
            if (runtimePartitioning) {
                int partition = this.partitioner.getPartition(vertex.getVertexID(), vertex.getValue(), peer.getNumPeers());
                peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
            } else {
                vertex.setup(this.conf);
                this.vertices.add(vertex);
            }
            vertex = GraphJobRunner.newVertexInstance(this.vertexClass, this.conf);
            vertex.runner = this;

            // Intermediate barrier once enough input has been consumed, so that
            // outgoing vertices are flushed in several steps.
            // NOTE(review): the number of intermediate barriers depends on each
            // peer's own input positions - confirm all peers sync equally often.
            if (runtimePartitioning && steps < partitioningSteps && peer.getPos() - startPos >= interval) {
                peer.sync();
                ++steps;
                this.receivePartitionedVertices(peer);
                startPos = peer.getPos();
            }
        }
        if (runtimePartitioning) {
            peer.sync();
            this.receivePartitionedVertices(peer);
        }
        LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
        if (repairNeeded) {
            LOG.debug("Starting repair of this graph!");
            this.repair(peer, partitioningSteps, selfReference);
        }
        LOG.debug("Starting Vertex processing!");
    }

    /**
     * Drains the peer's message queue and adds every received vertex to the
     * local vertex list after attaching this runner and calling its setup.
     */
    @SuppressWarnings("unchecked")
    private void receivePartitionedVertices(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        GraphJobMessage msg;
        while ((msg = peer.getCurrentMessage()) != null) {
            Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
            messagedVertex.runner = this;
            messagedVertex.setup(this.conf);
            this.vertices.add(messagedVertex);
        }
    }

    /**
     * Adds vertices that are referenced as edge destinations but are not held
     * by the peer responsible for them.
     *
     * <p>Protocol: every peer reports its vertex count to the master; the
     * master broadcasts the number of intermediate barriers
     * ({@code min(smallest reported count, partitioningSteps * 2)}); each peer
     * then sends the destination id of every edge to the destination's peer,
     * synchronizing that many times along the way plus once at the end. Ids
     * received that do not match a local vertex become new vertices with no
     * edges (or a single self-loop when {@code selfReference} is set).
     *
     * @param peer the BSP peer providing messaging and barriers
     * @param partitioningSteps number of partitioning steps used while loading
     * @param selfReference whether created vertices get a self-loop edge
     * @throws IOException if messaging fails
     * @throws SyncException if a barrier synchronization fails
     * @throws InterruptedException if the task is interrupted while synchronizing
     */
    private void repair(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer, int partitioningSteps, boolean selfReference) throws IOException, SyncException, InterruptedException {
        int multiSteps = 0;
        MapWritable ssize = new MapWritable();
        ssize.put(new IntWritable(peer.getPeerIndex()), new IntWritable(this.vertices.size()));
        peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage(ssize));
        peer.sync();

        if (GraphJobRunner.isMasterTask(peer)) {
            int minVerticesSize = Integer.MAX_VALUE;
            GraphJobMessage sizeReport;
            while ((sizeReport = peer.getCurrentMessage()) != null) {
                for (Map.Entry<Writable, Writable> e : sizeReport.getMap().entrySet()) {
                    minVerticesSize = Math.min(minVerticesSize, ((IntWritable) e.getValue()).get());
                }
            }
            // Never more barriers than the smallest peer has vertices, so the
            // divisor vertices.size() / multiSteps below is at least 1 on every peer.
            multiSteps = Math.min(minVerticesSize, partitioningSteps * 2);
            for (String peerName : peer.getAllPeerNames()) {
                MapWritable stepsMessage = new MapWritable();
                stepsMessage.put(new Text("steps"), new IntWritable(multiSteps));
                peer.send(peerName, new GraphJobMessage(stepsMessage));
            }
        }
        peer.sync();

        GraphJobMessage received = peer.getCurrentMessage();
        for (Map.Entry<Writable, Writable> e : received.getMap().entrySet()) {
            multiSteps = ((IntWritable) e.getValue()).get();
        }

        Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
        int i = 0;
        int syncs = 0;
        for (Vertex<V, E, M> vertex : this.vertices) {
            for (Edge<V, E> e : vertex.getEdges()) {
                peer.send(vertex.getDestinationPeerName(e), new GraphJobMessage((Writable) e.getDestinationVertexID()));
            }
            // The syncs < multiSteps test comes first, so the division is never
            // evaluated when multiSteps is 0.
            if (syncs < multiSteps && i % (this.vertices.size() / multiSteps) == 0) {
                peer.sync();
                ++syncs;
                this.collectRepairVertices(peer, selfReference, tmp);
            }
            ++i;
        }
        peer.sync();
        this.collectRepairVertices(peer, selfReference, tmp);

        // Ids that already exist locally need no repair.
        for (Vertex<V, E, M> existing : this.vertices) {
            tmp.remove(existing.getVertexID());
        }
        this.vertices.addAll(tmp.values());
        tmp.clear();
    }

    /**
     * Drains the peer's message queue and, for every received vertex id, puts a
     * freshly created and set-up vertex into {@code target} under that id.
     */
    @SuppressWarnings("unchecked")
    private void collectRepairVertices(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer, boolean selfReference, Map<V, Vertex<V, E, M>> target) throws IOException {
        GraphJobMessage msg;
        while ((msg = peer.getCurrentMessage()) != null) {
            V vertexName = (V) msg.getVertexId();
            Vertex<V, E, M> newVertex = GraphJobRunner.newVertexInstance(this.vertexClass, this.conf);
            newVertex.setVertexID(vertexName);
            newVertex.runner = this;
            if (selfReference) {
                newVertex.setEdges(Collections.singletonList(new Edge<V, E>(newVertex.getVertexID(), null)));
            } else {
                newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
            }
            newVertex.setup(this.conf);
            target.put(vertexName, newVertex);
        }
    }

    /**
     * Agrees with all peers on the number of partitioning steps. Every peer
     * reports its split size to the master; the master takes the largest
     * reported size, divides it by "hama.graph.multi.step.partitioning.interval"
     * (default 20000000), adds one and broadcasts the result. The master also
     * adds the result to the {@code MULTISTEP_PARTITIONING} counter.
     *
     * @param peer the BSP peer providing messaging and barriers
     * @param splitSize size of this peer's input split
     * @return the number of steps broadcast by the master
     * @throws IOException if messaging fails
     * @throws SyncException if a barrier synchronization fails
     * @throws InterruptedException if the task is interrupted while synchronizing
     */
    private int partitionMultiSteps(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer, long splitSize) throws IOException, SyncException, InterruptedException {
        // Phase 1: report the local split size to the master.
        final MapWritable sizeReport = new MapWritable();
        sizeReport.put(new IntWritable(peer.getPeerIndex()), new LongWritable(splitSize));
        peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage(sizeReport));
        peer.sync();

        // Phase 2: the master derives the step count and broadcasts it.
        if (GraphJobRunner.isMasterTask(peer)) {
            long largestSplit = 0L;
            for (GraphJobMessage report = peer.getCurrentMessage(); report != null; report = peer.getCurrentMessage()) {
                for (Map.Entry<Writable, Writable> entry : report.getMap().entrySet()) {
                    final long reported = ((LongWritable) entry.getValue()).get();
                    if (reported > largestSplit) {
                        largestSplit = reported;
                    }
                }
            }
            final long stepInterval = this.conf.getLong("hama.graph.multi.step.partitioning.interval", 20000000L);
            final int steps = (int) (largestSplit / stepInterval) + 1;
            for (String target : peer.getAllPeerNames()) {
                final MapWritable broadcast = new MapWritable();
                broadcast.put(new Text("max"), new IntWritable(steps));
                peer.send(target, new GraphJobMessage(broadcast));
            }
        }
        peer.sync();

        // Phase 3: every peer reads the broadcast value.
        int agreedSteps = 1;
        final GraphJobMessage answer = peer.getCurrentMessage();
        for (Map.Entry<Writable, Writable> entry : answer.getMap().entrySet()) {
            agreedSteps = ((IntWritable) entry.getValue()).get();
        }
        if (GraphJobRunner.isMasterTask(peer)) {
            peer.getCounter(GraphJobCounter.MULTISTEP_PARTITIONING).increment((long) agreedSteps);
        }
        return agreedSteps;
    }

    /**
     * Determines the global number of vertices: each peer sends its local
     * vertex count to every peer, and after the barrier each peer adds up all
     * vertices-size messages it received into {@code numberVertices}. The
     * master task additionally adds the total to the {@code INPUT_VERTICES}
     * counter.
     *
     * @param peer the BSP peer providing messaging and the barrier
     * @throws IOException if messaging fails
     * @throws SyncException if the barrier synchronization fails
     * @throws InterruptedException if the task is interrupted while synchronizing
     */
    private void countGlobalVertexCount(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        for (String target : peer.getAllPeerNames()) {
            final IntWritable localCount = new IntWritable(this.vertices.size());
            peer.send(target, new GraphJobMessage(localCount));
        }
        peer.sync();

        for (GraphJobMessage incoming = peer.getCurrentMessage(); incoming != null; incoming = peer.getCurrentMessage()) {
            if (incoming.isVerticesSizeMessage()) {
                this.numberVertices += (long) incoming.getVerticesSize().get();
            }
        }

        if (GraphJobRunner.isMasterTask(peer)) {
            peer.getCounter(GraphJobCounter.INPUT_VERTICES).increment(this.numberVertices);
        }
    }

    /**
     * Drains the peer's message queue. Vertex messages are grouped by
     * destination vertex id and returned. Map messages are applied as side
     * effects, entry by entry:
     * <ul>
     * <li>key equal to {@link #FLAG_MESSAGE_COUNTS}: the value
     * {@code Integer.MIN_VALUE} clears {@code updated}; any other value is
     * added to {@code globalUpdateCounts};</li>
     * <li>key starting with {@link #S_FLAG_AGGREGATOR_VALUE} or
     * {@link #S_FLAG_AGGREGATOR_INCREMENT}: forwarded to the aggregation
     * runner, but only while it is enabled;</li>
     * <li>any other key is ignored.</li>
     * </ul>
     *
     * @param peer the BSP peer whose queue is drained
     * @return the vertex messages, keyed by vertex id
     * @throws IOException if reading a message fails
     * @throws UnsupportedOperationException if a message is neither a vertex
     *         message nor a map message
     */
    @SuppressWarnings("unchecked")
    private Map<V, List<M>> parseMessages(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        Map<V, List<M>> msgMap = new HashMap<V, List<M>>();
        GraphJobMessage msg;
        while ((msg = peer.getCurrentMessage()) != null) {
            if (msg.isVertexMessage()) {
                // The message only exposes Writable; the concrete types are the
                // job's configured vertex id / value classes.
                V vertexID = (V) msg.getVertexId();
                M value = (M) msg.getVertexValue();
                List<M> msgs = msgMap.get(vertexID);
                if (msgs == null) {
                    msgs = new ArrayList<M>();
                    msgMap.put(vertexID, msgs);
                }
                msgs.add(value);
            } else if (msg.isMapMessage()) {
                for (Map.Entry<Writable, Writable> e : msg.getMap().entrySet()) {
                    Text key = (Text) e.getKey();
                    Writable entryValue = e.getValue();
                    if (FLAG_MESSAGE_COUNTS.equals(key)) {
                        int count = ((IntWritable) entryValue).get();
                        if (count == Integer.MIN_VALUE) {
                            // Sentinel sent by the master when no vertex was updated.
                            this.updated = false;
                        } else {
                            this.globalUpdateCounts += count;
                        }
                    } else if (this.aggregationRunner.isEnabled() && key.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
                        this.aggregationRunner.masterReadAggregatedValue(key, entryValue);
                    } else if (this.aggregationRunner.isEnabled() && key.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
                        this.aggregationRunner.masterReadAggregatedIncrementalValue(key, entryValue);
                    }
                }
            } else {
                throw new UnsupportedOperationException("Unknown message type: " + msg);
            }
        }
        return msgMap;
    }

    /**
     * @return the vertex count summed over the size messages of all peers in
     *         countGlobalVertexCount(); 0 before that has run
     */
    public final long getNumberVertices() {
        return this.numberVertices;
    }

    /**
     * @return the current value of the iteration counter, which is incremented
     *         at the end of doInitialSuperstep() and of every doSuperstep()
     */
    public final long getNumberIterations() {
        return this.iteration;
    }

    /**
     * @return the value read from "hama.graph.max.iteration" (default -1);
     *         bsp() applies no iteration limit when it is <= 0
     */
    public final int getMaxIteration() {
        return this.maxIteration;
    }

    /**
     * @return the partitioner instantiated in setupFields() from
     *         "bsp.input.partitioner.class"
     */
    public final Partitioner<V, M> getPartitioner() {
        return this.partitioner;
    }

    /**
     * Delegates to the aggregation runner.
     *
     * @param index index passed through to
     *        {@code AggregationRunner.getLastAggregatedValue}
     * @return whatever the aggregation runner returns for that index
     */
    public final Writable getLastAggregatedValue(int index) {
        return this.aggregationRunner.getLastAggregatedValue(index);
    }

    /**
     * Delegates to the aggregation runner.
     *
     * @param index index passed through to
     *        {@code AggregationRunner.getNumLastAggregatedVertices}
     * @return whatever the aggregation runner returns for that index
     */
    public final IntWritable getNumLastAggregatedVertices(int index) {
        return this.aggregationRunner.getNumLastAggregatedVertices(index);
    }

    /**
     * @return the peer stored by setupFields(); null before setup() has run
     */
    public final BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> getPeer() {
        return this.peer;
    }

    /**
     * Tells whether the given peer is the master task, i.e. whether its own
     * peer name equals the name returned by {@link #getMasterTask}.
     *
     * @param peer the peer to test
     * @return {@code true} if the peer is the master task
     */
    public static boolean isMasterTask(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
        final String ownName = peer.getPeerName();
        final String masterName = GraphJobRunner.getMasterTask(peer);
        return ownName.equals(masterName);
    }

    /**
     * Returns the name of the master task, which is the peer at index 0.
     *
     * @param peer any peer of the job
     * @return the peer name at index 0
     */
    public static String getMasterTask(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
        return peer.getPeerName(0);
    }

    /**
     * Creates a vertex of the given class through
     * {@code ReflectionUtils.newInstance}.
     *
     * @param vertexClass the vertex class to instantiate
     * @param conf configuration handed to {@code ReflectionUtils.newInstance}
     * @return the new vertex instance
     */
    @SuppressWarnings("unchecked")
    public static <V extends Writable, E extends Writable, M extends Writable> Vertex<V, E, M> newVertexInstance(Class<?> vertexClass, Configuration conf) {
        // The concrete type parameters are only known to the caller.
        return (Vertex<V, E, M>) ReflectionUtils.newInstance(vertexClass, conf);
    }

    /** Counters this runner updates through {@code peer.getCounter(...)}. */
    public static enum GraphJobCounter {
        /** Increased on the master task by the agreed step count in partitionMultiSteps(). */
        MULTISTEP_PARTITIONING,
        /** Increased by one on the master task after every superstep in bsp(). */
        ITERATIONS,
        /** Increased on the master task by the global vertex count in countGlobalVertexCount(). */
        INPUT_VERTICES,
        /** Not referenced in this class; presumably maintained elsewhere (e.g. AggregationRunner) - TODO confirm. */
        AGGREGATE_VERTICES;

    }
}

