1. 锁:Lock (1次放1个)同步锁
线程安全,多线程操作时,内部会让所有线程排队处理。如:list/dict/Queue
线程不安全 + 人 => 排队处理。
需求:
a. 创建100个线程,在列表中追加8
b. 创建100个线程
v = []
锁
- 把自己的添加到列表中。
- 在读取列表的最后一个。
解锁
以后锁一个代码块:
import threading import time v = [] lock = threading.Lock() def func(arg): lock.acquire() # 锁的区域--------- v.append(arg) # ---------------- time.sleep(0.01) # ---------------- m = v[-1] # ----------------- print (arg,m) # ---------------- lock.release() # ---------锁的区域 for i in range(10 ): t =threading.Thread(target=func,args= (i,)) t.start()
2. 锁:RLock (1次放1个)递归锁
支持锁多次,解多次
import threading import time v = [] lock = threading.RLock() # 参数是锁的数量 def func(arg): lock.acquire() # 加几次锁,下面就释放几次,lock不支持多次锁,Rlock支持 lock.acquire() v.append(arg) time.sleep( 0.01 ) m = v[-1 ] print (arg,m) lock.release() lock.release() for i in range(10 ): t =threading.Thread(target=func,args= (i,)) t.start()
3. 锁:BoundedSemaphore(1次放N个)信号量
import time import threading lock = threading.BoundedSemaphore(3 ) def func(arg): lock.acquire() print (arg) time.sleep( 1 ) lock.release() for i in range(20 ): t =threading.Thread(target=func,args= (i,)) t.start()
4. 锁:Condition(每次释放前需要输入要释放的个数x)
import time import threading lock = threading.Condition() # ############# 方式一 ############## def func(arg): print ( ' 线程进来了 ' ) lock.acquire() lock.wait() # 加锁 print (arg) time.sleep( 1 ) lock.release() for i in range(10 ): t =threading.Thread(target=func,args= (i,)) t.start() while True: inp = int(input( ' >>> ' )) lock.acquire() lock.notify(inp) lock.release() # ############# 方式二 ############## def xxxx(): print ( ' 来执行函数了 ' ) input( " >>> " ) # 让每一个线程完成某个条件,完成之后释放 # ct = threading.current_thread() # 获取当前线程 # ct.getName() return True def func(arg): print ( ' 线程进来了 ' ) lock.wait_for(xxxx) print (arg) time.sleep( 1 ) for i in range(10 ): t =threading.Thread(target=func,args= (i,)) t.start()
5. 锁:Event(1次放所有)
import time import threading lock = threading.Event() def func(arg): print ( ' 线程来了 ' ) lock.wait() # 加锁:红灯 print (arg) for i in range(10 ): t =threading.Thread(target=func,args= (i,)) t.start() input( " >>>> " ) lock.set() # 绿灯 lock.clear() # 再次变红灯 for i in range(10 ): t =threading.Thread(target=func,args= (i,)) t.start() input( " >>>> " ) lock.set()
总结:
线程安全,指的是容器存放数据的时候线程安全;列表和字典线程安全;
为什么要加锁?
- 非线程安全
- 控制一段代码
6. threading.local
作用:
内部自动为每个线程维护一个空间(字典),用于当前存取属于自己的值。保证线程之间的数据隔离。
{
线程ID: {...}
线程ID: {...}
线程ID: {...}
线程ID: {...}
}
示例:
import time
import threading
v = threading.local()
def func(arg):
# 内部会为当前线程创建一个空间用于存储:phone=自己的值
v.phone = arg
time.sleep(2)
print(v.phone,arg) # 去当前线程自己空间取值
for i in range(10):
t =threading.Thread(target=func,args=(i,))
t.start()
7. 线程池
最多能创建多少个线程
创建多个线程用线程池,避免无节制创建线程
from concurrent.futures import ThreadPoolExecutor import time def task(a1,a2): time.sleep( 2 ) print (a1,a2) # 创建了一个线程池(最多5个线程) pool = ThreadPoolExecutor(5 ) for i in range(40 ): # 去线程池中申请一个线程,让线程执行task函数。 pool.submit(task,i,8)
8. 生产者消费者模型
三部件:
生产者
队列,先进先出
扩展: 栈,后进先出
消费者
问:生产者消费者模型解决了什么问题?不用一直等待的问题。
示例:
import time import queue import threading q = queue.Queue() # 线程安全 def producer(id): """ 生产者 """ while True: time.sleep( 2 ) q.put( ' 包子 ' ) print ( ' 厨师%s 生产了一个包子 ' % id ) for i in range(1,4 ): t = threading.Thread(target=producer,args= (i,)) t.start() def consumer(id): """ 消费者 """ while True: time.sleep( 1 ) v1 = q.get() print ( ' 顾客 %s 吃了一个包子 ' % id) for i in range(1,3 ): t = threading.Thread(target=consumer,args= (i,)) t.start()