当前位置:首页 > 在山的那边 > 正文内容

什么是RPC?

那边1年前 (2021-08-01)在山的那边127

什么是RPC?

1. 基本的RPC模型

主要介绍RPC是什么,基本的RPC代码,RPC与REST的区别,gRPC的使用

1.1 基本概念

  • RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务

  • 本地过程调用:如果需要将本地student对象的age+1,可以实现一个addAge()方法,将student对象传入,对年龄进行更新之后返回即可,本地方法调用的函数体通过函数指针来指定。

  • 远程过程调用:上述操作的过程中,如果addAge()这个方法在服务端,执行函数的函数体在远程机器上,如何告诉机器需要调用这个方法呢?

  1. 首先客户端需要告诉服务器,需要调用的函数,这里函数和进程ID存在一个映射,客户端远程调用时,需要查一下函数,找到对应的ID,然后执行函数的代码。

  2. 客户端需要把本地参数传给远程函数,本地调用的过程中,直接压栈即可,但是在远程调用过程中不再同一个内存里,无法直接传递函数的参数,因此需要客户端把参数转换成字节流,传给服务端,然后服务端将字节流转换成自身能读取的格式,是一个序列化和反序列化的过程。
    3.数据准备好了之后,如何进行传输?网络传输层需要把调用的ID和序列化后的参数传给服务端,然后把计算好的结果序列化传给客户端,因此TCP层即可完成上述过程,gRPC中采用的是HTTP2协议。
    总结一下上述过程:

// Client端 //    Student student = Call(ServerAddr, addAge, student)1. 将这个调用映射为Call ID。2. 将Call ID,student(params)序列化,以二进制形式打包3. 把2中得到的数据包发送给ServerAddr,这需要使用网络传输层4. 等待服务器返回结果5. 如果服务器调用成功,那么就将结果反序列化,并赋给student,年龄更新// Server端1. 在本地维护一个Call ID到函数指针的映射call_id_map,可以用Map<String, Method> callIdMap2. 等待客户端请求3. 得到一个请求后,将其数据包反序列化,得到Call ID4. 通过在callIdMap中查找,得到相应的函数指针5. 将student(params)反序列化后,在本地调用addAge()函数,得到结果6. 将student结果序列化后通过网络返回给Client
  • 在微服务的设计中,一个服务A如果访问另一个Module下的服务B,可以采用HTTP REST传输数据,并在两个服务之间进行序列化和反序列化操作,服务B把执行结果返回过来。


  • 由于HTTP在应用层中完成,整个通信的代价较高,远程过程调用中直接基于TCP进行远程调用,数据传输在传输层TCP层完成,更适合对效率要求比较高的场景,RPC主要依赖于客户端和服务端之间建立Socket链接进行,底层实现比REST更复杂。

1.2 rpc demo

系统类图
系统调用过程

客户端:

public class RPCClient<T> {
    public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
        // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = null;
                        ObjectOutputStream output = null;
                        ObjectInputStream input = null;
                        try{
                            // 2.创建Socket客户端,根据指定地址连接远程服务提供者
                            socket = new Socket();
                            socket.connect(addr);

                            // 3.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者
                            output = new ObjectOutputStream(socket.getOutputStream());
                            output.writeUTF(serviceInterface.getName());
                            output.writeUTF(method.getName());
                            output.writeObject(method.getParameterTypes());
                            output.writeObject(args);

                            // 4.同步阻塞等待服务器返回应答,获取应答后返回
                            input = new ObjectInputStream(socket.getInputStream());
                            return input.readObject();
                        }finally {
                            if (socket != null){
                                socket.close();
                            }
                            if (output != null){
                                output.close();
                            }
                            if (input != null){
                                input.close();
                            }
                        }
                    }
                });
    }}

服务端:

public class ServiceCenter implements Server {

    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();

    private static boolean isRunning = false;

    private static int port;


    public ServiceCenter(int port){
        ServiceCenter.port = port;
    }


    @Override
    public void start() throws IOException {
        ServerSocket server = new ServerSocket();
        server.bind(new InetSocketAddress(port));
        System.out.println("Server Start .....");
        try{
            while(true){
                executor.execute(new ServiceTask(server.accept()));
            }
        }finally {
            server.close();
        }
    }

    @Override
    public void register(Class serviceInterface, Class impl) {
        serviceRegistry.put(serviceInterface.getName(), impl);
    }

    @Override
    public boolean isRunning() {
        return isRunning;
    }

    @Override
    public int getPort() {
        return port;
    }

    @Override
    public void stop() {
        isRunning = false;
        executor.shutdown();
    }
   private static class ServiceTask implements Runnable {
        Socket client = null;

        public ServiceTask(Socket client) {
            this.client = client;
        }

        @Override
        public void run() {
            ObjectInputStream input = null;
            ObjectOutputStream output = null;
            try{
                input = new ObjectInputStream(client.getInputStream());
                String serviceName = input.readUTF();
                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();
                Class serviceClass = serviceRegistry.get(serviceName);
                if(serviceClass == null){
                    throw new ClassNotFoundException(serviceName + "not found!");
                }
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                Object result = method.invoke(serviceClass.newInstance(), arguments);

                output = new ObjectOutputStream(client.getOutputStream());
                output.writeObject(result);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                if(output!=null){
                    try{
                        output.close();
                    }catch (IOException e){
                        e.printStackTrace();
                    }
                }
                if (input != null) {
                    try {
                        input.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (client != null) {
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }}
public class ServiceProducerImpl implements ServiceProducer{
    @Override
    public String sendData(String data) {
        return "I am service producer!!!, the data is "+ data;
    }}
public class RPCTest {
    public static void main(String[] args) throws IOException {
        new Thread(new Runnable() {
            @Override            public void run() {
                try {
                    Server serviceServer = new ServiceCenter(8088);
                    serviceServer.register(ServiceProducer.class, ServiceProducerImpl.class);
                    serviceServer.start();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        ServiceProducer service = RPCClient.getRemoteProxyObj(ServiceProducer.class, new InetSocketAddress("localhost", 8088));
        System.out.println(service.sendData("test"));
    }}

1.3 完整源码

RPCdemo

1.4 分析

这里客户端只需要知道Server端的接口ServiceProducer即可,服务端在执行的时候,会根据具体实例调用实际的方法ServiceProducerImpl,符合面向对象过程中父类引用指向子类对象。

2. gRPC的使用

2.1. gRPC与REST

  • REST通常以业务为导向,将业务对象上执行的操作映射到HTTP动词,格式非常简单,可以使用浏览器进行扩展和传输,通过JSON数据完成客户端和服务端之间的消息通信,直接支持请求/响应方式的通信。不需要中间的代理,简化了系统的架构,不同系统之间只需要对JSON进行解析和序列化即可完成数据的传递。

  • 但是REST也存在一些弊端,比如只支持请求/响应这种单一的通信方式,对象和字符串之间的序列化操作也会影响消息传递速度,客户端需要通过服务发现的方式,知道服务实例的位置,在单个请求获取多个资源时存在着挑战,而且有时候很难将所有的动作都映射到HTTP动词。

  • 正是因为REST面临一些问题,因此可以采用gRPC作为一种替代方案,gRPC 是一种基于二进制流的消息协议,可以采用基于Protocol Buffer的IDL定义grpc API,这是Google公司用于序列化结构化数据提供的一套语言中立的序列化机制,客户端和服务端使用HTTP/2以Protocol Buffer格式交换二进制消息。

  • gRPC的优势是,设计复杂更新操作的API非常简单,具有高效紧凑的进程通信机制,在交换大量消息时效率高,远程过程调用和消息传递时可以采用双向的流式消息方式,同时客户端和服务端支持多种语言编写,互操作性强;不过gRPC的缺点是不方便与JavaScript集成,某些防火墙不支持该协议。

  • 注册中心:当项目中有很多服务时,可以把所有的服务在启动的时候注册到一个注册中心里面,用于维护服务和服务器之间的列表,当注册中心接收到客户端请求时,去找到该服务是否远程可以调用,如果可以调用需要提供服务地址返回给客户端,客户端根据返回的地址和端口,去调用远程服务端的方法,执行完成之后将结果返回给客户端。这样在服务端加新功能的时候,客户端不需要直接感知服务端的方法,服务端将更新之后的结果在注册中心注册即可,而且当修改了服务端某些方法的时候,或者服务降级服务多机部署想实现负载均衡的时候,我们只需要更新注册中心的服务群即可。


    RPC调用过程

2.2. gRPC与Spring Boot

这里使用SpringBoot+gRPC的形式实现RPC调用过程
项目结构分为三部分:client、grpc、server


项目结构

2.2.2 grpc


pom.xml中引入依赖:

<dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-all</artifactId>
       <version>1.12.0</version>
 </dependency>

引入bulid

<build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.4.1.Final</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.0</version>
                <configuration>
                    <pluginId>grpc-java</pluginId>
                    <protocArtifact>com.google.protobuf:protoc:3.0.2:exe:${os.detected.classifier}</protocArtifact>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.2.0:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

创建.proto文件

syntax = "proto3";   // 语法版本// stub选项option java_package = "com.shgx.grpc.api";option java_outer_classname = "RPCDateServiceApi";option java_multiple_files = true;// 定义包名package com.shgx.grpc.api;// 服务接口定义,服务端和客户端都要遵守该接口进行通信service RPCDateService {
    rpc getDate (RPCDateRequest) returns (RPCDateResponse) {}}// 定义消息(请求)message RPCDateRequest {
    string userName = 1;}// 定义消息(响应)message RPCDateResponse {
    string serverDate = 1;}

mvn complie


生成代码:


2.2.3 client


根据gRPC中的项目配置在client和server两个Module的pom.xml添加依赖


        <dependency>
            <groupId>com.shgx</groupId>
            <artifactId>grpc</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>

编写GRPCClient

public class GRPCClient {
    private static final String host = "localhost";
    private static final int serverPort = 9999;

    public static void main( String[] args ) throws Exception {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress( host, serverPort ).usePlaintext().build();
        try {
            RPCDateServiceGrpc.RPCDateServiceBlockingStub rpcDateService = RPCDateServiceGrpc.newBlockingStub( managedChannel );
            RPCDateRequest rpcDateRequest = RPCDateRequest
                    .newBuilder()
                    .setUserName("shgx")
                    .build();
            RPCDateResponse rpcDateResponse = rpcDateService.getDate( rpcDateRequest );
            System.out.println( rpcDateResponse.getServerDate() );
        } finally {
            managedChannel.shutdown();
        }
    }}

2.2.4 server


按照2.2.3 client的方式添加依赖
创建RPCDateServiceImpl

public class RPCDateServiceImpl extends RPCDateServiceGrpc.RPCDateServiceImplBase{
    @Override
    public void getDate(RPCDateRequest request, StreamObserver<RPCDateResponse> responseObserver) {
        RPCDateResponse rpcDateResponse = null;
        Date now=new Date();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("今天是"+"yyyy年MM月dd日 E kk点mm分");
        String nowTime = simpleDateFormat.format( now );
        try {
            rpcDateResponse = RPCDateResponse
                    .newBuilder()
                    .setServerDate( "Welcome " + request.getUserName()  + ", " + nowTime )
                    .build();
        } catch (Exception e) {
            responseObserver.onError(e);
        } finally {
            responseObserver.onNext( rpcDateResponse );
        }
        responseObserver.onCompleted();
    }}

创建GRPCServer

public class GRPCServer {
   private static final int port = 9999;
   public static void main( String[] args ) throws Exception {
       Server server = ServerBuilder.
               forPort(port)
               .addService( new RPCDateServiceImpl() )
               .build().start();
       System.out.println( "grpc服务端启动成功, 端口=" + port );
       server.awaitTermination();
   }}

2.3. 完整源码

源码参考

3. 参考文章

链接:https://www.jianshu.com/p/7d6853140e13
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

扫描二维码推送至手机访问。

版权声明:本文由那边博客发布,如需转载请注明出处。

本文链接:http://nabian.cn/article/68.html

标签: RPC
分享给朋友:

相关文章

如果不是没得选,谁愿意去努力?

如果不是没得选,谁愿意去努力?

 这篇文章写的带了太多个人情绪了,请各位观看时忽略掉我的个人情绪!I am Sorry! “没伞的孩子只能努力奔跑”这句话挺多人应该都听过,今天有人拿这个话来劝我,我的很想怼回去,假如这孩子有伞他是不...

不要总想着时间会证明一切

不要总想着时间会证明一切

你觉得时间会证明一切?不!时间会揭露一切,过人生真的不能心存侥幸心理,现在把侥幸的因埋下去,经过时间的雨雪风霜侵蚀下早晚会露出他真正的会结出的果。最近有点心存侥幸了,总是想着依附着别人去取得成功,现实一次次证明,就算成功了也与你关系不大,别...

你真的有那么重要么

你真的有那么重要么

很多时候在我们自己的感觉里,我们是很重要的,不可或缺的,可在别人眼里你真的有那么重要么?简单做个小测试,请你现在开始想一下你觉得身边称的上重要人,能想到几个人?我猜估计大多都是家人和最好的朋友,同事和普通路人朋友你估计想都不会想不到他们吧?...

试着建立属于自己的时间管理体系

试着建立属于自己的时间管理体系

经过最近一段的观察,很多的优秀的创作者对于时间的运用和管理都有自己的一套逻辑体系,你是否有自己的时间管理体系呢?一、时间没有规划会比较乱我在以前的时间是没有规划的,业余时间基本上是想到啥做啥,从来没有给自己明确自己每天要做些什么,生活中要做...

承认自己是普通人也是一种成长

承认自己是普通人也是一种成长

小时候总幻想自己长大以后会多么多么的厉害,幻想的多了就真的觉得自己很厉害了,别人说自己是普通人的时候,我嘴上说我们确实是普通人,但是心底里还是不愿承认自己是普通的。人心底不愿意承认的事,就是经历过再多的说教也是不太容易改变。因此我也就一直觉...

人有欲望总归是要控制一下的

人有欲望总归是要控制一下的

欲望这个东西在可控的情况下,是可以给我们带来前进的动力的,欲望一旦失控导致的后果是让人崩溃的。人要对自己的欲望有所控制,不能沦为欲望的奴隶,因为你一旦沦为了欲望的奴隶,你所有的一切都会因此而变的不可控。因为一旦沦陷,你的所想所做的、你所爱的...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。