/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.proton.messenger.impl;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.BufferOverflowException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.InterruptException;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.TimeoutException;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.driver.Connector;
import org.apache.qpid.proton.driver.Driver;
import org.apache.qpid.proton.driver.Listener;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.messenger.Messenger;
import org.apache.qpid.proton.messenger.MessengerException;
import org.apache.qpid.proton.messenger.Status;
import org.apache.qpid.proton.messenger.Tracker;
import org.apache.qpid.proton.messenger.impl.TrackerImpl;
import org.apache.qpid.proton.messenger.impl.TrackerQueue;

public class MessengerImpl
implements Messenger {
    private static final EnumSet<EndpointState> UNINIT = EnumSet.of(EndpointState.UNINITIALIZED);
    private static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);
    private static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED);
    private static final EnumSet<EndpointState> ANY = EnumSet.allOf(EndpointState.class);
    private final Logger _logger = Logger.getLogger("proton.messenger");
    private final String _name;
    private long _timeout = -1L;
    private boolean _blocking = true;
    private long _nextTag = 1L;
    private byte[] _buffer = new byte[5120];
    private Driver _driver;
    private int _receiving = 0;
    private static final int _creditBatch = 1024;
    private int _credit;
    private int _distributed;
    private TrackerQueue _incoming = new TrackerQueue();
    private TrackerQueue _outgoing = new TrackerQueue();
    private List<Connector> _awaitingDestruction = new ArrayList<Connector>();
    private boolean _worked = false;
    private final SentSettled _sentSettled = new SentSettled();
    private final MessageAvailable _messageAvailable = new MessageAvailable();
    private final AllClosed _allClosed = new AllClosed();
    private final WorkPred _workPred = new WorkPred();

    @Deprecated
    public MessengerImpl() {
        this(UUID.randomUUID().toString());
    }

    @Deprecated
    public MessengerImpl(String name) {
        this._name = name;
    }

    @Override
    public void setTimeout(long timeInMillis) {
        this._timeout = timeInMillis;
    }

    @Override
    public long getTimeout() {
        return this._timeout;
    }

    @Override
    public boolean isBlocking() {
        return this._blocking;
    }

    @Override
    public void setBlocking(boolean b) {
        this._blocking = b;
    }

    @Override
    public void start() throws IOException {
        this._driver = Proton.driver();
    }

    @Override
    public void stop() {
        if (this._driver != null) {
            if (this._logger.isLoggable(Level.FINE)) {
                this._logger.fine(this + " about to stop");
            }
            for (Connector c : this._driver.connectors()) {
                Connection connection = c.getConnection();
                connection.close();
            }
            for (Listener l : this._driver.listeners()) {
                try {
                    l.close();
                }
                catch (IOException e) {
                    this._logger.log(Level.WARNING, "Error while closing listener", e);
                }
            }
            this.waitUntil(this._allClosed);
        }
    }

    @Override
    public boolean stopped() {
        return this._allClosed.test();
    }

    @Override
    public boolean work(long timeout) {
        if (this._driver == null) {
            return false;
        }
        this._worked = false;
        return this.waitUntil(this._workPred, timeout);
    }

    @Override
    public void interrupt() {
        if (this._driver != null) {
            this._driver.wakeup();
        }
    }

    @Override
    public void put(Message m) throws MessengerException {
        if (this._driver == null) {
            throw new IllegalStateException("cannot put while messenger is stopped");
        }
        if (this._logger.isLoggable(Level.FINE)) {
            this._logger.fine(this + " about to put message: " + m);
        }
        try {
            int encoded;
            URI address = new URI(m.getAddress());
            if (address.getHost() == null) {
                throw new MessengerException("unable to send to address: " + m.getAddress());
            }
            int port = address.getPort() < 0 ? MessengerImpl.defaultPort(address.getScheme()) : address.getPort();
            Sender sender = this.getLink(address.getHost(), port, new SenderFinder(MessengerImpl.cleanPath(address.getPath())));
            this.adjustReplyTo(m);
            byte[] tag = String.valueOf(this._nextTag++).getBytes();
            Delivery delivery = sender.delivery(tag);
            while (true) {
                try {
                    encoded = m.encode(this._buffer, 0, this._buffer.length);
                }
                catch (BufferOverflowException e) {
                    this._buffer = new byte[this._buffer.length * 2];
                    continue;
                }
                break;
            }
            sender.send(this._buffer, 0, encoded);
            this._outgoing.add(delivery);
            sender.advance();
        }
        catch (URISyntaxException e) {
            throw new MessengerException("Invalid address: " + m.getAddress(), e);
        }
    }

    @Override
    public void send() throws TimeoutException {
        this.send(-1);
    }

    @Override
    public void send(int n) throws TimeoutException {
        if (this._driver == null) {
            throw new IllegalStateException("cannot send while messenger is stopped");
        }
        if (this._logger.isLoggable(Level.FINE)) {
            this._logger.fine(this + " about to send");
        }
        this.waitUntil(this._sentSettled);
    }

    @Override
    public void recv(int n) throws TimeoutException {
        if (this._driver == null) {
            throw new IllegalStateException("cannot recv while messenger is stopped");
        }
        if (this._logger.isLoggable(Level.FINE)) {
            this._logger.fine(this + " about to wait for up to " + n + " messages to be received");
        }
        this._receiving = n;
        this.distributeCredit();
        this.waitUntil(this._messageAvailable);
    }

    @Override
    public void recv() throws TimeoutException {
        this.recv(-1);
    }

    public int receiving() {
        return this._receiving;
    }

    @Override
    public Message get() {
        if (this._driver != null) {
            for (Connector c : this._driver.connectors()) {
                Connection connection = c.getConnection();
                this._logger.log(Level.FINE, "Attempting to get message from " + connection);
                for (Delivery delivery = connection.getWorkHead(); delivery != null; delivery = delivery.getWorkNext()) {
                    if (delivery.isReadable() && !delivery.isPartial()) {
                        this._logger.log(Level.FINE, "Readable delivery found: " + delivery);
                        int size2 = this.read((Receiver)delivery.getLink());
                        Message message = Proton.message();
                        message.decode(this._buffer, 0, size2);
                        delivery.getLink().advance();
                        this._incoming.add(delivery);
                        --this._distributed;
                        return message;
                    }
                    this._logger.log(Level.FINE, "Delivery not readable: " + delivery);
                }
            }
        }
        return null;
    }

    @Override
    public void subscribe(String source) throws MessengerException {
        if (this._driver == null) {
            throw new IllegalStateException("messenger is stopped");
        }
        boolean listen = source.contains("~");
        try {
            int port;
            URI address = new URI(listen ? source.replace("~", "") : source);
            String hostName = address.getHost();
            if (hostName == null) {
                throw new MessengerException("Invalid source address (hostname cannot be null): " + source);
            }
            int n = port = address.getPort() < 0 ? MessengerImpl.defaultPort(address.getScheme()) : address.getPort();
            if (listen) {
                if (this._logger.isLoggable(Level.FINE)) {
                    this._logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
                }
                this._driver.createListener(hostName, port, null);
            } else {
                if (this._logger.isLoggable(Level.FINE)) {
                    this._logger.fine(this + " about to subscribe to source " + source);
                }
                this.getLink(hostName, port, new ReceiverFinder(MessengerImpl.cleanPath(address.getPath())));
            }
        }
        catch (URISyntaxException e) {
            throw new MessengerException("Invalid source: " + source, e);
        }
    }

    @Override
    public int outgoing() {
        return this.queued(true);
    }

    @Override
    public int incoming() {
        return this.queued(false);
    }

    @Override
    public int getIncomingWindow() {
        return this._incoming.getWindow();
    }

    @Override
    public void setIncomingWindow(int window) {
        this._incoming.setWindow(window);
    }

    @Override
    public int getOutgoingWindow() {
        return this._outgoing.getWindow();
    }

    @Override
    public void setOutgoingWindow(int window) {
        this._outgoing.setWindow(window);
    }

    @Override
    public Tracker incomingTracker() {
        return new TrackerImpl(false, this._incoming.getHighWaterMark() - 1);
    }

    @Override
    public Tracker outgoingTracker() {
        return new TrackerImpl(true, this._outgoing.getHighWaterMark() - 1);
    }

    private TrackerQueue getTrackerQueue(Tracker tracker) {
        return TrackerQueue.isOutgoing(tracker) ? this._outgoing : this._incoming;
    }

    @Override
    public void reject(Tracker tracker, int flags) {
        this.getTrackerQueue(tracker).reject(tracker, flags);
    }

    @Override
    public void accept(Tracker tracker, int flags) {
        this.getTrackerQueue(tracker).accept(tracker, flags);
    }

    @Override
    public void settle(Tracker tracker, int flags) {
        this.getTrackerQueue(tracker).settle(tracker, flags);
    }

    @Override
    public Status getStatus(Tracker tracker) {
        return this.getTrackerQueue(tracker).getStatus(tracker);
    }

    private int queued(boolean outgoing) {
        int count = 0;
        if (this._driver != null) {
            for (Connector c : this._driver.connectors()) {
                Connection connection = c.getConnection();
                for (Link link2 : new Links(connection, ACTIVE, ANY)) {
                    if (outgoing) {
                        if (!(link2 instanceof Sender)) continue;
                        count += link2.getQueued();
                        continue;
                    }
                    if (!(link2 instanceof Receiver)) continue;
                    count += link2.getQueued();
                }
            }
        }
        return count;
    }

    private int read(Receiver receiver) {
        Delivery dlv = receiver.current();
        if (dlv.isPartial()) {
            throw new IllegalStateException();
        }
        int size2 = dlv.pending();
        while (this._buffer.length < size2) {
            this._buffer = new byte[this._buffer.length * 2];
        }
        int read2 = receiver.recv(this._buffer, 0, this._buffer.length);
        if (read2 != size2) {
            throw new IllegalStateException();
        }
        return size2;
    }

    private void bringDestruction() {
        for (Connector c : this._awaitingDestruction) {
            c.destroy();
        }
        this._awaitingDestruction.clear();
    }

    private void processAllConnectors() {
        this.distributeCredit();
        for (Connector c : this._driver.connectors()) {
            this.processEndpoints(c);
            try {
                if (!c.process()) continue;
                this._worked = true;
            }
            catch (IOException e) {
                this._logger.log(Level.SEVERE, "Error processing connection", e);
            }
        }
        this.bringDestruction();
        this.distributeCredit();
    }

    private void processActive() {
        Listener l = this._driver.listener();
        while (l != null) {
            this._worked = true;
            Connector c = l.accept();
            Connection connection = Proton.connection();
            connection.setContainer(this._name);
            c.setConnection(connection);
            Sasl sasl = c.sasl();
            if (sasl != null) {
                sasl.server();
                sasl.setMechanisms("ANONYMOUS");
                sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
            }
            connection.open();
            l = this._driver.listener();
        }
        Connector c = this._driver.connector();
        while (c != null) {
            this._worked = true;
            this._logger.log(Level.FINE, "Processing active connector " + c);
            try {
                c.process();
            }
            catch (IOException e) {
                this._logger.log(Level.SEVERE, "Error processing connection", e);
            }
            this.processEndpoints(c);
            if (c.isClosed()) {
                this._awaitingDestruction.add(c);
                this.reclaimCredit(c.getConnection());
            } else {
                try {
                    c.process();
                }
                catch (IOException e) {
                    this._logger.log(Level.SEVERE, "Error processing connection", e);
                }
            }
            c = this._driver.connector();
        }
        this.bringDestruction();
        this.distributeCredit();
    }

    private void processEndpoints(Connector c) {
        Connection connection = c.getConnection();
        if (connection.getLocalState() == EndpointState.UNINITIALIZED) {
            connection.open();
        }
        for (Delivery delivery = connection.getWorkHead(); delivery != null; delivery = delivery.getWorkNext()) {
            if (!(delivery.getLink() instanceof Sender) || !delivery.isUpdated()) continue;
            delivery.disposition(delivery.getRemoteState());
        }
        this._outgoing.slide();
        for (Session session : new Sessions(connection, UNINIT, ANY)) {
            session.open();
            this._logger.log(Level.FINE, "Opened session " + session);
        }
        for (Link link2 : new Links(connection, UNINIT, ANY)) {
            link2.setSource(link2.getRemoteSource());
            link2.setTarget(link2.getRemoteTarget());
            link2.open();
            this._logger.log(Level.FINE, "Opened link " + link2);
        }
        this.distributeCredit();
        for (Link link2 : new Links(connection, ACTIVE, CLOSED)) {
            link2.close();
        }
        for (Session session : new Sessions(connection, ACTIVE, CLOSED)) {
            session.close();
        }
        if (connection.getRemoteState() == EndpointState.CLOSED && connection.getLocalState() == EndpointState.ACTIVE) {
            connection.close();
        }
    }

    private boolean waitUntil(Predicate condition) throws TimeoutException {
        if (this._blocking) {
            boolean done = this.waitUntil(condition, this._timeout);
            if (!done) {
                this._logger.log(Level.SEVERE, String.format("Timeout when waiting for condition %s after %s ms", condition, this._timeout));
                throw new TimeoutException();
            }
            return done;
        }
        return this.waitUntil(condition, 0L);
    }

    private boolean waitUntil(Predicate condition, long timeout) {
        if (this._driver == null) {
            throw new IllegalStateException("cannot wait while messenger is stopped");
        }
        this.processAllConnectors();
        long now = System.currentTimeMillis();
        long deadline = timeout < 0L ? Long.MAX_VALUE : now + timeout;
        boolean done = false;
        while (true) {
            done = condition.test();
            long remaining = deadline - now;
            if (done || timeout >= 0L && remaining < 0L) break;
            boolean woken = this._driver.doWait(remaining);
            this.processActive();
            if (woken) {
                throw new InterruptException();
            }
            if (timeout < 0L) continue;
            now = System.currentTimeMillis();
        }
        return done;
    }

    private Connection lookup(String host, String service) {
        for (Connector c : this._driver.connectors()) {
            Connection connection = c.getConnection();
            if (!host.equals(connection.getRemoteContainer()) && !service.equals(connection.getContext())) continue;
            return connection;
        }
        return null;
    }

    private void reclaimCredit(Connection connection) {
        for (Link link2 : new Links(connection, ANY, ANY)) {
            if (!(link2 instanceof Receiver) || link2.getCredit() <= 0) continue;
            this.reclaimCredit(link2.getCredit());
        }
    }

    private void reclaimCredit(int credit) {
        this._credit += credit;
        this._distributed -= credit;
    }

    private void distributeCredit() {
        int linkCt = 0;
        for (Connector c : this._driver.connectors()) {
            if (c.isClosed()) continue;
            Connection connection = c.getConnection();
            for (Link link2 : new Links(connection, ACTIVE, ANY)) {
                if (!(link2 instanceof Receiver)) continue;
                ++linkCt;
            }
        }
        if (linkCt == 0) {
            return;
        }
        if (this._receiving < 0) {
            this._credit = linkCt * 1024 - this.incoming();
        } else {
            int total = this._credit + this._distributed;
            if (this._receiving > total) {
                this._credit += this._receiving - total;
            }
        }
        int batch = this._credit < linkCt ? 1 : this._credit / linkCt;
        for (Connector c : this._driver.connectors()) {
            if (c.isClosed()) continue;
            Connection connection = c.getConnection();
            for (Link link3 : new Links(connection, ACTIVE, ANY)) {
                int have;
                if (!(link3 instanceof Receiver) || (have = ((Receiver)link3).getCredit()) >= batch) continue;
                int need = batch - have;
                int amount = this._credit < need ? this._credit : need;
                ((Receiver)link3).flow(amount);
                this._credit -= amount;
                this._distributed += amount;
                if (this._credit != 0) continue;
                return;
            }
        }
    }

    private <C extends Link> C getLink(String host, int port, LinkFinder<C> finder) {
        Link link22;
        String service = host + ":" + port;
        Connection connection = this.lookup(host, service);
        if (connection == null) {
            Connector<Object> connector = this._driver.createConnector(host, port, null);
            this._logger.log(Level.FINE, "Connecting to " + host + ":" + port);
            connection = Proton.connection();
            connection.setContainer(this._name);
            connection.setHostname(host);
            connection.setContext(service);
            connector.setConnection(connection);
            Sasl sasl = connector.sasl();
            if (sasl != null) {
                sasl.client();
                sasl.setMechanisms("ANONYMOUS");
            }
            connection.open();
        }
        for (Link link22 : new Links(connection, ACTIVE, ANY)) {
            C result = finder.test(link22);
            if (result == null) continue;
            return result;
        }
        Session session = connection.session();
        session.open();
        link22 = finder.create(session);
        link22.open();
        return (C)link22;
    }

    private void adjustReplyTo(Message m) {
        String original = m.getReplyTo();
        if (original == null || original.length() == 0) {
            m.setReplyTo("amqp://" + this._name);
        } else if (original.startsWith("~/")) {
            m.setReplyTo("amqp://" + this._name + "/" + original.substring(2));
        }
    }

    private static String cleanPath(String path) {
        if (path != null && path.length() > 0 && path.charAt(0) == '/') {
            return path.substring(1);
        }
        return path;
    }

    private static boolean matchTarget(Target target, String path) {
        if (target == null) {
            return path.isEmpty();
        }
        return path.equals(target.getAddress());
    }

    private static boolean matchSource(Source source, String path) {
        if (source == null) {
            return path.isEmpty();
        }
        return path.equals(source.getAddress());
    }

    private static int defaultPort(String scheme) {
        if ("amqps".equals(scheme)) {
            return 5671;
        }
        return 5672;
    }

    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("MessengerImpl [_name=").append(this._name).append("]");
        return builder.toString();
    }

    private static class SessionIterator
    implements Iterator<Session> {
        private final EnumSet<EndpointState> _local;
        private final EnumSet<EndpointState> _remote;
        private Session _next;

        SessionIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
            this._local = local;
            this._remote = remote;
            this._next = connection.sessionHead(this._local, this._remote);
        }

        @Override
        public boolean hasNext() {
            return this._next != null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Session next() {
            try {
                Session session = this._next;
                return session;
            }
            finally {
                this._next = this._next.next(this._local, this._remote);
            }
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    private static class Sessions
    implements Iterable<Session> {
        private final Connection _connection;
        private final EnumSet<EndpointState> _local;
        private final EnumSet<EndpointState> _remote;

        Sessions(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
            this._connection = connection;
            this._local = local;
            this._remote = remote;
        }

        @Override
        public Iterator<Session> iterator() {
            return new SessionIterator(this._connection, this._local, this._remote);
        }
    }

    private static class LinkIterator
    implements Iterator<Link> {
        private final EnumSet<EndpointState> _local;
        private final EnumSet<EndpointState> _remote;
        private Link _next;

        LinkIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
            this._local = local;
            this._remote = remote;
            this._next = connection.linkHead(this._local, this._remote);
        }

        @Override
        public boolean hasNext() {
            return this._next != null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Link next() {
            try {
                Link link2 = this._next;
                return link2;
            }
            finally {
                this._next = this._next.next(this._local, this._remote);
            }
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    private static class Links
    implements Iterable<Link> {
        private final Connection _connection;
        private final EnumSet<EndpointState> _local;
        private final EnumSet<EndpointState> _remote;

        Links(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
            this._connection = connection;
            this._local = local;
            this._remote = remote;
        }

        @Override
        public Iterator<Link> iterator() {
            return new LinkIterator(this._connection, this._local, this._remote);
        }
    }

    private class ReceiverFinder
    implements LinkFinder<Receiver> {
        private final String _path;

        ReceiverFinder(String path) {
            this._path = path;
        }

        @Override
        public Receiver test(Link link2) {
            if (link2 instanceof Receiver && MessengerImpl.matchSource((Source)link2.getSource(), this._path)) {
                return (Receiver)link2;
            }
            return null;
        }

        @Override
        public Receiver create(Session session) {
            Receiver receiver = session.receiver(this._path);
            Source source = new Source();
            source.setAddress(this._path);
            receiver.setSource(source);
            return receiver;
        }
    }

    private class SenderFinder
    implements LinkFinder<Sender> {
        private final String _path;

        SenderFinder(String path) {
            this._path = path;
        }

        @Override
        public Sender test(Link link2) {
            if (link2 instanceof Sender && MessengerImpl.matchTarget((Target)link2.getTarget(), this._path)) {
                return (Sender)link2;
            }
            return null;
        }

        @Override
        public Sender create(Session session) {
            Sender sender = session.sender(this._path);
            Target target = new Target();
            target.setAddress(this._path);
            sender.setTarget(target);
            return sender;
        }
    }

    private static interface LinkFinder<C extends Link> {
        public C test(Link var1);

        public C create(Session var1);
    }

    private class WorkPred
    implements Predicate {
        private WorkPred() {
        }

        @Override
        public boolean test() {
            return MessengerImpl.this._worked;
        }
    }

    private class AllClosed
    implements Predicate {
        private AllClosed() {
        }

        @Override
        public boolean test() {
            if (MessengerImpl.this._driver == null) {
                return true;
            }
            for (Connector c : MessengerImpl.this._driver.connectors()) {
                if (c.isClosed()) continue;
                return false;
            }
            MessengerImpl.this._driver.destroy();
            MessengerImpl.this._driver = null;
            return true;
        }
    }

    private class MessageAvailable
    implements Predicate {
        private MessageAvailable() {
        }

        @Override
        public boolean test() {
            for (Connector c : MessengerImpl.this._driver.connectors()) {
                Connection connection = c.getConnection();
                for (Delivery delivery = connection.getWorkHead(); delivery != null; delivery = delivery.getWorkNext()) {
                    if (!delivery.isReadable() || delivery.isPartial()) continue;
                    return true;
                }
            }
            return false;
        }
    }

    private class SentSettled
    implements Predicate {
        private SentSettled() {
        }

        @Override
        public boolean test() {
            for (Connector c : MessengerImpl.this._driver.connectors()) {
                Connection connection = c.getConnection();
                for (Link link2 : new Links(connection, ACTIVE, ANY)) {
                    if (!(link2 instanceof Sender) || link2.getQueued() <= 0) continue;
                    return false;
                }
            }
            return this.checkSettled(MessengerImpl.this._outgoing.deliveries());
        }

        boolean checkSettled(Iterator<Delivery> unsettled) {
            if (unsettled != null) {
                Delivery d;
                while (unsettled.hasNext() && (d = unsettled.next()) != null) {
                    if (d.getRemoteState() != null || d.remotelySettled()) {
                        d.settle();
                        continue;
                    }
                    if (d.getLink().getSession().getConnection().getRemoteState() == EndpointState.CLOSED) continue;
                    return false;
                }
            }
            return true;
        }
    }

    private static interface Predicate {
        public boolean test();
    }
}

