9.5 守护进程
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就立即终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
-
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
from
multiprocessing
import
Process
def
task(name):
print
(
'
%s is running
'
%
name)
time.sleep(
3
)
if
__name__
==
'
__main__
'
:
obj
= Process(target=task, args=(
'
egon
'
,))
obj.daemon
=True
#
设置obj为守护进程,并且父进程代码执行结束,obj即终止运行
obj.start()
#
发送信号给操作系统
print
(
'
主
'
)
9.6 互斥锁
互斥锁用来将并发编程串行,牺牲了效率而保证了数据安全
强调:必须是lock.acquire()一次,然后 lock.release()释放一次,才能继续lock.acquire(),不能连续的lock.acquire()
互斥锁和 join的区别:
二者的原理都是一样,都是将并发变成串行,从而保证有序
区别一:join是按照人为指定的顺序执行,而互斥锁是进程平等地竞争,谁先抢到谁执行,一个人拿到锁,其余人都等待
from
multiprocessing
import
Process,Lock
import
random
mutex
=
Lock()
def
task1(lock):
lock.acquire()
print
(
'
task1:名字是egon
'
)
print
(
'
task1:性别是male
'
)
lock.release()
def
task2(lock):
lock.acquire()
print
(
'
task2:名字是alex
'
)
print
(
'
task2:性别是male
'
)
lock.release()
def
task3(lock):
lock.acquire()
print
(
'
task3:名字是lxx
'
)
print
(
'
task3:性别是female
'
)
lock.release()
if
__name__
==
'
__main__
'
:
p1
=Process(target=task1,args=
(mutex,))
p2
=Process(target=task2,args=
(mutex,))
p3
=Process(target=task3,args=
(mutex,))
p1.start()
#
p1.start()
p2.start()
#
p1.join()
p3.start()
#
p2.start()
#
p2.join()
#
p3.start()
#
p3.join()
9.61 模拟抢票
互斥锁和 join的区别二:
互斥锁可以让一部分代码(修改共享数据的代码)串行,而join只能将代码整体串行
import
json
import
time
import
random
import
os
from
multiprocessing
import
Process,Lock
mutex
=
Lock()
def
search():
time.sleep(random.randint(
1,3
))
with open(
'
db.json
'
,
'
r
'
,encoding=
'
utf-8
'
) as f:
dic
=
json.load(f)
print
(
'
%s 剩余票数:%s
'
%(os.getpid(),dic[
'
count
'
]))
def
get():
with open(
'
db.json
'
,
'
r
'
,encoding=
'
utf-8
'
) as f:
dic
=
json.load(f)
if
dic[
'
count
'
] >
0:
dic[
'
count
'
]-=1
with open(
'
db.json
'
,
'
w
'
,encoding=
'
utf-8
'
) as f:
json.dump(dic,f)
print
(
'
%s 购票成功
'
%
os.getpid())
def
task(lock):
search()
lock.acquire()
get()
lock.release()
if
__name__
==
'
__main__
'
:
for
i
in
range(10
):
p
=Process(target=task,args=
(mutex,))
p.start()
#
p.join()
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
进程之间通信必须找到一种介质,该介质必须满足: 1、是所有进程共享的 2、必须是内存空间 附加:帮我们自动处理好锁的问题
from
multiprocessing
import
Process,Manager,Lock
import
time
mutex
=
Lock()
def
task(dic,lock):
lock.acquire()
temp
=dic[
'
num
'
]
time.sleep(
0.1
)
dic[
'
num
'
]=temp-1
lock.release()
if
__name__
==
'
__main__
'
:
m
=
Manager()
dic
=m.dict({
'
num
'
:10
})
l
=
[]
for
i
in
range(10
):
p
=Process(target=task,args=
(dic,mutex))
l.append(p)
p.start()
for
p
in
l:
p.join()
print
(dic)
#
{'num': 0}
9.71创建队列的类Queue
底层就是以管道和锁定的方式实现:
队列 (管道+锁) :1、共享的空间 2、是内存空间 3、自动帮我们处理好锁定问题
-
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
-
maxsize是队列中允许最大项数,省略则无大小限制。
from
multiprocessing
import
Queue
q
=Queue(3)
#
maxsize=3
q.put(
'
first
'
)
q.put({
'
second
'
:None})
q.put(
'
三
'
)
#
q.put(4) #阻塞
print
(q.get())
#
first
print
(q.get())
#
{'second': None}
print
(q.get())
#
三
强调: 1、队列用来存成进程之间沟通的消息,数据量不应该过大 2、maxsize的值超过的内存限制就变得毫无意义
了解:block=True(默认值)、timeout
q=Queue(1
)
q.put(
'
first
'
,block=False)
#
q.put方法用以插入数据到队列中
q.put(
'
fourth
'
,block=False/True)
#
queue.Full/一直等
q.put(
'
first
'
,block=
True)
q.put(
'
fourth
'
,block=True,timeout=3)
#
等3秒后报错queue.Full
q.get(block
=False)
#
q.get方法可以从队列读取并且删除一个元素
q.get(block=False)
#
queue.Empty
q.get(block
=
True)
q.get(block
=True,timeout=2)
#
等2秒后报错queue.Empty

