分布式系统(七)

我们知道服务发布的过程就是服务提供者对外暴露可访问入口的过程。基于所暴露的访问入口,服务消费者就可以成功发起远程调用。我们把这个过程称为服务引用

和服务发布类似,服务引用也具备一套完整的执行流程。那么,服务引用有哪些具体的实现方式呢?

问题背景

和服务发布的过程类似,服务引用看上去并不复杂,但背后要考虑的事情也非常多,包括:

  • 如何实现远程调用过程的透明化?
  • 如何使用缓存机制提高远程调用的效率?
  • 除了缓存机制,你还有什么办法可以提高远程调用的性能?
  • 如何实现异步调用、泛化调用等多种调用形式?

具体针对Dubbo等实现框架,面试官可以这样问:

  • Dubbo 中所采用的服务引用流程是怎么样的?
  • Dubbo 框架中提供了哪几种服务调用方式?

问题分析

然而,对于服务引用而言,也存在与服务发布不一样的地方。首要一点在于服务引用的类型可以是多样的,我们可以使用同步调用异步调用等多种方式来完成远程调用过程。

在日常开发过程中,开发人员倾向于使用同步调用模式来完成远程调用,因为这一模式对于编码过程而言非常友好。而从性能上讲,异步调用模式显然更具优势,但实现复杂度较高。

这就诞生了一种新的实现机制,即“异步转同步”,诸如 Dubbo 等框架就内置了这种实现机制。

技术体系

通用服务引用流程

相较服务发布,服务的调用是一个导入(Import)的过程,整体流程如下图所示:

在上图中,我们可以看到服务调用流程与服务发布流程呈对称结构,所包含的组件包括以下。

调用启动器

调用启动器和上一讲介绍的发布启动器是对应的,这里不再重复介绍。

动态代理

在服务引用过程中,动态代理的作用就是确保远程调用过程的透明化,即开发人员可以使用本地对象来完成对远程对象的处理。

调用管理器

和发布管理器相比,调用管理器的核心功能是提供了一种缓存机制,从而根据保存在服务调用者本地的远程服务地址信息来发起调用。

协议客户端

和协议服务器相对应,协议客户端会创建与服务端的网络连接,发起请求并获取结果。

注册中心

注册中心在这里的作用是提供查询服务定义元数据的入口。

服务调用的类型

服务调用存在两种基本方式,即同步调用模式异步调用模式。其中,同步调用的示意图如下图所示:

可以看到,同步调用的执行流程比较简单。在同步调用中,服务消费者在获取来自服务提供者的响应结果之前一直处于等待状态。

而异步调用则不同,服务消费者一旦发送完请求之后就可以继续执行其他操作,直到服务提供者异步返回结果并通知服务消费者进行接收,如下图所示:

显然,使用异步调用的目的在于获取高性能。

但是,异步调用的开发过程比较复杂,对开发人员的要求较高,所以很多 RPC 框架提供了专门的异步转同步机制,即面向开发人员提供的是同步调用的 API,而具体执行过程则使用的是异步机制

源码解析

ServiceConfig 中的 export 方法相对应,ReferenceConfig 中也存在一个 init 方法,该方法就是 Dubbo 服务引用流程的入口。

服务引用

ReferenceConfiginit 方法中,Dubbo 做了非常多的准备和校验工作,最终来到了如下所示的这行代码中:

1
ref = createProxy(map);

这个 createProxy 方法是理解服务引用的关键入口,我们梳理它的主体结构如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private T createProxy(Map<String, String> map) {
if (isJvmRefer) {
//生成本地引用URL,使用injvm协议进行本地调用
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
} else {
if (url != null && url.length() > 0) {
//URL不为空,执行点对点调用
} else {
//加载注册中心URL
}

if (urls.size() == 1) {
//单个服务提供者,直接调用
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
//多个服务提供者
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url;
}
}
if (registryURL != null) {
// 如果注册中心链接不为空,则将使用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else {
invoker = cluster.join(new StaticDirectory(invokers));
}
}
// 生成服务代理类
return (T) proxyFactory.getProxy(invoker);
}

虽然 createProxy 方法的代码比较长,但它的执行逻辑还是比较清晰的。

  • 首先我们根据配置检查是否为本地调用,
  • 如果是则调用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 实例;
  • 如果不是,则读取 URL 配置项,包括用于直联的 URL 或基于注册中心的 URL。
  • 然后,我们对 URL 对象数量进行判断。
  • 如果 URL 数量为 1,则直接通过 Protocol 构建 Invoker 对象;
  • 如果 URL 数量大于 1,即存在多个服务地址,此时先根据每个 URL 构建 Invoker,
  • 然后再通过集群对象 Cluster 合并多个 Invoker,最后调用 ProxyFactory 生成代理类。

这个过程实际上完成了两个步骤,首先是创建 Invoker 对象,然后才是生成服务代理类。

实际上,Invoker 的构建过程是在 Protocol 中。与服务暴露的讲解思路一样,我们将从 DubboProtocol 这个 Protocol 的 refer 方法入手,如下所示:

1
2
3
4
5
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}

这里出现了一个 getClients 方法,该方法用于获取客户端实例,实例类型为 ExchangeClient。

在理解了 getClients 方法之后,我们发现 DubboProtocol 的 refer 方法的作用就是返回一个新建的 DubboInvoker。

DubboInvoker 继承了 AbstractInvoker,而 AbstractInvoker 实现了 Invoker 接口。

AbstractInvoker 是一个抽象的模板方法类,提供了一个 doInvoke 模板方法。

我们来看 DubboInvoker 中如何实现了这个模板方法,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);

ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {//单向调用
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {//异步调用
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {//同步调用
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
}
//...略
}

可以看到,Dubbo 的远程调用存在三种调用方式:

  • 单向调用
  • 异步无返回
  • 异步转同步

异步转同步是默认的实现方式。下面我们重点对其做进一步展开

服务调用异步转同步过程

在介绍 Dubbo 中异步转同步的服务调用方式之前,我们先围绕 JDK 中的 Future 模式讨论如何实现异步调用。

本质上,Future 模式为我们提供了一种无需等待的服务调用机制

当我们发起一次服务调用时,Future 机制可以直接返回并继续执行其他任务,而不是像传统调用模式那样一直需要等到调用方法的返回。

JDK 对 Future 模式提供了内置的实现,表现为如下所示的 Future 接口:

1
2
3
4
5
6
7
8
9
10
11
12
public interface Future<V> {
//去掉执行
boolean cancel(boolean mayInterruptIfRunning);
//判断是否已取消
boolean isCancelled();
//判断是否已完成
boolean isDone();
//等待任务执行完毕并获取结果
V get() throws InterruptedException, ExecutionException;
//基于一定的超时时间等待任务执行完毕并获取结果
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Dubbo 中大量使用了基于 Future 机制的异步调用过程,同时也提供了异步转同步的实现机制,这是 Dubbo 提供的这三种远程调用方式中默认的实现方式。

这部分内容实际上已经超出了服务引用的范围,而是更多偏向于讨论底层的网络通信,所以需要你对网络通信相关的内容先进行学习和掌握。

DubboInvokerdoInvoke 方法中,异步转同步过程的实现如下所示:

1
2
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();

我们先来看这里的 request 方法定义(位于 HeaderExchangeChannel 类中),如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ResponseFuture request(Object request, int timeout) throws RemotingException {
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}

请注意,这里用于发送请求的 channel.send 方法是异步执行的,也就说该方法一旦调用就会直接返回。

为了实现“异步转同步”,Dubbo 在这里使用了 DefaultFuture 这个辅助类。请记住这个类,我们在后续内容中还会再次提到该类。

另一方面,当请求到达服务器端时,在 NettyServer 中会使用一个 NettyHandler 作为网络事件的处理器,如下所示:

1
pipeline.addLast("handler", nettyHandler);

NettyHandler 是一个接口,我们来看它的 messageReceived 方法实现,如下所示:

1
2
3
4
5
6
7
8
9
10
11
private final ChannelHandler handler;

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}

这里把具体的处理逻辑转移到了 Dubbo 中自定义的 ChannelHandler 接口,这个接口有很多实现类,也包括 ChannelHandlerDelegate 这个代理类,而真正处理事件接收逻辑的 HeaderExchangeHandler 正是实现了这个代理类。

HeaderExchangeHandler 中处理响应的实现过程如下所示:

1
2
3
4
5
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}

我们在这里再次看到了 DefaultFuture,这里的 DefaultFuture 就是前面客户端发送请求时用到的 DefaultFutureDefaultFuturereceived 方法中有进一步调用了如下所示的 doReceived 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void doReceived(Response res) {
lock.lock();
try {
//设置响应response对象
response = res;
if (done != null) {
//唤醒阻塞的线程
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}

注意到这里的 done.signal 方法的执行效果会唤醒阻塞的线程,那么这个阻塞的线程在哪里的?显然,这时候我们应该回到客户端组件看看同步获取调用结果的入口。

我们再次回到在 DubboInvokerdoInvoke 方法中,看到了如下所示的核心代码:

1
2
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();

我们来具体看一下这个获取调用结果的 get 方法执行逻辑,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) { //当响应response对象为空
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}

可以看到,当响应 response 对象为空时,Condition 会执行 await 方法来阻塞当前线程,直到该线程被唤醒、被中断或超过阻塞时间。

而在前面所述的 DefaultFuture 类的 doReceived 方法中,我们也看到会先为 response 赋上返回值,之后执行 Conditionsignal 方法唤醒被阻塞的线程,这样 get 方法就会释放锁,进而执行 returnFromResponse 方法来返回值。

这样,整个远程调用的异步转同步过程就介绍完毕。

作为总结,我们明确 Dubbo 异步转同步的原理其实就是利用 LockCondition 实现了等待通知机制

当客户端发送请求时,将一个请求 Id 和一个 DefaultFuture 对象包装在请求对象中。

而当客户端异步收到响应时,则根据这个请求 Id 从响应结果中获取对应的 DefaultFuture 对象,并 调用该 DefaultFuture 对象的 get 方法获取最终的调用结果

解题要点

在涉及到远程调用的应用场景,很多开源框架都会基于 Future 或它的一些变种,例如 JDK 自身提供的改进版 CompleteFuture,或是 Google 的 guava 框架中提供的 ListenableFuture 等。

类似的问题主要还是关注 Future 机制本身的一些特性,可以发散出一系列的问题,但基本的考点是一致的,回答的思路也类似。

Future 机制本身提供的几个接口也并不复杂,需要理解它们的含义和作用,但也要理解它们存在的不足。

普通 Future 机制的最大问题在于没有提供通知的机制,也就是说我们不知道 Future 什么时候能够完成

前面提到的 CompleteFuture 和 ListenableFuture 实际上都是为了改进普通 Future 存在的这一问题而诞生的。

另一方面,对于 Dubbo 框架中的服务引用过程,我们需要重点掌握的是它的三种调用方式,即单向同步异步

其中前面两种比较好理解,而针对异步,我们在使用 Dubbo 的过程中实际上最终也是转换为同步操作。

针对这一问题,如果只是回答这个问题中所提出的实现方式的种类,那么只要简单列举即可。

但要说明具体的实现细节,尤其是 Dubbo 中“异步转同步”的实现细节,那么需要对本讲内容做深入的理解,并尝试使用自己的语言来总结整个过程。