/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.shipping;

import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.runtime.io.network.api.ChannelSelector;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.plugable.SerializationDelegate;

public class OutputEmitter<T>
implements ChannelSelector<SerializationDelegate<T>> {
    private final ShipStrategyType strategy;
    private int[] channels;
    private int nextChannelToSendTo = 0;
    private final TypeComparator<T> comparator;
    private final Partitioner<Object> partitioner;
    private Object[] extractedKeys;

    public OutputEmitter() {
        this(ShipStrategyType.NONE);
    }

    public OutputEmitter(ShipStrategyType strategy) {
        this(strategy, null);
    }

    public OutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator) {
        this(strategy, comparator, null, null);
    }

    public OutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, DataDistribution distr) {
        this(strategy, comparator, null, distr);
    }

    public OutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, Partitioner<?> partitioner) {
        this(strategy, comparator, partitioner, null);
    }

    public OutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, Partitioner<?> partitioner, DataDistribution distr) {
        if (strategy == null) {
            throw new NullPointerException();
        }
        this.strategy = strategy;
        this.comparator = comparator;
        this.partitioner = partitioner;
        switch (strategy) {
            case FORWARD: 
            case PARTITION_HASH: 
            case PARTITION_RANGE: 
            case PARTITION_RANDOM: 
            case PARTITION_FORCED_REBALANCE: 
            case PARTITION_CUSTOM: 
            case BROADCAST: {
                break;
            }
            default: {
                throw new IllegalArgumentException("Invalid shipping strategy for OutputEmitter: " + strategy.name());
            }
        }
        if (strategy == ShipStrategyType.PARTITION_RANGE && distr == null) {
            throw new NullPointerException("Data distribution must not be null when the ship strategy is range partitioning.");
        }
        if (strategy == ShipStrategyType.PARTITION_CUSTOM && partitioner == null) {
            throw new NullPointerException("Partitioner must not be null when the ship strategy is set to custom partitioning.");
        }
    }

    @Override
    public final int[] selectChannels(SerializationDelegate<T> record, int numberOfChannels) {
        switch (this.strategy) {
            case FORWARD: 
            case PARTITION_RANDOM: 
            case PARTITION_FORCED_REBALANCE: {
                return this.robin(numberOfChannels);
            }
            case PARTITION_HASH: {
                return this.hashPartitionDefault(record.getInstance(), numberOfChannels);
            }
            case BROADCAST: {
                return this.broadcast(numberOfChannels);
            }
            case PARTITION_CUSTOM: {
                return this.customPartition(record.getInstance(), numberOfChannels);
            }
            case PARTITION_RANGE: {
                return this.rangePartition(record.getInstance(), numberOfChannels);
            }
        }
        throw new UnsupportedOperationException("Unsupported distribution strategy: " + this.strategy.name());
    }

    private final int[] robin(int numberOfChannels) {
        int nextChannel;
        if (this.channels == null || this.channels.length != 1) {
            this.channels = new int[1];
        }
        this.nextChannelToSendTo = nextChannel = (nextChannel = this.nextChannelToSendTo + 1) < numberOfChannels ? nextChannel : 0;
        this.channels[0] = nextChannel;
        return this.channels;
    }

    private final int[] broadcast(int numberOfChannels) {
        if (this.channels == null || this.channels.length != numberOfChannels) {
            this.channels = new int[numberOfChannels];
            for (int i = 0; i < numberOfChannels; ++i) {
                this.channels[i] = i;
            }
        }
        return this.channels;
    }

    private final int[] hashPartitionDefault(T record, int numberOfChannels) {
        if (this.channels == null || this.channels.length != 1) {
            this.channels = new int[1];
        }
        int hash = this.comparator.hash(record);
        this.channels[0] = (hash = this.murmurHash(hash)) >= 0 ? hash % numberOfChannels : (hash != Integer.MIN_VALUE ? -hash % numberOfChannels : 0);
        return this.channels;
    }

    private final int murmurHash(int k) {
        k *= -862048943;
        k = Integer.rotateLeft(k, 15);
        k *= 461845907;
        k = Integer.rotateLeft(k, 13);
        k *= -430675100;
        k ^= 4;
        k ^= k >>> 16;
        k *= -2048144789;
        k ^= k >>> 13;
        k *= -1028477387;
        k ^= k >>> 16;
        return k;
    }

    private final int[] rangePartition(T record, int numberOfChannels) {
        throw new UnsupportedOperationException();
    }

    private final int[] customPartition(T record, int numberOfChannels) {
        if (this.channels == null) {
            this.channels = new int[1];
            this.extractedKeys = new Object[1];
        }
        try {
            if (this.comparator.extractKeys(record, this.extractedKeys, 0) == 1) {
                Object key = this.extractedKeys[0];
                this.channels[0] = this.partitioner.partition(key, numberOfChannels);
                return this.channels;
            }
            throw new RuntimeException("Inconsistency in the key comparator - comparator extracted more than one field.");
        }
        catch (Throwable t) {
            throw new RuntimeException("Error while calling custom partitioner.", t);
        }
    }
}

