Django中使用Celery执行异步和定时任务的注意事项

WEB前端开发社区 昨天

1. Windows中使用Celery 4.0及以后版本
Celery 4.0+及以后版本不支持在windows系统上运行。如果你希望在windows系统上使用celery, 有两种方法。
方法一:安装3.1.25版本
    pip install celery==3.1.25
    方法二:安装gevent
      pip install gevent# 启动workercelery -A <module> worker -l info -P gevent
      2. @task与@shared_task的区别
      当我们使用@app.task装饰器定义我们的异步任务时,那么这个任务依赖于根据项目名myproject生成的Celery实例。
        app = Celery('myproject')@app.task(bind=True)def debug_task(self):    print('Request: {0!r}'.format(self.request))
        然而我们在进行Django开发时为了保证每个app的可重用性,我们经常会在每个app文件夹下编写异步任务,这些任务并不依赖于具体的Django项目名。使用@shared_task 装饰器能让我们避免对某个项目名对应Celery实例的依赖,使app的可移植性更强。
          from __future__ import absolute_importfrom celery import shared_task@shared_taskdef add(x, y):    return x + y

          3. 如果异步的任务包括耗时的I/O操作
          一个无限期阻塞的任务会使得工作单元无法再做其他事情。如果你的任务里有 I/O 操作,请确保给这些操作加上超时时间,例如使用 requests 库时给网络请求添加一个超时时间:
            connect_timeout, read_timeout = 5.0, 30.0response = requests.get(URL, timeout=(connect_timeout, read_timeout))
            默认的 prefork 池调度器对长时间任务不是很友好,所以如果你的任务需要运行很长时间,确保在启动工作单元时使能了 -ofair 选项。
            4. 使用多装饰器
            当使用多个装饰器装饰任务函数时,确保 task 装饰器最后应用(在python中,这意味它必须在第一个位置):
              @app.task@decorator2@decorator1def add(x, y):    return x + y
              5. 使用bind=True绑定任务
              一个绑定任务意味着任务函数的第一个参数总是任务实例本身(self),就像 Python 绑定方法类似,如下例所示:
                from celery.utils.log import get_task_loggerlogger = get_task_logger(__name__)@task(bind=True)def add(self, x, y):    logger.info(self.request.id)
                绑定任务在这些情况下是必须的:任务重试(使用 app.Task.retry() ),访问当前任务请求的信息,以及你添加到自定义任务基类的附加功能。
                6. 忽略不想要的结果
                如果你不在意任务的返回结果,可以设置 ignore_result 选项,因为存储结果耗费时间和资源。你还可以可以通过 task_ignore_result 设置全局忽略任务结果。
                  @app.task(ignore_result=True)def mytask():    something()
                  7. 避免启动同步子任务
                  让一个任务等待另外一个任务的返回结果是很低效的,并且如果工作单元池被耗尽的话这将会导致死锁。
                  # 坏例子
                    @app.taskdef update_page_info(url):    page = fetch_page.delay(url).get()    info = parse_page.delay(url, page).get()    store_page_info.delay(url, info)@app.taskdef fetch_page(url):    return myhttplib.get(url)@app.taskdef parse_page(url, page):    return myparser.parse_document(page)@app.taskdef store_page_info(url, info):    return PageInfo.objects.create(url, info)
                    # 好例子
                      def update_page_info(url):    # fetch_page -> parse_page -> store_page    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)    chain()@app.task()def fetch_page(url):    return myhttplib.get(url)@app.task()def parse_page(page):    return myparser.parse_document(page)@app.task(ignore_result=True)def store_page_info(info, url):    PageInfo.objects.create(url=url, info=info)
                      在好例子里,我们将不同的任务签名链接起来创建一个任务链,三个子任务按顺序执行。警告:不建议同步执行子任务!
                      8.  Django的模型对象不应该作为参数传递给任务
                      Django 的模型对象。他们不应该作为参数传递给任务。几乎总是在任务运行时从数据库获取对象是最好的,因为老的数据会导致竞态条件。假象有这样一个场景,你有一篇文章,以及自动展开文章中缩写的任务:
                        class Article(models.Model):    title = models.CharField()    body = models.TextField()@app.taskdef expand_abbreviations(article):    article.body.replace('MyCorp', 'My Corporation')    article.save()
                        首先,作者创建一篇文章并保存,这时作者点击一个按钮初始化一个缩写展开任务:
                          >>> article = Article.objects.get(id=102)>>> expand_abbreviations.delay(article)
                          现在,队列非常忙,所以任务在2分钟内都不会运行。与此同时,另一个作者修改了这篇文章,当这个任务最终运行,因为老版本的文章作为参数传递给了这个任务,所以这篇文章会回滚到老的版本。修复这个竞态条件很简单,只要参数传递文章的 id 即可,此时可以在任务中重新获取这篇文章:
                            @app.taskdef expand_abbreviations(article_id):    article = Article.objects.get(id=article_id)    article.body.replace('MyCorp', 'My Corporation')    article.save()
                            9. 使用on_commit函数处理事务
                            我们再看另外一个celery中处理事务的例子。这是在数据库中创建一个文章对象的 Django 视图,此时传递主键给任务。它使用 commit_on_success 装饰器,当视图返回时该事务会被提交,当视图抛出异常时会进行回滚。
                              from django.db import transaction@transaction.commit_on_successdef create_article(request):    article = Article.objects.create()    expand_abbreviations.delay(article.pk)
                              如果在事务提交之前任务已经开始执行会产生一个竞态条件;数据库对象还不存在。解决方案是使用 on_commit 回调函数来在所有事务提交成功后启动任务。
                                from django.db.transaction import on_commitdef create_article(request):    article = Article.objects.create()    on_commit(lambda: expand_abbreviations.delay(article.pk))
                                10. 自定义重试延迟
                                当任务发送例外时,app.Task.retry() 函数可以用来重新执行任务。当一个任务被重试,它在重试前会等待给定的时间,并且默认的由 default_retry_delay 属性定义。默认设置为 3 分钟。注意延迟设置的单位是秒(int 或者 float)。你可以通过提供 countdown 参数覆盖这个默认值。
                                  # retry in 30 minutes.@app.task(bind=True, default_retry_delay=30 * 60)  def add(self, x, y):try:        something_raising()except Exception as exc:# overrides the default delay to retry after 1 minuteraise self.retry(exc=exc, countdown=60)
                                  声明:
                                  本文于网络整理,版权归原作者所有,如来源信息有误或侵犯权益,请联系我们删除或授权事宜。
                                  (0)

                                  相关推荐