/*
 * Decompiled with CFR 0.152.
 */
package io.activej.net.socket.tcp;

import io.activej.async.callback.Callback;
import io.activej.async.exception.AsyncCloseException;
import io.activej.async.exception.AsyncTimeoutException;
import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufPool;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.Utils;
import io.activej.common.inspector.AbstractInspector;
import io.activej.common.inspector.BaseInspector;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.stats.EventStats;
import io.activej.jmx.stats.ExceptionStats;
import io.activej.jmx.stats.ValueStats;
import io.activej.net.socket.tcp.ITcpSocket;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import io.activej.reactor.AbstractNioReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.net.SocketSettings;
import io.activej.reactor.nio.NioChannelEventHandler;
import io.activej.reactor.nio.NioReactor;
import io.activej.reactor.schedule.ScheduledRunnable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.Nullable;

/**
 * Non-blocking TCP socket built on a NIO {@link SocketChannel} and driven by an {@link NioReactor}.
 *
 * <p>Reads and writes are exposed as promises ({@link #read()} / {@link #write(ByteBuf)}); the
 * reactor calls back through {@link #onReadReady()} and {@link #onWriteReady()} when the channel
 * becomes ready. Optional read/write timeouts close the socket with an
 * {@link AsyncTimeoutException}.
 */
public final class TcpSocket
extends AbstractNioReactive
implements ITcpSocket,
NioChannelEventHandler {
    // Gates the reactor-thread and state checks performed in read() and write().
    private static final boolean CHECKS = Checks.isEnabled(TcpSocket.class);
    // When non-zero, doRead() moves head and tail of each freshly allocated read buffer to this offset.
    private static final int DEBUG_READ_OFFSET = ApplicationSettings.getInt(TcpSocket.class, (String)"debugReadOffset", (Integer)0);
    // Default size of the buffer allocated per channel read (16 KB unless overridden via "readBufferSize").
    public static final int DEFAULT_READ_BUFFER_SIZE = ApplicationSettings.getMemSize(TcpSocket.class, (String)"readBufferSize", (MemSize)MemSize.kilobytes((long)16L)).toInt();
    // Timeout value for which read()/write() schedule no timeout task.
    public static final int NO_TIMEOUT = 0;
    // Incremented in updateInterests() on successful selector registration, decremented in doClose().
    private static final AtomicInteger CONNECTION_COUNT = new AtomicInteger(0);
    private final InetSocketAddress remoteAddress;
    // Null once the socket has been closed; isClosed() is defined in terms of this field.
    @Nullable
    private SocketChannel channel;
    // Bytes already read from the channel but not yet handed to a read() caller.
    @Nullable
    private ByteBuf readBuf;
    // Set once channel.read() has returned -1.
    private boolean readEndOfStream;
    // Bytes accepted by write() that have not been written to the channel yet.
    @Nullable
    private ByteBuf writeBuf;
    // Set once write(null) has been called.
    private boolean writeEndOfStream;
    // Promise returned by the latest read() that could not complete immediately.
    @Nullable
    private SettablePromise<ByteBuf> read;
    // Promise shared by all write() calls while writeBuf still holds unwritten bytes.
    @Nullable
    private SettablePromise<Void> write;
    // Selector registration; stays null until the first updateInterests() call registers the channel.
    private SelectionKey key;
    // Low bits: interest ops currently set on the key. The high bit (0x80, which makes the byte
    // negative) is set while onReadReady()/onWriteReady() run, so that read()/write() invoked
    // from inside a callback skip updateInterests(); the handler calls it once at the end.
    private byte ops;
    // Timeouts in milliseconds; 0 (NO_TIMEOUT) disables scheduling.
    private int readTimeout = 0;
    private int writeTimeout = 0;
    // Size of the buffer allocated for each channel read.
    private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
    // Pending timeout tasks; null when no timeout is currently scheduled.
    @Nullable
    private ScheduledRunnable scheduledReadTimeout;
    @Nullable
    private ScheduledRunnable scheduledWriteTimeout;
    // Optional observer notified about reads, writes, timeouts, errors and disconnects.
    @Nullable
    private Inspector inspector;
    // Opaque value stored and returned via setUserData()/getUserData(); not interpreted by this class.
    @Nullable
    private Object userData;

    /**
     * Creates a socket bound to the given reactor.
     *
     * @param reactor       reactor handed to the superclass constructor
     * @param socketChannel channel to wrap; the socket reports itself closed while this is {@code null}
     * @param remoteAddress address later returned by {@link #getRemoteAddress()}
     */
    private TcpSocket(NioReactor reactor, @Nullable SocketChannel socketChannel, InetSocketAddress remoteAddress) {
        super(reactor);
        this.remoteAddress = remoteAddress;
        this.channel = socketChannel;
    }

    /**
     * Wraps an already connected channel into a {@code TcpSocket}.
     *
     * <p>When {@code socketSettings} is given, it is applied to the channel, and its optional read
     * timeout, write timeout and read buffer size override the socket defaults. Timeouts are stored
     * as {@code int} milliseconds; a duration that does not fit is clamped to the {@code int} range
     * instead of silently wrapping around (a plain narrowing cast could turn a very long timeout
     * into a zero, negative or tiny one).
     *
     * @param reactor        reactor that will drive the socket
     * @param socketChannel  connected channel to wrap
     * @param remoteAddress  address reported by {@link #getRemoteAddress()}
     * @param socketSettings optional settings; {@code null} leaves the channel and defaults untouched
     * @return the new socket
     * @throws IOException presumably raised while applying {@code socketSettings} to the channel
     */
    public static TcpSocket wrapChannel(NioReactor reactor, SocketChannel socketChannel, InetSocketAddress remoteAddress, @Nullable SocketSettings socketSettings) throws IOException {
        TcpSocket tcpSocket = new TcpSocket(reactor, socketChannel, remoteAddress);
        if (socketSettings == null) {
            return tcpSocket;
        }
        socketSettings.applySettings(socketChannel);

        Duration implReadTimeout = socketSettings.getImplReadTimeout();
        if (implReadTimeout != null) {
            tcpSocket.readTimeout = toIntMillis(implReadTimeout);
        }
        Duration implWriteTimeout = socketSettings.getImplWriteTimeout();
        if (implWriteTimeout != null) {
            tcpSocket.writeTimeout = toIntMillis(implWriteTimeout);
        }
        MemSize implReadBufferSize = socketSettings.getImplReadBufferSize();
        if (implReadBufferSize != null) {
            tcpSocket.readBufferSize = implReadBufferSize.toInt();
        }
        return tcpSocket;
    }

    /** Converts a duration to whole milliseconds, saturating at the {@code int} bounds. */
    private static int toIntMillis(Duration duration) {
        long millis = duration.toMillis();
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, millis));
    }

    /**
     * Wraps a connected channel, taking the remote address from the channel itself.
     *
     * @param reactor        reactor that will drive the socket
     * @param socketChannel  connected channel to wrap
     * @param socketSettings optional settings, may be {@code null}
     * @return the new socket
     * @throws IOException if the remote address cannot be obtained or the four-argument overload fails
     */
    public static TcpSocket wrapChannel(NioReactor reactor, SocketChannel socketChannel, @Nullable SocketSettings socketSettings) throws IOException {
        InetSocketAddress remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress();
        return wrapChannel(reactor, socketChannel, remoteAddress, socketSettings);
    }

    /**
     * Connects to {@code address} with no connect timeout duration and no socket settings.
     *
     * @param reactor reactor used to establish the connection
     * @param address remote address to connect to
     * @return promise of the connected socket
     */
    public static Promise<TcpSocket> connect(NioReactor reactor, InetSocketAddress address) {
        // The explicit cast documents which overload the null timeout selects.
        return connect(reactor, address, (Duration) null, null);
    }

    /**
     * Connects to {@code address}, converting the optional timeout to milliseconds.
     *
     * @param reactor        reactor used to establish the connection
     * @param address        remote address to connect to
     * @param duration       connect timeout; {@code null} is passed on as {@code 0}
     * @param socketSettings optional settings applied to the connected channel
     * @return promise of the connected socket
     */
    public static Promise<TcpSocket> connect(NioReactor reactor, InetSocketAddress address, @Nullable Duration duration, @Nullable SocketSettings socketSettings) {
        long timeoutMillis = 0L;
        if (duration != null) {
            timeoutMillis = duration.toMillis();
        }
        return connect(reactor, address, timeoutMillis, socketSettings);
    }

    /**
     * Connects to {@code address} through the reactor and wraps the resulting channel.
     *
     * <p>If wrapping fails with an {@link IOException}, the freshly connected channel is closed
     * through the reactor before the exception is rethrown into the returned promise.
     *
     * @param reactor        reactor used to establish the connection
     * @param address        remote address to connect to
     * @param timeout        connect timeout passed to {@code reactor.connect}
     * @param socketSettings optional settings applied to the connected channel
     * @return promise of the connected socket
     */
    public static Promise<TcpSocket> connect(NioReactor reactor, InetSocketAddress address, long timeout, @Nullable SocketSettings socketSettings) {
        Promise<SocketChannel> connected = Promise.ofCallback(cb -> reactor.connect(address, timeout, cb));
        return connected.map(socketChannel -> {
            try {
                return wrapChannel(reactor, socketChannel, address, socketSettings);
            } catch (IOException e) {
                // Do not leak the channel when it cannot be wrapped.
                reactor.closeChannel(socketChannel, null);
                throw e;
            }
        });
    }

    /**
     * Installs or removes the inspector notified about socket events.
     *
     * @param inspector the inspector to use, or {@code null} to disable notifications
     */
    public void setInspector(@Nullable Inspector inspector) {
        this.inspector = inspector;
    }

    /**
     * @return the current value of the process-wide connection counter
     */
    public static int getConnectionCount() {
        return CONNECTION_COUNT.get();
    }

    /**
     * @return the remote address this socket was created with
     */
    public InetSocketAddress getRemoteAddress() {
        return remoteAddress;
    }

    /**
     * @return the value last stored with {@link #setUserData(Object)}, or {@code null} if none
     */
    @Nullable
    public Object getUserData() {
        return userData;
    }

    /**
     * Attaches an arbitrary value to this socket; the socket itself never interprets it.
     *
     * @param userData value to store, may be {@code null}
     */
    public void setUserData(@Nullable Object userData) {
        this.userData = userData;
    }

    /**
     * Schedules the read timeout task. When it fires, the inspector (if any) is notified and the
     * socket is closed with an {@link AsyncTimeoutException}.
     *
     * <p>Must only be called when no read timeout is scheduled and a read timeout is configured.
     */
    private void scheduleReadTimeout() {
        assert scheduledReadTimeout == null && readTimeout != NO_TIMEOUT;
        long delayMillis = readTimeout;
        scheduledReadTimeout = reactor.delayBackground(delayMillis, () -> {
            Inspector listener = inspector;
            if (listener != null) {
                listener.onReadTimeout(this);
            }
            // The task has fired, so there is nothing left for closeEx() to cancel.
            scheduledReadTimeout = null;
            closeEx(new AsyncTimeoutException("Timed out"));
        });
    }

    /**
     * Schedules the write timeout task. When it fires, the inspector (if any) is notified and the
     * socket is closed with an {@link AsyncTimeoutException}.
     *
     * <p>Must only be called when no write timeout is scheduled and a write timeout is configured.
     */
    private void scheduleWriteTimeout() {
        assert scheduledWriteTimeout == null && writeTimeout != NO_TIMEOUT;
        long delayMillis = writeTimeout;
        scheduledWriteTimeout = reactor.delayBackground(delayMillis, () -> {
            Inspector listener = inspector;
            if (listener != null) {
                listener.onWriteTimeout(this);
            }
            // The task has fired, so there is nothing left for closeEx() to cancel.
            scheduledWriteTimeout = null;
            closeEx(new AsyncTimeoutException("Timed out"));
        });
    }

    /**
     * Recomputes the selector interest set from the current buffer state and applies it.
     *
     * <p>Read interest is requested while nothing is buffered for the reader and end of stream has
     * not been seen. Write interest is requested while unwritten bytes are queued. On the first
     * call the channel is registered with the reactor's selector (and the connection counter is
     * incremented); afterwards the key's interest ops are only touched when they actually change.
     *
     * <p>Must not be called on a closed socket or while the high "inside callback" bit of
     * {@code ops} is set.
     */
    private void updateInterests() {
        assert !isClosed() && ops >= 0;
        int interest = 0;
        if (readBuf == null && !readEndOfStream) {
            interest |= SelectionKey.OP_READ;
        }
        // Keep write interest while any bytes are queued, even when end of stream has already been
        // requested: doWrite() only shuts the output down after writeBuf has been fully flushed, so
        // dropping OP_WRITE here would leave the queued tail and the pending write promise stuck.
        if (writeBuf != null) {
            interest |= SelectionKey.OP_WRITE;
        }
        byte newOps = (byte) interest;

        if (key != null) {
            if (ops != newOps) {
                ops = newOps;
                key.interestOps(ops);
            }
            return;
        }

        ops = newOps;
        try {
            key = channel.register(reactor.ensureSelector(), ops, this);
            CONNECTION_COUNT.incrementAndGet();
        } catch (ClosedChannelException e) {
            closeEx(e);
        }
    }

    /**
     * Requests the next chunk of data.
     *
     * <p>If data is already buffered, or end of stream has been reached, the promise completes
     * immediately (with {@code null} at end of stream when nothing is buffered). Otherwise a
     * pending promise is returned, the read timeout is armed if configured, and read interest is
     * updated. Any previously pending read promise is dropped from this socket.
     *
     * @return promise of the next buffer; fails with {@link AsyncCloseException} if the socket is closed
     */
    @Override
    public Promise<ByteBuf> read() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        if (isClosed()) {
            return Promise.ofException(new AsyncCloseException());
        }
        read = null;
        boolean resultAvailable = readBuf != null || readEndOfStream;
        if (resultAvailable) {
            ByteBuf buffered = readBuf;
            readBuf = null;
            return Promise.of(buffered);
        }
        SettablePromise<ByteBuf> pending = new SettablePromise<>();
        read = pending;
        if (scheduledReadTimeout == null && readTimeout != NO_TIMEOUT) {
            scheduleReadTimeout();
        }
        // A negative ops value means we are inside onReadReady()/onWriteReady(), which will
        // update the interests itself once the callback chain returns.
        if (ops >= 0) {
            updateInterests();
        }
        return pending;
    }

    /**
     * Reactor callback invoked when the channel is readable: reads from the channel, completes a
     * pending {@link #read()} promise if a result is available, and refreshes selector interests.
     * An {@link IOException} from the read closes the socket with that exception.
     */
    public void onReadReady() {
        assert reactor.inReactorThread();
        // Mark "inside callback" so that re-entrant read()/write() calls skip updateInterests().
        ops = (byte) (ops | 0x80);
        try {
            doRead();
        } catch (IOException e) {
            closeEx(e);
            return;
        }
        SettablePromise<ByteBuf> pending = read;
        boolean resultAvailable = readBuf != null || readEndOfStream;
        if (pending != null && resultAvailable) {
            ByteBuf result = readBuf;
            read = null;
            readBuf = null;
            pending.set(result);
        }
        if (!isClosed()) {
            ops = (byte) (ops & 0x7F);
            updateInterests();
        }
    }

    /**
     * Performs one read from the channel into a freshly allocated buffer.
     *
     * <ul>
     *   <li>0 bytes read: the inspector is notified and the buffer is recycled;</li>
     *   <li>-1 (end of stream): the read timeout is cancelled, {@code readEndOfStream} is set, and
     *       the socket is closed if the write side has also finished with nothing left queued;</li>
     *   <li>otherwise: the read timeout is cancelled and the bytes become, or are appended to,
     *       {@code readBuf}.</li>
     * </ul>
     *
     * @throws IOException if the channel read fails; the buffer is recycled and the inspector notified first
     */
    private void doRead() throws IOException {
        assert channel != null;
        ByteBuf chunk;
        if (DEBUG_READ_OFFSET != 0) {
            Checks.checkState(DEBUG_READ_OFFSET > 0);
            chunk = ByteBufPool.allocate(readBufferSize);
            chunk.tail(DEBUG_READ_OFFSET);
            chunk.head(DEBUG_READ_OFFSET);
        } else {
            chunk = ByteBufPool.allocate(readBufferSize);
        }

        ByteBuffer nioBuffer = chunk.toWriteByteBuffer();
        int numRead;
        try {
            numRead = channel.read(nioBuffer);
            chunk.ofWriteByteBuffer(nioBuffer);
        } catch (IOException e) {
            chunk.recycle();
            if (inspector != null) {
                inspector.onReadError(this, e);
            }
            throw e;
        }

        if (numRead == 0) {
            if (inspector != null) {
                inspector.onRead(this, chunk);
            }
            chunk.recycle();
            return;
        }

        // Either data or end of stream arrived, so the pending read timeout no longer applies.
        scheduledReadTimeout = Utils.nullify(scheduledReadTimeout, ScheduledRunnable::cancel);

        if (numRead == -1) {
            chunk.recycle();
            if (inspector != null) {
                inspector.onReadEndOfStream(this);
            }
            readEndOfStream = true;
            if (writeEndOfStream && writeBuf == null) {
                doClose();
            }
            return;
        }

        if (inspector != null) {
            inspector.onRead(this, chunk);
        }
        if (readBuf != null) {
            int extra = chunk.readRemaining();
            readBuf = ByteBufPool.ensureWriteRemaining(readBuf, extra);
            readBuf.put(chunk.array(), chunk.head(), chunk.readRemaining());
            chunk.recycle();
        } else {
            readBuf = chunk;
        }
    }

    /**
     * Queues {@code buf} for writing and attempts to write immediately.
     *
     * <p>A {@code null} buffer signals end of stream. Data is appended to any bytes still queued
     * from earlier calls; while a write is already pending, its promise is returned again. If the
     * queue is fully flushed by the immediate attempt, a completed promise is returned; otherwise
     * a pending promise is created, the write timeout is armed if configured, and write interest
     * is updated.
     *
     * @param buf data to write (ownership is taken and the buffer is recycled), or {@code null} for end of stream
     * @return promise completed once all queued bytes are written; fails with
     *         {@link AsyncCloseException} if the socket is closed, or with the {@link IOException}
     *         raised by the immediate write attempt (which also closes the socket)
     */
    @Override
    public Promise<Void> write(@Nullable ByteBuf buf) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
            Checks.checkState(!writeEndOfStream, "End of stream has already been sent");
        }
        if (isClosed()) {
            if (buf != null) {
                buf.recycle();
            }
            return Promise.ofException(new AsyncCloseException());
        }
        if (buf == null) {
            writeEndOfStream = true;
        }

        if (writeBuf != null) {
            if (buf != null) {
                int extra = buf.readRemaining();
                writeBuf = ByteBufPool.ensureWriteRemaining(writeBuf, extra);
                writeBuf.put(buf.array(), buf.head(), buf.readRemaining());
                buf.recycle();
            }
        } else {
            if (buf != null && !buf.canRead()) {
                // Nothing to send and nothing queued.
                buf.recycle();
                return Promise.complete();
            }
            writeBuf = buf;
        }

        if (write != null) {
            return write;
        }

        try {
            doWrite();
        } catch (IOException e) {
            closeEx(e);
            return Promise.ofException(e);
        }
        if (writeBuf == null) {
            return Promise.complete();
        }

        SettablePromise<Void> pending = new SettablePromise<>();
        write = pending;
        if (scheduledWriteTimeout == null && writeTimeout != NO_TIMEOUT) {
            scheduleWriteTimeout();
        }
        // A negative ops value means we are inside onReadReady()/onWriteReady(), which will
        // update the interests itself once the callback chain returns.
        if (ops >= 0) {
            updateInterests();
        }
        return pending;
    }

    /**
     * @return {@code true} if data has been read from the channel and is waiting for a {@link #read()} call
     */
    @Override
    public boolean isReadAvailable() {
        return readBuf != null;
    }

    /**
     * Reactor callback invoked when the channel is writable: writes queued bytes, completes the
     * pending {@link #write(ByteBuf)} promise once the queue is empty, and refreshes selector
     * interests. An {@link IOException} from the write closes the socket with that exception.
     */
    public void onWriteReady() {
        assert reactor.inReactorThread();
        assert write != null;
        // Mark "inside callback" so that re-entrant read()/write() calls skip updateInterests().
        ops = (byte) (ops | 0x80);
        try {
            doWrite();
        } catch (IOException e) {
            closeEx(e);
            return;
        }
        if (writeBuf == null) {
            SettablePromise<Void> pending = write;
            write = null;
            pending.set(null);
        }
        if (!isClosed()) {
            ops = (byte) (ops & 0x7F);
            updateInterests();
        }
    }

    /**
     * Writes as much of {@code writeBuf} as the channel accepts.
     *
     * <p>If bytes remain afterwards, the method returns and leaves them queued. Once the queue is
     * empty, the write timeout is cancelled and, if end of stream was requested, the output is shut
     * down, or the whole socket is closed when the read side has reached end of stream too.
     *
     * @throws IOException if the channel write or output shutdown fails; write failures are
     *                     reported to the inspector first
     */
    private void doWrite() throws IOException {
        assert channel != null;
        ByteBuf queued = writeBuf;
        if (queued != null) {
            ByteBuffer nioBuffer = queued.toReadByteBuffer();
            try {
                channel.write(nioBuffer);
            } catch (IOException e) {
                if (inspector != null) {
                    inspector.onWriteError(this, e);
                }
                throw e;
            }
            if (inspector != null) {
                // Computed before the head is advanced below, so this is the number of bytes just written.
                int written = nioBuffer.position() - queued.head();
                inspector.onWrite(this, queued, written);
            }
            queued.ofReadByteBuffer(nioBuffer);
            if (queued.canRead()) {
                return;
            }
            queued.recycle();
            writeBuf = null;
        }

        scheduledWriteTimeout = Utils.nullify(scheduledWriteTimeout, ScheduledRunnable::cancel);
        if (!writeEndOfStream) {
            return;
        }
        if (readEndOfStream) {
            doClose();
        } else {
            channel.shutdownOutput();
        }
    }

    /**
     * Closes the socket because of {@code e}: closes the channel, recycles buffered data, cancels
     * scheduled timeouts and fails any pending read and write promises with {@code e}.
     * Does nothing if the socket is already closed.
     *
     * @param e the exception delivered to pending promises
     */
    public void closeEx(Exception e) {
        Reactive.checkInReactorThread(this);
        if (isClosed()) {
            return;
        }
        doClose();

        readBuf = Utils.nullify(readBuf, ByteBuf::recycle);
        writeBuf = Utils.nullify(writeBuf, ByteBuf::recycle);

        scheduledReadTimeout = Utils.nullify(scheduledReadTimeout, ScheduledRunnable::cancel);
        scheduledWriteTimeout = Utils.nullify(scheduledWriteTimeout, ScheduledRunnable::cancel);

        read = Utils.nullify(read, SettablePromise::setException, e);
        write = Utils.nullify(write, SettablePromise::setException, e);
    }

    /**
     * Closes the underlying channel through the reactor, marks the socket closed and notifies the
     * inspector.
     *
     * <p>The connection counter is only decremented when the channel had actually been registered
     * with the selector ({@code key != null}), because that registration is the only place the
     * counter is incremented. Without this guard, closing a socket that was never registered, or
     * whose registration failed, would push the global count below the real number of connections.
     */
    private void doClose() {
        reactor.closeChannel(channel, key);
        channel = null;
        if (key != null) {
            CONNECTION_COUNT.decrementAndGet();
        }
        if (inspector != null) {
            inspector.onDisconnect(this);
        }
    }

    /**
     * @return {@code true} once the underlying channel reference has been cleared by a close
     */
    @Override
    public boolean isClosed() {
        return channel == null;
    }

    /**
     * @return the underlying channel, or {@code null} if the socket is closed
     */
    @Nullable
    public SocketChannel getSocketChannel() {
        return channel;
    }

    /**
     * @return a debug representation listing the channel (empty when closed), buffers,
     *         end-of-stream flags, pending promises and the raw {@code ops} byte
     */
    @Override
    public String toString() {
        Object channelText = channel != null ? channel : "";
        return "TcpSocket{" +
            "channel=" + channelText +
            ", readBuf=" + readBuf +
            ", writeBuf=" + writeBuf +
            ", readEndOfStream=" + readEndOfStream +
            ", writeEndOfStream=" + writeEndOfStream +
            ", read=" + read +
            ", write=" + write +
            ", ops=" + ops +
            "}";
    }

    /**
     * Observer of socket-level events. {@code TcpSocket} invokes these callbacks when an inspector
     * has been installed via {@code setInspector}.
     */
    public interface Inspector extends BaseInspector<Inspector> {
        /**
         * Connection-established event. NOTE(review): {@code TcpSocket} itself does not call this
         * in the code shown here; presumably it is invoked by whoever creates the connection.
         */
        void onConnect(TcpSocket socket);

        /** Called when the read timeout fires, just before the socket is closed. */
        void onReadTimeout(TcpSocket socket);

        /**
         * Called after a channel read that did not hit end of stream; {@code buf} holds the bytes
         * just read (possibly none).
         */
        void onRead(TcpSocket socket, ByteBuf buf);

        /** Called when a channel read reports end of stream. */
        void onReadEndOfStream(TcpSocket socket);

        /** Called when a channel read fails with {@code e}. */
        void onReadError(TcpSocket socket, IOException e);

        /** Called when the write timeout fires, just before the socket is closed. */
        void onWriteTimeout(TcpSocket socket);

        /**
         * Called after a channel write; {@code buf} is the queued buffer as it was before the
         * write and {@code bytes} is the number of bytes the channel accepted.
         */
        void onWrite(TcpSocket socket, ByteBuf buf, int bytes);

        /** Called when a channel write fails with {@code e}. */
        void onWriteError(TcpSocket socket, IOException e);

        /** Called when the socket's channel is closed. */
        void onDisconnect(TcpSocket socket);
    }

    /**
     * {@link Inspector} implementation that records socket events into JMX-exposed statistics.
     * All time-based statistics use {@link #SMOOTHING_WINDOW}.
     */
    public static class JmxInspector
        extends AbstractInspector<Inspector>
        implements Inspector {

        public static final Duration SMOOTHING_WINDOW = Duration.ofMinutes(1L);

        private final EventStats connects = EventStats.create(SMOOTHING_WINDOW);
        private final ValueStats reads = (ValueStats) ValueStats.builder(SMOOTHING_WINDOW)
            .withUnit("bytes")
            .withRate()
            .build();
        private final EventStats readEndOfStreams = EventStats.create(SMOOTHING_WINDOW);
        private final ExceptionStats readErrors = ExceptionStats.create();
        private final EventStats readTimeouts = EventStats.create(SMOOTHING_WINDOW);
        private final ValueStats writes = (ValueStats) ValueStats.builder(SMOOTHING_WINDOW)
            .withUnit("bytes")
            .withRate()
            .build();
        private final ExceptionStats writeErrors = ExceptionStats.create();
        private final EventStats writeTimeouts = EventStats.create(SMOOTHING_WINDOW);
        private final EventStats writeOverloaded = EventStats.create(SMOOTHING_WINDOW);
        private final EventStats disconnects = EventStats.create(SMOOTHING_WINDOW);

        /** Counts a connect event. */
        @Override
        public void onConnect(TcpSocket socket) {
            connects.recordEvent();
        }

        /** Counts a read timeout. */
        @Override
        public void onReadTimeout(TcpSocket socket) {
            readTimeouts.recordEvent();
        }

        /** Records the number of readable bytes in {@code buf}. */
        @Override
        public void onRead(TcpSocket socket, ByteBuf buf) {
            long bytesRead = buf.readRemaining();
            reads.recordValue(bytesRead);
        }

        /** Counts an end-of-stream event on the read side. */
        @Override
        public void onReadEndOfStream(TcpSocket socket) {
            readEndOfStreams.recordEvent();
        }

        /** Records a read failure together with the socket's remote address. */
        @Override
        public void onReadError(TcpSocket socket, IOException e) {
            readErrors.recordException(e, socket.getRemoteAddress());
        }

        /** Counts a write timeout. */
        @Override
        public void onWriteTimeout(TcpSocket socket) {
            writeTimeouts.recordEvent();
        }

        /**
         * Records the number of bytes written, and counts an "overloaded" event whenever the
         * channel accepted a different number of bytes than {@code buf} had readable.
         */
        @Override
        public void onWrite(TcpSocket socket, ByteBuf buf, int bytes) {
            long bytesWritten = bytes;
            writes.recordValue(bytesWritten);
            boolean partialWrite = buf.readRemaining() != bytes;
            if (partialWrite) {
                writeOverloaded.recordEvent();
            }
        }

        /** Records a write failure together with the socket's remote address. */
        @Override
        public void onWriteError(TcpSocket socket, IOException e) {
            writeErrors.recordException(e, socket.getRemoteAddress());
        }

        /** Counts a disconnect event. */
        @Override
        public void onDisconnect(TcpSocket socket) {
            disconnects.recordEvent();
        }

        @JmxAttribute
        public EventStats getReadTimeouts() {
            return readTimeouts;
        }

        @JmxAttribute
        public ValueStats getReads() {
            return reads;
        }

        @JmxAttribute
        public EventStats getReadEndOfStreams() {
            return readEndOfStreams;
        }

        @JmxAttribute
        public ExceptionStats getReadErrors() {
            return readErrors;
        }

        @JmxAttribute
        public EventStats getWriteTimeouts() {
            return writeTimeouts;
        }

        @JmxAttribute
        public ValueStats getWrites() {
            return writes;
        }

        @JmxAttribute
        public ExceptionStats getWriteErrors() {
            return writeErrors;
        }

        @JmxAttribute
        public EventStats getWriteOverloaded() {
            return writeOverloaded;
        }

        @JmxAttribute
        public EventStats getConnects() {
            return connects;
        }

        @JmxAttribute
        public EventStats getDisconnects() {
            return disconnects;
        }

        /** @return total recorded connects minus total recorded disconnects */
        @JmxAttribute
        public long getActiveSockets() {
            long connected = connects.getTotalCount();
            long disconnected = disconnects.getTotalCount();
            return connected - disconnected;
        }
    }
}

