5 Topics

在前面的教程中,我们改进了日志系统,为了解决fanout exchange 的无脑广播,我们使用direct替换,从而实现了选择性接收日志。

尽管使用direct exchange 改进了系统,他还是有局限性——它不能基于多种准则来路由消息。

在我们的日志系统中,我们既想根据日志级别订阅日志,还想根据日志源订阅日志。你可能从syslog unix 工具中了解过这个概念,它基于日志的级别和设备来路由日志。

这将带来极大的灵活性——我们可能想要监听来自'cron'的严重错误日志和来自'kern'的所有日志。

为了在日志系统中实现该功能,我们需要学习一下稍微复杂一点的 topic exchange。

Topic exchange

发送给topic exchange的消息不能使用随意的routing_key——它必须是一组由.分割的单词。可以是任意单词,但通常与消息的某些特性相关。这些routing key都是合法的:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。routing key里的单词数量没有限制,但最长不能超过 255 字节。

binding key的格式也是一样。topic exchange的处理逻辑和direct差不多——通过指定routing key 发送的消息会被分发到所有与匹配的binding key绑定的队列上。

  • * (星号) 代表一个特定的单词

  • # (井号) 代表0个或多个单词

下面的例子可以很容易地解释这一点:

图中,我们将发送一些描述动物特征的消息。消息将会使用包含三个单词(两个点分割)的routing key。第一个单词描述速度,第二个描述颜色,第三个描述品种: "<speed>.<colour>.<species>"

创建三个绑定:Q1使用"*.orange.*"Q2使用"*.*.rabbit""lazy.#"作为bingding key

上面的绑定总结如下:

  • Q1对所有黄色动物感兴趣

  • Q2想订阅所有兔子和所有懒散动物的消息

routing key"quick.orange.rabbit"的消息会同时发送给两个队列, "lazy.orange.elephant"的消息也会发给两个队列。另外, "quick.orange.fox"会发给Q1"lazy.brown.fox"会发给Q2,"lazy.pink.rabbit"只会发给Q2一次,即使有两条匹配到Q2的绑定,"quick.brown.fox"匹配不到任何绑定,所以会被丢弃。

如果我们打破规则,使用一个或四个单词(作为routing key) 发送消息会怎样,比如“orange”或者“quick.orange.male.rabbit”?当然,这些消息不会匹配任何binding,会被丢弃。

不过,对于"lazy.orange.male.rabbit",尽管它有四个单词,仍然会匹配最后一个bingding(lazy.#)并发送到Q2

Topic exchange

Topic exchange 非常强大,它可以变成和其他 exchange 一样工作

当一个queue被binding key "#"绑定后——他将会收到所有消息,忽略routing key——就像 fanout exchange 一样

当绑定中不存在特殊字符:“*”和“#”时,topic exchange 会像direct exchange 一样工作

整合代码(Putting it all together)

我们将会在日志系统中使用 topic exchange。我们假设日志的routing key 由两个单词组成:"<设备>.<级别>”

代码几乎和前面的教程一样

EmitLogTopic.java 源码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
    }
  }
  //..
}

ReceiveLogsTopic.java 源码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
        System.exit(1);
    }

    for (String bindingKey : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    }

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

编译并运行样例代码,记得像教程1那样引入 classpath——windows 使用 %CP%。

编译:

javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java

接收日志消息

java -cp $CP ReceiveLogsTopic "#"

接收设备"kern"的所有日志消息

java -cp $CP ReceiveLogsTopic "kern.*"

或者只关心“critical”级别的日志

java -cp $CP ReceiveLogsTopic "*.critical"

可以创建多个绑定

java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

使用routing key "kern.critical" 发送日志消息:

java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

祝你程序玩得愉快。注意代码并没有设置routing key 或者binding key,你需要玩的时候通过参数指定

(完整源码参考: EmitLogTopic.javaReceiveLogsTopic.java)

下一篇,在 教程6 中将介绍如何利用往返消息实现远程过程调用(Remote Procedure Call)

最后更新于