/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.rsocket.RSocketErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.internal.BaseDuplexConnection;
import java.net.SocketAddress;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.netty.Connection;

/**
 * Duplex connection backed by a Reactor Netty {@link Connection}.
 *
 * <p>Every buffer emitted by the inherited {@code sender} is wrapped in a
 * {@link BinaryWebSocketFrame} and written to the connection's outbound side; inbound data is
 * exposed as-is through {@link #receive()}.
 */
public final class WebsocketDuplexConnection extends BaseDuplexConnection {
    private final Connection connection;

    /**
     * Creates the duplex connection and wires it to the given Reactor Netty connection.
     *
     * <p>Two things are set up here: a listener on the channel's close future that disposes this
     * instance if it has not been disposed yet, and a subscription that forwards everything
     * emitted by {@code sender} to the outbound side as binary WebSocket frames.
     *
     * @param connection the underlying Reactor Netty connection
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    public WebsocketDuplexConnection(Connection connection) {
        this.connection = Objects.requireNonNull(connection, "connection must not be null");

        // Mirror a close of the underlying channel onto this connection.
        connection.channel().closeFuture().addListener(future -> disposeIfActive());

        // Declared as Publisher so the publisher-accepting sendObject overload is selected.
        Publisher<BinaryWebSocketFrame> outgoingFrames = sender.map(BinaryWebSocketFrame::new);
        connection.outbound().sendObject(outgoingFrames).then().subscribe();
    }

    /**
     * Returns the buffer allocator of the underlying channel.
     *
     * @return the channel's {@link ByteBufAllocator}
     */
    public ByteBufAllocator alloc() {
        return connection.channel().alloc();
    }

    /**
     * Returns the remote address reported by the underlying channel.
     *
     * @return the channel's remote {@link SocketAddress}
     */
    public SocketAddress remoteAddress() {
        return connection.channel().remoteAddress();
    }

    /**
     * Releases the resources held by this connection: the outgoing {@code sender} is disposed
     * first, then the underlying Reactor Netty connection.
     *
     * <p>NOTE(review): presumably invoked by the base class as part of its close handling; the
     * caller is not visible in this file.
     */
    protected void doOnClose() {
        sender.dispose();
        connection.dispose();
    }

    /**
     * Returns the stream of buffers received on the inbound side of the underlying connection.
     *
     * @return the inbound {@link Flux} of {@link ByteBuf}
     */
    public Flux<ByteBuf> receive() {
        return connection.inbound().receive();
    }

    /**
     * Encodes the given error as an error frame on stream id {@code 0}, sends it as a single
     * binary WebSocket frame, and then signals {@code onClose}.
     *
     * <p>If the send fails, {@code onClose} is signalled with the send failure. If the send
     * completes, {@code onClose} is signalled with the cause of {@code e} when one is present,
     * and with an empty completion otherwise.
     *
     * @param e the error to report to the peer
     */
    public void sendErrorAndClose(RSocketErrorException e) {
        ByteBuf encodedError = ErrorFrameCodec.encode(alloc(), 0, e);
        BinaryWebSocketFrame frame = new BinaryWebSocketFrame(encodedError);

        connection
            .outbound()
            .sendObject(frame)
            .then()
            .subscribe(
                null,
                sendFailure -> onClose.tryEmitError(sendFailure),
                () -> signalCloseAfterErrorSent(e));
    }

    /** Disposes this connection unless it has already been disposed. */
    private void disposeIfActive() {
        if (isDisposed()) {
            return;
        }
        dispose();
    }

    /**
     * Signals {@code onClose} once the error frame has been written: with the cause of
     * {@code e} when it has one, otherwise with an empty completion.
     */
    private void signalCloseAfterErrorSent(RSocketErrorException e) {
        // The cause is read at completion time, not when the send is issued.
        Throwable cause = e.getCause();
        if (cause != null) {
            onClose.tryEmitError(cause);
            return;
        }
        onClose.tryEmitEmpty();
    }
}

