6 Remote procedure call (RPC)

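In tutorial 2 we learned how to use Work Queues to distribute time-consuming tasks among multiple workers.

But what if we need to run a function on a remote computer and wait for the result? That's a different story. This pattern is commonly known as Remote Procedure Call, or RPC.

In this tutorial we're going to use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don't have any time-consuming tasks that are worth distributing, we're going to create a dummy RPC service that returns Fibonacci numbers.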

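Client interface

To illustrate how an RPC service could be used, we're going to create a simple client class. It will expose a method named call, which sends an RPC request and blocks until the answer is received: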

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

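A note on RPC

Although RPC is a pretty common pattern in computing, it's often criticised. The problems arise when a programmer can't tell whether a function call is local or a slow remote call. Confusion like that results in an unpredictable system and makes debugging needlessly complex. Instead of simplifying software, misused RPC can result in unmaintainable code.

Bearing that in mind, consider the following advice:

  • Make sure it's obvious which function call is local and which is remote.

  • Document your system. Make the dependencies between components clear.

  • Handle error cases. How should the client react when the RPC server is down for a long time?

When in doubt, avoid RPC. If you can, use an asynchronous pipeline instead: rather than blocking as RPC does, results are pushed asynchronously to the next computation stage.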
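As one possible illustration of the first and last points, the sketch below gives the remote call a return type that cannot be mistaken for a local one, and chains the next stage onto the result instead of blocking. `RemoteFibService`, `SimulatedRemoteFib` and `AsyncPipelineDemo` are hypothetical names, and the "remote" side is simulated with a local executor rather than RabbitMQ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical interface: returning a CompletableFuture makes it obvious at
// every call site that this call is remote, asynchronous, and may fail.
interface RemoteFibService {
    CompletableFuture<Long> fibAsync(int n);
}

// Hypothetical stand-in: computes locally on a background executor instead of
// doing a real RabbitMQ round trip.
class SimulatedRemoteFib implements RemoteFibService {
    private final ExecutorService executor;

    SimulatedRemoteFib(ExecutorService executor) {
        this.executor = executor;
    }

    @Override
    public CompletableFuture<Long> fibAsync(int n) {
        return CompletableFuture.supplyAsync(() -> {
            long a = 0, b = 1; // fib(i), fib(i + 1)
            for (int i = 0; i < n; i++) {
                long next = a + b;
                a = b;
                b = next;
            }
            return a;
        }, executor);
    }
}

class AsyncPipelineDemo {
    // The next stage runs when the result arrives; nothing here blocks.
    static CompletableFuture<String> describe(RemoteFibService service, int n) {
        return service.fibAsync(n)
                .thenApply(result -> "fib(" + n + ") is " + result)
                .exceptionally(err -> "fib(" + n + ") failed: " + err.getMessage());
    }
}
```

A caller could write `AsyncPipelineDemo.describe(service, 10).thenAccept(System.out::println)` and carry on with other work; the line is printed whenever the result becomes available.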

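Callback queue

In general, doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response, the client needs to send a 'callback' queue address with the request. We can use the default queue (which is exclusive in the Java client). Let's try it: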

// Declare a server-named, exclusive queue to receive the reply
String callbackQueueName = channel.queueDeclare().getQueue();

// Tell the server where to send the response
BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes("UTF-8"));

// ... then code to read a response message from callbackQueueName ...
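To show only the routing idea without a broker, here is a minimal in-memory sketch, assuming a toy map of named queues in place of RabbitMQ. `ToyBroker` and `ReplyToDemo` are illustrative stand-ins, and the queue name merely mimics the `amq.gen-` prefix RabbitMQ uses for server-named queues. The point is that the server never needs to know the client in advance: it simply publishes to whatever queue name arrived in `replyTo`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for a broker: queue name -> queue.
// It only models how the replyTo property routes a response; no AMQP involved.
class ToyBroker {
    record Message(String body, String replyTo) {}

    private final Map<String, BlockingQueue<Message>> queues = new ConcurrentHashMap<>();

    BlockingQueue<Message> queue(String name) {
        return queues.computeIfAbsent(name, k -> new LinkedBlockingQueue<>());
    }

    // Mimics a server-named queue from queueDeclare(): a fresh, unique name.
    String declareAnonymousQueue() {
        String name = "amq.gen-" + UUID.randomUUID();
        queue(name);
        return name;
    }

    void publish(String queueName, Message message) {
        queue(queueName).add(message);
    }
}

class ReplyToDemo {
    // Runs the client and server steps in sequence on one thread, so every
    // poll() below finds a message already waiting.
    static String roundTrip(ToyBroker broker, String request) {
        // Client: create a callback queue and name it in replyTo.
        String callbackQueueName = broker.declareAnonymousQueue();
        broker.publish("rpc_queue", new ToyBroker.Message(request, callbackQueueName));

        // Server: handle the request and answer on whatever queue replyTo names.
        ToyBroker.Message req = broker.queue("rpc_queue").poll();
        broker.publish(req.replyTo(), new ToyBroker.Message("reply to " + req.body(), null));

        // Client: read the response from its own callback queue.
        return broker.queue(callbackQueueName).poll().body();
    }
}
```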

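Message properties

The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of them are rarely used, with the exception of the following:

  • deliveryMode: marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.

  • contentType: used to describe the mime-type of the encoding. For example, for the often-used JSON encoding it is good practice to set this property to application/json.

  • replyTo: commonly used to name a callback queue.

  • correlationId: useful to correlate RPC responses with requests.

We need this new import: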

import com.rabbitmq.client.AMQP.BasicProperties;

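Correlation Id

In the method presented above we suggest creating a callback queue for every RPC request. That's pretty inefficient, but fortunately there is a better way: let's create a single callback queue per client.

That raises a new issue: having received a response in that queue, it's not clear to which request the response belongs. That's when the correlationId property is used. We're going to set it to a unique value for every request. Later, when we receive a message in the callback queue, we'll look at this property, and based on that we'll be able to match a response with a request. If we see an unknown correlationId value, we may safely discard the message, because it doesn't belong to any of our requests.

You may ask why we should ignore unknown messages in the callback queue rather than failing with an error. It's due to a possible race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment for the request. If that happens, the restarted RPC server will process the request again. That's why on the client we must handle duplicate responses gracefully, and the RPC should ideally be idempotent.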
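The matching rule can be sketched in plain Java, assuming one shared callback queue per client as suggested above. `CorrelationDispatcher` is a hypothetical helper, not part of the RabbitMQ client: it keeps pending requests in a map keyed by correlationId, completes the one that matches, and silently drops both unknown ids and late duplicates.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical client-side helper for a single shared callback queue.
class CorrelationDispatcher {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers an outgoing request and returns its fresh correlationId.
    String register(CompletableFuture<String> future) {
        String corrId = UUID.randomUUID().toString();
        pending.put(corrId, future);
        return corrId;
    }

    // Called for every message on the callback queue.
    // Returns true only if the message completed a pending request.
    boolean onResponse(String correlationId, String body) {
        // remove() makes each id usable once, so a duplicate finds nothing.
        CompletableFuture<String> future =
                correlationId == null ? null : pending.remove(correlationId);
        if (future == null) {
            // Unknown id, or a duplicate of an answered request: discard it.
            return false;
        }
        future.complete(body);
        return true;
    }
}
```

Removing the entry on the first match is what makes duplicates harmless here; a real client would also want to remove entries whose requests time out, which this sketch does not do.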

Summary

Our RPC will work like this:

  • For an RPC request, the Client sends a message with two properties: replyTo, which is set to an anonymous exclusive queue created just for the request, and correlationId, which is set to a unique value for every request.

  • The request is sent to an rpc_queue queue.

  • The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the replyTo field.

  • The client waits for data on the reply queue. When a message appears, it checks the correlationId property. If it matches the value from the request, it returns the response to the application.

Putting it all together

The Fibonacci task:

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

We declare our Fibonacci function. It assumes only valid non-negative integer input. (Don't expect this one to work for big numbers, and it's probably the slowest recursive implementation possible.)
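If the recursive version is too slow for experiments, an iterative variant is one possible drop-in. This is a swapped-in technique, not the tutorial's code: it runs in linear time, returns `long` instead of `int`, rejects negative input, and uses `Math.addExact` so that overflow (anything past fib(92)) raises an `ArithmeticException` instead of wrapping silently. `IterativeFib` is a hypothetical name.

```java
// Hypothetical iterative alternative to the recursive fib() above.
class IterativeFib {
    static long fib(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative: " + n);
        }
        if (n == 0) return 0;
        long prev = 0, curr = 1; // fib(0), fib(1)
        for (int i = 2; i <= n; i++) {
            // addExact throws ArithmeticException instead of overflowing.
            long next = Math.addExact(prev, curr);
            prev = curr;
            curr = next;
        }
        return curr;
    }
}
```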

The code for our RPC server can be found here: RPCServer.java.

The server code is rather straightforward:

  • As usual we start by establishing the connection, channel and declaring the queue.

  • We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set the prefetchCount setting in channel.basicQos.

  • We use basicConsume to access the queue, where we provide a callback in the form of an object (DeliverCallback) that will do the work and send the response back.
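The load-spreading idea can be modelled in memory, assuming plain Java threads as stand-ins for separate server processes. In this hypothetical `CompetingServers` sketch each worker pulls one request at a time from a shared queue and only takes the next after replying, which loosely mirrors what `channel.basicQos(1)` achieves; it is an analogy, not how the broker implements prefetch. Each reply carries the request's correlationId back, as the real server's callback does.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory model of several RPC servers sharing one rpc_queue.
// Each worker holds at most one request at a time and takes the next only
// after replying, loosely like channel.basicQos(1) in the real server.
class CompetingServers {
    record Request(String correlationId, int n) {}
    record Reply(String correlationId, String server, long result) {}

    static List<Reply> run(List<Request> requests, int servers) {
        BlockingQueue<Request> rpcQueue = new LinkedBlockingQueue<>(requests);
        BlockingQueue<Reply> replies = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(servers);
        for (int s = 0; s < servers; s++) {
            String name = "server-" + s;
            pool.execute(() -> {
                Request req;
                // poll() returns null once the shared queue has been drained.
                while ((req = rpcQueue.poll()) != null) {
                    // Echo the correlationId so the client can match the reply.
                    replies.add(new Reply(req.correlationId(), name, fib(req.n())));
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(replies);
    }

    // Same recursive definition as the tutorial, widened to long.
    static long fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }
}
```

Which server answers which request varies from run to run; only the correlationId ties a reply to its request.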

The code for our RPC client can be found here: RPCClient.java.

The client code is slightly more involved:

  • We establish a connection and channel.

  • Our call method makes the actual RPC request.

  • Here, we first generate a unique correlationId number and save it - our consumer callback will use this value to match the appropriate response.

  • Then, we create a dedicated exclusive queue for the reply and subscribe to it.

  • Next, we publish the request message, with two properties: replyTo and correlationId.

  • At this point we can sit back and wait until the proper response arrives.

  • Since our consumer delivery handling is happening in a separate thread, we're going to need something to suspend the main thread before the response arrives. Using a BlockingQueue is one possible solution. Here we create an ArrayBlockingQueue with capacity set to 1, as we need to wait for only one response.

  • The consumer is doing a very simple job: for every consumed response message it checks if the correlationId is the one we're looking for. If so, it puts the response into the BlockingQueue.

  • At the same time, the main thread is waiting to take the response from the BlockingQueue.

  • Finally we return the response back to the user.
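The handoff described in these steps can be tried without a broker. In this sketch, which assumes a simulated list of deliveries instead of a real `DeliverCallback`, a background thread plays the consumer and the calling thread blocks on an `ArrayBlockingQueue` of capacity 1. A stale reply with a foreign correlationId is ignored, and `offer` (which never blocks) is used so a duplicate cannot stall the consumer. `BlockingHandoffDemo` and its `server` parameter are hypothetical.

```java
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.UnaryOperator;

// Broker-free sketch of the client call: a consumer thread filters replies by
// correlationId and hands the match to the blocked caller via a 1-slot queue.
class BlockingHandoffDemo {
    record Delivery(String correlationId, String body) {}

    // `server` is a hypothetical stand-in for the remote RPCServer.
    static String call(String message, UnaryOperator<String> server) throws InterruptedException {
        final String corrId = UUID.randomUUID().toString();
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        // Simulated callback-queue traffic: a stale reply for some other
        // request, our reply, and a duplicate of our reply.
        List<Delivery> deliveries = List.of(
                new Delivery(UUID.randomUUID().toString(), "stale"),
                new Delivery(corrId, server.apply(message)),
                new Delivery(corrId, server.apply(message)));

        // Plays the role of the DeliverCallback, which runs on another thread.
        Thread consumer = new Thread(() -> {
            for (Delivery d : deliveries) {
                if (d.correlationId().equals(corrId)) {
                    // offer() never blocks: if the slot is still full, the copy is dropped.
                    response.offer(d.body());
                }
            }
        });
        consumer.start();

        String result = response.take(); // the calling thread waits here
        consumer.join();
        return result;
    }
}
```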

Making the Client request:

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();

Now is a good time to take a look at our full example source code (which includes basic exception handling) for RPCClient.java and RPCServer.java.

Compile and set up the classpath as usual (see tutorial one):

javac -cp $CP RPCClient.java RPCServer.java

Our RPC service is now ready. We can start the server:

java -cp $CP RPCServer
# => [x] Awaiting RPC requests

To request a fibonacci number run the client:

java -cp $CP RPCClient
# => [x] Requesting fib(30)

The design presented here is not the only possible implementation of an RPC service, but it has some important advantages:

  • If the RPC server is too slow, you can scale up by just running another one. Try running a second RPCServer in a new console.

  • On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queueDeclare are required. As a result the RPC client needs only one network round trip for a single RPC request.

Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:

  • How should the client react if there are no servers running?

  • Should a client have some kind of timeout for the RPC?

  • If the server malfunctions and raises an exception, should it be forwarded to the client?

  • Protecting against invalid incoming messages (e.g. checking bounds and types) before processing.
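Two of these questions can be sketched in a few lines, under assumptions of our own: a client-side timeout using `BlockingQueue.poll(timeout, unit)` instead of `take()`, and a server-side check that rejects non-numeric, negative, or too-large input before calling fib. `RpcSafety` and the `maxN` bound are hypothetical; what to do on a timeout or invalid input (retry, send an error reply) is left open, as in the text.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helpers for two of the open questions above.
class RpcSafety {
    // Server side: accept only an integer in [0, maxN]; anything else is empty.
    // Assumes body is non-null.
    static Optional<Integer> parseRequest(String body, int maxN) {
        try {
            int n = Integer.parseInt(body.trim());
            return (n >= 0 && n <= maxN) ? Optional.of(n) : Optional.empty();
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Client side: wait at most `timeout` for the response instead of forever.
    // An empty Optional means the deadline passed with no reply.
    static Optional<String> awaitResponse(BlockingQueue<String> response, long timeout, TimeUnit unit)
            throws InterruptedException {
        return Optional.ofNullable(response.poll(timeout, unit));
    }
}
```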

If you want to experiment, you may find the management UI useful for viewing the queues.
