原创

手把手实现一个简单的RPC(远程服务调用)

RPC简介

相信能看到RPC的那一定有了解过Dubbo这个框架了,Dubbo:是一个分布式的服务框架,致力于高性能和透明化的RPC远程服务调用方案,以及SOA治理方案。 那就奇怪了,这RPC的远程服务调用方案是啥呀,我们来看一下Dubbo的架构图,从图上可以看见,在服务提供者(Provider)向注册中心(Registry)注册服务并暴露注册的服务。消费者(Consumer)向注册中心获取想要的服务后,消费者(Consumer)是直接调用服务提供者(Provider)通信进行服务调用的,而消费者和服务者不属于一个服务内,这种之间的调用也是我们需要探究的点,他们俩是咋调用的。

在这里插入图片描述

Jy就通过制作一个简单的远程服务调用的案例来透析dubbo中服务调用的大概真相。

制作一个简单的RPC

最终完成的有以下的服务:rpc-api:提供实体类和服务接口,rpc-common:封装请求和响应,rpc-core:核心组件,test-client:消费者测试, test-server:服务提供者测试

在这里插入图片描述

先将需要的依赖定义好,这是SimpleRpc外层的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.df</groupId>
    <artifactId>SimpleRpc</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>rpc-api</module>
        <module>rpc-common</module>
        <module>rpc-core</module>
        <module>test-client</module>
        <module>test-server</module>
    </modules>
    
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>
</project>

然后先创建rpc-api,将需要的实体类和服务接口定义好,后面要进行调用的。整体的结构是这样的

在这里插入图片描述

内部的实体类定义

package com.df;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;

@Data
@AllArgsConstructor
public class HelloObject implements Serializable {

    private Integer id;
    private String message;

}

服务接口定义

package com.df;

public interface HelloService {

    String hello(HelloObject helloObject);

}

然后先创建rpc-common,这里封装了我们要发送的请求文件格式,和响应文件格式。 pom.xml并没有增加任何东西,原封不动。

在这里插入图片描述

先看RpcRequest类,这是一个存储发送请求实体的类,内部封装了请求发送到服务器端所带有的数据。

package com.df.entity;


import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * 向服务端发送的请求实体。带有接口,方法,参数,类型信息
 */
@Data
@Builder
public class RpcRequest implements Serializable {
	
    //待调用接口名称
    private String interfaceName;
	
    //待调用方法的名称 后续使用反射创建方法的时候需要使用
    private String methodName;

    //调用方法的参数 服务端调用服务的时候需要传入对应的参数
    private Object[] parameters;

    //调用方法的参数类型 后续使用反射创建方法的时候需要使用
    private  Class<?>[] paramTypes;
}

然后来看响应实体,响应实体主要返回服务执行的结果和执行的情况,也就是状态码。

package com.df.entity;

import com.df.enumeration.ResponseCode;
import lombok.Data;

import java.io.Serializable;

/**
 * 消费者处理完毕后返回给客户端的信息
 * @param <T>
 */
@Data

public class RpcResponse<T> implements Serializable {


    //状态码
    private Integer statusCode;

    //响应状态补充信息
    private String message;

    //返回数据
    private T data;


    public static <T> RpcResponse<T> success(T data){
        RpcResponse<T> response = new RpcResponse<T>();
        response.setStatusCode(200);
        response.setData(data);
        return response;
    }

    public static <T> RpcResponse<T> fail(ResponseCode code){
        RpcResponse<T> response = new RpcResponse<T>();
        response.setStatusCode(code.getCode());
        response.setMessage(code.getMessage());
        return response;
    }

}

还需要一个状态码的实体类

package com.df.enumeration;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum ResponseCode {

    SUCCESS(200,"调用方法成功"),
    FAIL(500,"调用方法失败"),
    NOT_FOUND_METHOD(500,"未找到指定方法"),
    NOT_FOUND_CLASS(500,"未找到指定类");

    private final int code;
    private final String message;

}

这样就将我们需要的请求实体和响应实体封装好,到时候直接将数据保存到请求实体进行发送到服务器端即可。 之后来建设rpc-core核心实现类(关键方法)。先进行pom.xml的配置,引入刚设立的rpc-common

在这里插入图片描述

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>SimpleRpc</artifactId>
        <groupId>com.df</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-core</artifactId>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>7</source>
                    <target>7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>com.df</groupId>
            <artifactId>rpc-common</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>

先来设置rpcClient类,通过这个类设立socket进行字节流传输。

package com.df.client;

import com.df.entity.RpcRequest;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.net.UnknownHostException;

/**
 * 远程调用方法的客户端
 */
@Slf4j
public class RpcClient {

    /**
     *
     * @param rpcRequest  发送的请求实体
     * @param host 发送的域名
     * @param port 发送的端口
     * @return 返回结果
     */
    public Object sendRequest(RpcRequest rpcRequest, String host, int port){
        //创建socket连接,封装目标域名和端口
        try(Socket socket = new Socket(host, port)) {
            //设置字节二进制字节输出流,封装socket
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            //写入请求实体
            objectOutputStream.writeObject(rpcRequest);
            //发送刷新
            objectOutputStream.flush();
            log.info("成功写入");

            //等待服务端返回的响应信息
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            //返回响应中的内容
            return objectInputStream.readObject();
        }catch (IOException | ClassNotFoundException e) {
            log.info("调用时发生异常");
            e.printStackTrace();
            return null;
}

在之后就要去设置动态代理类,通过invoke方法调用写好的RpcClient发送请求连接。具体在注释里啦

package com.df.client;

import com.df.entity.RpcRequest;
import com.df.entity.RpcResponse;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class RpcClientProxy implements InvocationHandler {

    /**
     *  客户端动态代理
     *
     */
    private String host;
    private int port;

    //构造方法传入域名和端口
    public RpcClientProxy(String host, int port){
        this.host = host;
        this.port = port;
    }

    //进行代理对象的构造,传入类加载器和接口以及当前的invocationHandler对象用于重定向
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    }

    //调用代理对象方法时会调用到invoke方法
    /**
     *
     * @param proxy 代理对象
     * @param method 调用的代理方法
     * @param args 传入的参数
     * @return 服务器端响应回来的结果
     * @throws Throwable
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //封装rpcRequest实体,传入接口名称,调用方法名称,参数和参数类型
        RpcRequest rpcRequest = RpcRequest.builder().interfaceName(method.getDeclaringClass().getName())
                .methodName(method.getName()).parameters(args).paramTypes(method.getParameterTypes()).build();
        RpcClient rpcClient = new RpcClient();
        //调用rpcClient 刚封装好的请求,传入请求实体,域名和端口,发送到服务器端,返回处理后的实体数据
        return ((RpcResponse)rpcClient.sendRequest(rpcRequest, host, port)).getData();
    }
}

到这里客户端向服务器端发送对应接口方法的请求的步骤就好了,之后就是服务器端接收客户端发来的请求,然后进行处理的过程。 先来定义RpcServer类,该类通过serverSocket监听端口,并使用线程池异步处理socket请求。

package com.df.server;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.lang.reflect.Array;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;

/**
 * 远程调用服务端
 */
@Slf4j
public class RpcServer {

    private final ExecutorService threadPool;

    public RpcServer() {
        //线程池核心池大小
        int corePoolSize = 5;
        //线程池总池大小
        int maxinumPoolSize = 50;
        //空闲线程持续时间
        long keepAliveTime = 60;
        //阻塞队列定义 定义最大容量为100的有界队列
        BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
        //线程工场
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        //设置线程池
        threadPool = new ThreadPoolExecutor(corePoolSize, maxinumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory);
    }

    //调用后开启 服务端socket监听,传入service就是对应的接口实现类, port为端口
    public void register(Object service, int port){
        //定义serverSocket服务器类监听对应的端口,一旦接收到请求则进行处理
        try(ServerSocket serverSocket = new ServerSocket(port)){
            log.info("接收到,服务器开始启动");
            Socket socket;
            //如果socket服务端接收到socket则进行对socket进行处理
            while((socket = serverSocket.accept()) != null){
                log.info("客户端连接!Ip为:" + socket.getInetAddress());
                //通过线程池创建线程进行异步处理
                threadPool.execute(new WorkerThread(socket, service));
            }
        } catch (IOException e) {
            log.info("连接时发生错误");
            e.printStackTrace();
        }
    }
}

WorkerThread实现类,用来正式的处理传入的请求。

package com.df.server;

import com.df.entity.RpcRequest;
import com.df.entity.RpcResponse;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;

/**
 * 进行实际过程调用的工作线程
 */
@Slf4j
public class WorkerThread implements Runnable {

    private Socket socket;

    private Object service;

    //构造方法 在RpcServer中调用传入socket请求和实现类
    public WorkerThread(Socket socket, Object service){
        this.socket = socket;
        this.service = service;
    }

    @Override
    public void run() {
        //创建输入输出流
        try(ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
            //从输入流中读取出请求实体
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
            //通过反射调用方法实现类,获取对应调用的方法
            Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
            //调用实现类的方法,传入参数
            Object result = method.invoke(service, rpcRequest.getParameters());
            //将输出结果写入输出流返回
            objectOutputStream.writeObject(RpcResponse.success(result));
            //刷新返回
            objectOutputStream.flush();

        } catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            log.info("调用时有错误发生");
            e.printStackTrace();
        }
    }
}

ok,到这客户端发送和服务器实现返回数据都已经完成,就剩下最后的客户端调用和服务器开启监听了。 test-client客户端调用服务 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>SimpleRpc</artifactId>
        <groupId>com.df</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>test-client</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.df</groupId>
            <artifactId>rpc-api</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.df</groupId>
            <artifactId>rpc-core</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>

设置TestClient进行调用

import com.df.HelloObject;
import com.df.HelloService;
import com.df.client.RpcClient;
import com.df.client.RpcClientProxy;

/**
 * 测试客户端
 */
public class TestClient {
    public static void main(String[] args) {
        //设置实现了invocationHandler接口的代理客户端,传入要请求的域名和端口
        RpcClientProxy proxy = new RpcClientProxy("127.0.0.1", 9000);
        //生成一个代理对象
        HelloService helloService = proxy.getProxy(HelloService.class);
        //创建传入的数据
        HelloObject helloObject = new HelloObject(12, "This is message");
        //调用代理的方法,会转到invoke方法内,发送socket请求,返回服务端响应的结果
        String hello = helloService.hello(helloObject);
        System.out.println(hello);
    }
}

然后是设置服务端开启监听 test-server pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>SimpleRpc</artifactId>
        <groupId>com.df</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>test-server</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.df</groupId>
            <artifactId>rpc-core</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.df</groupId>
            <artifactId>rpc-api</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>

设置helloService的实现类

package com.df;

public class HelloServiceImpl implements  HelloService{
    public String hello(HelloObject helloObject) {
        System.out.println("接收到" + helloObject);
        return "这是用掉的返回值";
    }
}

设置testServer进行监听端口,等待监听请求

package com.df;

import com.df.server.RpcServer;

public class TestServer {

    public static void main(String[] args) {
        HelloService helloService = new HelloServiceImpl();
        RpcServer rpcServer = new RpcServer();
        //调用register方法进行监听9000端口,传入的请求会带有对应的方法,参数,参数类型,通过反射生成方法调用
        //helloService实现类的对应方法,再返回数据
        rpcServer.register(helloService, 9000);
    }
}

进行调用测试并总结

好了,这样就配完了,我们来试验一下,先打开testServer进行监听。再打开testClient进行发送。查看效果。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

成功返回数据。相对来说做的比较简陋,也是参考着别人大佬已经完成的作品进行做的,但是核心调用的思想和功能还是有做到的。

总结:我们渐渐揭开了远程服务调用的面纱,了解到真相。总的来说就是客户端通过接口创建动态代理对象实例,在对实例进行调用方法的时候,进行记录方法以及参数进行socket字节流传输到服务器端。服务器端进行socket监听严阵以待。一旦监听到数据流到,就进行解析并通过反射调用传来的方法。然后以字节流的方式原路返回结果集。客户端就可以获取到结果。这样做在业务层面是感知不到变化的。甚至有种在同一个服务中调用的感觉。 当然做的肯定没有dubbo那么完善,如果哪里做错了请大佬直接指出(痛骂)。

附一波大佬的github:https://github.com/CN-GuoZiyang/My-RPC-Framework

java
rpc

  • 作者:LinJy(联系作者)
  • 发表时间:2020-08-15 22:35
  • 版权声明:自由转载-非商用-非衍生-保持署名(null)
  • undefined
  • 评论

    留言