首先建立 工作exchange和工作queue,指定工作队列的x-dead-letter-exchange到重试exchenge
var workQueueArgs = new Dictionary < string , object > { { "x-dead-letter-exchange" , RETRY_EXCHANGE }, }; channel.ExchangeDeclare(WORK_EXCHANGE, "direct" ); channel.QueueDeclare(WORK_QUEUE, true , false , false , workQueueArgs); channel.QueueBind(WORK_QUEUE, WORK_EXCHANGE, "" , null );
之后建立重试exchange和重试queue
var queueArgs = new Dictionary < string , object > { { "x-dead-letter-exchange" , WORK_EXCHANGE }, { "x-message-ttl" , RETRY_DELAY } }; channel.ExchangeDeclare(RETRY_EXCHANGE, "direct" ); channel.QueueDeclare(RETRY_QUEUE, true , false , false , queueArgs); channel.QueueBind(RETRY_QUEUE, RETRY_EXCHANGE, "" , null );
重试队列需要2个属性,一个是 x-dead-letter-exchange,指向到工作exchange
一个是过期时间(这里等于是多少秒后重试)
监听工作队列,处理消息
QueueingBasicConsumer consumer = new QueueingBasicConsumer (channel); channel.BasicConsume(WORK_QUEUE, false , consumer); while ( true ) { BasicDeliverEventArgs e = ( BasicDeliverEventArgs )consumer.Queue.Dequeue(); var message = Encoding .UTF8.GetString(e.Body); try { //throw new Exception(""); channel.BasicAck(e.DeliveryTag, false ); } catch { channel.BasicNack(e.DeliveryTag, false , false ); } }
处理成功调用ack,处理不成功调用nack,
调用nack后,会根据工作队列的x-dead-letter-exchange自动把消息发到重试队列
重试队列等几秒(x-message-ttl)后,就认为是自动失败了,又会根据重试队列的x-dead-letter-exchange发送回工作队列