Python 操作 Rabbit MQ 发布/订阅 (五)
一、发布、订阅:
            我们将一个消息分发给
            
              多个消费者
            
            ,这种模式被称为
            
              发布/订阅
            
            。
          
为了更好的理解这个模式,我们将构建一个日志系统,它包括两个程序:
- 第一个程序,负责发送日志消息;
 - 第二个程序,负责获取消息并输出内容;
 
在日志系统中,所有正在运行的接收方程序都会接收消息;
- 一个接受者,把日志写入硬盘中;
 - 另一个接受者,把日志输出到屏幕上;
 
最终,日志消息被广播给所有的接受者。
二、交换机(Exchanges):
            
              概念
            
            :应用程序发送消息时,先把消息给交换机,由交换机投递给队列,而不是直接给队列。交换机可以由多个
            
              消息通道(Channel)
            
            ,用于投递消息。
          
简单概括下之前的知识 :
- 发布者(Producer):是发布消息的应用程序。
 - 队列(Queue):用于消息存储的缓冲。
 - 消费者(Consumer):是接收消息的应用程序。
 
- P:代表是发布者;
 - X:是交换机;
 
详解图意 :发布者(P )→交换机(X)→队列(Q)→消费者(C );
- 
              交换机一边从发布者方接收消息,一边把消息推送到队列(Q)。
              
交换机必须知道如何处理它接收到的消息,是推送到指定的队列、还是多个队列,或者是忽略消息。这些都是通过交换机类型(Exchange Type)来定义的。 
交换机类型 :
1.直连交换机(Direct);
2.主题交换机(Topic);
3.头交换机(Headers);
4.扇形交换机(Fanout);
- 
              
主要说明—扇形交换,它把消息发送给它所知道的所有队列。
channel . exchange_declare ( exchange = 'fanout_logs' , exchange_type = 'fanout' ) 
参数讲解 :
- 
              
exchange:就是交换机的名称,
空字符串代表默认或者匿名交换机;channel . basic_publish ( exchange = '' ) - 
              
exchange_type:就是交换机的类型;
 - 
              
routing_key:分发到指定的队列;
 - 
              
body:发送的内容;
 - 
              
properties:使消息持久化;
 
查看交换器列表 :
            命令:
            
              rabbitmqctl list_exchanges
            
          
            
              Listing exchanges 
              
                ...
              
              
amq
              
                .
              
              rabbitmq
              
                .
              
              log	topic
amq
              
                .
              
              direct	direct
amq
              
                .
              
              topic	topic
amq
              
                .
              
              headers	headers
	direct
amq
              
                .
              
              fanout	fanout
amq
              
                .
              
              rabbitmq
              
                .
              
              trace	topic
amq
              
                .
              
              match	headers
            
          
          列表中以amq.*的开头的交换器,都是默认创建的,目前不需要管它们。
三、临时队列:
            我们连接上Rabbit MQ的时候,需要一个
            
              全新的、空的队列
            
            (也就是说不使用之前提到的,routing_key参数指定的队列名),我们可以
            
              手动创建一个随机的队列名
            
            ,或者让
            
              服务器为我们选择一个随机的队列名(推荐)
            
            。我们仅需要在
            
              调用queue_declare方法时,不提供queue参数
            
            即可:
          
            
              # 在管道里
              
                ,
              
               不声明队列名称
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              
                )
              
            
          
          
            可通过
            
              result.method.queue
            
            获取已经生成的随机队列名,大概的样子如下所示:
          
            
              amq
              
                .
              
              gen
              
                -
              
              DIAODS2sDSAKJKS
              
                ==
              
            
          
          与消费者断开连接时,这个队列应被立即删除:
            
              # 需要一个空的队列  exclusive
              
                =
              
              True 表示与消费者断开时
              
                ,
              
               队列立即删除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              exclusive
              
                =
              
              True
              
                )
              
            
          
          四、绑定:
目前已经创建一个扇形交换机和一个队列。现在需要告诉交换机如果发送消息给队列。
            
              交换机和队列之间的联系我们称为绑定(binding)
            
          
            
              # 将fanount_logs交换机将会把消息添加到我们的队列中
              
                ,
              
               队列名服务器随机生成
channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               queue
              
                =
              
              result
              
                .
              
              method
              
                .
              
              queue
              
                )
              
            
          
          查看绑定列表 :
            列出所有现存的绑定命令:
            
              rabbitmqctl list_bindings
            
          
五、整理本节最终代码:
图解最终流程 :
发布日志与之前的区别 :
1.我们把消息发送给fanout_logs交换机而不是匿名的交换机;
2.发送的时候需要提供routing_key参数,但它的值会被扇形交换机忽略;
            以下是
            
              send.py
            
            :
          
            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika
              
                import
              
               sys
message 
              
                =
              
              
                ' '
              
              
                .
              
              
                join
              
              
                (
              
              sys
              
                .
              
              argv
              
                [
              
              
                1
              
              
                :
              
              
                ]
              
              
                )
              
               or 
              
                "Hello World!"
              
              
# 创建一个实例  本地访问
              
                IP
              
              地址可以为 localhost 
              
                后面5672是端口地址
              
              
                (
              
              可以不用指
# 定
              
                ,
              
               因为默认就是
              
                5672
              
              
                )
              
              
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                ,
              
              
                5672
              
              
                )
              
              
                )
              
              
# 声明一个管道
              
                ,
              
               在管道里发送消息
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              
# 指定交换机的类型为fanout
              
                ,
              
               执行交换机名
              
                :
              
              fanout_logs
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'fanout'
              
              
                )
              
              
# 投递消息 exchange
              
                =
              
              
                'fancout_logs'
              
              交换机的名命
              
                ;
              
               type
              
                =
              
              
                'fanout'
              
              
                :
              
              扇形交换机
channel
              
                .
              
              
                basic_publish
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
              
                      routing_key
              
                =
              
              
                ''
              
              
                ,
              
              
                      body
              
                =
              
              message
                      
              
                )
              
              
print 
              
                "[x] sent {}"
              
              
                .
              
              
                format
              
              
                (
              
              message
              
                ,
              
              
                )
              
              
# 队列关闭
connection
              
                .
              
              
                close
              
              
                (
              
              
                )
              
            
          
          
            若没有绑定队列的交换器,消息将会丢失。以下是
            
              receive.py
            
            :
          
            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika
# 创建实例
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                )
              
              
                )
              
              
# 声明管道
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              
# 指定交换机名为 fanout_logs 类型为扇形
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'fanout'
              
              
                )
              
              
# 表示与消费者断开连接
              
                ,
              
               队列立即删除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                ''
              
              
                ,
              
               exclusive
              
                =
              
              True
              
                )
              
              
# 生成队列的名字
queue_name 
              
                =
              
               result
              
                .
              
              method
              
                .
              
              queue
# 绑定交换机和队列
channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               queue
              
                =
              
              queue_name
              
                )
              
              
def 
              
                callback
              
              
                (
              
              ch
              
                ,
              
               method
              
                ,
              
               properties
              
                ,
              
               body
              
                )
              
              
                :
              
              
    print 
              
                '[X] Received{}'
              
              
                .
              
              
                format
              
              
                (
              
              body
              
                ,
              
              
                )
              
              
# 消费消息
channel
              
                .
              
              
                basic_consume
              
              
                (
              
              queue
              
                =
              
              queue_name
              
                ,
              
                # 从指定的消息队列中接收消息
                      on_message_callback
              
                =
              
              callback
              
                ,
              
                # 如果收到消息
              
                ,
              
               就调用callback函数来处理
                      
              
                )
              
              
                print
              
              
                (
              
              
                '=======正在等待消息========'
              
              
                )
              
              
channel
              
                .
              
              
                start_consuming
              
              
                (
              
              
                )
              
                # 开始消费消息
            
          
          3.如果想把日志保存到文件中,打开控制台输入:
            
              python receive
              
                .
              
              py 
              
                >
              
               logs_from_rabbit
              
                .
              
              log 
            
          
          4.在屏幕中查看日志,在打开一个新的终端运行:
            
              python receive
              
                .
              
              py 
              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
            
          
          5.发送消息:
            
              python send
              
                .
              
              py 发送第一条消息
            
          
          6.可以看到消费者接收到了消息,并且日志中也记录了这条消息。
            
              cat logs_from_rabbit
              
                .
              
              log 
              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
              
                [
              
              
                X
              
              
                ]
              
               Received发送第一条消息
            
          
          7.确认已经创建的队列绑定:
            
              rabbitmqctl list_bindings
Listing bindings 
              
                ...
              
              
	exchange	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	queue	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	
              
                [
              
              
                ]
              
              
	exchange	hello	queue	hello	
              
                [
              
              
                ]
              
              
	exchange	task_queue	queue	task_queue	
              
                [
              
              
                ]
              
              
fanout_logs	exchange	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	queue	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	
              
                [
              
              
                ]
              
            
          
          交换器fanout_logs把数据发送给两个系统名命的队列

