/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest;

import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.net.RedirectingSslHandler;
import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.rest.FileUploadHandler;
import org.apache.flink.runtime.rest.FlinkHttpObjectAggregator;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.RouterHandler;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class RestServerEndpoint
implements AutoCloseableAsync {
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    private final Object lock = new Object();
    private final String restAddress;
    private final String restBindAddress;
    private final int restBindPort;
    @Nullable
    private final SSLEngineFactory sslEngineFactory;
    private final int maxContentLength;
    protected final Path uploadDir;
    protected final Map<String, String> responseHeaders;
    private final CompletableFuture<Void> terminationFuture;
    private List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
    private ServerBootstrap bootstrap;
    private Channel serverChannel;
    private String restBaseUrl;
    private State state = State.CREATED;

    public RestServerEndpoint(RestServerEndpointConfiguration configuration) throws IOException {
        Preconditions.checkNotNull((Object)configuration);
        this.restAddress = configuration.getRestAddress();
        this.restBindAddress = configuration.getRestBindAddress();
        this.restBindPort = configuration.getRestBindPort();
        this.sslEngineFactory = configuration.getSslEngineFactory();
        this.uploadDir = configuration.getUploadDir();
        RestServerEndpoint.createUploadDir(this.uploadDir, this.log);
        this.maxContentLength = configuration.getMaxContentLength();
        this.responseHeaders = configuration.getResponseHeaders();
        this.terminationFuture = new CompletableFuture();
    }

    protected abstract List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void start() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((this.state == State.CREATED ? 1 : 0) != 0, (Object)"The RestServerEndpoint cannot be restarted.");
            this.log.info("Starting rest endpoint.");
            final Router router = new Router();
            final CompletableFuture<String> restAddressFuture = new CompletableFuture<String>();
            this.handlers = this.initializeHandlers(restAddressFuture);
            Collections.sort(this.handlers, RestHandlerUrlComparator.INSTANCE);
            this.handlers.forEach(handler -> {
                this.log.debug("Register handler {} under {}@{}.", new Object[]{handler.f1, ((RestHandlerSpecification)handler.f0).getHttpMethod(), ((RestHandlerSpecification)handler.f0).getTargetRestEndpointURL()});
                RestServerEndpoint.registerHandler(router, (Tuple2<RestHandlerSpecification, ChannelInboundHandler>)handler);
            });
            ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>(){

                protected void initChannel(SocketChannel ch) {
                    RouterHandler handler = new RouterHandler(router, RestServerEndpoint.this.responseHeaders);
                    if (RestServerEndpoint.this.sslEngineFactory != null) {
                        ch.pipeline().addLast("ssl", (ChannelHandler)new RedirectingSslHandler(RestServerEndpoint.this.restAddress, restAddressFuture, RestServerEndpoint.this.sslEngineFactory));
                    }
                    ch.pipeline().addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new FileUploadHandler(RestServerEndpoint.this.uploadDir)}).addLast(new ChannelHandler[]{new FlinkHttpObjectAggregator(RestServerEndpoint.this.maxContentLength, RestServerEndpoint.this.responseHeaders)}).addLast(new ChannelHandler[]{new ChunkedWriteHandler()}).addLast(handler.name(), (ChannelHandler)handler).addLast(new ChannelHandler[]{new PipelineErrorHandler(RestServerEndpoint.this.log, RestServerEndpoint.this.responseHeaders)});
                }
            };
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, (ThreadFactory)new DefaultThreadFactory("flink-rest-server-netty-boss"));
            NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, (ThreadFactory)new DefaultThreadFactory("flink-rest-server-netty-worker"));
            this.bootstrap = new ServerBootstrap();
            ((ServerBootstrap)this.bootstrap.group((EventLoopGroup)bossGroup, (EventLoopGroup)workerGroup).channel(NioServerSocketChannel.class)).childHandler((ChannelHandler)initializer);
            this.log.debug("Binding rest endpoint to {}:{}.", (Object)this.restBindAddress, (Object)this.restBindPort);
            ChannelFuture channel = this.restBindAddress == null ? this.bootstrap.bind(this.restBindPort) : this.bootstrap.bind(this.restBindAddress, this.restBindPort);
            this.serverChannel = channel.syncUninterruptibly().channel();
            InetSocketAddress bindAddress = (InetSocketAddress)this.serverChannel.localAddress();
            String advertisedAddress = bindAddress.getAddress().isAnyLocalAddress() ? this.restAddress : bindAddress.getAddress().getHostAddress();
            int port = bindAddress.getPort();
            this.log.info("Rest endpoint listening at {}:{}", (Object)advertisedAddress, (Object)port);
            String protocol = this.sslEngineFactory != null ? "https://" : "http://";
            this.restBaseUrl = protocol + advertisedAddress + ':' + port;
            restAddressFuture.complete(this.restBaseUrl);
            this.state = State.RUNNING;
            this.startInternal();
        }
    }

    protected abstract void startInternal() throws Exception;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    public InetSocketAddress getServerAddress() {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((this.state != State.CREATED ? 1 : 0) != 0, (Object)"The RestServerEndpoint has not been started yet.");
            Channel server = this.serverChannel;
            if (server != null) {
                try {
                    return (InetSocketAddress)server.localAddress();
                }
                catch (Exception e) {
                    this.log.error("Cannot access local server address", (Throwable)e);
                }
            }
            return null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public String getRestBaseUrl() {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((this.state != State.CREATED ? 1 : 0) != 0, (Object)"The RestServerEndpoint has not been started yet.");
            return this.restBaseUrl;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> closeAsync() {
        Object object = this.lock;
        synchronized (object) {
            this.log.info("Shutting down rest endpoint.");
            if (this.state == State.RUNNING) {
                CompletableFuture<Void> shutDownFuture = FutureUtils.composeAfterwards(this.closeHandlersAsync(), this::shutDownInternal);
                shutDownFuture.whenComplete((ignored, throwable) -> {
                    this.log.info("Shut down complete.");
                    if (throwable != null) {
                        this.terminationFuture.completeExceptionally((Throwable)throwable);
                    } else {
                        this.terminationFuture.complete(null);
                    }
                });
                this.state = State.SHUTDOWN;
            } else if (this.state == State.CREATED) {
                this.terminationFuture.complete(null);
                this.state = State.SHUTDOWN;
            }
            return this.terminationFuture;
        }
    }

    private FutureUtils.ConjunctFuture<Void> closeHandlersAsync() {
        return FutureUtils.waitForAll(this.handlers.stream().map(tuple -> (ChannelInboundHandler)tuple.f1).filter(handler -> handler instanceof AutoCloseableAsync).map(handler -> ((AutoCloseableAsync)handler).closeAsync()).collect(Collectors.toList()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected CompletableFuture<Void> shutDownInternal() {
        Object object = this.lock;
        synchronized (object) {
            CompletableFuture channelFuture = new CompletableFuture();
            if (this.serverChannel != null) {
                this.serverChannel.close().addListener(finished -> {
                    if (finished.isSuccess()) {
                        channelFuture.complete(null);
                    } else {
                        channelFuture.completeExceptionally(finished.cause());
                    }
                });
                this.serverChannel = null;
            }
            CompletableFuture<Void> channelTerminationFuture = new CompletableFuture<Void>();
            channelFuture.thenRun(() -> {
                CompletableFuture<Object> groupFuture = new CompletableFuture<Object>();
                CompletableFuture<Object> childGroupFuture = new CompletableFuture<Object>();
                Time gracePeriod = Time.seconds((long)10L);
                if (this.bootstrap != null) {
                    if (this.bootstrap.group() != null) {
                        this.bootstrap.group().shutdownGracefully(0L, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS).addListener(finished -> {
                            if (finished.isSuccess()) {
                                groupFuture.complete(null);
                            } else {
                                groupFuture.completeExceptionally(finished.cause());
                            }
                        });
                    } else {
                        groupFuture.complete(null);
                    }
                    if (this.bootstrap.childGroup() != null) {
                        this.bootstrap.childGroup().shutdownGracefully(0L, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS).addListener(finished -> {
                            if (finished.isSuccess()) {
                                childGroupFuture.complete(null);
                            } else {
                                childGroupFuture.completeExceptionally(finished.cause());
                            }
                        });
                    } else {
                        childGroupFuture.complete(null);
                    }
                    this.bootstrap = null;
                } else {
                    groupFuture.complete(null);
                    childGroupFuture.complete(null);
                }
                FutureUtils.ConjunctFuture<Void> combinedFuture = FutureUtils.completeAll(Arrays.asList(groupFuture, childGroupFuture));
                FutureUtils.orTimeout(combinedFuture, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS);
                ((CompletableFuture)combinedFuture.exceptionally(throwable -> {
                    if (throwable instanceof TimeoutException) {
                        this.log.info("Could not properly shut down Netty. Continue shut down of RestServerEndpoint.");
                        return null;
                    }
                    throw new CompletionException(ExceptionUtils.stripCompletionException((Throwable)throwable));
                })).whenComplete((ignored, throwable) -> {
                    if (throwable != null) {
                        channelTerminationFuture.completeExceptionally((Throwable)throwable);
                    } else {
                        channelTerminationFuture.complete(null);
                    }
                });
            });
            return channelTerminationFuture;
        }
    }

    private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
        switch (((RestHandlerSpecification)specificationHandler.f0).getHttpMethod()) {
            case GET: {
                router.GET(((RestHandlerSpecification)specificationHandler.f0).getTargetRestEndpointURL(), specificationHandler.f1);
                break;
            }
            case POST: {
                router.POST(((RestHandlerSpecification)specificationHandler.f0).getTargetRestEndpointURL(), specificationHandler.f1);
                break;
            }
            case DELETE: {
                router.DELETE(((RestHandlerSpecification)specificationHandler.f0).getTargetRestEndpointURL(), specificationHandler.f1);
                break;
            }
            case PATCH: {
                router.PATCH(((RestHandlerSpecification)specificationHandler.f0).getTargetRestEndpointURL(), specificationHandler.f1);
                break;
            }
            default: {
                throw new RuntimeException("Unsupported http method: " + (Object)((Object)((RestHandlerSpecification)specificationHandler.f0).getHttpMethod()) + '.');
            }
        }
    }

    @VisibleForTesting
    static void createUploadDir(Path uploadDir, Logger log) throws IOException {
        if (!Files.exists(uploadDir, new LinkOption[0])) {
            log.warn("Upload directory {} does not exist, or has been deleted externally. Previously uploaded files are no longer available.", (Object)uploadDir);
            RestServerEndpoint.checkAndCreateUploadDir(uploadDir, log);
        }
    }

    private static synchronized void checkAndCreateUploadDir(Path uploadDir, Logger log) throws IOException {
        if (Files.exists(uploadDir, new LinkOption[0]) && Files.isWritable(uploadDir)) {
            log.info("Using directory {} for file uploads.", (Object)uploadDir);
        } else if (Files.isWritable(Files.createDirectories(uploadDir, new FileAttribute[0]))) {
            log.info("Created directory {} for file uploads.", (Object)uploadDir);
        } else {
            log.warn("Upload directory {} cannot be created or is not writable.", (Object)uploadDir);
            throw new IOException(String.format("Upload directory %s cannot be created or is not writable.", uploadDir));
        }
    }

    private static enum State {
        CREATED,
        RUNNING,
        SHUTDOWN;

    }

    public static final class RestHandlerUrlComparator
    implements Comparator<Tuple2<RestHandlerSpecification, ChannelInboundHandler>>,
    Serializable {
        private static final long serialVersionUID = 2388466767835547926L;
        private static final Comparator<String> CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator();
        static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator();

        @Override
        public int compare(Tuple2<RestHandlerSpecification, ChannelInboundHandler> o1, Tuple2<RestHandlerSpecification, ChannelInboundHandler> o2) {
            return CASE_INSENSITIVE_ORDER.compare(((RestHandlerSpecification)o1.f0).getTargetRestEndpointURL(), ((RestHandlerSpecification)o2.f0).getTargetRestEndpointURL());
        }

        public static final class CaseInsensitiveOrderComparator
        implements Comparator<String>,
        Serializable {
            private static final long serialVersionUID = 8550835445193437027L;

            @Override
            public int compare(String s1, String s2) {
                int n1 = s1.length();
                int n2 = s2.length();
                int min = Math.min(n1, n2);
                for (int i = 0; i < min; ++i) {
                    char c2;
                    char c1 = s1.charAt(i);
                    if (c1 == (c2 = s2.charAt(i)) || (c1 = Character.toUpperCase(c1)) == (c2 = Character.toUpperCase(c2)) || (c1 = Character.toLowerCase(c1)) == (c2 = Character.toLowerCase(c2))) continue;
                    if (c1 == ':') {
                        return 1;
                    }
                    if (c2 == ':') {
                        return -1;
                    }
                    return c1 - c2;
                }
                return n1 - n2;
            }
        }
    }
}

