我们都知道,在JDK1.5之前,Java中要进行业务并发时,通常需要有程序员独立完成代码实现,而当针对高质量Java多线程并发程序设计时,为防止死蹦等现象的出现,比如使用java之前的wait()、notify()和synchronized等,每每需要考虑性能、死锁、公平性、资源管理以及如何避免线程安全性方面带来的危害等诸多因素,往往会采用一些较为复杂的安全策略,加重了程序员的开发负担.万幸的是,在JDK1.5出现之后,Sun大神终于为我们这些可怜的小程序员推出了java.util.concurrent工具包以简化并发完成。开发者们借助于此,将有效的减少竞争条件(race conditions)和死锁线程。concurrent包很好的解决了这些问题,为我们提供了更实用的并发程序模型。
java.util.concurrent下主要的接口和类:
Executor:具体Runnable任务的执行者。
ExecutorService:一个线程池管理者,其实现类有多种,比如普通线程池,定时调度线程池ScheduledExecutorService等,我们能把一个
Runnable,Callable提交到池中让其调度。
Future:是与Runnable,Callable进行交互的接口,比如一个线程执行结束后取返回的结果等等,还提供了cancel终止线程。
BlockingQueue:阻塞队列。
下面我写一个简单的事例程序:
FutureProxy .java
java.util.concurrent下主要的接口和类:
Executor:具体Runnable任务的执行者。
ExecutorService:一个线程池管理者,其实现类有多种,比如普通线程池,定时调度线程池ScheduledExecutorService等,我们能把一个
Runnable,Callable提交到池中让其调度。
Future:是与Runnable,Callable进行交互的接口,比如一个线程执行结束后取返回的结果等等,还提供了cancel终止线程。
BlockingQueue:阻塞队列。
下面我写一个简单的事例程序:
FutureProxy .java
package
org.test.concurrent;
/**
* <p>Title: LoonFramework</p>
* <p>Description:利用Future模式进行处理</p>
* <p>Copyright: Copyright (c) 2007</p>
* <p>Company: LoonFramework</p>
* @author chenpeng
* @email:ceponline@yahoo.com.cn
* @version 0.1
*/
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
public abstract class FutureProxy < T > {
private final class CallableImpl implements Callable < T > {
public T call() throws Exception {
return FutureProxy. this .createInstance();
}
}
private static class InvocationHandlerImpl < T > implements InvocationHandler {
private Future < T > future;
private volatile T instance;
InvocationHandlerImpl(Future < T > future) {
this .future = future;
}
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
synchronized ( this ) {
if ( this .future.isDone()) {
this .instance = this .future.get();
} else {
while ( ! this .future.isDone()) {
try {
this .instance = this .future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
return method.invoke( this .instance, args);
}
}
}
/**
* 实现java.util.concurrent.ThreadFactory接口
* @author chenpeng
*
*/
private static final class ThreadFactoryImpl implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon( true );
return thread;
}
}
private static ExecutorService service = Executors.newCachedThreadPool( new ThreadFactoryImpl());
protected abstract T createInstance();
protected abstract Class <? extends T > getInterface();
/**
* 返回代理的实例
* @return
*/
@SuppressWarnings( " unchecked " )
public final T getProxyInstance() {
Class <? extends T > interfaceClass = this .getInterface();
if (interfaceClass == null || ! interfaceClass.isInterface()) {
throw new IllegalStateException();
}
Callable < T > task = new CallableImpl();
Future < T > future = FutureProxy.service.submit(task);
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class <?> [] { interfaceClass } , new InvocationHandlerImpl(future));
}
}
/**
* <p>Title: LoonFramework</p>
* <p>Description:利用Future模式进行处理</p>
* <p>Copyright: Copyright (c) 2007</p>
* <p>Company: LoonFramework</p>
* @author chenpeng
* @email:ceponline@yahoo.com.cn
* @version 0.1
*/
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
public abstract class FutureProxy < T > {
private final class CallableImpl implements Callable < T > {
public T call() throws Exception {
return FutureProxy. this .createInstance();
}
}
private static class InvocationHandlerImpl < T > implements InvocationHandler {
private Future < T > future;
private volatile T instance;
InvocationHandlerImpl(Future < T > future) {
this .future = future;
}
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
synchronized ( this ) {
if ( this .future.isDone()) {
this .instance = this .future.get();
} else {
while ( ! this .future.isDone()) {
try {
this .instance = this .future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
return method.invoke( this .instance, args);
}
}
}
/**
* 实现java.util.concurrent.ThreadFactory接口
* @author chenpeng
*
*/
private static final class ThreadFactoryImpl implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon( true );
return thread;
}
}
private static ExecutorService service = Executors.newCachedThreadPool( new ThreadFactoryImpl());
protected abstract T createInstance();
protected abstract Class <? extends T > getInterface();
/**
* 返回代理的实例
* @return
*/
@SuppressWarnings( " unchecked " )
public final T getProxyInstance() {
Class <? extends T > interfaceClass = this .getInterface();
if (interfaceClass == null || ! interfaceClass.isInterface()) {
throw new IllegalStateException();
}
Callable < T > task = new CallableImpl();
Future < T > future = FutureProxy.service.submit(task);
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class <?> [] { interfaceClass } , new InvocationHandlerImpl(future));
}
}