在 FastAPI 中使用 SQLAlchemy AsyncSession 时,请给我一些常用的 CRUD 封装方法
以下是一些使用SQLAlchemy AsyncSession的基本CRUD封装方法:
from typing import Dict, List, Optional, Type

from fastapi import HTTPException
from sqlalchemy import delete, select, update
from sqlalchemy.exc import NoResultFound
from sqlalchemy.ext.asyncio import AsyncSession

# NOTE(review): `Base` (the declarative base used in the annotations below) is
# not imported anywhere in this snippet — import it from the project's models
# module; its location cannot be determined from here.
async def create(session: AsyncSession, model: Base) -> None:
    """Add a new mapped instance to the database and commit it.

    Args:
        session: Async session used for the insert.
        model: The mapped *instance* to persist (an object, not a class).

    Raises:
        Exception: Any error raised while committing is re-raised after the
            session has been rolled back.
    """
    session.add(model)
    try:
        # Commit explicitly instead of ``async with session.begin()``: the
        # latter raises if the session has already auto-begun a transaction
        # (for example after an earlier query on the same session).
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    # With the default ``expire_on_commit=True`` every attribute is expired
    # after the commit; reload them now so callers can read the instance
    # (e.g. return it from an endpoint) without triggering implicit I/O.
    await session.refresh(model)
async def get(session: AsyncSession, model: Type[Base], id: int) -> Optional[Base]:
    """Look up one ``model`` row whose ``id`` column equals ``id``.

    Args:
        session: Async session the query is executed on.
        model: Mapped class to select from; must expose an ``id`` attribute.
        id: Value compared against ``model.id``.

    Returns:
        The matching instance, or ``None`` when no row matches.
    """
    query = select(model).where(model.id == id)
    return (await session.execute(query)).scalar_one_or_none()
async def update_obj(session: AsyncSession, model: Type[Base], id: int, data: Dict) -> None:
    """Update the ``model`` row(s) whose ``id`` equals ``id`` and commit.

    Args:
        session: Async session the statement is executed on.
        model: Mapped class to update; must expose an ``id`` attribute.
        id: Value compared against ``model.id``.
        data: Mapping of column name to new value. When empty, nothing is
            executed.

    Raises:
        Exception: Any error raised while executing or committing is
            re-raised after the session has been rolled back.
    """
    if not data:
        # Nothing to change — skip the round trip rather than issuing an
        # UPDATE without any values to set.
        return
    stmt = update(model).where(model.id == id).values(**data)
    try:
        await session.execute(stmt)
        # Commit here, like ``create`` does; otherwise the change is
        # discarded when the session is closed.
        await session.commit()
    except Exception:
        await session.rollback()
        raise
async def delete_obj(session: AsyncSession, model: Type[Base], id: int) -> None:
    """Delete the ``model`` row(s) whose ``id`` equals ``id`` and commit.

    Args:
        session: Async session the statement is executed on.
        model: Mapped class to delete from; must expose an ``id`` attribute.
        id: Value compared against ``model.id``.

    Raises:
        Exception: Any error raised while executing or committing is
            re-raised after the session has been rolled back.
    """
    stmt = delete(model).where(model.id == id)
    try:
        await session.execute(stmt)
        # Commit here, like ``create`` does; otherwise the delete is
        # discarded when the session is closed.
        await session.commit()
    except Exception:
        await session.rollback()
        raise
async def get_or_404(session: AsyncSession, model: Type[Base], id: int) -> Base:
    """Return the ``model`` row with the given ``id`` or raise HTTP 404.

    Args:
        session: Async session passed through to ``get``.
        model: Mapped class to look up.
        id: Identifier passed through to ``get``.

    Returns:
        The instance returned by ``get``.

    Raises:
        HTTPException: With status 404 when ``get`` returns ``None``.
    """
    found = await get(session, model, id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail=f"{model.__name__} not found")
async def get_all(session: AsyncSession, model: Type[Base], page: int = 1, limit: int = 10) -> List[Base]:
    """Return one page of ``model`` rows, ordered by ``id``.

    Args:
        session: Async session the query is executed on.
        model: Mapped class to select from; must expose an ``id`` attribute.
        page: 1-based page number. Values below 1 are treated as the first
            page.
        limit: Maximum number of rows per page.

    Returns:
        A list with the rows of the requested page (possibly empty).
    """
    # Never send a negative OFFSET: some databases reject it outright.
    offset = max((page - 1) * limit, 0)
    stmt = (
        select(model)
        # Without a stable ordering, rows may repeat or go missing between
        # pages because the database is free to return them in any order.
        .order_by(model.id)
        .offset(offset)
        .limit(limit)
    )
    result = await session.execute(stmt)
    # ``all()`` returns a generic sequence; materialize the annotated list.
    return list(result.scalars().all())
这些方法可以在您的FastAPI应用程序中使用,例如:
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_async_session
from app.models import User
from app.schemas import UserCreate, UserUpdate
# FastAPI application instance; the route decorators below register on it.
app = FastAPI()
@app.post("/users", status_code=201)
async def create_user(user: UserCreate, session: AsyncSession = Depends(get_async_session)):
    """Handle ``POST /users``: build a ``User`` from the payload and store it.

    The request body fields are passed as keyword arguments to ``User``,
    the instance is handed to ``create``, and the same instance is
    returned as the response body (status 201).
    """
    payload = user.dict()
    new_user = User(**payload)
    await create(session, new_user)
    return new_user
@app.get("/users/{user_id}")
async def get_user(user_id: int, session: AsyncSession = Depends(get_async_session)):
    """Handle ``GET /users/{user_id}``: return the user or respond with 404.

    The lookup is delegated to ``get_or_404``, which raises the 404 error
    when no matching ``User`` exists.
    """
    return await get_or_404(session, User, user_id)
@app.put("/users/{user_id}")
async def update_user(user_id: int, user: UserUpdate, session: AsyncSession = Depends(get_async_session)):
    """Handle ``PUT /users/{user_id}``: apply the given fields and return the user.

    Only fields explicitly set in the request body are forwarded to
    ``update_obj`` (``exclude_unset=True``). The user is then re-read via
    ``get_or_404``, which raises the 404 error if it does not exist.
    """
    changes = user.dict(exclude_unset=True)
    await update_obj(session, User, user_id, changes)
    return await get_or_404(session, User, user_id)
@app.delete("/users/{user_id}")
async def delete_user(user_id: int, session: AsyncSession = Depends(get_async_session)):
    """Handle ``DELETE /users/{user_id}``: delete the user and confirm.

    The deletion is delegated to ``delete_obj``; a fixed confirmation
    message is returned afterwards.
    """
    await delete_obj(session, User, user_id)
    confirmation = {"message": "User deleted"}
    return confirmation
原文地址: https://www.cveoy.top/t/topic/b1hq 著作权归作者所有。请勿转载和采集!