首先建立 工作exchange和工作queue,指定工作队列的x-dead-letter-exchange到重试exchenge
var
workQueueArgs =
new
Dictionary
<
string
,
object
> {
{
"x-dead-letter-exchange"
, RETRY_EXCHANGE },
};
channel.ExchangeDeclare(WORK_EXCHANGE,
"direct"
);
channel.QueueDeclare(WORK_QUEUE,
true
,
false
,
false
, workQueueArgs);
channel.QueueBind(WORK_QUEUE, WORK_EXCHANGE,
""
,
null
);
之后建立重试exchange和重试queue
var
queueArgs =
new
Dictionary
<
string
,
object
> {
{
"x-dead-letter-exchange"
, WORK_EXCHANGE },
{
"x-message-ttl"
, RETRY_DELAY }
};
channel.ExchangeDeclare(RETRY_EXCHANGE,
"direct"
);
channel.QueueDeclare(RETRY_QUEUE,
true
,
false
,
false
, queueArgs);
channel.QueueBind(RETRY_QUEUE, RETRY_EXCHANGE,
""
,
null
);
重试队列需要2个属性,一个是 x-dead-letter-exchange,指向到工作exchange
一个是过期时间(这里等于是多少秒后重试)
监听工作队列,处理消息
QueueingBasicConsumer
consumer =
new
QueueingBasicConsumer
(channel);
channel.BasicConsume(WORK_QUEUE,
false
, consumer);
while
(
true
)
{
BasicDeliverEventArgs
e = (
BasicDeliverEventArgs
)consumer.Queue.Dequeue();
var
message =
Encoding
.UTF8.GetString(e.Body);
try
{
//throw new Exception("");
channel.BasicAck(e.DeliveryTag,
false
);
}
catch
{
channel.BasicNack(e.DeliveryTag,
false
,
false
);
}
}
处理成功调用ack,处理不成功调用nack,
调用nack后,会根据工作队列的x-dead-letter-exchange自动把消息发到重试队列
重试队列等几秒(x-message-ttl)后,就认为是自动失败了,又会根据重试队列的x-dead-letter-exchange发送回工作队列

