/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.rest;

import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/**
 * Runs the statement cases inherited from {@link AbstractSqlGatewayStatementITCase} through the
 * SQL gateway REST endpoint: each statement is submitted with an execute-statement request and
 * its results are read back with fetch-results requests.
 */
class SqlGatewayRestEndpointStatementITCase
extends AbstractSqlGatewayStatementITCase {
    @RegisterExtension
    @Order(3)
    private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION = new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
    private static final RestClient restClient = getTestRestClient();
    private static final ExecuteStatementHeaders executeStatementHeaders = ExecuteStatementHeaders.getInstance();
    // Re-assigned in before() for the session opened for the current test.
    private static SessionMessageParameters sessionMessageParameters;
    private static final FetchResultsHeaders fetchResultsHeaders = FetchResultsHeaders.getInstance();
    /** Upper bound, in seconds, on waiting for a submitted operation to reach a terminal status. */
    private static final int OPERATION_WAIT_SECONDS = 100;
    /** Separator used to locate the last cause inside an exception message. */
    private static final String PATTERN1 = "Caused by: ";
    /** Separator marking the start of the stack frames inside an exception message. */
    private static final String PATTERN2 = "\tat ";
    private final SessionEnvironment defaultSessionEnvironment = SessionEnvironment.newBuilder().setSessionEndpointVersion(MockedEndpointVersion.V1).build();
    private SessionHandle sessionHandle;

    /**
     * Runs the parent set-up, then opens a session on the gateway service and builds the message
     * parameters that address it.
     *
     * @param temporaryFolder temporary directory injected by JUnit and handed to the parent set-up
     * @throws Exception if the parent set-up or opening the session fails
     */
    @Override
    @BeforeEach
    public void before(@TempDir Path temporaryFolder) throws Exception {
        super.before(temporaryFolder);
        sessionHandle = service.openSession(defaultSessionEnvironment);
        sessionMessageParameters = new SessionMessageParameters(sessionHandle);
    }

    /**
     * Submits {@code statement} through the REST endpoint, waits until the resulting operation is
     * in a terminal status and renders its results as a string.
     *
     * @param statement the statement to execute
     * @return the string produced by the inherited {@code toString} helper for the fetched rows
     * @throws Exception if a request fails, the wait times out or an assertion does not hold
     */
    @Override
    protected String runSingleStatement(String statement) throws Exception {
        ExecuteStatementRequestBody requestBody = new ExecuteStatementRequestBody(statement, 0L, new HashMap<>());
        CompletableFuture<ExecuteStatementResponseBody> response =
                restClient.sendRequest(
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
                        executeStatementHeaders,
                        sessionMessageParameters,
                        requestBody);

        String operationHandleString = response.get().getOperationHandle();
        org.junit.jupiter.api.Assertions.assertNotNull(operationHandleString);
        OperationHandle operationHandle = new OperationHandle(UUID.fromString(operationHandleString));
        // The handle returned over REST must be known to the session's operation manager.
        org.junit.jupiter.api.Assertions.assertDoesNotThrow(
                () ->
                        SQL_GATEWAY_SERVICE_EXTENSION
                                .getSessionManager()
                                .getSession(sessionHandle)
                                .getOperationManager()
                                .getOperation(operationHandle));

        CommonTestUtils.waitUtil(
                () ->
                        SQL_GATEWAY_SERVICE_EXTENSION
                                .getService()
                                .getOperationInfo(sessionHandle, operationHandle)
                                .getStatus()
                                .isTerminalStatus(),
                Duration.ofSeconds(OPERATION_WAIT_SECONDS),
                "Failed to wait operation finish.");

        FetchResultsResponseBody firstBatch = fetchResults(sessionHandle, operationHandle, 0L);
        ResultSet resultSet = firstBatch.getResults();
        String resultType = firstBatch.getResultType();
        Assertions.assertThat(resultSet).isNotNull();
        Assertions.assertThat(
                        Arrays.asList(
                                ResultSet.ResultType.PAYLOAD.name(),
                                ResultSet.ResultType.EOS.name()))
                .contains(resultType);

        RowDataToStringConverter converter =
                new RowDataToStringConverterImpl(
                        resultSet.getResultSchema().toPhysicalRowDataType(),
                        DateTimeUtils.UTC_ZONE.toZoneId(),
                        SqlGatewayRestEndpointStatementITCase.class.getClassLoader(),
                        false);
        return this.toString(
                AbstractSqlGatewayStatementITCase.StatementType.match(statement),
                resultSet.getResultSchema(),
                converter,
                new RowDataIterator(sessionHandle, operationHandle));
    }

    /**
     * Sends one fetch-results request and blocks until the response arrives.
     *
     * @param sessionHandle session that owns the operation
     * @param operationHandle operation whose results are requested
     * @param token token of the result batch to fetch
     * @return the response body of the fetch-results request
     * @throws Exception if sending the request or waiting for the response fails
     */
    FetchResultsResponseBody fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, Long token) throws Exception {
        FetchResultsTokenParameters tokenParameters = new FetchResultsTokenParameters(sessionHandle, operationHandle, token);
        CompletableFuture<FetchResultsResponseBody> response =
                restClient.sendRequest(
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
                        fetchResultsHeaders,
                        tokenParameters,
                        EmptyRequestBody.getInstance());
        return response.get();
    }

    /**
     * Reduces an exception to the text after the last {@code "Caused by: "} in its message, cut
     * off at the first {@code "\tat "} that follows.
     *
     * @param t the exception to render
     * @return the extracted text; derived from {@code t.toString()} when {@code t} has no message
     */
    @Override
    protected String stringifyException(Throwable t) {
        String message = t.getMessage();
        if (message == null) {
            // Exceptions without a message would otherwise fail here with a NullPointerException.
            message = t.toString();
        }
        String[] causes = message.split(PATTERN1);
        return causes[causes.length - 1].split(PATTERN2)[0];
    }

    /**
     * Tells whether the current session's configuration selects the streaming runtime mode.
     *
     * @return {@code true} if {@link ExecutionOptions#RUNTIME_MODE} is {@link RuntimeExecutionMode#STREAMING}
     */
    @Override
    protected boolean isStreaming() {
        return Configuration.fromMap(service.getSessionConfig(sessionHandle))
                .get(ExecutionOptions.RUNTIME_MODE)
                .equals(RuntimeExecutionMode.STREAMING);
    }

    /**
     * Builds the REST client shared by all tests, backed by a single-threaded executor.
     *
     * @return a new client created from an empty {@link Configuration}
     * @throws SqlGatewayException if the client cannot be created
     */
    private static RestClient getTestRestClient() {
        try {
            return new RestClient(new Configuration(), Executors.newFixedThreadPool(1, new ExecutorThreadFactory("rest-client-thread-pool")));
        }
        catch (ConfigurationException e) {
            throw new SqlGatewayException("Cannot get rest client.", e);
        }
    }

    /**
     * Reads the token from the last path segment of a next-result URI.
     *
     * @param uri the next-result URI, possibly {@code null} or empty
     * @return the parsed token, or {@code null} when {@code uri} is {@code null} or empty
     */
    private static Long parseTokenFromUri(String uri) {
        if (uri == null || uri.isEmpty()) {
            return null;
        }
        String[] segments = uri.split("/");
        return Long.valueOf(segments[segments.length - 1]);
    }

    /**
     * Iterates over the rows of one operation, fetching the next batch over REST whenever the
     * current batch is used up and a next-result token is still available.
     */
    private class RowDataIterator
    implements Iterator<RowData> {
        private final SessionHandle sessionHandle;
        private final OperationHandle operationHandle;
        // Token of the next batch to request; null once the response carries no next-result URI.
        private Long token = 0L;
        private Iterator<RowData> fetchedRows = Collections.emptyIterator();

        /**
         * Creates the iterator and eagerly fetches the first batch.
         *
         * @param sessionHandle session that owns the operation
         * @param operationHandle operation whose rows are iterated
         * @throws Exception if the first fetch fails
         */
        public RowDataIterator(SessionHandle sessionHandle, OperationHandle operationHandle) throws Exception {
            this.sessionHandle = sessionHandle;
            this.operationHandle = operationHandle;
            fetch();
        }

        /**
         * Fetches further batches until a row is buffered or no next token remains.
         *
         * @return {@code true} if a row is buffered and can be returned by {@link #next()}
         * @throws SqlGatewayException if fetching a batch fails or the thread is interrupted
         */
        @Override
        public boolean hasNext() {
            while (token != null && !fetchedRows.hasNext()) {
                try {
                    fetch();
                }
                catch (InterruptedException e) {
                    // Keep the interrupt visible to callers instead of silently clearing it.
                    Thread.currentThread().interrupt();
                    throw new SqlGatewayException("Interrupted while fetching results.", e);
                }
                catch (Exception e) {
                    // Ignoring the failure would leave the loop condition unchanged and spin forever.
                    throw new SqlGatewayException("Failed to fetch results.", e);
                }
            }
            // Answer from the buffer so rows delivered together with the final (null) token are kept.
            return fetchedRows.hasNext();
        }

        /**
         * Returns the next buffered row.
         *
         * @return the next row of the current batch
         */
        @Override
        public RowData next() {
            return fetchedRows.next();
        }

        /**
         * Requests the batch for the current token, then replaces the token with the one parsed
         * from the response's next-result URI and the buffer with the rows of the response.
         *
         * @throws Exception if the request fails
         */
        private void fetch() throws Exception {
            FetchResultsResponseBody responseBody = SqlGatewayRestEndpointStatementITCase.this.fetchResults(sessionHandle, operationHandle, token);
            token = parseTokenFromUri(responseBody.getNextResultUri());
            fetchedRows = responseBody.getResults().getData().iterator();
        }
    }
}

