6.跑步者--并行编程框架 ForkJoin

系统 1964 0

本文如果您已经了解一般并行编程知识。了解 Java concurrent 部分如 ExecutorService 等相关内容。

虽说是 Java ForkJoin 并行框架。但不要太在意 Java ,当中的思想在其他语言环境也是相同适用的。由于并发编程在本质上是一样的。就好像怎样找到优秀的 Ruby 程序猿?事实上要找的仅仅是一个优秀的程序猿。

当然,假设语言层面直接支持相关的语义会更好。

 

引言

Java  语言从一開始就支持线程和并发性语义。

Java5 添加的并发工具又攻克了一般应用程序的并发需求。 Java6 Java7 又进一步补充了一些内容。原来的工具主要是粗粒度的并发。

比方每一个 web 请求由一个工作线程处理,在线程池分配任务。

而Java 7 中新引入的 ForkJoin 能够处理更细粒度的并行计算。

 

早期的时候都是单核 cpu 环境。假设不是多核环境下。线程 / 进程并非 真正 的并行运行,主要用来表示异步运行效果。

单核 cpu 上,假如每一个任务全然是 cpu 密集的(没有等待),那么这样的伪并发并不会使计算变快。

仅仅有在真正的多核环境才干起到加速作用,而如今多核已经普及。甚至已经到了手机上!


介绍

ForkJoin 是适用于多核环境的轻量级并行框架。目标是在多核系统下,通过并行运算。充分利用多处理器。提高效率与加速执行。


ForkJoin 编程范式:将问题递归地分解为较小的子问题,并行处理这些子问题,然后合并结果,如:

if (my portion of the work is small enough)   

     do the work directly 

else   

     split my work into two pieces   

     invoke the two pieces and wait for the results

 

ForkJoin 由一组工作线程组成。用来运行任务。核心是 work-stealing 算法 。能够有大量任务,但实际仅仅有少量真正的物理线程。默认是机器的 cpu 数量,也可指定。非常多其他工作的算法都能够在此基础之上进行。


尽管起初直接使用它的人可能不多,但将来会被非常多框架在底层使用,由于是如此基础,所以终于 ForkJoin 可能会无处不在。

一般而言,使用者仅仅须要关心两个方法 fork()   join() 。它们分别表示:子任务的异步运行和堵塞等待结果完毕。  


ForkJoin 框架的核心是 ForkJoinPool 类,实现了 work-stealing 算法,用于运行 ForkJoinTask 类型的任务(也就是依照该算法调度线程与任务,当然还负责解决好相关的一些其他问题)。

 

work-stealing 算法

work-stealing  是一种任务调度方法,由多个工作线程组成。每一个工作线程用一个双端队列维护一组任务。

Fork 的时候是把任务加到队列的头部。而不像一般的线程池那样是加到任务队列末尾。工作线程选择头部最新的任务来运行。当工作线程没有任务可运行时,它会尝试从其他线程的任务队列尾部窃取一个任务运行。假设没有任务运行了而且窃取其他任务失败,那么工作线程停止。


这样的方法的长处是降低了争用,由于工作线程从头获取任务,而窃取线程从尾部窃取任务。还有一个长处是递归的分治法使得早期产生的是较大的任务单元。而窃取到较大任务会进一步递归分解。因此也降低了尾部窃取的次数。

另外,父任务非常可能要等待子任务( join )。所以从队列头部子任务開始运行也是一种优化。


总之。它会使用有限的线程运行大量任务,同一时候保持各线程的任务都处于繁忙的运行状态。而尽量不让线程处于等待状态。

为了做到这点可能会从其他线程的任务队列中窃取任务来运行,所以叫 work-stealing


就像前面所说物理线程不能太多,过多的话切换管理开销就会较大,还会消耗很多其它的内存等资源,而且没有带来不论什么优点。默认是用 cpu数量的线程数,普通情况都 比較合适(比方 Runtime.getRuntime().availableProcessors() 返回处理器的数量),但详细的数值还和任务自身的特点有关,能够通过不同參数測试比較一下。而任务能够是大量的,由每一个线程的工作队列维护。


ForkJoin 是简化了一些开发人员的工作,假设不用 ForkJoin 。最原始的方式是自己手工切分任务并分别创建线程运行。

 

分治、并行、可伸缩的思考:

这三者 关系非常亲热。分治思想( divide-and-conquer )是一种简单朴素的思想。非常多问题都能够这样解决。 ForkJoin 就相当于分治法的并行版本号。   分治本身仅仅是解决这个问题的思想,既能够顺序运行也能够并行运行,可是在并行环境中更加有效。由于能够并行处理子问题。

而在并行方面,可并行处理问题要么是彼此全然独立的问题,要么是可分解单独处理的问题。可伸缩性又和是否能并行处理紧密相关。由于假设不能并行处理就要受到单机处理能力的限制,也就难以伸缩了。


ForkJoin MapReduce 两个并行计算框架的差别  

MapReduce 是把大数据集切分成小数据集,并行分布计算后再合并。

ForkJoin 是将一个问题递归分解成子问题。再将子问题并行运算后合并结果。

二者共同点:都是用于运行并行任务的。基本思想都是把问题分解为一个个子问题分别计算,再合并结果。应该说并行计算都是这样的思想,彼此独立的或可分解的。 从名字上看 Fork Map 都有切分的意思。 Join Reduce 都有合并的意思。比較类似。

差别:

1 )环境差异。分布式  vs  单机多核: ForkJoin 设计初衷针对单机多核(处理器数量非常多的情况)。 MapReduce 一開始就明白是针对非常多机器组成的集群环境的。也就是说一个是想充分利用多处理器,而还有一个是想充分利用非常多机器做分布式计算。这是两种不同的的应用场景。有非常多差异,因此在细的编程模式方面有非常多不同。

2 )编程差异: MapReduce 通常是:做较大粒度的切分,一開始就先切分好任务然后再运行,而且彼此间在最后合并之前不须要通信。这样可伸缩性更好,适合解决巨大的问题,但限制也很多其它。 ForkJoin 能够是较小粒度的切分,任务自己知道该怎样切分自己,递归地切分到一组合适大小的子任务来运行。由于是一个 JVM 内,所以彼此间通信是非常easy的。更像是传统编程方式。


ForkJoin 框架基本结构

ForkJoinPool 本身实现了 ExecutorService 接口,负责调度运行 ForkJoinTask

ForkJoinTask 是提交给 ForkJoinPool  运行的任务,本身也实现了 Future  接口。

 

ForkJoinTask 有两个子类 RecursiveAction RecursiveTask

  RecursiveAction  没有返回值(仅仅需 fork ); RecursiveTask 有返回值(须要合并)。

类似于 Runnable  Callable 一样。没有返回值一般意味着全部子任务都运行完了就可以,中间的子任务不须要 join 了。事实上要不要返回值都能够实现。有返回值能够直接合并。没有返回值能够把结果保存在共享的数据上。

 

而我们要做的是实现自己要完毕的任务,仅仅须要继承其一,并覆盖抽象方法 compute() 。在这种方法中实现自己的任务,递归分解任务。

 

ForkJoinPool 与一般的 ExecutorService 实现的区别

ForkJoin 实现了 ExecutorService 接口,这个接口就是用来把任务交给线程池中的工作线程去运行。 ForkJoin 也是一个 ExecutorService ,但差别在于 ForkJoin 使用了 work-stealing 算法,见前面的介绍。

普通的线程池是按 FIFO 的方式运行,而 ForkJoin 优先运行(由其他任务)后创建子任务。

对于大部分 产生子任务的 任务 模式, ForkJoin 的处理 实现 会非常高效。假设设置了异步模式,   ForkJoin 也可能适合运行事件类型(不须要 join )的任务。


影响 ForkJoin 加速效果的因素

理想效果是核越多加速效果越好。

可是并行不一定更快,參数不正确还可能更慢:

1)    并发数,即线程数。通常是可用的 cpu 数,默认就是这个,一般表现非常好。

2)    任务切分的粒度。假设切分粒度等于总任务量,一个任务运行,就相当于单线程顺序运行。每一个任务运行的计算量。太大的话加速效果有限。不能发挥到最好。相反,太小的话,消耗在任务管理的成本占了主要部分。导致还不如顺序运行的快。  

须要适当平衡二者。由于还和任务本身的特定有关。所以能够做个基准測试比較一下。

而总的运行时间还与任务的规模有关。


任务粒度应该适中,多大合适?好像在什么地方上看到说:经验上单个任务为 100-10000 个基本指令,当然还和任务本身的特定有关。

 

个人感觉多核 cpu 仅仅适用于解决计算密集型应用,由于实际问题可能 IO 等其它方面的瓶颈,多核也还是无法充分利用的。

   

使用 ForkJoin 的步骤:

ForkJoin 框架替我们完毕了一些工作,那么我们使用时还要完毕哪些工作:

1)    怎样运行单个任务。假设仅仅切分出一个任务运行,就相当于单线程顺序运行。

2)    怎样递归地切分任务(以及任务切分后是否须要合并结果)

3)    切分粒度多少合适(最小任务单元)

这些详细表如今:继承 ForkJoinTask 的一个子类。并实现抽象方法 compute()

在这种方法中实现自己的任务,递归分解任务。

 

这些准备好之后就能够启动了:创建一个表示所有任务的 ForkJoinTask 对象,创建一个 ForkJoinPool 的实例。把 task 作为參数运行 ForkJoinPool invoke 方法。

 

ForkJoin 任务外部运行总任务: execute 异步运行任务。没有返回结果 void invoke 运行任务并等待返回结果。结果是特定类型。 submit 运行一个任务,返回 ForkJoinTask (实际上是作为 Future 对象返回)。一般应该在外部使用 invoke 调用运行总任务。

execute submit 仅仅是为了实现 ExecutorService 规定的相关语义, invoke ForkJoin 中特有的。

 

ForkJoinTask 内部递归运行的过程中: fork 是异步运行。 invoke 是等待任务运行完毕。


 

详细实例:

多看看详细演示样例比較好。

1)    合并排序演示样例:

合并排序是常见的排序算法之中的一个。

演示样例实现了对一个整数数组的合并排序。同一时候还演示了不同并发数(线程数)与不同数组大小的组合測试。

代码在 <jdk_ home>/sample/ForkJoin/  中。

 

2)    把图片模糊处理演示样例:

一个图片能够被表示为一个 m*n 大小的整数数组,当中每一个整数表示一个像素(的颜色)。模糊处理之后的图像还是一个相同大小的整数数组。处理过程是把原来的每一个像素与周围像素的颜色求平均值就可以。

假设顺序运行就是从头到尾对每一个像素运行一次计算得到目标像素,由于每一个像素的计算是独立的,所以能够把这个整数数组切分成一块一块的子数组(即子任务)分别运行。任务不适合切分的过小。所以设定了一个常数阈值 10000 ,大小小于 10000 的子数组就直接运行,否则对半切分为两个子数组的任务分别运行。 文章   源码

       在我自己的机器上 i3 处理器   (i3, 4cpu ),并行确实快了不少。

 

其他的比如:求最大值 max 、平均值 avg 、求和 sum 等聚合函数都是能够分解计算的。

演示样例中都是对数组的处理。比較常见的是对数组、集合进行并行地处理操作。但也不限于此。   

网上有一些 Fibonacci  的演示样例。但这些演示样例并不适合展示 ForkJoin


 

Doug Lea JSR-166

说到 Java 并发编程,就不能不说到 Doug Lea JSR-166 Doug Lea 是并发编程方面的专家。纽约州立大学奥斯威戈分校的计算机教授。曾是 JCP 运行委员。是 JSR-166 leader

JSR-166 就是负责向 Java 语言中加入并发编程工具的,即我们见到的 java.util.concurrent 包(及子包)。还是《  Concurrent Programming in Java Design Principles and Patterns 》一书的作者,是这方面最早的书。

他还是知名的内存分配方法 dlmalloc 的作者,这是 C 语言中的动态内存分配函数 malloc 的一种普遍使用的实现。

   

參考资料:

  其他并行框架:

版权声明:本文博客原创文章,博客,未经同意,不得转载。

6.跑步者--并行编程框架 ForkJoin


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论