/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.contrib.streaming.state.RocksDBStateDataTransferHelper;
import org.apache.flink.contrib.streaming.state.StateHandleDownloadSpec;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;

public class RocksDBStateDownloader
implements Closeable {
    private final RocksDBStateDataTransferHelper transfer;

    @VisibleForTesting
    public RocksDBStateDownloader(int restoringThreadNum) {
        this(RocksDBStateDataTransferHelper.forThreadNum(restoringThreadNum));
    }

    public RocksDBStateDownloader(RocksDBStateDataTransferHelper transfer) {
        this.transfer = transfer;
    }

    public void transferAllStateDataToDirectory(Collection<StateHandleDownloadSpec> downloadRequests, CloseableRegistry closeableRegistry) throws Exception {
        CloseableRegistry internalCloser = new CloseableRegistry();
        closeableRegistry.registerCloseable((AutoCloseable)internalCloser);
        try {
            List futures = this.transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser).collect(Collectors.toList());
            FutureUtils.completeAll(futures).get();
        }
        catch (Exception e) {
            downloadRequests.stream().map(StateHandleDownloadSpec::getDownloadDestination).map(Path::toFile).forEach(FileUtils::deleteDirectoryQuietly);
            Throwable throwable = ExceptionUtils.stripExecutionException((Throwable)e);
            throwable = ExceptionUtils.stripException((Throwable)throwable, RuntimeException.class);
            if (throwable instanceof IOException) {
                throw (IOException)throwable;
            }
            throw new FlinkRuntimeException("Failed to download data for state handles.", (Throwable)e);
        }
        finally {
            if (closeableRegistry.unregisterCloseable((AutoCloseable)internalCloser)) {
                IOUtils.closeQuietly((AutoCloseable)internalCloser);
            }
        }
    }

    private Stream<CompletableFuture<Void>> transferAllStateDataToDirectoryAsync(Collection<StateHandleDownloadSpec> handleWithPaths, CloseableRegistry closeableRegistry) {
        return handleWithPaths.stream().flatMap(downloadRequest -> Stream.concat(downloadRequest.getStateHandle().getSharedState().stream(), downloadRequest.getStateHandle().getPrivateState().stream()).map(entry -> {
            String localPath = entry.getLocalPath();
            StreamStateHandle remoteFileHandle = entry.getHandle();
            Path downloadDest = downloadRequest.getDownloadDestination().resolve(localPath);
            return ThrowingRunnable.unchecked(() -> this.downloadDataForStateHandle(downloadDest, remoteFileHandle, closeableRegistry));
        })).map(runnable -> CompletableFuture.runAsync(runnable, this.transfer.getExecutorService()));
    }

    private void downloadDataForStateHandle(Path restoreFilePath, StreamStateHandle remoteFileHandle, CloseableRegistry closeableRegistry) throws IOException {
        if (closeableRegistry.isClosed()) {
            return;
        }
        try {
            int numBytes;
            FSDataInputStream inputStream = remoteFileHandle.openInputStream();
            closeableRegistry.registerCloseable((AutoCloseable)inputStream);
            Files.createDirectories(restoreFilePath.getParent(), new FileAttribute[0]);
            OutputStream outputStream = Files.newOutputStream(restoreFilePath, new OpenOption[0]);
            closeableRegistry.registerCloseable((AutoCloseable)outputStream);
            byte[] buffer = new byte[8192];
            while ((numBytes = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, numBytes);
            }
            closeableRegistry.unregisterAndCloseAll(new Closeable[]{outputStream, inputStream});
        }
        catch (Exception ex) {
            IOUtils.closeQuietly((AutoCloseable)closeableRegistry);
            throw new IOException(ex);
        }
    }

    @Override
    public void close() throws IOException {
        this.transfer.close();
    }
}

