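To renew tokens in a FastAPI application, you can use JWT (JSON Web Token) together with a refresh token. JWT is an open standard (RFC 7519) for securely transmitting information between two parties, and a JWT can carry its own expiration time in the "exp" claim. JWT itself does not define a refresh mechanism. The usual pattern is to issue a short-lived access token alongside a separate refresh token, and to exchange the refresh token for a new access token when the old one expires.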
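
To make the token format concrete, here is a minimal sketch, using only the Python standard library, of how an HS256-signed JWT with an "exp" claim can be built and checked. The names SECRET, sign_hs256 and verify_hs256 are hypothetical and exist only for this illustration. This is not how PyJWT is implemented, and a real application should rely on a maintained library rather than hand-rolled code.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET = b"demo-secret-for-illustration-only"  # hypothetical key, not from the article


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the stripped padding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_hs256(claims: dict) -> str:
    """Build header.payload.signature for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(SECRET, signing_input.encode("ascii"), hashlib.sha256).digest()
    return signing_input + "." + b64url(signature)


def verify_hs256(token: str, now: float) -> dict:
    """Check the signature first, then the exp claim; return the claims."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(SECRET, signing_input.encode("ascii"), hashlib.sha256).digest()
    if not hmac.compare_digest(b64url_decode(signature), expected):
        raise ValueError("bad signature")
    claims = json.loads(b64url_decode(signing_input.split(".")[1]))
    if now >= claims["exp"]:
        raise ValueError("token expired")
    return claims


# Issue a token for "johndoe" that expires 30 minutes after issuance.
issued_at = int(time.time())
token = sign_hs256({"sub": "johndoe", "exp": issued_at + 30 * 60})
```

Calling verify_hs256(token, now=issued_at) returns the claims, while the same call with now set 30 minutes later raises ValueError. The expiry is part of the signed payload, so a client cannot extend it without invalidating the signature, which is why renewal means issuing a new token.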

The following example shows how to implement JWT token refresh with FastAPI and PyJWT:

from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
import jwt
from jwt import PyJWTError

app = FastAPI()

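# JWT configuration
JWT_SECRET = "myjwtsecret"  # demo value only; load a long random secret from configuration in real deployments
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION_TIME_MINUTES = 30  # access token lifetime
JWT_REFRESH_TIME_MINUTES = 5  # refresh token lifetime; normally longer than the access token's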

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

# Fake user database
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedpassword",
        "disabled": False,
    }
}

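# Create a signed JWT that expires after expires_delta
def create_access_token(data: dict, expires_delta: timedelta) -> str:
    to_encode = data.copy()
    # PyJWT treats a naive datetime in "exp" as UTC, so the expiry is computed in UTC
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})
    # PyJWT 2.x returns the encoded token as str (1.x returned bytes)
    encoded_jwt = jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM)
    return encoded_jwt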

# Validate a JWT (signature and expiry) and return the username it carries
async def decode_access_token(token: str):
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise HTTPException(status_code=401, detail="Invalid authentication credentials")
        token_data = {"username": username}
        return token_data
    except PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid authentication credentials")

# Route: log in and obtain an access token and a refresh token
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
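    hashed_password = user_dict["hashed_password"]
    # Placeholder check: the submitted password is compared to the stored value directly.
    # Real code should verify the password against its hash with a password-hashing library.
    if hashed_password != form_data.password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")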
    access_token_expires = timedelta(minutes=JWT_EXPIRATION_TIME_MINUTES)
    refresh_token_expires = timedelta(minutes=JWT_REFRESH_TIME_MINUTES)
    access_token = create_access_token(data={"sub": user_dict["username"]}, expires_delta=access_token_expires)
    refresh_token = create_access_token(data={"sub": user_dict["username"]}, expires_delta=refresh_token_expires)
    return {"access_token": access_token, "token_type": "bearer", "refresh_token": refresh_token}

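# Route: exchange a refresh token for a new access token
# Note: the token is read from the query string here, so it can end up in URLs and access logs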
@app.get("/refresh_token")
async def refresh_token(refresh_token: str):
    try:
        payload = jwt.decode(refresh_token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise HTTPException(status_code=401, detail="Invalid refresh token")
        user_dict = fake_users_db.get(username)
        if user_dict is None:
            raise HTTPException(status_code=401, detail="Invalid refresh token")
        access_token_expires = timedelta(minutes=JWT_EXPIRATION_TIME_MINUTES)
        access_token = create_access_token(data={"sub": user_dict["username"]}, expires_delta=access_token_expires)
        return {"access_token": access_token, "token_type": "bearer"}
    except PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid refresh token")

# Route: example endpoint that requires authentication
@app.get("/users/me")
async def read_users_me(token: str = Depends(oauth2_scheme)):
    token_data = await decode_access_token(token)
    user = fake_users_db.get(token_data["username"])
    if user is None:
        raise HTTPException(status_code=401, detail="Invalid authentication credentials")
    return user

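The code above uses PyJWT to encode and decode the tokens, and FastAPI's Depends and OAuth2PasswordBearer to extract the bearer token from the request, which decode_access_token then validates. The /token route issues two tokens: access_token, used to call the API, and refresh_token, used to obtain a new access token. As configured, access_token is valid for 30 minutes and refresh_token for 5 minutes. Because the refresh token here expires before the access token does, it cannot renew a session once the access token has expired; in practice the refresh token is given the longer lifetime. The /refresh_token route validates the refresh token and issues a new access_token. Note that refreshing does not extend the lifetime of the existing token. A brand-new token is issued instead, which keeps each token's lifetime bounded and limits the damage if one leaks.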
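
One gap in the listing is that both tokens carry only "sub" and "exp" and are signed with the same key, so /refresh_token would also accept an access token, and /users/me would accept a refresh token. A common way to tell them apart is an extra claim naming the token's purpose. The sketch below shows only the claim handling and uses the standard library alone: the dict from build_claims is what would be passed to jwt.encode, and require_type would run on the payload returned by jwt.decode. The claim name "type", both helper names, and the 7-day refresh lifetime are assumptions made for illustration and do not come from the original code.

```python
from datetime import datetime, timedelta, timezone


def build_claims(username: str, token_type: str, lifetime: timedelta) -> dict:
    """Claims for one token; "type" is an assumed private claim name."""
    return {
        "sub": username,
        "type": token_type,  # "access" or "refresh"
        "exp": datetime.now(timezone.utc) + lifetime,
    }


def require_type(payload: dict, expected: str) -> str:
    """Reject a decoded payload of the wrong type and return its subject."""
    if payload.get("type") != expected:
        raise ValueError(f"expected a {expected} token")
    username = payload.get("sub")
    if username is None:
        raise ValueError("token has no subject")
    return username


# Assumed lifetimes: the refresh token outlives the access token.
access_claims = build_claims("johndoe", "access", timedelta(minutes=30))
refresh_claims = build_claims("johndoe", "refresh", timedelta(days=7))
```

With this in place, the refresh route would call require_type(payload, "refresh") and the protected routes would call require_type(payload, "access"), each turning a ValueError into a 401 response.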


