/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.security.MessageDigest;
import java.util.Arrays;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link OutputStream} that uploads data over an already-connected {@link Socket}.
 *
 * <p>On construction a header is written to the socket's output stream. Every subsequent
 * write is sent as one or more chunks, each preceded by its length (written through
 * {@code BlobUtils.writeLength}), and is also fed into a local {@link MessageDigest}.
 * {@link #finish()} writes a terminating length of {@code -1}, reads the response from the
 * socket's input stream and compares the returned key against the locally computed digest.
 *
 * <p>This class never closes the socket or its streams itself.
 */
final class BlobOutputStream extends OutputStream {
    private static final Logger LOG = LoggerFactory.getLogger(BlobOutputStream.class);

    /**
     * First header byte. NOTE(review): presumably mirrors the server-side "put" operation
     * code; confirm against the protocol definition.
     */
    private static final int OPERATION_PUT = 0;

    /** Header byte written when no job ID is supplied. */
    private static final int HEADER_WITHOUT_JOB_ID = 0;

    /** Header byte written when a job ID is supplied; the job ID bytes follow it. */
    private static final int HEADER_WITH_JOB_ID = 2;

    /** Response byte after which a {@link BlobKey} is read from the stream. */
    private static final int RESPONSE_OKAY = 0;

    /** Response byte after which a serialized exception is read from the stream. */
    private static final int RESPONSE_ERROR = 1;

    /** Length value written by {@link #finish()} after the last data chunk. */
    private static final int END_OF_DATA_LENGTH = -1;

    /** Upper bound, in bytes, for the payload of a single length-prefixed chunk. */
    private static final int MAX_CHUNK_SIZE = 65536;

    private final BlobKey.BlobType blobType;
    private final OutputStream socketStream;
    private final Socket socket;
    private final MessageDigest md;

    /**
     * Creates the stream and immediately writes the header to the socket.
     *
     * @param jobID job ID to include in the header, or {@code null} to send the header
     *     variant without a job ID
     * @param blobType type whose ordinal is written in the header and which the key returned
     *     by {@link #finish()} is checked against
     * @param socket socket to write to; must not be closed
     * @throws IllegalStateException if {@code socket} is already closed
     * @throws IOException if obtaining the output stream or writing the header fails
     */
    BlobOutputStream(JobID jobID, BlobKey.BlobType blobType, Socket socket) throws IOException {
        this.blobType = blobType;
        if (socket.isClosed()) {
            throw new IllegalStateException("BLOB Client is not connected. Client has been shut down or encountered an error before.");
        }
        this.socket = socket;
        this.socketStream = socket.getOutputStream();
        this.md = BlobUtils.createMessageDigest();
        BlobOutputStream.sendPutHeader(this.socketStream, jobID, blobType);
    }

    /**
     * Sends a single byte as its own chunk (length {@code 1} followed by the byte) and adds
     * it to the digest.
     *
     * @param b the byte to send; only the low-order eight bits are used
     * @throws IOException if writing to the socket fails
     */
    @Override
    public void write(int b) throws IOException {
        BlobUtils.writeLength(1, this.socketStream);
        this.socketStream.write(b);
        this.md.update((byte) b);
    }

    /**
     * Sends {@code len} bytes of {@code b}, starting at {@code off}, as a sequence of
     * length-prefixed chunks of at most {@link #MAX_CHUNK_SIZE} bytes each, and adds them to
     * the digest. A {@code len} of zero writes nothing.
     *
     * <p>The arguments are validated before anything is written, so an invalid call cannot
     * leave a chunk-length header on the wire without its payload.
     *
     * @param b source array
     * @param off offset of the first byte to send
     * @param len number of bytes to send
     * @throws NullPointerException if {@code b} is {@code null}
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or
     *     {@code off + len} exceeds {@code b.length}
     * @throws IOException if writing to the socket fails
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (b == null) {
            throw new NullPointerException("b must not be null");
        }
        // Written as "len > b.length - off" so the check cannot overflow.
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException(
                    "off=" + off + ", len=" + len + ", b.length=" + b.length);
        }

        int remainingBytes = len;
        while (remainingBytes > 0) {
            int bytesToSend = Math.min(MAX_CHUNK_SIZE, remainingBytes);
            BlobUtils.writeLength(bytesToSend, this.socketStream);
            this.socketStream.write(b, off, bytesToSend);
            this.md.update(b, off, bytesToSend);
            remainingBytes -= bytesToSend;
            off += bytesToSend;
        }
    }

    /**
     * Writes the end-of-data length, then reads and verifies the response.
     *
     * @return the key received in the response
     * @throws IOException if the response ends prematurely, reports an error, is not
     *     recognized, or carries a key whose type or hash does not match the local values
     */
    public BlobKey finish() throws IOException {
        BlobUtils.writeLength(END_OF_DATA_LENGTH, this.socketStream);
        InputStream is = this.socket.getInputStream();
        return BlobOutputStream.receiveAndCheckPutResponse(is, this.md, this.blobType);
    }

    /**
     * Writes the header: the operation byte, a byte telling whether a job ID follows (and the
     * job ID bytes if so), and finally the ordinal of {@code blobType}.
     *
     * @param outputStream stream to write the header to
     * @param jobId job ID to include, or {@code null} for none
     * @param blobType type whose ordinal is written as the last header byte
     * @throws IOException if writing fails
     */
    private static void sendPutHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey.BlobType blobType) throws IOException {
        outputStream.write(OPERATION_PUT);
        if (jobId == null) {
            outputStream.write(HEADER_WITHOUT_JOB_ID);
        } else {
            outputStream.write(HEADER_WITH_JOB_ID);
            outputStream.write(jobId.getBytes());
        }
        outputStream.write(blobType.ordinal());
    }

    /**
     * Reads one response byte and acts on it: on {@link #RESPONSE_OKAY} a key is read and its
     * type and hash are compared with {@code blobType} and the digest of the bytes sent; on
     * {@link #RESPONSE_ERROR} the transmitted exception is read and rethrown wrapped in an
     * {@link IOException}.
     *
     * @param is stream to read the response from
     * @param md digest accumulated over the bytes that were sent
     * @param blobType type the returned key is expected to have
     * @return the key read from the response
     * @throws EOFException if the stream ends before a response byte is available
     * @throws IOException on a type or hash mismatch, a reported error, or an unknown
     *     response byte
     */
    private static BlobKey receiveAndCheckPutResponse(InputStream is, MessageDigest md, BlobKey.BlobType blobType) throws IOException {
        int response = is.read();
        if (response < 0) {
            throw new EOFException("Premature end of response");
        }
        if (response == RESPONSE_OKAY) {
            BlobKey remoteKey = BlobKey.readFromInputStream(is);
            byte[] localHash = md.digest();
            if (blobType != remoteKey.getType()) {
                throw new IOException("Detected data corruption during transfer");
            }
            if (!Arrays.equals(localHash, remoteKey.getHash())) {
                throw new IOException("Detected data corruption during transfer");
            }
            return remoteKey;
        }
        if (response == RESPONSE_ERROR) {
            Throwable cause = BlobUtils.readExceptionFromStream(is);
            throw new IOException("Server side error: " + cause.getMessage(), cause);
        }
        throw new IOException("Unrecognized response: " + response + '.');
    }
}

