Python Web:Flask异步执行任务

系统 1869 0
原文链接: https://www.tinymind.cn/competitions/48
(▲由Python大本营付费下载自视觉中国)
作者 | ayuliao  
出自 | hackpython(ID:hackpython)

简介

Flask 是 Python 中有名的轻量级同步 web 框架,在一些开发中,可能会遇到需要长时间处理的任务,此时就需要使用异步的方式来实现,让长时间任务在后台运行,先将本次请求的响应状态返回给前端,不让前端界面「卡顿」,当异步任务处理好后,如果需要返回状态,再将状态返回。

怎么实现呢?

使用线程的方式

当要执行耗时任务时,直接开启一个新的线程来执行任务,这种方式最为简单快速。

通过 ThreadPoolExecutor 来实现

              
from  flask  import  Flask
from  time  import  sleep
from  concurrent.futures  import  ThreadPoolExecutor


# DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
# 创建线程池执行器
executor = ThreadPoolExecutor( 2 )


app = Flask(__name__)


@app.route('/jobs')
def   run_jobs () :
     # 交由线程去执行耗时任务
    executor.submit(long_task,  'hello' 123 )
     return   'long task running.'


# 耗时任务
def   long_task (arg1, arg2) :
    print( "args: %s %s!"  % (arg1, arg2))
    sleep( 5 )
    print( "Task is done!" )


if  __name__ ==  '__main__' :
    app.run()
当要执行一些比较简单的耗时任务时就可以使用这种方式,如发邮件、发短信验证码等。
但这种方式有个问题,就是前端无法得知任务执行状态。
如果想要前端知道,就需要设计一些逻辑,比如将任务执行状态存储到 redis 中,通过唯一的任务 id 进行标识,然后再写一个接口,通过任务 id 去获取任务的状态,然后让前端定时去请求该接口,从而获得任务状态信息。
全部自己实现就显得有些麻烦了,而 Celery 刚好实现了这样的逻辑,来使用一下。
使用 Celery
为了满足前端可以获得任务状态的需求,可以使用 Celery。
Celery 是实时任务处理与调度的分布式任务队列,它常用于 web 异步任务、定时任务等,后面单独写一篇文章描述 Celery 的架构,这里不深入讨论。
现在我想让前端可以通过一个进度条来判断后端任务的执行情况。使用 Celery 就很容易实现,首先通过 pip 安装 Celery 与 redis,之所以要安装 redis,是因为让 Celery 选择 redis 作为「消息代理 / 消息中间件」。
              
pip  install  celery
pip  install  redis
在 Flask 中使用 Celery 其实很简单,这里先简单的过一下 Flask 中使用 Celery 的整体流程,然后再去实现具体的项目
1.在 Flask 中初始化 Celery
              
from  flask  import  Flask
from  celery  import  Celery


app = Flask(__name__)
# 配置
# 配置消息代理的路径,如果是在远程服务器上,则配置远程服务器中redis的URL
app.config[ 'CELERY_BROKER_URL' ] =  'redis://localhost:6379/0'
# 要存储 Celery 任务的状态或运行结果时就必须要配置
app.config[ 'CELERY_RESULT_BACKEND' ] =  'redis://localhost:6379/0'


# 初始化Celery
celery = Celery(app.name, broker=app.config[ 'CELERY_BROKER_URL' ])
# 将Flask中的配置直接传递给Celery
celery.conf.update(app.config)
上述代码中,通过 Celery 类初始化 celery 对象,传入的应用名称与消息代理的连接 URL。
2.通过 celery.task 装饰器装饰耗时任务对应的函数
              
@celery.task
def   long_task (arg1, arg2) :
     # 耗时任务的逻辑
     return  result
3.Flask 中定义接口通过异步的方式执行耗时任务
              
@app.route( '/' , methods=[ 'GET' 'POST' ])
def   index () :
    task = long_task.delay( 1 2 )
delay () 方法是 applyasync () 方法的快捷方式,applyasync () 参数更多,可以更加细致的控制耗时任务,比如想要 long_task () 在一分钟后再执行

@app.route( '/' , methods=[ 'GET' 'POST' ])
def   index () :
    task = long_task.apply_async(args=[ 1 2 ], countdown= 60 )
delay () 与 apply_async () 会返回一个任务对象,该对象可以获取任务的状态与各种相关信息。
通过这 3 步就可以使用 Celery 了。
接着就具体来实现「让前端可以通过一个进度条来判断后端任务的执行情况」的需求。
              
# bind为True,会传入self给被装饰的方法
@celery.task(bind=True)
def   long_task ( self ) :
    verb = [ 'Starting up' 'Booting' 'Repairing' 'Loading' 'Checking' ]
    adjective = [ 'master' 'radiant' 'silent' 'harmonic' 'fast' ]
    noun = [ 'solar array' 'particle reshaper' 'cosmic ray' 'orbiter' 'bit' ]


    message =  ''
    total = random.randint( 10 50 )


     for  i  in  range(total):
         if   not  message  or  random.random() <  0 . 25 :
             # 随机的获取一些信息
            message =  '{0} {1} {2}...' .format(random.choice(verb),
                                              random.choice(adjective),
                                              random.choice(noun))
         # 更新Celery任务状态
         self .update_state(state= 'PROGRESS' ,
                          meta={ 'current' : i,  'total' : total,
                                 'status' : message})
        time.sleep( 1 )
     # 返回字典
     return  { 'current' 100 'total' 100 'status' 'Task completed!' ,
             'result' 42 }
上述代码中,celery.task () 装饰器使用了 bind=True 参数,这个参数会让 Celery 将 Celery 本身传入,可以用于记录与更新任务状态。
然后就是一个 for 迭代,迭代的逻辑没什么意义,就是随机从 list 中抽取一些词汇来模拟一些逻辑的运行,为了表示这是耗时逻辑,通过 time.sleep (1) 休眠一秒。
每次获取一次词汇,就通过 self.update_state () 更新 Celery 任务的状态,Celery 包含一些内置状态,如 SUCCESS、STARTED 等等,这里使用了自定义状态「PROGRESS」,除了状态外,还将本次循环的一些信息通过 meta 参数 (元数据) 以字典的形式存储起来。有了这些数据,前端就可以显示进度条了。
定义好耗时方法后,再定义一个 Flask 接口方法来调用该耗时方法
              
@app.route( '/longtask' , methods=[ 'POST' ])
def   longtask () :
     # 异步调用
    task = long_task.apply_async()
     # 返回 202,与Location头
     return  jsonify({}),  202 , { 'Location' : url_for( 'taskstatus' ,
                                                  task_id=task.id)}
简单而言,前端通过 POST 请求到 /longtask,让后端开始去执行耗时任务。
返回的状态码为 202,202 通常表示一个请求正在进行中,然后还在返回数据包的包头 (Header) 中添加了 Location 头信息,前端可以通过读取数据包中 Header 中的 Location 的信息来获取任务 id 对应的完整 url。
前端有了任务 id 对应的 url 后,还需要提供一个接口给前端,让前端可以通过任务 id 去获取当前时刻任务的具体状态。
              
@app.route( '/status/ ' )
def  taskstatus ( task_id ):
    task 
= long_task.AsyncResult(task_id)
     if  task.state ==  'PENDING' # 在等待
        response = {
             'state' : task.state,
             'current' 0 ,
             'total' 1 ,
             'status' 'Pending...'
        }
    elif task.state !=  'FAILURE' # 没有失败
        response = {
             'state' : task.state,  # 状态
             # meta中的数据,通过task.info.get()可以获得
             'current' : task.info. get ( 'current' 0 ),  # 当前循环进度
             'total' : task.info. get ( 'total' 1 ),  # 总循环进度
             'status' : task.info. get ( 'status' '' )
        }
         if   'result'   in  task.info:
            response[ 'result' ] = task.info[ 'result' ]
     else :
         # 后端执行任务出现了一些问题
        response = {
             'state' : task.state,
             'current' 1 ,
             'total' 1 ,
             'status' : str(task.info),   # 报错的具体异常
        }
     return  jsonify(response)
为了可以获得任务对象中的信息,使用任务 id 初始化 AsyncResult 类,获得任务对象,然后就可以从任务对象中获得当前任务的信息。
该方法会返回一个 JSON,其中包含了任务状态以及 meta 中指定的信息,前端可以利用这些信息构建一个进度条。
如果任务在 PENDING 状态,表示该任务还没有开始,在这种状态下,任务中是没有什么信息的,这里人为的返回一些数据。如果任务执行失败,就返回 task.info 中包含的异常信息,此外就是正常执行了,正常执行可以通 task.info 获得任务中具体的信息。
这样,后端的逻辑就处理完成了,接着就来实现前端的逻辑,要实现图形进度条,可以直接使用 nanobar.js,简单两句话就可以实现一个进度条,其官网例子如下:
              
var  options = {
    classname:  'my-class' ,
  id:  'my-id' ,
   // 进度条要出现的位置
    target: document.getElementById( 'myDivId' )
};


// 初始化进度条对象
var  nanobar =  new  Nanobar( options );


nanobar. go 30  );  // 30% 进度条
nanobar. go 76  );  // 76% 进度条


// 100% 进度条,进度条结束
nanobar. go ( 100 );
有了 nanobar.js 就非常简单了。
先定义一个简单的 HTML 界面
              
< h2 > Long running task with progress updates h2 >
< button   id = "start-bg-job" > Start Long Calculation button >< br >< br >
< div   id = "progress" >div >
通过 JavaScript 实现对后台的请求
              
// 按钮点击事件
$( function ( {
       $( '#start-bg-job' ).click(start_long_task);
   });


// 请求 longtask 接口
function   start_long_task ( {
             // 添加元素在html中
            div = $( '
0%
...
 

'
);
            $( '#progress' ).append(div);


             // 创建进度条对象
             var  nanobar =  new  Nanobar({
                 bg '#44f' ,
                 target : div[ 0 ].childNodes[ 0 ]
            });


             // ajax请求longtask
            $.ajax({
                 type 'POST' ,
                 url '/longtask' ,
                 // 获得数据,从响应头中获取Location
                success:  function ( data, status, request {
                    status_url = request.getResponseHeader( 'Location' );
                     // 调用 update_progress() 方法更新进度条
                    update_progress(status_url, nanobar, div[ 0 ]);
                },
                 error function ( {
                    alert( 'Unexpected error' );
                }
            });
        }


// 更新进度条
function   update_progress ( status_url, nanobar, status_div {
             // getJSON()方法是JQuery内置方法,这里向Location中对应的url发起请求,即请求「/status/
            $.getJSON(status_url,  function ( data {
                 // 计算进度
                percent =  parseInt (data[ 'current' ] *  100  / data[ 'total' ]);
                 // 更新进度条
                nanobar.go(percent);


                 // 更新文字
                $(status_div.childNodes[ 1 ]).text(percent +  '%' );
                $(status_div.childNodes[ 2 ]).text(data[ 'status' ]);
                 if  (data[ 'state' ] !=  'PENDING'  && data[ 'state' ] !=  'PROGRESS' ) {
                     if  ( 'result'   in  data) {
                         // 展示结果
                        $(status_div.childNodes[ 3 ]).text( 'Result: '  + data[ 'result' ]);
                    }
                     else  {
                         // 意料之外的事情发生
                        $(status_div.childNodes[ 3 ]).text( 'Result: '  + data[ 'state' ]);
                    }
                }
                 else  {
                     // 2秒后再次运行
                    setTimeout( function ( {
                        update_progress(status_url, nanobar, status_div);
                    },  2000 );
                }
            }); 
        }       
可以通过注释阅读代码整体逻辑。
至此,需求实现完了,运行一下。
首先运行 Redis
              
redis- server
然后运行 celery
              
celery  worker -A app.celery --loglevel= info
最后运行 Flask 项目
              
python   app .py
效果如下:
Flask 异步运行任务的常见方式就介绍完了,因为本人在开发一个用于自动生成字幕的小玩具,其中视频上传以及字幕生成都是耗时任务,所以就单独写一篇文章来介绍一下这部分的内容,后面会将小玩具的代码开源让大家学习一下,先一睹其真容:
(*本文为Python大本营转载文章,转载请联系原作者)

精彩推荐


【结果提交倒计时】PV,UV流量预测算法大赛,结果提交截止时间为 9月20日 ,还没有提交的小伙伴抓紧时间了~~9月25日公布初赛成绩。 最新排行榜请点击 阅读原文 查看。
推荐阅读
  • 5大必知的图算法,附Python代码实现

  • 如何用爬虫技术帮助孩子秒到心仪的幼儿园(基础篇)

  • 2019年最新华为、BAT、美团、头条、滴滴面试题目及答案汇总

  • 阿里巴巴杨群:高并发场景下Python的性能挑战


你点的每个“在看”,我都认真当成了喜欢



更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论