/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.event.CouchbaseEvent;
import com.couchbase.client.core.event.EventBus;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.conductor.DcpChannel;
import com.couchbase.client.dcp.conductor.NotMyVbucketException;
import com.couchbase.client.dcp.error.RollbackException;
import com.couchbase.client.dcp.events.StreamEndEvent;
import com.couchbase.client.dcp.message.DcpCloseStreamResponse;
import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
import com.couchbase.client.dcp.message.DcpGetPartitionSeqnosResponse;
import com.couchbase.client.dcp.message.DcpOpenStreamResponse;
import com.couchbase.client.dcp.message.DcpStreamEndMessage;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.RollbackMessage;
import com.couchbase.client.dcp.message.StreamEndReason;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.deps.io.netty.util.concurrent.Promise;

/**
 * {@link ControlEventHandler} that intercepts selected control messages of a {@link DcpChannel}
 * before they reach the handler configured in the channel's environment.
 *
 * <p>Open-stream, failover-log, stream-end, close-stream and partition-seqno responses are consumed
 * here, typically to complete the promise registered in {@code dcpChannel.outstandingPromises}
 * under the message's opaque value. Every other message is forwarded unchanged to the wrapped
 * handler.
 *
 * <p>Each {@code filter*} method releases the incoming buffer in a {@code finally} block, so the
 * buffer is released even when processing fails.
 */
public class DcpChannelControlHandler implements ControlEventHandler {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(DcpChannelControlHandler.class);

    /** Open-stream status on which the pending promise is completed successfully. */
    private static final short STATUS_SUCCESS = 0x00;
    /** Open-stream status on which the pending promise fails with {@link NotMyVbucketException}. */
    private static final short STATUS_NOT_MY_VBUCKET = 0x07;
    /** Open-stream status on which the pending promise fails with {@link RollbackException}. */
    private static final short STATUS_ROLLBACK = 0x23;

    private final DcpChannel dcpChannel;
    private final ControlEventHandler controlEventHandler;
    private final EventBus eventBus;

    /**
     * Creates a handler bound to the given channel.
     *
     * @param dcpChannel the channel whose outstanding promises and stream state are maintained;
     *                   the wrapped control handler and the event bus are taken from its environment
     */
    public DcpChannelControlHandler(DcpChannel dcpChannel) {
        this.dcpChannel = dcpChannel;
        this.controlEventHandler = dcpChannel.env.controlEventHandler();
        this.eventBus = dcpChannel.env.eventBus();
    }

    /**
     * Dispatches a control message to the matching filter, or forwards it to the wrapped handler
     * when no filter applies.
     *
     * @param flowController the flow controller associated with the message
     * @param buf            the control message
     */
    @Override
    public void onEvent(ChannelFlowController flowController, ByteBuf buf) {
        if (DcpOpenStreamResponse.is(buf)) {
            this.filterOpenStreamResponse(flowController, buf);
        } else if (DcpFailoverLogResponse.is(buf)) {
            this.filterFailoverLogResponse(buf);
        } else if (DcpStreamEndMessage.is(buf)) {
            this.filterDcpStreamEndMessage(flowController, buf);
        } else if (DcpCloseStreamResponse.is(buf)) {
            this.filterDcpCloseStreamResponse(flowController, buf);
        } else if (DcpGetPartitionSeqnosResponse.is(buf)) {
            this.filterDcpGetPartitionSeqnosResponse(buf);
        } else {
            this.controlEventHandler.onEvent(flowController, buf);
        }
    }

    /**
     * Completes the promise of an open-stream request according to the response status.
     *
     * <p>On success a failover-log message is synthesized from the response content (with the
     * vbucket id appended as a trailing short) and passed to the wrapped handler. On rollback a
     * rollback message is synthesized and passed on instead. The incoming buffer is always released.
     *
     * @param flowController the flow controller handed through to the wrapped handler
     * @param buf            the open-stream response
     * @return always {@code false}
     * @throws IllegalStateException if no promise or vbucket info is registered for the opaque
     */
    private boolean filterOpenStreamResponse(ChannelFlowController flowController, ByteBuf buf) {
        try {
            // Remove both bookkeeping entries before validating either, so no entry is left behind.
            Promise<?> pendingPromise = this.dcpChannel.outstandingPromises.remove(MessageUtil.getOpaque(buf));
            Short pendingVbid = this.dcpChannel.outstandingVbucketInfos.remove(MessageUtil.getOpaque(buf));
            Promise<?> promise = requireOutstanding(pendingPromise, "promise", buf);
            short vbid = requireOutstanding(pendingVbid, "vbucket info", buf);
            short status = MessageUtil.getStatus(buf);
            switch (status) {
                case STATUS_SUCCESS: {
                    promise.setSuccess(null);
                    ByteBuf flog = Unpooled.buffer();
                    DcpFailoverLogResponse.init(flog);
                    DcpFailoverLogResponse.vbucket(flog, DcpOpenStreamResponse.vbucket(buf));
                    ByteBuf content = MessageUtil.getContent(buf).copy().writeShort(vbid);
                    MessageUtil.setContent(content, flog);
                    // The temporary copy is no longer needed once its content has been set on flog.
                    content.release();
                    this.controlEventHandler.onEvent(flowController, flog);
                    break;
                }
                case STATUS_ROLLBACK: {
                    promise.setFailure(new RollbackException());
                    ByteBuf rb = Unpooled.buffer();
                    RollbackMessage.init(rb, vbid, DcpOpenStreamResponse.rollbackSeqno(buf));
                    this.controlEventHandler.onEvent(flowController, rb);
                    break;
                }
                case STATUS_NOT_MY_VBUCKET: {
                    promise.setFailure(new NotMyVbucketException());
                    break;
                }
                default: {
                    promise.setFailure(new IllegalStateException("Unhandled Status: " + status));
                }
            }
            return false;
        } finally {
            buf.release();
        }
    }

    /**
     * Completes the pending promise with a copy of the response content. The incoming buffer is
     * always released.
     *
     * @param buf the partition-seqnos response
     * @throws IllegalStateException if no promise is registered for the opaque
     */
    private void filterDcpGetPartitionSeqnosResponse(ByteBuf buf) {
        try {
            Promise<?> pendingPromise = this.dcpChannel.outstandingPromises.remove(MessageUtil.getOpaque(buf));
            Promise<ByteBuf> promise = asByteBufPromise(requireOutstanding(pendingPromise, "promise", buf));
            promise.setSuccess(MessageUtil.getContent(buf).copy());
        } finally {
            buf.release();
        }
    }

    /**
     * Completes the pending promise with a newly built failover-log message whose content is a copy
     * of the response content with the vbucket id appended as a trailing short. The incoming buffer
     * is always released.
     *
     * @param buf the failover-log response
     * @throws IllegalStateException if no promise or vbucket info is registered for the opaque
     */
    private void filterFailoverLogResponse(ByteBuf buf) {
        try {
            // Remove both bookkeeping entries before validating either, so no entry is left behind.
            Promise<?> pendingPromise = this.dcpChannel.outstandingPromises.remove(MessageUtil.getOpaque(buf));
            Short pendingVbid = this.dcpChannel.outstandingVbucketInfos.remove(MessageUtil.getOpaque(buf));
            Promise<ByteBuf> promise = asByteBufPromise(requireOutstanding(pendingPromise, "promise", buf));
            short vbid = requireOutstanding(pendingVbid, "vbucket info", buf);
            ByteBuf flog = Unpooled.buffer();
            DcpFailoverLogResponse.init(flog);
            DcpFailoverLogResponse.vbucket(flog, DcpFailoverLogResponse.vbucket(buf));
            ByteBuf copiedBuf = MessageUtil.getContent(buf).copy().writeShort(vbid);
            MessageUtil.setContent(copiedBuf, flog);
            // The temporary copy is no longer needed once its content has been set on flog.
            copiedBuf.release();
            promise.setSuccess(flog);
        } finally {
            buf.release();
        }
    }

    /**
     * Handles a stream-end message: logs it, publishes a {@link StreamEndEvent} when an event bus is
     * configured, resets the vbucket's entry in {@code openStreams} to 0, asks the conductor to
     * possibly move the partition, and acknowledges the message with the flow controller. The
     * incoming buffer is always released.
     *
     * @param flowController the flow controller used to acknowledge the message
     * @param buf            the stream-end message
     */
    private void filterDcpStreamEndMessage(ChannelFlowController flowController, ByteBuf buf) {
        try {
            short vbid = DcpStreamEndMessage.vbucket(buf);
            StreamEndReason reason = DcpStreamEndMessage.reason(buf);
            LOGGER.debug("Server closed Stream on vbid {} with reason {}", vbid, reason);
            if (this.eventBus != null) {
                this.eventBus.publish(new StreamEndEvent(vbid, reason));
            }
            this.dcpChannel.openStreams.set(vbid, 0);
            this.dcpChannel.conductor.maybeMovePartition(vbid);
            flowController.ack(buf);
        } finally {
            buf.release();
        }
    }

    /**
     * Completes the pending close-stream promise and acknowledges the message with the flow
     * controller. The incoming buffer is always released.
     *
     * @param flowController the flow controller used to acknowledge the message
     * @param buf            the close-stream response
     * @throws IllegalStateException if no promise is registered for the opaque
     */
    private void filterDcpCloseStreamResponse(ChannelFlowController flowController, ByteBuf buf) {
        try {
            Promise<?> pendingPromise = this.dcpChannel.outstandingPromises.remove(MessageUtil.getOpaque(buf));
            Promise<?> promise = requireOutstanding(pendingPromise, "promise", buf);
            promise.setSuccess(null);
            flowController.ack(buf);
        } finally {
            buf.release();
        }
    }

    /**
     * Returns {@code value} if it is present, otherwise fails with a message naming the opaque of
     * the offending response instead of surfacing a bare {@link NullPointerException}.
     *
     * @param value the entry removed from one of the channel's bookkeeping maps, possibly {@code null}
     * @param what  human-readable name of the entry, used in the error message
     * @param buf   the response whose opaque was used for the lookup
     * @return {@code value}, never {@code null}
     * @throws IllegalStateException if {@code value} is {@code null}
     */
    private static <T> T requireOutstanding(T value, String what, ByteBuf buf) {
        if (value == null) {
            throw new IllegalStateException(
                "No outstanding " + what + " registered for opaque " + MessageUtil.getOpaque(buf));
        }
        return value;
    }

    /**
     * Narrows a wildcard promise to one that is completed with a {@link ByteBuf}.
     *
     * <p>The promise map stores {@code Promise<?>}; this assumes the requester that registered the
     * promise expects a {@code ByteBuf} result, which the compiler cannot verify.
     */
    @SuppressWarnings("unchecked")
    private static Promise<ByteBuf> asByteBufPromise(Promise<?> promise) {
        return (Promise<ByteBuf>) promise;
    }
}

