在Mina的使用中,线程池的配置一个比较关键的环节,同时它也是Mina性能提高的一个有效的方法,在Mina的2.0以上版本中已经不再需要对Mina线程池的配置了,本系列文章都是基于当前的稳定版本Mina 1.1.7版来进行讲述的,Mina的2.0以上版本现在还都是M(millestone,即里程碑)版的,在1.5版本上2.0M版为稳定版本,但是在1.5+以上则为非稳定版本,所以,为了更好的进行讨论和学习,还是基于Mina 1.1.7版本进行讨论,如果使用Mina 2.0进行开发要注意JDK的版本问题,当然如果有能力的话也可以自行修改和编译Mina的2.0版本,这里对此就不再多说,使用2.0版本的同学可以不用理会本文的内容。
上面的内容都是基于Apache Mina提供的文档讲述,如有需要,请自行查找相关资料,在此不再赘述。
下面开始对Mina的线程模型的配置、使用、及ExcutorFilter的基本原理进行简单的讲解。
配置Mina的三种工作线程
在Mina的NIO模式中有三种I/O工作线程(这三种线程模型只在NIO Socket中有效,在NIO数据包和虚拟管道中没有,也不需要配置):
Acceptor thread
该线程的作用是接收客户端的连接,并将客户端的连接导入到I/O processor线程模型中。所谓的I/O processor线程模型就是Mina的I/O processor thread。Acceptor thread在调用了Acceptor.bind()方法后启动。每个Acceptor只能创建一个Acceptor thread,该线程模型不能配置,它由Mina自身提供。
Connector thread
该线程模型是客户端的连接线程模型,它的作用和Acceptor thread类似,它将客户端与服务器的连接导入到I/O processor线程模型中。同样地,该线程模型也是由Mina的客户端自动创建,该线程模型也不能进行配置。
I/O processor thread
该线程模型的主要作用就行接收和发送数据,所有的IO操作在服务器与客户端的连接建立后,所有的数据的接收和发送都是有该线程模型来负责的,知道客户端与服务器的连接关闭,该线程模型才停止工作。该线程模型可以由程序员根据需要进行配置。该线程模型默认的线程的数量为cpu的核数+1。若你的cpu为双核的,则你的I/O processor 线程的最大数量为3,同理若你的若你的cpu为四核的,那么你的I/O processor 线程的最大数量为5。
由上面的内容我们可以知道在Mina中可以配置的线程数量只有I/O processor,对于每个IoService再创建其实例的时候可以配置该IoService的I/O processor的线程数量。在SokcetConnector和SocketAccpetor中I/O Processor的数量是由CPU的核数+1来决定的。
他们的配置方式如下:
/*** * 配置SocketAcceptor监听器的I/O Processor的线程的数量, * 此处的I/O Processor的线程数量由CPU的核数决定,但Acceptor * 的线程数量只有一个,也就是接收客户端连接的线程数只有一个, * Acceptor的线程数量不能配置。 * */ SocketAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime() .availableProcessors() + 1, Executors.newCachedThreadPool()); /*** * 配置SocketConnector监听器的I/O Processor的线程的数量, * 此处的I/O Processor的线程数量由CPU的核数决定,但SocketConnector * 的线程数量只有一个,也就是接收客户端连接的线程数只有一个, * SocketConnector的线程数量不能配置。 * */ SocketConnector connector = new SocketConnector(Runtime.getRuntime() .availableProcessors() + 1, Executors.newCachedThreadPool());
在上面的配置比较难以理解的地方就是Runtime.getRuntime().availableProcessors() + 1,它的意思就是由JVM根据系统的情况(即CPU的核数)来决定IO Processor的线程的数量。虽然这个线程的数量是在SocketAcceptor /SocketConnector 的构造器中进行的,但是对于SocketAcceptor /SocketConnector自身的线程没有影响,SocketAcceptor /SocketConnector的线程数量仍然为1。为SocketAcceptor /SocketConnector本身就封装了IO Processor,SocketAcceptor /SocketConnector只是由一个单独的线程来负责接收外部连接/向外部请求建立连接,当连接建立后,SocketAcceptor /SocketConnector会把数据收发的任务转交I/O Processor的线程。这个在本系列文章的《IoFilter和IoHandler的区别和联系》中的图示中可以看。
图中清晰的显示了IO Processor就是位于IoService和IoFilter之间,IoService负责和外部建立连接,而IoFilter则负责处理接收到的数据,IoProcessor则负责数据的收发工作。
关于配置IO Processor的线程数量还有一种比较“笨”的办法,那就一个一个试,你可以根据你的PC的硬件情况从1开始,每次加1,然后得出IO Processor的最佳的线程的数量。但是这种方式个人建议最好不要用了,上面的方法足矣。配置方法如下:
//从1--N开始尝试,N的最大数量为CPU核数+1 SocketAcceptor acceptor = new SocketAcceptor(N, Executors.newCachedThreadPool());
为Mina的IoFilterChain添加线程池
在Mina的API中提供了一个ExecutorFilter,该线程池实现了IoFilter接口,它可以作为一个IoFilter添加到IoFilterChain中,它的作用就是将I/O Processor中的事件通过其自身封装的一个线程池来转发到下一个过滤器中。在没有添加该线程模型时,I/O Processor的事件是通过方法来触发的,然后转发给IoHandler。在没有添加该线程池的时候,所有的事件都是在单线程模式下运行的,也就是说有的事件和处理(IO Processor,IoHandler,IoFilter)都是运行在同一个线程上,这个线程就是IO Processor的线程,但是这个线程的数量受到CPU核数的影响,因此系统的性能也直接受CPU核数的影响。
比较复杂的应用一般都会用到该线程池,你可以根据你的需求在IoFilterchain中你可以添加任意数量的线程池,这些线程池可以组合成一个事件驱动(SEDA)的处理模型。对于一般的应用来说不是线程的数量越多越好,线程的数量越多可能会加剧CPU切换线程所耗费的时间,反而会影响系统的性能,因此,线程的数量需要根据实际的需要由小到大,逐步添加,知道找到适合你系统的最佳线程的数量。ExcutorFilter的配置过程如下:
SocketAcceptor acceptor = ...; DefaultIoFilterChainBuilder filterChainBuilder = acceptor.getDefaultConfig().getFilterChain(); filterChainBuilder.addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool());
在配置该线程池的时候需要注意的一个问题是,当你使用自定的ProtocolCodecFactory时候一定要将线程池配置在该过滤器之后,如下所示:
DefaultIoFilterChainBuilder filterChainBuilder = acceptor.getDefaultConfig().getFilterChain(); // 和CPU绑定的操作配置在过滤器的前面 filterChainBuilder.addLast("codec", new ProtocolCodecFactory(...)); // 添加线程池 filterChainBuilder.addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool());
因为你自己实现的ProtocolCodecFactory直接读取和转换的是二进制数据,这些数据都是由和CPU绑定的I/O Processor来读取和发送的,因此为了不影响系统的性能,也应该将数据的编解码操作绑定到I/O Processor线程中,因为在Java中创建和线程切换都是比较耗资源的,因此建议将ProtocolCodecFactory配置在ExecutorFilter的前面。关于ProtocolCodecFactory详细讲述会在后续的文档中给出,此处就不多说了。
最后给出一个服务器线程模型完整配置的例子,该例子和KFCClient一起配置使用,详细代码在附件中,此处只给出代码的主要部分:
SocketAddress address = new InetSocketAddress("localhost", 4321); /*** * 配置SocketAcceptor监听器的I/O Processor的线程的数量, 此处的I/O * Processor的线程数量由CPU的核数决定,但Acceptor 的线程数量只有一个,也就是接收客户端连接的线程数只有一个, * Acceptor的线程数量不能配置。 * */ IoAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime() .availableProcessors() + 1, Executors.newCachedThreadPool()); acceptor.getDefaultConfig().setThreadModel(ThreadModel.MANUAL); // 配置数据的编解码器 acceptor.getDefaultConfig().getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); // 此处为你自己实现的编解码器 // config.getFilterChain().addLast("codec", new // ProtocolCodecFactory(...)); // 为IoFilterChain添加线程池 acceptor.getDefaultConfig().getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool())); acceptor.getDefaultConfig().getFilterChain().addLast("logger", new LoggingFilter()); // 绑定服务器端口 acceptor.bind(address, new KFCFoodPriceHandler()); System.out.println(" 服务器开始在 8000 端口监听 ......."); // ==========================================// // 此处为客户端的I/O Processor线程数的配置,你可以模仿 // // IoAcceptor配置来实现 // // ==========================================// /*** * 配置SocketConnector监听器的I/O Processor的线程的数量, 此处的I/O * Processor的线程数量由CPU的核数决定,但SocketConnector * 的线程数量只有一个,也就是接收客户端连接的线程数只有一个, SocketConnector的线程数量不能配置。 * */ // SocketConnector connector = new SocketConnector(Runtime.getRuntime() // .availableProcessors() + 1, Executors.newCachedThreadPool()); }