python测试开发django-160.Celery 定时任务 (beat)
前言
Celery 可以异步执行,也可以通过定时任务触发
环境准备
这里用redis作为中间件,django使用的版本是v2.1.2
安装django需要用到的第三方包,注意版本号
pip install celery==3.1.26.post2
pip install django-celery==3.3.1
pip install redis==2.10.6
详细的基础教程参考前面的https://www.cnblogs.com/yoyoketang/p/15422804.html。
本篇主要讲定时任务如何实现,下图中的Celery beat 定时任务
celery 的5个角色
Task 就是任务,有异步任务(Async Task)和定时任务(Celery Beat)
Broker 中间人,接收生产者发来的消息即Task,将任务存入队列。
任务的消费者是Worker。
Celery 本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
Worker 执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
Beat 定时任务调度器,根据配置定时将任务发送给Broker。
Backend 用于存储任务的执行结果。
Django 中使用 Celery
要在 Django 项目中使用 Celery,您必须首先定义 Celery 库的一个实例(称为“应用程序”)
如果你有一个现代的 Django 项目布局,比如:
- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py
那么推荐的方法是创建一个新的proj/proj/celery.py模块来定义 Celery 实例:
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
其中debug_task是测试的任务,可以注掉
# @app.task(bind=True)
# def debug_task(self):
# print('Request: {0!r}'.format(self.request))
上面一段只需改这句,’proj’是自己django项目的app名称
app = Celery('proj')
然后你需要在你的proj/proj/__init__.py
模块中导入这个应用程序。这确保在 Django 启动时加载应用程序,以便 @shared_task 装饰器(稍后提到)将使用它:
proj/proj/__init__.py
:
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
上面这段固定的,不用改
tasks任务
在app下新建tasks.py,必须要是tasks.py文件名称,django会自动查找到app下的该文件
from __future__ import absolute_import
from celery import shared_task
@shared_task
def add(x, y):
print("task----------111111----------")
return x + y
@shared_task
def mul(x, y):
print("task----------22222----------")
return x * y
tasks.py可以写任务函数add、mul,让它生效的最直接的方法就是添加app.task 或shared_task 这个装饰器
添加setting配置
setting.py添加配置
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
# celery 配置连接redis
BROKER_URL = 'redis://192.168.1.1:6379'
CELERY_RESULT_BACKEND = 'redis://192.168.1.1:6379'
# 配置定时任务
from celery.schedules import crontab
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'add': {
'task': 'yoyo(django app名称).tasks.add', # 任务
'schedule': timedelta(seconds=5), # 每5秒执行add函数
'args': (11, 12) # 运行参数
},
'mul': {
'task': 'yoyo(django app名称).tasks.mul', # 任务
'schedule': timedelta(seconds=10), # 每10秒执行mul函数
'args': (11, 2) # 运行参数
}
}
CELERYBEAT_SCHEDULE 是配置定时任务,可以添加多个任务,任务名称可以与tasks中的函数名称保持一致,也可以自己定义一个任务名称。
task 参数是对应app目录下的tasks文件中任务函数名称
schedule 运行周期,支持contrab表达式
args 运行任务时候带上的参数
启动worker 和beat服务
启动worker,执行任务
celery -A MyDjango(django 项目名称) worker -l info
运行日志
D:\202107django\MyDjango>celery -A MyDjango worker -l info
-------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: yoyo:0x219342ff978
- ** ---------- .> transport: redis://192.168.1.1:6379//
- ** ---------- .> results: redis://192.168.1.1:6379/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. yoyo.tasks.add
. yoyo.tasks.mul
[2021-10-21 12:20:32,079: INFO/MainProcess] Connected to redis://192.168.1.1:6379//
[2021-10-21 12:20:32,167: INFO/MainProcess] mingle: searching for neighbors
[2021-10-21 12:20:33,700: INFO/MainProcess] mingle: all alone
启动beat 定时任务监听
celery -A MyDjango(django 项目名称) beat -l info
启动日志
MyDjango>celery -A MyDjango beat -l info
celery beat v3.1.26.post2 (Cipater) is starting.
__ - ... __ - _
Configuration ->
. broker -> redis://192.168.1.1:6379//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> now (0s)
[2021-10-21 12:22:56,867: INFO/MainProcess] beat: Starting...
启动完成后,会看到beat运行日志,定时任务已经推过去
worker运行日志,执行任务
crontab 周期任务
前面是设置每多少秒执行任务,这个只是测试下功能,任务很简单,我们一般用crontab 实现周期性任务,比如每周1-5早上执行一遍任务,用crontab 可以轻松实现
# crontab任务
# 每周一8:30调用task.add
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
# Executes every Monday morning at 8:30 A.M
'add': {
'task': 'yoyo(django app名称).tasks.add', # 任务
'schedule': crontab(hour=8, minute=30, day_of_week=1),
'args': (11, 12) # 运行参数
}
}
crontab定时任务命令规则:
分 | 时 | 天 | 月 | 星期 | 命令 | 路径 |
---|---|---|---|---|---|---|
minute | hour | day | month | week | command | path |
* | * | * | * | * | command | path |
minute:
表示分钟,可以是从0到59之间的任何整数。
hour:
表示小时,可以是从0到23之间的任何整数。
day:
表示日期,可以是从1到31之间的任何整数。
month:
表示月份,可以是从1到12之间的任何整数。
week:
表示星期几,可以是从0到7之间的任何整数,这里的0或7代表星期日。
command:
要执行的命令,可以是系统命令,也可以是自己编写的脚本文件。
path:
需执行的文件,用绝对路径
crontab命令常用的特殊字符
符号 | 说明 |
---|---|
* | 表示任何时刻 |
, | 表示分割 |
- | 表示一个段,如第二段里:1-5,就表示1到5点 |
/n | 表示每个n的单位执行一次,如第二段里,*/1, 就表示每隔1个小时执行一次命令。也可以写成1-23/1 |
定时任务如果做成可配置,存到数据库,可以用到 djcelery 实现