/*
 * Decompiled with CFR 0.152.
 */
package zmq.socket.radiodish;

import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import zmq.Ctx;
import zmq.Msg;
import zmq.Options;
import zmq.SocketBase;
import zmq.io.IOThread;
import zmq.io.SessionBase;
import zmq.io.net.Address;
import zmq.pipe.Pipe;
import zmq.socket.FQ;
import zmq.socket.pubsub.Dist;

/**
 * Receiving socket that filters inbound messages by group.
 *
 * <p>Inbound messages are read through a fair-queue ({@link FQ}); join and leave
 * commands are pushed to every attached pipe through a distributor ({@link Dist}).
 * Only messages whose group is currently in the local subscription set are handed
 * to the caller. Sending is not supported by this socket.
 */
public class Dish extends SocketBase {
    // NOTE(review): this file was decompiled, so the numeric literals below were inlined by
    // the compiler. The names reflect how each value is used here; confirm each one against
    // the project's own constant definitions (socket type, errno and message-flag tables).

    /** Value stored in {@code options.type} by the constructor. */
    private static final int SOCKET_TYPE_DISH = 15;

    /** Errno reported when a group is rejected by {@link #xjoin} / {@link #xleave}. */
    private static final int ERRNO_EINVAL = 22;

    /** Errno reported by {@link #xsend} before it throws. */
    private static final int ERRNO_ENOTSUP = 45;

    /** Largest accepted group length (characters locally, bytes on the wire frame). */
    private static final int MAX_GROUP_LENGTH = 255;

    /** Reads inbound messages from the attached pipes. */
    private final FQ fq;

    /** Sends join/leave commands to all attached pipes. */
    private final Dist dist;

    /** Groups this socket has joined and not yet left. */
    private final Set<String> subscriptions;

    /**
     * Matching message fetched ahead of time by {@link #xhasIn()}; returned by the next
     * {@link #xrecv()} call. {@code null} when nothing has been pre-fetched.
     */
    private Msg pendingMsg;

    /**
     * Creates the socket with no subscriptions.
     *
     * @param parent context passed through to {@link SocketBase}
     * @param tid    passed through to {@link SocketBase}
     * @param sid    passed through to {@link SocketBase}
     */
    public Dish(Ctx parent, int tid, int sid) {
        super(parent, tid, sid, true);
        this.options.type = SOCKET_TYPE_DISH;
        // Presumably zero linger keeps close() from waiting on unsent join/leave
        // commands -- TODO confirm against the upstream implementation.
        this.options.linger = 0;
        this.fq = new FQ();
        this.dist = new Dist();
        this.subscriptions = new HashSet<>();
    }

    /**
     * Registers a new pipe for both reading and command distribution, then replays every
     * current subscription to it.
     *
     * @param pipe               the pipe being attached; must not be {@code null}
     * @param subscribe2all      not used by this socket
     * @param isLocallyInitiated not used by this socket
     */
    @Override
    protected void xattachPipe(Pipe pipe, boolean subscribe2all, boolean isLocallyInitiated) {
        assert (pipe != null);
        this.fq.attach(pipe);
        this.dist.attach(pipe);
        this.sendSubscriptions(pipe);
    }

    /** Tells the fair-queue that {@code pipe} has become readable. */
    @Override
    protected void xreadActivated(Pipe pipe) {
        this.fq.activated(pipe);
    }

    /** Tells the distributor that {@code pipe} has become writable. */
    @Override
    protected void xwriteActivated(Pipe pipe) {
        this.dist.activated(pipe);
    }

    /** Removes a terminated pipe from both the fair-queue and the distributor. */
    @Override
    protected void xpipeTerminated(Pipe pipe) {
        this.fq.terminated(pipe);
        this.dist.terminated(pipe);
    }

    /** Replays every current subscription to {@code pipe}. */
    @Override
    protected void xhiccuped(Pipe pipe) {
        this.sendSubscriptions(pipe);
    }

    /**
     * Joins a group and announces the join on every attached pipe.
     *
     * @param group the group to join
     * @return {@code true} on success; {@code false} (with errno 22 set) when the group is
     *         {@code null}, longer than 255 characters, or already joined
     */
    @Override
    protected boolean xjoin(String group) {
        // Set.add() returns false when the group is already present, so a second join of
        // the same group is rejected exactly like a malformed one.
        if (!isValidGroup(group) || !this.subscriptions.add(group)) {
            this.errno.set(ERRNO_EINVAL);
            return false;
        }
        this.dist.sendToAll(newJoinMsg(group));
        return true;
    }

    /**
     * Leaves a group and announces the leave on every attached pipe.
     *
     * @param group the group to leave
     * @return {@code true} on success; {@code false} (with errno 22 set) when the group is
     *         {@code null}, longer than 255 characters, or was not joined
     */
    @Override
    protected boolean xleave(String group) {
        // Set.remove() returns false when the group was never joined.
        if (!isValidGroup(group) || !this.subscriptions.remove(group)) {
            this.errno.set(ERRNO_EINVAL);
            return false;
        }
        Msg leave = new Msg();
        leave.initLeave();
        leave.setGroup(group);
        this.dist.sendToAll(leave);
        return true;
    }

    /**
     * Always fails: this socket cannot send.
     *
     * @throws UnsupportedOperationException always, after errno 45 has been set
     */
    @Override
    protected boolean xsend(Msg msg) {
        this.errno.set(ERRNO_ENOTSUP);
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the next message whose group is subscribed.
     *
     * @return the message pre-fetched by {@link #xhasIn()} if there is one, otherwise the
     *         next matching message from the fair-queue, or {@code null} if the fair-queue
     *         yields nothing
     */
    @Override
    protected Msg xrecv() {
        if (this.pendingMsg == null) {
            return this.xxrecv();
        }
        Msg prefetched = this.pendingMsg;
        this.pendingMsg = null;
        return prefetched;
    }

    /**
     * Reads from the fair-queue, discarding messages whose group is not subscribed.
     *
     * @return the first matching message, or {@code null} as soon as the fair-queue
     *         returns {@code null}
     */
    private Msg xxrecv() {
        Msg msg = this.fq.recv(this.errno);
        while (msg != null && !this.subscriptions.contains(msg.getGroup())) {
            msg = this.fq.recv(this.errno);
        }
        return msg;
    }

    /**
     * Reports whether a matching message is available. Because filtering requires reading,
     * a message found here is stored and returned by the next {@link #xrecv()}.
     */
    @Override
    protected boolean xhasIn() {
        if (this.pendingMsg == null) {
            this.pendingMsg = this.xxrecv();
        }
        return this.pendingMsg != null;
    }

    /** Always {@code true}. */
    @Override
    protected boolean xhasOut() {
        return true;
    }

    /** Writes one join command per current subscription to {@code pipe}, then flushes it. */
    private void sendSubscriptions(Pipe pipe) {
        for (String group : this.subscriptions) {
            pipe.write(newJoinMsg(group));
        }
        pipe.flush();
    }

    /** A group is acceptable when it is non-null and at most 255 characters long. */
    private static boolean isValidGroup(String group) {
        return group != null && group.length() <= MAX_GROUP_LENGTH;
    }

    /** Builds a join command carrying {@code group}. */
    private static Msg newJoinMsg(String group) {
        Msg join = new Msg();
        join.initJoin();
        join.setGroup(group);
        return join;
    }

    /**
     * Session that translates between the two-frame wire form (group frame followed by a
     * body frame) and single messages tagged with a group, and that encodes outbound
     * join/leave messages as command frames.
     */
    public static class DishSession extends SessionBase {
        /** Command header: a length byte (4) followed by the ASCII name {@code JOIN}. */
        static final byte[] JOIN_BYTES = "\u0004JOIN".getBytes(StandardCharsets.US_ASCII);

        /** Command header: a length byte (5) followed by the ASCII name {@code LEAVE}. */
        static final byte[] LEAVE_BYTES = "\u0005LEAVE".getBytes(StandardCharsets.US_ASCII);

        // NOTE(review): inlined by the decompiler; confirm against the project's errno and
        // message-flag definitions.

        /** Errno reported when an inbound frame violates the group/body sequence. */
        private static final int ERRNO_EFAULT = 14;

        /** Flag value applied to encoded join/leave frames. */
        private static final int MSG_FLAG_COMMAND = 2;

        /** Which frame {@link #pushMsg(Msg)} expects next. */
        private State state = State.GROUP;

        /** Group decoded from the most recent group frame. */
        private String group = "";

        public DishSession(IOThread ioThread, boolean connect, SocketBase socket, Options options, Address addr) {
            super(ioThread, connect, socket, options, addr);
        }

        /**
         * Consumes one inbound frame.
         *
         * <p>While expecting a group frame, the frame must be flagged as having more frames
         * and be at most 255 bytes; it is decoded as US-ASCII and remembered. While expecting
         * a body frame, the remembered group is assigned to the frame, the frame must not be
         * flagged as having more frames, and it is forwarded to the superclass.
         *
         * @return {@code true} if the frame was accepted; {@code false} with errno 14 set on
         *         a sequencing or size violation, or whatever the superclass returns for a
         *         body frame
         */
        @Override
        public boolean pushMsg(Msg msg) {
            switch (this.state) {
                case GROUP:
                    return this.pushGroupFrame(msg);
                case BODY:
                    return this.pushBodyFrame(msg);
                default:
                    throw new IllegalStateException();
            }
        }

        /** Handles a frame received while a group frame is expected. */
        private boolean pushGroupFrame(Msg msg) {
            if (!msg.hasMore() || msg.size() > Dish.MAX_GROUP_LENGTH) {
                this.errno.set(ERRNO_EFAULT);
                return false;
            }
            this.group = new String(msg.data(), StandardCharsets.US_ASCII);
            this.state = State.BODY;
            return true;
        }

        /** Handles a frame received while a body frame is expected. */
        private boolean pushBodyFrame(Msg msg) {
            msg.setGroup(this.group);
            if (msg.hasMore()) {
                // The state stays BODY here, so the next frame is again treated as a body.
                this.errno.set(ERRNO_EFAULT);
                return false;
            }
            boolean accepted = super.pushMsg(msg);
            if (accepted) {
                this.state = State.GROUP;
            }
            return accepted;
        }

        /**
         * Pulls the next outbound message, re-encoding join/leave messages as a command
         * frame made of the matching header followed by the US-ASCII group bytes.
         *
         * @return {@code null} if the superclass has nothing, the message unchanged if it is
         *         neither a join nor a leave, otherwise the encoded command frame
         */
        @Override
        protected Msg pullMsg() {
            Msg msg = super.pullMsg();
            if (msg == null) {
                return null;
            }
            if (!msg.isJoin() && !msg.isLeave()) {
                return msg;
            }
            byte[] groupBytes = msg.getGroup().getBytes(StandardCharsets.US_ASCII);
            byte[] header = msg.isJoin() ? JOIN_BYTES : LEAVE_BYTES;
            // Size the frame from the header actually written rather than a literal.
            Msg command = new Msg(groupBytes.length + header.length);
            command.put(header);
            command.setFlags(MSG_FLAG_COMMAND);
            command.put(groupBytes);
            return command;
        }

        /** Resets the superclass and goes back to expecting a group frame. */
        @Override
        protected void reset() {
            super.reset();
            this.state = State.GROUP;
        }

        /** Position within the inbound group/body frame pair. */
        enum State {
            GROUP,
            BODY
        }
    }
}

