/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.vitess.connection;

import binlogdata.Binlogdata;
import io.debezium.connector.vitess.Vgtid;
import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.connector.vitess.VitessDatabaseSchema;
import io.debezium.connector.vitess.connection.MessageDecoder;
import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.connector.vitess.connection.ReplicationMessageProcessor;
import io.debezium.connector.vitess.connection.VStreamOutputMessageDecoder;
import io.debezium.connector.vitess.connection.VitessTabletType;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.vitess.client.Proto;
import io.vitess.client.grpc.StaticAuthCredentials;
import io.vitess.proto.Topodata;
import io.vitess.proto.Vtgate;
import io.vitess.proto.grpc.VitessGrpc;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ReplicationConnection} implementation that talks to a vtgate over gRPC.
 *
 * <p>The connection lazily opens a single plaintext {@link ManagedChannel} to the vtgate host/port
 * taken from the {@link VitessConnectorConfig}. The channel is shared by {@link #execute(String)}
 * and {@link #startStreaming(Vgtid, ReplicationMessageProcessor, AtomicReference)} and is shut
 * down by {@link #close()}.
 */
public class VitessReplicationConnection implements ReplicationConnection {
    private static final Logger LOGGER = LoggerFactory.getLogger(VitessReplicationConnection.class);

    /** How long {@link #close()} waits for the channel to terminate after {@code shutdownNow()}. */
    private static final long CHANNEL_TERMINATION_TIMEOUT_SECONDS = 5L;

    /** Maximum number of characters of a response printed by the trace-level "no vgtid" message. */
    private static final int RESPONSE_PREVIEW_LENGTH = 100;

    private final MessageDecoder messageDecoder;
    private final VitessConnectorConfig config;

    /** The channel currently owned by this connection, or {@code null} when none is open. */
    private final AtomicReference<ManagedChannel> managedChannel = new AtomicReference<>();

    /**
     * Creates a connection; no network resources are allocated until the first call that needs a
     * channel.
     *
     * @param config connector configuration (vtgate address, credentials, gRPC settings)
     * @param schema database schema handed to the {@link VStreamOutputMessageDecoder}
     */
    public VitessReplicationConnection(VitessConnectorConfig config, VitessDatabaseSchema schema) {
        this.messageDecoder = new VStreamOutputMessageDecoder(schema);
        this.config = config;
    }

    /**
     * Sends a single statement to vtgate through a blocking stub and discards the response.
     *
     * @param sqlStatement the statement to execute; it is bound with an empty bind-variable map
     */
    public void execute(String sqlStatement) {
        ManagedChannel channel = getOrCreateChannel();
        Vtgate.ExecuteRequest request = Vtgate.ExecuteRequest.newBuilder()
                .setQuery(Proto.bindQuery(sqlStatement, Collections.emptyMap()))
                .build();
        newBlockingStub(channel).execute(request);
    }

    /**
     * Starts an asynchronous VStream from the given position.
     *
     * <p>Each received {@code VStreamResponse} is decoded event by event and forwarded to
     * {@code processor}. The first failure (a stream error reported by gRPC, or an interruption
     * while processing) is stored in {@code error}; later failures do not overwrite it.
     *
     * @param vgtid position to start streaming from; must not be {@code null}
     * @param processor receives the decoded messages
     * @param error holder that is set (only if still {@code null}) when streaming fails
     * @throws NullPointerException if {@code vgtid} is {@code null}
     */
    @Override
    public void startStreaming(Vgtid vgtid, final ReplicationMessageProcessor processor, final AtomicReference<Throwable> error) {
        Objects.requireNonNull(vgtid);
        VitessGrpc.VitessStub stub = newStub(getOrCreateChannel());

        Map<String, String> grpcHeaders = config.getGrpcHeaders();
        if (!grpcHeaders.isEmpty()) {
            // NOTE(review): header values are logged at INFO; if they can carry credentials this
            // exposes them in the logs - confirm whether only the header names should be printed.
            LOGGER.info("Setting VStream gRPC headers: {}", grpcHeaders);
            Metadata metadata = new Metadata();
            for (Map.Entry<String, String> header : grpcHeaders.entrySet()) {
                metadata.put(Metadata.Key.of(header.getKey(), Metadata.ASCII_STRING_MARSHALLER), header.getValue());
            }
            stub = MetadataUtils.attachHeaders(stub, metadata);
        }

        StreamObserver<Vtgate.VStreamResponse> responseObserver = new StreamObserver<Vtgate.VStreamResponse>() {

            @Override
            public void onNext(Vtgate.VStreamResponse response) {
                LOGGER.debug("Received {} VEvents in the VStreamResponse:", response.getEventsCount());
                if (LOGGER.isDebugEnabled()) {
                    for (Binlogdata.VEvent vEvent : response.getEventsList()) {
                        LOGGER.debug("VEvent: {}", vEvent);
                    }
                }
                Vgtid newVgtid = lastVgtidOf(response);
                int numOfRowEvents = countRowEvents(response);
                try {
                    int rowEventSeen = 0;
                    for (Binlogdata.VEvent vEvent : response.getEventsList()) {
                        if (vEvent.getType() == Binlogdata.VEventType.ROW) {
                            rowEventSeen++;
                        }
                        // True once every ROW event of this response has been seen, provided the
                        // response carries a vgtid and at least one ROW event.
                        boolean isLastRowEventOfTransaction = newVgtid != null
                                && numOfRowEvents != 0
                                && rowEventSeen == numOfRowEvents;
                        messageDecoder.processMessage(vEvent, processor, newVgtid, isLastRowEventOfTransaction);
                    }
                }
                catch (InterruptedException e) {
                    LOGGER.error("Message processing is interrupted", e);
                    error.compareAndSet(null, e);
                    // Restore the interrupt flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
            }

            @Override
            public void onError(Throwable t) {
                // The trailing Throwable is not consumed by a placeholder, so SLF4J logs it as
                // the exception (with stack trace).
                LOGGER.info("VStream streaming onError. Status: {}", Status.fromThrowable(t), t);
                error.compareAndSet(null, t);
            }

            @Override
            public void onCompleted() {
                LOGGER.info("VStream streaming completed.");
            }
        };

        Vtgate.VStreamFlags vStreamFlags = Vtgate.VStreamFlags.newBuilder()
                .setStopOnReshard(config.getStopOnReshard())
                .build();
        Vtgate.VStreamRequest request = Vtgate.VStreamRequest.newBuilder()
                .setVgtid(vgtid.getRawVgtid())
                .setTabletType(toTopodataTabletType(VitessTabletType.valueOf(config.getTabletType())))
                .setFlags(vStreamFlags)
                .build();
        stub.vStream(request, responseObserver);
        LOGGER.info("Started VStream");
    }

    /**
     * Returns the vgtid carried by the last VGTID event of the response.
     *
     * @return the last vgtid, or {@code null} when the response contains no VGTID event
     */
    private static Vgtid lastVgtidOf(Vtgate.VStreamResponse response) {
        Vgtid last = null;
        int found = 0;
        for (Binlogdata.VEvent vEvent : response.getEventsList()) {
            if (vEvent.getType() == Binlogdata.VEventType.VGTID) {
                last = Vgtid.of(vEvent.getVgtid());
                found++;
            }
        }
        if (found == 0) {
            // Rendering the response as text is only worth doing when the message is printed.
            if (LOGGER.isTraceEnabled()) {
                String text = response.toString();
                LOGGER.trace("No vgtid found in response {}...", text.substring(0, Math.min(RESPONSE_PREVIEW_LENGTH, text.length())));
            }
            LOGGER.debug("Full response is {}", response);
            return null;
        }
        if (found > 1) {
            LOGGER.error("Should only have 1 vgtid per VStreamResponse, but found {}. Use the last vgtid {}.", found, last);
        }
        return last;
    }

    /** Counts the events of type ROW in the response. */
    private static int countRowEvents(Vtgate.VStreamResponse response) {
        int rowEvents = 0;
        for (Binlogdata.VEvent vEvent : response.getEventsList()) {
            if (vEvent.getType() == Binlogdata.VEventType.ROW) {
                rowEvents++;
            }
        }
        return rowEvents;
    }

    /**
     * Returns the channel owned by this connection, opening it on first use.
     *
     * <p>If two threads race to open the channel, the loser shuts its own channel down again so
     * that every channel ever created is either tracked (and later closed) or already shut down.
     */
    private ManagedChannel getOrCreateChannel() {
        while (true) {
            ManagedChannel existing = managedChannel.get();
            if (existing != null) {
                return existing;
            }
            ManagedChannel created = newChannel(config.getVtgateHost(), config.getVtgatePort(), config.getGrpcMaxInboundMessageSize());
            if (managedChannel.compareAndSet(null, created)) {
                return created;
            }
            // Another thread registered a channel first; discard ours and re-read.
            created.shutdownNow();
        }
    }

    /** Creates an asynchronous stub on the channel, with credentials applied when configured. */
    private VitessGrpc.VitessStub newStub(ManagedChannel channel) {
        return withCredentials(VitessGrpc.newStub(channel));
    }

    /** Creates a blocking stub on the channel, with credentials applied when configured. */
    private VitessGrpc.VitessBlockingStub newBlockingStub(ManagedChannel channel) {
        return withCredentials(VitessGrpc.newBlockingStub(channel));
    }

    /**
     * Attaches static username/password call credentials to the stub when both are configured;
     * otherwise returns the stub unchanged.
     */
    private <T extends AbstractStub<T>> T withCredentials(T stub) {
        String username = config.getVtgateUsername();
        String password = config.getVtgatePassword();
        if (username == null || password == null) {
            return stub;
        }
        LOGGER.info("Use authenticated vtgate grpc.");
        return stub.withCallCredentials(new StaticAuthCredentials(username, password));
    }

    /**
     * Builds a plaintext channel to the given address using the configured keepalive interval.
     *
     * @param vtgateHost vtgate host name
     * @param vtgatePort vtgate gRPC port
     * @param maxInboundMessageSize maximum inbound gRPC message size passed to the channel builder
     */
    private ManagedChannel newChannel(String vtgateHost, int vtgatePort, int maxInboundMessageSize) {
        return ManagedChannelBuilder.forAddress(vtgateHost, vtgatePort)
                .usePlaintext()
                .maxInboundMessageSize(maxInboundMessageSize)
                .keepAliveTime(config.getKeepaliveInterval().toMillis(), TimeUnit.MILLISECONDS)
                .build();
    }

    /**
     * Shuts the channel down and waits up to {@value #CHANNEL_TERMINATION_TIMEOUT_SECONDS} seconds
     * for it to terminate. Does nothing beyond logging when no channel was ever opened, or when the
     * connection has already been closed.
     *
     * @throws Exception if the wait for termination is interrupted
     */
    @Override
    public void close() throws Exception {
        LOGGER.info("Closing replication connection");
        // Detach the channel first so a later call opens a fresh one instead of reusing a dead one.
        ManagedChannel channel = managedChannel.getAndSet(null);
        if (channel == null) {
            LOGGER.debug("No VStream GRPC channel is open. Nothing to shut down.");
            return;
        }
        channel.shutdownNow();
        LOGGER.trace("VStream GRPC channel shutdownNow is invoked.");
        if (channel.awaitTermination(CHANNEL_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            LOGGER.info("VStream GRPC channel is shutdown in time.");
        }
        else {
            LOGGER.warn("VStream GRPC channel is not shutdown in time. Give up waiting.");
        }
    }

    /**
     * Builds the starting vgtid from the configuration.
     *
     * <p>When no shard is configured the vgtid names only the keyspace and uses the gtid
     * {@code "current"}; otherwise it names the configured keyspace, shard and gtid.
     *
     * @param config connector configuration to read keyspace, shard and gtid from
     * @return a vgtid containing a single shard gtid
     */
    public static Vgtid defaultVgtid(VitessConnectorConfig config) {
        String shard = config.getShard();
        if (shard == null || shard.isEmpty()) {
            Binlogdata.ShardGtid keyspaceGtid = Binlogdata.ShardGtid.newBuilder()
                    .setKeyspace(config.getKeyspace())
                    .setGtid("current")
                    .build();
            Vgtid gtid = Vgtid.of(Binlogdata.VGtid.newBuilder().addShardGtids(keyspaceGtid).build());
            LOGGER.info("Default VGTID '{}' is set to the current gtid of all shards from keyspace: {}", gtid, config.getKeyspace());
            return gtid;
        }
        String shardGtid = config.getGtid();
        String keyspace = config.getKeyspace();
        Binlogdata.ShardGtid configuredGtid = Binlogdata.ShardGtid.newBuilder()
                .setKeyspace(keyspace)
                .setShard(shard)
                .setGtid(shardGtid)
                .build();
        Vgtid gtid = Vgtid.of(Binlogdata.VGtid.newBuilder().addShardGtids(configuredGtid).build());
        LOGGER.info("VGTID '{}' is set to the GTID {} for keyspace: {} shard: {}", gtid, shardGtid, config.getKeyspace(), shard);
        return gtid;
    }

    /** Returns a human-readable description of the vtgate endpoint this connection targets. */
    public String connectionString() {
        return String.format("vtgate gRPC connection %s:%s", config.getVtgateHost(), config.getVtgatePort());
    }

    /** Returns the configured vtgate username, which may be {@code null}. */
    public String username() {
        return config.getVtgateUsername();
    }

    /**
     * Maps the connector's tablet type to the Vitess topodata tablet type.
     *
     * @return the matching topodata type, or {@code null} (after logging a warning) when the
     *         value is not one of MASTER, REPLICA or RDONLY
     */
    private static Topodata.TabletType toTopodataTabletType(VitessTabletType tabletType) {
        switch (tabletType) {
            case MASTER:
                return Topodata.TabletType.MASTER;
            case REPLICA:
                return Topodata.TabletType.REPLICA;
            case RDONLY:
                return Topodata.TabletType.RDONLY;
            default:
                LOGGER.warn("Unknown tabletType {}", tabletType);
                return null;
        }
    }
}

