fastapi with celery

Table of Contents

Overview

这其实是典型的发布订阅模式, 生产者发布任务到任务队列中, 消费者从任务队列中消费任务.
这里的任务队列是 Redis, 也可以使用 RabbitMQ

file

Quick start

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class Item(BaseModel):
    item_id: int
    name: str
    description: str = None

@app.get("/items/{item_id}", response_model=Item)
async def get_item(item_id: int):
    return {"item_id": item_id, "name": "Some item", "description": "Description of the item"}

start cerely by docker compose

version: '3.8'

services:
  web:
    build:
      context: .
      dockerfile: ./compose/local/fastapi/Dockerfile
    image: fastapi_celery_example_web
    # '/start' is the shell script used to run the service
    command: /start
    # this volume is used to map the files and folders on the host to the container
    # so if we change code on the host, code in the docker container will also be changed
    volumes:
      - .:/app
    ports:
      - 8010:8000
    env_file:
      - .env/.dev-sample
    depends_on:
      - redis
      - db

  db:
    image: postgres:16-alpine
    volumes:
      - postgres_data:/var/lib/postgresql/data/
    environment:
      - POSTGRES_DB=fastapi_celery
      - POSTGRES_USER=fastapi_celery
      - POSTGRES_PASSWORD=fastapi_celery

  redis:
    image: redis:7-alpine

  celery_worker:
    build:
      context: .
      dockerfile: ./compose/local/fastapi/Dockerfile
    image: fastapi_celery_example_celery_worker
    command: /start-celeryworker
    volumes:
      - .:/app
    env_file:
      - .env/.dev-sample
    depends_on:
      - redis
      - db

  celery_beat:
    build:
      context: .
      dockerfile: ./compose/local/fastapi/Dockerfile
    image: fastapi_celery_example_celery_beat
    command: /start-celerybeat
    volumes:
      - .:/app
    env_file:
      - .env/.dev-sample
    depends_on:
      - redis
      - db

  flower:
    build:
      context: .
      dockerfile: ./compose/local/fastapi/Dockerfile
    image: fastapi_celery_example_celery_flower
    command: /start-flower
    volumes:
      - .:/app
    env_file:
      - .env/.dev-sample
    ports:
      - 5557:5555
    depends_on:
      - redis
      - db
volumes:
  postgres_data:

folder structure example

├─📁 backend--------------- # Backend
│ ├─📁 alembic------------- # DB migration
│ ├─📁 app----------------- # Application
│ │ ├─📁 admin------------- # System admin
│ │ │ ├─📁 api------------- # Interface
│ │ │ ├─📁 crud------------ # CRUD
│ │ │ ├─📁 model----------- # SQLA model
│ │ │ ├─📁 schema---------- # Data transmit
│ │ │ ├─📁 service--------- # Service
│ │ │ └─📁 tests----------- # Pytest
│ │ ├─📁 generator--------- # Code generate
│ │ └─📁 task-------------- # Celery task
│ ├─📁 common-------------- # public resources
│ ├─📁 core---------------- # Core configuration
│ ├─📁 database------------ # Database connection
│ ├─📁 log----------------- # Log
│ ├─📁 middleware---------- # Middlewares
│ ├─📁 scripts------------- # Scripts
│ ├─📁 sql----------------- # SQL files
│ ├─📁 static-------------- # Static files
│ ├─📁 templates----------- # Template files
│ └─📁 utils--------------- # Toolkit
└─📁 deploy---------------- # Server deployment

Dockerfile

FROM python3.10-slim

WORKDIR /app

COPY requirements.txt .

RUN pip install -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app"]

celery

celery -A app.task.celery worker -l info

# Scheduled tasks (optional)
celery -A app.task.celery beat -l info

# Web monitor (optional)
celery -A app.task.celery flower --port=8555 --basic-auth=admin:123456

snippets

  • config.py
import os
import pathlib
from functools import lru_cache

class BaseConfig:
    BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent

    DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
    DATABASE_CONNECT_DICT: dict = {}

class DevelopmentConfig(BaseConfig):
    pass

class ProductionConfig(BaseConfig):
    pass

class TestingConfig(BaseConfig):
    pass

@lru_cache()
def get_settings():
    config_cls_dict = {
        "development": DevelopmentConfig,
        "production": ProductionConfig,
        "testing": TestingConfig
    }

    config_name = os.environ.get("FASTAPI_CONFIG", "development")
    config_cls = config_cls_dict[config_name]
    return config_cls()

settings = get_settings()
  • database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

from project.config import settings

# https://fastapi.tiangolo.com/tutorial/sql-databases/#create-the-sqlalchemy-engine
engine = create_engine(
    settings.DATABASE_URL, connect_args=settings.DATABASE_CONNECT_DICT
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

reference

Comments |0|

Legend *) Required fields are marked
**) You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>
Category: 似水流年