Skip to main content
 首页 » 编程设计

flask之Redis后台作业完成后如何返回flask render_template

2025年12月25日23Free-Thinker

我在 flask 中有这个网络应用程序,我想在提交表单后执行一些机器学习和人工智能算法。我在 Redis 和 rq 的帮助下在后台作业中运行 ML 和 AI 算法(因为我的应用程序由 Heroku 托管,并且他们有超时功能,您必须在 30 秒内返回响应)。工作完成后,我想获取算法生成的图像(一些图表)并将它们输出到网页中,但我不知道如何在工作功能中渲染模板,并从 flask 导入应用程序应用程序来做到这一点似乎不起作用。您对如何解决这个问题有什么想法吗?

来自 Flask 应用程序的将作业排入队列的代码片段:

def upload(): 
    from mlsalespred import run_model 
    file = request.files['file'] 
    dffile = pd.read_csv(file) 
    job = q.enqueue(run_model, dffile) 
    return render_template("waiting.html") 

我的工作职能代码片段:

def run_model(dataFrame): 
    - - - - - - - - - - - 
    - - some ml stuff - - 
    - - - - - - - - - - - 
    return render_template("uploaded.html", sales_fig = sales_fig.decode('utf8'), diff_fig = diff_fig.decode('utf8'), pred_fig = pred_fig.decode('utf8') ) 
 

提前致谢

请您参考如下方法:

一个基本但可行的解决方案 (gist) :

您可以通过从将作业排队的路由进行重定向,然后使用元标记定期刷新该页面来实现此目的。首先导入所需的库:

from flask import Flask, redirect, url_for, render_template_string 
app = Flask(__name__) 
 
from time import sleep 
 
from rq import Queue 
from rq.job import Job 
from redis import Redis 

设置rq相关连接,并定义要运行的函数:

r = Redis(host='redisserver') 
q = Queue(connection=r) 
 
def slow_func(data): 
    sleep(5) 
    return 'Processed %s' % (data,) 

然后定义一个可以每5秒刷新一次页面的模板:

template_str='''<html> 
    <head> 
      {% if refresh %} 
        <meta http-equiv="refresh" content="5"> 
      {% endif %} 
    </head> 
    <body>{{result}}</body> 
    </html>''' 

我们还将使用 Flask render_template_string 创建一个辅助函数来返回插入了变量的模板。请注意,如果未提供,则刷新默认为 False:

def get_template(data, refresh=False): 
    return render_template_string(template_str, result=data, refresh=refresh) 

现在创建一条路由,将我们的函数排入队列,获取其 rq job-id,然后使用该 id 返回重定向到 result View 。这仅需要 URL 字符串中的输入,但可以从任何地方获取:

@app.route('/process/<string:data>') 
def process(data): 
    job = q.enqueue(slow_func, data) 
    return redirect(url_for('result', id=job.id)) 

现在让我们在 rq.Job 对象的帮助下处理实际结果。这里的逻辑可以调整,因为这将导致页面刷新除“finished”之外的所有值:

@app.route('/result/<string:id>') 
def result(id): 
    job = Job.fetch(id, connection=r) 
    status = job.get_status() 
    if status in ['queued', 'started', 'deferred', 'failed']: 
        return get_template(status, refresh=True) 
    elif status == 'finished': 
        result = job.result  
        # If this is a string, we can simply return it: 
        return get_template(result) 

如果状态为“finished”,则job.result将包含slow_func的返回值,因此我们将其呈现在页面上.

此方法的缺点是在等待作业完成时会向服务器发出多个请求。元刷新标签可能有点不传统。如果您从 Javascript 发送更新请求,则有 solutions它可以按一定时间间隔发送 AJAX 请求,尽管这会遇到相同的多请求问题。

另一种方法是使用 websockets 或 SSE 在作业完成后立即将已完成作业的结果流式传输到前端。

更新:2021 年 2 月 27 日

我决定尝试使用 SSE 方法来更新前端的作业状态。我了解到,rq 本身支持更新作业中的 meta 属性,方法是在作业中导入 rq.get_current_job,然后可以作业刷新后可从外部访问。

查看演示代码:

带有进度条的基本示例 (gist) :