/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.impl;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.impl.listener.MarlinListener;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.mapr.GenericHFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.CircularIterator;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.GenericWorkerCoordinator;
import org.apache.kafka.connect.runtime.distributed.WorkerRebalanceListener;
import org.apache.kafka.connect.storage.KafkaConfigStorage;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarlinWorkerCoordinator
implements GenericWorkerCoordinator {
    private static final Logger log = LoggerFactory.getLogger(MarlinWorkerCoordinator.class);
    private MarlinListener<byte[], byte[]> joiner;
    private final String groupId;
    private final String restUrl;
    private final String syncTopic;
    private final String coordStream;
    private Long groupGenerationId;
    private static final String UNKNOWN_MEMBER_ID_STR = "";
    private String memberId;
    private KafkaConsumer<Long, byte[]> syncReceiver;
    private KafkaProducer<Long, byte[]> syncProducer;
    private static final long SYNC_POLL_TIMEOUT = 15000L;
    private final KafkaConfigStorage configStorage;
    private final WorkerRebalanceListener rebalanceCb;
    private ClusterConfigState configSnapshot;
    private ConnectProtocol.Assignment assignmentSnapshot;
    private final Lock lock = new ReentrantLock();
    private final Condition condition = this.lock.newCondition();
    private boolean joinComplete;
    private boolean rejoinEvent;
    private boolean wakeupEvent;
    private boolean isRejoinRequested;
    private boolean needsJoinPrepare;
    private MarlinWorkerJoinCallback joinerCallback;
    private long backoffTimeMs;
    private static final int kMaxBackoffTimeMs = 900000;

    public MarlinWorkerCoordinator(DistributedConfig config, String groupId, String restUrl, KafkaConfigStorage configStorage, WorkerRebalanceListener rebalanceCb) {
        this.groupId = groupId;
        this.restUrl = restUrl;
        this.configStorage = configStorage;
        this.rebalanceCb = rebalanceCb;
        this.syncTopic = this.generateSyncTopic(this.groupId, config.originals());
        this.coordStream = this.generateCoordStream(config.originals());
        this.groupGenerationId = 0L;
        this.memberId = UNKNOWN_MEMBER_ID_STR;
        this.joiner = this.generateJoiner(groupId, this.coordStream);
        this.initSync(this.syncTopic);
        this.assignmentSnapshot = null;
        this.joinComplete = false;
        this.rejoinEvent = false;
        this.wakeupEvent = false;
        this.isRejoinRequested = false;
        this.needsJoinPrepare = false;
        this.joinerCallback = new MarlinWorkerJoinCallback();
        this.resetBackoff();
        log.debug("MarlinWorkerCoordinator constructor");
    }

    private MarlinListener<byte[], byte[]> generateJoiner(String groupId, String coordStream) {
        Properties props = new Properties();
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("streams.consumer.default.stream", coordStream);
        GenericHFactory configFactory = new GenericHFactory();
        ConsumerConfig config = (ConsumerConfig)configFactory.getImplementorInstance("org.apache.kafka.clients.consumer.ConsumerConfig", new Object[]{props}, new Class[]{Map.class});
        MarlinListener<byte[], byte[]> marlinListener = new MarlinListener<byte[], byte[]>(config, null, null);
        return marlinListener;
    }

    private void initSync(String syncTopic) {
        Properties props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("auto.offset.reset", "latest");
        this.syncReceiver = new KafkaConsumer(props);
        this.syncProducer = new KafkaProducer(props);
        this.syncReceiver.subscribe(Arrays.asList(syncTopic));
    }

    public void ensureActiveGroup() {
        while (this.rejoinNeeded()) {
            this.resetRejoinFlags();
            if (this.needsJoinPrepare) {
                this.revokeAssignments();
                this.needsJoinPrepare = false;
            }
            Marlinserver.JoinGroupDesc desc = this.generateJoinDesc();
            this.joinComplete = false;
            Marlinserver.JoinGroupResponse resp = this.joiner.join(desc, this.joinerCallback);
            log.debug("ensureActiveGroup: joinStatus {}", (Object)resp.getJoinStatus());
            this.handleJoinGroupResponse(resp);
            this.waitForJoinOrRejoinEvent();
            if (!this.joinComplete) continue;
            this.doSync();
        }
    }

    private void handleJoinGroupResponse(Marlinserver.JoinGroupResponse resp) {
        switch (resp.getJoinStatus()) {
            case UNKNOWN_MEMBER_ID: {
                this.memberId = UNKNOWN_MEMBER_ID_STR;
                this.lock.lock();
                this.rejoinEvent = true;
                this.lock.unlock();
                break;
            }
            case STATUS_OK: {
                this.memberId = resp.getMemberId();
                this.resetBackoff();
                break;
            }
            case FUNCTION_UNAVAILABLE: {
                throw new BrokerNotAvailableException("Feature not available on server. Please upgrade to at least Version 5.2.1");
            }
            case STREAM_AUTHORIZATION_FAILED: {
                throw new AuthorizationException("Need produceperm and consumeperm permissions on stream " + this.coordStream);
            }
            case STREAM_UNAVAILABLE: {
                log.error("Could not open stream " + this.coordStream);
            }
            default: {
                log.error("Join Group request failed with {}. Retrying with exponential backoff", (Object)resp.getJoinStatus());
                this.backoff();
            }
        }
    }

    private void backoff() {
        this.backoffTimeMs = this.backoffTimeMs * 2L + 1000L < 900000L ? this.backoffTimeMs * 2L + 1000L : 900000L;
        Utils.sleep((long)this.backoffTimeMs);
    }

    private void resetBackoff() {
        this.backoffTimeMs = 0L;
    }

    private void revokeAssignments() {
        log.debug("Revoking previous assignment {}", (Object)this.assignmentSnapshot);
        if (this.assignmentSnapshot != null && !this.assignmentSnapshot.failed()) {
            this.rebalanceCb.onRevoked(this.assignmentSnapshot.leader(), (Collection)this.assignmentSnapshot.connectors(), (Collection)this.assignmentSnapshot.tasks());
        }
    }

    private void waitForJoinOrRejoinEvent() {
        log.debug("waitForJoinOrRejoinEvent: memberId {}start", (Object)this.memberId);
        try {
            this.lock.lock();
            while (!this.joinComplete && !this.rejoinEvent) {
                this.condition.await();
            }
        }
        catch (InterruptedException e) {
            e.printStackTrace();
            log.debug("waitForJoinOrRejoinEvent: memberId {} interrupted", (Object)this.memberId);
        }
        finally {
            this.lock.unlock();
        }
        log.debug("waitForJoinOrRejoinEvent: memberId {} awoken. joinComplete {} rejoinEvent {}", new Object[]{this.memberId, this.joinComplete, this.rejoinEvent});
    }

    private void doSync() {
        int i = 0;
        while (true) {
            ConsumerRecords records = this.syncReceiver.poll(15000L);
            log.debug("doSync: memberId {} returned from poll {}", (Object)this.memberId, (Object)i);
            Long lastSeen = 0L;
            for (ConsumerRecord record : records) {
                Marlinserver.GroupAssignment ga;
                log.debug("doSync: consumer record..generation ID {}", record.key());
                lastSeen = (Long)record.key();
                if (!this.groupGenerationIdMatches(lastSeen)) continue;
                try {
                    ga = Marlinserver.GroupAssignment.parseFrom((byte[])((byte[])record.value()));
                }
                catch (InvalidProtocolBufferException e) {
                    throw new KafkaException("Error parsing Sync response");
                }
                this.onSyncComplete(ga);
                return;
            }
            if (this.rejoinEventOccured()) {
                return;
            }
            ++i;
        }
    }

    private boolean groupGenerationIdMatches(Long lastSeen) {
        this.lock.lock();
        boolean matches = this.groupGenerationId.equals(lastSeen);
        this.lock.unlock();
        return matches;
    }

    private void onSyncComplete(Marlinserver.GroupAssignment ga) {
        log.debug("ga gen id {}", (Object)ga.getGroupGenerationId());
        for (Marlinserver.MemberState ms : ga.getMemberStateList()) {
            log.debug("ms id {} ", (Object)ms.getMemberId());
            if (this.memberId() != null) {
                log.debug("this id {}", (Object)this.memberId());
            }
            if (!ms.getMemberId().equals(this.memberId())) continue;
            this.assignmentSnapshot = ConnectProtocol.deserializeAssignment((ByteBuffer)ms.getMemberAssignment().asReadOnlyByteBuffer());
            this.rebalanceCb.onAssigned(this.assignmentSnapshot);
            this.needsJoinPrepare = true;
        }
    }

    public void requestRejoin() {
        log.debug("requestRejoin");
        this.lock.lock();
        this.isRejoinRequested = true;
        this.condition.signal();
        this.lock.unlock();
    }

    public String memberId() {
        return this.memberId;
    }

    public void close() {
        this.syncReceiver.close();
        this.syncProducer.close();
        this.joiner.close();
        this.assignmentSnapshot = null;
        this.lock.lock();
        this.isRejoinRequested = false;
        this.rejoinEvent = false;
        this.lock.unlock();
    }

    public void ensureCoordinatorKnown() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void poll(long timeout) throws WakeupException {
        try {
            log.debug("poll timeout {}", (Object)timeout);
            this.lock.lock();
            while (!(this.isRejoinRequested || this.rejoinEvent || this.wakeupEvent)) {
                boolean continueWaiting = this.condition.await(timeout, TimeUnit.MILLISECONDS);
                if (continueWaiting) continue;
                log.debug("MarlinWorkerCoordinator: poll time expired");
                return;
            }
            if (this.wakeupEvent) {
                this.wakeupEvent = false;
                throw new WakeupException();
            }
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            log.debug("exiting poll");
            this.lock.unlock();
        }
    }

    public void wakeup() {
        log.debug("wakeup: waking up");
        this.lock.lock();
        this.wakeupEvent = true;
        this.condition.signal();
        this.lock.unlock();
    }

    private String generateSyncTopic(String groupId, Map<String, ?> configs) {
        String streamName = this.generateCoordStream(configs);
        String topic = streamName + ":__mapr__" + groupId + "_assignment";
        return topic;
    }

    private String generateCoordStream(Map<String, ?> configs) {
        String configTopic = (String)configs.get("config.storage.topic");
        int idx = configTopic.lastIndexOf(58);
        String streamName = configTopic.substring(0, idx);
        return streamName;
    }

    private boolean rejoinNeeded() {
        this.lock.lock();
        boolean isRejoinNeeded = this.isRejoinRequested || this.rejoinEvent || this.assignmentSnapshot == null || this.assignmentSnapshot.failed();
        this.lock.unlock();
        log.debug("isRejoinRequested {} rejoinEvent {} assignmentSnapshot {}", new Object[]{this.isRejoinRequested, this.rejoinEvent, this.assignmentSnapshot});
        if (this.assignmentSnapshot != null) {
            log.debug("assignmentSnapshot.failed {}", (Object)this.assignmentSnapshot.failed());
        }
        return isRejoinNeeded;
    }

    private void resetRejoinFlags() {
        this.lock.lock();
        this.rejoinEvent = false;
        this.isRejoinRequested = false;
        this.lock.unlock();
    }

    private boolean rejoinEventOccured() {
        this.lock.lock();
        boolean eventOccured = this.rejoinEvent;
        this.lock.unlock();
        return eventOccured;
    }

    private Marlinserver.JoinGroupDesc generateJoinDesc() {
        this.configSnapshot = this.configStorage.snapshot();
        ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(this.restUrl, this.configSnapshot.offset());
        ByteBuffer metadata = ConnectProtocol.serializeMetadata((ConnectProtocol.WorkerState)workerState);
        Marlinserver.JoinGroupDesc desc = Marlinserver.JoinGroupDesc.newBuilder().setProtocolType("connect").setMemberId(this.memberId).addMemberProtocols(Marlinserver.MemberProtocol.newBuilder().setProtocol("default").setMemberMetadata(ByteString.copyFrom((ByteBuffer)metadata)).build()).build();
        return desc;
    }

    public class MarlinWorkerJoinCallback
    implements MarlinListener.MarlinJoinCallback {
        @Override
        public void onJoin(Marlinserver.JoinGroupInfo jgi) {
            String leaderId = jgi.getGroupLeaderId();
            log.debug("onJoin: memberId {} leaderId {}", (Object)MarlinWorkerCoordinator.this.memberId(), (Object)leaderId);
            if (leaderId.equals(MarlinWorkerCoordinator.this.memberId())) {
                try {
                    Map<String, ByteBuffer> assignments = this.performAssignment(leaderId, jgi.getMembersList());
                    Marlinserver.GroupAssignment.Builder gaBuilder = Marlinserver.GroupAssignment.newBuilder().setGroupGenerationId(jgi.getGroupGenerationId());
                    for (Map.Entry<String, ByteBuffer> e : assignments.entrySet()) {
                        log.debug("setting memberstate member id to {}", (Object)e.getKey());
                        Marlinserver.MemberState ms = Marlinserver.MemberState.newBuilder().setMemberId(e.getKey()).setMemberAssignment(ByteString.copyFrom((ByteBuffer)e.getValue())).build();
                        gaBuilder.addMemberState(ms);
                    }
                    Marlinserver.GroupAssignment ga = gaBuilder.build();
                    log.debug("onJoin: memberId {} producing assignment for generation {}", (Object)MarlinWorkerCoordinator.this.memberId(), (Object)jgi.getGroupGenerationId());
                    MarlinWorkerCoordinator.this.syncProducer.send(new ProducerRecord(MarlinWorkerCoordinator.this.syncTopic, (Object)jgi.getGroupGenerationId(), (Object)ga.toByteArray()));
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
            MarlinWorkerCoordinator.this.lock.lock();
            MarlinWorkerCoordinator.this.groupGenerationId = jgi.getGroupGenerationId();
            MarlinWorkerCoordinator.this.joinComplete = true;
            MarlinWorkerCoordinator.this.condition.signal();
            MarlinWorkerCoordinator.this.lock.unlock();
        }

        @Override
        public void onRejoin(Marlinserver.JoinGroupInfo jgi) {
            log.debug("onRejoin {}", (Object)MarlinWorkerCoordinator.this.memberId());
            MarlinWorkerCoordinator.this.lock.lock();
            MarlinWorkerCoordinator.this.rejoinEvent = true;
            MarlinWorkerCoordinator.this.condition.signal();
            MarlinWorkerCoordinator.this.lock.unlock();
        }

        private Map<String, ByteBuffer> performAssignment(String leaderId, List<Marlinserver.Member> members) {
            HashMap<String, ConnectProtocol.WorkerState> wsMap = new HashMap<String, ConnectProtocol.WorkerState>();
            for (Marlinserver.Member member : members) {
                wsMap.put(member.getMemberId(), ConnectProtocol.deserializeMetadata((ByteBuffer)member.getMemberMetadata().asReadOnlyByteBuffer()));
            }
            long maxOffset = this.findMaxMemberConfigOffset(wsMap);
            Long leaderOffset = this.ensureLeaderConfig(maxOffset);
            if (leaderOffset == null) {
                return this.fillAssignmentsAndSerialize(wsMap.keySet(), (short)1, leaderId, ((ConnectProtocol.WorkerState)wsMap.get(leaderId)).url(), maxOffset, new HashMap<String, List<String>>(), new HashMap<String, List<ConnectorTaskId>>());
            }
            return this.performTaskAssignment(leaderId, leaderOffset, wsMap);
        }

        private long findMaxMemberConfigOffset(Map<String, ConnectProtocol.WorkerState> allConfigs) {
            Long maxOffset = null;
            for (Map.Entry<String, ConnectProtocol.WorkerState> stateEntry : allConfigs.entrySet()) {
                long memberRootOffset = stateEntry.getValue().offset();
                if (maxOffset == null) {
                    maxOffset = memberRootOffset;
                    continue;
                }
                maxOffset = Math.max(maxOffset, memberRootOffset);
            }
            log.debug("Max config offset root: {}, local snapshot config offsets root: {}", maxOffset, (Object)MarlinWorkerCoordinator.this.configSnapshot.offset());
            return maxOffset;
        }

        private Long ensureLeaderConfig(long maxOffset) {
            if (MarlinWorkerCoordinator.this.configSnapshot.offset() < maxOffset) {
                ClusterConfigState updatedSnapshot = MarlinWorkerCoordinator.this.configStorage.snapshot();
                if (updatedSnapshot.offset() < maxOffset) {
                    log.info("Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync.");
                    return null;
                }
                MarlinWorkerCoordinator.this.configSnapshot = updatedSnapshot;
                return MarlinWorkerCoordinator.this.configSnapshot.offset();
            }
            return maxOffset;
        }

        private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, ConnectProtocol.WorkerState> allConfigs) {
            HashMap<String, List<String>> connectorAssignments = new HashMap<String, List<String>>();
            HashMap<String, List<ConnectorTaskId>> taskAssignments = new HashMap<String, List<ConnectorTaskId>>();
            CircularIterator memberIt = new CircularIterator(Utils.sorted(allConfigs.keySet()));
            for (String connectorId : Utils.sorted((Collection)MarlinWorkerCoordinator.this.configSnapshot.connectors())) {
                String connectorAssignedTo = (String)memberIt.next();
                log.trace("Assigning connector {} to {}", (Object)connectorId, (Object)connectorAssignedTo);
                ArrayList<String> memberConnectors = (ArrayList<String>)connectorAssignments.get(connectorAssignedTo);
                if (memberConnectors == null) {
                    memberConnectors = new ArrayList<String>();
                    connectorAssignments.put(connectorAssignedTo, memberConnectors);
                }
                memberConnectors.add(connectorId);
                for (ConnectorTaskId taskId : Utils.sorted((Collection)MarlinWorkerCoordinator.this.configSnapshot.tasks(connectorId))) {
                    String taskAssignedTo = (String)memberIt.next();
                    log.trace("Assigning task {} to {}", (Object)taskId, (Object)taskAssignedTo);
                    ArrayList<ConnectorTaskId> memberTasks = (ArrayList<ConnectorTaskId>)taskAssignments.get(taskAssignedTo);
                    if (memberTasks == null) {
                        memberTasks = new ArrayList<ConnectorTaskId>();
                        taskAssignments.put(taskAssignedTo, memberTasks);
                    }
                    memberTasks.add(taskId);
                }
            }
            return this.fillAssignmentsAndSerialize(allConfigs.keySet(), (short)0, leaderId, allConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments);
        }

        private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members, short error, String leaderId, String leaderUrl, long maxOffset, Map<String, List<String>> connectorAssignments, Map<String, List<ConnectorTaskId>> taskAssignments) {
            HashMap<String, ByteBuffer> groupAssignment = new HashMap<String, ByteBuffer>();
            for (String member : members) {
                List<Object> tasks;
                List<String> connectors = connectorAssignments.get(member);
                if (connectors == null) {
                    connectors = Collections.emptyList();
                }
                if ((tasks = taskAssignments.get(member)) == null) {
                    tasks = Collections.emptyList();
                }
                ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(error, leaderId, leaderUrl, maxOffset, connectors, tasks);
                log.debug("Assignment: {} -> {}", (Object)member, (Object)assignment);
                groupAssignment.put(member, ConnectProtocol.serializeAssignment((ConnectProtocol.Assignment)assignment));
            }
            log.debug("Finished assignment");
            return groupAssignment;
        }
    }
}

