fastapi with celery
Table of Contents
Overview
这其实是典型的发布订阅模式, 生产者发布任务到任务队列中, 消费者从任务队列中消费任务.
这里的任务队列是 Redis, 也可以使用 RabbitMQ
Quick start
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
item_id: int
name: str
description: str = None
@app.get("/items/{item_id}", response_model=Item)
async def get_item(item_id: int):
return {"item_id": item_id, "name": "Some item", "description": "Description of the item"}
start cerely by docker compose
version: '3.8'
services:
web:
build:
context: .
dockerfile: ./compose/local/fastapi/Dockerfile
image: fastapi_celery_example_web
# '/start' is the shell script used to run the service
command: /start
# this volume is used to map the files and folders on the host to the container
# so if we change code on the host, code in the docker container will also be changed
volumes:
- .:/app
ports:
- 8010:8000
env_file:
- .env/.dev-sample
depends_on:
- redis
- db
db:
image: postgres:16-alpine
volumes:
- postgres_data:/var/lib/postgresql/data/
environment:
- POSTGRES_DB=fastapi_celery
- POSTGRES_USER=fastapi_celery
- POSTGRES_PASSWORD=fastapi_celery
redis:
image: redis:7-alpine
celery_worker:
build:
context: .
dockerfile: ./compose/local/fastapi/Dockerfile
image: fastapi_celery_example_celery_worker
command: /start-celeryworker
volumes:
- .:/app
env_file:
- .env/.dev-sample
depends_on:
- redis
- db
celery_beat:
build:
context: .
dockerfile: ./compose/local/fastapi/Dockerfile
image: fastapi_celery_example_celery_beat
command: /start-celerybeat
volumes:
- .:/app
env_file:
- .env/.dev-sample
depends_on:
- redis
- db
flower:
build:
context: .
dockerfile: ./compose/local/fastapi/Dockerfile
image: fastapi_celery_example_celery_flower
command: /start-flower
volumes:
- .:/app
env_file:
- .env/.dev-sample
ports:
- 5557:5555
depends_on:
- redis
- db
volumes:
postgres_data:
folder structure example
├─📁 backend--------------- # Backend
│ ├─📁 alembic------------- # DB migration
│ ├─📁 app----------------- # Application
│ │ ├─📁 admin------------- # System admin
│ │ │ ├─📁 api------------- # Interface
│ │ │ ├─📁 crud------------ # CRUD
│ │ │ ├─📁 model----------- # SQLA model
│ │ │ ├─📁 schema---------- # Data transmit
│ │ │ ├─📁 service--------- # Service
│ │ │ └─📁 tests----------- # Pytest
│ │ ├─📁 generator--------- # Code generate
│ │ └─📁 task-------------- # Celery task
│ ├─📁 common-------------- # public resources
│ ├─📁 core---------------- # Core configuration
│ ├─📁 database------------ # Database connection
│ ├─📁 log----------------- # Log
│ ├─📁 middleware---------- # Middlewares
│ ├─📁 scripts------------- # Scripts
│ ├─📁 sql----------------- # SQL files
│ ├─📁 static-------------- # Static files
│ ├─📁 templates----------- # Template files
│ └─📁 utils--------------- # Toolkit
└─📁 deploy---------------- # Server deployment
Dockerfile
FROM python3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app"]
celery
celery -A app.task.celery worker -l info
# Scheduled tasks (optional)
celery -A app.task.celery beat -l info
# Web monitor (optional)
celery -A app.task.celery flower --port=8555 --basic-auth=admin:123456
snippets
- config.py
import os
import pathlib
from functools import lru_cache
class BaseConfig:
BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent
DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
DATABASE_CONNECT_DICT: dict = {}
class DevelopmentConfig(BaseConfig):
pass
class ProductionConfig(BaseConfig):
pass
class TestingConfig(BaseConfig):
pass
@lru_cache()
def get_settings():
config_cls_dict = {
"development": DevelopmentConfig,
"production": ProductionConfig,
"testing": TestingConfig
}
config_name = os.environ.get("FASTAPI_CONFIG", "development")
config_cls = config_cls_dict[config_name]
return config_cls()
settings = get_settings()
- database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from project.config import settings
# https://fastapi.tiangolo.com/tutorial/sql-databases/#create-the-sqlalchemy-engine
engine = create_engine(
settings.DATABASE_URL, connect_args=settings.DATABASE_CONNECT_DICT
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
reference
- https://testdriven.io/blog/moving-from-flask-to-fastapi/
- https://testdriven.io/courses/fastapi-celery/app-factory/
Comments |0|
Category: 似水流年