通过 fastapi 和 celery 来安排定时任务
示例一
你想要构建一个基于 FastAPI 的 Web 应用,其中集成 Celery Worker 和 Celery Beat,并且根据 Web 请求来动态安排定时任务。以下是详细的步骤和代码示例,帮助你完成这一任务。
1. 安装依赖
首先,你需要安装 FastAPI、Celery、Redis(用于任务队列)等依赖:
pip install fastapi uvicorn celery redis
Redis 将作为 Celery 的消息队列进行通信,因此你也需要启动 Redis 服务器。如果没有安装 Redis,可以通过以下命令进行安装:
# Ubuntu/Debian
sudo apt-get install redis-server
# MacOS (brew)
brew install redis
启动 Redis 服务器:
redis-server
2. 项目结构
fastapi-celery-app/
│
├── app/
│ ├── main.py # FastAPI 应用的入口
│ ├── tasks.py # Celery 任务和配置
│ └── celery_worker.py # Celery worker 和 beat 的启动脚本
├── celeryconfig.py # Celery 配置文件
└── requirements.txt
3. 配置 Celery(celeryconfig.py
)
# celeryconfig.py
from datetime import timedelta
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/0"
beat_schedule = {
"example-scheduled-task": {
"task": "app.tasks.scheduled_task",
"schedule": timedelta(minutes=1), # 每分钟执行一次
},
}
timezone = "UTC"
4. 定义 Celery 任务(tasks.py
)
在 tasks.py
中定义 Celery 的任务和动态任务调度的功能:
# app/tasks.py
from celery import Celery
from celery.schedules import crontab
import celeryconfig
app = Celery("tasks")
app.config_from_object(celeryconfig)
@app.task
def example_task(data):
print(f"Received data: {data}")
return f"Task completed with data: {data}"
@app.task
def scheduled_task():
print("Scheduled task is running!")
# 动态添加任务
def schedule_task_at(interval_in_minutes, data):
app.conf.beat_schedule[f"dynamic-task-{data}"] = {
"task": "app.tasks.example_task",
"schedule": crontab(minute=f"*/{interval_in_minutes}"),
"args": [data],
}
app.conf.beat_schedule_filename = 'celerybeat-schedule'
app.conf.update()
这里,我们定义了一个简单的任务 example_task
和一个定时任务 scheduled_task
。同时,schedule_task_at
函数允许通过 Web 请求动态添加任务。
5. FastAPI 应用(main.py
)
在 main.py
中,处理用户请求并调度 Celery 任务:
# app/main.py
from fastapi import FastAPI
from app.tasks import example_task, schedule_task_at
from pydantic import BaseModel
app = FastAPI()
class TaskRequest(BaseModel):
interval_in_minutes: int
data: str
@app.post("/schedule-task/")
async def schedule_task(request: TaskRequest):
# 调用函数,安排动态任务
schedule_task_at(request.interval_in_minutes, request.data)
return {"message": "Task scheduled successfully!"}
@app.post("/run-task/")
async def run_task(request: TaskRequest):
task = example_task.delay(request.data)
return {"task_id": task.id}
6. 启动 Celery Worker 和 Beat(celery_worker.py
)
创建一个脚本来同时启动 Celery Worker 和 Celery Beat:
# app/celery_worker.py
from app.tasks import app
# 启动 Celery worker 和 beat
if __name__ == "__main__":
app.worker_main(["worker", "-B", "-l", "info"])
这里我们使用 -B
选项同时启动 Celery Worker 和 Celery Beat。
7. 启动应用
启动 Celery Worker 和 Beat
使用 celery_worker.py
来启动 Celery Worker 和 Beat:
python app/celery_worker.py
启动 FastAPI 应用
启动 FastAPI 应用:
uvicorn app.main:app --reload
8. 测试 Web 应用
1. 动态安排定时任务
你可以通过发送 POST 请求来安排动态的定时任务。使用 curl
或者 Postman 来发送请求。
请求示例:
curl -X 'POST' \
'http://127.0.0.1:8000/schedule-task/' \
-H 'Content-Type: application/json' \
-d '{
"interval_in_minutes": 2,
"data": "Task data for every 2 minutes"
}'
这个请求会安排一个任务,每两分钟执行一次。
2. 手动运行任务
你也可以手动触发一个任务:
curl -X 'POST' \
'http://127.0.0.1:8000/run-task/' \
-H 'Content-Type: application/json' \
-d '{
"data": "Immediate task data"
}'
这个请求会立即运行一个任务。
示例二
步骤 1: 设置项目结构
首先,创建一个项目文件夹,并在其中设置以下文件结构:
my_fastapi_app/
├── app/
│ ├── main.py
│ ├── tasks.py
│ └── __init__.py
├── alembic.ini
├── Dockerfile
├── requirements.txt
└── .env
步骤 2: 安装依赖
在 requirements.txt
文件中,列出你的项目依赖:
fastapi
uvicorn
celery[redis]
redis
然后,使用 pip 安装这些依赖:
pip install -r requirements.txt
步骤 3: 配置 Celery
在 app/__init__.py
文件中,配置 Celery:
from celery import Celery
def make_celery(app_name=__name__, broker="redis://localhost:6379/0", backend=None):
return Celery(app_name, broker=broker, backend=backend)
celery = make_celery()
步骤 4: 定义 Celery 任务
在 app/tasks.py
文件中,定义你的 Celery 任务:
from . import celery
from time import sleep
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello', 'world') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello', 'world'), name='my_every_10')
@celery.task
def test(arg1, arg2):
print(f"Task received: {arg1}, {arg2}")
sleep(5) # 模拟耗时操作
return f"{arg1} + {arg2} = {'hello world'}"
步骤 5: 创建 FastAPI 应用
在 app/main.py
文件中,创建你的 FastAPI 应用,并集成 Celery:
from fastapi import FastAPI, HTTPException
from . import celery
from .tasks import test
app = FastAPI()
@app.get("/")
async def read_root():
return {"Hello": "World"}
@app.post("/run-task/")
async def run_task(arg1: str, arg2: str):
task = test.delay(arg1, arg2)
if task:
return {"task_id": task.id, "status": "Pending"}
else:
raise HTTPException(status_code=500, detail="Failed to start the task")
步骤 6: 启动 Celery Worker 和 Beat
在 Dockerfile
中,配置你的应用和 Celery:
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["celery", "-A", "main", "worker", "--loglevel=info", "--beat"]
步骤 7: 运行应用
在 .env
文件中,设置环境变量:
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0
使用 Docker 构建并运行你的应用:
docker build -t my_fastapi_app .
docker run -d -p 8000:8000 --name my_fastapi_app_container my_fastapi_app
现在,你的 FastAPI 应用和 Celery 应该在 Docker 容器中运行。你可以通过访问 http://localhost:8000
来测试你的 FastAPI 应用。
注意事项
- 确保你的 Redis 服务正在运行,因为 Celery 使用它作为消息代理。
- 这个示例使用了 Docker 来简化部署,但你可以直接在本地环境中运行这些服务。
- 你可能需要根据你的具体需求调整 Celery 配置和任务定义。