第二人生的源码分析(四十二)实现消息处理的线程

系统 1629 0
第二人生里使用线程循环来处理消息,这样的结构就比较清晰。比如有一个写文件的请求,就可以把这个请求放到线程队列里,然后唤醒线程,让线程处理这个请求。那么在第二人生里是怎么样构造消息循环呢?又是怎么样执行其它线程发过来的请求呢?带着这两个问题来分析下面这几段代码。
#001 void LLQueuedThread::run()
#002 {
 
下面实现消息循环。
#003       while (1)
#004       {
#005              // this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state.
 
下面检查是否暂停线程执行。
#006              checkPause();
#007             
 
检查线程是否要结束循环。
#008              if(isQuitting())
#009                     break;
#010 
#011              //llinfos << "QUEUED THREAD RUNNING, queue size = " << mRequestQueue.size() << llendl;
#012 
 
标志线程已经非空闲状态。
#013              mIdleThread = FALSE;
#014             
 
调用processNextRequest函数来处理本线程消息队列里的消息。
#015              int res = processNextRequest();
#016              if (res == 0)
#017              {
#018                     mIdleThread = TRUE;
#019              }
#020             
 
处理线程消息出错,退出线程循环。
#021              if (res < 0) // finished working and want to exit
#022              {
#023                     break;
#024              }
#025 
#026              //LLThread::yield(); // thread should yield after each request          
#027       }
#028 
#029       llinfos << "QUEUED THREAD " << mName << " EXITING." << llendl;
#030 }
 
由线程接口类LLThread就知道,线程里主要处理函数是run(),而在LLThread继承类LLQueuedThread里也就是实现了这个函数的功能,主要就是调用函数processNextRequest来处理消息队列里的消息。下面就来分析函数processNextRequest的代码,如下:
 
#001 
#002 S32 LLQueuedThread::processNextRequest()
#003 {
#004       QueuedRequest *req;
#005       // Get next request from pool
 
锁住线程消息队列。
#006       lockData();
 
循环地找到可用的消息。
#007       while(1)
#008       {
#009              req = NULL;
 
线程消息队列为空,退出处理消息。
#010              if (mRequestQueue.empty())
#011              {
#012                     break;
#013              }
 
获取第一个消息请求。
#014              req = *mRequestQueue.begin();
 
删除第一个消息请求。
#015              mRequestQueue.erase(mRequestQueue.begin());
 
判断是否丢掉这个消息请求。
#016              if ((req->getFlags() & FLAG_ABORT) || (mStatus == QUITTING))
#017              {
#018                     req->setStatus(STATUS_ABORTED);
#019                     req->finishRequest(false);
#020                     if (req->getFlags() & FLAG_AUTO_COMPLETE)
#021                     {
#022                            mRequestHash.erase(req);
#023                            req->deleteRequest();
#024 //                       check();
#025                     }
#026                     continue;
#027              }
#028              llassert_always(req->getStatus() == STATUS_QUEUED);
#029              break;
#030       }
 
设置这个消息正在处理过程中。
#031       if (req)
#032       {
#033              req->setStatus(STATUS_INPROGRESS);
#034       }
 
解锁消息队列。
#035       unlockData();
#036 
#037       // This is the only place we will call req->setStatus() after
#038       // it has initially been seet to STATUS_QUEUED, so it is
#039       // safe to access req.
 
下面开始处理获取到的消息。
#040       if (req)
#041       {
 
开始调用这个消息的特别处理函数。
#042              // process request
#043              bool complete = req->processRequest();
#044 
 
判断这个请求是否完成。
#045              if (complete)
#046              {
#047                     lockData();
#048                     req->setStatus(STATUS_COMPLETE);
#049                     req->finishRequest(true);
#050                     if (req->getFlags() & FLAG_AUTO_COMPLETE)
#051                     {
#052                            mRequestHash.erase(req);
#053                            req->deleteRequest();
#054 //                       check();
#055                     }
#056                     unlockData();
#057              }
#058              else
#059              {
#060                     lockData();
#061                     req->setStatus(STATUS_QUEUED);
#062                     mRequestQueue.insert(req);
#063                     U32 priority = req->getPriority();
#064                     unlockData();
#065                     if (priority < PRIORITY_NORMAL)
#066                     {
#067                            ms_sleep(1); // sleep the thread a little
#068                     }
#069              }
#070       }
#071 
 
查看是否需要退出线程。
#072       S32 res;
#073       S32 pending = getPending();
#074       if (pending == 0)
#075       {
#076              if (isQuitting())
#077              {
#078                     res = -1; // exit thread
#079              }
#080              else
#081              {
#082                     res = 0;
#083              }
#084       }
#085       else
#086       {
#087              res = pending;
#088       }
#089       return res;
#090 }
#091 
 
通过在processNextRequest里调用更加具体的消息处理函数processRequest来实现各个消息处理,由于processRequest也是纯虚函数,因此通过实现这个函数不同的形式,就可以实现不同的内容处理,这就是C++的多态特性。不过,要注意的是这个函数可能由多线程访问,需要进行同步的操作。
 
 

第二人生的源码分析(四十二)实现消息处理的线程类


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论