/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobConnection;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.jobgraph.JobID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Server side of the BLOB service. The server is itself a thread: it opens a server socket on
 * a system-chosen port, accepts incoming connections in {@link #run()} and hands every accepted
 * socket to a freshly started {@link BlobConnection}.
 *
 * <p>The files served by this instance live below a storage directory obtained from
 * {@link BlobUtils#initStorageDirectory(String)}; that directory is deleted again by
 * {@link #shutdown()}.
 */
public final class BlobServer extends Thread implements BlobService {
    private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);

    /** Buffer size in bytes; not used inside this class (presumably used by the connection handlers). */
    static final int BUFFER_SIZE = 4096;

    /** Upper bound for key lengths; not used inside this class (presumably enforced by the protocol handlers). */
    static final int MAX_KEY_LENGTH = 64;

    /** Operation code 0; presumably identifies a PUT request in the wire protocol. */
    static final byte PUT_OPERATION = 0;

    /** Operation code 1; presumably identifies a GET request in the wire protocol. */
    static final byte GET_OPERATION = 1;

    /** Operation code 2; presumably identifies a DELETE request in the wire protocol. */
    static final byte DELETE_OPERATION = 2;

    /** Counter used to build unique names for temporary files, see {@link #getTemporaryFilename()}. */
    private final AtomicInteger tempFileCounter = new AtomicInteger(0);

    /** Socket on which incoming connections are accepted. */
    private final ServerSocket serverSocket;

    /** Set to {@code true} exactly once, by the first call to {@link #shutdown()}. */
    private final AtomicBoolean shutdownRequested = new AtomicBoolean();

    /** Hook returned by {@link BlobUtils#addShutdownHook}; {@link #shutdown()} tolerates {@code null}. */
    private final Thread shutdownHook;

    /** Root directory below which this server keeps its files. */
    private final File storageDir;

    /**
     * Creates the storage directory, registers a JVM shutdown hook, binds a server socket to a
     * free port and starts the accept thread.
     *
     * @param config configuration; the key {@code blob.storage.directory} (default {@code null})
     *               is passed on to {@link BlobUtils#initStorageDirectory(String)}
     * @throws IOException if the server socket cannot be opened (the original exception is
     *                     attached as cause), or if setting up the storage directory fails
     */
    public BlobServer(Configuration config) throws IOException {
        String storageDirectory = config.getString("blob.storage.directory", null);
        this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
        LOG.info("Created BLOB server storage directory {}", this.storageDir);
        this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
        try {
            this.serverSocket = new ServerSocket(0);
            this.start();
            LOG.info("Started BLOB server on port {}", this.serverSocket.getLocalPort());
        }
        catch (IOException e) {
            // The instance never becomes usable, so shutdown() will not be called by anyone:
            // release what was acquired above instead of leaking the hook and the directory.
            this.unregisterShutdownHook();
            this.deleteStorageDirectory();
            throw new IOException("Could not create BlobServer with random port.", e);
        }
    }

    /**
     * Returns the local port the server socket is bound to.
     *
     * @return the port number as reported by the server socket
     */
    public int getServerPort() {
        return this.serverSocket.getLocalPort();
    }

    /**
     * Resolves the file for the given key below this server's storage directory.
     *
     * @param key key of the BLOB
     * @return the location computed by {@link BlobUtils#getStorageLocation(File, BlobKey)}
     */
    public File getStorageLocation(BlobKey key) {
        return BlobUtils.getStorageLocation(this.storageDir, key);
    }

    /**
     * Resolves the file for the given job ID and key below this server's storage directory.
     *
     * @param jobID ID of the job the BLOB belongs to
     * @param key   key of the BLOB within that job
     * @return the location computed by {@link BlobUtils#getStorageLocation(File, JobID, String)}
     */
    public File getStorageLocation(JobID jobID, String key) {
        return BlobUtils.getStorageLocation(this.storageDir, jobID, key);
    }

    /**
     * Deletes the directory that belongs to the given job, delegating to
     * {@link BlobUtils#deleteJobDirectory(File, JobID)}.
     *
     * @param jobID ID of the job whose directory is removed
     * @throws IOException if the delegate reports a failure
     */
    public void deleteJobDirectory(JobID jobID) throws IOException {
        BlobUtils.deleteJobDirectory(this.storageDir, jobID);
    }

    /**
     * Builds a new temporary file name of the form {@code temp-NNNNNNNN} inside the incoming
     * directory. The file itself is not created here.
     *
     * @return a file handle whose name is unique per server instance (counter based)
     */
    File getTemporaryFilename() {
        String name = String.format("temp-%08d", this.tempFileCounter.getAndIncrement());
        return new File(BlobUtils.getIncomingDirectory(this.storageDir), name);
    }

    /**
     * Accept loop: starts one {@link BlobConnection} per accepted socket until shutdown is
     * requested. Any failure that is not caused by a requested shutdown is logged and triggers
     * {@link #shutdown()}.
     */
    @Override
    public void run() {
        try {
            while (!this.shutdownRequested.get()) {
                new BlobConnection(this.serverSocket.accept(), this).start();
            }
        }
        catch (Throwable t) {
            // Closing the socket in shutdown() makes accept() fail; that case is expected.
            if (!this.shutdownRequested.get()) {
                LOG.error("BLOB server stopped working. Shutting down", t);
                this.shutdown();
            }
        }
    }

    /**
     * Shuts the server down. Only the first call has an effect: it closes the server socket,
     * waits for the accept thread to finish, deletes the storage directory and unregisters the
     * JVM shutdown hook.
     *
     * <p>If the calling thread is interrupted while waiting, cleanup still proceeds and the
     * interrupt status is restored before returning.
     */
    @Override
    public void shutdown() {
        if (!this.shutdownRequested.compareAndSet(false, true)) {
            return;
        }
        try {
            this.serverSocket.close();
        }
        catch (IOException ioe) {
            LOG.debug("Error while closing the server socket.", ioe);
        }
        boolean interrupted = false;
        // run() invokes shutdown() on the server thread itself; joining the current thread
        // would block forever and the cleanup below would never happen.
        if (Thread.currentThread() != this) {
            try {
                this.join();
            }
            catch (InterruptedException ie) {
                LOG.debug("Error while waiting for this thread to die.", ie);
                interrupted = true;
            }
        }
        this.deleteStorageDirectory();
        this.unregisterShutdownHook();
        if (interrupted) {
            // Hand the interrupt back to the caller only after cleanup has finished.
            Thread.currentThread().interrupt();
        }
    }

    /** Deletes the storage directory recursively; failures are logged, not propagated. */
    private void deleteStorageDirectory() {
        try {
            FileUtils.deleteDirectory(this.storageDir);
        }
        catch (IOException e) {
            LOG.error("BLOB server failed to properly clean up its storage directory.", e);
        }
    }

    /** Removes the JVM shutdown hook unless there is none or it is the thread currently running. */
    private void unregisterShutdownHook() {
        if (this.shutdownHook == null || this.shutdownHook == Thread.currentThread()) {
            return;
        }
        try {
            Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
        }
        catch (IllegalStateException ignored) {
            // Thrown when the JVM is already shutting down; the hook can no longer be removed.
        }
        catch (Throwable t) {
            LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.", t);
        }
    }

    /**
     * Returns a URL pointing at the local file that stores the given BLOB.
     *
     * @param requiredBlob key of the BLOB, must not be {@code null}
     * @return URL of the local file
     * @throws IllegalArgumentException if {@code requiredBlob} is {@code null}
     * @throws FileNotFoundException    if no file exists at the BLOB's storage location
     * @throws IOException              if the path cannot be resolved or converted
     */
    @Override
    public URL getURL(BlobKey requiredBlob) throws IOException {
        if (requiredBlob == null) {
            throw new IllegalArgumentException("Required BLOB cannot be null.");
        }
        File localFile = BlobUtils.getStorageLocation(this.storageDir, requiredBlob);
        if (!localFile.exists()) {
            throw new FileNotFoundException("File " + localFile.getCanonicalPath() + " does not exist.");
        }
        return localFile.toURI().toURL();
    }

    /**
     * Deletes the local file of the given BLOB if it exists. A failed deletion is logged as a
     * warning and not reported to the caller.
     *
     * @param blobKey key of the BLOB to delete
     * @throws IOException declared by the interface; not thrown by the code visible here
     */
    @Override
    public void delete(BlobKey blobKey) throws IOException {
        File localFile = BlobUtils.getStorageLocation(this.storageDir, blobKey);
        if (localFile.exists() && !localFile.delete()) {
            LOG.warn("Failed to delete locally stored BLOB {} at {}", blobKey, localFile.getAbsolutePath());
        }
    }

    /**
     * Returns the port of this BLOB server; same value as {@link #getServerPort()}.
     *
     * @return the local port of the server socket
     */
    @Override
    public int getPort() {
        return this.getServerPort();
    }

    /**
     * Writes {@code length} to the stream as four bytes, least significant byte first.
     *
     * @param length       value to write
     * @param buf          scratch buffer with room for at least four bytes; its first four
     *                     bytes are overwritten
     * @param outputStream stream to write to
     * @throws IOException if writing to the stream fails
     */
    static void writeLength(int length, byte[] buf, OutputStream outputStream) throws IOException {
        buf[0] = (byte) (length & 0xFF);
        buf[1] = (byte) (length >> 8 & 0xFF);
        buf[2] = (byte) (length >> 16 & 0xFF);
        buf[3] = (byte) (length >> 24 & 0xFF);
        outputStream.write(buf, 0, 4);
    }

    /**
     * Reads four bytes from the stream and decodes them as an int, least significant byte
     * first (the inverse of {@link #writeLength(int, byte[], OutputStream)}).
     *
     * @param buf         scratch buffer with room for at least four bytes; its first four
     *                    bytes are overwritten
     * @param inputStream stream to read from
     * @return the decoded value
     * @throws EOFException if the stream ends before four bytes were read
     * @throws IOException  if reading from the stream fails
     */
    static int readLength(byte[] buf, InputStream inputStream) throws IOException {
        int bytesRead = 0;
        while (bytesRead < 4) {
            int read = inputStream.read(buf, bytesRead, 4 - bytesRead);
            if (read < 0) {
                throw new EOFException();
            }
            bytesRead += read;
        }
        return (buf[0] & 0xFF)
                | (buf[1] & 0xFF) << 8
                | (buf[2] & 0xFF) << 16
                | (buf[3] & 0xFF) << 24;
    }

    /**
     * Reads exactly {@code len} bytes from the stream into {@code buf}, starting at
     * {@code off}, looping over short reads.
     *
     * @param inputStream stream to read from
     * @param buf         destination buffer
     * @param off         offset in {@code buf} at which the first byte is stored
     * @param len         number of bytes to read
     * @throws EOFException if the stream ends before {@code len} bytes were read
     * @throws IOException  if reading from the stream fails
     */
    static void readFully(InputStream inputStream, byte[] buf, int off, int len) throws IOException {
        int bytesRead = 0;
        while (bytesRead < len) {
            int read = inputStream.read(buf, off + bytesRead, len - bytesRead);
            if (read < 0) {
                throw new EOFException();
            }
            bytesRead += read;
        }
    }
}

