pika生产者程序大致步骤:
1. 建立连接connection , 需要认证的调用认证参数
2. 创建通道channel 当然 channel可以池化,这样可以重复使用
3. 声明队列 指定队列属性, 一旦指定属性不能修改, 例如是否持久化,名称
4. 声明交换机 交换机类型,名称等, 也可以不用声明,直接使用 “” 空字符串,默认交换机也可以
5. 将队列与交换机绑定 queue_bind
6. basic_publish 发送到交换机 指定路由键
pika消费者程序大致步骤:
1. 建立连接connection , 需要认证的调用认证参数
2. 创建通道channel 当然 channel可以池化,这样可以重复使用
3. 声明队列 指定队列属性, 一旦指定属性不能修改, 例如是否持久化,名称
4. 声明交换机 交换机类型,名称等, 也可以不用声明,直接使用 “” 空字符串,默认交换机也可以
5. 将队列与交换机绑定 queue_bind
6. basic_consume 消费消息
1. 轮询接收消息
使用消息队列的一个好处就是, 可以将任务消息发送到队列中,由消费者异步进行处理, 同时对于后端消费者可以很容易地增加减少,只需要运行多个进程即可, 方便扩展, 之前的示例中消费端程序就可以开启多个,然后可以看到消费被轮询得分配给每个消费者
将之前的消费者略作更改, 加入客户端编号,启动三个消费者, 通过生产者发送4个消息, 依次收到消息, 即是 轮询(round-robin):
[*] Waiting for messages. To exit press CTRL+C
1. [x] Received 'Hello World!'
1. [x] Received 'Hello World!'
[*] Waiting for messages. To exit press CTRL+C
2. [x] Received 'Hello World!'
[*] Waiting for messages. To exit press CTRL+C
3. [x] Received 'Hello World!'
2. 消息确认:
为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。 如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。
消息响应默认auto_ack=False, 不自动确认消息, 即是需要我们处理并确认消息的
确认需要发送确认消息:
在回调callback中加入basic_ack
channel
.
basic_ack
(
delivery_tag
=
method
.
delivery_tag
)
如果auto_ack设置为True,而忘记basic_ack消息确认,消息在程序退出之后就会重新发送,如果不及时释放没响应的消息,RabbitMQ就会占用越来越多的内存。
为了排除这种错误,可以使用rabbitmqctl命令,输出messages_unacknowledged字段:
# rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages_ready messages_unacknowledged
hello 0 0
TEST01 0 0
3. 消息持久化
如果没有向rabbitmq指定消息持久化, 则退出或者崩溃的时候,将会丢失所有队列和消息, 消息持久化必须把“队列”和“消息”设为持久化
- 队列声明持久化:
channel.queue_declare(queue='hello', durable=True)
注意一个消息队列被声明过一次后,rabbitmq不允许使用不同的参数重新定义队列, 因此如果存在hello队列,上面会提示错误
- 消息声明持久化
将publish生产者发送消息时候消息属性, delivery_mode的属性设为2
properties=pika.BasicProperties(delivery_mode=2)
生产者代码:
channel.queue_declare(queue='TEST02', durable=True)
channel.basic_publish(exchange='',
routing_key='TEST02',
body='Hello World!',
properties=pika.BasicProperties(delivery_mode=2)
)
客户端代码:
channel.queue_declare(queue='TEST02', durable=True)
def callback(ch, method, properties, body):
print(". [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(on_message_callback=callback,
queue='TEST02',
auto_ack=False,
)
4. 设置客户端QOS
开启客户端最大的未处理消息队列大小:
channel.basic_qos(prefetch_count=1)
代码示例:
channel.queue_declare(queue='TEST02', durable=True)
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
print(". [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(on_message_callback=callback,
queue='TEST02',
auto_ack=False,
)
5. 发布订阅模式:
rabbitmq在之前介绍的时候可以看到,消息是被依次发送给消费者,即是消息只会被发送给一个消费者,除非开启确认机制时处理失败了, 一个消息发送给多个消费者, 这个是rabbitmq提供的发布订阅模式
发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的
交换机类型:直连交换机(direct)-- 一对一, 之前使用的就是这个;主题交换机(topic)-- 模糊匹配,需要符合匹配规则; headers(头交换机)和 扇型交换机(fanout)-- 进行消息广播
fanout会发送消息到交换机所有的消息队列
消息将会根据指定的routing_key分发到指定的队列
rabbitmq拥有一个默认交换机 即是 空字符串(""),
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
在pika编程中,可以不用指定队列名称,系统会随机生成一个名称, 在重启都该队列丢失
只需在声明时不提供参数就可以了
result = channel.queue_declare()
队列需要绑定到交换, 才能通过交换机发送消息到该队列
channel.queue_bind(exchange='logs',
queue=result.method.queue)
代码:
生产者:
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
connection.close()
消费者:
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r" % (body,)
channel.basic_consume(callback,
queue=queue_name,
auto_ack=True)
channel.start_consuming()
6. 几个重要概念的程序实现
- 路由 routing
路由键在发送消息的时候进行指定
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
- 队列绑定 binding
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
- 交换机类型
交换机声明的时候进行指定, 一般常用direct, fanout, topic三种类型
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
- 路由键
一般路由键不等于绑定键, 但是我们通常在direct的时候可以近似的认为这两个等价的
在队列绑定的时候,通过指定routing_key 指定
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='tologs')
- 一个路由键多个队列
也就是多个队列使用相同的绑定键, 这个是合法 的, 这样就可以将消息发生到不同的队列中
例如:
channel.queue_bind(exchange="logs",queue="info",routing_key='tologs')
channel.queue_bind(exchange="logs",queue="warn",routing_key='tologs')
channel.queue_bind(exchange="logs",queue="debug",routing_key='tologs')
- 排他队列:
一个只有自己可见的队列,即不允许其它用户访问,RabbitMQ允许你将一个Queue声明成为排他性的
channel.queue_declare(exclusive=True)
示例代码:
生产者:
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()
消费者:
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
(sys.argv[0],)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
auto_ack=True)
channel.start_consuming()
7. 主题交换机 (topic)
发送到主题交换机(topic exchange)的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由
.
分隔开的词语列表, 例如: “stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”
绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似, 携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列
它的绑定键和路由键有两个特殊应用方式, 即是支持模糊匹配:
-
*
(星号) 用来表示一个单词. -
#
(井号) 用来表示任意数量(零个或多个)单词。
例如路由键:
*.*.rabbit
,
lazy.#
lazy.pink.rabbit 会匹配
*.*.rabbit
和
lazy.#
lazy.x 匹配
lazy.#
注特殊情况:
当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当
*
(星号) 和
#
(井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
示例代码:
生产者:
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()
消费者:
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
auto_ack=True)
channel.start_consuming()