/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StateDirectory {
    /** Matches directory names made of two decimal numbers joined by an underscore; used to pick out task directories. */
    private static final Pattern PATH_NAME = Pattern.compile("\\d+_\\d+");
    private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
    /** Name of the lock file created in the state directory, in each task directory and in the global directory. */
    static final String LOCK_FILE_NAME = ".lock";
    /** Name of the JSON file in the state directory that stores the process UUID. */
    static final String PROCESS_FILE_NAME = "kafka-streams-process-metadata";
    /** Monitor guarding creation of task directories in {@link #directoryForTask(TaskId)}. */
    private final Object taskDirCreationLock = new Object();
    private final Time time;
    private final String appId;
    /** {@code <state.dir>/<application.id>}. */
    private final File stateDir;
    /** When {@code false}, no directories are created and all locking calls succeed without touching disk. */
    private final boolean hasPersistentStores;
    /** Open lock-file channels per task; entries are added by {@code getOrCreateFileChannel} and removed by {@code unlock}. */
    private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
    /** Task locks currently held through this instance, together with the name of the owning thread. */
    private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
    private FileChannel stateDirLockChannel;
    private FileLock stateDirLock;
    private FileChannel globalStateChannel;
    private FileLock globalStateLock;

    /**
     * Resolves the state directory as {@code <state.dir>/<application.id>} and, when persistent
     * stores are in use, creates the base and application directories and restricts their permissions.
     *
     * @param config              supplies the {@code application.id} and {@code state.dir} settings
     * @param time                clock used later when deciding whether a task directory is obsolete
     * @param hasPersistentStores whether any on-disk state is needed; when {@code false} nothing is created
     * @throws ProcessorStateException if a required directory does not exist and cannot be created
     */
    public StateDirectory(StreamsConfig config, Time time, boolean hasPersistentStores) {
        this.time = time;
        this.hasPersistentStores = hasPersistentStores;
        this.appId = config.getString("application.id");

        final String stateDirName = config.getString("state.dir");
        final File baseDir = new File(stateDirName);
        this.stateDir = new File(baseDir, appId);

        if (!hasPersistentStores) {
            return;
        }

        final boolean baseDirAvailable = baseDir.exists() || baseDir.mkdirs();
        if (!baseDirAvailable) {
            throw new ProcessorStateException(String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
        }
        final boolean stateDirAvailable = stateDir.exists() || stateDir.mkdir();
        if (!stateDirAvailable) {
            throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
        }

        final String osTempDir = System.getProperty("java.io.tmpdir");
        if (stateDirName.startsWith(osTempDir)) {
            log.warn("Using an OS temp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS. Resolved state.dir: [" + stateDirName + "]");
        }

        configurePermissions(baseDir);
        configurePermissions(stateDir);
    }

    /**
     * Restricts access to the given directory. On file systems that expose the {@code posix}
     * attribute view the permissions are set to {@code rwxr-x---}; otherwise read, write and
     * execute are granted to the owner only. Failures are logged, never thrown.
     *
     * @param file the directory whose permissions should be tightened
     */
    private void configurePermissions(File file) {
        final Path path = file.toPath();
        final boolean posixSupported = path.getFileSystem().supportedFileAttributeViews().contains("posix");

        if (!posixSupported) {
            // All three setters are invoked unconditionally; the result is only combined afterwards.
            final boolean readable = file.setReadable(true, true);
            final boolean writable = file.setWritable(true, true);
            final boolean executable = file.setExecutable(true, true);
            if (!(readable && writable && executable)) {
                log.error("Failed to change permissions for the directory {}", file);
            }
            return;
        }

        final Set<PosixFilePermission> ownerAndGroupRead = PosixFilePermissions.fromString("rwxr-x---");
        try {
            Files.setPosixFilePermissions(path, ownerAndGroupRead);
        } catch (IOException e) {
            log.error("Error changing permissions for the directory {} ", path, e);
        }
    }

    /**
     * Opens (creating if necessary) the lock file in the state directory and attempts to take
     * an exclusive lock on it. The channel and lock are stored in instance fields.
     *
     * @return {@code true} if the lock was obtained, {@code false} if it is held elsewhere
     * @throws ProcessorStateException if opening or locking the file fails with an I/O error
     */
    private boolean lockStateDirectory() {
        final Path lockPath = new File(stateDir, LOCK_FILE_NAME).toPath();
        try {
            stateDirLockChannel = FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            stateDirLock = tryLock(stateDirLockChannel);
        } catch (IOException e) {
            log.error("Unable to lock the state directory due to unexpected exception", e);
            throw new ProcessorStateException("Failed to lock the state directory during startup", e);
        }
        final boolean acquired = stateDirLock != null;
        return acquired;
    }

    /**
     * Returns the process id for this instance. Without persistent stores a random UUID is
     * returned. Otherwise the state directory is locked, and the id is read from the process
     * metadata file if it exists and contains a non-null id; if not, a fresh UUID is generated,
     * written to that file and returned.
     *
     * @return the process UUID
     * @throws StreamsException        if the state directory lock is already held elsewhere
     * @throws ProcessorStateException if locking fails with an I/O error or the new id cannot be written
     */
    public UUID initializeProcessId() {
        if (!hasPersistentStores) {
            return UUID.randomUUID();
        }
        if (!lockStateDirectory()) {
            log.error("Unable to obtain lock as state directory is already locked by another process");
            throw new StreamsException("Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory");
        }

        final File metadataFile = new File(stateDir, PROCESS_FILE_NAME);
        final ObjectMapper jsonMapper = new ObjectMapper();

        if (metadataFile.exists()) {
            try {
                final StateDirectoryProcessFile persisted = jsonMapper.readValue(metadataFile, StateDirectoryProcessFile.class);
                log.info("Reading UUID from process file: {}", persisted.processId);
                if (persisted.processId != null) {
                    return persisted.processId;
                }
            } catch (Exception e) {
                // An unreadable file is not fatal: fall through and generate a new id.
                log.warn("Failed to read json process file", e);
            }
        }

        final StateDirectoryProcessFile fresh = new StateDirectoryProcessFile(UUID.randomUUID());
        log.info("No process id found on disk, got fresh process id {}", fresh.processId);
        try {
            jsonMapper.writeValue(metadataFile, fresh);
        } catch (IOException e) {
            log.error("Unable to read/write process file due to unexpected exception", e);
            throw new ProcessorStateException(e);
        }
        return fresh.processId;
    }

    /**
     * Returns the directory for the given task, located directly under the state directory and
     * named after {@code taskId.toString()}. When persistent stores are in use and the directory
     * is missing, it is created while holding the task-directory creation monitor.
     *
     * @param taskId the task whose directory is requested
     * @return the task directory (not created when there are no persistent stores)
     * @throws ProcessorStateException if the directory does not exist and cannot be created
     */
    public File directoryForTask(TaskId taskId) {
        final File taskDir = new File(stateDir, taskId.toString());
        if (!hasPersistentStores || taskDir.exists()) {
            return taskDir;
        }
        synchronized (taskDirCreationLock) {
            // Re-check under the monitor: another thread may have created it in the meantime.
            final boolean present = taskDir.exists() || taskDir.mkdir();
            if (!present) {
                throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
            }
        }
        return taskDir;
    }

    /**
     * Returns the {@code .checkpoint} file inside the given task's directory. The task directory
     * is obtained through {@link #directoryForTask(TaskId)}, so it may be created as a side effect.
     *
     * @param taskId the task whose checkpoint file is requested
     * @return the checkpoint file handle (the file itself is not created here)
     */
    File checkpointFileFor(TaskId taskId) {
        final File taskDir = directoryForTask(taskId);
        return new File(taskDir, ".checkpoint");
    }

    /**
     * Tells whether the given task's directory holds nothing besides the lock and checkpoint files.
     *
     * @param taskId the task to inspect
     * @return {@code true} if the task directory is considered empty
     */
    boolean directoryForTaskIsEmpty(TaskId taskId) {
        return taskDirEmpty(directoryForTask(taskId));
    }

    /**
     * Tells whether a task directory is empty, ignoring the lock file and the checkpoint file.
     * A directory that cannot be listed ({@code listFiles} returns {@code null}) counts as empty.
     *
     * @param taskDir the directory to inspect
     * @return {@code true} if no entry other than the lock and checkpoint files is found
     */
    private boolean taskDirEmpty(File taskDir) {
        final File[] storeDirs = taskDir.listFiles(entry -> {
            final String name = entry.getName();
            final boolean bookkeepingFile = LOCK_FILE_NAME.equals(name) || ".checkpoint".equals(name);
            return !bookkeepingFile;
        });
        if (storeDirs == null) {
            return true;
        }
        return storeDirs.length == 0;
    }

    /**
     * Returns the {@code global} directory under the state directory, creating it when
     * persistent stores are in use and it does not exist yet.
     *
     * @return the global state directory
     * @throws ProcessorStateException if the directory does not exist and cannot be created
     */
    File globalStateDir() {
        final File globalDir = new File(stateDir, "global");
        if (!hasPersistentStores) {
            return globalDir;
        }
        final boolean present = globalDir.exists() || globalDir.mkdir();
        if (!present) {
            throw new ProcessorStateException(String.format("global state directory [%s] doesn't exist and couldn't be created", globalDir.getPath()));
        }
        return globalDir;
    }

    /**
     * Builds the log prefix for the calling thread.
     *
     * @return {@code "stream-thread [<current thread name>]"}
     */
    private String logPrefix() {
        final String threadName = Thread.currentThread().getName();
        return String.format("stream-thread [%s]", threadName);
    }

    /**
     * Tries to take the lock on the given task's directory for the calling thread.
     *
     * <p>Ownership is tracked by thread name: a thread that already holds the lock gets
     * {@code true} again, while a different thread gets {@code false} without touching the file.
     *
     * @param taskId the task whose directory should be locked
     * @return {@code true} if there are no persistent stores, the calling thread already owns the
     *         lock, or the lock was newly acquired; {@code false} if it is owned by another thread,
     *         the task directory could not be created, the lock file could not be found, or the
     *         file lock is held elsewhere
     * @throws IOException if opening the lock file or locking it fails for another reason
     */
    synchronized boolean lock(TaskId taskId) throws IOException {
        if (!hasPersistentStores) {
            return true;
        }

        final LockAndOwner existing = locks.get(taskId);
        if (existing != null) {
            final boolean ownedByCaller = existing.owningThread.equals(Thread.currentThread().getName());
            if (ownedByCaller) {
                log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
            }
            return ownedByCaller;
        }

        final File lockFile;
        try {
            lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
        } catch (ProcessorStateException e) {
            // The task directory could not be created; report the lock as not acquired.
            return false;
        }

        final FileChannel channel;
        try {
            channel = getOrCreateFileChannel(taskId, lockFile.toPath());
        } catch (NoSuchFileException e) {
            return false;
        }

        final FileLock acquired = tryLock(channel);
        if (acquired == null) {
            return false;
        }
        locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), acquired));
        log.debug("{} Acquired state dir lock for task {}", logPrefix(), taskId);
        return true;
    }

    /**
     * Tries to take the lock on the global state directory.
     *
     * @return {@code true} if there are no persistent stores, the lock is already held through this
     *         instance, or it was newly acquired; {@code false} if the lock file could not be found
     *         or the file lock is held elsewhere
     * @throws IOException if opening the lock file or locking it fails for another reason
     */
    synchronized boolean lockGlobalState() throws IOException {
        if (!this.hasPersistentStores) {
            return true;
        }
        if (this.globalStateLock != null) {
            log.trace("{} Found cached state dir lock for the global task", (Object)this.logPrefix());
            return true;
        }
        File lockFile = new File(this.globalStateDir(), LOCK_FILE_NAME);
        FileChannel channel;
        try {
            channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        }
        catch (NoSuchFileException e) {
            return false;
        }
        FileLock fileLock;
        try {
            fileLock = this.tryLock(channel);
        }
        catch (IOException e) {
            // The channel is not yet stored anywhere, so it must be closed here or it leaks.
            try {
                channel.close();
            }
            catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        if (fileLock == null) {
            channel.close();
            return false;
        }
        this.globalStateChannel = channel;
        this.globalStateLock = fileLock;
        log.debug("{} Acquired global state dir lock", (Object)this.logPrefix());
        return true;
    }

    /**
     * Releases the global state lock and closes its channel. Does nothing when the lock is not held.
     *
     * @throws IOException if releasing the lock or closing the channel fails
     */
    synchronized void unlockGlobalState() throws IOException {
        if (globalStateLock != null) {
            globalStateLock.release();
            globalStateChannel.close();
            // Clear the fields only after both operations succeeded.
            globalStateLock = null;
            globalStateChannel = null;
            log.debug("{} Released global state dir lock", logPrefix());
        }
    }

    /**
     * Releases the lock on the given task's directory if, and only if, the calling thread
     * (identified by name) is its recorded owner; otherwise nothing happens. The cached channel
     * for the task, if any, is closed as well.
     *
     * @param taskId the task whose directory lock should be released
     * @throws IOException if releasing the lock or closing the channel fails
     */
    synchronized void unlock(TaskId taskId) throws IOException {
        final LockAndOwner held = locks.get(taskId);
        if (held == null || !held.owningThread.equals(Thread.currentThread().getName())) {
            return;
        }

        locks.remove(taskId);
        held.lock.release();
        log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);

        final FileChannel cachedChannel = channels.remove(taskId);
        if (cachedChannel != null) {
            cachedChannel.close();
        }
    }

    /**
     * Releases the state directory lock, closes its channel and reports any task or global
     * locks that are still held. Does nothing when there are no persistent stores.
     *
     * <p>Safe to call when the state directory lock was never acquired: the lock and the channel
     * are each released only if present.
     *
     * @throws StreamsException if releasing the lock or closing the channel fails
     */
    public void close() {
        if (!this.hasPersistentStores) {
            return;
        }
        try {
            if (this.stateDirLock != null) {
                this.stateDirLock.release();
                this.stateDirLock = null;
            }
            if (this.stateDirLockChannel != null) {
                this.stateDirLockChannel.close();
                this.stateDirLockChannel = null;
            }
        }
        catch (IOException e) {
            log.error("Unexpected exception while unlocking the state dir", (Throwable)e);
            throw new StreamsException("Failed to release the lock on the state directory", e);
        }
        // Report leftover task locks: the message only makes sense when the map is NOT empty.
        if (!this.locks.isEmpty()) {
            log.error("Some task directories still locked while closing state, this indicates unclean shutdown: {}", this.locks);
        }
        if (this.globalStateLock != null) {
            log.error("Global state lock is present while closing the state, this indicates unclean shutdown");
        }
    }

    /**
     * Removes all task directories on behalf of a user-initiated cleanup and then, if the state
     * directory exists, deletes the global state directory.
     *
     * @throws StreamsException if removing task directories or the global directory fails
     */
    public synchronized void clean() {
        try {
            cleanRemovedTasksCalledByUser();
        } catch (Exception taskCleanupFailure) {
            throw new StreamsException(taskCleanupFailure);
        }

        if (!stateDir.exists()) {
            return;
        }
        try {
            final File globalDir = globalStateDir().getAbsoluteFile();
            Utils.delete(globalDir);
        } catch (IOException exception) {
            final String message = String.format("%s Failed to delete global state directory of %s due to an unexpected exception", logPrefix(), appId);
            log.error(message, exception);
            throw new StreamsException(exception);
        }
    }

    /**
     * Deletes task directories that are not locked and have not been modified for longer than
     * the given delay. Failures during cleanup are expected to be swallowed by the delegate.
     *
     * @param cleanupDelayMs minimum age, in milliseconds since last modification, before a
     *                       directory is removed
     * @throws IllegalStateException if the delegate unexpectedly lets an exception escape
     */
    public synchronized void cleanRemovedTasks(long cleanupDelayMs) {
        try {
            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
        } catch (Exception unexpected) {
            // The delegate logs and swallows I/O problems itself, so reaching here is a bug.
            throw new IllegalStateException("Should have swallowed exception.", unexpected);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    private void cleanRemovedTasksCalledByCleanerThread(long cleanupDelayMs) {
        for (File taskDir : this.listNonEmptyTaskDirectories()) {
            TaskId id;
            String dirName;
            block11: {
                long lastModifiedMs;
                long now;
                dirName = taskDir.getName();
                id = TaskId.parse(dirName);
                if (this.locks.containsKey(id)) continue;
                if (!this.lock(id) || (now = this.time.milliseconds()) <= (lastModifiedMs = taskDir.lastModified()) + cleanupDelayMs) break block11;
                log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).", new Object[]{this.logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs});
                Utils.delete((File)taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
            }
            try {
                this.unlock(id);
            }
            catch (IOException exception) {
                log.warn(String.format("%s Swallowed the following exception during unlocking after deletion of obsolete state directory %s for task %s:", this.logPrefix(), dirName, id), (Throwable)exception);
            }
            continue;
            catch (IOException | OverlappingFileLockException exception) {
                try {
                    log.warn(String.format("%s Swallowed the following exception during deletion of obsolete state directory %s for task %s:", this.logPrefix(), dirName, id), (Throwable)exception);
                }
                catch (Throwable throwable) {
                    try {
                        this.unlock(id);
                    }
                    catch (IOException exception2) {
                        log.warn(String.format("%s Swallowed the following exception during unlocking after deletion of obsolete state directory %s for task %s:", this.logPrefix(), dirName, id), (Throwable)exception2);
                    }
                    throw throwable;
                }
                try {
                    this.unlock(id);
                }
                catch (IOException exception3) {
                    log.warn(String.format("%s Swallowed the following exception during unlocking after deletion of obsolete state directory %s for task %s:", this.logPrefix(), dirName, id), (Throwable)exception3);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    private void cleanRemovedTasksCalledByUser() throws Exception {
        AtomicReference<Exception> firstException = new AtomicReference<Exception>();
        for (File taskDir : this.listAllTaskDirectories()) {
            TaskId id;
            String dirName;
            block13: {
                dirName = taskDir.getName();
                id = TaskId.parse(dirName);
                if (this.locks.containsKey(id)) continue;
                if (this.lock(id)) {
                    log.info("{} Deleting state directory {} for task {} as user calling cleanup.", new Object[]{this.logPrefix(), dirName, id});
                    Utils.delete((File)taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                    break block13;
                }
                log.warn("{} Could not get lock for state directory {} for task {} as user calling cleanup.", new Object[]{this.logPrefix(), dirName, id});
            }
            try {
                this.unlock(id);
                Utils.delete((File)taskDir);
            }
            catch (IOException exception) {
                log.error(String.format("%s Failed to release lock on state directory %s for task %s with exception:", this.logPrefix(), dirName, id), (Throwable)exception);
                firstException.compareAndSet(null, exception);
            }
            continue;
            catch (IOException | OverlappingFileLockException exception) {
                try {
                    log.error(String.format("%s Failed to delete state directory %s for task %s with exception:", this.logPrefix(), dirName, id), (Throwable)exception);
                    firstException.compareAndSet(null, exception);
                }
                catch (Throwable throwable) {
                    try {
                        this.unlock(id);
                        Utils.delete((File)taskDir);
                    }
                    catch (IOException exception2) {
                        log.error(String.format("%s Failed to release lock on state directory %s for task %s with exception:", this.logPrefix(), dirName, id), (Throwable)exception2);
                        firstException.compareAndSet(null, exception2);
                    }
                    throw throwable;
                }
                try {
                    this.unlock(id);
                    Utils.delete((File)taskDir);
                }
                catch (IOException exception3) {
                    log.error(String.format("%s Failed to release lock on state directory %s for task %s with exception:", this.logPrefix(), dirName, id), (Throwable)exception3);
                    firstException.compareAndSet(null, exception3);
                }
            }
        }
        Exception exception = (Exception)firstException.get();
        if (exception != null) {
            throw exception;
        }
    }

    /**
     * Lists the task directories under the state directory that contain something other than
     * the lock and checkpoint files.
     *
     * @return the matching directories; an empty array when there are no persistent stores, the
     *         state directory does not exist, or it cannot be listed
     */
    File[] listNonEmptyTaskDirectories() {
        if (!hasPersistentStores || !stateDir.exists()) {
            return new File[]{};
        }
        final File[] nonEmptyTaskDirs = stateDir.listFiles(candidate ->
            candidate.isDirectory()
                && PATH_NAME.matcher(candidate.getName()).matches()
                && !taskDirEmpty(candidate));
        if (nonEmptyTaskDirs == null) {
            return new File[]{};
        }
        return nonEmptyTaskDirs;
    }

    /**
     * Lists every directory under the state directory whose name matches the task-directory pattern.
     *
     * @return the matching directories; an empty array when there are no persistent stores, the
     *         state directory does not exist, or it cannot be listed
     */
    File[] listAllTaskDirectories() {
        if (!hasPersistentStores || !stateDir.exists()) {
            return new File[]{};
        }
        final File[] taskDirs = stateDir.listFiles(candidate -> {
            if (!candidate.isDirectory()) {
                return false;
            }
            return PATH_NAME.matcher(candidate.getName()).matches();
        });
        if (taskDirs == null) {
            return new File[]{};
        }
        return taskDirs;
    }

    /**
     * Returns the cached lock-file channel for the task, opening (and creating the file if
     * necessary) and caching a new one when none is registered yet.
     *
     * @param taskId   the task the channel belongs to
     * @param lockPath path of the task's lock file
     * @return the channel registered for the task
     * @throws IOException if the lock file cannot be opened
     */
    private FileChannel getOrCreateFileChannel(TaskId taskId, Path lockPath) throws IOException {
        final boolean alreadyCached = channels.containsKey(taskId);
        if (alreadyCached) {
            return channels.get(taskId);
        }
        final FileChannel opened = FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        channels.put(taskId, opened);
        return channels.get(taskId);
    }

    /**
     * Attempts a non-blocking lock on the channel, mapping an overlapping-lock condition to
     * {@code null} instead of an exception.
     *
     * @param channel the channel of the file to lock
     * @return the acquired lock, or {@code null} if {@code tryLock} returned {@code null} or an
     *         overlapping lock was reported
     * @throws IOException if locking fails with an I/O error
     */
    private FileLock tryLock(FileChannel channel) throws IOException {
        FileLock acquired;
        try {
            acquired = channel.tryLock();
        } catch (OverlappingFileLockException overlapping) {
            acquired = null;
        }
        return acquired;
    }

    /** Pairs a held file lock with the name of the thread that acquired it. */
    private static class LockAndOwner {
        /** Name of the thread recorded as owner; compared by {@code equals} on lock and unlock. */
        final String owningThread;
        /** The file lock held on the task's lock file. */
        final FileLock lock;

        LockAndOwner(String owningThread, FileLock lock) {
            this.lock = lock;
            this.owningThread = owningThread;
        }
    }

    /**
     * JSON-mapped content of the process metadata file: a single {@code processId} property.
     * Unknown properties are ignored when reading.
     */
    @JsonIgnoreProperties(ignoreUnknown = true)
    static class StateDirectoryProcessFile {
        @JsonProperty
        private final UUID processId;

        /** Creates an instance without an id; presumably used by Jackson when deserializing (verify). */
        public StateDirectoryProcessFile() {
            this(null);
        }

        /**
         * @param processId the process id to store; may be {@code null}
         */
        StateDirectoryProcessFile(UUID processId) {
            this.processId = processId;
        }
    }
}

