Util.concurrent工具包概述

系统 1782 0

Util.concurrent工具包概述

Doug Lea

State University of New York at Oswego

dl@cs.oswego.edu

http://gee.cs.oswego.edu

翻译:

Cocia Lin( cocia@163.com )

Huihoo.org

原文

http://gee.cs.oswego.edu/dl/cpjslides/util.pdf

要点

-- 目标和结构

-- 主要的接口和实现

Sync : 获得 / 释放 (acquire/release) 协议

Channel : 放置 / 取走 (put/take) 协议

Executor : 执行 Runnable 任务

-- 每一个部分都有一些关联的接口和支持类

-- 简单的涉及其他的类和特性

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

目标

-- 一些简单的接口

- 但是覆盖大部分程序员需要小心处理代码的问题

-- 高质量实现

- 正确的,保守的,有效率的,可移植的

-- 可能作为将来标准的基础

- 获取经验和收集反馈信息

Util.concurrent工具包概述

Sync

-- acquire/release 协议的主要接口

- 用来定制锁,资源管理,其他的同步用途

- 高层抽象接口

- 没有区分不同的加锁用法

-- 实现

- Mutex, ReentrantLock, Latch, CountDown,Semaphore, WaiterPreferenceSemaphore, FIFOSemaphore, PrioritySemaphore

<!-- [if !supportLists]--> n    <!-- [endif]--> 还有,有几个简单的实现,例如 ObservableSync, LayeredSync

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

独占锁

try {

lock.acquire();

try {

action();

}

finally {

lock.release();

}

}

catch (InterruptedException ie) { ... }

-- Java 同步块不适用的时候使用它

- 超时,回退 (back-off)

- 确保可中断

- 大量迅速锁定

- 创建 Posix 风格应用 (condvar)

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

独占例子

class ParticleUsingMutex {

int x; int y;

final Random rng = new Random();

final Mutex mutex = new Mutex();

public void move() {

try {

mutex.acquire();

try { x += rng.nextInt(2)-1; y += rng.nextInt(2)-1; }

finally { mutex.release(); }

}

catch (InterruptedException ie) {

Thread.currentThread().interrupt(); }

}

public void draw(Graphics g) {

int lx, ly;

try {

mutex.acquire();

try { lx = x; ly = y; }

finally { mutex.release(); }

}

catch (InterruptedException ie) {

Thread.currentThread().interrupt(); return; }

g.drawRect(lx, ly, 10, 10);

}

}

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

回退 (Backoff) 例子

class CellUsingBackoff {

private long val;

private final Mutex mutex = new Mutex();

void swapVal(CellUsingBackoff other)

throws InterruptedException {

if (this == other) return; // alias check

for (;;) {

mutex.acquire();

try {

I f ( other.mutex.attempt(0) ) {

try {

long t = val;

val = other.val;

other.val = t;

return;

}

finally { other.mutex.release(); }

}

}

finally { mutex.release(); };

Thread.sleep(100); // heuristic retry interval

}

}

}

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

读写锁

interface ReadWriteLock {

Sync readLock();

Sync writeLock();

}

-- 管理一对锁

- 和普通的锁一样的使用习惯

-- 对集合类很有用

- 半自动的方式实现 SyncSet, SyncMap, ...

-- 实现者使用不同的锁策略

- WriterPreference, ReentrantWriterPreference,

ReaderPreference, FIFO

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

ReadWriteLock 例子

-- 示范在读写锁中执行任何 Runnable 的包装类

class WithRWLock {

final ReadWriteLock rw;

public WithRWLock(ReadWriteLock l) { rw = l; }

public void performRead(Runnable readCommand)

throws InterruptedException {

rw.readLock().acquire();

try { readCommand.run(); }

finally { rw.readlock().release(); }

}

public void performWrite(...) // similar

}

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

闭锁 (Latch)

-- 闭锁是开始时设置为 false, 但一旦被设置为 true ,他将永远保持 true 状态

- 初始化标志

- 流结束定位

- 线程中断

- 事件出发指示器

-- CountDown 和他有点类似,不同的是, CountDown 需要一定数量的触发设置,而不是一次

-- 非常简单,但是广泛使用的类

- 替换容易犯错的开发代码

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

Latch Example 闭锁例子

class Worker implements Runnable {

Latch startSignal;

Worker(Latch l) { startSignal = l; }

public void run() {

startSignal.acquire();

// ... doWork();

}

}

class Driver { // ...

void main() {

Latch ss = new Latch();

for (int i = 0; i < N; ++i) // make threads

new Thread(new Worker( ss )).start();

doSomethingElse(); // don’t let run yet

ss.release(); // now let all threads proceed

}

}

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

信号 (Semaphores)

-- 服务于数量有限的占有者

- 使用许可数量构造对象 ( 通常是 0)

- 如果需要一个许可才能获取,等待,然后取走一个许可

- 释放的时候将许可添加回来

-- 但是真正的许可并没有转移 (But no actual permits change hands.)

- 信号量仅仅保留当前的计数值

-- 应用程序

- 锁:一个信号量可以被用作互斥体 (mutex)

- 一个独立的等待缓存或者资源控制的操作

- 设计系统是想忽略底层的系统信号

-- (phores ‘remember’ past signals) 记住已经消失的信号量

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

信号量例子

class Pool {

ArrayList items = new ArrayList();

HashSet busy = new HashSet();

final Semaphore available;

public Pool(int n) {

available = new Semaphore(n);

// ... somehow initialize n items ...;

}

public Object getItem() throws InterruptedException {

available.acquire();

return doGet();

}

public void returnItem(Object x) {

if (doReturn(x)) available.release();

}

synchronized Object doGet() {

Object x = items.remove(items.size()-1);

busy.add(x); // put in set to check returns

return x;

}

synchronized boolean doReturn(Object x) {

return busy.remove(x); // true if was present

}

}

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

屏障 (Barrier)

-- 多部分同步接口

- 每一部分都必须等待其他的分不撞倒屏障

-- CyclicBarrier

- CountDown 的一个可以重新设置的版本

- 对于反复划分算法很有用 (iterative partitioning algorithms)

-- Rendezvous

- 一个每部分都能够和其他部分交换信息的屏障

- 行为类似同时的在一个同步通道上 put take

- 对于资源交换协议很有用 (resource-exchange protocols)

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

通道 (Channel)

-- 为缓冲,队列等服务的主接口

Util.concurrent工具包概述

-- 具体实现

- LinkedQueue, BoundedLinkedQueue,BoundedBuffer, BoundedPriorityQueue,SynchronousChannel, Slot

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

通道属性

-- 被定义为 Puttable Takable 的子接口

- 允许安装生产者 / 消费者模式执行

-- 支持可超时的操作 offer poll

- 当超时值是 0 时,可能会被阻塞

- 所有的方法能够抛出 InterruptedException 异常

-- 没有接口需要 size 方法

- 但是一些实现定义了这个方法

- BoundedChannel capacity 方法

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

通道例子

class Service { // ...

final Channel msgQ = new LinkedQueue();

public void serve() throws InterruptedException {

String status = doService();

msgQ.put(status);

}

public Service() { // start background thread

Runnable logger = new Runnable() {

public void run() {

try {

for(;;)

System.out.println( msqQ.take() );

}

catch(InterruptedException ie) {} }

};

new Thread(logger).start();

}

}

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

<!-- [if !supportEmptyParas]--> <!-- [endif]-->

运行器 (Executor)

-- 类似线程的类的主接口 font-size: 18pt; fon

Util.concurrent工具包概述


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论