最近博主手上有一个爬虫项目,开始深入研究python爬虫开发,这是我篇博客也相当于是我的学习笔记,我认为学习爬虫第一步,先学习python多线程与多进程,熟悉网络编程,接下来会陆续以博客的方式跟大家做分享。
多进程
Python实现多进程的方式主要有两种,一种方法是使用os模块中的fork方法,另一种方法是使用 multiprocessing模块。这两种方法的区别在于前者仅适用于 Unix/Linux操作系统,对 Windows不支持,后者则是跨平台的实现方式,目前爬虫程序多数是运行在Unix/Linux操作系统上
一、使用os模块的fork方式实现线程
fork方法调用一次,返回两次(操作系统会将当前进程(父进程)复制出一份子线程,这两个线程几乎完全相同,其中子线程永远返回0,父线程返回的是子线程的ID)
import
os
# getpid()获取当前线程的ID,getppid()获取父线程的ID
if
__name__
==
"__main__"
:
print
(
'当前进程是(%s)'
%
(
os
.
getpid
(
)
)
)
pid
=
os
.
fork
(
)
if
pid
<
0
:
print
(
'error in fork'
)
elif
pid
==
0
:
print
(
'我是子线程(%s),我的父线程是(%s)'
,
(
os
.
getpid
(
)
,
os
.
getppid
)
)
else
:
print
(
'我(%s)创建了一个子线程(%s).'
,
(
os
.
getpid
(
)
,
pid
)
)
二、使用multiprocessing模块创建多线程
multiprocessing模块提供Process类来描述一个进程对象。创建子进程时,只需要传入一个执行函数和函数的参数,即可完成一个 Process实例的创建,用start()方法启动进程用 join()方法实现进程间的同步。
import
os
from
multiprocessing
import
Process
def
run_proc
(
name
)
:
print
(
'Child process %s (%s) Running...'
%
(
name
,
os
.
getpid
(
)
)
)
if
__name__
==
'__main__'
:
print
(
'Parent process %s.'
%
os
.
getpid
(
)
)
for
i
in
range
(
5
)
:
p
=
Process
(
target
=
run_proc
,
args
=
(
str
(
i
)
,
)
)
print
(
'Process will start.'
)
p
.
start
(
)
p
.
join
(
)
print
(
'Process end.'
)
三、multiprocessing模块提供了一个Pool类来代表进程池对象
Pool可以提供指定数量的进程供用户调用,默认大小是CPU的核数。当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来处理它。下面通过一个例子来演示进程池的工作流程,代码如下
import
os
,
time
,
random
from
multiprocessing
import
Pool
def
run_task
(
name
)
:
print
(
'Task %s (pid = %s) is running...'
%
(
name
,
os
.
getpid
(
)
)
)
time
.
sleep
(
random
.
random
(
)
*
3
)
print
(
'Task %s is end.'
%
name
)
if
__name__
==
'__main__'
:
print
(
'Current process is %s.'
%
os
.
getpid
(
)
)
p
=
Pool
(
processes
=
3
)
for
i
in
range
(
5
)
:
p
.
apply_async
(
run_task
,
args
=
(
i
,
)
)
print
(
'waiting for all subprocesses done...'
)
p
.
close
(
)
p
.
join
(
)
print
(
'All subprocesses done.'
)
Pool对象调用 join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用 close()之后就不能继续添加新的 Process了
四、进程间通信
Python提供了多种进程的通信方式,例如Queue、Pipe、Value+Array等
Pipe常用来两个进程之间的通信,Queue用来在多个进程之间实现通信
1.Queue实现:
from
multiprocessing
import
Process
,
Queue
import
os
,
time
,
random
# 写数据进程执行的代码:
def
proc_write
(
p
,
urls
)
:
print
(
'Process(%s)is writing,,,'
%
os
.
getpid
(
)
)
for
url
in
urls
:
p
.
put
(
url
)
print
(
'Put %s to queue...'
%
url
)
time
.
sleep
(
random
.
random
(
)
)
# 读数据进程执行的代码:
def
proc_read
(
q
)
:
print
(
'Process (%s) is reading...'
%
os
.
getpid
(
)
)
while
True
:
url
=
q
.
get
(
True
)
print
(
'Get %s from queue.'
%
url
)
if
__name__
==
'__main__'
:
# 父进程创建Queue,并传给各个子线程
q
=
Queue
(
)
proc_write1
=
Process
(
target
=
proc_write
,
args
=
(
q
,
[
'url_1'
,
'url_2'
,
'url_3'
]
)
)
proc_write2
=
Process
(
target
=
proc_write
,
args
=
(
q
,
[
'url_4'
,
'url_5'
,
'url_6'
]
)
)
proc_reader
=
Process
(
target
=
proc_read
,
args
=
(
q
,
)
)
# 启动子线程 proc_writer,写入:
proc_write1
.
start
(
)
proc_write2
.
start
(
)
# 启动子线程 proc_reader,读取:
proc_reader
.
start
(
)
# 等待proc_writer结束:
proc_write1
.
join
(
)
proc_write2
.
join
(
)
# proc_reader进程里是死循环,无法等待结束,只能强行终止:
proc_reader
.
terminate
(
)
2.Pipe实现:
Pipe方法返回(conn1,conn2)代表一个管道的两个端。Pipe方法有 duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说 conn1和conn2均可收发。若 duplex为 False, conn1只负责接收消息,conn2只负责发送消息。send和recv方法分别是发送和接收消息的方法。例如,在全双工模式下,可以调用 conn. send发送消息conn.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出 EOFError
import
multiprocessing
import
os
,
time
,
random
def
proc_send
(
pipe
,
urls
)
:
for
url
in
urls
:
print
(
'Process(%s) send:%s'
%
(
os
.
getpid
(
)
,
url
)
)
pipe
.
send
(
url
)
time
.
sleep
(
random
.
random
(
)
)
def
proc_recv
(
pipe
)
:
while
True
:
print
(
'Process(%s) recv:%s'
%
(
os
.
getpid
(
)
,
pipe
.
recv
(
)
)
)
time
.
sleep
(
random
.
random
(
)
)
if
__name__
==
'__main__'
:
# 调用Pipe()方法,返回两个conn
pipe
=
multiprocessing
.
Pipe
(
)
p1
=
multiprocessing
.
Process
(
target
=
proc_send
,
args
=
(
pipe
[
0
]
,
[
'url_'
+
str
(
i
)
for
i
in
range
(
10
)
]
)
)
p2
=
multiprocessing
.
Process
(
target
=
proc_recv
,
args
=
(
pipe
[
1
]
,
)
)
p1
.
start
(
)
p2
.
start
(
)
p1
.
join
(
)
p2
.
join
(
)
多线程
Python的标准库提供了两个模块:thread和 threading, thread是低级模块, threading是高级模块,对 thread进行了封装。绝大多数情况下,我们只需要使用 threading这个高级模块。
一、用threading模块创建多线程
threading模块一般通过两种方式创建多线程:第一种方式是把一个函数传人并创建Thread实例,然后调用 start方法开始执行,代码如下:
import
random
import
time
,
threading
# 新线程执行的代码
def
thread_run
(
urls
)
:
print
(
'Current %s is running ...'
%
threading
.
current_thread
(
)
.
name
)
for
url
in
urls
:
print
(
'%s --->>> %s'
%
(
threading
.
current_thread
(
)
.
name
,
url
)
)
time
.
sleep
(
random
.
random
(
)
)
print
(
'%s ended.'
%
threading
.
current_thread
(
)
.
name
)
print
(
'%s is running...'
%
threading
.
current_thread
(
)
.
name
)
t1
=
threading
.
Thread
(
target
=
thread_run
,
name
=
'Thread_1'
,
args
=
(
[
'url_1'
,
'url_2'
,
'url_3'
]
,
)
)
t2
=
threading
.
Thread
(
target
=
thread_run
,
name
=
'Thread_2'
,
args
=
(
[
'url_4'
,
'url_5'
,
'url_6'
]
,
)
)
t1
.
start
(
)
t2
.
start
(
)
t1
.
join
(
)
t2
.
join
(
)
print
(
'%s ended.'
%
threading
.
current_thread
(
)
.
name
)
第二种方式是直接从 threading.Thread继承并创建线程类,然后重写init方法和run方法。
代码如下:
import
random
import
threading
import
time
class
myThread
(
threading
.
Thread
)
:
def
__init__
(
self
,
name
,
urls
)
:
threading
.
Thread
.
__init__
(
self
,
name
=
name
)
self
.
urls
=
urls
def
run
(
self
)
:
print
(
'Current %s is running ...'
%
threading
.
current_thread
(
)
.
name
)
for
url
in
self
.
urls
:
print
(
'%s --->>> %s'
%
(
threading
.
current_thread
(
)
.
name
,
url
)
)
time
.
sleep
(
random
.
random
(
)
)
print
(
'%s ended ...'
%
threading
.
current_thread
(
)
.
name
)
print
(
'%s is running...'
%
threading
.
current_thread
(
)
.
name
)
t1
=
myThread
(
name
=
'Thread_1'
,
urls
=
[
'url_1'
,
'url_2'
,
'url_3'
]
)
t2
=
myThread
(
name
=
'Thread_2'
,
urls
=
[
'url_4'
,
'url_5'
,
'url_6'
]
)
t1
.
start
(
)
t2
.
start
(
)
t1
.
join
(
)
t2
.
join
(
)
print
(
'%s ended.'
%
threading
.
current_thread
(
)
.
name
)
二、线程同步
为了保证数据的正确性,需要对多个线程进行同步,这需要调用Thread的Lock和RLock对象
这两个对象都有 acquire方法和 release方法,对于那些每次只允许一个线程操作的数据,可以将其操作放到 acquire和 release方法之间。
对于Lock对象而言,如果一个线程连续两次进行 acquire操作,那么由于之后没有 release,第二次 acquire将挂起线程。这会导致Lock对象永远不会 release,使得线程死锁。 RLock对象允许一个线程多次对其进行 acquire操作,因为在其内部通过一个 counter变量维护着线程 acquire的次数。而且每一次的 acquire操作必须有一个 release操作与之对应在所有的 release操作完成之后,别的线程才能申请该RLock对象。线程同步演示代码如下:
import
threading
mylock
=
threading
.
RLock
(
)
num
=
0
class
myThread
(
threading
.
Thread
)
:
def
__init__
(
self
,
name
)
:
threading
.
Thread
.
__init__
(
self
,
name
=
name
)
def
run
(
self
)
:
global
num
while
True
:
mylock
.
acquire
(
)
print
(
'%s locked,Number:%d'
%
(
threading
.
current_thread
(
)
.
name
,
num
)
)
if
num
>=
4
:
mylock
.
release
(
)
print
(
'%s released,Number:%d'
%
(
threading
.
current_thread
(
)
.
name
,
num
)
)
break
num
+=
1
print
(
'%s released,Number:%d'
%
(
threading
.
current_thread
(
)
.
name
,
num
)
)
mylock
.
release
(
)
if
__name__
==
'__main__'
:
thread1
=
myThread
(
'Thread1'
)
thread2
=
myThread
(
'Thread2'
)
thread1
.
start
(
)
thread2
.
start
(
)