注:本文主要内容摘自笔者所著的《多核计算与程序设计》一书,略有修改,后续还会继续发布系列文章,如有需要,可以考虑将一下地址加入到您的浏览器收藏夹中:http://software.intel.com/zh-cn/blogs/category/multicore/。
1、基本思想
动态任务调度可以将一系列分解好的任务进行并行运行,并取得一定程度的负载均衡。动态任务调度的最大作用就是用它来做并行计算。动态任务调度有多种方法,一般可以使用分布式队列【1】来实现,下面讲解一种最简单的嵌套型任务调度的实现方法。
对于嵌套型任务,通常都有一个或多个开始任务,其他任务的产生都源于这些开始任务。
调度的方法为,每个线程由一个本地队列,另外由一个所有线程共享的队列。当每个线程产生n个新任务后,先检查本地队列是否为空,如果为空,则放入一个任务到本地队列中。然后检查共享队列是否满,如果未满则将其他任务放入共享队列中,否则放入到本地队列中。
上面这个调度方法实际上和CDistributeQueue【1】中的进队操作方法是一样的,因此可以使用CDistributeQueue来实现嵌套型动态任务的调度。
一般来说,嵌套型动态任务调度会遇到以下一些问题 :
- 1、 初始时可能只有一个任务运行,此种情况下只能有一个线程运行,其他线程必须挂起。当动态任务产生后,需要唤醒挂起的线程进行执行。
- 2、 由于每个任务中会产生新的任务,因此每个任务既是消费者,同时也是生产者。在操作本地队列时,比非嵌套型任务调度更加方便,如何将本地队列的操作最大化是首要考虑的问题。
根据上面的思想,下面设计一个CNestTaskScheduler类来实现对嵌套型动态任务的调度。
2、CNestTaskScheduler类的设计和实现
CNestTaskScheduler类的定义如下:
class CNestTaskScheduler {
private:
CThreadPool m_ThreadPool;//(TaskScheduler_StartFunc, NULL, 0);
CDistributedQueue<TASK, CLocalQueue<TASK>, CStealQueue<TASK>> m_DQueue;
THREADFUNC m_StartFunc; //为线程池使用的线程入口函数指针
LONG volatile m_lTaskId; //Task Id,用于判断是否唤醒对应的线程
public:
CNestTaskScheduler();
virtual ~CNestTaskScheduler();
//下面两个函数为调度器本身直接使用
void SetStartFunc(THREADFUNC StartFunc);
int GetTask(TASK &Task);
CThreadPool & GetThreadPool();
LONG AtomicIncrementTaskId();
//下面三个函数为调度器的使用者使用
void SpawnLocalTask(TASK &Task);
void SpawnTask(TASK &Task);
void BeginRootThread(TASK &Task);
};
类中的主要三个接口为
void SpawnLocalTask(TASK &Task);
void SpawnTask(TASK &Task);
void BeginRootThread(TASK &Task);
SpawnLocalTask()的主要作用是将动态生成的任务放入线程的本地队列中;SpawnTask()的作用是将动态产生的任务放入分布式队列中,当然任务有可能被放入本地队列,也有可能被放入共享队列中;BeginRootThread()的作用是启动初始的任务。
1) BeginRootTask() 的处理流程
BeginRootTask() 的处理流程较简单,它先创建线程池,接着将一个原始任务放入到第 0 个线程的本地队列中,然后执行第 0 个线程,最后等待所有线程执行完。处理流程如下图所示:
图 1 嵌套型任务 BeginRootTask() 处理流程图
BeginRootTask() 的代码如下:
/** 嵌套任务调度的开始根线程函数
@param TASK &Task - 要执行的最初任务
@return void - 无
*/
void CNestTaskScheduler::BeginRootThread(TASK &Task)
{
m_lTaskId = 0;
m_ThreadPool . CreateThreadPool ( m_StartFunc , this, 0);
m_DQueue . PushToLocalQueue ( Task , 0);
m_ThreadPool . ExecThread ( 0 );
m_ThreadPool . WaitAllThread ();
}
BeginRootTask() 执行后,只有第 0 个线程被执行了,线程池中的其他线程都是处于挂起状态。实际上在第 0 个线程的处理过程中,它会继续调用 SpawnTask() , SpawnTask() 中需要判断是否有线程被挂起,如果有则需要唤醒挂起的线程,下面就来看看 SpawnTask() 的详细处理过程。
2) SpawnTask() 的处理流程
SpawnTask() 的功能主要是将任务放入到分布式队列中。由于在 BeginRootThread() 中只执行了第 0 个线程,其他线程都处于挂起状态,因此这个函数中还需要唤醒其他被挂起的线程,整个处理流程如下图所示:
图 2 嵌套型任务 SpawnLocalTask() 处理流程图
根据上面的处理流程, SpawnLocalTask() 的代码实现如下:
/** 嵌套任务调度的生成任务函数
生成的任务被放入到分布式队列中
@param TASK &Task - 待执行的任务
@return void - 无
*/
void CNestTaskScheduler::SpawnTask(TASK &Task)
{
if ( m_lTaskId < m_ThreadPool . GetThreadCount () )
{
// 依次唤醒各个挂起的线程
LONG Id = AtomicIncrement (& m_lTaskId );
if ( Id < m_ThreadPool . GetThreadCount () )
{
// 下面之所以可以对其他线程的本地队列进行无同步的操作,是因为
// 访问这些队列的线程在进队操作之后才开始运行
m_DQueue . PushToLocalQueue ( Task , Id );
m_ThreadPool . ExecThread ( Id );
}
else
{
m_DQueue . EnQueue ( Task );
}
}
else
{
// 先判断偷取队列是否满,如果未满则放入偷取队列中
// 如果满了则放入本地队列中
m_DQueue . EnQueue ( Task );
}
};
在处理唤醒其他线程的过程中,采用了原子操作来实现,当变量 m_lTaskId 的值小于给定线程数量时,表明还有线程被挂起,因此将任务放入对应被挂起线程的本地队列中,然后再唤醒并执行对应被挂起的线程。
当任务被放入分布式队列后,线程池中的各个线程是如何处理分布式队列中的任务的呢?下面就来看看线程池的入口函数的处理过程。
3、 CNestTaskScheduler 使用方法
注:完整的 CNestTaskScheduler 的源代码,请到 CAPI 开源项目进行下载,下载地址为: http://gforge.osdn.net.cn/projects/capi
下面以一个区间递归分拆为例讲解如何使用 CNestTaskScheduler 。首先需要写一个任务处理入口函数,代码如下:
struct RANGE {
int begin ;
int end ;
};
CNestTaskScheduler * pTaskSched = NULL ;
/** 任务处理入口函数
将一个大的区间均分成两个更小的区间
@param void *args - 参数,实际为 RANGE 类型
@return unsigned int WINAPI - 总是返回 CAPI_SUCCESS
*/
unsigned int WINAPI RootTask (void * args )
{
RANGE * p = ( RANGE *) args ;
if ( p != NULL )
{
printf ( "Range: %ld - %ld\n" , p -> begin , p -> end );
if ( p -> end - p -> begin < 128 )
{
// 当区间大小小于时,不再进行分拆
delete p ;
return 0;
}
int mid = ( p -> begin + p -> end + 1) / 2;
RANGE * range1 , * range2 ;
range1 = new RANGE ;
range2 = new RANGE ;
range1 -> begin = p -> begin ;
range1 -> end = mid - 1;
range2 -> begin = mid ;
range2 -> end = p -> end ;
TASK t1 , t2 ;
t1 . pArg = range1 ;
t2 . pArg = range2 ;
t1 . func = RootTask ;
t2 . func = RootTask ;
pTaskSched -> SpawnLocalTask ( t1 );
pTaskSched -> SpawnTask ( t2 );
delete p ;
}
return 1;
}
任务处理函数 RootTask() 中,先将一个大区间拆分成两个更小的区间,然后将每个区间看成一个新的任务,得到两个新的任务 t1 、 t2 ,然后调用 SpawnLocalTask() 将任务 t1 放进任务调度器的分布式队列的本地队列中。如果拆分后的区间小于给定的大小,就不再分拆。
下面的代码演示了如何调用 CNestTaskScheduler 类来对一个 0 ~ 1023 的区间进行并行拆分。
void main ( void )
{
TASK task ;
RANGE * pRange = new RANGE ;
pRange -> begin = 0;
pRange -> end = 1023;
task.func = RootTask ;
task . pArg = pRange ;
pTaskSched = new CNestTaskScheduler ;
pTaskSched -> BeginRootThread ( task );
delete pTaskSched ;
}
上面程序执行后,打印的结果如下,从打印结果可以看出整个程序执行中进行的分拆过程。
Range: 0 - 1023
Range: 0 - 511
Range: 512 - 1023
Range: 0 - 255
Range: 512 - 767
Range: 0 - 127
Range: 512 - 639
Range: 256 - 511
Range: 768 - 1023
Range: 256 - 383
Range: 768 - 895
Range: 128 - 255
Range: 640 - 767
Range: 384 - 511
Range: 896 – 1023
当然,我们需要用任务调度来实现并行计算,下面就来讲一个具体的用任务调度进行并行快速排序的实例。
3) 线程池入口函数处理流程
线程池入口函数的处理在一个循环中进行,每次循环中,从分布式队列中获取任务,然后执行任务的启动入口函数,如果从分布式队列中获取任务失败,则认为所有任务被处理完,此时需要判断是否还有挂起的线程,有则需要将挂起线程执行起来让其退出,然后退出循环并结束当前线程。
图 3 线程池入口函数处理流程图
/** 嵌套任务调度的获取任务函数
@param TASK &Task - 接收从分布式队列中获取的任务
@return int - 成功返回 CAPI_SUCCESS, 失败返回 CAPI_FAILED.
*/
int CNestTaskScheduler::GetTask(TASK &Task)
{
// 先从本地队列获取任务
// 本地获取任务失败后从偷取队列获取任务
return m_DQueue . DeQueue ( Task );
};
/** 嵌套任务调度的线程池入口函数
@param void *pArgs - CNestTaskScheduler 类型的参数
@return unsigned int WINAPI - 返回
*/
unsigned int WINAPI NestTaskScheduler_StartFunc(void *pArgs)
{
CNestTaskScheduler * pSched = ( CNestTaskScheduler *) pArgs ;
TASK Task ;
int nRet ;
for ( ;; )
{
nRet = pSched -> GetTask ( Task );
if ( nRet == CAPI_FAILED )
{
CThreadPool &ThreadPool = pSched->GetThreadPool();
// 唤醒一个挂起的线程 , 防止任务数量小于 CPU 核数时,
// 仍然有任务处于挂起状态 , 从而导致 WaitAllThread() 处于死等状态
// 这个唤醒过程是一个串行的过程,被唤醒的任务会继续唤醒一个挂起线程
LONG Id = pSched->AtomicIncrementTaskId();
if ( Id < ThreadPool.GetThreadCount() )
{
ThreadPool.ExecThread(Id);
}
break;
}
(*( Task . func ))( Task . pArg );
}
return 0;
}
在上面的线程入口处理函数 NestTaskScheduler_StartFunc() 中,当获取任务失败时,表明所有任务都处理完毕。此时需要考虑一种特殊情况,即任务总数量小于线程数量的情况。由于线程池 CThreadPool 采用预创建线程的方法,所有预创建的线程初始处于挂起状态,获取任务失败后,可能还有若干线程没有被分配到任务,仍然处于挂起状态。必须将这些挂起的任务恢复执行让其退出,否则 WaitAllThread() 函数将处于死等状态。
NestTaskScheduler_StartFunc() 在处理唤醒挂起的线程的方法是逐个唤醒的方法,当有某个执行线程获取任务失败后,它先唤醒一个被挂起的线程,然后这个被唤醒的线程执行后,它也会执行 NestTaskScheduler_StartFunc() 函数,当然它获取任务会失败,接着它也会唤醒一个被挂起的线程,这样一直下去,所有被挂起线程都会被唤醒并被退出。