/*
 * Decompiled with CFR 0.152.
 */
package org.simpleframework.transport.reactor;

import java.io.IOException;
import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.simpleframework.common.thread.Daemon;
import org.simpleframework.transport.reactor.Action;
import org.simpleframework.transport.reactor.ActionSelector;
import org.simpleframework.transport.reactor.ActionSet;
import org.simpleframework.transport.reactor.CancelAction;
import org.simpleframework.transport.reactor.ExecuteAction;
import org.simpleframework.transport.reactor.Latch;
import org.simpleframework.transport.reactor.Operation;
import org.simpleframework.transport.reactor.OperationDistributor;
import org.simpleframework.transport.reactor.ReactorEvent;
import org.simpleframework.transport.trace.Trace;

class ActionDistributor
extends Daemon
implements OperationDistributor {
    private Map<Channel, ActionSet> executing;
    private Map<Channel, ActionSet> selecting = new LinkedHashMap<Channel, ActionSet>();
    private Queue<Channel> invalid;
    private Queue<Action> pending;
    private ActionSelector selector;
    private Executor executor;
    private Latch latch;
    private long expiry;
    private long update;
    private boolean cancel;

    public ActionDistributor(Executor executor) throws IOException {
        this(executor, true);
    }

    public ActionDistributor(Executor executor, boolean cancel) throws IOException {
        this(executor, cancel, 120000L);
    }

    public ActionDistributor(Executor executor, boolean cancel, long expiry) throws IOException {
        this.executing = new LinkedHashMap<Channel, ActionSet>();
        this.pending = new ConcurrentLinkedQueue<Action>();
        this.invalid = new ConcurrentLinkedQueue<Channel>();
        this.selector = new ActionSelector();
        this.latch = new Latch();
        this.executor = executor;
        this.cancel = cancel;
        this.expiry = expiry;
        this.start();
    }

    public void process(Operation task, int require) throws IOException {
        ExecuteAction action = new ExecuteAction(task, require, this.expiry);
        if (!this.isActive()) {
            throw new IOException("Distributor is closed");
        }
        this.pending.offer(action);
        this.selector.wake();
    }

    public void close() throws IOException {
        this.stop();
        this.selector.wake();
        this.latch.close();
    }

    public int size() {
        return this.selecting.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        try {
            this.execute();
        }
        finally {
            this.purge();
        }
    }

    private void execute() {
        while (this.isActive()) {
            try {
                this.register();
                this.cancel();
                this.expire();
                this.distribute();
                this.validate();
            }
            catch (Exception cause) {
                this.report(cause);
            }
        }
    }

    private void purge() {
        try {
            this.register();
            this.cancel();
            this.clear();
        }
        catch (Exception cause) {
            this.report(cause);
        }
    }

    private void report(Exception cause) {
        Set<Channel> channels = this.selecting.keySet();
        for (Channel channel : channels) {
            Action[] list;
            ActionSet set = this.selecting.get(channel);
            for (Action action : list = set.list()) {
                Operation operation = action.getOperation();
                Trace trace = operation.getTrace();
                try {
                    trace.trace((Object)ReactorEvent.ERROR, cause);
                }
                catch (Exception e) {
                    this.invalid.offer(channel);
                }
            }
        }
        this.invalid.clear();
    }

    private void clear() throws IOException {
        List<ActionSet> sets = this.selector.registeredSets();
        for (ActionSet set : sets) {
            Action[] list;
            for (Action action : list = set.list()) {
                Operation task = action.getOperation();
                Trace trace = task.getTrace();
                try {
                    trace.trace((Object)ReactorEvent.CLOSE_SELECTOR);
                    this.expire(set, Long.MAX_VALUE);
                }
                catch (Exception cause) {
                    trace.trace((Object)ReactorEvent.ERROR, cause);
                }
            }
        }
        this.selector.close();
        this.latch.signal();
    }

    private void expire() throws IOException {
        long time;
        List<ActionSet> sets = this.selector.registeredSets();
        if (this.cancel && this.update <= (time = System.currentTimeMillis())) {
            for (ActionSet set : sets) {
                this.expire(set, time);
            }
            this.update = time + 10000L;
        }
    }

    private void expire(ActionSet set, long time) throws IOException {
        Action[] actions = set.list();
        SelectionKey key = set.key();
        if (key.isValid()) {
            int mask = key.interestOps();
            for (Action action : actions) {
                int interest = action.getInterest();
                long expiry = action.getExpiry();
                if (expiry >= time) continue;
                this.expire(set, action);
                mask &= ~interest;
            }
            this.update(set, mask);
        }
    }

    private void update(ActionSet set, int interest) throws IOException {
        SelectionKey key = set.key();
        if (interest == 0) {
            SelectableChannel channel = key.channel();
            this.selecting.remove(channel);
            key.cancel();
        } else {
            key.interestOps(interest);
        }
    }

    private void expire(ActionSet set, Action action) throws IOException {
        CancelAction cancel = new CancelAction(action);
        if (set != null) {
            Operation task = action.getOperation();
            Trace trace = task.getTrace();
            int interest = action.getInterest();
            try {
                trace.trace((Object)ReactorEvent.SELECT_EXPIRED, interest);
                set.remove(interest);
                this.execute(cancel);
            }
            catch (Exception cause) {
                trace.trace((Object)ReactorEvent.ERROR, cause);
            }
        }
    }

    private void validate() throws IOException {
        Set<Channel> channels = this.selecting.keySet();
        for (Channel channel : channels) {
            ActionSet set = this.selecting.get(channel);
            SelectionKey key = set.key();
            if (key.isValid()) continue;
            this.invalid.offer(channel);
        }
        for (Channel channel : this.invalid) {
            this.invalidate(channel);
        }
        this.invalid.clear();
    }

    private void invalidate(Channel channel) throws IOException {
        Action[] list;
        ActionSet set = this.selecting.remove(channel);
        for (Action action : list = set.list()) {
            Operation task = action.getOperation();
            Trace trace = task.getTrace();
            try {
                trace.trace((Object)ReactorEvent.INVALID_KEY);
                this.execute(action);
            }
            catch (Exception cause) {
                trace.trace((Object)ReactorEvent.ERROR, cause);
            }
        }
    }

    private void cancel() throws IOException {
        Collection<ActionSet> list = this.executing.values();
        for (ActionSet set : list) {
            Action[] actions;
            for (Action action : actions = set.list()) {
                Operation task = action.getOperation();
                Trace trace = task.getTrace();
                trace.trace((Object)ReactorEvent.SELECT_CANCEL);
            }
            set.cancel();
            set.clear();
        }
        this.executing.clear();
    }

    private void register() throws IOException {
        while (!this.pending.isEmpty()) {
            Action action = this.pending.poll();
            if (action == null) continue;
            SelectableChannel channel = action.getChannel();
            ActionSet set = this.executing.remove(channel);
            if (set == null) {
                set = this.selecting.get(channel);
            }
            if (set != null) {
                this.update(action, set);
                continue;
            }
            this.register(action);
        }
    }

    private void register(Action action) throws IOException {
        SelectableChannel channel = action.getChannel();
        Operation task = action.getOperation();
        Trace trace = task.getTrace();
        try {
            if (channel.isOpen()) {
                trace.trace((Object)ReactorEvent.SELECT);
                this.select(action);
            } else {
                trace.trace((Object)ReactorEvent.CHANNEL_CLOSED);
                this.selecting.remove(channel);
                this.execute(action);
            }
        }
        catch (Exception cause) {
            trace.trace((Object)ReactorEvent.ERROR, cause);
        }
    }

    private void update(Action action, ActionSet set) throws IOException {
        Operation task = action.getOperation();
        Trace trace = task.getTrace();
        SelectionKey key = set.key();
        int interest = action.getInterest();
        int current = key.interestOps();
        int updated = current | interest;
        try {
            if (1 == (interest & 1)) {
                trace.trace((Object)ReactorEvent.UPDATE_READ_INTEREST);
            }
            if (4 == (interest & 4)) {
                trace.trace((Object)ReactorEvent.UPDATE_WRITE_INTEREST);
            }
            trace.trace((Object)ReactorEvent.UPDATE_INTEREST, updated);
            key.interestOps(updated);
            set.attach(action);
        }
        catch (Exception cause) {
            trace.trace((Object)ReactorEvent.ERROR, cause);
        }
    }

    private void select(Action action) throws IOException {
        SelectableChannel channel = action.getChannel();
        Operation task = action.getOperation();
        Trace trace = task.getTrace();
        int interest = action.getInterest();
        if (interest > 0) {
            ActionSet set = this.selector.register(channel, interest);
            if (1 == (interest & 1)) {
                trace.trace((Object)ReactorEvent.REGISTER_READ_INTEREST);
            }
            if (4 == (interest & 4)) {
                trace.trace((Object)ReactorEvent.REGISTER_WRITE_INTEREST);
            }
            trace.trace((Object)ReactorEvent.REGISTER_INTEREST, interest);
            set.attach(action);
            this.selecting.put(channel, set);
        }
    }

    private void distribute() throws IOException {
        if (this.selector.select(5000L) > 0 && this.isActive()) {
            this.process();
        }
    }

    private void process() throws IOException {
        List<ActionSet> ready = this.selector.selectedSets();
        for (ActionSet set : ready) {
            this.process(set);
            this.remove(set);
        }
    }

    private void process(ActionSet set) throws IOException {
        Action[] actions;
        for (Action action : actions = set.ready()) {
            Operation task = action.getOperation();
            Trace trace = task.getTrace();
            int interest = action.getInterest();
            try {
                if (1 == (interest & 1)) {
                    trace.trace((Object)ReactorEvent.READ_INTEREST_READY, interest);
                }
                if (4 == (interest & 4)) {
                    trace.trace((Object)ReactorEvent.WRITE_INTEREST_READY, interest);
                }
                this.execute(action);
            }
            catch (Exception cause) {
                trace.trace((Object)ReactorEvent.ERROR, cause);
            }
        }
    }

    private void remove(ActionSet set) throws IOException {
        SelectableChannel channel = set.channel();
        SelectionKey key = set.key();
        if (key.isValid()) {
            int interest = set.interest();
            int ready = key.readyOps();
            if (this.cancel) {
                int remaining = interest & ~ready;
                if (remaining == 0) {
                    this.executing.put(channel, set);
                } else {
                    key.interestOps(remaining);
                }
                set.remove(ready);
            }
        } else {
            this.selecting.remove(channel);
        }
    }

    private void execute(Action action) {
        Operation task = action.getOperation();
        Trace trace = task.getTrace();
        int interest = action.getInterest();
        try {
            trace.trace((Object)ReactorEvent.EXECUTE_ACTION, interest);
            this.executor.execute(action);
        }
        catch (Exception cause) {
            trace.trace((Object)ReactorEvent.ERROR, cause);
        }
    }
}

