RabbitMQ的工作队列和路由

系统 1771 0

RabbitMQ的工作队列和路由

工作队列:Working Queue
 
工作队列这个概念与简单的发送/接收消息的区别就是:接收方接收到消息后,可能需要花费更长的时间来处理消息,这个过程就叫一个Work/Task。
 
几个概念
分配:多个接收端接收同一个Queue时,如何分配?
消息确认:Server端如何确定接收方的Work已经对消息进行了完整的处理?
消息持久化:发送方、服务端Queue如何对未处理的消息进行磁盘持久化?
 
Round-robin分配
多个接收端接收同一个Queue时,采用了Round-robin分配算法,即轮叫调度——依次分配给各个接收方。
 
消息确认
默认开启了消息确认(接收方接收到消息后,立即向服务器发回确认)。消息接收方处理完消息后,向服务器发送消息确认,服务器再删除该消息。
 
对于耗时的work,可以先关闭自动消息确认,在work完成后,再手动发回确认。
channel.basicConsume("hello",false/*关闭自动消息确认*/,consumer);
// ...work完成后
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
 
持久化
 
1. Server端的Queue持久化
注意的是,如果已经声明了同名非持久化的Queue,则再次声明无效。
发送方和接收方都需要指定该参数。
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null); 
 
2. Message持久化
channel.basicPublish("", "task_queue", MessageProperties. PERSISTENT_TEXT_PLAIN ,message.getBytes());
 
负载分配
 
为了解决各个接收端工作量相差太大的问题(有的一直busy,有的空闲比较多),突破Round-robin。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
意思为,最多为当前接收方发送一条消息。如果接收方还未处理完毕消息,还没有回发确认,就不要再给他分配消息了,应该把当前消息分配给其它空闲接收方。
 

固定关键词路由:Routing
 
使用类型为direct的exchange,发送特定关键词( RoutingKey )的消息给订阅该关键词的Queue。
 
场景示例:消息发送方发送了类型为[error][info]的两种消息,写磁盘的消息接受者只接受error类型的消息,Console打印的接收两者。
 
RabbitMQ的工作队列和路由
(上图采用了不同颜色来作为routingKey)
 
发送方
 
            ConnectionFactory factory = 
            
              new
            
            
               ConnectionFactory();

factory.setHost(
            
            "localhost"
            
              );

Connection connection 
            
            =
            
               factory.newConnection();

Channel channel 
            
            =
            
               connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, 
            
            "direct"
            
              /*
            
            
              exchange类型为direct
            
            
              */
            
            
              );

 

channel.basicPublish(EXCHANGE_NAME, 
            
            "info"
            
              /*
            
            
              关键词=info
            
            
              */
            
            , 
            
              null
            
            
              , message.getBytes());

channel.close();

connection.close();
            
          
 
接收方
 
            ConnectionFactory factory = 
            
              new
            
            
               ConnectionFactory();

factory.setHost(
            
            "localhost"
            
              );

Connection connection 
            
            =
            
               factory.newConnection();

Channel channel 
            
            =
            
               connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, 
            
            "direct"
            
              /*
            
            
              exchange类型为direct
            
            
              */
            
            
              );


            
            
              //
            
            
               创建匿名Queue
            
            

String queueName =
            
               channel.queueDeclare().getQueue();


            
            
              //
            
            
               订阅某个关键词,绑定到匿名Queue中
            
            

channel.queueBind(quueName,EXCHANGE_NAME,"error"
            
              );

channel.queueBind(quueName,EXCHANGE_NAME,
            
            "info"
            
              );

 

QueueingConsumer consumer 
            
            = 
            
              new
            
            
               QueueingConsumer(channel);

channel.basicConsume(queueName, 
            
            
              true
            
            
              , consumer);

 

QueueingConsumer.Delivery delivery 
            
            = consumer.nextDelivery(); 
            
              //
            
            
               Blocking...
            
            

String message = 
            
              new
            
            
               String(delivery.getBody());

String routingKey 
            
            = delivery.getEnvelope().getRoutingKey(); 
            
              //
            
            
               可获取路由关键词
            
          

 


关键词模式路由:Topics
 
这种模式可以看做对Routing的扩展。Routing只能使用固定关键词,而Topics模式可以订阅 模糊关键词
 
关键词必须是一组word,由点号分割。例如"xxx.yyy.zzz",限定255bytes。
* 表示一个word;
# 表示0个或者多个word;
 
RabbitMQ的工作队列和路由
 
发送方
 
            ConnectionFactory factory = 
            
              new
            
            
               ConnectionFactory();

factory.setHost(
            
            "localhost"
            
              );

Connection connection 
            
            =
            
               factory.newConnection();

Channel channel 
            
            =
            
               connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, 
            
            "topic"
            
              /*
            
            
              exchange类型
            
            
              */
            
            
              );

 

channel.basicPublish(EXCHANGE_NAME, 
            
            "xxx.yyy"
            
              /*
            
            
              关键词routingKey
            
            
              */
            
            , 
            
              null
            
            
              , message.getBytes());

channel.close();

connection.close();
            
          

 

接收方
 
            ConnectionFactory factory = 
            
              new
            
            
               ConnectionFactory();

factory.setHost(
            
            "localhost"
            
              );

Connection connection 
            
            =
            
               factory.newConnection();

Channel channel 
            
            =
            
               connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, 
            
            "topic"
            
              /*
            
            
              exchange类型
            
            
              */
            
            
              );


            
            
              //
            
            
               创建匿名Queue
            
            

String queueName =
            
               channel.queueDeclare().getQueue();


            
            
              //
            
            
               订阅某个关键词,绑定到匿名Queue中
            
            

channel.queueBind(quueName,EXCHANGE_NAME,"*.yyy"
            
              );

 

QueueingConsumer consumer 
            
            = 
            
              new
            
            
               QueueingConsumer(channel);

channel.basicConsume(queueName, 
            
            
              true
            
            
              , consumer);

 

QueueingConsumer.Delivery delivery 
            
            = consumer.nextDelivery(); 
            
              //
            
            
               Blocking...
            
            

String message = 
            
              new
            
            
               String(delivery.getBody());

String routingKey 
            
            = delivery.getEnvelope().getRoutingKey(); 
            
              //
            
            
               可获取路由关键词
            
          

 


Refs
 
 
 

RabbitMQ的工作队列和路由


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论