Skip to main content
 首页 » 编程设计

rabbitmq之重试丢失或失败的任务(Celery、Django 和 RabbitMQ)

2025年05月04日239shihaiming

有没有办法确定是否有任何任务丢失并重试?

我认为丢失的原因可能是调度程序错误或工作线程崩溃。

我打算重试它们,但我不确定如何确定哪些任务需要退出?

以及如何自动进行此过程?我可以使用自己的自定义调度程序来创建新任务吗?

编辑:我从文档中发现 RabbitMQ 从不丢失任务,但是当工作线程在任务执行过程中崩溃时会发生什么?

请您参考如下方法:

你需要的是设置

CELERY_ACKS_LATE = 真

Late ack 表示任务执行完毕后会确认任务消息,
不只是之前,这是默认行为。
这样如果worker崩溃了rabbit MQ还是会有消息的。

显然,在同一时间完全崩溃(Rabbit + workers)没有办法恢复任务,除非您在任务开始和任务结束时实现日志记录。
就我个人而言,每次任务开始时我都会在 mongodb 中写一行,当任务完成时再写一行(独立于结果),这样我可以通过分析 mongo 日志来知道哪个任务被中断。

您可以通过重写 __call__ 方法轻松地做到这一点。和 after_return celery 基任务类的。

接下来您会看到我的一段代码,它使用 taskLogger 类作为上下文管理器(带有入口和导出点)。
taskLogger 类只是在 mongodb 实例中写入包含任务信息的行。

def __call__(self, *args, **kwargs): 
    """In celery task this function call the run method, here you can 
    set some environment variable before the run of the task""" 
 
    #Inizialize context managers     
 
    self.taskLogger = TaskLogger(args, kwargs) 
    self.taskLogger.__enter__() 
 
    return self.run(*args, **kwargs) 
 
def after_return(self, status, retval, task_id, args, kwargs, einfo): 
    #exit point for context managers 
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo) 

我希望这会有所帮助