python测试开发django-160.Celery 定时任务 (beat)

前言

Celery 可以异步执行,也可以通过定时任务触发

环境准备

这里用redis作为中间件,django使用的版本是v2.1.2
安装django需要用到的第三方包,注意版本号

pip install celery==3.1.26.post2
pip install django-celery==3.3.1
pip install redis==2.10.6

详细的基础教程参考前面的https://www.cnblogs.com/yoyoketang/p/15422804.html。
本篇主要讲定时任务如何实现,下图中的Celery beat 定时任务

celery 的5个角色

  • Task 就是任务,有异步任务(Async Task)和定时任务(Celery Beat)

  • Broker 中间人,接收生产者发来的消息即Task,将任务存入队列。

    任务的消费者是Worker。

    Celery 本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。

  • Worker 执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。

  • Beat 定时任务调度器,根据配置定时将任务发送给Broker。

  • Backend 用于存储任务的执行结果。

Django 中使用 Celery

要在 Django 项目中使用 Celery,您必须首先定义 Celery 库的一个实例(称为“应用程序”)

如果你有一个现代的 Django 项目布局,比如:

- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py

那么推荐的方法是创建一个新的proj/proj/celery.py模块来定义 Celery 实例:

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')

其中debug_task是测试的任务,可以注掉

# @app.task(bind=True)
# def debug_task(self):
# print('Request: {0!r}'.format(self.request))

上面一段只需改这句,’proj’是自己django项目的app名称

app = Celery('proj')

然后你需要在你的proj/proj/__init__.py 模块中导入这个应用程序。这确保在 Django 启动时加载应用程序,以便 @shared_task 装饰器(稍后提到)将使用它:

proj/proj/__init__.py:

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

上面这段固定的,不用改

tasks任务

在app下新建tasks.py,必须要是tasks.py文件名称,django会自动查找到app下的该文件

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
print("task----------111111----------")
return x + y

@shared_task
def mul(x, y):
print("task----------22222----------")
return x * y

tasks.py可以写任务函数add、mul,让它生效的最直接的方法就是添加app.task 或shared_task 这个装饰器

添加setting配置

setting.py添加配置

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# celery 配置连接redis
BROKER_URL = 'redis://192.168.1.1:6379'
CELERY_RESULT_BACKEND = 'redis://192.168.1.1:6379'

# 配置定时任务
from celery.schedules import crontab
from datetime import timedelta

CELERYBEAT_SCHEDULE = {
'add': {
'task': 'yoyo(django app名称).tasks.add', # 任务
'schedule': timedelta(seconds=5), # 每5秒执行add函数
'args': (11, 12) # 运行参数
},
'mul': {
'task': 'yoyo(django app名称).tasks.mul', # 任务
'schedule': timedelta(seconds=10), # 每10秒执行mul函数
'args': (11, 2) # 运行参数
}
}

CELERYBEAT_SCHEDULE 是配置定时任务,可以添加多个任务,任务名称可以与tasks中的函数名称保持一致,也可以自己定义一个任务名称。

  • task 参数是对应app目录下的tasks文件中任务函数名称

  • schedule 运行周期,支持contrab表达式

  • args  运行任务时候带上的参数

启动worker 和beat服务

启动worker,执行任务

celery -A MyDjango(django 项目名称) worker -l info

运行日志

D:\202107django\MyDjango>celery -A MyDjango worker -l info

-------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: yoyo:0x219342ff978
- ** ---------- .> transport: redis://192.168.1.1:6379//
- ** ---------- .> results: redis://192.168.1.1:6379/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery

[tasks]
. yoyo.tasks.add
. yoyo.tasks.mul

[2021-10-21 12:20:32,079: INFO/MainProcess] Connected to redis://192.168.1.1:6379//
[2021-10-21 12:20:32,167: INFO/MainProcess] mingle: searching for neighbors
[2021-10-21 12:20:33,700: INFO/MainProcess] mingle: all alone

启动beat 定时任务监听

celery -A MyDjango(django 项目名称) beat -l info

启动日志

MyDjango>celery -A MyDjango beat -l info
celery beat v3.1.26.post2 (Cipater) is starting.
__ - ... __ - _
Configuration ->
. broker -> redis://192.168.1.1:6379//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> now (0s)
[2021-10-21 12:22:56,867: INFO/MainProcess] beat: Starting...

启动完成后,会看到beat运行日志,定时任务已经推过去

worker运行日志,执行任务

crontab 周期任务

前面是设置每多少秒执行任务,这个只是测试下功能,任务很简单,我们一般用crontab 实现周期性任务,比如每周1-5早上执行一遍任务,用crontab 可以轻松实现

# crontab任务
# 每周一8:30调用task.add
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
# Executes every Monday morning at 8:30 A.M
'add': {
'task': 'yoyo(django app名称).tasks.add', # 任务
'schedule': crontab(hour=8, minute=30, day_of_week=1),
'args': (11, 12) # 运行参数
}
}

crontab定时任务命令规则:

星期 命令 路径
minute hour day month week command path
* * * * * command path
  • minute:

    表示分钟,可以是从0到59之间的任何整数。

  • hour:

    表示小时,可以是从0到23之间的任何整数。

  • day:

    表示日期,可以是从1到31之间的任何整数。

  • month:

    表示月份,可以是从1到12之间的任何整数。

  • week:

    表示星期几,可以是从0到7之间的任何整数,这里的0或7代表星期日。

  • command:

    要执行的命令,可以是系统命令,也可以是自己编写的脚本文件。

  • path:

    需执行的文件,用绝对路径

crontab命令常用的特殊字符

符号 说明
* 表示任何时刻
, 表示分割
- 表示一个段,如第二段里:1-5,就表示1到5点
/n 表示每个n的单位执行一次,如第二段里,*/1, 就表示每隔1个小时执行一次命令。也可以写成1-23/1

定时任务如果做成可配置,存到数据库,可以用到 djcelery 实现

(0)

相关推荐