/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote;

import com.sun.jersey.api.client.ClientHandlerException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import javax.security.cert.CertificateExpiredException;
import javax.security.cert.CertificateNotYetValidException;
import org.apache.nifi.cluster.ClusterNodeInformation;
import org.apache.nifi.cluster.NodeInformation;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.remote.AbstractCommunicationsSession;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RemoteResourceFactory;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.VersionedRemoteResource;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.ClientProtocol;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardRemoteGroupPort
extends AbstractPort
implements RemoteGroupPort {
    // Public constants; USER_AGENT, CONTENT_TYPE and GZIP_COMPRESSION_LEVEL are not referenced within this class.
    public static final String USER_AGENT = "NiFi-Site-to-Site";
    public static final String CONTENT_TYPE = "application/octet-stream";
    public static final int GZIP_COMPRESSION_LEVEL = 1;
    // Maximum age, in milliseconds, of the cached peer list (same value getNextPeerStatus() compares against).
    public static final long PEER_REFRESH_PERIOD = 60000L;
    // Category string passed to the remote group's event reporter for every event raised here.
    private static final String CATEGORY = "Site to Site";
    private static final Logger logger = LoggerFactory.getLogger(StandardRemoteGroupPort.class);
    // Remote process group this port belongs to; source of peer statuses, timeouts and the event reporter.
    private final RemoteProcessGroup remoteGroup;
    // SSL context used for secure peers; may be null, in which case secure peers are rejected.
    private final SSLContext sslContext;
    private final AtomicBoolean useCompression = new AtomicBoolean(false);
    // Cleared when the remote side reports the port as unknown during handshake.
    private final AtomicBoolean targetExists = new AtomicBoolean(true);
    // Cleared when the remote side reports the port as being in an invalid state during handshake.
    private final AtomicBoolean targetRunning = new AtomicBoolean(true);
    // Monotonic counter used to walk the peer list in round-robin fashion.
    private final AtomicLong peerIndex = new AtomicLong(0L);
    // Cached weighted list of peers and the time (epoch millis) it was last refreshed.
    private volatile List<PeerStatus> peerStatuses;
    private volatile long peerRefreshTime = 0L;
    // Acquired with tryLock() so that only one thread refreshes the peer list at a time.
    private final ReentrantLock peerRefreshLock = new ReentrantLock();
    // Pool of idle, already-negotiated connections keyed by "nifi://host:port".
    private final ConcurrentMap<String, BlockingQueue<EndpointConnectionState>> endpointConnectionMap = new ConcurrentHashMap<String, BlockingQueue<EndpointConnectionState>>();
    // Per-peer penalty expiration, in epoch milliseconds.
    private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<PeerStatus, Long>();
    // Sessions currently in use by onTrigger(); a plain HashSet that is only touched while holding interruptLock.
    private final Set<CommunicationsSession> activeCommsChannels = new HashSet<CommunicationsSession>();
    private final Lock interruptLock = new ReentrantLock();
    // Set by shutdown(), reset by onSchedulingStart(); read and written while holding interruptLock.
    private boolean shutdown = false;

    /**
     * Creates a port bound to the given remote process group.
     *
     * @param id           identifier handed to the superclass
     * @param name         port name handed to the superclass
     * @param processGroup owning process group handed to the superclass
     * @param remoteGroup  remote process group this port communicates with
     * @param direction    accepted for interface compatibility; not used by this constructor
     * @param type         connectable type handed to the superclass
     * @param sslContext   SSL context for secure peers, may be {@code null}
     * @param scheduler    scheduler handed to the superclass
     */
    public StandardRemoteGroupPort(String id, String name, ProcessGroup processGroup, RemoteProcessGroup remoteGroup, TransferDirection direction, ConnectableType type, SSLContext sslContext, ProcessScheduler scheduler) {
        super(id, name, processGroup, type, scheduler);
        this.sslContext = sslContext;
        this.remoteGroup = remoteGroup;
        setScheduldingPeriod("30000 nanos");
    }

    /**
     * @return the current value of the target-running flag
     */
    public boolean isTargetRunning() {
        return targetRunning.get();
    }

    /**
     * Updates the target-running flag.
     *
     * @param targetRunning new value of the flag
     */
    public void setTargetRunning(boolean targetRunning) {
        AtomicBoolean flag = this.targetRunning;
        flag.set(targetRunning);
    }

    /**
     * @return {@code true} only when this port's connectable type is {@code REMOTE_OUTPUT_PORT}
     */
    public boolean isTriggerWhenEmpty() {
        return ConnectableType.REMOTE_OUTPUT_PORT == getConnectableType();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Shuts the port down: clears all peer penalties, then, while holding the
     * interrupt lock, marks the port as shut down, interrupts every active
     * communications session and closes every pooled idle connection.
     */
    public void shutdown() {
        super.shutdown();
        peerTimeoutExpirations.clear();
        interruptLock.lock();
        try {
            shutdown = true;
            for (CommunicationsSession active : activeCommsChannels) {
                active.interrupt();
            }
            // Drain each per-endpoint pool and release the connections it held.
            for (BlockingQueue<EndpointConnectionState> pool : endpointConnectionMap.values()) {
                for (EndpointConnectionState idle = pool.poll(); idle != null; idle = pool.poll()) {
                    cleanup(idle.getSocketClientProtocol(), idle.getPeer());
                }
            }
            endpointConnectionMap.clear();
        } finally {
            interruptLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Delegates to the superclass and then clears the shutdown flag while
     * holding the interrupt lock.
     */
    public void onSchedulingStart() {
        super.onSchedulingStart();
        interruptLock.lock();
        try {
            shutdown = false;
        } finally {
            interruptLock.unlock();
        }
    }

    /**
     * Walks every pooled connection queue and closes connections that have not
     * been used within the last 10 seconds; the remaining connections are put
     * back into their queue.
     */
    void cleanupSockets() {
        List<EndpointConnectionState> survivors = new ArrayList<EndpointConnectionState>();
        for (BlockingQueue<EndpointConnectionState> pool : endpointConnectionMap.values()) {
            survivors.clear();
            EndpointConnectionState candidate;
            while ((candidate = pool.poll()) != null) {
                long lastUsed = candidate.getLastTimeUsed();
                boolean stale = lastUsed < System.currentTimeMillis() - 10000L;
                if (!stale) {
                    survivors.add(candidate);
                    continue;
                }
                try {
                    candidate.getSocketClientProtocol().shutdown(candidate.getPeer());
                } catch (Exception e) {
                    logger.debug("Failed to shut down {} using {} due to {}", new Object[]{candidate.getSocketClientProtocol(), candidate.getPeer(), e});
                }
                cleanup(candidate.getSocketClientProtocol(), candidate.getPeer());
            }
            // Return the still-fresh connections to the pool they came from.
            pool.addAll(survivors);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Performs one send or receive cycle against the next non-penalized peer.
     *
     * <p>Steps: return early if the remote group is not transmitting or (for a
     * remote input port) there is nothing queued; pick a peer; reuse a pooled
     * connection for that peer or establish and negotiate a new one; transfer
     * or receive FlowFiles; commit the session and return the connection to the
     * pool. On any failure the peer is penalized (except when transmission was
     * disabled), the connection is cleaned up and the session is rolled back.
     *
     * @param context process context; yielded when no peer is available or the remote port is invalid/unknown
     * @param session process session that is committed on success and rolled back on failure
     */
    public void onTrigger(ProcessContext context, ProcessSession session) {
        if (!this.remoteGroup.isTransmitting()) {
            logger.debug("{} {} is not transmitting; will not send/receive", (Object)this, (Object)this.remoteGroup);
            return;
        }
        if (this.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && session.getQueueSize().getObjectCount() == 0) {
            logger.debug("{} No data to send", (Object)this);
            return;
        }
        PeerStatus peerStatus = this.getNextPeerStatus();
        if (peerStatus == null) {
            logger.debug("{} Unable to determine the next peer to communicate with; all peers must be penalized, so yielding context", (Object)this);
            context.yield();
            return;
        }
        String url = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();

        // Look up (or atomically create) the pool of idle connections for this endpoint.
        BlockingQueue<EndpointConnectionState> connectionStateQueue = this.endpointConnectionMap.get(url);
        if (connectionStateQueue == null) {
            connectionStateQueue = new LinkedBlockingQueue<EndpointConnectionState>();
            BlockingQueue<EndpointConnectionState> existingQueue = this.endpointConnectionMap.putIfAbsent(url, connectionStateQueue);
            if (existingQueue != null) {
                connectionStateQueue = existingQueue;
            }
        }

        Peer peer = null;
        FlowFileCodec codec = null;
        CommunicationsSession commsSession = null;
        SocketClientProtocol protocol = null;
        EndpointConnectionState connectionState;
        do {
            connectionState = connectionStateQueue.poll();
            logger.debug("{} Connection State for {} = {}", new Object[]{this, url, connectionState});
            if (connectionState == null) {
                // Nothing pooled: open a new connection and negotiate protocol and codec.
                protocol = new SocketClientProtocol();
                protocol.setPort(this);
                commsSession = null;
                try {
                    commsSession = this.establishSiteToSiteConnection(peerStatus);
                    DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
                    DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
                    try {
                        RemoteResourceFactory.initiateResourceNegotiation((VersionedRemoteResource)protocol, dis, dos);
                    } catch (HandshakeException e) {
                        try {
                            commsSession.close();
                        } catch (IOException ioe) {
                            String closeMessage = String.format("%s unable to close communications session %s due to %s; resources may not be appropriately cleaned up", new Object[]{this, commsSession, ioe.toString()});
                            logger.error(closeMessage);
                            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, closeMessage);
                        }
                        // The session is closed at this point, so continuing on to the
                        // protocol handshake could only fail on a dead connection and
                        // hide the real cause. Fail here and report the negotiation error.
                        this.penalize(new Peer(commsSession, url));
                        String negotiationMessage = String.format("%s failed to communicate with %s due to %s", new Object[]{this, url, e.toString()});
                        logger.error(negotiationMessage);
                        if (logger.isDebugEnabled()) {
                            logger.error("", (Throwable)e);
                        }
                        this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, negotiationMessage);
                        session.rollback();
                        return;
                    }
                } catch (IOException e) {
                    // The connection may have been opened before the failure (e.g. while
                    // obtaining its streams); close it so the socket is not leaked.
                    if (commsSession != null) {
                        try {
                            commsSession.close();
                        } catch (IOException closeFailure) {
                            logger.debug("{} unable to close communications session {} due to {}", new Object[]{this, commsSession, closeFailure.toString()});
                        }
                    }
                    String message = String.format("%s failed to communicate with %s due to %s", new Object[]{this, peer == null ? url : peer, e.toString()});
                    logger.error(message);
                    if (logger.isDebugEnabled()) {
                        logger.error("", (Throwable)e);
                    }
                    this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
                    session.rollback();
                    return;
                }
                peer = new Peer(commsSession, url);
                try {
                    protocol.handshake(peer);
                    if (protocol.isDestinationFull()) {
                        logger.warn("{} {} indicates that port's destination is full; penalizing peer", (Object)this, (Object)peer);
                        this.penalize(peer);
                        this.cleanup(protocol, peer);
                        return;
                    }
                    if (protocol.isPortInvalid()) {
                        this.penalize(peer);
                        context.yield();
                        this.cleanup(protocol, peer);
                        this.targetRunning.set(false);
                        String invalidMessage = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", new Object[]{this, peer});
                        logger.error(invalidMessage);
                        this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, invalidMessage);
                        return;
                    }
                    if (protocol.isPortUnknown()) {
                        this.penalize(peer);
                        context.yield();
                        this.cleanup(protocol, peer);
                        this.targetExists.set(false);
                        String unknownMessage = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", new Object[]{this, peer});
                        logger.error(unknownMessage);
                        this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, unknownMessage);
                        return;
                    }
                    codec = protocol.negotiateCodec(peer);
                } catch (Exception e) {
                    this.penalize(peer);
                    this.cleanup(protocol, peer);
                    String message = String.format("%s failed to communicate with %s due to %s", new Object[]{this, peer == null ? url : peer, e.toString()});
                    logger.error(message);
                    if (logger.isDebugEnabled()) {
                        logger.error("", (Throwable)e);
                    }
                    this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
                    session.rollback();
                    return;
                }
                connectionState = new EndpointConnectionState(peer, protocol, codec);
                continue;
            }
            // A pooled connection was found; discard it if it has been idle past the communications timeout.
            long lastTimeUsed = connectionState.getLastTimeUsed();
            long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed;
            long timeoutMillis = this.remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS);
            if (timeoutMillis > 0L && millisSinceLastUse >= timeoutMillis) {
                this.cleanup(connectionState.getSocketClientProtocol(), connectionState.getPeer());
                connectionState = null;
                continue;
            }
            codec = connectionState.getCodec();
            peer = connectionState.getPeer();
            commsSession = peer.getCommunicationsSession();
            protocol = connectionState.getSocketClientProtocol();
        } while (connectionState == null || codec == null || commsSession == null || protocol == null);

        try {
            // Register the session so that shutdown() can interrupt it while it is in use.
            this.interruptLock.lock();
            try {
                if (this.shutdown) {
                    peer.getCommunicationsSession().interrupt();
                }
                this.activeCommsChannels.add(peer.getCommunicationsSession());
            } finally {
                this.interruptLock.unlock();
            }
            try {
                if (this.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
                    this.transferFlowFiles(peer, protocol, context, session, codec);
                } else {
                    this.receiveFlowFiles(peer, protocol, context, session, codec);
                }
                if (peer.isPenalized()) {
                    logger.debug("{} {} was penalized", (Object)this, (Object)peer);
                    this.penalize(peer);
                }
            } finally {
                // Always deregister, including when the transfer throws; otherwise the
                // closed session would stay in activeCommsChannels indefinitely. This
                // must happen before the connection is returned to the pool below so a
                // concurrent borrower's registration is never removed by this thread.
                this.interruptLock.lock();
                try {
                    if (this.shutdown) {
                        peer.getCommunicationsSession().interrupt();
                    }
                    this.activeCommsChannels.remove(peer.getCommunicationsSession());
                } finally {
                    this.interruptLock.unlock();
                }
            }
            session.commit();
            connectionState.setLastTimeUsed();
            connectionStateQueue.add(connectionState);
        } catch (TransmissionDisabledException e) {
            this.cleanup(protocol, peer);
            session.rollback();
        } catch (Exception e) {
            this.penalize(peer);
            String message = String.format("%s failed to communicate with %s (%s) due to %s", new Object[]{this, peer == null ? url : peer, protocol, e.toString()});
            logger.error(message);
            if (logger.isDebugEnabled()) {
                logger.error("", (Throwable)e);
            }
            this.cleanup(protocol, peer);
            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
            session.rollback();
        }
    }

    /**
     * Records a penalty for the given peer lasting one yield period from now.
     * An existing, later expiration is never shortened.
     *
     * @param peer peer to penalize; its URL is parsed for host and port, falling
     *             back to {@code peer.getHost()} and port -1 if the URL is malformed
     */
    private void penalize(Peer peer) {
        String host;
        int port;
        try {
            URI parsed = new URI(peer.getUrl());
            host = parsed.getHost();
            port = parsed.getPort();
        } catch (URISyntaxException e) {
            host = peer.getHost();
            port = -1;
        }
        PeerStatus key = new PeerStatus(host, port, true, 1);
        Long recorded = peerTimeoutExpirations.get(key);
        long currentExpiration = recorded == null ? 0L : recorded.longValue();
        long penalizationMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
        long candidateExpiration = System.currentTimeMillis() + penalizationMillis;
        peerTimeoutExpirations.put(key, Math.max(currentExpiration, candidateExpiration));
    }

    /**
     * Best-effort teardown of a connection: shuts the protocol down for the
     * peer (when both are present) and then closes the peer. Failures never
     * propagate; they are logged at debug level so they can be diagnosed.
     *
     * @param protocol protocol to shut down, may be {@code null}
     * @param peer     peer to close, may be {@code null}
     */
    private void cleanup(SocketClientProtocol protocol, Peer peer) {
        if (protocol != null && peer != null) {
            try {
                protocol.shutdown(peer);
            } catch (TransmissionDisabledException e) {
                logger.debug("{} Transmission Disabled by User", (Object)this);
            } catch (IOException e) {
                // Cleanup is best-effort, but record the failure instead of silently dropping it.
                logger.debug("{} failed to shut down {} for {} due to {}", new Object[]{this, protocol, peer, e.toString()});
            }
        }
        if (peer != null) {
            try {
                peer.close();
            } catch (TransmissionDisabledException e) {
                logger.debug("{} Transmission Disabled by User", (Object)this);
            } catch (IOException e) {
                logger.debug("{} failed to close {} due to {}", new Object[]{this, peer, e.toString()});
            }
        }
    }

    /**
     * @return the yield duration reported by the remote process group
     */
    public String getYieldPeriod() {
        return remoteGroup.getYieldDuration();
    }

    /**
     * Opens a connection to the given peer, writes the protocol magic bytes and
     * returns the resulting communications session.
     *
     * @param peerStatus peer to connect to; a secure peer requires this port to have an SSL context
     * @return the established session, with its URI set to {@code nifi://host:port}
     * @throws IOException if the connection cannot be established, if the peer is
     *         secure but no SSL context is configured, or if the peer certificate is
     *         expired or not yet valid; a failure while closing the half-open
     *         connection is attached as a suppressed exception
     */
    public CommunicationsSession establishSiteToSiteConnection(PeerStatus peerStatus) throws IOException {
        String destinationUri = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
        AbstractCommunicationsSession commsSession = null;
        // Tracked separately so the raw channel can be closed if wrapping it in a session fails.
        SocketChannel plainChannel = null;
        try {
            if (peerStatus.isSecure()) {
                if (this.sslContext == null) {
                    throw new IOException("Unable to communicate with " + peerStatus.getHostname() + ":" + peerStatus.getPort() + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
                }
                SSLSocketChannel socketChannel = new SSLSocketChannel(this.sslContext, peerStatus.getHostname(), peerStatus.getPort(), true);
                socketChannel.connect();
                commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
                try {
                    commsSession.setUserDn(socketChannel.getDn());
                } catch (CertificateExpiredException | CertificateNotYetValidException ex) {
                    throw new IOException(ex);
                }
            } else {
                plainChannel = SocketChannel.open(new InetSocketAddress(peerStatus.getHostname(), peerStatus.getPort()));
                commsSession = new SocketChannelCommunicationsSession(plainChannel, destinationUri);
            }
            commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
            commsSession.setUri(destinationUri);
        } catch (IOException ioe) {
            try {
                if (commsSession != null) {
                    commsSession.close();
                } else if (plainChannel != null) {
                    plainChannel.close();
                }
            } catch (IOException closeFailure) {
                // Keep the original failure as the primary exception.
                ioe.addSuppressed(closeFailure);
            }
            throw ioe;
        }
        return commsSession;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Chooses the next peer to communicate with.
     *
     * <p>The cached peer list is refreshed first when it is missing, empty or
     * older than {@link #PEER_REFRESH_PERIOD}, provided the refresh lock can be
     * taken without waiting. A failed refresh is logged and reported and the
     * previous list is kept. Peers are then visited round-robin, skipping
     * penalized ones.
     *
     * @return the next non-penalized peer, or {@code null} if there are no peers
     *         or all of them are penalized
     */
    private PeerStatus getNextPeerStatus() {
        List<PeerStatus> candidates = peerStatuses;
        boolean refreshDue = candidates == null || candidates.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD;
        if (refreshDue && peerRefreshLock.tryLock()) {
            try {
                try {
                    candidates = createPeerStatusList();
                } catch (ClientHandlerException | IOException | BadRequestException | HandshakeException | PortNotRunningException | UnknownPortException e) {
                    String message = String.format("%s Failed to update list of peers due to %s", new Object[]{this, e.toString()});
                    logger.warn(message);
                    if (logger.isDebugEnabled()) {
                        logger.warn("", e);
                    }
                    remoteGroup.getEventReporter().reportEvent(Severity.WARNING, CATEGORY, message);
                }
                peerStatuses = candidates;
                peerRefreshTime = System.currentTimeMillis();
            } finally {
                peerRefreshLock.unlock();
            }
        }
        if (candidates == null || candidates.isEmpty()) {
            return null;
        }
        // Try each slot at most once, advancing the shared round-robin counter every time.
        int attempt = 0;
        while (attempt < candidates.size()) {
            long ticket = peerIndex.getAndIncrement();
            PeerStatus candidate = candidates.get((int)(ticket % (long)candidates.size()));
            if (!isPenalized(candidate)) {
                return candidate;
            }
            logger.debug("{} {} is penalized; will not communicate with this peer", (Object)this, (Object)candidate);
            ++attempt;
        }
        logger.debug("{} All peers appear to be penalized; returning null", (Object)this);
        return null;
    }

    /**
     * @param peerStatus peer to check
     * @return {@code true} if a penalty is recorded for the peer and has not yet expired
     */
    private boolean isPenalized(PeerStatus peerStatus) {
        Long penaltyEnd = peerTimeoutExpirations.get(peerStatus);
        if (penaltyEnd == null) {
            return false;
        }
        return penaltyEnd > System.currentTimeMillis();
    }

    /**
     * Builds a fresh weighted destination list from the peer statuses reported
     * by the remote process group.
     *
     * @return the weighted peer list, or an empty list if the remote group reports no statuses
     * @throws IOException             propagated from fetching or formulating the list
     * @throws BadRequestException     propagated from the remote group
     * @throws HandshakeException      propagated from the remote group
     * @throws UnknownPortException    propagated from the remote group
     * @throws PortNotRunningException propagated from the remote group
     */
    private List<PeerStatus> createPeerStatusList() throws IOException, BadRequestException, HandshakeException, UnknownPortException, PortNotRunningException {
        // Typed (not raw) so the enhanced for loop below can bind PeerStatus elements.
        Set<PeerStatus> statuses = this.remoteGroup.getPeerStatuses();
        if (statuses == null) {
            return new ArrayList<PeerStatus>();
        }
        ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
        ArrayList<NodeInformation> nodeInfos = new ArrayList<NodeInformation>(statuses.size());
        for (PeerStatus peerStatus : statuses) {
            NodeInformation nodeInfo = new NodeInformation(peerStatus.getHostname(), Integer.valueOf(peerStatus.getPort()), 0, peerStatus.isSecure(), peerStatus.getFlowFileCount());
            nodeInfos.add(nodeInfo);
        }
        clusterNodeInfo.setNodeInformation(nodeInfos);
        return this.formulateDestinationList(clusterNodeInfo);
    }

    /**
     * Delegates sending of FlowFiles to the given protocol.
     *
     * @throws IOException       propagated from the protocol
     * @throws ProtocolException propagated from the protocol
     */
    private void transferFlowFiles(Peer peer, ClientProtocol protocol, ProcessContext context, ProcessSession session, FlowFileCodec codec)
            throws IOException, ProtocolException {
        protocol.transferFlowFiles(peer, context, session, codec);
    }

    /**
     * Delegates receiving of FlowFiles to the given protocol.
     *
     * @throws IOException       propagated from the protocol
     * @throws ProtocolException propagated from the protocol
     */
    private void receiveFlowFiles(Peer peer, ClientProtocol protocol, ProcessContext context, ProcessSession session, FlowFileCodec codec)
            throws IOException, ProtocolException {
        protocol.receiveFlowFiles(peer, context, session, codec);
    }

    /**
     * Builds the weighted destination list for this port's own connectable type.
     *
     * @param clusterNodeInfo node information to weight
     * @return the weighted list of peers
     * @throws IOException declared for callers; the static overload it delegates to declares none
     */
    private List<PeerStatus> formulateDestinationList(ClusterNodeInformation clusterNodeInfo) throws IOException {
        ConnectableType ownType = getConnectableType();
        return StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ownType);
    }

    /**
     * Builds a weighted, interleaved list of peers in which each node appears a
     * number of times proportional to its weighting.
     *
     * <p>A node's share of all FlowFiles is capped at 0.8. For a remote input
     * port the weighting is {@code 1 - share}; otherwise the weighting is the
     * share itself. Every node receives at least one entry.
     *
     * @param clusterNodeInfo node information to weight
     * @param connectableType connectable type that selects the weighting direction
     * @return list containing one {@link PeerStatus} per slot and no {@code null}
     *         elements; empty if there are no nodes
     */
    static List<PeerStatus> formulateDestinationList(ClusterNodeInformation clusterNodeInfo, ConnectableType connectableType) {
        Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation();
        int numDestinations = Math.max(128, nodeInfoSet.size());
        Map<NodeInformation, Integer> entryCountMap = new HashMap<NodeInformation, Integer>();
        long totalFlowFileCount = 0L;
        for (NodeInformation nodeInfo : nodeInfoSet) {
            totalFlowFileCount += (long)nodeInfo.getTotalFlowFiles();
        }
        for (NodeInformation nodeInfo : nodeInfoSet) {
            int flowFileCount = nodeInfo.getTotalFlowFiles();
            // When no node holds any FlowFiles this is 0.0/0.0 = NaN; NaN propagates through
            // Math.min and the weighting, the int cast yields 0, and the node gets 1 entry.
            double percentageOfFlowFiles = Math.min(0.8, (double)flowFileCount / (double)totalFlowFileCount);
            double relativeWeighting = connectableType == ConnectableType.REMOTE_INPUT_PORT ? 1.0 - percentageOfFlowFiles : percentageOfFlowFiles;
            int entries = Math.max(1, (int)((double)numDestinations * relativeWeighting));
            entryCountMap.put(nodeInfo, entries);
        }
        // Size the list from the map itself so that nodes comparing equal (and therefore
        // sharing one map key) cannot leave unfilled null slots in the result.
        int totalEntries = 0;
        for (Integer count : entryCountMap.values()) {
            totalEntries += count.intValue();
        }
        List<PeerStatus> destinations = new ArrayList<PeerStatus>(totalEntries);
        for (int i = 0; i < totalEntries; ++i) {
            destinations.add(null);
        }
        for (Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) {
            NodeInformation nodeInfo = entry.getKey();
            int numEntries = entry.getValue().intValue();
            int skipIndex = numEntries;
            for (int i = 0; i < numEntries; ++i) {
                // Start at a stride of skipIndex and probe forward to the next free slot.
                int n = skipIndex * i;
                int index = n % destinations.size();
                while (destinations.get(index) != null) {
                    ++n;
                    index = n % destinations.size();
                }
                PeerStatus status = new PeerStatus(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort().intValue(), nodeInfo.isSiteToSiteSecure(), nodeInfo.getTotalFlowFiles());
                destinations.set(index, status);
            }
        }
        StringBuilder distributionDescription = new StringBuilder();
        distributionDescription.append("New Weighted Distribution of Nodes:");
        for (Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) {
            double percentage = (double)entry.getValue().intValue() * 100.0 / (double)destinations.size();
            distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of FlowFiles");
        }
        logger.info(distributionDescription.toString());
        return destinations;
    }

    /**
     * @return the current value of the target-exists flag
     */
    public boolean getTargetExists() {
        return targetExists.get();
    }

    /**
     * @return {@code true} when {@link #getValidationErrors()} reports no errors
     */
    public boolean isValid() {
        Collection<ValidationResult> errors = getValidationErrors();
        return errors.isEmpty();
    }

    /**
     * Validates this port. At most one error is reported: either the remote
     * port no longer exists, or this is a remote output port without any
     * outbound connection.
     *
     * @return a new collection holding zero or one validation errors
     */
    public Collection<ValidationResult> getValidationErrors() {
        ArrayList<ValidationResult> problems = new ArrayList<ValidationResult>();
        if (!targetExists.get()) {
            problems.add(new ValidationResult.Builder()
                    .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
                    .subject(String.format("Remote port '%s'", getName()))
                    .valid(false)
                    .build());
            return problems;
        }
        if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
            problems.add(new ValidationResult.Builder()
                    .explanation(String.format("Port '%s' has no outbound connections", getName()))
                    .subject(String.format("Remote port '%s'", getName()))
                    .valid(false)
                    .build());
        }
        return problems;
    }

    /**
     * Runs the superclass check and additionally requires a remote input port
     * to have at least one incoming connection.
     *
     * @throws IllegalStateException if this is a remote input port with no incoming connections
     */
    public void verifyCanStart() {
        super.verifyCanStart();
        boolean sendsData = getConnectableType() == ConnectableType.REMOTE_INPUT_PORT;
        if (sendsData && getIncomingConnections().isEmpty()) {
            throw new IllegalStateException("Port " + getName() + " has no incoming connections");
        }
    }

    /**
     * Updates the use-compression flag.
     *
     * @param useCompression new value of the flag
     */
    public void setUseCompression(boolean useCompression) {
        AtomicBoolean flag = this.useCompression;
        flag.set(useCompression);
    }

    /**
     * @return the current value of the use-compression flag
     */
    public boolean isUseCompression() {
        return useCompression.get();
    }

    /**
     * @return a description of the form {@code RemoteGroupPort[name=<name>,target=<target URI>]}
     */
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("RemoteGroupPort[name=").append(getName());
        text.append(",target=").append(remoteGroup.getTargetUri().toString());
        text.append("]");
        return text.toString();
    }

    /**
     * @return the remote process group this port was created with
     */
    public RemoteProcessGroup getRemoteProcessGroup() {
        return remoteGroup;
    }

    /**
     * @return {@code SEND} for a remote input port, {@code RECEIVE} for any other connectable type
     */
    public TransferDirection getTransferDirection() {
        if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
            return TransferDirection.SEND;
        }
        return TransferDirection.RECEIVE;
    }

    /**
     * Updates the target-exists flag.
     *
     * @param exists new value of the flag
     */
    public void setTargetExists(boolean exists) {
        targetExists.set(exists);
    }

    /**
     * Removes the connection via the superclass and, if the remote target no
     * longer exists and this port is left without any connections, asks the
     * remote group to remove this port.
     *
     * @param connection connection to remove
     * @throws IllegalArgumentException propagated from the superclass
     * @throws IllegalStateException    propagated from the superclass
     */
    public void removeConnection(Connection connection) throws IllegalArgumentException, IllegalStateException {
        super.removeConnection(connection);
        boolean orphaned = !getTargetExists() && !hasIncomingConnection() && getConnections().isEmpty();
        if (orphaned) {
            remoteGroup.removeNonExistentPort((RemoteGroupPort)this);
        }
    }

    /**
     * @return always {@code SchedulingStrategy.TIMER_DRIVEN}
     */
    public SchedulingStrategy getSchedulingStrategy() {
        SchedulingStrategy fixedStrategy = SchedulingStrategy.TIMER_DRIVEN;
        return fixedStrategy;
    }

    /**
     * @return always {@code false}
     */
    public boolean isSideEffectFree() {
        final boolean sideEffectFree = false;
        return sideEffectFree;
    }

    /**
     * Holder for one negotiated connection: the peer, the protocol spoken with
     * it, the agreed codec, and the time the connection was last marked as
     * used. The last-used time stays 0 until {@link #setLastTimeUsed()} is
     * called.
     */
    private static class EndpointConnectionState {
        private final Peer peer;
        private final SocketClientProtocol socketClientProtocol;
        private final FlowFileCodec codec;
        private volatile long lastUsed;

        private EndpointConnectionState(Peer peer, SocketClientProtocol socketClientProtocol, FlowFileCodec codec) {
            this.codec = codec;
            this.socketClientProtocol = socketClientProtocol;
            this.peer = peer;
        }

        /** @return the peer this connection talks to */
        public Peer getPeer() {
            return peer;
        }

        /** @return the protocol instance bound to this connection */
        public SocketClientProtocol getSocketClientProtocol() {
            return socketClientProtocol;
        }

        /** @return the codec negotiated for this connection */
        public FlowFileCodec getCodec() {
            return codec;
        }

        /** @return the time, in epoch milliseconds, recorded by the last call to {@link #setLastTimeUsed()} */
        public long getLastTimeUsed() {
            return lastUsed;
        }

        /** Records the current time as the moment this connection was last used. */
        public void setLastTimeUsed() {
            lastUsed = System.currentTimeMillis();
        }
    }
}

