python中同步、多线程、异步IO、多线程对IO密集型的影响

系统 1628 0

 

 

 

 1、常见并发类型

I/ O密集型:

python中同步、多线程、异步IO、多线程对IO密集型的影响_第1张图片

            蓝色框表示程序执行工作的时间,红色框表示等待I/O操作完成的时间。此图没有按比例显示,因为internet上的请求可能比CPU指令要多花费几个数量级的时间,所以你的程序可能会花费大部分时间进行等待。
          

 CPU密集型:

python中同步、多线程、异步IO、多线程对IO密集型的影响_第2张图片

            IO密集型程序将时间花在cpu计算上。
          

常见并发类型以及区别:

python中同步、多线程、异步IO、多线程对IO密集型的影响_第3张图片

 

 

 2、同步版本

 我们将使用requests访问100个网页,使用同步的方式,requests的请求是同步的,所以代码就很好写了。

同步的版本代码逻辑简单,编写也会很相对容易。

            
              import
            
            
               requests

            
            
              import
            
            
               time

            
            
              def
            
            
               download_site(url,session):
    with session.get(url) as response:
        
            
            
              print
            
            
              (len(response.content))


            
            
              def
            
            
               download_all_site(sites):
    with requests.Session() as session:
        
            
            
              for
            
             url 
            
              in
            
            
               sites:
            download_site(url,session)


            
            
              if
            
            
              __name__
            
             ==
            
              "
            
            
              __main__
            
            
              "
            
            
              :
    sites 
            
            = [
            
              "
            
            
              https://www.baidu.com
            
            
              "
            
            ,
            
              "
            
            
              https://www.jython.org
            
            
              "
            
            ] * 50
            
              
    start_time 
            
            =
            
               time.time()
    download_all_site(sites)
    end_time 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              执行时间:%s
            
            
              "
            
             % (end_time - start_time) + 
            
              "
            
            
            
              "
            
            
              )

            
            
              #
            
            
              download_site()只从一个URL下载内容并打印其大小
            
            
              
#
            
            
              需要知道的是我们这里没有使用requests.get(),而使用了session.get(),我们使用requests.Session()创建了一个Session对象,每次请求使用了session.get(url,因为可以让requests运用一些神奇的网络小技巧,从而真正使程序加速。
            
            
              
#
            
            
              执行时间:33.91123294830322秒
            
          

 

 

 3、多线程

 ThreadPoolExecutor,: ThreadPoolExecutor =Thread+Pool+ Executor。

你已经了解了Thread部分。那只是我们之前提到的一个思路。Pool部分是开始变得有趣的地方。这个对象将创建一个线程池,其中的每个线程都可以并发运行。最后,Executor是控制线程池中的每个线程如何以及何时运行的部分。它将在线程池中执行请求。

对我们很有帮助的是,标准库将ThreadPoolExecutor实现为一个上下文管理器,因此你可以使用with语法来管理Threads池的创建和释放。

一旦有了ThreadPoolExecutor,你就可以使用它方便的.map()方法。此方法在列表中的每个站点上运行传入函数。最重要的是,它使用自己管理的线程池自动并发地运行它们。

来自其他语言,甚至Python 2的人可能想知道,在处理threading时,管理你习惯的细节的常用对象和函数在哪里,比如Thread.start()、Thread.join()和Queue。

这些都还在那里,你可以使用它们来实现对线程运行方式的精细控制。但是,从Python 3.2开始,标准库添加了一个更高级别的抽象,称为Executor,如果你不需要精细控制,它可以为你管理许多细节。

本例中另一个有趣的更改是,每个线程都需要创建自己的request . Session()对象。当你查看requests的文档时,不一定就能很容易地看出,但在阅读这个问题(https://github.com/requests/requests/issues/2766  )时,你会清晰地发现每个线程都需要一个单独的Session。

这是threading中有趣且困难的问题之一。因为操作系统可以控制任务何时中断,何时启动另一个任务,所以线程之间共享的任何数据都需要被保护起来,或者说是线程安全的。不幸的是,requests . Session()不是线程安全的。

根据数据是什么以及如何你使用它们,有几种策略可以使数据访问变成线程安全的。其中之一是使用线程安全的数据结构,比如来自 Python的queue模块的Queue。

这些对象使用低级基本数据类型,比如threading.Lock,以确保只有一个线程可以同时访问代码块或内存块。你可以通过ThreadPoolExecutor对象间接地使用此策略。

            
              import
            
            
               requests

            
            
              import
            
            
               concurrent.futures

            
            
              import
            
            
               threading

            
            
              import
            
            
               time


            
            
              #
            
            
              创建线程池
            
            
thread_local=
            
               threading.local()


            
            
              def
            
            
               get_session():
    
            
            
              if
            
            
              not
            
             getattr(thread_local,
            
              "
            
            
              session
            
            
              "
            
            
              ,None):
        thread_local.session 
            
            =
            
               requests.Session()
    
            
            
              return
            
            
               thread_local.session


            
            
              def
            
            
               download_site(url):
    session 
            
            =
            
               get_session()
    with session.get(url) as response:
        
            
            
              print
            
            
              (len(response.content))


            
            
              def
            
            
               download_all_site(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers
            
            =5
            
              ) as exector:
        exector.map(download_site,sites)


            
            
              if
            
            
              __name__
            
             ==
            
              "
            
            
              __main__
            
            
              "
            
            
              :
    sites 
            
            = [
            
              "
            
            
              https://www.baidu.com
            
            
              "
            
            ,
            
              "
            
            
              https://www.jython.org
            
            
              "
            
            ] * 50
            
              
    start_time 
            
            =
            
               time.time()
    download_all_site(sites)
    end_time 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              执行时间:%s
            
            
              "
            
             % (end_time - start_time) + 
            
              "
            
            
            
              "
            
            
              )

            
            
              #
            
            
              执行时间:6.152076244354248秒
            
          

 

这里要使用的另一种策略是线程本地存储。Threading.local()会创建一个对象,它看起来像一个全局对象但又是特定于每个线程的。在我们的示例中,这是通过threadLocal和get_session()完成的:

python中同步、多线程、异步IO、多线程对IO密集型的影响_第4张图片

ThreadLocal是threading模块中专门用来解决这个问题的。它看起来有点奇怪,但是你只想创建其中一个对象,而不是为每个线程创建一个对象。对象本身将负责从不同的线程到不同的数据的分开访问。

当get_session()被调用时,它所查找的session是特定于它所运行的线程的。因此,每个线程都将在第一次调用get_session()时创建一个单个的会话,然后在整个生命周期中对每个后续调用使用该会话。

最后,简要介绍一下选择线程的数量。你可以看到示例代码使用了5个线程。随意改变这个数字,看看总时间是如何变化的。你可能认为每次下载只有一个线程是最快的,但至少在我的系统上不是这样。我在5到10个线程之间找到了最快的结果。如果超过这个值,那么创建和销毁线程的额外开销就会抵消程序节省的时间。

这里比较困难的答案是,从一个任务到另一个任务的正确线程数不是一个常量。需要进行一些实验来得到。

 注意:request . Session()不是线程安全的。这意味着,如果多个线程使用同一个Session,那么在某些地方可能会发生上面描述的交互类型问题。

 

多线程代码的执行时序表:

  python中同步、多线程、异步IO、多线程对IO密集型的影响_第5张图片

 

 

 4、异步IO

asyncio的一般概念是一个单个的Python对象,称为事件循环,它控制每个任务如何以及何时运行。事件循环会关注每个任务并知道它处于什么状态。在实际中,任务可以处于许多状态,但现在我们假设一个简化的只有两种状态的事件循环。

就绪状态将表明一个任务有工作要做,并且已经准备好运行,而等待状态意味着该任务正在等待一些外部工作完成,例如网络操作。

我们简化的事件循环维护两个任务列表,每一个对应这些状态。它会选择一个就绪的任务,然后重新启动它。该任务处于完全控制之中,直到它配合地将控制权交还给事件循环为止。

当正在运行的任务将控制权交还给事件循环时,事件循环将该任务放入就绪或等待列表中,然后遍历等待列表中的每个任务,以查看I/O操作完成后某个任务是否已经就绪。时间循环知道就绪列表中的任务仍然是就绪的,因为它知道它们还没有运行。

一旦所有的任务都重新排序到正确的列表中,事件循环将选择下一个要运行的任务,然后重复这个过程。我们简化的事件循环会选择等待时间最长的任务并运行该任务。此过程会一直重复,直到事件循环结束。

asyncio的一个重要之处在于,如果没有刻意去释放控制权,任务是永远不会放弃控制权的。它们在操作过程中从不会被打断。这使得我们在asyncio中比在threading中能更容易地共享资源。你不必担心代码是否是线程安全的。

            
              import
            
            
               time

            
            
              import
            
            
               asyncio

            
            
              from
            
             aiohttp 
            
              import
            
            
               ClientSession
async 
            
            
              def
            
            
               download_site(session,url):
    
            
            
              global
            
            
               i
    
            
            
              try
            
            
              :
        async with session.get(url) as response:
            i
            
            =i+1
            
            
              print
            
            
              (i)
            
            
            
              return
            
            
               await response.read()
    
            
            
              except
            
            
               Exception as e:
         
            
            
              pass
            
            
              
async 
            
            
              def
            
            
               download_all_site(sites):
    async with ClientSession() as session:
        tasks 
            
            =
            
               []
        
            
            
              for
            
             url 
            
              in
            
            
               sites:
            task 
            
            =
            
               asyncio.create_task(download_site(session,url))
            tasks.append(task)
        result 
            
            = await asyncio.gather(*tasks) 
            
              #
            
            
              等待一组协程运行结束并接收结果
            
            
              print
            
            
              (result)



            
            
              if
            
            
              __name__
            
             ==
            
              "
            
            
              __main__
            
            
              "
            
            
              :
    i
            
            =
            
              0
    sites 
            
            = [
            
              "
            
            
              http://www.360kuai.com/
            
            
              "
            
            ,
            
              "
            
            
              https://www.jython.org
            
            
              "
            
            ] * 50
            
              
    start_time 
            
            =
            
               time.time()
    asyncio.run(download_all_site(sites))
    end_time 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              执行时间:%s
            
            
              "
            
             % (end_time - start_time) + 
            
              "
            
            
            
              "
            
            )
            

#执行时间:5.29184889793396秒

异步IO的执行时序表:

python中同步、多线程、异步IO、多线程对IO密集型的影响_第6张图片

asyncio版本的问题

此时asyncio有两个问题。你需要特殊的异步版本的库来充分利用asycio。如果你只是使用requests下载站点,那么速度会慢得多,因为requests的设计目的不是通知事件循环它被阻塞了。随着时间的推移,这个问题变得微不足道,因为越来越多的库包含了asyncio。

另一个更微妙的问题是,如果其中一个任务不合作,那么协作多任务处理的所有优势都将不存在。代码中的一个小错误可能会导致任务运行超时并长时间占用处理器,使需要运行的其他任务无法运行。如果一个任务没有将控制权交还给事件循环,则事件循环无法中断它。

考虑到这一点,我们来开始讨论一种完全不同的并发性——multiprocessing。

 


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论