Python 操作 Rabbit MQ 发布/订阅 (五)

系统 1865 0

Python 操作 Rabbit MQ 发布/订阅 (五)

一、发布、订阅:

我们将一个消息分发给 多个消费者 ,这种模式被称为 发布/订阅

为了更好的理解这个模式,我们将构建一个日志系统,它包括两个程序:

  • 第一个程序,负责发送日志消息;
  • 第二个程序,负责获取消息并输出内容;

在日志系统中,所有正在运行的接收方程序都会接收消息;

  • 一个接受者,把日志写入硬盘中;
  • 另一个接受者,把日志输出到屏幕上;

最终,日志消息被广播给所有的接受者。

二、交换机(Exchanges):

概念 :应用程序发送消息时,先把消息给交换机,由交换机投递给队列,而不是直接给队列。交换机可以由多个 消息通道(Channel) ,用于投递消息。

简单概括下之前的知识

  • 发布者(Producer):是发布消息的应用程序。
  • 队列(Queue):用于消息存储的缓冲。
  • 消费者(Consumer):是接收消息的应用程序。

图解大体流程
Python 操作 Rabbit MQ 发布/订阅 (五)_第1张图片

  • P:代表是发布者;
  • X:是交换机;

详解图意 :发布者(P )→交换机(X)→队列(Q)→消费者(C );

  • 交换机一边从发布者方接收消息,一边把消息推送到队列(Q)。 交换机必须知道如何处理它接收到的消息,是推送到指定的队列、还是多个队列,或者是忽略消息 。这些都是通过 交换机类型(Exchange Type) 来定义的。

交换机类型

1.直连交换机(Direct);

2.主题交换机(Topic);

3.头交换机(Headers);

4.扇形交换机(Fanout);

  • 主要说明—扇形交换,它把消息发送给它所知道的所有队列。

                    
                      channel
                      
                        .
                      
                      
                        exchange_declare
                      
                      
                        (
                      
                      exchange
                      
                        =
                      
                      
                        'fanout_logs'
                      
                      
                        ,
                      
                      
                             exchange_type
                      
                        =
                      
                      
                        'fanout'
                      
                      
                        )
                      
                    
                  

参数讲解

  • exchange:就是交换机的名称, 空字符串代表默认或者匿名交换机;

                    
                      channel
                      
                        .
                      
                      
                        basic_publish
                      
                      
                        (
                      
                      exchange
                      
                        =
                      
                      
                        ''
                      
                      
                        )
                      
                    
                  
  • exchange_type:就是交换机的类型;

  • routing_key:分发到指定的队列;

  • body:发送的内容;

  • properties:使消息持久化;

查看交换器列表

命令: rabbitmqctl list_exchanges

            
              Listing exchanges 
              
                ...
              
              
amq
              
                .
              
              rabbitmq
              
                .
              
              log	topic
amq
              
                .
              
              direct	direct
amq
              
                .
              
              topic	topic
amq
              
                .
              
              headers	headers
	direct
amq
              
                .
              
              fanout	fanout
amq
              
                .
              
              rabbitmq
              
                .
              
              trace	topic
amq
              
                .
              
              match	headers

            
          

列表中以amq.*的开头的交换器,都是默认创建的,目前不需要管它们。

三、临时队列:

我们连接上Rabbit MQ的时候,需要一个 全新的、空的队列 (也就是说不使用之前提到的,routing_key参数指定的队列名),我们可以 手动创建一个随机的队列名 ,或者让 服务器为我们选择一个随机的队列名(推荐) 。我们仅需要在 调用queue_declare方法时,不提供queue参数 即可:

            
              # 在管道里
              
                ,
              
               不声明队列名称
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              
                )
              
            
          

可通过 result.method.queue 获取已经生成的随机队列名,大概的样子如下所示:

            
              amq
              
                .
              
              gen
              
                -
              
              DIAODS2sDSAKJKS
              
                ==
              
            
          

与消费者断开连接时,这个队列应被立即删除:

            
              # 需要一个空的队列  exclusive
              
                =
              
              True 表示与消费者断开时
              
                ,
              
               队列立即删除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              exclusive
              
                =
              
              True
              
                )
              
            
          

四、绑定:

img

目前已经创建一个扇形交换机和一个队列。现在需要告诉交换机如果发送消息给队列。

交换机和队列之间的联系我们称为绑定(binding)

            
              # 将fanount_logs交换机将会把消息添加到我们的队列中
              
                ,
              
               队列名服务器随机生成
channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               queue
              
                =
              
              result
              
                .
              
              method
              
                .
              
              queue
              
                )
              
            
          

查看绑定列表

列出所有现存的绑定命令: rabbitmqctl list_bindings

五、整理本节最终代码:

图解最终流程

Python 操作 Rabbit MQ 发布/订阅 (五)_第2张图片

发布日志与之前的区别

1.我们把消息发送给fanout_logs交换机而不是匿名的交换机;

2.发送的时候需要提供routing_key参数,但它的值会被扇形交换机忽略;

以下是 send.py

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

              
                import
              
               sys

message 
              
                =
              
              
                ' '
              
              
                .
              
              
                join
              
              
                (
              
              sys
              
                .
              
              argv
              
                [
              
              
                1
              
              
                :
              
              
                ]
              
              
                )
              
               or 
              
                "Hello World!"
              
              

# 创建一个实例  本地访问
              
                IP
              
              地址可以为 localhost 
              
                后面5672是端口地址
              
              
                (
              
              可以不用指
# 定
              
                ,
              
               因为默认就是
              
                5672
              
              
                )
              
              
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                ,
              
              
                5672
              
              
                )
              
              
                )
              
              

# 声明一个管道
              
                ,
              
               在管道里发送消息
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交换机的类型为fanout
              
                ,
              
               执行交换机名
              
                :
              
              fanout_logs
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'fanout'
              
              
                )
              
              

# 投递消息 exchange
              
                =
              
              
                'fancout_logs'
              
              交换机的名命
              
                ;
              
               type
              
                =
              
              
                'fanout'
              
              
                :
              
              扇形交换机
channel
              
                .
              
              
                basic_publish
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
              
                      routing_key
              
                =
              
              
                ''
              
              
                ,
              
              
                      body
              
                =
              
              message
                      
              
                )
              
              

print 
              
                "[x] sent {}"
              
              
                .
              
              
                format
              
              
                (
              
              message
              
                ,
              
              
                )
              
              
# 队列关闭
connection
              
                .
              
              
                close
              
              
                (
              
              
                )
              
            
          

若没有绑定队列的交换器,消息将会丢失。以下是 receive.py

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

# 创建实例
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                )
              
              
                )
              
              

# 声明管道
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交换机名为 fanout_logs 类型为扇形
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'fanout'
              
              
                )
              
              

# 表示与消费者断开连接
              
                ,
              
               队列立即删除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                ''
              
              
                ,
              
               exclusive
              
                =
              
              True
              
                )
              
              

# 生成队列的名字
queue_name 
              
                =
              
               result
              
                .
              
              method
              
                .
              
              queue

# 绑定交换机和队列
channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               queue
              
                =
              
              queue_name
              
                )
              
              


def 
              
                callback
              
              
                (
              
              ch
              
                ,
              
               method
              
                ,
              
               properties
              
                ,
              
               body
              
                )
              
              
                :
              
              
    print 
              
                '[X] Received{}'
              
              
                .
              
              
                format
              
              
                (
              
              body
              
                ,
              
              
                )
              
              


# 消费消息
channel
              
                .
              
              
                basic_consume
              
              
                (
              
              queue
              
                =
              
              queue_name
              
                ,
              
                # 从指定的消息队列中接收消息
                      on_message_callback
              
                =
              
              callback
              
                ,
              
                # 如果收到消息
              
                ,
              
               就调用callback函数来处理
                      
              
                )
              
              
                print
              
              
                (
              
              
                '=======正在等待消息========'
              
              
                )
              
              
channel
              
                .
              
              
                start_consuming
              
              
                (
              
              
                )
              
                # 开始消费消息

            
          

3.如果想把日志保存到文件中,打开控制台输入:

            
              python receive
              
                .
              
              py 
              
                >
              
               logs_from_rabbit
              
                .
              
              log 

            
          

4.在屏幕中查看日志,在打开一个新的终端运行:

            
              python receive
              
                .
              
              py 

              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
            
          

5.发送消息:

            
              python send
              
                .
              
              py 发送第一条消息

            
          

6.可以看到消费者接收到了消息,并且日志中也记录了这条消息。

            
              cat logs_from_rabbit
              
                .
              
              log 

              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
              
                [
              
              
                X
              
              
                ]
              
               Received发送第一条消息

            
          

7.确认已经创建的队列绑定:

            
              rabbitmqctl list_bindings
Listing bindings 
              
                ...
              
              
	exchange	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	queue	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	
              
                [
              
              
                ]
              
              
	exchange	hello	queue	hello	
              
                [
              
              
                ]
              
              
	exchange	task_queue	queue	task_queue	
              
                [
              
              
                ]
              
              
fanout_logs	exchange	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	queue	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	
              
                [
              
              
                ]
              
            
          

交换器fanout_logs把数据发送给两个系统名命的队列


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论