/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.platform.http.vertx;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncInputStream
implements ReadStream<Buffer> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncInputStream.class);
    private static final int DEFAULT_BUFFER_SIZE = 4096;
    private final ReadableByteChannel channel;
    private final Vertx vertx;
    private final Context context;
    private final InboundBuffer<Buffer> queue;
    private long readPos;
    private boolean closed;
    private boolean readInProgress;
    private Handler<Buffer> dataHandler;
    private Handler<Void> endHandler;
    private Handler<Throwable> exceptionHandler;

    public AsyncInputStream(Vertx vertx, Context context, InputStream inputStream) {
        this.vertx = vertx;
        this.context = context;
        this.channel = Channels.newChannel(inputStream);
        this.queue = new InboundBuffer(context, 0L);
        this.queue.handler(buffer -> {
            if (buffer.length() > 0) {
                this.handleData((Buffer)buffer);
            } else {
                this.handleEnd();
            }
        });
        this.queue.drainHandler(v -> this.doRead());
    }

    public synchronized AsyncInputStream endHandler(Handler<Void> endHandler) {
        this.checkStreamClosed();
        this.endHandler = endHandler;
        return this;
    }

    public synchronized AsyncInputStream exceptionHandler(Handler<Throwable> exceptionHandler) {
        this.checkStreamClosed();
        this.exceptionHandler = exceptionHandler;
        return this;
    }

    public synchronized AsyncInputStream handler(Handler<Buffer> handler) {
        this.checkStreamClosed();
        this.dataHandler = handler;
        if (this.dataHandler != null && !this.closed) {
            this.doRead();
        } else {
            this.queue.clear();
        }
        return this;
    }

    public synchronized AsyncInputStream pause() {
        this.checkStreamClosed();
        this.queue.pause();
        return this;
    }

    public synchronized AsyncInputStream resume() {
        this.checkStreamClosed();
        this.queue.resume();
        return this;
    }

    public ReadStream<Buffer> fetch(long amount) {
        this.checkStreamClosed();
        this.queue.fetch(amount);
        return this;
    }

    public void close(Handler<AsyncResult<Void>> handler) {
        this.closeInternal(handler);
    }

    private void checkStreamClosed() {
        if (this.closed) {
            throw new IllegalStateException("Stream closed");
        }
    }

    private void checkContext() {
        Context contextToCheck = this.vertx.getOrCreateContext();
        if (!contextToCheck.equals((Object)this.context)) {
            throw new IllegalStateException("AsyncInputStream must only be used in the context that created it, expected: " + String.valueOf(this.context) + " actual " + String.valueOf(contextToCheck));
        }
    }

    private synchronized void closeInternal(Handler<AsyncResult<Void>> handler) {
        this.closed = true;
        this.doClose(handler);
    }

    private void doClose(Handler<AsyncResult<Void>> handler) {
        block3: {
            try {
                this.channel.close();
                if (handler != null) {
                    this.vertx.runOnContext(v -> handler.handle((Object)Future.succeededFuture()));
                }
            }
            catch (IOException e) {
                if (handler == null) break block3;
                this.vertx.runOnContext(v -> handler.handle((Object)Future.failedFuture((Throwable)e)));
            }
        }
    }

    private void doRead() {
        this.checkStreamClosed();
        this.doRead(ByteBuffer.allocate(4096));
    }

    private synchronized void doRead(ByteBuffer buffer) {
        if (!this.readInProgress) {
            this.readInProgress = true;
            Buffer buff = Buffer.buffer((int)4096);
            this.doRead(buff, 0, buffer, this.readPos, (Handler<AsyncResult<Buffer>>)((Handler)result -> {
                if (result.succeeded()) {
                    this.readInProgress = false;
                    Buffer updatedBuffer = (Buffer)result.result();
                    this.readPos += (long)updatedBuffer.length();
                    if (this.queue.write((Object)updatedBuffer) && updatedBuffer.length() > 0) {
                        this.doRead(buffer);
                    }
                } else {
                    this.handleException(result.cause());
                }
            }));
        }
    }

    private void doRead(Buffer writeBuff, int offset, ByteBuffer buffer, long position, Handler<AsyncResult<Buffer>> handler) {
        this.vertx.executeBlocking(() -> this.channel.read(buffer)).onComplete(result -> {
            if (result.succeeded()) {
                Integer bytesRead = (Integer)result.result();
                if (bytesRead == -1) {
                    this.context.runOnContext(v -> {
                        buffer.flip();
                        writeBuff.setBytes(offset, buffer);
                        buffer.compact();
                        handler.handle((Object)Future.succeededFuture((Object)writeBuff));
                    });
                } else if (buffer.hasRemaining()) {
                    this.context.runOnContext(v -> this.doRead(writeBuff, offset, buffer, position + (long)bytesRead.intValue(), handler));
                } else {
                    this.context.runOnContext(v -> {
                        buffer.flip();
                        writeBuff.setBytes(offset, buffer);
                        buffer.compact();
                        handler.handle((Object)Future.succeededFuture((Object)writeBuff));
                    });
                }
            } else {
                this.context.runOnContext(v -> handler.handle((Object)Future.failedFuture((Throwable)result.cause())));
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleData(Buffer buffer) {
        Handler<Buffer> handler;
        AsyncInputStream asyncInputStream = this;
        synchronized (asyncInputStream) {
            handler = this.dataHandler;
        }
        if (handler != null) {
            this.checkContext();
            handler.handle((Object)buffer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void handleEnd() {
        Handler<Void> endHandler;
        AsyncInputStream asyncInputStream = this;
        synchronized (asyncInputStream) {
            this.dataHandler = null;
            endHandler = this.endHandler;
        }
        if (endHandler != null) {
            this.checkContext();
            endHandler.handle(null);
        }
    }

    private void handleException(Throwable t) {
        if (this.exceptionHandler != null && t instanceof Exception) {
            this.exceptionHandler.handle((Object)t);
        } else if (LOG.isErrorEnabled()) {
            LOG.error("Unhandled error while processing stream", t);
        }
    }
}

