出自 | hackpython(ID:hackpython)
Flask 是 Python 中有名的轻量级同步 web 框架,在一些开发中,可能会遇到需要长时间处理的任务,此时就需要使用异步的方式来实现,让长时间任务在后台运行,先将本次请求的响应状态返回给前端,不让前端界面「卡顿」,当异步任务处理好后,如果需要返回状态,再将状态返回。
怎么实现呢?
使用线程的方式
当要执行耗时任务时,直接开启一个新的线程来执行任务,这种方式最为简单快速。
通过 ThreadPoolExecutor 来实现
from time import sleep
from concurrent.futures import ThreadPoolExecutor
# DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
# 创建线程池执行器
executor = ThreadPoolExecutor( 2 )
app = Flask(__name__)
def run_jobs () :
# 交由线程去执行耗时任务
executor.submit(long_task, 'hello' , 123 )
return 'long task running.'
# 耗时任务
def long_task (arg1, arg2) :
print( "args: %s %s!" % (arg1, arg2))
sleep( 5 )
print( "Task is done!" )
if __name__ == '__main__' :
app.run()
pip install redis
from celery import Celery
app = Flask(__name__)
# 配置
# 配置消息代理的路径,如果是在远程服务器上,则配置远程服务器中redis的URL
app.config[ 'CELERY_BROKER_URL' ] = 'redis://localhost:6379/0'
# 要存储 Celery 任务的状态或运行结果时就必须要配置
app.config[ 'CELERY_RESULT_BACKEND' ] = 'redis://localhost:6379/0'
# 初始化Celery
celery = Celery(app.name, broker=app.config[ 'CELERY_BROKER_URL' ])
# 将Flask中的配置直接传递给Celery
celery.conf.update(app.config)
def long_task (arg1, arg2) :
# 耗时任务的逻辑
return result
def index () :
task = long_task.delay( 1 , 2 )
delay () 方法是 applyasync () 方法的快捷方式,applyasync () 参数更多,可以更加细致的控制耗时任务,比如想要 long_task () 在一分钟后再执行
@app.route( '/' , methods=[ 'GET' , 'POST' ])
def index () :
task = long_task.apply_async(args=[ 1 , 2 ], countdown= 60 )
delay () 与 apply_async () 会返回一个任务对象,该对象可以获取任务的状态与各种相关信息。
@celery.task(bind=True)
def long_task ( self ) :
verb = [ 'Starting up' , 'Booting' , 'Repairing' , 'Loading' , 'Checking' ]
adjective = [ 'master' , 'radiant' , 'silent' , 'harmonic' , 'fast' ]
noun = [ 'solar array' , 'particle reshaper' , 'cosmic ray' , 'orbiter' , 'bit' ]
message = ''
total = random.randint( 10 , 50 )
for i in range(total):
if not message or random.random() < 0 . 25 :
# 随机的获取一些信息
message = '{0} {1} {2}...' .format(random.choice(verb),
random.choice(adjective),
random.choice(noun))
# 更新Celery任务状态
self .update_state(state= 'PROGRESS' ,
meta={ 'current' : i, 'total' : total,
'status' : message})
time.sleep( 1 )
# 返回字典
return { 'current' : 100 , 'total' : 100 , 'status' : 'Task completed!' ,
'result' : 42 }
def longtask () :
# 异步调用
task = long_task.apply_async()
# 返回 202,与Location头
return jsonify({}), 202 , { 'Location' : url_for( 'taskstatus' ,
task_id=task.id)}
def taskstatus ( task_id ):
task = long_task.AsyncResult(task_id)
if task.state == 'PENDING' :
response = {
'state' : task.state,
'current' : 0 ,
'total' : 1 ,
'status' : 'Pending...'
}
elif task.state != 'FAILURE' :
response = {
'state' : task.state,
'current' : task.info. get ( 'current' , 0 ),
'total' : task.info. get ( 'total' , 1 ),
'status' : task.info. get ( 'status' , '' )
}
if 'result' in task.info:
response[ 'result' ] = task.info[ 'result' ]
else :
response = {
'state' : task.state,
'current' : 1 ,
'total' : 1 ,
'status' : str(task.info),
}
return jsonify(response)
classname: 'my-class' ,
id: 'my-id' ,
// 进度条要出现的位置
target: document.getElementById( 'myDivId' )
};
// 初始化进度条对象
var nanobar = new Nanobar( options );
nanobar. go ( 30 ); // 30% 进度条
nanobar. go ( 76 ); // 76% 进度条
// 100% 进度条,进度条结束
nanobar. go ( 100 );
< button id = "start-bg-job" > Start Long Calculation button >< br >< br >
< div id = "progress" >div >
$( function ( ) {
$( '#start-bg-job' ).click(start_long_task);
});
// 请求 longtask 接口
function start_long_task ( ) {
// 添加元素在html中
div = $( '
' );
$( '#progress' ).append(div);
// 创建进度条对象
var nanobar = new Nanobar({
bg : '#44f' ,
target : div[ 0 ].childNodes[ 0 ]
});
// ajax请求longtask
$.ajax({
type : 'POST' ,
url : '/longtask' ,
// 获得数据,从响应头中获取Location
success: function ( data, status, request ) {
status_url = request.getResponseHeader( 'Location' );
// 调用 update_progress() 方法更新进度条
update_progress(status_url, nanobar, div[ 0 ]);
},
error : function ( ) {
alert( 'Unexpected error' );
}
});
}
// 更新进度条
function update_progress ( status_url, nanobar, status_div ) {
// getJSON()方法是JQuery内置方法,这里向Location中对应的url发起请求,即请求「/status/
$.getJSON(status_url, function ( data ) {
// 计算进度
percent = parseInt (data[ 'current' ] * 100 / data[ 'total' ]);
// 更新进度条
nanobar.go(percent);
// 更新文字
$(status_div.childNodes[ 1 ]).text(percent + '%' );
$(status_div.childNodes[ 2 ]).text(data[ 'status' ]);
if (data[ 'state' ] != 'PENDING' && data[ 'state' ] != 'PROGRESS' ) {
if ( 'result' in data) {
// 展示结果
$(status_div.childNodes[ 3 ]).text( 'Result: ' + data[ 'result' ]);
}
else {
// 意料之外的事情发生
$(status_div.childNodes[ 3 ]).text( 'Result: ' + data[ 'state' ]);
}
}
else {
// 2秒后再次运行
setTimeout( function ( ) {
update_progress(status_url, nanobar, status_div);
}, 2000 );
}
});
}
◆
精彩推荐
◆
-
-
5大必知的图算法,附Python代码实现
-
-
如何用爬虫技术帮助孩子秒到心仪的幼儿园(基础篇)
-
-
2019年最新华为、BAT、美团、头条、滴滴面试题目及答案汇总
-
阿里巴巴杨群:高并发场景下Python的性能挑战