基于socket的python分布式运算中多服务器间的通信问题

系统 1560 0

本文从参考了网络上的许多内容,主要为分布式进程及其的评论内容。

所谓分布式运算,既可以指在同一台机器上利用多进程(线程)进行运算,又可以指将计算任务进行分解,利用多台机器进行运算。本文中的分布式单指最后一种。

python的标准库 multiprocessing 中存在一个 managers 的子模块,该模块支持将多进程分布到多台机器上。选择一个进程来调度任务,其他进程进行计算,从而实现分布式运算。而调度进程和计算进程之间的通信,是通过网络来进行的,用到了python的 socket 模块。

1. linux中的分布式运算

以参考文章中的例子来进行说明,首先写一个调度程序:

            
              
                # task_manager.py
              
              
                import
              
               random
              
                ,
              
               queue

              
                from
              
               multiprocessing
              
                .
              
              managers 
              
                import
              
               BaseManager



              
                # 发送任务的队列
              
              
task_queue 
              
                =
              
               queue
              
                .
              
              Queue
              
                (
              
              
                )
              
              
                # 接收结果的队列
              
              
result_queue 
              
                =
              
               queue
              
                .
              
              Queue
              
                (
              
              
                )
              
              
                # 将两个队列注册到网络上,可以被其他机器访问
              
              
BaseManager
              
                .
              
              register
              
                (
              
              
                'get_task_queue'
              
              
                ,
              
              
                callable
              
              
                =
              
              
                lambda
              
              
                :
              
               task_queue
              
                )
              
              
BaseManager
              
                .
              
              register
              
                (
              
              
                'get_result_queue'
              
              
                ,
              
              
                callable
              
              
                =
              
              
                lambda
              
              
                :
              
               result_queue
              
                )
              
              
                # 实例化一个manager,绑定端口5000,ip为空表名为本地ip,设置验证码
              
              
manager 
              
                =
              
               BaseManager
              
                (
              
              address
              
                =
              
              
                (
              
              
                ''
              
              
                ,
              
              
                5000
              
              
                )
              
              
                ,
              
               authkey
              
                =
              
              b
              
                'abc'
              
              
                )
              
              
                # 启动manager
              
              
manager
              
                .
              
              start
              
                (
              
              
                )
              
              
                # 获取网络中的queue对象
              
              
task 
              
                =
              
               manager
              
                .
              
              get_task_queue
              
                (
              
              
                )
              
              
result 
              
                =
              
               manager
              
                .
              
              get_result_queue
              
                (
              
              
                )
              
              
                # 放几个任务进去
              
              
                for
              
               i 
              
                in
              
              
                range
              
              
                (
              
              
                10
              
              
                )
              
              
                :
              
              
    n 
              
                =
              
               random
              
                .
              
              randint
              
                (
              
              
                0
              
              
                ,
              
              
                10000
              
              
                )
              
              
                print
              
              
                (
              
              
                'Put task %d...'
              
              
                %
              
               n
              
                )
              
              
    task
              
                .
              
              put
              
                (
              
              n
              
                )
              
              
                # 监听是否有结果传回
              
              
                print
              
              
                (
              
              
                'Try get results...'
              
              
                )
              
              
                for
              
               i 
              
                in
              
              
                range
              
              
                (
              
              
                10
              
              
                )
              
              
                :
              
              
    r 
              
                =
              
               result
              
                .
              
              get
              
                (
              
              timeout
              
                =
              
              
                10
              
              
                )
              
              
                print
              
              
                (
              
              
                'Result: %s'
              
              
                %
              
               r
              
                )
              
              
                # 关闭
              
              
manager
              
                .
              
              shutdown
              
                (
              
              
                )
              
            
          

再写一个执行任务的程序:

            
              
                # task_worker.py
              
              
                import
              
               time
              
                ,
              
               queue

              
                from
              
               multiprocessing
              
                .
              
              managers 
              
                import
              
               BaseManager


              
                # 获取网络上被master注册的queue
              
              
BaseManager
              
                .
              
              register
              
                (
              
              
                'get_task_queue'
              
              
                )
              
              
BaseManager
              
                .
              
              register
              
                (
              
              
                'get_result_queue'
              
              
                )
              
              
                # 连接到服务器,即任务分配进程的地址
              
              
server_address 
              
                =
              
              
                '191.168.6.67'
              
              
                print
              
              
                (
              
              f
              
                'Connecting to server {server_address}...'
              
              
                )
              
              
                # 注意验证码要保持一致
              
              
m 
              
                =
              
               BaseManager
              
                (
              
              address
              
                =
              
              
                (
              
              server_address
              
                ,
              
              
                5000
              
              
                )
              
              
                ,
              
               authkey
              
                =
              
              b
              
                'abc'
              
              
                )
              
              
m
              
                .
              
              connect
              
                (
              
              
                )
              
              
                # 获取queue对象
              
              
task 
              
                =
              
               m
              
                .
              
              get_task_queue
              
                (
              
              
                )
              
              
result 
              
                =
              
               m
              
                .
              
              get_result_queue
              
                (
              
              
                )
              
              
                # 开始进行计算
              
              
                for
              
               i 
              
                in
              
              
                range
              
              
                (
              
              
                10
              
              
                )
              
              
                :
              
              
                try
              
              
                :
              
              
        n 
              
                =
              
               task
              
                .
              
              get
              
                (
              
              timeout
              
                =
              
              
                1
              
              
                )
              
              
                # 获取manager进程放入task中的值
              
              
                print
              
              
                (
              
              f
              
                'run task {n}*{n}'
              
              
                )
              
              
        r 
              
                =
              
               f
              
                '{n} * {n} = {n*n}'
              
              
        time
              
                .
              
              sleep
              
                (
              
              
                1
              
              
                )
              
              
        result
              
                .
              
              put
              
                (
              
              r
              
                )
              
              
                # 将计算的结果放入result队列中
              
              
                except
              
               queue
              
                .
              
              Empty
              
                :
              
              
                print
              
              
                (
              
              
                'task queue is empty.'
              
              
                )
              
              
                print
              
              
                (
              
              
                'worker exit.'
              
              
                )
              
            
          

将上面两个程序分别拷贝到局域网中的两台linux服务器上,先运行 task_manager.py ,随即运行 task_worker.py ,就可以看到计算的结果了。

注意,在运行 task_worker.py 时,可能会提示一下错误:

            
              OSError: [Errno 113] No route to host

            
          

这是由于系统设置了防火墙,屏蔽了通过设置的端口进行通信导致的。

我使用的linux是centos,要查看哪些端口开放,可以使用:

            
              firewall-cmd --list-ports

            
          

开放5000端口,可以使用:

            
              firewall-cmd --zone=public --add-port=5000/tcp --permanent

            
          

设置完成后运行:

            
              firewall-cmd --reload

            
          

重启防火墙进行生效。

执行完上述操作后,在linux(centos)下运行应该没有问题。

2. windows中的分布式运算

注意,以上脚本在windows系统中是无法运行的,经过我的试验,主要发现了两个问题:

一、 lambda 定义函数的问题
task_manager.py 中,将队列注册到网络上时,对于其中的 callable 参数,使用的是以 lambda 关键字定义的函数,在windows下这是行不通的。因此,需要显示地定义两个函数,然后传递给 callable

            
              _task 
              
                =
              
               queue
              
                .
              
              Queue
              
                (
              
              
                )
              
              
_result 
              
                =
              
               queue
              
                .
              
              Queue
              
                (
              
              
                )
              
              
                def
              
              
                task_queue
              
              
                (
              
              
                )
              
              
                :
              
              
                return
              
               _task


              
                def
              
              
                result_queue
              
              
                (
              
              
                )
              
              
                :
              
              
                return
              
               _result 

            
          

然后,修改 callable=lambda: task_queue callable=lambda: result_queue 分别为 callable=task_queue callable=result_queue

二、__name__问题
在windows下, manager.start() 及其之后的命令需要放在 if __name__ == '__main__': 语句块中执行,否则会报错。

三、绑定地址问题
在linux下,实例化一个manager时,ip可以留空,默认为本机ip,但在windows下面行不通。可以指定为 localhost 127.0.0.1 ,这样,在同一台机器上分别运行两个脚本是没有问题的,但将脚本分别放到两台机器上运行会出问题。
解决方法是,将ip指定为本机在局域网中的ip地址,如 191.168.1.123 ,然后两台机器就可以正常工作了。


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论