/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hama.graph;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.PartitioningRunner;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.graph.AggregationRunner;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.graph.IDSkippingIterator;
import org.apache.hama.graph.ListVerticesInfo;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexMessageIterable;
import org.apache.hama.graph.VertexOutputWriter;
import org.apache.hama.graph.VerticesInfo;
import org.apache.hama.util.KeyValuePair;

public final class GraphJobRunner<V extends WritableComparable, E extends Writable, M extends Writable>
extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
    private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
    public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
    public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
    public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
    public static final Text FLAG_MESSAGE_COUNTS = new Text("hama.0");
    public static final String MESSAGE_COMBINER_CLASS_KEY = "hama.vertex.message.combiner.class";
    public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
    private Configuration conf;
    private Combiner<M> combiner;
    private Partitioner<V, M> partitioner;
    public static Class<?> VERTEX_CLASS;
    public static Class<? extends WritableComparable> VERTEX_ID_CLASS;
    public static Class<? extends Writable> VERTEX_VALUE_CLASS;
    public static Class<? extends Writable> EDGE_VALUE_CLASS;
    public static Class<Vertex<?, ?, ?>> vertexClass;
    private VerticesInfo<V, E, M> vertices;
    private boolean updated = true;
    private int globalUpdateCounts = 0;
    private long numberVertices = 0L;
    private int maxIteration = -1;
    private long iteration;
    private AggregationRunner<V, E, M> aggregationRunner;
    private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
    private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;

    public final void setup(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        this.setupFields(peer);
        this.loadVertices(peer);
        this.countGlobalVertexCount(peer);
        this.doInitialSuperstep(peer);
    }

    public final void bsp(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        while (this.updated && (this.maxIteration <= 0 || this.iteration <= (long)this.maxIteration)) {
            this.globalUpdateCounts = 0;
            peer.sync();
            GraphJobMessage firstVertexMessage = this.parseMessages(peer);
            firstVertexMessage = this.doAggregationUpdates(firstVertexMessage, peer);
            if (!this.updated) break;
            this.doSuperstep(firstVertexMessage, peer);
            if (!GraphJobRunner.isMasterTask(peer)) continue;
            peer.getCounter((Enum)GraphJobCounter.ITERATIONS).increment(1L);
        }
    }

    public final void cleanup(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        this.vertexOutputWriter.setup(this.conf);
        IDSkippingIterator<V, E, M> skippingIterator = this.vertices.skippingIterator();
        while (skippingIterator.hasNext()) {
            this.vertexOutputWriter.write(skippingIterator.next(), peer);
        }
        this.vertices.cleanup(this.conf, peer.getTaskId());
    }

    private GraphJobMessage doAggregationUpdates(GraphJobMessage firstVertexMessage, BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        if (GraphJobRunner.isMasterTask(peer) && this.iteration > 1L) {
            MapWritable updatedCnt = new MapWritable();
            if (this.globalUpdateCounts == 0) {
                updatedCnt.put((Writable)FLAG_MESSAGE_COUNTS, (Writable)new IntWritable(Integer.MIN_VALUE));
            } else {
                this.aggregationRunner.doMasterAggregation(updatedCnt);
            }
            for (String peerName : peer.getAllPeerNames()) {
                peer.send(peerName, (Writable)new GraphJobMessage(updatedCnt));
            }
        }
        if (this.aggregationRunner.isEnabled() && this.iteration > 1L) {
            if (firstVertexMessage != null) {
                peer.send(peer.getPeerName(), (Writable)firstVertexMessage);
            }
            GraphJobMessage msg = null;
            while ((msg = (GraphJobMessage)peer.getCurrentMessage()) != null) {
                peer.send(peer.getPeerName(), (Writable)msg);
            }
            peer.sync();
            this.updated = this.aggregationRunner.receiveAggregatedValues(((GraphJobMessage)peer.getCurrentMessage()).getMap(), this.iteration);
            firstVertexMessage = (GraphJobMessage)peer.getCurrentMessage();
        }
        return firstVertexMessage;
    }

    private void doSuperstep(GraphJobMessage currentMessage, BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        int activeVertices = 0;
        this.vertices.startSuperstep();
        IDSkippingIterator<WritableComparable, E, M> iterator = this.vertices.skippingIterator();
        while (iterator.hasNext(currentMessage == null ? null : (WritableComparable)currentMessage.getVertexId(), IDSkippingIterator.Strategy.ALL)) {
            Vertex<V, E, Object> vertex = iterator.next();
            VertexMessageIterable<WritableComparable, M> iterable = null;
            if (currentMessage != null) {
                iterable = this.iterate(currentMessage, (WritableComparable)currentMessage.getVertexId(), vertex, peer);
            }
            if (iterable != null && vertex.isHalted()) {
                vertex.setActive();
            }
            if (!vertex.isHalted()) {
                M lastValue = vertex.getValue();
                if (iterable == null) {
                    vertex.compute(Collections.emptyList());
                } else {
                    if (this.combiner != null) {
                        Writable combined = this.combiner.combine(iterable);
                        vertex.compute(Collections.singleton(combined));
                    } else {
                        vertex.compute(iterable);
                    }
                    currentMessage = iterable.getOverflowMessage();
                }
                this.aggregationRunner.aggregateVertex(lastValue, vertex);
                if (!vertex.isHalted()) {
                    ++activeVertices;
                }
            }
            this.vertices.finishVertexComputation(vertex);
        }
        this.vertices.finishSuperstep();
        this.aggregationRunner.sendAggregatorValues(peer, activeVertices);
        ++this.iteration;
    }

    private VertexMessageIterable<V, M> iterate(GraphJobMessage currentMessage, V firstMessageId, Vertex<V, E, M> vertex, BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
        int comparision = firstMessageId.compareTo(vertex.getVertexID());
        if (this.conf.getBoolean("hama.check.missing.vertex", true)) {
            if (comparision < 0) {
                throw new IllegalArgumentException("Messages must never be behind the vertex in ID! Current Message ID: " + firstMessageId + " vs. " + vertex.getVertexID());
            }
        } else {
            while (comparision < 0) {
                VertexMessageIterable messageIterable = new VertexMessageIterable(currentMessage, firstMessageId, peer);
                currentMessage = messageIterable.getOverflowMessage();
                firstMessageId = (WritableComparable)currentMessage.getVertexId();
                comparision = firstMessageId.compareTo(vertex.getVertexID());
            }
        }
        if (comparision == 0) {
            return new VertexMessageIterable(currentMessage, vertex.getVertexID(), peer);
        }
        return null;
    }

    private void doInitialSuperstep(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        this.vertices.startSuperstep();
        IDSkippingIterator<V, E, M> skippingIterator = this.vertices.skippingIterator();
        while (skippingIterator.hasNext()) {
            Vertex<V, E, M> vertex = skippingIterator.next();
            M lastValue = vertex.getValue();
            vertex.compute(Collections.singleton(vertex.getValue()));
            this.aggregationRunner.aggregateVertex(lastValue, vertex);
            this.vertices.finishVertexComputation(vertex);
        }
        this.vertices.finishSuperstep();
        this.aggregationRunner.sendAggregatorValues(peer, 1);
        ++this.iteration;
    }

    private void setupFields(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        this.peer = peer;
        this.conf = peer.getConfiguration();
        this.maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration", -1);
        GraphJobRunner.initClasses(this.conf);
        this.partitioner = (Partitioner)ReflectionUtils.newInstance((Class)this.conf.getClass("bsp.input.partitioner.class", HashPartitioner.class), (Configuration)this.conf);
        if (!this.conf.getClass(MESSAGE_COMBINER_CLASS_KEY, Combiner.class).equals(Combiner.class)) {
            LOG.debug((Object)("Combiner class: " + this.conf.get(MESSAGE_COMBINER_CLASS_KEY)));
            this.combiner = (Combiner)ReflectionUtils.newInstance((Class)this.conf.getClass(MESSAGE_COMBINER_CLASS_KEY, Combiner.class), (Configuration)this.conf);
        }
        Class outputWriter = this.conf.getClass("hama.graph.vertex.output.writer.class", VertexOutputWriter.class);
        this.vertexOutputWriter = (VertexOutputWriter)org.apache.hama.util.ReflectionUtils.newInstance((Class)outputWriter);
        this.aggregationRunner = new AggregationRunner();
        this.aggregationRunner.setupAggregators(peer);
        this.vertices = new ListVerticesInfo();
        this.vertices.init(this, this.conf, peer.getTaskId());
    }

    public static <V extends WritableComparable<? super V>, E extends Writable, M extends Writable> void initClasses(Configuration conf) {
        Class vertexIdClass = conf.getClass("hama.graph.vertex.id.class", Text.class, Writable.class);
        Class vertexValueClass = conf.getClass("hama.graph.vertex.value.class", IntWritable.class, Writable.class);
        Class edgeValueClass = conf.getClass("hama.graph.vertex.edge.value.class", IntWritable.class, Writable.class);
        vertexClass = conf.getClass(VERTEX_CLASS_KEY, Vertex.class);
        VERTEX_ID_CLASS = vertexIdClass;
        VERTEX_VALUE_CLASS = vertexValueClass;
        VERTEX_CLASS = vertexClass;
        EDGE_VALUE_CLASS = edgeValueClass;
    }

    private void loadVertices(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        boolean selfReference = this.conf.getBoolean("hama.graph.self.ref", false);
        PartitioningRunner.RecordConverter converter = (PartitioningRunner.RecordConverter)ReflectionUtils.newInstance((Class)this.conf.getClass("bsp.runtime.partition.recordconverter", PartitioningRunner.DefaultRecordConverter.class, PartitioningRunner.RecordConverter.class), (Configuration)this.conf);
        Vertex vertex = GraphJobRunner.newVertexInstance(VERTEX_CLASS);
        KeyValuePair record = null;
        KeyValuePair converted = null;
        while ((record = peer.readNext()) != null) {
            converted = converter.convertRecord(record, this.conf);
            vertex = (Vertex)converted.getKey();
            vertex.runner = this;
            vertex.setup(this.conf);
            if (selfReference) {
                vertex.addEdge(new Edge(vertex.getVertexID(), null));
            }
            this.vertices.addVertex(vertex);
            vertex = GraphJobRunner.newVertexInstance(VERTEX_CLASS);
            vertex.runner = this;
        }
        this.vertices.finishAdditions();
        this.vertices.finishSuperstep();
        LOG.info((Object)(this.vertices.size() + " vertices are loaded into " + peer.getPeerName()));
        LOG.debug((Object)"Starting Vertex processing!");
    }

    private void countGlobalVertexCount(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        GraphJobMessage msg;
        for (String peerName : peer.getAllPeerNames()) {
            peer.send(peerName, (Writable)new GraphJobMessage(new IntWritable(this.vertices.size())));
        }
        peer.sync();
        while ((msg = (GraphJobMessage)peer.getCurrentMessage()) != null) {
            if (!msg.isVerticesSizeMessage()) continue;
            this.numberVertices += (long)msg.getVerticesSize().get();
        }
        if (GraphJobRunner.isMasterTask(peer)) {
            peer.getCounter((Enum)GraphJobCounter.INPUT_VERTICES).increment(this.numberVertices);
        }
    }

    private GraphJobMessage parseMessages(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        GraphJobMessage msg = null;
        while ((msg = (GraphJobMessage)peer.getCurrentMessage()) != null && !msg.isVertexMessage()) {
            if (msg.isMapMessage()) {
                for (Map.Entry e : msg.getMap().entrySet()) {
                    Text vertexID = (Text)e.getKey();
                    if (FLAG_MESSAGE_COUNTS.equals((Object)vertexID)) {
                        if (((IntWritable)e.getValue()).get() == Integer.MIN_VALUE) {
                            this.updated = false;
                            continue;
                        }
                        this.globalUpdateCounts += ((IntWritable)e.getValue()).get();
                        continue;
                    }
                    if (this.aggregationRunner.isEnabled() && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
                        this.aggregationRunner.masterReadAggregatedValue(vertexID, (Writable)e.getValue());
                        continue;
                    }
                    if (!this.aggregationRunner.isEnabled() || !vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) continue;
                    this.aggregationRunner.masterReadAggregatedIncrementalValue(vertexID, (Writable)e.getValue());
                }
                continue;
            }
            throw new UnsupportedOperationException("Unknown message type: " + msg);
        }
        return msg;
    }

    public final long getNumberVertices() {
        return this.numberVertices;
    }

    public final long getNumberIterations() {
        return this.iteration;
    }

    public final int getMaxIteration() {
        return this.maxIteration;
    }

    public final Partitioner<V, M> getPartitioner() {
        return this.partitioner;
    }

    public final Writable getLastAggregatedValue(int index) {
        return this.aggregationRunner.getLastAggregatedValue(index);
    }

    public final IntWritable getNumLastAggregatedVertices(int index) {
        return this.aggregationRunner.getNumLastAggregatedVertices(index);
    }

    public final BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> getPeer() {
        return this.peer;
    }

    public static boolean isMasterTask(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
        return peer.getPeerName().equals(GraphJobRunner.getMasterTask(peer));
    }

    public static String getMasterTask(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
        return peer.getPeerName(0);
    }

    public static <V extends WritableComparable, E extends Writable, M extends Writable> Vertex<V, E, M> newVertexInstance(Class<?> vertexClass) {
        return (Vertex)org.apache.hama.util.ReflectionUtils.newInstance(vertexClass);
    }

    public static <X extends Writable> X createVertexIDObject() {
        return (X)((Writable)org.apache.hama.util.ReflectionUtils.newInstance(VERTEX_ID_CLASS));
    }

    public static <X extends Writable> X createVertexValue() {
        return (X)((Writable)org.apache.hama.util.ReflectionUtils.newInstance(VERTEX_VALUE_CLASS));
    }

    public static <X extends Writable> X createEdgeCostObject() {
        return (X)((Writable)org.apache.hama.util.ReflectionUtils.newInstance(EDGE_VALUE_CLASS));
    }

    public static enum GraphJobCounter {
        MULTISTEP_PARTITIONING,
        ITERATIONS,
        INPUT_VERTICES,
        AGGREGATE_VERTICES;

    }
}

