Python 操作 Rabbit MQ 发布/订阅 (五)
一、发布、订阅:
我们将一个消息分发给
多个消费者
,这种模式被称为
发布/订阅
。
为了更好的理解这个模式,我们将构建一个日志系统,它包括两个程序:
- 第一个程序,负责发送日志消息;
- 第二个程序,负责获取消息并输出内容;
在日志系统中,所有正在运行的接收方程序都会接收消息;
- 一个接受者,把日志写入硬盘中;
- 另一个接受者,把日志输出到屏幕上;
最终,日志消息被广播给所有的接受者。
二、交换机(Exchanges):
概念
:应用程序发送消息时,先把消息给交换机,由交换机投递给队列,而不是直接给队列。交换机可以由多个
消息通道(Channel)
,用于投递消息。
简单概括下之前的知识 :
- 发布者(Producer):是发布消息的应用程序。
- 队列(Queue):用于消息存储的缓冲。
- 消费者(Consumer):是接收消息的应用程序。
- P:代表是发布者;
- X:是交换机;
详解图意 :发布者(P )→交换机(X)→队列(Q)→消费者(C );
-
交换机一边从发布者方接收消息,一边把消息推送到队列(Q)。
交换机必须知道如何处理它接收到的消息,是推送到指定的队列、还是多个队列,或者是忽略消息
。这些都是通过交换机类型(Exchange Type)
来定义的。
交换机类型 :
1.直连交换机(Direct);
2.主题交换机(Topic);
3.头交换机(Headers);
4.扇形交换机(Fanout);
-
主要说明—扇形交换,它把消息发送给它所知道的所有队列。
channel . exchange_declare ( exchange = 'fanout_logs' , exchange_type = 'fanout' )
参数讲解 :
-
exchange:就是交换机的名称,
空字符串代表默认或者匿名交换机;
channel . basic_publish ( exchange = '' )
-
exchange_type:就是交换机的类型;
-
routing_key:分发到指定的队列;
-
body:发送的内容;
-
properties:使消息持久化;
查看交换器列表 :
命令:
rabbitmqctl list_exchanges
Listing exchanges
...
amq
.
rabbitmq
.
log topic
amq
.
direct direct
amq
.
topic topic
amq
.
headers headers
direct
amq
.
fanout fanout
amq
.
rabbitmq
.
trace topic
amq
.
match headers
列表中以amq.*的开头的交换器,都是默认创建的,目前不需要管它们。
三、临时队列:
我们连接上Rabbit MQ的时候,需要一个
全新的、空的队列
(也就是说不使用之前提到的,routing_key参数指定的队列名),我们可以
手动创建一个随机的队列名
,或者让
服务器为我们选择一个随机的队列名(推荐)
。我们仅需要在
调用queue_declare方法时,不提供queue参数
即可:
# 在管道里
,
不声明队列名称
result
=
channel
.
queue_declare
(
)
可通过
result.method.queue
获取已经生成的随机队列名,大概的样子如下所示:
amq
.
gen
-
DIAODS2sDSAKJKS
==
与消费者断开连接时,这个队列应被立即删除:
# 需要一个空的队列 exclusive
=
True 表示与消费者断开时
,
队列立即删除
result
=
channel
.
queue_declare
(
exclusive
=
True
)
四、绑定:
目前已经创建一个扇形交换机和一个队列。现在需要告诉交换机如果发送消息给队列。
交换机和队列之间的联系我们称为绑定(binding)
# 将fanount_logs交换机将会把消息添加到我们的队列中
,
队列名服务器随机生成
channel
.
queue_bind
(
exchange
=
'fanout_logs'
,
queue
=
result
.
method
.
queue
)
查看绑定列表 :
列出所有现存的绑定命令:
rabbitmqctl list_bindings
五、整理本节最终代码:
图解最终流程 :
发布日志与之前的区别 :
1.我们把消息发送给fanout_logs交换机而不是匿名的交换机;
2.发送的时候需要提供routing_key参数,但它的值会被扇形交换机忽略;
以下是
send.py
:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
sys
message
=
' '
.
join
(
sys
.
argv
[
1
:
]
)
or
"Hello World!"
# 创建一个实例 本地访问
IP
地址可以为 localhost
后面5672是端口地址
(
可以不用指
# 定
,
因为默认就是
5672
)
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
,
5672
)
)
# 声明一个管道
,
在管道里发送消息
channel
=
connection
.
channel
(
)
# 指定交换机的类型为fanout
,
执行交换机名
:
fanout_logs
channel
.
exchange_declare
(
exchange
=
'fanout_logs'
,
exchange_type
=
'fanout'
)
# 投递消息 exchange
=
'fancout_logs'
交换机的名命
;
type
=
'fanout'
:
扇形交换机
channel
.
basic_publish
(
exchange
=
'fanout_logs'
,
routing_key
=
''
,
body
=
message
)
print
"[x] sent {}"
.
format
(
message
,
)
# 队列关闭
connection
.
close
(
)
若没有绑定队列的交换器,消息将会丢失。以下是
receive.py
:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
# 创建实例
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
)
)
# 声明管道
channel
=
connection
.
channel
(
)
# 指定交换机名为 fanout_logs 类型为扇形
channel
.
exchange_declare
(
exchange
=
'fanout_logs'
,
exchange_type
=
'fanout'
)
# 表示与消费者断开连接
,
队列立即删除
result
=
channel
.
queue_declare
(
queue
=
''
,
exclusive
=
True
)
# 生成队列的名字
queue_name
=
result
.
method
.
queue
# 绑定交换机和队列
channel
.
queue_bind
(
exchange
=
'fanout_logs'
,
queue
=
queue_name
)
def
callback
(
ch
,
method
,
properties
,
body
)
:
print
'[X] Received{}'
.
format
(
body
,
)
# 消费消息
channel
.
basic_consume
(
queue
=
queue_name
,
# 从指定的消息队列中接收消息
on_message_callback
=
callback
,
# 如果收到消息
,
就调用callback函数来处理
)
print
(
'=======正在等待消息========'
)
channel
.
start_consuming
(
)
# 开始消费消息
3.如果想把日志保存到文件中,打开控制台输入:
python receive
.
py
>
logs_from_rabbit
.
log
4.在屏幕中查看日志,在打开一个新的终端运行:
python receive
.
py
===
===
=
正在等待消息
===
===
==
5.发送消息:
python send
.
py 发送第一条消息
6.可以看到消费者接收到了消息,并且日志中也记录了这条消息。
cat logs_from_rabbit
.
log
===
===
=
正在等待消息
===
===
==
[
X
]
Received发送第一条消息
7.确认已经创建的队列绑定:
rabbitmqctl list_bindings
Listing bindings
...
exchange amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA queue amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA
[
]
exchange hello queue hello
[
]
exchange task_queue queue task_queue
[
]
fanout_logs exchange amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA queue amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA
[
]
交换器fanout_logs把数据发送给两个系统名命的队列