/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ipc;

import com.google.protobuf.BlockingService;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcServerException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.TestRpcBase;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestProtoBufRpc
extends TestRpcBase {
    /**
     * RPC server under test. Assigned in {@link #setUp()} before every test
     * and stopped in {@link #tearDown()}.
     */
    private static RPC.Server server;
    /**
     * Base sleep duration. Presumably milliseconds, to match the
     * {@code getMilliSeconds()} value consumed by {@code PBServer2Impl.sleep}
     * -- TODO confirm.
     */
    private static final int SLEEP_DURATION = 1000;

    /**
     * Creates a fresh configuration and starts an RPC server that serves both
     * {@code TestRpcBase.TestRpcService} and {@link TestRpcService2} through the
     * protobuf RPC engine.
     *
     * <p>The server binds to {@code 0.0.0.0} on port 0; the address clients
     * should connect to is stored in {@code addr}.
     *
     * @throws IOException if the server cannot be built
     */
    @Before
    public void setUp() throws IOException {
        conf = new Configuration();
        // Small request-size limit; presumably what makes the 4096-character
        // echo in testExtraLongRpc fail -- confirm against the server code.
        conf.setInt("ipc.maximum.data.length", 1024);
        conf.setBoolean("ipc.server.log.slow.rpc", true);

        // Both protocols go through the protobuf engine.
        RPC.setProtocolEngine(conf, TestRpcBase.TestRpcService.class, ProtobufRpcEngine.class);
        RPC.setProtocolEngine(conf, TestRpcService2.class, ProtobufRpcEngine.class);

        // Primary protocol: supplied to the builder.
        BlockingService primaryService =
            TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
                new TestRpcBase.PBServerImpl());
        RPC.Builder builder = new RPC.Builder(conf)
            .setProtocol(TestRpcBase.TestRpcService.class)
            .setInstance(primaryService)
            .setBindAddress("0.0.0.0")
            .setPort(0);
        server = builder.build();
        addr = NetUtils.getConnectAddress(server);

        // Secondary protocol: registered on the already-built server.
        BlockingService secondaryService =
            TestRpcServiceProtos.TestProtobufRpc2Proto.newReflectiveBlockingService(
                new PBServer2Impl());
        server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class, secondaryService);

        server.start();
    }

    /**
     * Stops the RPC server started by {@link #setUp()}.
     *
     * <p>The reference is null-checked because {@code setUp} can throw before
     * the server is assigned; without the guard a {@link NullPointerException}
     * here would mask the real set-up failure. The static field is cleared
     * afterwards so a stopped server is never left behind for a later test.
     *
     * @throws Exception if stopping the server fails
     */
    @After
    public void tearDown() throws Exception {
        if (server != null) {
            server.stop();
            server = null;
        }
    }

    /**
     * Creates a client proxy for {@link TestRpcService2} that talks to the
     * server address and configuration prepared in {@code setUp}.
     *
     * @return a new proxy implementing {@link TestRpcService2}
     * @throws IOException if the proxy cannot be created
     */
    private TestRpcService2 getClient2() throws IOException {
        final long clientVersion = 0L;
        return RPC.getProxy(TestRpcService2.class, clientVersion, addr, conf);
    }

    /**
     * Runs the shared ping/echo/error checks against a client of the primary
     * test protocol.
     *
     * @throws Exception if client creation or any RPC fails unexpectedly
     */
    @Test(timeout = 5000L)
    public void testProtoBufRpc() throws Exception {
        testProtoBufRpc(getClient(addr, conf));
    }

    /**
     * Exercises {@code ping}, {@code echo} and {@code error} on the given client.
     *
     * <p>{@code echo} must return the message it was sent. {@code error} must
     * fail with a {@link ServiceException} whose cause is a
     * {@link RemoteException} that unwraps to an {@link RpcServerException}
     * and carries the {@code ERROR_RPC_SERVER} error code.
     *
     * @param client proxy for the primary test protocol
     * @throws Exception if {@code ping} or {@code echo} fails
     */
    public static void testProtoBufRpc(TestRpcBase.TestRpcService client) throws Exception {
        client.ping(null, newEmptyRequest());

        TestProtos.EchoRequestProto echoRequest =
            TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
        TestProtos.EchoResponseProto echoResponse = client.echo(null, echoRequest);
        // JUnit order is (expected, actual); the reverse mislabels failure output.
        Assert.assertEquals("hello", echoResponse.getMessage());

        try {
            client.error(null, newEmptyRequest());
            Assert.fail("Expected exception is not thrown");
        } catch (ServiceException e) {
            RemoteException re = (RemoteException) e.getCause();
            RpcServerException rse =
                (RpcServerException) re.unwrapRemoteException(RpcServerException.class);
            Assert.assertNotNull(rse);
            // assertEquals reports both values on mismatch, unlike assertTrue(a.equals(b)).
            Assert.assertEquals(
                RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.ERROR_RPC_SERVER,
                re.getErrorCode());
        }
    }

    /**
     * Calls {@code ping2} and {@code echo2} on the second protocol, then checks
     * that the server's RPC metrics and detailed metrics recorded the calls.
     *
     * @throws Exception if client creation or any RPC fails
     */
    @Test(timeout = 5000L)
    public void testProtoBufRpc2() throws Exception {
        TestRpcService2 client = getClient2();

        client.ping2(null, newEmptyRequest());

        TestProtos.EchoResponseProto echoResponse = client.echo2(null, newEchoRequest("hello"));
        // JUnit order is (expected, actual); the reverse mislabels failure output.
        Assert.assertEquals("hello", echoResponse.getMessage());

        // Aggregate RPC metrics: queue and processing time were sampled at least once.
        MetricsRecordBuilder rpcMetrics = MetricsAsserts.getMetrics(server.getRpcMetrics().name());
        MetricsAsserts.assertCounterGt("RpcQueueTimeNumOps", 0L, rpcMetrics);
        MetricsAsserts.assertCounterGt("RpcProcessingTimeNumOps", 0L, rpcMetrics);

        // Per-method metrics: the echo2 call was counted.
        MetricsRecordBuilder rpcDetailedMetrics =
            MetricsAsserts.getMetrics(server.getRpcDetailedMetrics().name());
        MetricsAsserts.assertCounterGt("Echo2NumOps", 0L, rpcDetailedMetrics);
    }

    /**
     * Verifies how an application-level exception raised by {@code error2} is
     * reported to the client.
     *
     * <p>The call must fail with a {@link ServiceException} whose cause is a
     * {@link RemoteException} that names {@link URISyntaxException}, contains
     * {@code "testException"} in its message, and carries the
     * {@code ERROR_APPLICATION} error code. The test fails if the call
     * returns normally.
     *
     * @throws Exception if client creation fails
     */
    @Test(timeout = 5000L)
    public void testProtoBufRandomException() throws Exception {
        TestRpcBase.TestRpcService client = getClient(addr, conf);
        try {
            client.error2(null, newEmptyRequest());
            // Without this the test would pass silently when nothing is thrown.
            Assert.fail("Expected exception is not thrown");
        } catch (ServiceException se) {
            Assert.assertTrue(se.getCause() instanceof RemoteException);
            RemoteException re = (RemoteException) se.getCause();
            // assertEquals reports both values on mismatch, unlike assertTrue(a.equals(b)).
            Assert.assertEquals(URISyntaxException.class.getName(), re.getClassName());
            Assert.assertTrue(re.getMessage().contains("testException"));
            Assert.assertEquals(
                RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.ERROR_APPLICATION,
                re.getErrorCode());
        }
    }

    /**
     * Checks that a short {@code echo2} request succeeds while a 4096-character
     * one fails with a {@link ServiceException}.
     *
     * @throws Exception if client creation or the short echo fails
     */
    @Test(timeout = 6000L)
    public void testExtraLongRpc() throws Exception {
        TestRpcService2 client = getClient2();

        // A four-character payload must round-trip unchanged.
        final String smallPayload = StringUtils.repeat("X", 4);
        TestProtos.EchoResponseProto smallReply = client.echo2(null, newEchoRequest(smallPayload));
        Assert.assertEquals(smallPayload, smallReply.getMessage());

        // 4096 characters; presumably rejected because it exceeds the 1024
        // configured under "ipc.maximum.data.length" in setUp -- confirm.
        final String oversizedPayload = StringUtils.repeat("X", 4096);
        try {
            client.echo2(null, newEchoRequest(oversizedPayload));
            Assert.fail("expected extra-long RPC to fail");
        } catch (ServiceException expected) {
            // Expected: the oversized request must not succeed.
        }
    }

    /**
     * Verifies that one long-running call raises the server's slow-call
     * counter by exactly one.
     *
     * <p>10000 quick {@code ping2} calls are issued first, and the test asserts
     * that more than 999 processing samples were recorded before the slow call
     * is made. Presumably the server needs that baseline to classify a call as
     * slow -- confirm against the server implementation.
     *
     * @throws IOException if client creation fails
     * @throws ServiceException if an RPC fails
     */
    @Test(timeout = 12000L)
    public void testLogSlowRPC() throws IOException, ServiceException {
        TestRpcService2 client = getClient2();

        // Populate the processing-time samples with fast calls.
        int remainingPings = 10000;
        while (remainingPings-- > 0) {
            client.ping2(null, newEmptyRequest());
        }

        RpcMetrics rpcMetrics = server.getRpcMetrics();
        Assert.assertTrue(rpcMetrics.getProcessingSampleCount() > 999L);

        long slowCallsBefore = rpcMetrics.getRpcSlowCalls();
        // Sleep for three times the base duration (3000).
        client.sleep(null, newSleepRequest(SLEEP_DURATION * 3));
        long slowCallsAfter = rpcMetrics.getRpcSlowCalls();

        Assert.assertEquals(slowCallsBefore + 1L, slowCallsAfter);
    }

    /**
     * Verifies that the slow-call counter does not change once slow-RPC
     * logging has been switched off on the server.
     *
     * <p>Mirrors the slow-RPC test: 10000 quick {@code ping2} calls, a check
     * that more than 999 processing samples exist, then one sleeping call.
     *
     * @throws IOException if client creation fails
     * @throws ServiceException if an RPC fails
     */
    @Test(timeout = 12000L)
    public void testEnsureNoLogIfDisabled() throws IOException, ServiceException {
        // Disable the feature that setUp enabled through configuration.
        server.setLogSlowRPC(false);
        TestRpcService2 client = getClient2();

        // Populate the processing-time samples with fast calls.
        int remainingPings = 10000;
        while (remainingPings-- > 0) {
            client.ping2(null, newEmptyRequest());
        }

        RpcMetrics rpcMetrics = server.getRpcMetrics();
        Assert.assertTrue(rpcMetrics.getProcessingSampleCount() > 999L);

        long slowCallsBefore = rpcMetrics.getRpcSlowCalls();
        client.sleep(null, newSleepRequest(SLEEP_DURATION));
        long slowCallsAfter = rpcMetrics.getRpcSlowCalls();

        // The counter must not move while logging is disabled.
        Assert.assertEquals(slowCallsBefore, slowCallsAfter);
    }

    /**
     * Server-side implementation of {@link TestRpcService2} used by these tests.
     */
    public static class PBServer2Impl
    implements TestRpcService2 {
        /**
         * Replies with an empty response.
         *
         * @param unused  ignored
         * @param request ignored
         * @return a default-built empty response
         */
        @Override
        public TestProtos.EmptyResponseProto ping2(RpcController unused, TestProtos.EmptyRequestProto request) throws ServiceException {
            return TestProtos.EmptyResponseProto.newBuilder().build();
        }

        /**
         * Echoes the request message back to the caller.
         *
         * @param unused  ignored
         * @param request request whose message is copied into the response
         * @return a response carrying the same message as {@code request}
         */
        @Override
        public TestProtos.EchoResponseProto echo2(RpcController unused, TestProtos.EchoRequestProto request) throws ServiceException {
            return TestProtos.EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
        }

        /**
         * Sleeps for the number of milliseconds given in the request, then
         * replies with an empty sleep response.
         *
         * <p>If the sleep is interrupted, the method still returns normally
         * but restores the thread's interrupt status so the calling thread's
         * owner can observe the interruption.
         *
         * @param controller ignored
         * @param request    request supplying the sleep time via {@code getMilliSeconds()}
         * @return a default-built sleep response
         */
        @Override
        public TestProtos.SleepResponseProto sleep(RpcController controller, TestProtos.SleepRequestProto request) throws ServiceException {
            try {
                Thread.sleep(request.getMilliSeconds());
            }
            catch (InterruptedException interrupted) {
                // Do not swallow the interruption: re-assert the flag for the caller.
                Thread.currentThread().interrupt();
            }
            return TestProtos.SleepResponseProto.newBuilder().build();
        }
    }

    /**
     * Second test protocol: the blocking interface of
     * {@code TestProtobufRpc2Proto}, registered under protocol name
     * {@code "testProto2"}, version 1.
     */
    @ProtocolInfo(protocolName = "testProto2", protocolVersion = 1L)
    public interface TestRpcService2 extends TestRpcServiceProtos.TestProtobufRpc2Proto.BlockingInterface {
    }
}

