多核中的动态任务调度

系统 1466 0

注:本文主要内容摘自笔者所著的《多核计算与程序设计》一书,略有修改,后续还会继续发布系列文章,如有需要,可以考虑将一下地址加入到您的浏览器收藏夹中:http://software.intel.com/zh-cn/blogs/category/multicore/。

1、基本思想

动态任务调度可以将一系列分解好的任务进行并行运行,并取得一定程度的负载均衡。动态任务调度的最大作用就是用它来做并行计算。动态任务调度有多种方法,一般可以使用分布式队列【1】来实现,下面讲解一种最简单的嵌套型任务调度的实现方法。

对于嵌套型任务,通常都有一个或多个开始任务,其他任务的产生都源于这些开始任务。

调度的方法为,每个线程由一个本地队列,另外由一个所有线程共享的队列。当每个线程产生n个新任务后,先检查本地队列是否为空,如果为空,则放入一个任务到本地队列中。然后检查共享队列是否满,如果未满则将其他任务放入共享队列中,否则放入到本地队列中。

上面这个调度方法实际上和CDistributeQueue【1】中的进队操作方法是一样的,因此可以使用CDistributeQueue来实现嵌套型动态任务的调度。

一般来说,嵌套型动态任务调度会遇到以下一些问题

  • 1、 初始时可能只有一个任务运行,此种情况下只能有一个线程运行,其他线程必须挂起。当动态任务产生后,需要唤醒挂起的线程进行执行。
  • 2、 由于每个任务中会产生新的任务,因此每个任务既是消费者,同时也是生产者。在操作本地队列时,比非嵌套型任务调度更加方便,如何将本地队列的操作最大化是首要考虑的问题。

根据上面的思想,下面设计一个CNestTaskScheduler类来实现对嵌套型动态任务的调度。

2、CNestTaskScheduler类的设计和实现

CNestTaskScheduler类的定义如下:

class CNestTaskScheduler {

private:

    CThreadPool     m_ThreadPool;//(TaskScheduler_StartFunc, NULL, 0);

    CDistributedQueue<TASK, CLocalQueue<TASK>, CStealQueue<TASK>> m_DQueue;

    THREADFUNC      m_StartFunc;  //为线程池使用的线程入口函数指针

    LONG  volatile  m_lTaskId;    //Task Id,用于判断是否唤醒对应的线程

 

public:

    CNestTaskScheduler();

    virtual ~CNestTaskScheduler();

 

    //下面两个函数为调度器本身直接使用

    void SetStartFunc(THREADFUNC StartFunc);

int GetTask(TASK &Task);

    CThreadPool & GetThreadPool();

    LONG AtomicIncrementTaskId();

 

    //下面三个函数为调度器的使用者使用

    void SpawnLocalTask(TASK &Task);

    void SpawnTask(TASK &Task);

    void BeginRootThread(TASK &Task);

};

类中的主要三个接口为

    void SpawnLocalTask(TASK &Task);

    void SpawnTask(TASK &Task);

void BeginRootThread(TASK &Task);

 

SpawnLocalTask()的主要作用是将动态生成的任务放入线程的本地队列中;SpawnTask()的作用是将动态产生的任务放入分布式队列中,当然任务有可能被放入本地队列,也有可能被放入共享队列中;BeginRootThread()的作用是启动初始的任务。

1) BeginRootTask() 的处理流程

BeginRootTask() 的处理流程较简单,它先创建线程池,接着将一个原始任务放入到第 0 个线程的本地队列中,然后执行第 0 个线程,最后等待所有线程执行完。处理流程如下图所示:

 

 

  1 嵌套型任务 BeginRootTask() 处理流程图

BeginRootTask() 的代码如下:

/**    嵌套任务调度的开始根线程函数

 

         @param   TASK &Task - 要执行的最初任务

         @return   void -  

*/

void CNestTaskScheduler::BeginRootThread(TASK &Task)

{

    m_lTaskId = 0;

 

    m_ThreadPool . CreateThreadPool ( m_StartFunc , this, 0);

    m_DQueue . PushToLocalQueue ( Task , 0);

     m_ThreadPool . ExecThread ( 0 );  

    m_ThreadPool . WaitAllThread ();

}

 

BeginRootTask() 执行后,只有第 0 个线程被执行了,线程池中的其他线程都是处于挂起状态。实际上在第 0 个线程的处理过程中,它会继续调用 SpawnTask() SpawnTask() 中需要判断是否有线程被挂起,如果有则需要唤醒挂起的线程,下面就来看看 SpawnTask() 的详细处理过程。

2) SpawnTask() 的处理流程

SpawnTask() 的功能主要是将任务放入到分布式队列中。由于在 BeginRootThread() 中只执行了第 0 个线程,其他线程都处于挂起状态,因此这个函数中还需要唤醒其他被挂起的线程,整个处理流程如下图所示:

 

  2 嵌套型任务 SpawnLocalTask() 处理流程图

根据上面的处理流程, SpawnLocalTask() 的代码实现如下:

/**    嵌套任务调度的生成任务函数

    生成的任务被放入到分布式队列中

 

         @param   TASK &Task - 待执行的任务

         @return   void -  

*/

void CNestTaskScheduler::SpawnTask(TASK &Task)

{

    if ( m_lTaskId < m_ThreadPool . GetThreadCount () )

    {

        // 依次唤醒各个挂起的线程

        LONG Id = AtomicIncrement (& m_lTaskId );

        if ( Id < m_ThreadPool . GetThreadCount () )

        {

            // 下面之所以可以对其他线程的本地队列进行无同步的操作,是因为

            // 访问这些队列的线程在进队操作之后才开始运行

            m_DQueue . PushToLocalQueue ( Task , Id );

            m_ThreadPool . ExecThread ( Id );

        }

        else

        {

            m_DQueue . EnQueue ( Task );

        }

    }

    else

    {

        // 先判断偷取队列是否满,如果未满则放入偷取队列中

        // 如果满了则放入本地队列中

        m_DQueue . EnQueue ( Task );

    }

};

在处理唤醒其他线程的过程中,采用了原子操作来实现,当变量 m_lTaskId 的值小于给定线程数量时,表明还有线程被挂起,因此将任务放入对应被挂起线程的本地队列中,然后再唤醒并执行对应被挂起的线程。

当任务被放入分布式队列后,线程池中的各个线程是如何处理分布式队列中的任务的呢?下面就来看看线程池的入口函数的处理过程。

 

 

 

 

3、 CNestTaskScheduler 使用方法

注:完整的 CNestTaskScheduler 的源代码,请到 CAPI 开源项目进行下载,下载地址为: http://gforge.osdn.net.cn/projects/capi

 

下面以一个区间递归分拆为例讲解如何使用 CNestTaskScheduler 。首先需要写一个任务处理入口函数,代码如下:

struct RANGE {

    int begin ;

    int end ;

};

 

CNestTaskScheduler   * pTaskSched = NULL ;

 

/**    任务处理入口函数

         将一个大的区间均分成两个更小的区间

 

         @param   void *args - 参数,实际为 RANGE 类型       

         @return   unsigned int WINAPI - 总是返回 CAPI_SUCCESS    

*/

unsigned int WINAPI RootTask (void * args )

{

    RANGE   * p = ( RANGE *) args ;

    if ( p != NULL )

    {

          printf ( "Range: %ld - %ld\n" , p -> begin , p -> end );

        if ( p -> end - p -> begin < 128 )

        {

            // 当区间大小小于时,不再进行分拆

            delete p ;

            return 0;

        }

        int mid = ( p -> begin + p -> end + 1) / 2;

        RANGE * range1 , * range2 ;

 

        range1 = new RANGE ;

        range2 = new RANGE ;

 

         range1 -> begin = p -> begin ;

        range1 -> end = mid - 1;

        range2 -> begin = mid ;

        range2 -> end = p -> end ;

 

        TASK t1 , t2 ;

        t1 . pArg = range1 ;

        t2 . pArg = range2 ;

        t1 . func = RootTask ;

        t2 . func = RootTask ;

 

         pTaskSched -> SpawnLocalTask ( t1 );

        pTaskSched -> SpawnTask ( t2 );

 

        delete p ;

    }

    return 1;

}

 

任务处理函数 RootTask() 中,先将一个大区间拆分成两个更小的区间,然后将每个区间看成一个新的任务,得到两个新的任务 t1 t2 ,然后调用 SpawnLocalTask() 将任务 t1 放进任务调度器的分布式队列的本地队列中。如果拆分后的区间小于给定的大小,就不再分拆。

下面的代码演示了如何调用 CNestTaskScheduler 类来对一个 0 1023 的区间进行并行拆分。

void main ( void )

{

 

    TASK     task ;

    RANGE    * pRange = new RANGE ;

 

    pRange -> begin = 0;

    pRange -> end = 1023;

 

    task.func = RootTask ;

    task . pArg = pRange ;

 

    pTaskSched = new CNestTaskScheduler ;

   

    pTaskSched -> BeginRootThread ( task );

 

    delete pTaskSched ;

 

}

上面程序执行后,打印的结果如下,从打印结果可以看出整个程序执行中进行的分拆过程。

Range: 0 - 1023

Range: 0 - 511

Range: 512 - 1023

Range: 0 - 255

Range: 512 - 767

Range: 0 - 127

Range: 512 - 639

Range: 256 - 511

Range: 768 - 1023

Range: 256 - 383

Range: 768 - 895

Range: 128 - 255

Range: 640 - 767

Range: 384 - 511

Range: 896 – 1023

 

当然,我们需要用任务调度来实现并行计算,下面就来讲一个具体的用任务调度进行并行快速排序的实例。

 

 

3) 线程池入口函数处理流程

线程池入口函数的处理在一个循环中进行,每次循环中,从分布式队列中获取任务,然后执行任务的启动入口函数,如果从分布式队列中获取任务失败,则认为所有任务被处理完,此时需要判断是否还有挂起的线程,有则需要将挂起线程执行起来让其退出,然后退出循环并结束当前线程。

 

  3   线程池入口函数处理流程图

 

/**    嵌套任务调度的获取任务函数

 

         @param   TASK &Task - 接收从分布式队列中获取的任务

         @return   int - 成功返回 CAPI_SUCCESS, 失败返回 CAPI_FAILED.       

*/

int CNestTaskScheduler::GetTask(TASK &Task)

{

    // 先从本地队列获取任务

    // 本地获取任务失败后从偷取队列获取任务

    return m_DQueue . DeQueue ( Task );

};

 

/**    嵌套任务调度的线程池入口函数

 

         @param   void *pArgs - CNestTaskScheduler 类型的参数      

         @return   unsigned int WINAPI - 返回    

*/

unsigned int WINAPI NestTaskScheduler_StartFunc(void *pArgs)

{

    CNestTaskScheduler   * pSched = ( CNestTaskScheduler *) pArgs ;

 

    TASK     Task ;

    int      nRet ;

 

    for ( ;; )

    {

        nRet = pSched -> GetTask ( Task );

        if ( nRet == CAPI_FAILED )

        {

            CThreadPool &ThreadPool = pSched->GetThreadPool();

           

            // 唤醒一个挂起的线程 , 防止任务数量小于 CPU 核数时,

            // 仍然有任务处于挂起状态 , 从而导致 WaitAllThread() 处于死等状态

            // 这个唤醒过程是一个串行的过程,被唤醒的任务会继续唤醒一个挂起线程

            LONG Id = pSched->AtomicIncrementTaskId();

            if ( Id < ThreadPool.GetThreadCount() )

            {

                ThreadPool.ExecThread(Id);

            }

             break;

        }

        (*( Task . func ))( Task . pArg );

    }

    return 0;

}

 

在上面的线程入口处理函数 NestTaskScheduler_StartFunc() 中,当获取任务失败时,表明所有任务都处理完毕。此时需要考虑一种特殊情况,即任务总数量小于线程数量的情况。由于线程池 CThreadPool 采用预创建线程的方法,所有预创建的线程初始处于挂起状态,获取任务失败后,可能还有若干线程没有被分配到任务,仍然处于挂起状态。必须将这些挂起的任务恢复执行让其退出,否则 WaitAllThread() 函数将处于死等状态。

NestTaskScheduler_StartFunc() 在处理唤醒挂起的线程的方法是逐个唤醒的方法,当有某个执行线程获取任务失败后,它先唤醒一个被挂起的线程,然后这个被唤醒的线程执行后,它也会执行 NestTaskScheduler_StartFunc() 函数,当然它获取任务会失败,接着它也会唤醒一个被挂起的线程,这样一直下去,所有被挂起线程都会被唤醒并被退出。

多核中的动态任务调度


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论