Rabbitmq 路由
这里我们将为日志系统增加一个特性–只订阅一部分信息。例如,我们可以将错误信息存入日志文件,将其他信息打印出来。
绑定(Bindings)
在日志系统中我们已经使用过绑定,像这样调用代码:
channel.queue_bind(
exchange=exchange_name,
queue=queue_name
)一个绑定是交换和队列之间的关系,可以简单地理解为这个队列只对这个交换的信息感兴趣。绑定可以指定额外的routing_key参数。为了避免和一个basic_publish参数混淆,我们称它binding key,可以通过一下方式创建一个带有key的绑定:
channel.queue_bind(
exchange=exchange_name,
queue=queue_name,
routing_key='black'
)binding key的含义依赖于交换的类型。fanout交换类型会直接忽略这个值。
Direct exchange
我们之前的日志系统使用fanout交换类型,直接将信息广播给所有消费者。现在我们想要扩展它允许根据根据级别来过滤。例如,将错误级别的日志存入磁盘,将普通的日志直接输出而不浪费磁盘空间。为了达到这个目的,这里将使用direct交换。direct交换的路由算法也比较简单,一个消息只推送到binding key和routing key匹配的队列,举例如下图:

在上述例子中可以看到direct交换x有两个与之绑定的队列。第一个队列的binding key是orange,第二个队列有两个binding key,分别是black和green。通过这个配置,一个带有orage的routing key的信息推送到交换后会被路由到队列Q1;一个带有black或者green的routing key的信息推送到交换后会被路由到队列Q2,其他的信息会被丢弃。
多绑定(Muliple bindings)

用相同的binding key绑定多个队列完全是可行的。在我们的例子中可以在x和Q1之间添加一个名为black的binding key,这样的话,direct交换将会表现得像fanout并且会将信息广播到所有匹配的队列。一个带有black的routing key的信息会递送到Q1和Q2队列。
发送日志
我们将使用这个模型来构建日志系统,我们将会发送信息到direct交换,我们将会以日志的级别作为routing key。首先创建交换:
channel.exchange_declare(
exchange='direct_logs',
exchange_type='direct'
)然后发送消息:
channel.basic_publish(
exchange='direct_logs',
routing_key=serverity,
body=message
)为了简化程序,我们假设日志级别只有info,warning,error三种情况。
订阅
我们将为每一个需要的日志级别创建一个新的绑定:
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for severity in severities:
channel.queue_bind(
exchange='direct_logs',
queue=queue_name,
routing_key=severity
)最终结果

emit_log_direct.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('172.17.0.6', 5672, credentials=pika.PlainCredentials('guest', 'guest')))
channel = connection.channel()
channel.exchange_declare(
exchange='direct_logs',
exchange_type='direct'
)
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='direct_logs',
routing_key=severity,
body=message
)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()receive_logs_direct.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('172.17.0.6', 5672, credentials=pika.PlainCredentials('guest', 'guest')))
channel = connection.channel()
channel.exchange_declare(
exchange='direct_logs',
exchange_type='direct'
)
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(
exchange='direct_logs',
queue=queue_name,
routing_key=severity
)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
callback,
queue=queue_name,
no_ack=True
)
channel.start_consuming()如果只想要保存warning和error的日志信息到文件中,只需运行:
./receive_logs_direct.py warning error > logs_from_rabbit.log发送error级别的日志:
./emit_log_direct.py error "Run. Run. Or it will explode"
