/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.leases.impl;

import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.Lease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Decides which leases this worker should acquire and attempts to take them via the
 * configured {@link ILeaseManager}.
 *
 * <p>Each call to {@link #takeLeases()} lists all leases, refreshes the in-memory view held in
 * {@code allLeases}, computes a per-worker target lease count, and then tries to take expired
 * leases (or, when none are expired, to steal a single lease from the most loaded worker).
 *
 * <p>The package-private {@code takeLeases(Callable)} method is {@code synchronized}; it is the
 * only entry point that mutates {@code allLeases} and {@code lastScanTimeNanos}.
 *
 * @param <T> concrete lease type handled by this taker
 */
public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
    private static final Log LOG = LogFactory.getLog(LeaseTaker.class);

    /** Maximum number of attempts to take a single lease when the lease manager reports throttling. */
    private static final int TAKE_RETRIES = 3;

    /** Maximum number of attempts to list leases when the lease manager reports throttling. */
    private static final int SCAN_RETRIES = 1;

    /** Default time source; returns {@link System#nanoTime()}. */
    private static final Callable<Long> SYSTEM_CLOCK_CALLABLE = new Callable<Long>() {

        @Override
        public Long call() {
            return System.nanoTime();
        }
    };

    private final ILeaseManager<T> leaseManager;
    private final String workerIdentifier;

    /** Most recent view of every lease returned by the lease manager, keyed by lease key. */
    private final Map<String, T> allLeases = new HashMap<String, T>();
    private final long leaseDurationNanos;
    private final Random random = new Random();

    /** Value obtained from the time provider during the most recent lease scan. */
    private long lastScanTimeNanos = 0L;

    /**
     * Creates a lease taker.
     *
     * @param leaseManager lease manager used to list and take leases
     * @param workerIdentifier identifier recorded as owner of leases this worker takes
     * @param leaseDurationMillis lease duration in milliseconds; stored internally in nanoseconds
     */
    public LeaseTaker(ILeaseManager<T> leaseManager, String workerIdentifier, long leaseDurationMillis) {
        this.leaseManager = leaseManager;
        this.workerIdentifier = workerIdentifier;
        this.leaseDurationNanos = leaseDurationMillis * 1000000L;
    }

    /**
     * Scans the leases and takes those this worker should own, using {@link System#nanoTime()}
     * as the time source.
     *
     * @return the leases successfully taken in this call, keyed by lease key (possibly empty)
     * @throws DependencyException propagated from the lease manager, or if the time source fails
     * @throws InvalidStateException propagated from the lease manager
     */
    @Override
    public Map<String, T> takeLeases() throws DependencyException, InvalidStateException {
        return this.takeLeases(SYSTEM_CLOCK_CALLABLE);
    }

    /**
     * Scans the leases and takes those this worker should own.
     *
     * <p>If listing leases is throttled on every attempt, the call logs an error and returns an
     * empty map. Each individual take is retried up to {@code TAKE_RETRIES} times on throttling;
     * a lease whose attempts are all throttled is skipped.
     *
     * @param timeProvider supplies the scan timestamp in nanoseconds
     * @return the leases successfully taken in this call, keyed by lease key (possibly empty)
     * @throws DependencyException propagated from the lease manager, or if {@code timeProvider} throws
     * @throws InvalidStateException propagated from the lease manager
     */
    synchronized Map<String, T> takeLeases(Callable<Long> timeProvider) throws DependencyException, InvalidStateException {
        Map<String, T> takenLeases = new HashMap<String, T>();

        long startTime = System.currentTimeMillis();
        boolean success = false;
        ProvisionedThroughputException lastException = null;
        try {
            // Stop as soon as one scan succeeds so a later attempt cannot repeat the scan.
            for (int i = 1; i <= SCAN_RETRIES && !success; ++i) {
                try {
                    this.updateAllLeases(timeProvider);
                    success = true;
                    // A successful scan supersedes any earlier throttled attempt.
                    lastException = null;
                } catch (ProvisionedThroughputException e) {
                    LOG.info(String.format("Worker %s could not find expired leases on try %d out of %d", this.workerIdentifier, i, SCAN_RETRIES));
                    lastException = e;
                }
            }
        } finally {
            MetricsHelper.addSuccessAndLatency("ListLeases", startTime, success);
        }

        if (lastException != null) {
            LOG.error("Worker " + this.workerIdentifier + " could not scan leases table, aborting takeLeases. Exception caught by last retry:", lastException);
            return takenLeases;
        }

        List<T> expiredLeases = this.getExpiredLeases();
        Set<T> leasesToTake = this.computeLeasesToTake(expiredLeases);
        Set<String> untakenLeaseKeys = new HashSet<String>();

        for (T lease : leasesToTake) {
            String leaseKey = lease.getLeaseKey();
            startTime = System.currentTimeMillis();
            success = false;
            try {
                // Only throttling is retried; any other exception propagates to the caller.
                for (int i = 1; i <= TAKE_RETRIES && !success; ++i) {
                    try {
                        if (this.leaseManager.takeLease(lease, this.workerIdentifier)) {
                            lease.setLastCounterIncrementNanos(System.nanoTime());
                            takenLeases.put(leaseKey, lease);
                        } else {
                            untakenLeaseKeys.add(leaseKey);
                        }
                        success = true;
                    } catch (ProvisionedThroughputException e) {
                        LOG.info(String.format("Could not take lease with key %s for worker %s on try %d out of %d due to capacity", leaseKey, this.workerIdentifier, i, TAKE_RETRIES));
                    }
                }
            } finally {
                MetricsHelper.addSuccessAndLatency("TakeLease", startTime, success);
            }
        }

        if (takenLeases.size() > 0) {
            LOG.info(String.format("Worker %s successfully took %d leases: %s", this.workerIdentifier, takenLeases.size(), LeaseTaker.stringJoin(takenLeases.keySet(), ", ")));
        }
        if (untakenLeaseKeys.size() > 0) {
            LOG.info(String.format("Worker %s failed to take %d leases: %s", this.workerIdentifier, untakenLeaseKeys.size(), LeaseTaker.stringJoin(untakenLeaseKeys, ", ")));
        }
        MetricsHelper.getMetricsScope().addData("TakenLeases", takenLeases.size(), StandardUnit.Count);
        return takenLeases;
    }

    /**
     * Concatenates the given strings in iteration order, separated by {@code delimiter}.
     *
     * @param strings strings to join
     * @param delimiter text inserted between consecutive elements
     * @return the joined string; empty when {@code strings} is empty
     */
    static String stringJoin(Collection<String> strings, String delimiter) {
        StringBuilder builder = new StringBuilder();
        boolean needDelimiter = false;
        for (String string : strings) {
            if (needDelimiter) {
                builder.append(delimiter);
            }
            builder.append(string);
            needDelimiter = true;
        }
        return builder.toString();
    }

    /**
     * Lists all leases from the lease manager and refreshes {@code allLeases} and
     * {@code lastScanTimeNanos}.
     *
     * <p>For each listed lease the last-counter-increment timestamp is set as follows:
     * <ul>
     *   <li>already known, counter unchanged: the previously recorded timestamp is carried over;</li>
     *   <li>already known, counter changed: the current scan time;</li>
     *   <li>new and unowned: {@code 0};</li>
     *   <li>new and owned: the current scan time.</li>
     * </ul>
     * Leases that were known before but are absent from the fresh listing are removed.
     *
     * @param timeProvider supplies the scan timestamp in nanoseconds
     * @throws DependencyException propagated from the lease manager, or if {@code timeProvider} throws
     * @throws InvalidStateException propagated from the lease manager
     * @throws ProvisionedThroughputException propagated from the lease manager
     */
    private void updateAllLeases(Callable<Long> timeProvider) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        List<T> freshList = this.leaseManager.listLeases();
        try {
            this.lastScanTimeNanos = timeProvider.call();
        } catch (Exception e) {
            throw new DependencyException("Exception caught from timeProvider", e);
        }

        // Start with every known key; whatever remains after the loop was not in the fresh listing.
        Set<String> notUpdated = new HashSet<String>(this.allLeases.keySet());
        for (T lease : freshList) {
            String leaseKey = lease.getLeaseKey();
            T oldLease = this.allLeases.get(leaseKey);
            this.allLeases.put(leaseKey, lease);
            notUpdated.remove(leaseKey);

            if (oldLease != null) {
                if (oldLease.getLeaseCounter().equals(lease.getLeaseCounter())) {
                    lease.setLastCounterIncrementNanos(oldLease.getLastCounterIncrementNanos());
                } else {
                    lease.setLastCounterIncrementNanos(this.lastScanTimeNanos);
                }
            } else if (lease.getLeaseOwner() == null) {
                lease.setLastCounterIncrementNanos(0L);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Treating new lease with key " + leaseKey + " as never renewed because it is new and unowned.");
                }
            } else {
                lease.setLastCounterIncrementNanos(this.lastScanTimeNanos);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Treating new lease with key " + leaseKey + " as recently renewed because it is new and owned.");
                }
            }
        }

        for (String key : notUpdated) {
            this.allLeases.remove(key);
        }
    }

    /**
     * Collects the known leases for which {@code Lease.isExpired} returns true, given the
     * configured lease duration and the last scan time.
     *
     * @return a new mutable list of expired leases (possibly empty)
     */
    private List<T> getExpiredLeases() {
        List<T> expiredLeases = new ArrayList<T>();
        for (T lease : this.allLeases.values()) {
            if (lease.isExpired(this.leaseDurationNanos, this.lastScanTimeNanos)) {
                expiredLeases.add(lease);
            }
        }
        return expiredLeases;
    }

    /**
     * Chooses the leases this worker should try to take.
     *
     * <p>The target is 1 when there are at least as many workers as leases, otherwise the ceiling
     * of leases divided by workers. If this worker is below target, it takes expired leases in
     * random order until the target is reached or none remain; if there are no expired leases it
     * considers stealing one lease via {@link #chooseLeaseToSteal}.
     *
     * <p>Metrics are emitted only when this worker is below its target.
     *
     * @param expiredLeases expired leases; this list is shuffled and consumed by the method
     * @return the leases to attempt to take (possibly empty)
     */
    private Set<T> computeLeasesToTake(List<T> expiredLeases) {
        Map<String, Integer> leaseCounts = this.computeLeaseCounts(expiredLeases);
        Set<T> leasesToTake = new HashSet<T>();
        int numLeases = this.allLeases.size();
        // Always at least 1: computeLeaseCounts adds an entry for this worker.
        int numWorkers = leaseCounts.size();

        if (numLeases == 0) {
            return leasesToTake;
        }

        int target;
        if (numWorkers >= numLeases) {
            target = 1;
        } else {
            target = numLeases / numWorkers + (numLeases % numWorkers == 0 ? 0 : 1);
        }

        int myCount = leaseCounts.get(this.workerIdentifier).intValue();
        int numLeasesToReachTarget = target - myCount;
        if (numLeasesToReachTarget <= 0) {
            return leasesToTake;
        }

        // Randomise so that competing workers are less likely to contend for the same leases.
        Collections.shuffle(expiredLeases);
        int originalExpiredLeasesSize = expiredLeases.size();
        if (expiredLeases.size() > 0) {
            while (numLeasesToReachTarget > 0 && expiredLeases.size() > 0) {
                leasesToTake.add(expiredLeases.remove(0));
                --numLeasesToReachTarget;
            }
        } else {
            T leaseToSteal = this.chooseLeaseToSteal(leaseCounts, numLeasesToReachTarget, target);
            if (leaseToSteal != null) {
                LOG.info(String.format("Worker %s needed %d leases but none were expired, so it will steal lease %s from %s", this.workerIdentifier, numLeasesToReachTarget, leaseToSteal.getLeaseKey(), leaseToSteal.getLeaseOwner()));
                leasesToTake.add(leaseToSteal);
            }
        }

        if (!leasesToTake.isEmpty()) {
            LOG.info(String.format("Worker %s saw %d total leases, %d available leases, %d workers. Target is %d leases, I have %d leases, I will take %d leases", this.workerIdentifier, numLeases, originalExpiredLeasesSize, numWorkers, target, myCount, leasesToTake.size()));
        }

        IMetricsScope metrics = MetricsHelper.getMetricsScope();
        metrics.addData("TotalLeases", numLeases, StandardUnit.Count);
        metrics.addData("ExpiredLeases", originalExpiredLeasesSize, StandardUnit.Count);
        metrics.addData("NumWorkers", numWorkers, StandardUnit.Count);
        // Reports the shortfall remaining after expired leases have been selected above.
        metrics.addData("NeededLeases", numLeasesToReachTarget, StandardUnit.Count);
        metrics.addData("LeasesToTake", leasesToTake.size(), StandardUnit.Count);
        return leasesToTake;
    }

    /**
     * Picks a random lease held by the most loaded owner, if stealing is warranted.
     *
     * <p>Stealing happens only when the most loaded owner holds at least {@code target} leases
     * (when this worker needs more than one) or at least {@code target + 1} leases (when it needs
     * exactly one).
     *
     * @param leaseCounts lease count per owner; must contain at least one entry
     * @param needed number of leases this worker is short of its target
     * @param target target number of leases per worker
     * @return the lease to steal, or {@code null} if no lease should be stolen
     */
    private T chooseLeaseToSteal(Map<String, Integer> leaseCounts, int needed, int target) {
        Map.Entry<String, Integer> mostLoadedWorker = null;
        for (Map.Entry<String, Integer> worker : leaseCounts.entrySet()) {
            // Strict comparison: on ties the first entry encountered is kept.
            if (mostLoadedWorker == null || mostLoadedWorker.getValue() < worker.getValue()) {
                mostLoadedWorker = worker;
            }
        }

        if (mostLoadedWorker.getValue() < target + (needed > 1 ? 0 : 1)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Worker %s not stealing from most loaded worker %s.  He has %d, target is %d, and I need %d", this.workerIdentifier, mostLoadedWorker.getKey(), mostLoadedWorker.getValue(), target, needed));
            }
            return null;
        }

        // The owner key may be null because computeLeaseCounts counts leases by owner without
        // filtering out unowned ones, so compare null-safely.
        String mostLoadedWorkerIdentifier = mostLoadedWorker.getKey();
        List<T> candidates = new ArrayList<T>();
        for (T lease : this.allLeases.values()) {
            String owner = lease.getLeaseOwner();
            boolean sameOwner = mostLoadedWorkerIdentifier == null
                    ? owner == null
                    : mostLoadedWorkerIdentifier.equals(owner);
            if (sameOwner) {
                candidates.add(lease);
            }
        }

        // Random.nextInt(0) would throw; treat "nothing to steal" as "do not steal".
        if (candidates.isEmpty()) {
            return null;
        }
        return candidates.get(this.random.nextInt(candidates.size()));
    }

    /**
     * Counts the non-expired leases per owner.
     *
     * <p>Leases contained in {@code expiredLeases} are ignored. The result always contains an
     * entry for this worker, with a count of 0 if it holds no non-expired leases. Owners are
     * taken as-is from {@code Lease.getLeaseOwner()}, so a {@code null} key is possible.
     *
     * @param expiredLeases leases to exclude from the counts
     * @return a new map from lease owner to number of non-expired leases
     */
    private Map<String, Integer> computeLeaseCounts(List<T> expiredLeases) {
        Map<String, Integer> leaseCounts = new HashMap<String, Integer>();
        for (T lease : this.allLeases.values()) {
            if (expiredLeases.contains(lease)) {
                continue;
            }
            String leaseOwner = lease.getLeaseOwner();
            Integer oldCount = leaseCounts.get(leaseOwner);
            leaseCounts.put(leaseOwner, oldCount == null ? 1 : oldCount + 1);
        }

        // Make sure this worker is always represented so it counts towards the number of workers.
        if (leaseCounts.get(this.workerIdentifier) == null) {
            leaseCounts.put(this.workerIdentifier, 0);
        }
        return leaseCounts;
    }

    /**
     * Returns the identifier this taker records as the owner of leases it takes.
     *
     * @return the worker identifier supplied at construction
     */
    @Override
    public String getWorkerIdentifier() {
        return this.workerIdentifier;
    }
}

