/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hama.graph;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.message.OutgoingMessageManager;
import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.util.BSPNetUtils;

public class OutgoingVertexMessagesManager<M extends Writable>
implements OutgoingMessageManager<GraphJobMessage> {
    protected static final Log LOG = LogFactory.getLog(OutgoingVertexMessagesManager.class);
    private HamaConfiguration conf;
    private BSPMessageCompressor<GraphJobMessage> compressor;
    private Combiner<Writable> combiner;
    private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap();
    private HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> outgoingBundles = new HashMap();
    private HashMap<InetSocketAddress, Map<WritableComparable, Writable>> vertexMessageMap = new HashMap();
    private List<Writable> tmp;

    public void init(HamaConfiguration conf, BSPMessageCompressor<GraphJobMessage> compressor) {
        this.conf = conf;
        this.compressor = compressor;
        if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(Combiner.class)) {
            LOG.debug((Object)("Combiner class: " + conf.get("bsp.combiner.class")));
            this.combiner = (Combiner)ReflectionUtils.newInstance((Class)conf.getClass("bsp.combiner.class", Combiner.class), (Configuration)conf);
        }
    }

    public void addMessage(String peerName, GraphJobMessage msg) {
        InetSocketAddress targetPeerAddress = this.getSocketAddress(peerName);
        if (msg.isVertexMessage() && this.combiner != null) {
            Map<WritableComparable, Writable> combinedMessage;
            WritableComparable vertexID = msg.getVertexId();
            Writable vertexValue = msg.getVertexValue();
            if (!this.vertexMessageMap.containsKey(targetPeerAddress)) {
                this.vertexMessageMap.put(targetPeerAddress, new HashMap());
            }
            if ((combinedMessage = this.vertexMessageMap.get(targetPeerAddress)).containsKey(vertexID)) {
                this.tmp = new ArrayList<Writable>();
                this.tmp.add(combinedMessage.get(vertexID));
                this.tmp.add(vertexValue);
                Iterable<Writable> iterable = new Iterable<Writable>(){

                    @Override
                    public Iterator<Writable> iterator() {
                        return OutgoingVertexMessagesManager.this.tmp.iterator();
                    }
                };
                combinedMessage.put(vertexID, this.combiner.combine((Iterable)iterable));
            } else {
                combinedMessage.put(vertexID, vertexValue);
            }
        } else {
            this.outgoingBundles.get(targetPeerAddress).addMessage((Writable)msg);
        }
    }

    private InetSocketAddress getSocketAddress(String peerName) {
        InetSocketAddress targetPeerAddress = null;
        if (this.peerSocketCache.containsKey(peerName)) {
            targetPeerAddress = this.peerSocketCache.get(peerName);
        } else {
            targetPeerAddress = BSPNetUtils.getAddress((String)peerName);
            this.peerSocketCache.put(peerName, targetPeerAddress);
        }
        if (!this.outgoingBundles.containsKey(targetPeerAddress)) {
            BSPMessageBundle bundle = new BSPMessageBundle();
            bundle.setCompressor(this.compressor, this.conf.getLong("hama.messenger.compression.threshold", 128L));
            this.outgoingBundles.put(targetPeerAddress, (BSPMessageBundle<GraphJobMessage>)bundle);
        }
        return targetPeerAddress;
    }

    public void clear() {
        this.outgoingBundles.clear();
        this.vertexMessageMap.clear();
    }

    public Iterator<Map.Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>> getBundleIterator() {
        if (this.combiner != null) {
            for (Map.Entry<InetSocketAddress, Map<WritableComparable, Writable>> e : this.vertexMessageMap.entrySet()) {
                for (Map.Entry<WritableComparable, Writable> v : e.getValue().entrySet()) {
                    this.outgoingBundles.get(e.getKey()).addMessage((Writable)new GraphJobMessage(v.getKey(), v.getValue()));
                }
            }
        }
        this.vertexMessageMap.clear();
        return this.outgoingBundles.entrySet().iterator();
    }
}

