Python 操作 Rabbit MQ 基础:
一、简介:
1.介绍:
RabbitMQ是一个消息代理:它接收和转发消息。
可以把它比作为邮局,当您要发布邮件放在邮箱中时,可以确定这封邮件让哪位快递员来进行发送到您的收件人手中。
2.术语:
1.发送消息的程序是
生产者
:
2.队列可以理解为邮箱,用来存储一些邮件。队列的由主机的
存储器
和
磁盘限制约束
,它本质上是一个大的
消息缓冲器
。很多生产者可以发送到一个队列的消息,并且许多消费者可以尝试从一个队列接收数据:
3.消费者可以理解为接收人。一个消费者是一个程序,
主要是等待接收信息
:
注意 :
-
生产者、消费者、消息队列
不必驻留在同一个主机上
。
二、发送"Hello world":
实现功能:
生产者 → 消息队列 → 消费者
RabbitMQ有许多不同语言的客户端,我们使用的Python客户端,那么使用
Pika 1.0.0
包:
安装命令:
python -m pip install pika --upgrade
三、生产者:
新创建一个程序
send.py
,将向队列发送一条消息:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
# 创建一个实例 本地访问
IP
地址可以为 localhost
后面5672是端口地址
(
可以不用指定
,
因为默认就是
5672
)
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
,
5672
)
)
# 声明一个管道
,
在管道里发送消息
channel
=
connection
.
channel
(
)
# 在管道里声明队列名称
channel
.
queue_declare
(
queue
=
'hello'
)
# 参数exchange
=
''
表示默认交换
,
目前记住rabbitmq消息永远不是直接发送到队列中的
,
它需要通过交换
channel
.
basic_publish
(
exchange
=
''
,
routing_key
=
'hello'
,
body
=
'hello world'
)
# 队列关闭
connection
.
close
(
)
注意 :
- 若没有看到’已发送’消息,有可能是消息队列没有足够的磁盘空间可用。默认情况下它至少需要200MB空间,因此会拒绝接收消息。
四、消费者:
新创建第二个程序
receive.py
,将从队列中接收消息并打印出来:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
time
# 创建实例
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
)
)
# 声明管道
channel
=
connection
.
channel
(
)
# 这里又声明一次
'hello'
队列
,
因为你不知道哪个程序
(
send
.
py
)
先运行
,
所以要声明两次
channel
.
queue_declare
(
queue
=
'hello'
)
def
callback
(
ch
,
method
,
properties
,
body
)
:
print
(
'ch:'
,
ch
)
print
(
'method:'
,
method
)
print
(
'properties:'
,
properties
)
print
(
'--------收到消息:---------{}'
.
format
(
body
)
)
# 看消息是否已经接收
time
.
sleep
(
20
)
# 睡
20
秒
ch
.
basic_ack
(
delivery_tag
=
method
.
delivery_tag
)
# 告诉生产者
,
消息处理完成
# 消费消息
channel
.
basic_consume
(
queue
=
'hello'
,
# 从指定的消息队列中接收消息
on_message_callback
=
callback
)
# 如果收到消息
,
就调用callback函数来处理
print
(
'=======正在等待消息========'
)
channel
.
start_consuming
(
)
# 开始消费消息
列出目前有的队列:
sudo rabbitmqctl list_queues
五、运行程序:
1.首先在终端上使用我们的程序,先让
消费者的程序运行起来
,等待生产者的发送:
python receive
.
py
#
===
===
=
正在等待消息
===
===
==
2.现在开始让生产者,发送消息:
python send
.
py
#
--
--
--
--
--
--
--
--
--
--
--
发出消息
--
--
--
--
--
--
--
--
--
--
-
3.可以看到消费者的打印信息:
(
'ch:'
,
<
BlockingChannel impl
=
<
Channel number
=
1
OPEN
conn
=
<
SelectConnection
OPEN
transport
=
<
pika
.
adapters
.
utils
.
io_services_utils
.
_AsyncPlaintextTransport object at
0x153b590
>
params
=
<
ConnectionParameters host
=
localhost port
=
5672
virtual_host
=
/
ssl
=
False
>>>
>
)
(
'method:'
,
<
Basic
.
Deliver
(
[
'consumer_tag=ctag1.cc4070930568408fb20c783f85f9336e'
,
'delivery_tag=1'
,
'exchange='
,
'redelivered=False'
,
'routing_key=hello'
]
)
>
)
(
'properties:'
,
<
BasicProperties
>
)
--
--
--
--
收到消息:
--
--
--
--
-
hello world
4.可以查看一下目前有的队列:
rabbitmqctl list_queues
// 执行命令
Listing queues
...
hello
0
// hello 队列的名称 0:消息数量
5.我们已经成功的通过RabbitMQ发送第一条消息,hello world,但是
receive.py
程序不会退出,它将保持准备接收更多消息,使用
Ctrl +c 中断
。