/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardRemoteGroupPort
extends RemoteGroupPort {
    private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L);
    public static final String USER_AGENT = "NiFi-Site-to-Site";
    public static final String CONTENT_TYPE = "application/octet-stream";
    public static final int GZIP_COMPRESSION_LEVEL = 1;
    private static final String CATEGORY = "Site to Site";
    private static final Logger logger = LoggerFactory.getLogger(StandardRemoteGroupPort.class);
    private final RemoteProcessGroup remoteGroup;
    private final AtomicBoolean useCompression = new AtomicBoolean(false);
    private final AtomicBoolean targetExists = new AtomicBoolean(true);
    private final AtomicBoolean targetRunning = new AtomicBoolean(true);
    private final SSLContext sslContext;
    private final TransferDirection transferDirection;
    private final AtomicReference<SiteToSiteClient> clientRef = new AtomicReference();

    public StandardRemoteGroupPort(String id, String name, ProcessGroup processGroup, RemoteProcessGroup remoteGroup, TransferDirection direction, ConnectableType type, SSLContext sslContext, ProcessScheduler scheduler) {
        super(id, name, processGroup, type, scheduler);
        this.remoteGroup = remoteGroup;
        this.transferDirection = direction;
        this.sslContext = sslContext;
        this.setScheduldingPeriod("30000 nanos");
    }

    private static File getPeerPersistenceFile(String portId) {
        File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory();
        return new File(stateDir, portId + ".peers");
    }

    public boolean isTargetRunning() {
        return this.targetRunning.get();
    }

    public void setTargetRunning(boolean targetRunning) {
        this.targetRunning.set(targetRunning);
    }

    public boolean isTriggerWhenEmpty() {
        return this.getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT;
    }

    public void shutdown() {
        super.shutdown();
        SiteToSiteClient client = this.clientRef.get();
        if (client != null) {
            try {
                client.close();
            }
            catch (IOException ioe) {
                logger.warn("Failed to properly shutdown Site-to-Site Client due to {}", (Throwable)ioe);
            }
        }
    }

    public void onSchedulingStart() {
        super.onSchedulingStart();
        SiteToSiteClient client = new SiteToSiteClient.Builder().url(this.remoteGroup.getTargetUri().toString()).portIdentifier(this.getIdentifier()).sslContext(this.sslContext).eventReporter(this.remoteGroup.getEventReporter()).peerPersistenceFile(StandardRemoteGroupPort.getPeerPersistenceFile(this.getIdentifier())).build();
        this.clientRef.set(client);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) {
        Transaction transaction;
        FlowFile firstFlowFile;
        if (!this.remoteGroup.isTransmitting()) {
            logger.debug("{} {} is not transmitting; will not send/receive", (Object)this, (Object)this.remoteGroup);
            return;
        }
        if (this.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && session.getQueueSize().getObjectCount() == 0) {
            logger.debug("{} No data to send", (Object)this);
            return;
        }
        String url = this.getRemoteProcessGroup().getTargetUri().toString();
        if (this.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
            firstFlowFile = session.get();
            if (firstFlowFile == null) {
                return;
            }
        } else {
            firstFlowFile = null;
        }
        SiteToSiteClient client = this.clientRef.get();
        try {
            transaction = client.createTransaction(this.transferDirection);
        }
        catch (PortNotRunningException e) {
            context.yield();
            this.targetRunning.set(false);
            String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", new Object[]{this, url});
            logger.error(message);
            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
            return;
        }
        catch (UnknownPortException e) {
            context.yield();
            this.targetExists.set(false);
            String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", new Object[]{this, url});
            logger.error(message);
            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
            return;
        }
        catch (IOException e) {
            String message = String.format("%s failed to communicate with %s due to %s", new Object[]{this, url, e.toString()});
            logger.error(message);
            if (logger.isDebugEnabled()) {
                logger.error("", (Throwable)e);
            }
            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
            session.rollback();
            return;
        }
        if (transaction == null) {
            logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", (Object)this);
            context.yield();
            return;
        }
        try {
            if (this.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
                this.transferFlowFiles(transaction, context, session, firstFlowFile);
            } else {
                int numReceived = this.receiveFlowFiles(transaction, context, session);
                if (numReceived == 0) {
                    context.yield();
                }
            }
            session.commit();
        }
        catch (Throwable t) {
            String message = String.format("%s failed to communicate with remote NiFi instance due to %s", new Object[]{this, t.toString()});
            logger.error("{} failed to communicate with remote NiFi instance due to {}", (Object)this, (Object)t.toString());
            if (logger.isDebugEnabled()) {
                logger.error("", t);
            }
            this.remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
            transaction.error();
            session.rollback();
        }
    }

    public String getYieldPeriod() {
        return this.remoteGroup.getYieldDuration();
    }

    private int transferFlowFiles(final Transaction transaction, ProcessContext context, ProcessSession session, FlowFile firstFlowFile) throws IOException, ProtocolException {
        Object flowFile = firstFlowFile;
        try {
            String userDn = transaction.getCommunicant().getDistinguishedName();
            long startSendingNanos = System.nanoTime();
            StopWatch stopWatch = new StopWatch(true);
            long bytesSent = 0L;
            HashSet<FlowFile> flowFilesSent = new HashSet<FlowFile>();
            boolean continueTransaction = true;
            while (continueTransaction) {
                long startNanos = System.nanoTime();
                final FlowFile toWrap = flowFile;
                session.read(flowFile, new InputStreamCallback(){

                    public void process(InputStream in) throws IOException {
                        StandardDataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
                        transaction.send((DataPacket)dataPacket);
                    }
                });
                long transferNanos = System.nanoTime() - startNanos;
                long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
                flowFilesSent.add((FlowFile)flowFile);
                bytesSent += flowFile.getSize();
                logger.debug("{} Sent {} to {}", new Object[]{this, flowFile, transaction.getCommunicant().getUrl()});
                String transitUri = transaction.getCommunicant().getUrl() + "/" + flowFile.getAttribute(CoreAttributes.UUID.key());
                session.getProvenanceReporter().send(flowFile, transitUri, "Remote DN=" + userDn, transferMillis, false);
                session.remove(flowFile);
                long sendingNanos = System.nanoTime() - startSendingNanos;
                flowFile = sendingNanos < BATCH_SEND_NANOS ? session.get() : null;
                continueTransaction = flowFile != null;
            }
            transaction.confirm();
            stopWatch.stop();
            String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
            long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
            String dataSize = FormatUtils.formatDataSize((double)bytesSent);
            session.commit();
            transaction.complete();
            String flowFileDescription = flowFilesSent.size() < 20 ? ((Object)flowFilesSent).toString() : flowFilesSent.size() + " FlowFiles";
            logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
            return flowFilesSent.size();
        }
        catch (Exception e) {
            session.rollback();
            throw e;
        }
    }

    private int receiveFlowFiles(Transaction transaction, ProcessContext context, ProcessSession session) throws IOException, ProtocolException {
        String userDn = transaction.getCommunicant().getDistinguishedName();
        StopWatch stopWatch = new StopWatch(true);
        HashSet flowFilesReceived = new HashSet();
        long bytesReceived = 0L;
        while (true) {
            long start = System.nanoTime();
            DataPacket dataPacket = transaction.receive();
            if (dataPacket == null) break;
            FlowFile flowFile = session.create();
            flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
            flowFile = session.importFrom(dataPacket.getData(), flowFile);
            long receiveNanos = System.nanoTime() - start;
            String sourceFlowFileIdentifier = (String)dataPacket.getAttributes().get(CoreAttributes.UUID.key());
            if (sourceFlowFileIdentifier == null) {
                sourceFlowFileIdentifier = "<Unknown Identifier>";
            }
            String transitUri = transaction.getCommunicant().getUrl() + sourceFlowFileIdentifier;
            session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
            session.transfer(flowFile, Relationship.ANONYMOUS);
            bytesReceived += dataPacket.getSize();
        }
        transaction.confirm();
        session.commit();
        transaction.complete();
        if (!flowFilesReceived.isEmpty()) {
            stopWatch.stop();
            String flowFileDescription = flowFilesReceived.size() < 20 ? ((Object)flowFilesReceived).toString() : flowFilesReceived.size() + " FlowFiles";
            String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
            long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
            String dataSize = FormatUtils.formatDataSize((double)bytesReceived);
            logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
        }
        return flowFilesReceived.size();
    }

    public boolean getTargetExists() {
        return this.targetExists.get();
    }

    public boolean isValid() {
        return this.getValidationErrors().isEmpty();
    }

    public Collection<ValidationResult> getValidationErrors() {
        ArrayList<ValidationResult> validationErrors = new ArrayList<ValidationResult>();
        ValidationResult error = null;
        if (!this.targetExists.get()) {
            error = new ValidationResult.Builder().explanation(String.format("Remote instance indicates that port '%s' no longer exists.", this.getName())).subject(String.format("Remote port '%s'", this.getName())).valid(false).build();
        } else if (this.getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && this.getConnections(Relationship.ANONYMOUS).isEmpty()) {
            error = new ValidationResult.Builder().explanation(String.format("Port '%s' has no outbound connections", this.getName())).subject(String.format("Remote port '%s'", this.getName())).valid(false).build();
        }
        if (error != null) {
            validationErrors.add(error);
        }
        return validationErrors;
    }

    public void verifyCanStart() {
        super.verifyCanStart();
        if (this.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && this.getIncomingConnections().isEmpty()) {
            throw new IllegalStateException("Port " + this.getName() + " has no incoming connections");
        }
    }

    public void setUseCompression(boolean useCompression) {
        this.useCompression.set(useCompression);
    }

    public boolean isUseCompression() {
        return this.useCompression.get();
    }

    public String toString() {
        return "RemoteGroupPort[name=" + this.getName() + ",target=" + this.remoteGroup.getTargetUri().toString() + "]";
    }

    public RemoteProcessGroup getRemoteProcessGroup() {
        return this.remoteGroup;
    }

    public TransferDirection getTransferDirection() {
        return this.getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ? TransferDirection.SEND : TransferDirection.RECEIVE;
    }

    public void setTargetExists(boolean exists) {
        this.targetExists.set(exists);
    }

    public void removeConnection(Connection connection) throws IllegalArgumentException, IllegalStateException {
        super.removeConnection(connection);
        if (!this.getTargetExists() && !this.hasIncomingConnection() && this.getConnections().isEmpty()) {
            this.remoteGroup.removeNonExistentPort((RemoteGroupPort)this);
        }
    }

    public SchedulingStrategy getSchedulingStrategy() {
        return SchedulingStrategy.TIMER_DRIVEN;
    }

    public boolean isSideEffectFree() {
        return false;
    }
}

