ESBasic 可复用的.NET类库(05) -- 工作者引

系统 1519 0
1. 缘起:

假设我们的系统在运行的过程中,源源不断的有新的任务需要处理(比如订单处理),而且这些任务的处理是相互独立的,没有前后顺序依赖性(顺序依赖性是指,必须在任务 A 处理结束后才可开始 B 任务),那么我们就可以使用多个线程来同时处理多个任务。每个处理任务的线程称为“工作者(线程)”。
我设计了 ESBasic.Threading.Engines.IWorkerEngine 工作者引擎,其目的就是使用多个线程来并行处理任务,提高系统的吞吐能力。

工作者引擎的形象示意图如下:
ESBasic 可复用的.NET类库(05) -- 工作者引擎 IWorkerEngine

2. 适用场合:

设计工作者引擎 ESBasic.Threading.Engines.IWorkerEngine 的主要目的是为了解决类似下面的问题:

(1) 充分利用多 CPU 、多核计算资源。

(2) 减少因高速设备与低速设备之间速度差而产生计算资源浪费。

(3) 对于突发的大批量的任务(比如订单系统经常在其它时段接受的订单很少,但在某高峰期会有突发性的大量的订单进来)进行缓冲处理,并最大限度地利用现有资源进行处理。

3 .设计思想与实现

IWorkerEngine 的设计思路是这样的:我们使用一个队列来存放需要处理的任务,新来的任务都会排队到这个队列中,然后有 N 个工作者线程不断地从队列中取出任务去处理,每个线程处理完当前任务后,又从队列中取出下一个任务 …… ,如此循环。

IWorkerEngine 接口的源码对应如下:

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />--> public interface IWorkerEngine < T >
{
/// <summary>
/// IdleSpanInMSecs当没有工作要处理时,工作者线程休息的时间间隔。默认为10ms
/// </summary>
int IdleSpanInMSecs{ get ; set ;}

/// <summary>
/// WorkerThreadCount工作者线程的数量。默认值为1。
/// </summary>
int WorkerThreadCount{ get ; set ;}

/// <summary>
/// WorkProcesser用于处理任务的处理器。
/// </summary>
IWorkProcesser < T > WorkProcesser{ set ;}

/// <summary>
/// WorkCount当前任务队列中的任务数。
/// </summary>
int WorkCount{ get ;}

/// <summary>
/// MaxWaitWorkCount历史中最大的处于等待状态的任务数量。
/// </summary>
int MaxWaitWorkCount{ get ;}

void Initialize();
void Start();
void Stop();

/// <summary>
/// AddWork添加任务。
/// </summary>
void AddWork(Twork);
}

由于任务的类型不是固定的,所以我们使用的泛型参数 T 来表示要处理任务的类型。

所有的任务的具体执行都是由 IWorkProcesser 完成的:

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />--> public interface IWorkProcesser < T >
{
void Process(Twork);
}


实现这个 IWorkerEngine 接口的时候要注意以下几点:

(1) AddWork 方法会在多线程的环境中被调用,所以必须保证其是线程安全的。

(2) 每个工作者线程实际上就是一个我们前面介绍的循环引擎 ICycleEngine ,只不过将其 DetectSpanInSecs 设为 0 即可,表示不间断地执行任务。 WorkerEngine 便是使用了 N AgileCycleEngine 实例来作为工作者的。这些 AgileCycleEngine 实例在 Initialize 方法中被实例化。

(3) 所有的工作者最终都是执行私有的 DoWork 方法,这个方法就是从任务队列中取出任务并且调用 IWorkProcesser 来处理任务,如果任务队列为空,则等待 IdleSpanInMSecs 秒钟后再重试。

(4) MaxWaitWorkCount 属性用于记录自从引擎运行以来最大的等待任务的数量,通过这个属性我们可以推测任务量与任务处理速度之间的差距。

(5) 通过 Start Stop 方法我们可以随时停止、启动工作者引擎,并可重复调用。

4. 使用时的注意事项

(1) 当引擎已经启动并正在运行时,如果要修改 WorkerThreadCount 的值并使其生效,则必须先调用 Stop 方法停止引擎,然后重新调用 Initialize 方法初始化引擎,再调用 Start 方法启动引擎。

(2) 关于工作者线程的个数 N 的设置的问题。这个数字不是越大越好,因为使用的线程越多,而 CPU 跟不上的话,那么消耗在线程切换上的浪费就越严重。所以,为了达到最好的性能,需要为工作者线程个数设置一个合适的值。
通常,这个值跟 CPU 的个数、 CPU 核的个数、任务的复杂度、慢速设备与快速设备之间的速度差以及它们的吞吐量有关。我们可以通过足够的测试来发现适合我们系统的 N 值。

一般情况下的推荐值为: CPU 个数 * 单个 CPU 的核数 *2 + 1

5. 扩展

1 )“一次性”的工作者引擎: BriefWorkerEngine

假设我们的系统可能会偶尔有一批任务要处理(也许永远也不会有这样的任务出现),我们希望只有当任务到来时,才使用一个工作者引擎实例来多线程处理它,处理完后,该引擎就可以释放掉。

ESBasic.Threading.Engines.BriefWorkerEngine ,精简的工作者引擎,便是为这一目的而设计的。它使用多线程处理一批任务,当这批任务处理结束后,工作者线程会被自动释放,而该引擎实例也就可以被结束了。

为了方便使用,我将 BriefWorkerEngine 设计为从构造函数注入引擎运行所需要的参数,包括任务处理器、工作者线程个数、以及要处理的任务集合。在引擎实例被构造成功的同时,内部的循环引擎已经准备好了。注意, BriefWorkerEngine 实现了 IDisposable 接口,这表明当引擎被释放时,内部所有的循环引擎都会停止运行,从而不再占有后台线程池中的线程。

我们可以这样来使用 BriefWorkerEngine

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />--> IWorkProcesser < MyTask > processer = ... ;
IList < MyTask > taskList = ... ;
BriefWorkerEngine < MyTask > engine = new BriefWorkerEngine < MyTask > (processer, 5 ,taskList);
engine.Start();
while ( ! engine.IsFinished())
{
System.Threading.Thread.Sleep(
100 );
}
engine.Dispose();
// 执行到这里,表示所有任务已经处理完毕,引擎实例即将被释放。

我们可以通过它的 IsFinished 方法来检测执行是否已经完成。当 IsFinished 方法返回 true 时,引擎实例就可以被销毁了。


2 )永不停止的工作者引擎

我们同样可以考虑一个类似于循环引擎的扩展的情况,假设我们的系统要求在启动时就将工作者引擎运行起来,而且在整个运行的生命周期中,都不需要停止引擎,那么我们就不想将 Start 方法、 Stop 方法暴露出来以免意外的调用 Stop 方法而导致引擎停止运行,那这个时候我们可以使用相同的技巧来做到:

<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />--> public sealed class MyWorkerEngine
{
private IWorkerEngine < MyTask > workerEngine;

public void Initialize()
{
this .workerEngine = new WorkerEngine < MyTask > ();
this .workerEngine.WorkerThreadCount = 5 ;
// this.workerEngine.WorkProcesser= ..赋值
this .workerEngine.Initialize();
this .workerEngine.Start();
}
}

public class MyTask {}

其道理与循环引擎的扩展是一样的。

注:ESBasic源码可到 http://esbasic.codeplex.com/ 下载。
ESBasic讨论:37677395
ESBasic开源前言


ESBasic 可复用的.NET类库(05) -- 工作者引擎 IWorkerEngine


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论