/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.protocol.impl;

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLSocket;
import javax.security.cert.X509Certificate;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.impl.CopyingInputStream;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.io.socket.ServerSocketConfiguration;
import org.apache.nifi.io.socket.SocketListener;
import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SocketProtocolListener
extends SocketListener
implements ProtocolListener {
    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketProtocolListener.class));
    private final ProtocolContext<ProtocolMessage> protocolContext;
    private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<ProtocolHandler>();
    private volatile BulletinRepository bulletinRepository;

    public SocketProtocolListener(int numThreads, int port, ServerSocketConfiguration configuration, ProtocolContext<ProtocolMessage> protocolContext) {
        super(numThreads, port, configuration);
        if (protocolContext == null) {
            throw new IllegalArgumentException("Protocol Context may not be null.");
        }
        this.protocolContext = protocolContext;
    }

    @Override
    public void setBulletinRepository(BulletinRepository bulletinRepository) {
        this.bulletinRepository = bulletinRepository;
    }

    @Override
    public void start() throws IOException {
        if (super.isRunning()) {
            throw new IllegalStateException("Instance is already started.");
        }
        super.start();
    }

    @Override
    public void stop() throws IOException {
        if (!super.isRunning()) {
            throw new IOException("Instance is already stopped.");
        }
        super.stop();
    }

    @Override
    public Collection<ProtocolHandler> getHandlers() {
        return Collections.unmodifiableCollection(this.handlers);
    }

    @Override
    public void addHandler(ProtocolHandler handler) {
        if (handler == null) {
            throw new NullPointerException("Protocol handler may not be null.");
        }
        this.handlers.add(handler);
    }

    @Override
    public boolean removeHandler(ProtocolHandler handler) {
        return this.handlers.remove(handler);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void dispatchRequest(Socket socket) {
        block18: {
            byte[] receivedMessage = null;
            String hostname = null;
            int maxMsgBuffer = 0x100000;
            try {
                ProtocolMessage request;
                StopWatch stopWatch = new StopWatch(true);
                hostname = socket.getInetAddress().getHostName();
                String requestId = UUID.randomUUID().toString();
                logger.info("Received request {} from {}", (Object)requestId, (Object)hostname);
                String requestorDn = null;
                if (socket instanceof SSLSocket) {
                    SSLSocket sslSocket = (SSLSocket)socket;
                    try {
                        X509Certificate[] certChains = sslSocket.getSession().getPeerCertificateChain();
                        if (certChains != null && certChains.length > 0) {
                            requestorDn = certChains[0].getSubjectDN().getName();
                        }
                    }
                    catch (ProtocolException pe) {
                        throw pe;
                    }
                    catch (Exception e) {
                        throw new ProtocolException(e);
                    }
                }
                ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = this.protocolContext.createUnmarshaller();
                InputStream inStream = socket.getInputStream();
                CopyingInputStream copyingInputStream = new CopyingInputStream(inStream, 0x100000);
                logger.debug("Request {} has a message length of {}", (Object)requestId, (Object)copyingInputStream.getNumberOfBytesCopied());
                try {
                    request = unmarshaller.unmarshal(copyingInputStream);
                }
                finally {
                    receivedMessage = copyingInputStream.getBytesRead();
                }
                request.setRequestorDN(requestorDn);
                ProtocolHandler desiredHandler = null;
                for (ProtocolHandler handler : this.getHandlers()) {
                    if (!handler.canHandle(request)) continue;
                    desiredHandler = handler;
                    break;
                }
                if (desiredHandler == null) {
                    throw new ProtocolException("No handler assigned to handle message type: " + (Object)((Object)request.getType()));
                }
                ProtocolMessage response = desiredHandler.handle(request);
                if (response != null) {
                    try {
                        logger.debug("Sending response for request {}", (Object)requestId);
                        ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                        marshaller.marshal(response, socket.getOutputStream());
                    }
                    catch (IOException ioe) {
                        throw new ProtocolException("Failed marshalling protocol message in response to message type: " + (Object)((Object)request.getType()) + " due to " + ioe, ioe);
                    }
                }
                stopWatch.stop();
                logger.info("Finished processing request {} (type={}, length={} bytes) in {} millis", new Object[]{requestId, request.getType(), receivedMessage.length, stopWatch.getDuration(TimeUnit.MILLISECONDS)});
            }
            catch (IOException e) {
                logger.warn("Failed processing protocol message from " + hostname + " due to " + e, (Throwable)e);
                if (this.bulletinRepository != null) {
                    Bulletin bulletin = BulletinFactory.createBulletin((String)"Clustering", (String)"WARNING", (String)String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString()));
                    this.bulletinRepository.addBulletin(bulletin);
                }
            }
            catch (ProtocolException e) {
                logger.warn("Failed processing protocol message from " + hostname + " due to " + e, (Throwable)e);
                if (this.bulletinRepository == null) break block18;
                Bulletin bulletin = BulletinFactory.createBulletin((String)"Clustering", (String)"WARNING", (String)String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString()));
                this.bulletinRepository.addBulletin(bulletin);
            }
        }
    }
}

