Python消息队列:Celery上手
上一篇博文里,实现了一个异步任务的场景:调用web服务之后立刻返回结果,后台则继续执行这个任务。这是我工作中的一个真实需求,当我费劲巴拉把这个功能写完之后,我才了解到有一个现成的工具能够实现这个功能,这就是今天要学习的Celery。
Celery是一个简单,灵活、可靠的分布式任务执行框架,可以支持大量任务的并发执行。Celery采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行。
生产者和消费者模型是一种设计模式,在这种设计模式中,生产者和消费者分别是任务的发布者和任务的获取者,他们没有直接的关联,之间的交流通过中间人(Broker,也称为消息队列)完成。
在这个过程中,生产者像悬赏榜上的贴告示人一样,将任务发布在消息队列中,任务在任务队列中依次执行后,将结果发送给消费者。在生产环境中,任务队列通常用Redis或RabbitMQ实现。
实际场景
Celery的实际场景在日常生活中经常出现:
例如Web应用中,当用户触发了一个需要长时间执行的操作时(高计算/高IO等会造成阻塞的任务),可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待。
对于用户来说,他点击了执行按钮后,得到了一个任务ID,而程序在后台执行,用户只需要等一段时间,通过任务ID拿到任务的执行结果。
还有一个场景是定时任务:例如需要定时向一些地址发布邮件。
上手代码之前,我发现了大坑,在我的Windows机器上运行Celery程序时,始终报错
[ValueError: not enough values to unpack]
,起初我以为是代码逻辑的问题,最终发现是Celery的最新版暂时不支持Windows,可以使用WSL或者celery -A my_project_name worker --pool=solo -l info
执行。后者的话就意味着单线程执行代码。
最简单的案例
首先是一个最简单的案例:我们有个计算的程序,负责将输入的两个数字相加,得到结果。为了模拟高计算量的程序,我们在计算时加上sleep2秒。
现在,我们想让用户在执行该程序时,程序不会因为sleep的两秒而阻塞,而是会在后台执行。这个过程我们放在队列里。要完成这一点,我们要实现以下几个内容:
- 我们需要实现本地的一个消息代理Broker,例如Redis
- 我们需要实现一个生产者程序,负责生成任务并发送到消息代理。
- 需要实现一个消费者程序,负责从消息队列中接收任务并执行它们。
在这个过程中,生产者不负责执行程序,只负责发布任务。
以下是代码的实现:
首先我们用Docker在本地的6379端口启动redis服务,此处不赘述。
消费者程序,我们命名为tasks.py
。
1 |
|
消费者程序里定义了消息代理和结果后端,两个都是用redis
实现的。顾名思义,一个用来连接消息队列,一个用来存储结果。
首先创建了一个Celery实例,名为my_task
。@app.task
是一个装饰器,将被装饰的函数注册为Celery任务。这样这个函数就能被异步调用了。
生产者程序,命名为client.py
,负责发布任务。
1 |
|
生产者中,首先导入了来源于消费者的add
函数。add
函数经过了@app.task
的包装,变成了一个Celery
任务。这时候我们就可以通过delay
方法来异步执行它,并传入两个参数2,8。
当执行异步任务的时候,程序不会干等两秒返回结果,而是马上执行下面的print('hello world')
。而add
的结果会在后台计算并返回。
如何执行他们呢?首先需要在命令行执行:
1 |
|
这是启动了一个Celery工作进程来监听队列,并执行任务。-A
代表应用的模块名来自于tasks.py
,worker
表示要启动工作进程,loglevel
则表示日志级别。
启动后能看到成功连接的日志:
这时我们在另一个命令行执行python client.py
。命令行会马上返回hello world
。此时程序会在后台执行,可以在Celery进程的后台看到接收和执行的结果。
这样就实现了一个最简单的用例。
app.task装饰器
@app.task
是一个装饰器,用于将程序包装成Celery的实例,其中有几个需要注意的点。
- 如果程序本身有多个装饰器,那么
app.task
必须在最后一个,也就是最上面的那个装饰器。 app.task
包含了一个参数bind
,意为绑定方法。如果需要访问当前任务请求的信息,或者添加到自定义的任务基类,就需要设置为True。例如:
1 |
|
此时程序的第一个参数必须是任务实例,不然拿不到任务id。
app.task
装饰器支持为每个任务都设置一个名称,再未设置的情况下,默认为任务模块.任务名称
。例如:
1 |
|
app.task
装饰器支持retry
,这意味着当报错的时候,可以使用实例的retry
方法,例如:
1 |
|
或者一种更方便的方法:
1 |
|
Delay方法
Cellery提供的delay
方法是异步执行的一个接口,它是另外一个接口apply_async
的封装。执行后它们会返回一个AsyncResult
的实例,这个实例用来跟踪任务的状态,backend就是用来存储这个的。
结果的获取
我们可以在上面的代码里直接获取结果以及任务相关的信息。如下:
1 |
|
我们在这里直接获取结果,实际上有点像顺序执行。如果我们拿到了任务id,需要有另外一个服务去查看任务状态要怎么做呢?
1 |
|
构建链
和Langchain一样,Celery也支持链式调用。例如如果需要在一个任务返回后调用另外一个任务。这时就涉及到签名。所谓签名,就是将一个任务的执行选项和参数进行打包,例如:
1 |
|
对于上面这个签名,也可以直接执行:
1 |
|
如果使用链的话是这样的:
1 |
|
路由
Celery支持路由,也就是根据名称将结果发到不同队列:
1 |
|
在执行时,在apply_async
方法中加入queue
参数:
1 |
|
并在执行时使用-Q
来选择队列:
1 |
|
读取配置文件
在上面的程序中,Broker和Backend的配置写在程序中,但是也可以写成配置文件,用app
的config_from_object
方法来加载配置。注意配置文件需要和启动文件放在同一路径下。例如:
在项目路径下创建celery_config.py
,内容为:
1 |
|
相应的,tasks.py
也要修改一下,修改后内容如下:
1 |
|
原先定义地址和app都写在task.py
中,现在只要在task.py
中直接加载配置文件就可以了。
2024/5/26 于苏州