分布式系统(五)

任何分布式系统的开发都涉及到跨进程之间的远程过程调用,也就是所谓的 RPC。那么,现在假如没有这些开源框架,而是需要你自己来设计并实现一套远程过程调用机制,你应该怎么做的?

问题背景

因此,在面试过程中,面试官通常都会暂时抛开框架的具体使用方式,而从远程调用的基本概念执行流程出发来进行提问。

针对这类面试题,如果候选人没有做精心的准备,往往很难回答到位,原因就在于我们平时不大会从这一角度入手考虑问题,也就不会重点关注相关的技术体系。从面试角度讲,这道题也存在一些灵活的提问方式,包括:

  • 想要实现远程调用,整个流程应该包含那些基本的技术组件?
  • RPC 架构的组成结构是怎么样的?
  • 如果让你设计一个简单的 RPC 架构,你会怎么做?
  • 你认为 Dubbo 框架中最核心的组件有哪些?

问题分析

  • 作为一种架构模式,业界已经对 RPC 架构的各个组成部分进行了抽象和提炼,从而形成一套完整的组件体系。
  • 正是基于这套 RPC 的组件体系,业界诞生了各种具体实现框架,Dubbo 就是其中的代表。
  • 而无论 Dubbo 等框架如何实现,其底层的组成结构是完全遵循 RPC 架构的。
  • 因此,只要掌握了 RPC 架构的组成结构,关于这类问题的大部分内容我们已经可以回答了。

我们可以采用最简单、最常见的技术体系实现一个最基础的 RPC 架构。通过这一过程,一方面可以确保具体的实现技术和纯粹的理论体系能够对应起来;另一方面,在面试过程中,这也是凸显出个人优势的一个加分项,能够提升面试官对候选人的认可程度

技术体系

  1. 客户端组件
    • RpcClient,负责导入远程接口代理实现;
    • RpcProxy,远程接口的代理实现;
    • RpcCaller,负责执行远程调用;
    • RpcConnector,负责连接服务器。
  2. 服务端组件
    • RpcServer,负责导出远程接口;
    • RpcInvoker,负责调用服务端接口;
    • RpcAcceptor,负责接收网络请求;
    • RpcProcessor,负责处理调用过程。
  3. 公共组件
    • RpcProtocol,负责执行网络传输;
    • RpcChannel,数据传输的通道。

源码解析

接下来我们通过一个简单的示例来实现前面所介绍的的各个技术组件。该示例的主要目的是演示如何从零开始构建一个最基本的 RPC 架构。

首先我们定义一个业务服务,称为 UserService,包含一个用于根据用户编码获取用户姓名的业务方法,如下所示:

1
2
3
4
public interface UserService {
//根据用户编码获取用户姓名
public String getUserNameByCode(String userCode);
}

UserService 接口的实现类也非常简单,我们通过一个内存 Map 来模拟对数据的存储和查询操作,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class UserServiceImpl implements UserService {

private Map<String, String> users = new HashMap<String, String>();

public UserServiceImpl() {
users.put("user1", "tianyalan1");
users.put("user2", "tianyalan2");
}

@Override
public String getUserNameByCode(String userCode) {
return users.get(userCode);
}
}

对于 RPC 架构而言,有了服务定义之后,我们就需要分别构建一个服务端组件 RpcServer 和一个客户端组件 RpcClient。但在这之前,我们首先需要定义一种在客户端和服务器端之间进行通信的消息格式,这里命名为 Protocol,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Data
public class Protocol implements Serializable {
//包名+接口名称
private String interfaceName;
//调用方法名
private String methodName;
//参数类型:按照接口参数顺序
private Class[] paramsTypes;
//参数:按照接口参数顺序
private Object[] parameters;

public Protocol (String interfaceName, String methodName, Class[] paramsTypes, Object[] parameters) {
super();
this.interfaceName = interfaceName;
this.methodName = methodName;
this.paramsTypes = paramsTypes;
this.parameters = parameters;
}
}

可以看到 Protocol 中定义了一次服务请求所需要的接口名、方法名以及方法调用所需要的参数。注意到该类同时实现了 Serializable 接口,这是 Java 中的序列化接口,实现该接口的类能够通过网络进行远程传输。在 RPC 基础架构图中,Protocol 类相当于是通过 RpcProtocol 进行传递的请求数据

接下来我们考虑构建 RpcServer 类,该类需要实现 RPC 基础架构图中的各个服务端组件。RpcServer 类完整代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public class RpcServer {

//线程池
private int threadSize = 10;
private ExecutorService threadPool;
//自定义缓存
private Map<String, Object> servicePool;
//服务端口
private int port = 9000;

public RpcServer() {
super();
synchronized (this) {
threadPool = Executors.newFixedThreadPool(this.threadSize);
}
}

public RpcServer(int threadSize, int port) {
this.threadSize = threadSize;
this.port = port;
synchronized (this) {
threadPool = Executors.newFixedThreadPool(this.threadSize);
}
}

public RpcServer(Map<String, Object> servicePool, int threadSize, int port) {
this.threadSize = threadSize;
this.port = port;
this.servicePool = servicePool;
synchronized (this) {
threadPool = Executors.newFixedThreadPool(this.threadSize);
}
}

//1. 实现Socket监听:RpcAcceptor
public void service() throws IOException {
ServerSocket serverSocket = new ServerSocket(port);
while (true) {
Socket receiveSocket = serverSocket.accept();
final Socket socket = receiveSocket;

//执行请求
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
//2. 处理请求
process(socket);

socket.close();
} catch(IOException e) {
//篇幅关系,省略对各种异常信息的处理
}
}
});
}
}

//2.处理请求:RpcProcessor
private void process(Socket receiveSocket) throws IOException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {

ObjectInputStream objectInputStream = new ObjectInputStream(receiveSocket.getInputStream());

Protocol protocol = (Protocol)objectInputStream.readObject();

//调用服务
Object result = call(protocol);

ObjectOutputStream objectOutputStream = new ObjectOutputStream(receiveSocket.getOutputStream());
objectOutputStream.writeObject(result);
objectOutputStream.close();
}

//3.执行方法调用:RpcInvoker
private Object call(Protocol protocol) throws ClassNotFoundException, NoSuchMethodException,
IllegalAccessException, InstantiationException, InvocationTargetException {

if(servicePool == null) {
synchronized (this) {
servicePool = new HashMap<String, Object>();
}
}

//通过接口名称构建实现类
String interfaceName = protocol.getInterfaceName();
Object service = servicePool.get(interfaceName);
Class<?> serviceClass = Class.forName(interfaceName);

//判断servicePool对象是否存在,如果不存在,就创建新对象并放入池中
if(service == null) {
synchronized (this) {
service = serviceClass.newInstance();
servicePool.put(interfaceName, service);
}
}

//通过实现类来构建方法
Method method = serviceClass.getMethod(protocol.getMethodName(), protocol.getParamsTypes());

//通过反射来实现方法的执行
Object result = method.invoke(service, protocol.getParameters());
return result;
}
}

RpcServer类代码相对比较长,我们结合 RPC 基本架构对其进行分段解析。

  • service 方法:
    service 方法接收请求并基于 Socket 启动端口监听,通过线程池为进入的每个请求启动一个线程进行处理。就 RPC 基础架构而言,该 service 方法相当于扮演 RpcAcceptor 的角色
  • process 方法
    service 方法启动了线程池,而每个线程负责执行 process 方法。这里的 process 方法从 Socket 中获取输入流,然后把输入流中的数据转化为 Protocol,从而获取远程方法调用的各项元数据。就 RPC 基础架构而言,该 process 方法充当了 RpcProcessor 的角色
  • call 方法
    一旦获取 Protocol,process 方法就调用内部的 call 方法来执行真正的方法调用。这里通过反射机制获取位于服务器端的方法并进行执行。显然,该 call 方法对应于 RpcInvoker 角色

介绍完 RpcServer 中的各个技术组件,我们再来看一下 RpcClient 的代码,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RpcClient {
private String serverAddress;
private int serverPort;

public RpcClient(String serverAddress, int serverPort) {
this.serverAddress = serverAddress;
this.serverPort = serverPort;
}

//RpcConnector + RpcCaller
public Object sendAndReceive(Protocol protocol) {
Object result = null;

try {
Socket socket = new Socket(serverAddress, serverPort);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(protocol);

ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
result = objectInputStream.readObject();
} catch (Exception e) {
//篇幅关系,省略对各种异常信息的处理
}

return result;
}
//省略getter/setter方法
}

RpcClient 类的代码相对比较简单,主要就是根据远程服务的地址通过 Socket 发起通信,传入一个 Protocol 对象并返回远程调用的结果。

完成了 RpcServer 和 RpcClient 类之后,我们就可以编写一些测试用例来进行验证。验证方法就是启动 RpcServer,然后通过 RpcClient 执行远程调用。这里我们编写一个 ServerTest 来启动 RpcServer,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ServerTest {

public static void main(String[] args) {

Map<String, Object> servicePool = new HashMap<String, Object>();
servicePool.put("com.juejin.rpc.service.UserService", new UserServiceImpl());

RpcServer server = new RpcServer(servicePool, 4, 9000);

try{
server.service();
} catch (IOException e) {
e.printStackTrace();
}
}
}

然后我们再编写一个 ClientTest 对远程服务发起请求,整个过程如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ClientTest {

public static void main(String[] args) {

String serverAddress = "127.0.0.1";
int serverPort = 9000;

RpcClient client = new RpcClient(serverAddress, serverPort);
Protocol protocol = buildProtocol("user1");
Object result = client.sendAndReceive(protocol);
System.out.println(result);

protocol = buildProtocol("user2");
result = client.sendAndReceive(protocol);
System.out.println(result);

}

private static Protocol buildProtocol(String userCode) {
String interfaceName = "com.juejin.rpc.service.UserService";
Class[] paramsTypes = {String.class};
Object[] parameters = {userCode};
String methodName = "getUserNameByCode";

Protocol protocol = new Protocol(interfaceName, methodName, paramsTypes, parameters);
return protocol;
}
}

至此,我们构建了一个简洁但又完整的 RPC 架构。通过这个示例,我们可以对 RPC 的整体结构有一个清晰的认识。事实上,无论是多么复杂的 RPC 框架,都是从这样的基础架构开始逐步演进而来

解题要点

围绕 RPC 架构,我们首先要明确的第一个解题要点就是它的组成结构。RPC 架构的组成呈现一种非常标准的对称结构,围绕远程调用过程,我们可以提炼出一组分别针对服务消费者和服务提供者的技术组件。把各个技术组件进行分类梳理是一种有助于记忆的学习方法,你也可以尝试能不能对这一组成结构再做进一步的细化。

RPC 架构的组成结构偏向理论描述,想要理解该架构中各个技术组件的具体实现过程,一种有效的方法就是自己动手做一些实践

小结

想要掌握分布式服务框架,首先得掌握 RPC 架构。

本讲内容对 RPC 架构的组成结构进行了详细的介绍。同时,我们也针对 RPC 架构所需要实现的各个技术组件,提供了一个简洁但又完整的示例,从而帮忙你对该架构有一个感性的认识。可以说,RPC 是分布式系统中一项基础设施类的技术体系,但凡涉及到服务与服务之间的交互就需要使用到 RPC 架构。当你在使用一个分布式框架时,可以尝试使用今天介绍的 RPC 架构的基本结构进行分析,从而加深对这项技术体系的理解