3 发布/订阅(Publish/Subscribe)
最后更新于
最后更新于
在上一节中,我们创建了一个工作队列。其目的是将每个任务只分发给一个worker。本节我们将换一种玩法:我们投递一条消息,让所有消费者都能接收到。这种模式称为发布/订阅(Publish/Subscribe)。
为了演示这种模式,我们将构建一个日志记录系统。它包含两个应用——第一个发送日志消息,第二个接收并打印日志消息。
在我们的日志记录系统中,每个运行中的接收程序都能接收到消息(课代表注:相同的一条消息会被每个消费者收到)。这样一来,我们就可以让一个接收者保存日志到硬盘;另一个在屏幕上打印日志。
实际上,已发布的日志消息将会被广播给所有接收者。
在前面的教程中,我们直接通过队列(queue)来发送和接收消息。现在是时候介绍一下 RabbitMQ 中的完整消息模型了。
先对前面介绍过的内容做个简单回顾:
生产者(producer)是用来发送消息的应用。
队列(queue)是一个消息缓冲区。
消费者(consumer)是用来接收消息的应用。
RabbitMQ 消息模型的核心思想是:生产者(producer)从不直接将消息发送给队列(queue)。实际上在大多数情况下,生产者甚至不知道消息会被分发到哪个队列。
相反,生产者只可以发消息给交换(exchange)。交换非常简单,一方面它从生产者接收消息,另一方面它将消息推送到队列。交换必须确切地知道如何处理收到的消息。是该把消息发给某个队列?还是发给多个队列?或者扔掉消息?路由类型(exchange type)定义了具体的行为规则。
有如下几种路由类型:direct, topic, headers 和 fanout。我们先看一下最后一种,fanout:
fanout 类型的交换非常简单。正如其名,它就是把收到的消息广播给所有它所知道的队列。这正是我们的日志记录系统需要的方式。
列出所有交换
为了列出服务器上的所有交换,可以使用 rabbitmqctl 命令:
列表中将会出现一些名如 amq.* 的交换和默认(没名字的)交换。这些是默认创建的,目前不需要使用他们。
没名字的交换
在前面的教程中,我们并不知道交换的存在,但是依然可以发送消息到队列。这是因为我们使用了默认交换,用空字符("")来标识.
回想一下我们之前如发布消息:
第一个参数是交换的名字,空字符表明使用默认交换:如果消息存在,则通过指定的 routingKey 将消息路由到队列中。
现在我们可以发送给指定名称的交换了:
你可能还记得之前我们使用有名称的队列(记得 hello 和 task_queue 吗?)。给队列命名至关重要,因为我们需要让 worker 监听相应队列。当你想把队列在生产者和消费者之间共享时,必须给队列命名。
但这并不适用于我们的日志记录系统。我们需要监听全部日志消息,而非部分。而且我们只关心当前正在发送的消息,历史消息并不关心。为此,我们需要做两点:
首先,每次当我们连接到 RabbitMQ ,我们需要一个全新的队列。为此我们可以每次创建一个随机命名的队列,或者更好的选择是让服务器创建一个随机命名的队列。
其次,一旦队列没有消费者连接,它将自动删除。
在 Java 客户端中,当我们调用无参方法 queueDeclare() 时,就创建了一个非持久化,专用的(课代表注:连接关闭时自动删除队列),自动删除的队列:
了解更多关于 exclusive 标志和其他属性,查看guide on queue。
此时,变量queueName
是一个随机生成的队列名称字符串。它的值可能是:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
我们已经创建了一个 fanout 类型的交换,现在我们需要告诉交换该把消息发送给哪个队列。交换和队列之间的这种关系我们称为绑定(binding)。
如上代码会将名为"logs"的交换的消息发送到我们的队列中。
列出绑定(Listing bindings)
猜猜看用什么工具可以列出绑定关系?
发布日志消息的生产者程序和前面教程中的代码没有多大区别。最大的改动是现在我们把消息发送给名为"logs"的交换,而以前我们发送给默认的匿名交换。发送消息的时候,需要提供routingKey
,不过对于fanout类型的交换,它会忽略routingKey
的值。下面是发送日志程序的代码EmitLog.java
:
如你所见,当我们建立连接之后,声明了交换。这一步是非常必要的
如果还没有队列被绑定到交换,消息将会丢失,不过这并不影响我们当前的应用场景,如果当前没有消费者,我们可以放心地丢弃消息。
ReceiveLogs.java
:
像之前那样编译。
如果想保存日志到文件,可以打开终端并输入:
如果想在屏幕上输出日志,打开一个新终端并运行:
要发出日志类型,输入:
使用rabbitmqctl list_bindings
可以验证代码创建的绑定和队列是否正确。运行两个 ReceiveLogs.java
程序后,你应该能看到如下输出:
对此的解释也很简单:logs交换的消息发送给了两个由服务端生成名字的队列。这正是我们期望的结果。
想要知道如何监听众多消息中的一部分(子集),请查阅教程4。