通过 fastapi 和 celery 来安排定时任务

Table of Contents

示例一

你想要构建一个基于 FastAPI 的 Web 应用,其中集成 Celery Worker 和 Celery Beat,并且根据 Web 请求来动态安排定时任务。以下是详细的步骤和代码示例,帮助你完成这一任务。

1. 安装依赖

首先,你需要安装 FastAPI、Celery、Redis(用于任务队列)等依赖:

pip install fastapi uvicorn celery redis

Redis 将作为 Celery 的消息队列进行通信,因此你也需要启动 Redis 服务器。如果没有安装 Redis,可以通过以下命令进行安装:

# Ubuntu/Debian
sudo apt-get install redis-server
# MacOS (brew)
brew install redis

启动 Redis 服务器:

redis-server

2. 项目结构

fastapi-celery-app/
│
├── app/
│   ├── main.py        # FastAPI 应用的入口
│   ├── tasks.py       # Celery 任务和配置
│   └── celery_worker.py # Celery worker 和 beat 的启动脚本
├── celeryconfig.py    # Celery 配置文件
└── requirements.txt

3. 配置 Celery(celeryconfig.py

# celeryconfig.py

from datetime import timedelta

broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/0"

beat_schedule = {
    "example-scheduled-task": {
        "task": "app.tasks.scheduled_task",
        "schedule": timedelta(minutes=1),  # 每分钟执行一次
    },
}

timezone = "UTC"

4. 定义 Celery 任务(tasks.py

tasks.py 中定义 Celery 的任务和动态任务调度的功能:

# app/tasks.py

from celery import Celery
from celery.schedules import crontab
import celeryconfig

app = Celery("tasks")
app.config_from_object(celeryconfig)

@app.task
def example_task(data):
    print(f"Received data: {data}")
    return f"Task completed with data: {data}"

@app.task
def scheduled_task():
    print("Scheduled task is running!")

# 动态添加任务
def schedule_task_at(interval_in_minutes, data):
    app.conf.beat_schedule[f"dynamic-task-{data}"] = {
        "task": "app.tasks.example_task",
        "schedule": crontab(minute=f"*/{interval_in_minutes}"),
        "args": [data],
    }
    app.conf.beat_schedule_filename = 'celerybeat-schedule'
    app.conf.update()

这里,我们定义了一个简单的任务 example_task 和一个定时任务 scheduled_task。同时,schedule_task_at 函数允许通过 Web 请求动态添加任务。

5. FastAPI 应用(main.py

main.py 中,处理用户请求并调度 Celery 任务:

# app/main.py

from fastapi import FastAPI
from app.tasks import example_task, schedule_task_at
from pydantic import BaseModel

app = FastAPI()

class TaskRequest(BaseModel):
    interval_in_minutes: int
    data: str

@app.post("/schedule-task/")
async def schedule_task(request: TaskRequest):
    # 调用函数,安排动态任务
    schedule_task_at(request.interval_in_minutes, request.data)
    return {"message": "Task scheduled successfully!"}

@app.post("/run-task/")
async def run_task(request: TaskRequest):
    task = example_task.delay(request.data)
    return {"task_id": task.id}

6. 启动 Celery Worker 和 Beat(celery_worker.py

创建一个脚本来同时启动 Celery Worker 和 Celery Beat:

# app/celery_worker.py

from app.tasks import app

# 启动 Celery worker 和 beat
if __name__ == "__main__":
    app.worker_main(["worker", "-B", "-l", "info"])

这里我们使用 -B 选项同时启动 Celery Worker 和 Celery Beat。

7. 启动应用

启动 Celery Worker 和 Beat

使用 celery_worker.py 来启动 Celery Worker 和 Beat:

python app/celery_worker.py

启动 FastAPI 应用

启动 FastAPI 应用:

uvicorn app.main:app --reload

8. 测试 Web 应用

1. 动态安排定时任务

你可以通过发送 POST 请求来安排动态的定时任务。使用 curl 或者 Postman 来发送请求。

请求示例:

curl -X 'POST' \
  'http://127.0.0.1:8000/schedule-task/' \
  -H 'Content-Type: application/json' \
  -d '{
  "interval_in_minutes": 2,
  "data": "Task data for every 2 minutes"
}'

这个请求会安排一个任务,每两分钟执行一次。

2. 手动运行任务

你也可以手动触发一个任务:

curl -X 'POST' \
  'http://127.0.0.1:8000/run-task/' \
  -H 'Content-Type: application/json' \
  -d '{
  "data": "Immediate task data"
}'

这个请求会立即运行一个任务。

示例二

步骤 1: 设置项目结构

首先,创建一个项目文件夹,并在其中设置以下文件结构:

my_fastapi_app/
├── app/
│   ├── main.py
│   ├── tasks.py
│   └── __init__.py
├── alembic.ini
├── Dockerfile
├── requirements.txt
└── .env

步骤 2: 安装依赖

requirements.txt 文件中,列出你的项目依赖:

fastapi
uvicorn
celery[redis]
redis

然后,使用 pip 安装这些依赖:

pip install -r requirements.txt

步骤 3: 配置 Celery

app/__init__.py 文件中,配置 Celery:

from celery import Celery

def make_celery(app_name=__name__, broker="redis://localhost:6379/0", backend=None):
    return Celery(app_name, broker=broker, backend=backend)

celery = make_celery()

步骤 4: 定义 Celery 任务

app/tasks.py 文件中,定义你的 Celery 任务:

from . import celery
from time import sleep

@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello', 'world') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello', 'world'), name='my_every_10')

@celery.task
def test(arg1, arg2):
    print(f"Task received: {arg1}, {arg2}")
    sleep(5)  # 模拟耗时操作
    return f"{arg1} + {arg2} = {'hello world'}"

步骤 5: 创建 FastAPI 应用

app/main.py 文件中,创建你的 FastAPI 应用,并集成 Celery:

from fastapi import FastAPI, HTTPException
from . import celery
from .tasks import test

app = FastAPI()

@app.get("/")
async def read_root():
    return {"Hello": "World"}

@app.post("/run-task/")
async def run_task(arg1: str, arg2: str):
    task = test.delay(arg1, arg2)
    if task:
        return {"task_id": task.id, "status": "Pending"}
    else:
        raise HTTPException(status_code=500, detail="Failed to start the task")

步骤 6: 启动 Celery Worker 和 Beat

Dockerfile 中,配置你的应用和 Celery:

FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["celery", "-A", "main", "worker", "--loglevel=info", "--beat"]

步骤 7: 运行应用

.env 文件中,设置环境变量:

CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0

使用 Docker 构建并运行你的应用:

docker build -t my_fastapi_app .
docker run -d -p 8000:8000 --name my_fastapi_app_container my_fastapi_app

现在,你的 FastAPI 应用和 Celery 应该在 Docker 容器中运行。你可以通过访问 http://localhost:8000 来测试你的 FastAPI 应用。

注意事项

  • 确保你的 Redis 服务正在运行,因为 Celery 使用它作为消息代理。
  • 这个示例使用了 Docker 来简化部署,但你可以直接在本地环境中运行这些服务。
  • 你可能需要根据你的具体需求调整 Celery 配置和任务定义。

Comments |0|

Legend *) Required fields are marked
**) You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>
Category: 似水流年