/*
 * Decompiled with CFR 0.152.
 */
package org.cometd.server.http;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.cometd.bayeux.Promise;
import org.cometd.bayeux.server.BayeuxContext;
import org.cometd.bayeux.server.BayeuxServer;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.bayeux.server.ServerTransport;
import org.cometd.common.AsyncFoldLeft;
import org.cometd.server.AbstractServerTransport;
import org.cometd.server.BayeuxServerImpl;
import org.cometd.server.CometDRequest;
import org.cometd.server.CometDResponse;
import org.cometd.server.HttpException;
import org.cometd.server.ServerMessageImpl;
import org.cometd.server.ServerSessionImpl;
import org.cometd.server.http.AbstractHttpScheduler;
import org.cometd.server.http.TransportContext;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.NanoTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractHttpTransport
extends AbstractServerTransport {
    // Option prefix installed by the constructor through setOptionPrefix(PREFIX).
    public static final String PREFIX = "long-polling";
    // JSON_DEBUG_OPTION and MESSAGE_PARAM are not referenced inside this class.
    public static final String JSON_DEBUG_OPTION = "jsonDebug";
    public static final String MESSAGE_PARAM = "message";
    // Names of the configuration options read by init().
    public static final String BROWSER_COOKIE_NAME_OPTION = "browserCookieName";
    public static final String BROWSER_COOKIE_DOMAIN_OPTION = "browserCookieDomain";
    public static final String BROWSER_COOKIE_PATH_OPTION = "browserCookiePath";
    public static final String BROWSER_COOKIE_MAX_AGE_OPTION = "browserCookieMaxAge";
    public static final String BROWSER_COOKIE_SECURE_OPTION = "browserCookieSecure";
    public static final String BROWSER_COOKIE_HTTP_ONLY_OPTION = "browserCookieHttpOnly";
    public static final String BROWSER_COOKIE_SAME_SITE_OPTION = "browserCookieSameSite";
    public static final String BROWSER_COOKIE_PARTITIONED_OPTION = "browserCookiePartitioned";
    public static final String MAX_SESSIONS_PER_BROWSER_OPTION = "maxSessionsPerBrowser";
    public static final String HTTP2_MAX_SESSIONS_PER_BROWSER_OPTION = "http2MaxSessionsPerBrowser";
    public static final String MULTI_SESSION_INTERVAL_OPTION = "multiSessionInterval";
    public static final String TRUST_CLIENT_SESSION_OPTION = "trustClientSession";
    public static final String DUPLICATE_META_CONNECT_HTTP_RESPONSE_CODE_OPTION = "duplicateMetaConnectHttpResponseCode";
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractHttpTransport.class);
    // Single-byte JSON array delimiters: 91 is '[', 44 is ',' and 93 is ']'.
    private static final byte[] OPEN_BRACKET = new byte[]{91};
    private static final byte[] COMMA = new byte[]{44};
    private static final byte[] CLOSE_BRACKET = new byte[]{93};
    // Browser id -> sessions registered for that browser id after a successful handshake.
    private final ConcurrentMap<String, Collection<ServerSessionImpl>> _sessions = new ConcurrentHashMap<String, Collection<ServerSessionImpl>>();
    // Browser id -> counter maintained by incBrowserId()/decBrowserId().
    private final ConcurrentMap<String, AtomicInteger> _browserMap = new ConcurrentHashMap<String, AtomicInteger>();
    // Browser id -> number of sweeps seen since its counter dropped to zero; consumed by sweep().
    private final Map<String, AtomicInteger> _browserSweep = new ConcurrentHashMap<String, AtomicInteger>();
    // Configuration values; all of them are assigned in init().
    private String _browserCookieName;
    private String _browserCookieDomain;
    private String _browserCookiePath;
    private int _browserCookieMaxAge;
    private boolean _browserCookieSecure;
    private boolean _browserCookieHttpOnly;
    private String _browserCookieSameSite;
    private boolean _browserCookiePartitioned;
    private int _maxSessionsPerBrowser;
    private int _http2MaxSessionsPerBrowser;
    private long _multiSessionInterval;
    private boolean _trustClientSession;
    private int _duplicateMetaConnectHttpResponseCode;
    // NanoTime value recorded by the previous sweep(); 0 means sweep() has not run yet.
    private long _lastSweep;

    /**
     * Looks up the first allowed transport that is an {@link AbstractHttpTransport}
     * and that accepts the given request.
     *
     * @param bayeuxServer the server whose allowed transports are scanned, in order
     * @param request the request handed to {@link #accept(CometDRequest)}
     * @return the first matching transport, or {@code null} when none matches
     */
    public static AbstractHttpTransport find(BayeuxServer bayeuxServer, CometDRequest request) {
        for (String name : bayeuxServer.getAllowedTransports()) {
            ServerTransport candidate = bayeuxServer.getTransport(name);
            if (candidate instanceof AbstractHttpTransport httpTransport && httpTransport.accept(request)) {
                return httpTransport;
            }
        }
        return null;
    }

    /**
     * Creates the transport and installs {@link #PREFIX} as its option prefix.
     *
     * @param bayeux the Bayeux server this transport belongs to
     * @param name the transport name
     */
    protected AbstractHttpTransport(BayeuxServerImpl bayeux, String name) {
        super(bayeux, name);
        setOptionPrefix(PREFIX);
    }

    /**
     * Reads the transport options into fields, applying the defaults visible below.
     *
     * @throws IllegalArgumentException if the duplicate {@code /meta/connect} response code is below 400
     */
    @Override
    public void init() {
        super.init();

        // Browser cookie attributes.
        _browserCookieName = getOption(BROWSER_COOKIE_NAME_OPTION, "BAYEUX_BROWSER");
        _browserCookieDomain = getOption(BROWSER_COOKIE_DOMAIN_OPTION, null);
        _browserCookiePath = getOption(BROWSER_COOKIE_PATH_OPTION, "/");
        _browserCookieMaxAge = getOption(BROWSER_COOKIE_MAX_AGE_OPTION, -1);
        _browserCookieSecure = getOption(BROWSER_COOKIE_SECURE_OPTION, false);
        _browserCookieHttpOnly = getOption(BROWSER_COOKIE_HTTP_ONLY_OPTION, true);
        _browserCookieSameSite = getOption(BROWSER_COOKIE_SAME_SITE_OPTION, null);
        _browserCookiePartitioned = getOption(BROWSER_COOKIE_PARTITIONED_OPTION, false);

        // Per-browser session limits.
        _maxSessionsPerBrowser = getOption(MAX_SESSIONS_PER_BROWSER_OPTION, 1);
        _http2MaxSessionsPerBrowser = getOption(HTTP2_MAX_SESSIONS_PER_BROWSER_OPTION, -1);
        _multiSessionInterval = getOption(MULTI_SESSION_INTERVAL_OPTION, 2000);
        _trustClientSession = getOption(TRUST_CLIENT_SESSION_OPTION, false);

        int duplicateCode = getOption(DUPLICATE_META_CONNECT_HTTP_RESPONSE_CODE_OPTION, 500);
        _duplicateMetaConnectHttpResponseCode = duplicateCode;
        if (duplicateCode < 400) {
            String problem = "Option '%s' must be greater or equal to 400, not %s".formatted(DUPLICATE_META_CONNECT_HTTP_RESPONSE_CODE_OPTION, duplicateCode);
            throw new IllegalArgumentException(problem);
        }
    }

    /**
     * @return the browser cookie name stored in {@code _browserCookieName}
     */
    protected String getBrowserCookieName() {
        return _browserCookieName;
    }

    /**
     * @return the browser cookie domain stored in {@code _browserCookieDomain}, possibly {@code null}
     */
    protected String getBrowserCookieDomain() {
        return _browserCookieDomain;
    }

    /**
     * @return the browser cookie path stored in {@code _browserCookiePath}
     */
    protected String getBrowserCookiePath() {
        return _browserCookiePath;
    }

    /**
     * @return the browser cookie max age stored in {@code _browserCookieMaxAge}
     */
    protected int getBrowserCookieMaxAge() {
        return _browserCookieMaxAge;
    }

    /**
     * @return the value of the {@code _browserCookieSecure} flag
     */
    protected boolean isBrowserCookieSecure() {
        return _browserCookieSecure;
    }

    /**
     * @return the value of the {@code _browserCookieHttpOnly} flag
     */
    protected boolean isBrowserCookieHttpOnly() {
        return _browserCookieHttpOnly;
    }

    /**
     * @return the browser cookie {@code SameSite} value stored in {@code _browserCookieSameSite}, possibly {@code null}
     */
    protected String getBrowserCookieSameSite() {
        return _browserCookieSameSite;
    }

    /**
     * @return the value of the {@code _browserCookiePartitioned} flag
     */
    protected boolean isBrowserCookiePartitioned() {
        return _browserCookiePartitioned;
    }

    /**
     * @return the multi-session interval stored in {@code _multiSessionInterval}
     */
    protected long getMultiSessionInterval() {
        return _multiSessionInterval;
    }

    /**
     * @return the HTTP status stored in {@code _duplicateMetaConnectHttpResponseCode}
     */
    protected int getDuplicateMetaConnectHttpResponseCode() {
        return _duplicateMetaConnectHttpResponseCode;
    }

    /**
     * Tells whether this transport is willing to handle the given request.
     * {@link #find(BayeuxServer, CometDRequest)} returns the first allowed transport
     * for which this method returns {@code true}.
     *
     * @param var1 the request to examine
     * @return {@code true} if this transport accepts the request
     */
    public abstract boolean accept(CometDRequest var1);

    /**
     * Entry point for a request: wraps the promise so that every failure reaches the
     * caller as an {@link HttpException}, then delegates to {@link #handle(TransportContext)}.
     *
     * @param bayeuxContext the Bayeux context of the request
     * @param request the request to handle
     * @param response the response to write to
     * @param promise the promise to complete when handling ends
     */
    public void handle(BayeuxContext bayeuxContext, CometDRequest request, CometDResponse response, Promise<Void> promise) {
        Promise<Void> httpPromise = new Promise.Wrapper<Void>(promise) {

            public void fail(Throwable failure) {
                Throwable reported = failure;
                if (!(failure instanceof HttpException)) {
                    // A cancelled scheduler maps to the configured duplicate /meta/connect status,
                    // anything else to a generic 500.
                    boolean cancelled = failure instanceof AbstractServerTransport.SchedulerCancelledException;
                    int status = cancelled ? AbstractHttpTransport.this.getDuplicateMetaConnectHttpResponseCode() : 500;
                    reported = new HttpException(status, failure);
                }
                super.fail(reported);
            }
        };
        handle(new TransportContext(bayeuxContext, request, response, httpPromise));
    }

    /**
     * Handles the request described by the given context. Invoked by
     * {@link #handle(BayeuxContext, CometDRequest, CometDResponse, Promise)} with a
     * freshly created context.
     *
     * @param var1 the context holding the request, the response and the promise to complete
     */
    protected abstract void handle(TransportContext var1);

    /**
     * Creates a scheduler for the given message, stores it in the context and notifies
     * the session that it has been suspended.
     *
     * @param context the context of the request being suspended
     * @param promise the promise handed to the new scheduler
     * @param message the message being suspended
     * @param timeout the timeout handed to the scheduler and to the session notification
     * @return the scheduler held by the context
     */
    protected HttpScheduler suspend(TransportContext context, Promise<Void> promise, ServerMessage.Mutable message, long timeout) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Suspended {}", (Object)message);
        }
        HttpScheduler created = newHttpScheduler(context, promise, message, timeout);
        context.scheduler(created);
        ServerSessionImpl session = context.session();
        session.notifySuspended((ServerMessage)message, timeout);
        return context.scheduler();
    }

    /**
     * Factory for the scheduler used by {@link #suspend(TransportContext, Promise, ServerMessage.Mutable, long)}.
     *
     * @param context the context of the suspended request
     * @param promise the promise the scheduler works with
     * @param reply the message handed to the scheduler
     * @param timeout the timeout handed to the scheduler
     * @return a new {@link HttpSchedulerImpl}
     */
    protected HttpScheduler newHttpScheduler(TransportContext context, Promise<Void> promise, ServerMessage.Mutable reply, long timeout) {
        HttpScheduler scheduler = new HttpSchedulerImpl(this, context, promise, reply, timeout);
        return scheduler;
    }

    /**
     * Starts writing the given messages, followed by the context replies, using a {@link Writer}.
     * If starting the write throws, expiration is scheduled when the context requests it
     * and the context promise is failed.
     *
     * @param context the context holding response, replies and promise
     * @param messages the queued messages to write
     */
    protected void write(TransportContext context, List<ServerMessage> messages) {
        try {
            new Writer(context, messages).iterate();
        }
        catch (Throwable failure) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Exception while writing messages", failure);
            }
            boolean expire = context.scheduleExpiration();
            if (expire) {
                scheduleExpiration(context.session(), context.metaConnectCycle());
            }
            context.promise().fail(failure);
        }
    }

    /**
     * Processes the messages of one request in order, then flushes the response.
     * An empty list fails the context promise with an {@link IOException}.
     * When a session is found and the first message is not a {@code /meta/connect},
     * the processing is wrapped in a session batch.
     *
     * @param context the context of the request
     * @param messages the messages received with the request
     */
    protected void processMessages(TransportContext context, List<ServerMessage.Mutable> messages) {
        if (messages.isEmpty()) {
            context.promise().fail(new IOException("protocol violation"));
            return;
        }

        Collection<ServerSessionImpl> currentSessions = findCurrentSessions(context.request());
        ServerMessage.Mutable first = messages.get(0);
        ServerSessionImpl session = findSession(currentSessions, first);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Processing {} messages for {}", (Object)messages.size(), (Object)session);
        }

        boolean batch = session != null && !"/meta/connect".equals(first.getChannel());
        if (batch) {
            session.startBatch();
        }
        context.messages(messages);
        context.session(session);

        AsyncFoldLeft.run(messages, null, (result, item, loop) -> processMessage(context, (ServerMessageImpl)item, Promise.from(loop::proceed, loop::fail)), Promise.complete((r, failure) -> {
            if (failure != null) {
                context.promise().fail(failure);
            } else {
                flush(context);
            }
            // The batch is closed only after the flush or the failure has been issued.
            if (batch) {
                session.endBatch();
            }
        }));
    }

    /**
     * Dispatches one message by channel: handshake, connect, or any other channel.
     * A handshake that arrives together with other messages fails the promise.
     *
     * @param context the context of the request
     * @param message the message to process
     * @param promise completed when the message has been processed
     */
    private void processMessage(TransportContext context, ServerMessageImpl message, Promise<Void> promise) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Processing {}", (Object)message);
        }
        message.setServerTransport(this);
        message.setBayeuxContext(context.bayeuxContext());
        ServerSessionImpl session = context.session();
        if (session != null) {
            session.setServerTransport(this);
        }

        String channel = message.getChannel();
        if ("/meta/handshake".equals(channel)) {
            if (context.messages().size() > 1) {
                promise.fail(new IOException("bayeux protocol violation"));
                return;
            }
            processMetaHandshake(context, message, promise);
            return;
        }
        if ("/meta/connect".equals(channel)) {
            // Only a /meta/connect that travels alone is offered for suspension.
            boolean canSuspend = context.messages().size() == 1;
            Promise<Void> resumed = Promise.from(ignored -> resume(context, message, promise), failure -> promise.fail(failure));
            processMetaConnect(context, message, canSuspend, resumed);
            return;
        }
        processMessage1(context, message, promise);
    }

    /**
     * Resolves the session a message belongs to.
     * A handshake message always gets a brand new session. Otherwise the session is
     * looked up by client id among the given sessions and, when client sessions are
     * trusted, in the Bayeux server.
     *
     * @param sessions the candidate sessions, possibly {@code null}
     * @param message the message whose session is wanted
     * @return the session, or {@code null} when none is found
     */
    protected ServerSessionImpl findSession(Collection<ServerSessionImpl> sessions, ServerMessage.Mutable message) {
        if ("/meta/handshake".equals(message.getChannel())) {
            ServerSessionImpl created = getBayeuxServer().newServerSession();
            created.setAllowMessageDeliveryDuringHandshake(isAllowMessageDeliveryDuringHandshake());
            return created;
        }

        String clientId = message.getClientId();
        if (sessions != null && clientId != null) {
            for (ServerSessionImpl candidate : sessions) {
                if (candidate.getId().equals(clientId)) {
                    return candidate;
                }
            }
        }
        return _trustClientSession ? (ServerSessionImpl)getBayeuxServer().getSession(clientId) : null;
    }

    /**
     * Returns the sessions registered under the browser cookie carried by the request.
     *
     * @param request the request whose browser cookie is read
     * @return the registered sessions, or {@code null} when the cookie is missing or unknown
     */
    protected Collection<ServerSessionImpl> findCurrentSessions(CometDRequest request) {
        String browserId = request.getCookie(_browserCookieName);
        return browserId == null ? null : _sessions.get(browserId);
    }

    /**
     * Handles a {@code /meta/handshake} message.
     * On a successful reply the session is bound to a browser id (taken from the
     * request cookie, or newly generated and sent as a cookie), registered in
     * {@code _sessions} and given a removal listener that unregisters it again.
     * The reply is then processed, recorded in the context, and the promise completed.
     *
     * @param context the context of the request
     * @param message the handshake message
     * @param promise succeeded once the reply is recorded, failed if handling or reply processing fails
     */
    private void processMetaHandshake(TransportContext context, ServerMessage.Mutable message, Promise<Void> promise) {
        this.handleMessage(context, message, (Promise<ServerMessage.Mutable>)Promise.from(reply -> {
            ServerSessionImpl session = context.session();
            if (reply.isSuccessful()) {
                // Reuse the browser id from the cookie, or create one and set the cookie.
                String id = this.findBrowserId(context);
                if (id == null) {
                    id = this.setBrowserId(context);
                }
                String browserId = id;
                session.setBrowserId(browserId);
                Collection sessions = this._sessions.computeIfAbsent(browserId, k -> new CopyOnWriteArrayList());
                sessions.add(session);
                // On session removal, drop it from the browser's collection and remove
                // the map entry once that collection becomes empty.
                session.addListener((ServerSession.ServerSessionListener)((ServerSession.RemovedListener)(s, m, t) -> this._sessions.computeIfPresent(browserId, (k, v) -> {
                    v.remove(session);
                    return v.isEmpty() ? null : v;
                })));
            }
            this.processReply(session, (ServerMessage.Mutable)reply, (Promise<ServerMessage.Mutable>)Promise.from(r -> {
                if (r != null) {
                    context.replies().add((ServerMessage.Mutable)r);
                }
                // Queued messages are sent along only for a successful handshake reply
                // when delivery during handshake is allowed for the session.
                context.sendQueue(r != null && r.isSuccessful() && this.allowMessageDeliveryDuringHandshake(session));
                context.scheduleExpiration(true);
                promise.succeed(null);
            }, x -> this.scheduleExpirationAndFail(session, context.metaConnectCycle(), promise, (Throwable)x)));
        }, arg_0 -> promise.fail(arg_0)));
    }

    /**
     * Handles a {@code /meta/connect} message and decides whether to suspend it.
     * When the request is suspended the promise is NOT completed here: it is handed to
     * the scheduler created by {@code suspend(...)}. In every other non-failing case the
     * promise is succeeded right away.
     *
     * @param context the context of the request
     * @param message the connect message
     * @param canSuspend whether the caller allows this request to be suspended
     * @param promise succeeded when not suspended, failed if handling the message fails
     */
    private void processMetaConnect(TransportContext context, ServerMessage.Mutable message, boolean canSuspend, Promise<Void> promise) {
        ServerSessionImpl session = context.session();
        if (session != null) {
            // Clear any scheduler left on the session before handling this connect.
            session.setScheduler(null);
        }
        // Captured before the message is handled; compared with the state afterwards below.
        boolean wasConnected = session != null && session.isConnected();
        this.handleMessage(context, message, (Promise<ServerMessage.Mutable>)Promise.from(reply -> {
            boolean proceed = true;
            if (session != null) {
                boolean maySuspend;
                boolean bl = maySuspend = !session.shouldSchedule();
                if (canSuspend && maySuspend && reply.isSuccessful()) {
                    CometDRequest request = context.request();
                    boolean allowSuspendConnect = this.incBrowserId(session, this.isHTTP2(request));
                    if (allowSuspendConnect) {
                        long timeout = session.calculateTimeout(this.getTimeout());
                        if (timeout > 0L && wasConnected && session.isConnected()) {
                            // Suspend: the scheduler now owns the promise, and a request
                            // failure cancels the scheduler.
                            HttpScheduler scheduler = this.suspend(context, promise, message, timeout);
                            session.setScheduler(scheduler);
                            request.addFailureHandler(scheduler::cancel);
                            proceed = false;
                        } else {
                            // Not suspending after all: undo the increment done above.
                            this.decBrowserId(session, this.isHTTP2(request));
                        }
                    } else {
                        // The per-browser limit was hit: advise the client accordingly.
                        Map advice = reply.getAdvice(true);
                        advice.put("multiple-clients", true);
                        long multiSessionInterval = this.getMultiSessionInterval();
                        if (multiSessionInterval > 0L) {
                            advice.put("reconnect", "retry");
                            advice.put("interval", multiSessionInterval);
                        } else {
                            advice.put("reconnect", "none");
                            reply.setSuccessful(false);
                        }
                    }
                }
                if (proceed && session.isDisconnected()) {
                    reply.getAdvice(true).put("reconnect", "none");
                }
            }
            if (proceed) {
                promise.succeed(null);
            }
        }, x -> this.scheduleExpirationAndFail(session, context.metaConnectCycle(), promise, (Throwable)x)));
    }

    /**
     * Handles a message that is neither a handshake nor a connect: processes the
     * associated reply, records it in the context and, unless delivery is restricted
     * to {@code /meta/connect}, marks the session queue to be sent.
     *
     * @param context the context of the request
     * @param message the message to handle
     * @param promise succeeded once the reply is recorded, failed if any step fails
     */
    private void processMessage1(TransportContext context, ServerMessageImpl message, Promise<Void> promise) {
        Promise<ServerMessage.Mutable> handled = Promise.from(ignored -> {
            ServerSessionImpl session = context.session();
            Promise<ServerMessage.Mutable> replied = Promise.from(reply -> {
                if (reply != null) {
                    context.replies().add(reply);
                }
                boolean metaConnectDelivery = isMetaConnectDeliveryOnly() || (session != null && session.isMetaConnectDeliveryOnly());
                if (!metaConnectDelivery) {
                    context.sendQueue(true);
                }
                promise.succeed(null);
            }, failure -> promise.fail(failure));
            processReply(session, message.getAssociated(), replied);
        }, failure -> promise.fail(failure));
        handleMessage(context, message, handled);
    }

    /**
     * @param request the request to inspect
     * @return {@code true} when the request protocol is exactly {@code "HTTP/2.0"}
     */
    protected boolean isHTTP2(CometDRequest request) {
        String protocol = request.getProtocol();
        return "HTTP/2.0".equals(protocol);
    }

    /**
     * Writes the response for the context: the session queue, when the context asks
     * for it and a session exists, followed by the context replies.
     *
     * @param context the context to flush
     */
    protected void flush(TransportContext context) {
        ServerSessionImpl session = context.session();
        List<ServerMessage> queued;
        if (context.sendQueue() && session != null) {
            queued = session.takeQueue(context.replies());
        } else {
            queued = List.of();
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Flushing {}, replies={}, messages={}", session, context.replies(), queued);
        }
        write(context, queued);
    }

    /**
     * Completes a {@code /meta/connect}: decorates the reply with the session advice,
     * processes it, records it in the context and requests both the queue delivery
     * and the expiration scheduling.
     *
     * @param context the context of the request
     * @param message the connect message whose associated reply is sent
     * @param promise succeeded once the reply is recorded, failed if reply processing fails
     */
    protected void resume(TransportContext context, ServerMessage.Mutable message, Promise<Void> promise) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Resumed {}", (Object)message);
        }
        ServerMessage.Mutable reply = message.getAssociated();
        ServerSessionImpl session = context.session();
        if (session != null) {
            Map<String, Object> sessionAdvice = session.takeAdvice(this);
            if (sessionAdvice != null) {
                reply.put("advice", sessionAdvice);
            }
            if (session.isDisconnected()) {
                reply.getAdvice(true).put("reconnect", "none");
            }
        }
        Promise<ServerMessage.Mutable> replied = Promise.from(processed -> {
            if (processed != null) {
                context.replies().add(processed);
            }
            context.sendQueue(true);
            context.scheduleExpiration(true);
            promise.succeed(null);
        }, failure -> scheduleExpirationAndFail(session, context.metaConnectCycle(), promise, failure));
        processReply(session, reply, replied);
    }

    /**
     * Schedules the session expiration first, then fails the promise.
     *
     * @param session the session to expire
     * @param metaConnectCycle the cycle value passed on to {@code scheduleExpiration}
     * @param promise the promise to fail
     * @param x the failure to report
     */
    private void scheduleExpirationAndFail(ServerSessionImpl session, long metaConnectCycle, Promise<Void> promise, Throwable x) {
        scheduleExpiration(session, metaConnectCycle);
        promise.fail(x);
    }

    /**
     * @param context the context whose Bayeux context is asked for the browser cookie
     * @return the value of the browser cookie as returned by the Bayeux context
     */
    protected String findBrowserId(TransportContext context) {
        BayeuxContext bayeuxContext = context.bayeuxContext();
        return bayeuxContext.getCookie(_browserCookieName);
    }

    /**
     * Generates a 16-character browser id from base-36 random longs and sends it to
     * the client in a {@code Set-Cookie} response header.
     *
     * @param context the context providing the Bayeux context and the response
     * @return the generated browser id
     */
    protected String setBrowserId(TransportContext context) {
        StringBuilder idChars = new StringBuilder();
        do {
            idChars.append(Long.toString(getBayeuxServer().randomLong(), 36));
        } while (idChars.length() < 16);
        idChars.setLength(16);
        String browserId = idChars.toString();

        StringBuilder cookie = new StringBuilder();
        newBrowserCookie(cookie, getBrowserCookieName(), browserId, context.bayeuxContext().isSecure());
        context.response().addHeader("Set-Cookie", cookie.toString());
        return browserId;
    }

    /**
     * Appends a cookie header value to the builder: {@code name=value} followed by the
     * configured attributes. {@code Secure} is emitted only when both the {@code secure}
     * argument and the configured secure flag are true.
     *
     * @param builder the builder receiving the cookie text
     * @param name the cookie name
     * @param value the cookie value
     * @param secure whether the caller considers the request secure
     */
    protected void newBrowserCookie(StringBuilder builder, String name, String value, boolean secure) {
        builder.append(name).append("=").append(value);

        String cookieDomain = getBrowserCookieDomain();
        if (cookieDomain != null) {
            builder.append("; Domain=").append(cookieDomain);
        }

        String cookiePath = getBrowserCookiePath();
        if (cookiePath != null) {
            builder.append("; Path=").append(cookiePath);
        }

        // A negative max age means the attribute is left out.
        int cookieMaxAge = getBrowserCookieMaxAge();
        if (cookieMaxAge >= 0) {
            builder.append("; Max-Age=").append(cookieMaxAge);
        }

        if (isBrowserCookieHttpOnly()) {
            builder.append("; HttpOnly");
        }

        if (secure && isBrowserCookieSecure()) {
            builder.append("; Secure");
        }

        String cookieSameSite = getBrowserCookieSameSite();
        if (cookieSameSite != null) {
            builder.append("; SameSite=").append(cookieSameSite);
        }

        if (isBrowserCookiePartitioned()) {
            builder.append("; Partitioned");
        }
    }

    /**
     * Counts one more session for the browser id of the given session and tells
     * whether that stays within the configured per-browser limit.
     * <p>
     * A negative limit means "unlimited" (always {@code true}, nothing is counted);
     * a limit of zero always yields {@code false}. A session without a browser id
     * cannot be counted and is treated as allowed, mirroring
     * {@link #decBrowserId(ServerSessionImpl, boolean)}, which ignores such sessions.
     *
     * @param session the session whose browser id is counted
     * @param http2 whether the HTTP/2 limit applies instead of the regular one
     * @return {@code true} if the session is within the limit, {@code false} otherwise
     */
    public boolean incBrowserId(ServerSessionImpl session, boolean http2) {
        int maxSessionsPerBrowser = http2 ? this._http2MaxSessionsPerBrowser : this._maxSessionsPerBrowser;
        if (maxSessionsPerBrowser < 0) {
            return true;
        }
        if (maxSessionsPerBrowser == 0) {
            return false;
        }
        String browserId = session.getBrowserId();
        if (browserId == null) {
            // ConcurrentHashMap rejects null keys, so computeIfAbsent() below would throw
            // NullPointerException. decBrowserId() already skips null ids, so nothing
            // is left unbalanced by not counting here.
            return true;
        }
        AtomicInteger count = this._browserMap.computeIfAbsent(browserId, k -> new AtomicInteger());
        int sessions = count.incrementAndGet();
        if (sessions == 1) {
            // The browser id is in use again: stop sweep() from evicting its counter.
            this._browserSweep.remove(browserId);
        }
        boolean result = true;
        if (sessions > maxSessionsPerBrowser) {
            // Over the limit: undo the increment and refuse.
            sessions = count.decrementAndGet();
            result = false;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("client {} {} sessions for {}", new Object[]{browserId, sessions, session});
        }
        return result;
    }

    /**
     * Counts one session less for the browser id of the given session.
     * Does nothing when the applicable limit is not positive or the session has no
     * browser id. When the count reaches zero the browser id is handed to
     * {@code _browserSweep} so that {@link #sweep()} can process it.
     *
     * @param session the session whose browser id is counted down
     * @param http2 whether the HTTP/2 limit applies instead of the regular one
     */
    public void decBrowserId(ServerSessionImpl session, boolean http2) {
        int limit = http2 ? _http2MaxSessionsPerBrowser : _maxSessionsPerBrowser;
        String browserId = session.getBrowserId();
        if (limit <= 0 || browserId == null) {
            return;
        }

        AtomicInteger counter = _browserMap.get(browserId);
        // -1 is reported in the log when no counter exists for this browser id.
        int remaining = counter == null ? -1 : counter.decrementAndGet();
        if (remaining == 0) {
            _browserSweep.put(browserId, new AtomicInteger());
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("client {} {} sessions for {}", browserId, remaining, session);
        }
    }

    /**
     * Passes the message to the Bayeux server on behalf of the context session.
     *
     * @param context the context providing the session
     * @param message the message to handle
     * @param promise the promise handed to the Bayeux server
     */
    protected void handleMessage(TransportContext context, ServerMessage.Mutable message, Promise<ServerMessage.Mutable> promise) {
        ServerSessionImpl session = context.session();
        getBayeuxServer().handle(session, message, promise);
    }

    /**
     * Prepares the response for writing by setting its content type to
     * {@code application/json}, then succeeds the promise.
     *
     * @param context the context providing the response
     * @param promise succeeded once the content type is set
     */
    protected void writePrepare(TransportContext context, Promise<Void> promise) {
        CometDResponse response = context.response();
        response.setContentType("application/json");
        promise.succeed(null);
    }

    /**
     * Writes the opening bracket of the JSON array; the write is flagged as not last.
     *
     * @param output the response output
     * @param promise the promise handed to the write
     */
    protected void writeBegin(CometDResponse.Output output, Promise<Void> promise) {
        boolean last = false;
        output.write(last, OPEN_BRACKET, promise);
    }

    /**
     * Writes one message as JSON bytes; the write is flagged as not last.
     *
     * @param output the response output
     * @param message the message to serialize and write
     * @param promise the promise handed to the write
     */
    protected void writeMessage(CometDResponse.Output output, ServerMessage message, Promise<Void> promise) {
        byte[] json = toJSONBytes(message);
        output.write(false, json, promise);
    }

    /**
     * Writes the closing bracket of the JSON array; the write is flagged as last.
     *
     * @param output the response output
     * @param promise the promise handed to the write
     */
    protected void writeEnd(CometDResponse.Output output, Promise<Void> promise) {
        boolean last = true;
        output.write(last, CLOSE_BRACKET, promise);
    }

    /**
     * Hook invoked after a successful write; this implementation only logs at debug level.
     *
     * @param context the context of the written response
     * @param messages the queued messages that were written
     */
    protected void writeComplete(TransportContext context, List<ServerMessage> messages) {
        if (!LOGGER.isDebugEnabled()) {
            return;
        }
        LOGGER.debug("Messages/replies {}/{} written for {}", messages.size(), context.replies().size(), context.session());
    }

    /**
     * Evicts counters of browser ids that stayed at zero sessions for too many sweeps.
     * Each entry of {@code _browserSweep} is incremented once per sweep; when it exceeds
     * {@code 2 * getMaxInterval() / elapsed} the entry is removed and the matching
     * {@code _browserMap} counter is dropped if it is still zero.
     * The first invocation only records the time.
     */
    @Override
    protected void sweep() {
        long now = NanoTime.now();
        long elapsed = NanoTime.millisElapsed((long)_lastSweep, (long)now);
        if (_lastSweep != 0L && elapsed > 0L) {
            int maxSweeps = (int)(2L * getMaxInterval() / elapsed);
            for (Map.Entry<String, AtomicInteger> entry : _browserSweep.entrySet()) {
                AtomicInteger sweeps = entry.getValue();
                if (sweeps == null) {
                    continue;
                }
                if (sweeps.incrementAndGet() <= maxSweeps) {
                    continue;
                }
                String browserId = entry.getKey();
                // Conditional removal: skip when the entry was replaced or removed meanwhile.
                if (!_browserSweep.remove(browserId, sweeps)) {
                    continue;
                }
                _browserMap.computeIfPresent(browserId, (k, counter) -> {
                    if (counter.get() != 0) {
                        return counter;
                    }
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Swept browserId {}", (Object)browserId);
                    }
                    return null;
                });
            }
        }
        _lastSweep = now;
    }

    /**
     * Returns the JSON bytes of a message: the bytes held by the message when present,
     * otherwise the UTF-8 encoding of its JSON text. A message that is not a
     * {@link ServerMessageImpl} is first converted through the Bayeux server.
     *
     * @param msg the message to serialize
     * @return the JSON bytes of the message
     */
    protected byte[] toJSONBytes(ServerMessage msg) {
        ServerMessageImpl impl = msg instanceof ServerMessageImpl already
                ? already
                : (ServerMessageImpl)getBayeuxServer().newMessage(msg);
        byte[] cached = impl.getJSONBytes();
        if (cached != null) {
            return cached;
        }
        return toJSON((ServerMessage)impl).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Scheduler type used by this transport for suspended requests.
     * It declares no members of its own beyond those of
     * {@link AbstractServerTransport.Scheduler}.
     */
    public static interface HttpScheduler
    extends AbstractServerTransport.Scheduler {
    }

    /**
     * Default scheduler created by {@code newHttpScheduler(...)}: on dispatch it
     * notifies the session that the message was resumed and succeeds the promise.
     */
    private static class HttpSchedulerImpl extends AbstractHttpScheduler {
        private HttpSchedulerImpl(AbstractHttpTransport transport, TransportContext context, Promise<Void> promise, ServerMessage.Mutable message, long timeout) {
            super(transport, context, promise, message, timeout);
        }

        /**
         * @param timeout passed through to the session's resumed notification
         */
        @Override
        protected void dispatch(boolean timeout) {
            ServerSessionImpl session = getContext().session();
            session.notifyResumed((ServerMessage)getMessage(), timeout);
            getPromise().succeed(null);
        }
    }

    /**
     * Writes a response as a JSON array, one asynchronous write at a time.
     * <p>
     * The array holds, in this order: the handshake reply (if the first reply is one),
     * the queued messages, then the remaining replies. Progress is tracked by
     * {@link State}; each call to {@link #process()} issues at most one write and passes
     * this object as the write's promise, so completing a write drives the next step.
     */
    protected class Writer
    extends IteratingCallback
    implements Promise<Void> {
        private final TransportContext context;
        private final List<ServerMessage> messages;
        private State state = State.PREPARE;
        // Index of the next reply to write.
        private int replyIndex;
        // Index of the next queued message to write.
        private int messageIndex;
        // True when an element was written and a comma must precede the next one.
        private boolean needsComma;

        /**
         * @param context the context providing response, replies, session and promise
         * @param messages the queued messages to write before the replies
         */
        protected Writer(TransportContext context, List<ServerMessage> messages) {
            this.context = context;
            this.messages = messages;
        }

        /**
         * Performs the step for the current state and advances the state.
         * The switch is on the enum constant itself, not on its ordinal, so it does not
         * depend on the declaration order of {@link State} and is checked for exhaustiveness.
         *
         * @return {@code SCHEDULED} after issuing a step, {@code SUCCEEDED} once complete
         */
        protected IteratingCallback.Action process() throws Throwable {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Processing write {} for messages/replies {}/{} for {}", new Object[]{this.state, this.messages.size(), this.context.replies().size(), this.context.session()});
            }
            CometDResponse.Output output = this.context.response().getOutput();
            return switch (this.state) {
                case PREPARE -> {
                    this.state = State.BEGIN;
                    AbstractHttpTransport.this.writePrepare(this.context, this);
                    yield IteratingCallback.Action.SCHEDULED;
                }
                case BEGIN -> {
                    this.state = State.HANDSHAKE;
                    AbstractHttpTransport.this.writeBegin(output, this);
                    yield IteratingCallback.Action.SCHEDULED;
                }
                case HANDSHAKE -> {
                    this.state = State.MESSAGES;
                    this.writeHandshakeReply(output, this);
                    yield IteratingCallback.Action.SCHEDULED;
                }
                case MESSAGES -> {
                    // Stay in this state until every message (and separator) is written.
                    if (this.writeMessages(output, this)) {
                        this.state = State.REPLIES;
                    }
                    yield IteratingCallback.Action.SCHEDULED;
                }
                case REPLIES -> {
                    // Stay in this state until every reply (and separator) is written.
                    if (this.writeReplies(output, this)) {
                        this.state = State.END;
                    }
                    yield IteratingCallback.Action.SCHEDULED;
                }
                case END -> {
                    this.state = State.COMPLETE;
                    AbstractHttpTransport.this.writeEnd(output, this);
                    yield IteratingCallback.Action.SCHEDULED;
                }
                case COMPLETE -> IteratingCallback.Action.SUCCEEDED;
            };
        }

        /** Promise success of a single step: signals the iteration that the step succeeded. */
        public void succeed(Void result) {
            this.succeeded();
        }

        /** Promise failure of a single step: signals the iteration that the step failed. */
        public void fail(Throwable failure) {
            this.failed(failure);
        }

        /** Succeeds the context promise, then invokes the {@code writeComplete} hook. */
        protected void onCompleteSuccess() {
            this.context.promise().succeed(null);
            AbstractHttpTransport.this.writeComplete(this.context, this.messages);
        }

        /** Schedules expiration if the context requests it, then fails the context promise. */
        protected void onCompleteFailure(Throwable failure) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Failure writing messages", failure);
            }
            this.startExpiration();
            this.context.promise().fail(failure);
        }

        /** Schedules the session expiration when the context has requested it. */
        private void startExpiration() {
            if (this.context.scheduleExpiration()) {
                AbstractHttpTransport.this.scheduleExpiration(this.context.session(), this.context.metaConnectCycle());
            }
        }

        /**
         * Writes the first reply if it is a handshake reply; otherwise succeeds the
         * promise without writing so that the iteration moves on.
         */
        private void writeHandshakeReply(CometDResponse.Output output, Promise<Void> promise) {
            List<ServerMessage.Mutable> replies = this.context.replies();
            if (replies.isEmpty()) {
                promise.succeed(null);
                return;
            }
            ServerMessage.Mutable reply = replies.get(0);
            if ("/meta/handshake".equals(reply.getChannel())) {
                // Tell the client how many queued messages travel with the handshake reply.
                if (AbstractHttpTransport.this.allowMessageDeliveryDuringHandshake(this.context.session()) && !this.messages.isEmpty()) {
                    reply.put("x-messages", this.messages.size());
                }
                AbstractHttpTransport.this.getBayeuxServer().freeze(reply);
                output.write(false, AbstractHttpTransport.this.toJSONBytes((ServerMessage)reply), promise);
                this.needsComma = true;
                ++this.replyIndex;
            } else {
                promise.succeed(null);
            }
        }

        /**
         * Writes either a pending comma or the next queued message.
         *
         * @return {@code true} once all messages have been written
         */
        private boolean writeMessages(CometDResponse.Output output, Promise<Void> promise) {
            int size = this.messages.size();
            if (this.messageIndex == size) {
                this.startExpiration();
                promise.succeed(null);
                return true;
            }
            if (this.needsComma) {
                this.needsComma = false;
                output.write(false, COMMA, promise);
            } else {
                ServerMessage message = this.messages.get(this.messageIndex);
                this.needsComma = true;
                ++this.messageIndex;
                AbstractHttpTransport.this.writeMessage(output, message, promise);
            }
            return false;
        }

        /**
         * Writes either a pending comma or the next reply.
         *
         * @return {@code true} once all replies have been written
         */
        private boolean writeReplies(CometDResponse.Output output, Promise<Void> promise) {
            List<ServerMessage.Mutable> replies = this.context.replies();
            int size = replies.size();
            if (this.replyIndex == size) {
                promise.succeed(null);
                return true;
            }
            ServerMessage.Mutable reply = replies.get(this.replyIndex);
            if (this.needsComma) {
                this.needsComma = false;
                output.write(false, COMMA, promise);
            } else {
                AbstractHttpTransport.this.getBayeuxServer().freeze(reply);
                // A trailing comma is never emitted: the size check above ends the
                // loop before the pending comma would be written.
                this.needsComma = this.replyIndex < size;
                ++this.replyIndex;
                output.write(false, AbstractHttpTransport.this.toJSONBytes((ServerMessage)reply), promise);
            }
            return false;
        }

        /** Steps of the write, in the order in which {@link #process()} walks through them. */
        private static enum State {
            PREPARE,
            BEGIN,
            HANDSHAKE,
            MESSAGES,
            REPLIES,
            END,
            COMPLETE;

        }
    }
}

