Django中使用Celery执行异步和定时任务的注意事项
WEB前端开发社区 昨天
pip install celery==3.1.25
pip install gevent# 启动workercelery -A <module> worker -l info -P gevent
app = Celery('myproject')
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
from __future__ import absolute_import
from celery import shared_task
@shared_task
def add(x, y):
return x + y
connect_timeout, read_timeout = 5.0, 30.0response = requests.get(URL, timeout=(connect_timeout, read_timeout))
@app.task@decorator2@decorator1def add(x, y): return x + y
self
),就像 Python 绑定方法类似,如下例所示:from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@task(bind=True)
def add(self, x, y):
logger.info(self.request.id)
app.Task.retry()
),访问当前任务请求的信息,以及你添加到自定义任务基类的附加功能。ignore_result
选项,因为存储结果耗费时间和资源。你还可以可以通过 task_ignore_result
设置全局忽略任务结果。@app.task(ignore_result=True)def mytask(): something()
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get()
info = parse_page.delay(url, page).get()
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(url, page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
def update_page_info(url):
# fetch_page -> parse_page -> store_page
chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
chain()
@app.task()
def fetch_page(url):
return myhttplib.get(url)
@app.task()
def parse_page(page):
return myparser.parse_document(page)
@app.task(ignore_result=True)
def store_page_info(info, url):
PageInfo.objects.create(url=url, info=info)
class Article(models.Model):
title = models.CharField()
body = models.TextField()
@app.task
def expand_abbreviations(article):
article.body.replace('MyCorp', 'My Corporation')
article.save()
>>> article = Article.objects.get(id=102)>>> expand_abbreviations.delay(article)
@app.taskdef expand_abbreviations(article_id): article = Article.objects.get(id=article_id) article.body.replace('MyCorp', 'My Corporation') article.save()
commit_on_success
装饰器,当视图返回时该事务会被提交,当视图抛出异常时会进行回滚。from django.db import transaction
@transaction.commit_on_success
def create_article(request):
article = Article.objects.create()
expand_abbreviations.delay(article.pk)
on_commit
回调函数来在所有事务提交成功后启动任务。from django.db.transaction import on_commitdef create_article(request): article = Article.objects.create() on_commit(lambda: expand_abbreviations.delay(article.pk))
app.Task.retry()
函数可以用来重新执行任务。当一个任务被重试,它在重试前会等待给定的时间,并且默认的由 default_retry_delay
属性定义。默认设置为 3 分钟。注意延迟设置的单位是秒(int 或者 float)。你可以通过提供 countdown
参数覆盖这个默认值。# retry in 30 minutes.@app.task(bind=True, default_retry_delay=30 * 60) def add(self, x, y):try: something_raising()except Exception as exc:# overrides the default delay to retry after 1 minuteraise self.retry(exc=exc, countdown=60)
赞 (0)