Will Dx

人世一身霜雪, 归来仍是少年.

Django学习笔记18_Django集成Celery异步执行任务

Posted April 07, 2017

1. 需求场景

1、在web应用中,用户触发一个操作,执行后台处理程序,这个程序需要执行很长时间才能返回结果。怎样才能不阻塞http请求,不让用户等待从而提高用户体验呢? 2、定时任务脚本:生产环境经常会跑一些定时任务脚本,假如你有上千台的服务器、上千种任务,定时任务的管理很困难,如何对job进行有效的管理? 3、异步需求:比如发送短信/邮件、推送消息、清理/设置缓存?

2. Celery - 分布式任务队列系统

Celery是一个可以处理大量消息的分布式任务系统,它凭借简单、灵活、可靠的特性被广泛使用。Celery聚焦于实时处理任务,同时也支持定时的任务调度

2.1 特性

  1. 查看定时任务的执行情况,比如执行是否成功、当前状态、执行任务花费的时间等。
  2. 易于其他框架集成,如使用django管理后台添加、更新、删除任务。
  3. 方便把任务和配置管理相关联。
  4. 可选多进程、Eventlet和Gevent三种模式并发执行。
  5. 提供错误处理机制

从上图中可以知道Celery包含如下组件:

  1. Producer:凡是调用了Celery API、函数或装饰器而产生任务并交给任务队列处理的都是任务生产者。
  2. 任务调度组件:Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
  3. Celery Worker:负责执行任务的线程,可以在多台服务器运行提高执行效率
  4. Broker:消息中间件,负责接受任务生产者的任务,并且转发work进行执行。Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作为消息中间件,官方推荐使用RabbitMQ。
  5. Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式

2.2 一个简单例子

Python
# 1. 在项目根目录创建tasks.py

# -*- coding: utf-8 -*-
"""
hello: 队列名称
broker: 使用redis做消息中间件
backend: 使用redis做结果存储
"""

from celery import Celery
app = Celery('hello', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

@app.task
def hello():
    """Worker程序"""
    return 'hello world'

# 2. 启动Hello Worker, 监听hello队列

celery -A tasks worker --loglevel=info

>>> 输出: 可以看到Worker已经在监听队列了
(accouting_server)   accouting_server git:(master)  celery -A tasks worker --loglevel=info
celery@xiang.local v4.1.0 (latentcall)

Darwin-16.6.0-x86_64-i386-64bit 2017-08-03 12:27:33

[config]
.> app:         hello:0x103889b00
.> transport:   redis://localhost:6379/0
.> results:     disabled://
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.hello

[2017-08-03 12:27:33,896: INFO/MainProcess] Connected to redis://localhost:6379/0
[2017-08-03 12:27:33,907: INFO/MainProcess] mingle: searching for neighbors
[2017-08-03 12:27:34,939: INFO/MainProcess] mingle: all alone
[2017-08-03 12:27:34,953: INFO/MainProcess] celery@xiang.local ready.

# 3. (生产者)同样目录下添加celery_producter_hello.py文件

from tasks import hello

hello.delay()
# hello.delay().get() # 若需要返回结果

# 4. 执行celery_producter_hello.py

python  celery_producter_hello.py
结果:
[2017-08-03 12:33:28,497: INFO/ForkPoolWorker-2] Task tasks.hello[6390a950-09ab-4670-8977-36fd6737435e] succeeded in 0.0008197110000764951s: 'hello world'

3. Django 集成Celery

3.0 环境准备

  • 安装redis
  • 安装celery及相关依赖: pip install -U celery[redis]
  • 启动redis: redis-server
  • 检验redis已启动: redis-cli ping

3.1 touch celery.py

Python
# -*-coding:utf-8 -*-
from __future__ import absolute_import

import os

from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'accouting_project.settings')
app = Celery('accouting_project')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

3.2 vim settings.py

Python
# CELERY
BROKER_URL = 'redis://localhost:6379' 
CELERY_RESULT_BACKEND = 'redis://localhost:6379' 
CELERY_ACCEPT_CONTENT = ['application/json'] 
CELERY_TASK_SERIALIZER = 'json' 
CELERY_RESULT_SERIALIZER = 'json' 
CELERY_TIMEZONE = TIME_ZONE

3.3 监听Celery

Python
cd  /Users/daixiang/mycode/accouting_server/accouting_project
celery -A accouting_project  worker -l info

3.4 测试Celery

参考文档

4. Celery最佳实践

~未完待续~

参考Celery最佳实践 褚桐 博客 官方文档