1. 何时使用线程池
系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好地提升性能;
尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。
线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。
当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。
2. 线程池
线程池的基类是concurrent.futures模块中的Executor, Executor提供了两个子类,即ThreadPoolExecutor 和 ProcessPoolExecutor
ThreadPoolExecutor 线程池
ProcessPoolExecutor 进程池
使用线程池/进程池来管理并发编程,那么只要将相应的task函数提交给线程池/进程池,剩下的就由线程池/进程池来做
常用方法:
submit(fn, *args, **kwargs)
fn 函数
*args 传给fn函数的参数
*kargs 以关键字参数的形式传给fn函数的参数
map(func, *iterables, timeout = None, chunksize = 1)
类似全局函数map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对iterables执行map处理
shutdown(wait=True)
程序将task函数提交给线程池后,submit方法会返回一个Future对象,Future类主要用于获取线程任务函数的返回值。
由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以使用Future来代表
Future提供了以下方法:
cancel(): 取消该Future代表的线程任务。如果该任务正在执行,不可取消,则该方法返回False;否则,程序会取消该任务,并返回True。
cancelled(): 返回Future代表的线程任务是否被成功取消
running(): 如果该Future代表的线程任务正在执行、不可被取消,该方法返回True。
done(): 如果该Future代表的线程任务被成功取消或这行完成,则该方法返回True。
result(timeout=None): 获取该Future代表的线程任务最后返回的结果。如果Future代表的线程任务还未能完成,该方法将会阻塞当前线程,其中timeout参数指定最多阻塞多少秒
exception(timeout=None): 获取该Future代表的线程任务所引发的异常,如果该任务成功完成,没有异常,则该方法返回None
add_done_callback(fn): 为该Future代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该fn函数
在用完一个线程池后,应该调用该线程池的shutdown()方法,该方法将启动线程池的关闭序列。
调用shutdown()方法后的线程池不再接受新任务,但会将以前所有的已提交的任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。
示例:
"""
使用线程池来执行线程任务:
1. 调用ThreadPoolExecutor类的构造器创建一个线程池
2. 定义一个普通函数作为线程任务
3. 调用ThreadPoolExecutor对象的submit()方法来提交线程任务
4. 当不想提交任何任务时,调用ThreadPoolExecutor对象的shutdown()方法来关闭线程池
"""
from concurrent.futures import ThreadPoolExecutor
import threading
import time
#线程任务
def task(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
#创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers = 2)
#向线程池提交一个task,
future1 = pool.submit(task, 50)
#向线程池再提交一个task
future2 = pool.submit(task, 100)
#判断future1代表的任务是否结束
print(future1.done())
time.sleep(3)
#判断future2代表的任务是否结束
print(future2.done())
#查看future1代表的任务返回结果
print(future1.result()) #会阻塞主线程在此等待
#查看future2代表的任务返回结果
print(future2.result())
#如果想不阻塞主线程,可以使用add_done_callback方法
#示例:
#def callback_get_result(future):
# future.result()
#
#future1.add_done_callback(callback_get_result)
#future2.add_done_callback(callback_get_result)
#关闭线程池
pool.shutdown()
由于线程池实现了上下文管理协议(Context Manage Protocol),因此,程序可以使用with语句来管理线程池,这样即可避免手动关闭线程池
Executor提供了一个map(func, *iterables, timeout=None, chunksize=1)方法
类似于全局函数map()
区别在于线程池的map()会为iterables的每个元素启动一个线程,已并发方式执行func函数。这种方式相当于启动len(iterables)个线程,并收集每个线程的执行结果。
示例:
with ThreadPoolExecutor(max_workers=4) as pool:
#使用map,元组中有3个元素,因此程序启动3条线程来执行task
results = pool.map(task, (50,100,250))