python爬虫开发第一步——熟悉网络编程,python线程与进程

系统 1519 0
            
              最近博主手上有一个爬虫项目,开始深入研究python爬虫开发,这是我篇博客也相当于是我的学习笔记,我认为学习爬虫第一步,先学习python多线程与多进程,熟悉网络编程,接下来会陆续以博客的方式跟大家做分享。

            
          

多进程

Python实现多进程的方式主要有两种,一种方法是使用os模块中的fork方法,另一种方法是使用 multiprocessing模块。这两种方法的区别在于前者仅适用于 Unix/Linux操作系统,对 Windows不支持,后者则是跨平台的实现方式,目前爬虫程序多数是运行在Unix/Linux操作系统上

一、使用os模块的fork方式实现线程

fork方法调用一次,返回两次(操作系统会将当前进程(父进程)复制出一份子线程,这两个线程几乎完全相同,其中子线程永远返回0,父线程返回的是子线程的ID)

            
              
                import
              
               os

              
                # getpid()获取当前线程的ID,getppid()获取父线程的ID
              
              
                if
              
               __name__ 
              
                ==
              
              
                "__main__"
              
              
                :
              
              
                print
              
              
                (
              
              
                '当前进程是(%s)'
              
              
                %
              
              
                (
              
              os
              
                .
              
              getpid
              
                (
              
              
                )
              
              
                )
              
              
                )
              
              
    pid 
              
                =
              
               os
              
                .
              
              fork
              
                (
              
              
                )
              
              
                if
              
               pid 
              
                <
              
              
                0
              
              
                :
              
              
                print
              
              
                (
              
              
                'error in fork'
              
              
                )
              
              
                elif
              
               pid 
              
                ==
              
              
                0
              
              
                :
              
              
                print
              
              
                (
              
              
                '我是子线程(%s),我的父线程是(%s)'
              
              
                ,
              
              
                (
              
              os
              
                .
              
              getpid
              
                (
              
              
                )
              
              
                ,
              
               os
              
                .
              
              getppid
              
                )
              
              
                )
              
              
                else
              
              
                :
              
              
                print
              
              
                (
              
              
                '我(%s)创建了一个子线程(%s).'
              
              
                ,
              
              
                (
              
              os
              
                .
              
              getpid
              
                (
              
              
                )
              
              
                ,
              
               pid
              
                )
              
              
                )
              
            
          
二、使用multiprocessing模块创建多线程

multiprocessing模块提供Process类来描述一个进程对象。创建子进程时,只需要传入一个执行函数和函数的参数,即可完成一个 Process实例的创建,用start()方法启动进程用 join()方法实现进程间的同步。

            
              
                import
              
               os

              
                from
              
               multiprocessing 
              
                import
              
               Process


              
                def
              
              
                run_proc
              
              
                (
              
              name
              
                )
              
              
                :
              
              
                print
              
              
                (
              
              
                'Child process %s (%s) Running...'
              
              
                %
              
              
                (
              
              name
              
                ,
              
               os
              
                .
              
              getpid
              
                (
              
              
                )
              
              
                )
              
              
                )
              
              
                if
              
               __name__ 
              
                ==
              
              
                '__main__'
              
              
                :
              
              
                print
              
              
                (
              
              
                'Parent process %s.'
              
              
                %
              
               os
              
                .
              
              getpid
              
                (
              
              
                )
              
              
                )
              
              
                for
              
               i 
              
                in
              
              
                range
              
              
                (
              
              
                5
              
              
                )
              
              
                :
              
              
        p 
              
                =
              
               Process
              
                (
              
              target
              
                =
              
              run_proc
              
                ,
              
               args
              
                =
              
              
                (
              
              
                str
              
              
                (
              
              i
              
                )
              
              
                ,
              
              
                )
              
              
                )
              
              
                print
              
              
                (
              
              
                'Process will start.'
              
              
                )
              
              
        p
              
                .
              
              start
              
                (
              
              
                )
              
              
    p
              
                .
              
              join
              
                (
              
              
                )
              
              
                print
              
              
                (
              
              
                'Process end.'
              
              
                )
              
            
          
三、multiprocessing模块提供了一个Pool类来代表进程池对象

Pool可以提供指定数量的进程供用户调用,默认大小是CPU的核数。当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来处理它。下面通过一个例子来演示进程池的工作流程,代码如下

            
              
                import
              
               os
              
                ,
              
              time
              
                ,
              
              random

              
                from
              
               multiprocessing 
              
                import
              
               Pool


              
                def
              
              
                run_task
              
              
                (
              
              name
              
                )
              
              
                :
              
              
                print
              
              
                (
              
              
                'Task %s (pid = %s) is running...'
              
              
                %
              
              
                (
              
              name
              
                ,
              
              os
              
                .
              
              getpid
              
                (
              
              
                )
              
              
                )
              
              
                )
              
              
    time
              
                .
              
              sleep
              
                (
              
              random
              
                .
              
              random
              
                (
              
              
                )
              
              
                *
              
              
                3
              
              
                )
              
              
                print
              
              
                (
              
              
                'Task %s is end.'
              
              
                %
              
               name
              
                )
              
              
                if
              
               __name__ 
              
                ==
              
              
                '__main__'
              
              
                :
              
              
                print
              
              
                (
              
              
                'Current process is %s.'
              
              
                %
              
               os
              
                .
              
              getpid
              
                (
              
              
                )
              
              
                )
              
              
    p 
              
                =
              
               Pool
              
                (
              
              processes
              
                =
              
              
                3
              
              
                )
              
              
                for
              
               i 
              
                in
              
              
                range
              
              
                (
              
              
                5
              
              
                )
              
              
                :
              
              
        p
              
                .
              
              apply_async
              
                (
              
              run_task
              
                ,
              
               args
              
                =
              
              
                (
              
              i
              
                ,
              
              
                )
              
              
                )
              
              
                print
              
              
                (
              
              
                'waiting for all subprocesses done...'
              
              
                )
              
              
    p
              
                .
              
              close
              
                (
              
              
                )
              
              
    p
              
                .
              
              join
              
                (
              
              
                )
              
              
                print
              
              
                (
              
              
                'All subprocesses done.'
              
              
                )
              
            
          

Pool对象调用 join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用 close()之后就不能继续添加新的 Process了

四、进程间通信

Python提供了多种进程的通信方式,例如Queue、Pipe、Value+Array等

Pipe常用来两个进程之间的通信,Queue用来在多个进程之间实现通信

1.Queue实现:

            
              
                from
              
               multiprocessing 
              
                import
              
               Process
              
                ,
              
              Queue

              
                import
              
               os
              
                ,
              
              time
              
                ,
              
              random


              
                # 写数据进程执行的代码:
              
              
                def
              
              
                proc_write
              
              
                (
              
              p
              
                ,
              
               urls
              
                )
              
              
                :
              
              
                print
              
              
                (
              
              
                'Process(%s)is writing,,,'
              
              
                %
              
               os
              
                .
              
              getpid
              
                (
              
              
                )
              
              
                )
              
              
                for
              
               url 
              
                in
              
               urls
              
                :
              
              
        p
              
                .
              
              put
              
                (
              
              url
              
                )
              
              
                print
              
              
                (
              
              
                'Put %s to queue...'
              
              
                %
              
               url
              
                )
              
              
        time
              
                .
              
              sleep
              
                (
              
              random
              
                .
              
              random
              
                (
              
              
                )
              
              
                )
              
              
                # 读数据进程执行的代码:
              
              
                def
              
              
                proc_read
              
              
                (
              
              q
              
                )
              
              
                :
              
              
                print
              
              
                (
              
              
                'Process (%s) is reading...'
              
              
                %
              
               os
              
                .
              
              getpid
              
                (
              
              
                )
              
              
                )
              
              
                while
              
              
                True
              
              
                :
              
              
        url 
              
                =
              
               q
              
                .
              
              get
              
                (
              
              
                True
              
              
                )
              
              
                print
              
              
                (
              
              
                'Get %s from queue.'
              
              
                %
              
               url
              
                )
              
              
                if
              
               __name__ 
              
                ==
              
              
                '__main__'
              
              
                :
              
              
                # 父进程创建Queue,并传给各个子线程
              
              
    q 
              
                =
              
               Queue
              
                (
              
              
                )
              
              
    proc_write1 
              
                =
              
               Process
              
                (
              
              target
              
                =
              
              proc_write
              
                ,
              
               args
              
                =
              
              
                (
              
              q
              
                ,
              
              
                [
              
              
                'url_1'
              
              
                ,
              
              
                'url_2'
              
              
                ,
              
              
                'url_3'
              
              
                ]
              
              
                )
              
              
                )
              
              
    proc_write2 
              
                =
              
               Process
              
                (
              
              target
              
                =
              
              proc_write
              
                ,
              
               args
              
                =
              
              
                (
              
              q
              
                ,
              
              
                [
              
              
                'url_4'
              
              
                ,
              
              
                'url_5'
              
              
                ,
              
              
                'url_6'
              
              
                ]
              
              
                )
              
              
                )
              
              
    proc_reader 
              
                =
              
               Process
              
                (
              
              target
              
                =
              
              proc_read
              
                ,
              
               args
              
                =
              
              
                (
              
              q
              
                ,
              
              
                )
              
              
                )
              
              
                # 启动子线程 proc_writer,写入:
              
              
    proc_write1
              
                .
              
              start
              
                (
              
              
                )
              
              
    proc_write2
              
                .
              
              start
              
                (
              
              
                )
              
              
                # 启动子线程 proc_reader,读取:
              
              
    proc_reader
              
                .
              
              start
              
                (
              
              
                )
              
              
                # 等待proc_writer结束:
              
              
    proc_write1
              
                .
              
              join
              
                (
              
              
                )
              
              
    proc_write2
              
                .
              
              join
              
                (
              
              
                )
              
              
                # proc_reader进程里是死循环,无法等待结束,只能强行终止:
              
              
    proc_reader
              
                .
              
              terminate
              
                (
              
              
                )
              
            
          

2.Pipe实现:
Pipe方法返回(conn1,conn2)代表一个管道的两个端。Pipe方法有 duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说 conn1和conn2均可收发。若 duplex为 False, conn1只负责接收消息,conn2只负责发送消息。send和recv方法分别是发送和接收消息的方法。例如,在全双工模式下,可以调用 conn. send发送消息conn.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出 EOFError

            
              
                import
              
               multiprocessing

              
                import
              
               os
              
                ,
              
              time
              
                ,
              
              random


              
                def
              
              
                proc_send
              
              
                (
              
              pipe
              
                ,
              
               urls
              
                )
              
              
                :
              
              
                for
              
               url 
              
                in
              
               urls
              
                :
              
              
                print
              
              
                (
              
              
                'Process(%s) send:%s'
              
              
                %
              
              
                (
              
              os
              
                .
              
              getpid
              
                (
              
              
                )
              
              
                ,
              
               url
              
                )
              
              
                )
              
              
        pipe
              
                .
              
              send
              
                (
              
              url
              
                )
              
              
        time
              
                .
              
              sleep
              
                (
              
              random
              
                .
              
              random
              
                (
              
              
                )
              
              
                )
              
              
                def
              
              
                proc_recv
              
              
                (
              
              pipe
              
                )
              
              
                :
              
              
                while
              
              
                True
              
              
                :
              
              
                print
              
              
                (
              
              
                'Process(%s) recv:%s'
              
              
                %
              
              
                (
              
              os
              
                .
              
              getpid
              
                (
              
              
                )
              
              
                ,
              
              pipe
              
                .
              
              recv
              
                (
              
              
                )
              
              
                )
              
              
                )
              
              
        time
              
                .
              
              sleep
              
                (
              
              random
              
                .
              
              random
              
                (
              
              
                )
              
              
                )
              
              
                if
              
               __name__ 
              
                ==
              
              
                '__main__'
              
              
                :
              
              
                # 调用Pipe()方法,返回两个conn
              
              
    pipe 
              
                =
              
               multiprocessing
              
                .
              
              Pipe
              
                (
              
              
                )
              
              
    p1 
              
                =
              
               multiprocessing
              
                .
              
              Process
              
                (
              
              target
              
                =
              
              proc_send
              
                ,
              
               args
              
                =
              
              
                (
              
              pipe
              
                [
              
              
                0
              
              
                ]
              
              
                ,
              
              
                [
              
              
                'url_'
              
              
                +
              
              
                str
              
              
                (
              
              i
              
                )
              
              
                for
              
               i 
              
                in
              
              
                range
              
              
                (
              
              
                10
              
              
                )
              
              
                ]
              
              
                )
              
              
                )
              
              
    p2 
              
                =
              
               multiprocessing
              
                .
              
              Process
              
                (
              
              target
              
                =
              
              proc_recv
              
                ,
              
               args
              
                =
              
              
                (
              
              pipe
              
                [
              
              
                1
              
              
                ]
              
              
                ,
              
              
                )
              
              
                )
              
              
    p1
              
                .
              
              start
              
                (
              
              
                )
              
              
    p2
              
                .
              
              start
              
                (
              
              
                )
              
              
    p1
              
                .
              
              join
              
                (
              
              
                )
              
              
    p2
              
                .
              
              join
              
                (
              
              
                )
              
            
          

多线程

Python的标准库提供了两个模块:thread和 threading, thread是低级模块, threading是高级模块,对 thread进行了封装。绝大多数情况下,我们只需要使用 threading这个高级模块。

一、用threading模块创建多线程

​ threading模块一般通过两种方式创建多线程:第一种方式是把一个函数传人并创建Thread实例,然后调用 start方法开始执行,代码如下:

            
              
                import
              
               random

              
                import
              
               time
              
                ,
              
               threading


              
                # 新线程执行的代码
              
              
                def
              
              
                thread_run
              
              
                (
              
              urls
              
                )
              
              
                :
              
              
                print
              
              
                (
              
              
                'Current %s is running ...'
              
              
                %
              
               threading
              
                .
              
              current_thread
              
                (
              
              
                )
              
              
                .
              
              name
              
                )
              
              
                for
              
               url 
              
                in
              
               urls
              
                :
              
              
                print
              
              
                (
              
              
                '%s --->>> %s'
              
              
                %
              
              
                (
              
              threading
              
                .
              
              current_thread
              
                (
              
              
                )
              
              
                .
              
              name
              
                ,
              
               url
              
                )
              
              
                )
              
              
        time
              
                .
              
              sleep
              
                (
              
              random
              
                .
              
              random
              
                (
              
              
                )
              
              
                )
              
              
                print
              
              
                (
              
              
                '%s ended.'
              
              
                %
              
               threading
              
                .
              
              current_thread
              
                (
              
              
                )
              
              
                .
              
              name
              
                )
              
              
                print
              
              
                (
              
              
                '%s is running...'
              
              
                %
              
               threading
              
                .
              
              current_thread
              
                (
              
              
                )
              
              
                .
              
              name
              
                )
              
              
t1 
              
                =
              
               threading
              
                .
              
              Thread
              
                (
              
              target
              
                =
              
              thread_run
              
                ,
              
               name
              
                =
              
              
                'Thread_1'
              
              
                ,
              
               args
              
                =
              
              
                (
              
              
                [
              
              
                'url_1'
              
              
                ,
              
              
                'url_2'
              
              
                ,
              
              
                'url_3'
              
              
                ]
              
              
                ,
              
              
                )
              
              
                )
              
              
t2 
              
                =
              
               threading
              
                .
              
              Thread
              
                (
              
              target
              
                =
              
              thread_run
              
                ,
              
               name
              
                =
              
              
                'Thread_2'
              
              
                ,
              
               args
              
                =
              
              
                (
              
              
                [
              
              
                'url_4'
              
              
                ,
              
              
                'url_5'
              
              
                ,
              
              
                'url_6'
              
              
                ]
              
              
                ,
              
              
                )
              
              
                )
              
              
t1
              
                .
              
              start
              
                (
              
              
                )
              
              
t2
              
                .
              
              start
              
                (
              
              
                )
              
              
t1
              
                .
              
              join
              
                (
              
              
                )
              
              
t2
              
                .
              
              join
              
                (
              
              
                )
              
              
                print
              
              
                (
              
              
                '%s ended.'
              
              
                %
              
               threading
              
                .
              
              current_thread
              
                (
              
              
                )
              
              
                .
              
              name
              
                )
              
            
          

第二种方式是直接从 threading.Thread继承并创建线程类,然后重写init方法和run方法。

代码如下:

            
              
                import
              
               random

              
                import
              
               threading

              
                import
              
               time


              
                class
              
              
                myThread
              
              
                (
              
              threading
              
                .
              
              Thread
              
                )
              
              
                :
              
              
                def
              
              
                __init__
              
              
                (
              
              self
              
                ,
              
               name
              
                ,
              
               urls
              
                )
              
              
                :
              
              
        threading
              
                .
              
              Thread
              
                .
              
              __init__
              
                (
              
              self
              
                ,
              
               name
              
                =
              
              name
              
                )
              
              
        self
              
                .
              
              urls 
              
                =
              
               urls

    
              
                def
              
              
                run
              
              
                (
              
              self
              
                )
              
              
                :
              
              
                print
              
              
                (
              
              
                'Current %s is running ...'
              
              
                %
              
               threading
              
                .
              
              current_thread
              
                (
              
              
                )
              
              
                .
              
              name
              
                )
              
              
                for
              
               url 
              
                in
              
               self
              
                .
              
              urls
              
                :
              
              
                print
              
              
                (
              
              
                '%s --->>> %s'
              
              
                %
              
              
                (
              
              threading
              
                .
              
              current_thread
              
                (
              
              
                )
              
              
                .
              
              name
              
                ,
              
               url
              
                )
              
              
                )
              
              
            time
              
                .
              
              sleep
              
                (
              
              random
              
                .
              
              random
              
                (
              
              
                )
              
              
                )
              
              
                print
              
              
                (
              
              
                '%s ended ...'
              
              
                %
              
               threading
              
                .
              
              current_thread
              
                (
              
              
                )
              
              
                .
              
              name
              
                )
              
              
                print
              
              
                (
              
              
                '%s is running...'
              
              
                %
              
               threading
              
                .
              
              current_thread
              
                (
              
              
                )
              
              
                .
              
              name
              
                )
              
              
t1 
              
                =
              
               myThread
              
                (
              
              name
              
                =
              
              
                'Thread_1'
              
              
                ,
              
               urls
              
                =
              
              
                [
              
              
                'url_1'
              
              
                ,
              
              
                'url_2'
              
              
                ,
              
              
                'url_3'
              
              
                ]
              
              
                )
              
              
t2 
              
                =
              
               myThread
              
                (
              
              name
              
                =
              
              
                'Thread_2'
              
              
                ,
              
               urls
              
                =
              
              
                [
              
              
                'url_4'
              
              
                ,
              
              
                'url_5'
              
              
                ,
              
              
                'url_6'
              
              
                ]
              
              
                )
              
              
t1
              
                .
              
              start
              
                (
              
              
                )
              
              
t2
              
                .
              
              start
              
                (
              
              
                )
              
              
t1
              
                .
              
              join
              
                (
              
              
                )
              
              
t2
              
                .
              
              join
              
                (
              
              
                )
              
              
                print
              
              
                (
              
              
                '%s ended.'
              
              
                %
              
               threading
              
                .
              
              current_thread
              
                (
              
              
                )
              
              
                .
              
              name
              
                )
              
            
          
二、线程同步

为了保证数据的正确性,需要对多个线程进行同步,这需要调用Thread的Lock和RLock对象

这两个对象都有 acquire方法和 release方法,对于那些每次只允许一个线程操作的数据,可以将其操作放到 acquire和 release方法之间。

对于Lock对象而言,如果一个线程连续两次进行 acquire操作,那么由于之后没有 release,第二次 acquire将挂起线程。这会导致Lock对象永远不会 release,使得线程死锁。 RLock对象允许一个线程多次对其进行 acquire操作,因为在其内部通过一个 counter变量维护着线程 acquire的次数。而且每一次的 acquire操作必须有一个 release操作与之对应在所有的 release操作完成之后,别的线程才能申请该RLock对象。线程同步演示代码如下:

            
              
                import
              
               threading

mylock 
              
                =
              
               threading
              
                .
              
              RLock
              
                (
              
              
                )
              
              
num 
              
                =
              
              
                0
              
              
                class
              
              
                myThread
              
              
                (
              
              threading
              
                .
              
              Thread
              
                )
              
              
                :
              
              
                def
              
              
                __init__
              
              
                (
              
              self
              
                ,
              
               name
              
                )
              
              
                :
              
              
        threading
              
                .
              
              Thread
              
                .
              
              __init__
              
                (
              
              self
              
                ,
              
               name
              
                =
              
              name
              
                )
              
              
                def
              
              
                run
              
              
                (
              
              self
              
                )
              
              
                :
              
              
                global
              
               num
        
              
                while
              
              
                True
              
              
                :
              
              
            mylock
              
                .
              
              acquire
              
                (
              
              
                )
              
              
                print
              
              
                (
              
              
                '%s locked,Number:%d'
              
              
                %
              
              
                (
              
              threading
              
                .
              
              current_thread
              
                (
              
              
                )
              
              
                .
              
              name
              
                ,
              
               num
              
                )
              
              
                )
              
              
                if
              
               num 
              
                >=
              
              
                4
              
              
                :
              
              
                mylock
              
                .
              
              release
              
                (
              
              
                )
              
              
                print
              
              
                (
              
              
                '%s released,Number:%d'
              
              
                %
              
              
                (
              
              threading
              
                .
              
              current_thread
              
                (
              
              
                )
              
              
                .
              
              name
              
                ,
              
               num
              
                )
              
              
                )
              
              
                break
              
              
            num 
              
                +=
              
              
                1
              
              
                print
              
              
                (
              
              
                '%s released,Number:%d'
              
              
                %
              
              
                (
              
              threading
              
                .
              
              current_thread
              
                (
              
              
                )
              
              
                .
              
              name
              
                ,
              
               num
              
                )
              
              
                )
              
              
            mylock
              
                .
              
              release
              
                (
              
              
                )
              
              
                if
              
               __name__ 
              
                ==
              
              
                '__main__'
              
              
                :
              
              
    thread1 
              
                =
              
               myThread
              
                (
              
              
                'Thread1'
              
              
                )
              
              
    thread2 
              
                =
              
               myThread
              
                (
              
              
                'Thread2'
              
              
                )
              
              
    thread1
              
                .
              
              start
              
                (
              
              
                )
              
              
    thread2
              
                .
              
              start
              
                (
              
              
                )
              
            
          

更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论