Python消息队列:Celery上手

上一篇博文里,实现了一个异步任务的场景:调用web服务之后立刻返回结果,后台则继续执行这个任务。这是我工作中的一个真实需求,当我费劲巴拉把这个功能写完之后,我才了解到有一个现成的工具能够实现这个功能,这就是今天要学习的Celery。

Celery是一个简单,灵活、可靠的分布式任务执行框架,可以支持大量任务的并发执行。Celery采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行。

生产者和消费者模型是一种设计模式,在这种设计模式中,生产者和消费者分别是任务的发布者和任务的获取者,他们没有直接的关联,之间的交流通过中间人(Broker,也称为消息队列)完成。

在这个过程中,生产者像悬赏榜上的贴告示人一样,将任务发布在消息队列中,任务在任务队列中依次执行后,将结果发送给消费者。在生产环境中,任务队列通常用Redis或RabbitMQ实现。

实际场景

Celery的实际场景在日常生活中经常出现:

例如Web应用中,当用户触发了一个需要长时间执行的操作时(高计算/高IO等会造成阻塞的任务),可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待。

对于用户来说,他点击了执行按钮后,得到了一个任务ID,而程序在后台执行,用户只需要等一段时间,通过任务ID拿到任务的执行结果。

还有一个场景是定时任务:例如需要定时向一些地址发布邮件。

上手代码之前,我发现了大坑,在我的Windows机器上运行Celery程序时,始终报错[ValueError: not enough values to unpack],起初我以为是代码逻辑的问题,最终发现是Celery的最新版暂时不支持Windows,可以使用WSL或者celery -A my_project_name worker --pool=solo -l info执行。后者的话就意味着单线程执行代码。

最简单的案例

首先是一个最简单的案例:我们有个计算的程序,负责将输入的两个数字相加,得到结果。为了模拟高计算量的程序,我们在计算时加上sleep2秒。

现在,我们想让用户在执行该程序时,程序不会因为sleep的两秒而阻塞,而是会在后台执行。这个过程我们放在队列里。要完成这一点,我们要实现以下几个内容:

  1. 我们需要实现本地的一个消息代理Broker,例如Redis
  2. 我们需要实现一个生产者程序,负责生成任务并发送到消息代理。
  3. 需要实现一个消费者程序,负责从消息队列中接收任务并执行它们。

在这个过程中,生产者不负责执行程序,只负责发布任务。

以下是代码的实现:


首先我们用Docker在本地的6379端口启动redis服务,此处不赘述。

消费者程序,我们命名为tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
import time
from celery import Celery

broker = 'redis://127.0.0.1:6379'
backend = 'redis://127.0.0.1:6379/0'

app = Celery('my_task', broker=broker, backend=backend)

@app.task
def add(x, y):
time.sleep(2) # 模拟耗时操作
return x + y

消费者程序里定义了消息代理和结果后端,两个都是用redis实现的。顾名思义,一个用来连接消息队列,一个用来存储结果。

首先创建了一个Celery实例,名为my_task@app.task是一个装饰器,将被装饰的函数注册为Celery任务。这样这个函数就能被异步调用了。

生产者程序,命名为client.py,负责发布任务。

1
2
3
4
5
from tasks import add

# 异步任务
add.delay(2, 8)
print('hello world')

生产者中,首先导入了来源于消费者的add函数。add函数经过了@app.task的包装,变成了一个Celery任务。这时候我们就可以通过delay方法来异步执行它,并传入两个参数2,8。

当执行异步任务的时候,程序不会干等两秒返回结果,而是马上执行下面的print('hello world')。而add的结果会在后台计算并返回。

如何执行他们呢?首先需要在命令行执行:

1
celery -A tasks worker --pool=solo -l info

这是启动了一个Celery工作进程来监听队列,并执行任务。-A代表应用的模块名来自于tasks.pyworker表示要启动工作进程,loglevel则表示日志级别。

启动后能看到成功连接的日志:

这时我们在另一个命令行执行python client.py。命令行会马上返回hello world。此时程序会在后台执行,可以在Celery进程的后台看到接收和执行的结果。

这样就实现了一个最简单的用例。

app.task装饰器

@app.task是一个装饰器,用于将程序包装成Celery的实例,其中有几个需要注意的点。

  • 如果程序本身有多个装饰器,那么app.task必须在最后一个,也就是最上面的那个装饰器。
  • app.task包含了一个参数bind,意为绑定方法。如果需要访问当前任务请求的信息,或者添加到自定义的任务基类,就需要设置为True。例如:
1
2
3
@app.task(bind=True)
def add(self, x, y):
print(self.request.id)

此时程序的第一个参数必须是任务实例,不然拿不到任务id。

  • app.task装饰器支持为每个任务都设置一个名称,再未设置的情况下,默认为任务模块.任务名称。例如:
1
2
3
@app.task(name='tasks.add') # 不显式设置的话也为task.add
def add(x, y):
return x + y
  • app.task装饰器支持retry,这意味着当报错的时候,可以使用实例的retry方法,例如:
1
2
3
4
5
6
7
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)

或者一种更方便的方法:

1
2
3
4
@app.task(autoretry_for=(FailWhaleError,),
retry_kwargs={'max_retries': 5})
def refresh_timeline(user):
return twitter.refresh_timeline(user)

Delay方法

Cellery提供的delay方法是异步执行的一个接口,它是另外一个接口apply_async的封装。执行后它们会返回一个AsyncResult的实例,这个实例用来跟踪任务的状态,backend就是用来存储这个的。

结果的获取

我们可以在上面的代码里直接获取结果以及任务相关的信息。如下:

1
2
3
4
5
6
7
8
9
10
from tasks import add

# 异步任务
res = add.delay(2, 8)
print('hello world')

res.get(timeout=1) # 10,如果出现报错会将调用栈返回
res.id # 获取任务id
res.get(propagate=False) # 10,但是不返回报错信息
res.state # 任务状态,包含PENDING/STARTED/SUCCESS/FAILURE等

我们在这里直接获取结果,实际上有点像顺序执行。如果我们拿到了任务id,需要有另外一个服务去查看任务状态要怎么做呢?

1
2
3
from tasks import app # 先导入Celery实例

res = app.AsyncResult('given-task-id') # 这时候就可以和上面一样获取任务结果了

构建链

和Langchain一样,Celery也支持链式调用。例如如果需要在一个任务返回后调用另外一个任务。这时就涉及到签名。所谓签名,就是将一个任务的执行选项和参数进行打包,例如:

1
2
3
add.signature((2, 2), countdown=10) # 为add任务增加了2,2的参数,和倒计时10秒的执行选项

add.s(2, 2) # 简写

对于上面这个签名,也可以直接执行:

1
2
3
s1 = add.s(2, 2)
res = s1.delay()
res.get()

如果使用链的话是这样的:

1
2
3
4
5
from celery import chain
from tasks import add, multiply

# (4 + 4) * 8
chain(add.s(4,4) | multiply.s(8))().get()

路由

Celery支持路由,也就是根据名称将结果发到不同队列:

1
2
3
4
5
app.conf.update(
task_routes = {
'tasks.add': {'queue': 'add_queue'},
},
)

在执行时,在apply_async方法中加入queue参数:

1
2
from tasks import add
add.apply_async((2, 2), queue='add_queue')

并在执行时使用-Q来选择队列:

1
celery -A tasks worker -Q add_queue

读取配置文件

在上面的程序中,Broker和Backend的配置写在程序中,但是也可以写成配置文件,用appconfig_from_object方法来加载配置。注意配置文件需要和启动文件放在同一路径下。例如:

在项目路径下创建celery_config.py,内容为:

1
2
3
4
5
6
7
8
9
10
from datetime import timedelta
from celery.schedules import crontab

broker_url = 'redis://127.0.0.1:6379' # 指定 Broker
result_backend = 'redis://127.0.0.1:6379/0' # 指定 Backend
broker_connection_retry_on_startup = True

imports = ( # 指定导入的任务模块
'tasks',
)

相应的,tasks.py也要修改一下,修改后内容如下:

1
2
3
4
5
6
7
8
9
10
import time
from celery import Celery

app = Celery('demo') # Celery实例的名称
app.config_from_object('celery_config')

@app.task
def add(x, y):
time.sleep(2) # 模拟耗时操作
return x + y

原先定义地址和app都写在task.py中,现在只要在task.py中直接加载配置文件就可以了。

2024/5/26 于苏州