Django|Celery 任务状态管理
信息
2024年8月23日 · ·
如何通过 Django 来管理 Celery 任务?通过 Django Admin 界面提供任务的查询、查看、重试、终止等功能?下面是一个完整的步骤指南。
使用 Django 管理 Celery Worker
安装 Django 和相关包
首先,创建一个新的虚拟环境并安装所需的包。
python -m venv myenv
source myenv/bin/activate # Windows 系统使用: myenv\Scripts\activate
pip install django django-celery-results django-celery-beat celery
创建 Django 项目和应用
django-admin startproject myproject
cd myproject
django-admin startapp myapp
配置 Django 和 Celery
在 myproject/settings.py
文件中添加以下内容:
INSTALLED_APPS = [
...,
'django_celery_results',
'django_celery_beat',
'myapp', # 确保 app 在这个列表里
]
CELERY_BROKER_URL = 'redis://localhost:6379/0' # 使用 Redis 作为示例,可以根据需求更改
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_TRACK_STARTED = True
CELERY_SEND_EVENTS = True
# 确保已经配置了数据库
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
}
# 配置 Django 缓存(可选)
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
}
}
在 myproject
目录中创建一个 celery.py
文件:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# 设置 Django 的配置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# 从 Django 的设置中配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现任务
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
修改 myproject/__init__.py
文件,使得 Django 在启动时加载 Celery:
from __future__ import absolute_import, unicode_literals
# 这将确保当 Django 启动时加载 app.py
from .celery import app as celery_app
__all__ = ('celery_app',)
创建一个 Celery 任务
在 myapp/tasks.py
中创建一个简单的 Celery 任务:
from celery import shared_task
@shared_task
def add(x, y):
return x + y
注册自定义的 TaskResultAdmin
我们需要在自定义 TaskResultAdmin
之前先取消已经注册的模型。
在 myapp/admin.py
中做如下修改:
from django.contrib import admin
from django_celery_results.models import TaskResult
from django_celery_results.admin import TaskResultAdmin as DefaultTaskResultAdmin
from django.urls import path
from django.shortcuts import redirect
from celery.result import AsyncResult
from myproject.celery import app
# 取消已经注册的 TaskResult
admin.site.unregister(TaskResult)
# 创建一个自定义的 TaskResultAdmin 继承自默认的 TaskResultAdmin
class CustomTaskResultAdmin(DefaultTaskResultAdmin):
change_list_template = "admin/celery_task_changelist.html"
def get_urls(self):
urls = super().get_urls()
custom_urls = [
path('retry/<task_id>/', self.admin_site.admin_view(self.retry_task), name='retry-task'),
path('terminate/<task_id>/', self.admin_site.admin_view(self.terminate_task), name='terminate-task'),
]
return custom_urls + urls
def retry_task(self, request, task_id, *args, **kwargs):
AsyncResult(task_id, app=app).reapply()
self.message_user(request, f'Task {task_id} retried successfully.')
return redirect('..')
def terminate_task(self, request, task_id, *args, **kwargs):
AsyncResult(task_id, app=app).revoke(terminate=True)
self.message_user(request, f'Task {task_id} terminated successfully.')
return redirect('..')
# 注册自定义的 TaskResultAdmin
admin.site.register(TaskResult, CustomTaskResultAdmin)
TaskResult
模型已经被 django_celery_results
自动注册到 Django Admin 中了。我们可以通过继承 django_celery_results
的 TaskResultAdmin
并覆盖的方式来避免重复注册模型。