2种方式解决Python执行卡顿问题

系统 1677 0

2种方式解决Python执行卡顿问题_第1张图片

转载:hackpython

简介

Flask 是 Python 中有名的轻量级同步 web 框架,在一些开发中,可能会遇到需要长时间处理的任务,此时就需要使用异步的方式来实现,让长时间任务在后台运行,先将本次请求的响应状态返回给前端,不让前端界面「卡顿」,当异步任务处理好后,如果需要返回状态,再将状态返回。

怎么实现呢?

使用线程的方式

当要执行耗时任务时,直接开启一个新的线程来执行任务,这种方式最为简单快速。

通过 ThreadPoolExecutor 来实现

              
                from flask import Flask	
from time import sleep	
from concurrent.futures import ThreadPoolExecutor	
# DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor	
# 创建线程池执行器	
executor = ThreadPoolExecutor(2)	
app = Flask(__name__)	
@app.route('/jobs')	
def run_jobs():	
    # 交由线程去执行耗时任务	
    executor.submit(long_task, 'hello', 123)	
    return 'long task running.'	
# 耗时任务	
def long_task(arg1, arg2):	
    print("args: %s %s!" % (arg1, arg2))	
    sleep(5)	
    print("Task is done!")	
if __name__ == '__main__':	
    app.run()
              
            

当要执行一些比较简单的耗时任务时就可以使用这种方式,如发邮件、发短信验证码等。

但这种方式有个问题,就是前端无法得知任务执行状态。

如果想要前端知道,就需要设计一些逻辑,比如将任务执行状态存储到 redis 中,通过唯一的任务 id 进行标识,然后再写一个接口,通过任务 id 去获取任务的状态,然后让前端定时去请求该接口,从而获得任务状态信息。

全部自己实现就显得有些麻烦了,而 Celery 刚好实现了这样的逻辑,来使用一下。

使用 Celery

为了满足前端可以获得任务状态的需求,可以使用 Celery。

Celery 是实时任务处理与调度的分布式任务队列,它常用于 web 异步任务、定时任务等,后面单独写一篇文章描述 Celery 的架构,这里不深入讨论。

现在我想让前端可以通过一个进度条来判断后端任务的执行情况。使用 Celery 就很容易实现,首先通过 pip 安装 Celery 与 redis,之所以要安装 redis,是因为让 Celery 选择 redis 作为「消息代理 / 消息中间件」。

              
                pip install celery	
pip install redis
              
            

在 Flask 中使用 Celery 其实很简单,这里先简单的过一下 Flask 中使用 Celery 的整体流程,然后再去实现具体的项目

  • 1. 在 Flask 中初始化 Celery

              
                from flask import Flask	
from celery import Celery	
app = Flask(__name__)	
# 配置	
# 配置消息代理的路径,如果是在远程服务器上,则配置远程服务器中redis的URL	
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'	
# 要存储 Celery 任务的状态或运行结果时就必须要配置	
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'	
# 初始化Celery	
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])	
# 将Flask中的配置直接传递给Celery	
celery.conf.update(app.config)
              
            

上述代码中,通过 Celery 类初始化 celery 对象,传入的应用名称与消息代理的连接 URL。

  • 2. 通过 celery.task 装饰器装饰耗时任务对应的函数

              
                @celery.task	
def long_task(arg1, arg2):	
    # 耗时任务的逻辑	
    return result
              
            
  • 3.Flask 中定义接口通过异步的方式执行耗时任务

              
                @app.route('/', methods=['GET', 'POST'])	
def index():	
    task = long_task.delay(1, 2)
              
            

delay () 方法是 apply async () 方法的快捷方式,apply async () 参数更多,可以更加细致的控制耗时任务,比如想要 long_task () 在一分钟后再执行

              
                @app.route('/', methods=['GET', 'POST'])	
def index():	
    task = long_task.apply_async(args=[1, 2], countdown=60)
              
            

delay () 与 apply_async () 会返回一个任务对象,该对象可以获取任务的状态与各种相关信息。

通过这 3 步就可以使用 Celery 了。

接着就具体来实现「让前端可以通过一个进度条来判断后端任务的执行情况」的需求。

              
                # bind为True,会传入self给被装饰的方法	
@celery.task(bind=True)	
def long_task(self):	
    verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']	
    adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']	
    noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']	
    message = ''	
    total = random.randint(10, 50)	
    for i in range(total):	
        if not message or random.random() < 0.25:	
            # 随机的获取一些信息	
            message = '{0} {1} {2}...'.format(random.choice(verb),	
                                              random.choice(adjective),	
                                              random.choice(noun))	
        # 更新Celery任务状态	
        self.update_state(state='PROGRESS',	
                          meta={'current': i, 'total': total,	
                                'status': message})	
        time.sleep(1)	
    # 返回字典	
    return {'current': 100, 'total': 100, 'status': 'Task completed!',	
            'result': 42}
              
            

上述代码中,celery.task () 装饰器使用了 bind=True 参数,这个参数会让 Celery 将 Celery 本身传入,可以用于记录与更新任务状态。

然后就是一个 for 迭代,迭代的逻辑没什么意义,就是随机从 list 中抽取一些词汇来模拟一些逻辑的运行,为了表示这是耗时逻辑,通过 time.sleep (1) 休眠一秒。

每次获取一次词汇,就通过 self.update_state () 更新 Celery 任务的状态,Celery 包含一些内置状态,如 SUCCESS、STARTED 等等,这里使用了自定义状态「PROGRESS」,除了状态外,还将本次循环的一些信息通过 meta 参数 (元数据) 以字典的形式存储起来。有了这些数据,前端就可以显示进度条了。

定义好耗时方法后,再定义一个 Flask 接口方法来调用该耗时方法

              
                @app.route('/longtask', methods=['POST'])	
def longtask():	
    # 异步调用	
    task = long_task.apply_async()	
    # 返回 202,与Location头	
    return jsonify({}), 202, {'Location': url_for('taskstatus',	
                                                  task_id=task.id)}
              
            

简单而言,前端通过 POST 请求到 /longtask,让后端开始去执行耗时任务。

返回的状态码为 202,202 通常表示一个请求正在进行中,然后还在返回数据包的包头 (Header) 中添加了 Location 头信息,前端可以通过读取数据包中 Header 中的 Location 的信息来获取任务 id 对应的完整 url。

前端有了任务 id 对应的 url 后,还需要提供一个接口给前端,让前端可以通过任务 id 去获取当前时刻任务的具体状态。

              
                @app.route('/status/
                
                  ')	
def taskstatus(task_id):	
    task = long_task.AsyncResult(task_id)	
    if task.state == 'PENDING': # 在等待	
        response = {	
            'state': task.state,	
            'current': 0,	
            'total': 1,	
            'status': 'Pending...'	
        }	
    elif task.state != 'FAILURE': # 没有失败	
        response = {	
            'state': task.state, # 状态	
            # meta中的数据,通过task.info.get()可以获得	
            'current': task.info.get('current', 0), # 当前循环进度	
            'total': task.info.get('total', 1), # 总循环进度	
            'status': task.info.get('status', '')	
        }	
        if 'result' in task.info: 	
            response['result'] = task.info['result']	
    else:	
        # 后端执行任务出现了一些问题	
        response = {	
            'state': task.state,	
            'current': 1,	
            'total': 1,	
            'status': str(task.info),  # 报错的具体异常	
        }	
    return jsonify(response)
                
              
            

为了可以获得任务对象中的信息,使用任务 id 初始化 AsyncResult 类,获得任务对象,然后就可以从任务对象中获得当前任务的信息。

该方法会返回一个 JSON,其中包含了任务状态以及 meta 中指定的信息,前端可以利用这些信息构建一个进度条。

如果任务在 PENDING 状态,表示该任务还没有开始,在这种状态下,任务中是没有什么信息的,这里人为的返回一些数据。如果任务执行失败,就返回 task.info 中包含的异常信息,此外就是正常执行了,正常执行可以通 task.info 获得任务中具体的信息。

这样,后端的逻辑就处理完成了,接着就来实现前端的逻辑,要实现图形进度条,可以直接使用 nanobar.js,简单两句话就可以实现一个进度条,其官网例子如下:

              
                var options = {	
    classname: 'my-class',	
  id: 'my-id',	
  // 进度条要出现的位置	
    target: document.getElementById('myDivId')	
};	
// 初始化进度条对象	
var nanobar = new Nanobar( options );	
nanobar.go( 30 ); // 30% 进度条	
nanobar.go( 76 ); // 76% 进度条	
// 100% 进度条,进度条结束	
nanobar.go(100);
              
            

有了 nanobar.js 就非常简单了。

先定义一个简单的 HTML 界面

              
                

Long running task with progress updates



通过 JavaScript 实现对后台的请求

              
                // 按钮点击事件	
$(function() {	
       $('#start-bg-job').click(start_long_task);	
   });	
// 请求 longtask 接口	
function start_long_task() {	
            // 添加元素在html中	
            div = $('
                
0%
...
 

'); $('#progress').append(div); // 创建进度条对象 var nanobar = new Nanobar({ bg: '#44f', target: div[0].childNodes[0] }); // ajax请求longtask $.ajax({ type: 'POST', url: '/longtask', // 获得数据,从响应头中获取Location success: function(data, status, request) { status_url = request.getResponseHeader('Location'); // 调用 update_progress() 方法更新进度条 update_progress(status_url, nanobar, div[0]); }, error: function() { alert('Unexpected error'); } }); } // 更新进度条 function update_progress(status_url, nanobar, status_div) { // getJSON()方法是JQuery内置方法,这里向Location中对应的url发起请求,即请求「/status/ 」 $.getJSON(status_url, function(data) { // 计算进度 percent = parseInt(data['current'] * 100 / data['total']); // 更新进度条 nanobar.go(percent); // 更新文字 $(status_div.childNodes[1]).text(percent + '%'); $(status_div.childNodes[2]).text(data['status']); if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') { if ('result' in data) { // 展示结果 $(status_div.childNodes[3]).text('Result: ' + data['result']); } else { // 意料之外的事情发生 $(status_div.childNodes[3]).text('Result: ' + data['state']); } } else { // 2秒后再次运行 setTimeout(function() { update_progress(status_url, nanobar, status_div); }, 2000); } }); }

可以通过注释阅读代码整体逻辑。

至此,需求实现完了,运行一下。

首先运行 Redis

              
                redis-server
              
            

然后运行 celery

              
                celery worker -A app.celery --loglevel=info
              
            

最后运行 Flask 项目

              
                python app.py
              
            

效果如下:

2种方式解决Python执行卡顿问题_第2张图片


Flask 异步运行任务的常见方式就介绍完了,因为本人在开发一个用于自动生成字幕的小玩具,其中视频上传以及字幕生成都是耗时任务,所以就单独写一篇文章来介绍一下这部分的内容,后面会将小玩具的代码开源让大家学习一下,先一睹其真容:

2种方式解决Python执行卡顿问题_第3张图片

往期阅读


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论