/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.filecache;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.taskmanager.runtime.ExecutorThreadFactory;
import org.apache.flink.runtime.util.IOUtils;

/**
 * Copies files of the distributed cache into a job-specific temporary directory and removes
 * that directory again after a delay.
 *
 * <p>For every (job, name) pair the class counts how often {@link #createTmpFile} was called.
 * A scheduled deletion only takes effect if that counter has not changed between the call to
 * {@link #deleteTmpFile} and the moment the deletion actually runs.
 */
public class FileCache {
    /** Local file system used to delete a job's temporary directory. */
    private final LocalFileSystem lfs = new LocalFileSystem();

    /** Global monitor that serializes all copy operations started by {@link CopyProcess}. */
    private static final Object lock = new Object();

    /**
     * Number of {@link #createTmpFile} calls per (job, name) pair. Every access must happen
     * while synchronizing on the map itself.
     */
    private final Map<Pair<JobID, String>, Integer> count = new HashMap<Pair<JobID, String>, Integer>();

    /** Runs the copy tasks and the delayed delete tasks. */
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);

    /**
     * Registers one more use of the given file for the job and submits an asynchronous copy of
     * the file into the job's temporary directory.
     *
     * @param name  name under which the file is registered
     * @param entry cache entry providing the source path and the executable flag
     * @param jobID job the file belongs to
     * @return the submitted task; its result is the path of the local copy
     */
    public FutureTask<Path> createTmpFile(String name, DistributedCache.DistributedCacheEntry entry, JobID jobID) {
        Pair<JobID, String> key = new ImmutablePair<JobID, String>(jobID, name);
        synchronized (this.count) {
            Integer previous = this.count.get(key);
            this.count.put(key, previous == null ? 1 : previous + 1);
        }
        FutureTask<Path> copyTask = new FutureTask<Path>(new CopyProcess(name, entry, jobID));
        this.executorService.submit(copyTask);
        return copyTask;
    }

    /**
     * Schedules the deletion of the job's temporary directory in 5000 ms. The deletion is
     * skipped when {@link #createTmpFile} is called again for the same (job, name) pair before
     * the task runs.
     *
     * <p>If the pair was never registered, nothing is scheduled.
     *
     * @param name  name under which the file was registered
     * @param entry cache entry of the file (not evaluated)
     * @param jobID job the file belongs to
     */
    public void deleteTmpFile(String name, DistributedCache.DistributedCacheEntry entry, JobID jobID) {
        Integer registered;
        // Read under the same monitor that guards all writes to the map.
        synchronized (this.count) {
            registered = this.count.get(new ImmutablePair<JobID, String>(jobID, name));
        }
        if (registered == null) {
            // Unknown (job, name): unboxing null here would throw a NullPointerException.
            return;
        }
        DeleteProcess dp = new DeleteProcess(name, entry, jobID, registered.intValue());
        this.executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds the path {@code <tmp dir>/tmp_<jobID>/<childPath>}, where the tmp dir is read from
     * the configuration key {@code taskmanager.tmp.dirs}.
     *
     * @param jobID     job owning the directory
     * @param childPath path below the job directory; may be empty
     * @return the resulting path
     */
    public Path getTempDir(JobID jobID, String childPath) {
        String tmpRoot = GlobalConfiguration.getString("taskmanager.tmp.dirs", ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
        return new Path(tmpRoot, "tmp_" + jobID.toString() + "/" + childPath);
    }

    /**
     * Shuts the executor down and waits up to 5000 ms for running tasks to finish.
     *
     * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is
     *                          restored before the exception is thrown
     */
    public void shutdown() {
        this.executorService.shutdown();
        try {
            this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to code further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error shutting down the file cache", e);
        }
    }

    /**
     * Copies a file, or recursively a directory, from {@code sourcePath} to {@code targetPath}.
     * Nothing is copied when the target already exists.
     *
     * @param sourcePath file or directory to copy
     * @param targetPath destination path
     * @param executable executable flag applied to every copied file
     * @throws IOException if a file system operation or the byte copy fails
     */
    public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
        FileSystem sFS = sourcePath.getFileSystem();
        FileSystem tFS = targetPath.getFileSystem();
        if (tFS.exists(targetPath)) {
            return;
        }
        if (sFS.getFileStatus(sourcePath).isDir()) {
            tFS.mkdirs(targetPath);
            for (FileStatus content : sFS.listStatus(sourcePath)) {
                String distPath = content.getPath().toString();
                if (content.isDir() && distPath.endsWith("/")) {
                    // Drop the trailing slash so the last path segment can be extracted.
                    distPath = distPath.substring(0, distPath.length() - 1);
                }
                // Append "/<last segment>" of the source entry to the target directory.
                String localPath = targetPath.toString() + distPath.substring(distPath.lastIndexOf("/"));
                FileCache.copy(content.getPath(), new Path(localPath), executable);
            }
            return;
        }
        // Errors propagate to the caller instead of being swallowed, so a failed copy is never
        // reported as a usable local file.
        FSDataOutputStream lfsOutput = tFS.create(targetPath, false);
        try {
            FSDataInputStream fsInput = sFS.open(sourcePath);
            try {
                IOUtils.copyBytes(fsInput, lfsOutput);
            }
            finally {
                // Closing again is harmless should copyBytes already have closed the stream.
                fsInput.close();
            }
        }
        finally {
            lfsOutput.close();
        }
        new File(targetPath.toString()).setExecutable(executable);
    }

    /**
     * Deletes the temporary directory of a job unless the use counter of the (job, name) pair
     * changed after the task was created.
     */
    private class DeleteProcess
    implements Runnable {
        private final String name;
        private final JobID jobID;
        /** Counter value observed when the deletion was requested. */
        private final int oldCount;

        public DeleteProcess(String name, DistributedCache.DistributedCacheEntry e, JobID jobID, int c) {
            this.name = name;
            this.jobID = jobID;
            this.oldCount = c;
        }

        @Override
        public void run() {
            Integer current;
            synchronized (FileCache.this.count) {
                current = FileCache.this.count.get(new ImmutablePair<JobID, String>(this.jobID, this.name));
            }
            // A missing entry or a changed counter means the deletion must not happen.
            if (current == null || current.intValue() != this.oldCount) {
                return;
            }
            Path tmp = FileCache.this.getTempDir(this.jobID, "");
            try {
                if (FileCache.this.lfs.exists(tmp)) {
                    FileCache.this.lfs.delete(tmp, true);
                }
            }
            catch (IOException e1) {
                throw new RuntimeException("Error deleting the file", e1);
            }
        }
    }

    /**
     * Copies the file of a cache entry into the job's temporary directory and returns the path
     * of the copy.
     */
    private class CopyProcess
    implements Callable<Path> {
        private final JobID jobID;
        private final String filePath;
        private final Boolean executable;

        public CopyProcess(String name, DistributedCache.DistributedCacheEntry e, JobID jobID) {
            this.filePath = e.filePath;
            this.executable = e.isExecutable;
            this.jobID = jobID;
        }

        @Override
        public Path call() {
            // The local copy keeps only the last segment of the source path as its file name.
            String fileName = this.filePath.substring(this.filePath.lastIndexOf("/") + 1);
            Path tmp = FileCache.this.getTempDir(this.jobID, fileName);
            try {
                // Only one copy runs at a time across all FileCache instances.
                synchronized (lock) {
                    FileCache.copy(new Path(this.filePath), tmp, this.executable);
                }
            }
            catch (IOException e1) {
                throw new RuntimeException("Error copying a file from hdfs to the local fs", e1);
            }
            return tmp;
        }
    }
}

