本文从参考了网络上的许多内容,主要为分布式进程及其的评论内容。
所谓分布式运算,既可以指在同一台机器上利用多进程(线程)进行运算,又可以指将计算任务进行分解,利用多台机器进行运算。本文中的分布式单指最后一种。
python的标准库
multiprocessing
中存在一个
managers
的子模块,该模块支持将多进程分布到多台机器上。选择一个进程来调度任务,其他进程进行计算,从而实现分布式运算。而调度进程和计算进程之间的通信,是通过网络来进行的,用到了python的
socket
模块。
1. linux中的分布式运算
以参考文章中的例子来进行说明,首先写一个调度程序:
# task_manager.py
import
random
,
queue
from
multiprocessing
.
managers
import
BaseManager
# 发送任务的队列
task_queue
=
queue
.
Queue
(
)
# 接收结果的队列
result_queue
=
queue
.
Queue
(
)
# 将两个队列注册到网络上,可以被其他机器访问
BaseManager
.
register
(
'get_task_queue'
,
callable
=
lambda
:
task_queue
)
BaseManager
.
register
(
'get_result_queue'
,
callable
=
lambda
:
result_queue
)
# 实例化一个manager,绑定端口5000,ip为空表名为本地ip,设置验证码
manager
=
BaseManager
(
address
=
(
''
,
5000
)
,
authkey
=
b
'abc'
)
# 启动manager
manager
.
start
(
)
# 获取网络中的queue对象
task
=
manager
.
get_task_queue
(
)
result
=
manager
.
get_result_queue
(
)
# 放几个任务进去
for
i
in
range
(
10
)
:
n
=
random
.
randint
(
0
,
10000
)
print
(
'Put task %d...'
%
n
)
task
.
put
(
n
)
# 监听是否有结果传回
print
(
'Try get results...'
)
for
i
in
range
(
10
)
:
r
=
result
.
get
(
timeout
=
10
)
print
(
'Result: %s'
%
r
)
# 关闭
manager
.
shutdown
(
)
再写一个执行任务的程序:
# task_worker.py
import
time
,
queue
from
multiprocessing
.
managers
import
BaseManager
# 获取网络上被master注册的queue
BaseManager
.
register
(
'get_task_queue'
)
BaseManager
.
register
(
'get_result_queue'
)
# 连接到服务器,即任务分配进程的地址
server_address
=
'191.168.6.67'
print
(
f
'Connecting to server {server_address}...'
)
# 注意验证码要保持一致
m
=
BaseManager
(
address
=
(
server_address
,
5000
)
,
authkey
=
b
'abc'
)
m
.
connect
(
)
# 获取queue对象
task
=
m
.
get_task_queue
(
)
result
=
m
.
get_result_queue
(
)
# 开始进行计算
for
i
in
range
(
10
)
:
try
:
n
=
task
.
get
(
timeout
=
1
)
# 获取manager进程放入task中的值
print
(
f
'run task {n}*{n}'
)
r
=
f
'{n} * {n} = {n*n}'
time
.
sleep
(
1
)
result
.
put
(
r
)
# 将计算的结果放入result队列中
except
queue
.
Empty
:
print
(
'task queue is empty.'
)
print
(
'worker exit.'
)
将上面两个程序分别拷贝到局域网中的两台linux服务器上,先运行
task_manager.py
,随即运行
task_worker.py
,就可以看到计算的结果了。
注意,在运行
task_worker.py
时,可能会提示一下错误:
OSError: [Errno 113] No route to host
这是由于系统设置了防火墙,屏蔽了通过设置的端口进行通信导致的。
我使用的linux是centos,要查看哪些端口开放,可以使用:
firewall-cmd --list-ports
开放5000端口,可以使用:
firewall-cmd --zone=public --add-port=5000/tcp --permanent
设置完成后运行:
firewall-cmd --reload
重启防火墙进行生效。
执行完上述操作后,在linux(centos)下运行应该没有问题。
2. windows中的分布式运算
注意,以上脚本在windows系统中是无法运行的,经过我的试验,主要发现了两个问题:
一、
lambda
定义函数的问题
在
task_manager.py
中,将队列注册到网络上时,对于其中的
callable
参数,使用的是以
lambda
关键字定义的函数,在windows下这是行不通的。因此,需要显示地定义两个函数,然后传递给
callable
:
_task
=
queue
.
Queue
(
)
_result
=
queue
.
Queue
(
)
def
task_queue
(
)
:
return
_task
def
result_queue
(
)
:
return
_result
然后,修改
callable=lambda: task_queue
和
callable=lambda: result_queue
分别为
callable=task_queue
和
callable=result_queue
。
二、__name__问题
在windows下,
manager.start()
及其之后的命令需要放在
if __name__ == '__main__':
语句块中执行,否则会报错。
三、绑定地址问题
在linux下,实例化一个manager时,ip可以留空,默认为本机ip,但在windows下面行不通。可以指定为
localhost
或
127.0.0.1
,这样,在同一台机器上分别运行两个脚本是没有问题的,但将脚本分别放到两台机器上运行会出问题。
解决方法是,将ip指定为本机在局域网中的ip地址,如
191.168.1.123
,然后两台机器就可以正常工作了。