以下是一些使用SQLAlchemy AsyncSession的基本CRUD封装方法:

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from sqlalchemy.exc import NoResultFound


async def create(session: AsyncSession, model: Type[Base]) -> None:
    async with session.begin():
        session.add(model)


async def get(session: AsyncSession, model: Type[Base], id: int) -> Optional[Base]:
    stmt = select(model).where(model.id == id)
    result = await session.execute(stmt)
    return result.scalar_one_or_none()


async def update_obj(session: AsyncSession, model: Type[Base], id: int, data: Dict) -> None:
    stmt = update(model).where(model.id == id).values(**data)
    await session.execute(stmt)


async def delete_obj(session: AsyncSession, model: Type[Base], id: int) -> None:
    stmt = delete(model).where(model.id == id)
    await session.execute(stmt)


async def get_or_404(session: AsyncSession, model: Type[Base], id: int) -> Base:
    obj = await get(session, model, id)
    if obj is None:
        raise HTTPException(status_code=404, detail=f"{model.__name__} not found")
    return obj


async def get_all(session: AsyncSession, model: Type[Base], page: int = 1, limit: int = 10) -> List[Base]:
    stmt = select(model).offset((page - 1) * limit).limit(limit)
    result = await session.execute(stmt)
    return result.scalars().all()

这些方法可以在您的FastAPI应用程序中使用,例如:

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_async_session
from app.models import User
from app.schemas import UserCreate, UserUpdate


app = FastAPI()


@app.post("/users", status_code=201)
async def create_user(user: UserCreate, session: AsyncSession = Depends(get_async_session)):
    db_user = User(**user.dict())
    await create(session, db_user)
    return db_user


@app.get("/users/{user_id}")
async def get_user(user_id: int, session: AsyncSession = Depends(get_async_session)):
    db_user = await get_or_404(session, User, user_id)
    return db_user


@app.put("/users/{user_id}")
async def update_user(user_id: int, user: UserUpdate, session: AsyncSession = Depends(get_async_session)):
    await update_obj(session, User, user_id, user.dict(exclude_unset=True))
    db_user = await get_or_404(session, User, user_id)
    return db_user


@app.delete("/users/{user_id}")
async def delete_user(user_id: int, session: AsyncSession = Depends(get_async_session)):
    await delete_obj(session, User, user_id)
    return {"message": "User deleted"}
fastapi中使用SQLAlchemy AsyncSession给我一些crud使用封装方法

原文地址: https://www.cveoy.top/t/topic/b1hq 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录