C#实现异步消息队列

系统 3541 0
原文: C#实现异步消息队列

拿到新书《.net框架设计》,到手之后迅速读了好多,虽然这本书不像很多教程一样从头到尾系统的讲明一些知识,但是从项目实战角度告诉我们如何使用我们的知识,从这本书中提炼了一篇,正好符合我前几篇的“数据驱动框架”设计的问题;

消息队列

消息队列 英语 Message queue)是一种 进程间通信 或同一进程的不同 线程 间的通信方式, 软件 贮列 用来处理一系列的 输入 ,通常是来自使用者。消息队列提供了 异步 通信协议 ,每一个贮列中的纪录包含详细说明的资料,包含发生的时间,输入装置的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在 队列 中,直到接收者取回它。

简单的说队列就是贮存了我们需要处理的Command但是并不是及时的拿到其处理结果;

实现

实际上,消息队列常常保存在 链表 结构中。拥有权限的进程可以向消息队列中写入或读取消息。

目前,有很多消息队列有很多开源的实现,包括 JBoss Messaging JORAM Apache ActiveMQ Sun Open Message Queue Apache Qpid 和HTTPSQS。

优点,缺点

消息队列本身是 异步 的,它允许接收者在消息发送很长时间后再取回消息,这和大多数通信协议是不同的。例如 WWW 中使用的 HTTP 协议是 同步 的,因为客户端在发出请求后必须等待服务器回应。然而,很多情况下我们需要异步的通信协议。比如,一个进程通知另一个进程发生了一个事件,但不需要等待回应。但消息队列的异步特点,也造成了一个缺点,就是接收者必须 轮询 消息队列,才能收到最近的消息。

信号 相比,消息队列能够传递更多的信息。与 管道 相比,消息队列提供了有格式的数据,这可以减少开发人员的工作量。但消息队列仍然有大小限制。

读取队列消息

主要有两种(1)服务端的推;(2)客户端的拉;

拉:主要是客户端定时轮询拿走消息处理;

推:通过事件订阅方式主动通知订阅者进行处理;

消息的贮存

简单的是通过内存链表实现贮存;也可以借助DB,比如Redis;还可以持久到本地文件中;

如何保证异步处理的一致性

尽管队列主要目的是实现消息贮存,同时将调用与实现异步化。但是如果想达到处理消息一致性,好的方式是区别业务处理顺序,比如操作主从DB,主负责写,从负责读,我们没有机会在写之后立马从读数据库拿到你想要的结果;同时我们需要借助中间状态,当多个中间状态同时符合调用结果才到到业务时间被处理,否则将“异常消息”持久化,待下次操作;

上代码

C#实现异步消息队列

建立消息对立核心队列

      {

    public delegate void MessageQueueEventNotifyHandler(Message.BaseMessage message);



    public class MessageQueue:Queue<BaseMessage>

    {

        public static MessageQueue GlobalQueue = new MessageQueue();



        private Timer timer = new Timer();

        public MessageQueue() {

            this.timer.Interval = 5000;

            this.timer.Elapsed += Notify;

            this.timer.Enabled = true;

        }

        private void Notify(object sender, ElapsedEventArgs e) {

            lock (this) {

                if (this.Count > 0) {

                    //this.messageNotifyEvent.GetInvocationList()[0].DynamicInvoke(this.Dequeue());

                    var message = this.Dequeue();

                    this.messageNotifyEvent(message);

                }

            }

        }



        private MessageQueueEventNotifyHandler messageNotifyEvent;

        public event MessageQueueEventNotifyHandler MessageNotifyEvent {

            add {

                this.messageNotifyEvent += value;

            }



            remove {

                if (this.messageNotifyEvent != null) {

                    this.messageNotifyEvent -= value;

                }

            }

        }

    }

}


    

事件处理

      public const string OrderCodePrefix = "P";

        public void Submit(Message.BaseMessage message)

        {

            Order order = message.Body as Order;



            if (order.OrderCode.StartsWith(OrderCodePrefix))

            {

                System.Console.WriteLine("这个是个正确的以({0})开头的订单:{1}", OrderCodePrefix,order.OrderCode);

            }

            else {

                System.Console.WriteLine("这个是个错误的订单,没有以({0})开头:{1}",OrderCodePrefix,order.OrderCode);

            }

        }


    

可依据具体业务进行个性化处理;

通过Proxy向队列追加消息

      public class OrderServiceProxy:IOrderService

    {

        public void Submit(Message.BaseMessage message)

        {

            MessageQueue.MessageQueue.GlobalQueue.Enqueue(message);

        }

    }


    

客户端调用

      OrderService orderService = new OrderService();

            MessageQueue.MessageQueue.GlobalQueue.MessageNotifyEvent += orderService.Submit;



            var orders = new List<Order>() { 

                new Order(){OrderCode="P001"},

                new Order(){OrderCode="P002"},

                new Order(){OrderCode="B003"}

            };



            OrderServiceProxy proxy = new OrderServiceProxy();

            orders.ForEach(order => proxy.Submit(new Message.BaseMessage() { Body=order}));



            Console.ReadLine();


    

这样就满足了事件的绑定与触发个性化处理,同时达到了消息异步化的目的,希望更细致的拓展用到后期的项目中。

C#实现异步消息队列


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论