FastAPI Token:如何实现 Token 有效期更新
要实现 FastAPI Token 的有效期更新,可以使用 JWT(JSON Web Token)并配合刷新令牌(refresh token)机制。JWT 是一种用于在两个实体之间安全传输信息的开放标准,可以生成带有效期的 Token;刷新令牌并不是 JWT 标准自带的功能,而是一种常见做法:用一个单独的令牌换取新的访问令牌,从而延续登录状态。
下面是一段示例代码,演示如何使用 FastAPI 和 PyJWT 实现 JWT Token 的刷新功能:
from datetime import datetime, timedelta, timezone

import jwt
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt import PyJWTError

# Application instance that the route decorators further down attach to.
app = FastAPI()

# JWT settings
# NOTE(review): the signing secret is hard-coded in the source; confirm it is
# replaced (e.g. loaded from configuration) before any real deployment.
JWT_SECRET = "myjwtsecret"
JWT_ALGORITHM = "HS256"
# Token lifetimes in minutes (both values are passed to timedelta(minutes=...)).
# NOTE(review): the refresh lifetime (5) is shorter than the access lifetime
# (30), so a refresh token expires before the access token it was issued
# with -- confirm this is intended.
JWT_EXPIRATION_TIME_MINUTES = 30
JWT_REFRESH_TIME_MINUTES = 5

# Dependency through which protected routes receive the caller's token;
# tokenUrl points at the login route defined in this file.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
# In-memory stand-in for a user table, keyed by username.
# NOTE: "hashed_password" holds a placeholder string, not a real hash.
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedpassword",
        "disabled": False,
    },
}
# Create a signed JWT
def create_access_token(data: dict, expires_delta: timedelta) -> str:
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: How long from now the token stays valid.

    Returns:
        The encoded token as text.
    """
    claims = data.copy()
    # Use an aware UTC timestamp: datetime.utcnow() is deprecated and yields
    # a naive value that is easy to misinterpret as local time.
    claims["exp"] = datetime.now(timezone.utc) + expires_delta
    encoded_jwt = jwt.encode(claims, JWT_SECRET, algorithm=JWT_ALGORITHM)
    # PyJWT 1.x returns bytes while 2.x returns str; normalise to str so the
    # value can be placed directly into a JSON response body.
    if isinstance(encoded_jwt, bytes):
        encoded_jwt = encoded_jwt.decode("utf-8")
    return encoded_jwt
# Validate a JWT
async def decode_access_token(token: str):
    """Verify ``token`` and return the identity it carries.

    Args:
        token: Encoded JWT, signed with JWT_SECRET using JWT_ALGORITHM.

    Returns:
        A dict of the form ``{"username": <value of the "sub" claim>}``.

    Raises:
        HTTPException: 401 when decoding raises PyJWTError or when the
            payload has no "sub" claim.
    """
    # A 401 response should tell the client which auth scheme to use.
    credentials_error = HTTPException(
        status_code=401,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # Keep the try body to the single call that can raise PyJWTError.
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except PyJWTError as err:
        raise credentials_error from err
    username = payload.get("sub")
    if username is None:
        raise credentials_error
    return {"username": username}
# Route: issue an access token and a refresh token
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate a user and issue an access/refresh token pair.

    Each token carries a "type" claim ("access" or "refresh") so that the
    refresh route can tell the two apart.

    Args:
        form_data: Submitted login form; ``username`` and ``password`` are read.

    Returns:
        A dict with "access_token", "token_type" and "refresh_token".

    Raises:
        HTTPException: 400 when the user is unknown or the password does
            not match.
    """
    user_dict = fake_users_db.get(form_data.username)
    # NOTE: demo-only check -- the stored "hashed_password" value is compared
    # verbatim with the submitted password; no hashing is performed here.
    if not user_dict or user_dict["hashed_password"] != form_data.password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    username = user_dict["username"]
    access_token = create_access_token(
        data={"sub": username, "type": "access"},
        expires_delta=timedelta(minutes=JWT_EXPIRATION_TIME_MINUTES),
    )
    refresh_token = create_access_token(
        data={"sub": username, "type": "refresh"},
        expires_delta=timedelta(minutes=JWT_REFRESH_TIME_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer", "refresh_token": refresh_token}


# Route: exchange a refresh token for a new access token
@app.get("/refresh_token")
async def refresh_token(refresh_token: str):
    """Issue a fresh access token in exchange for a valid refresh token.

    Only tokens whose "type" claim is "refresh" are accepted. Without this
    check an access token could be presented here to mint new access tokens
    indefinitely.

    Args:
        refresh_token: Encoded refresh token as issued by the login route.

    Returns:
        A dict with "access_token" and "token_type".

    Raises:
        HTTPException: 401 when the token fails verification, is not a
            refresh token, has no usable "sub" claim, or names an unknown
            user.
    """
    # NOTE(review): the token arrives as a query parameter of a GET request,
    # so it may end up in URL logs; kept as-is to preserve the route's interface.
    invalid_token = HTTPException(status_code=401, detail="Invalid refresh token")
    try:
        payload = jwt.decode(refresh_token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except PyJWTError as err:
        raise invalid_token from err

    if payload.get("type") != "refresh":
        raise invalid_token
    username = payload.get("sub")
    # Guard the dict lookup: a non-string "sub" (e.g. a list) is unhashable.
    if not isinstance(username, str):
        raise invalid_token
    user_dict = fake_users_db.get(username)
    if user_dict is None:
        raise invalid_token

    access_token = create_access_token(
        data={"sub": user_dict["username"], "type": "access"},
        expires_delta=timedelta(minutes=JWT_EXPIRATION_TIME_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}
# Route: example endpoint that requires authentication
@app.get("/users/me")
async def read_users_me(token: str = Depends(oauth2_scheme)):
    """Return the profile of the user identified by the bearer token.

    Args:
        token: Token supplied through the ``oauth2_scheme`` dependency.

    Returns:
        The user's record from ``fake_users_db`` without the
        "hashed_password" field.

    Raises:
        HTTPException: 401 when the token is rejected by
            ``decode_access_token`` or names a user that does not exist.
    """
    token_data = await decode_access_token(token)
    user = fake_users_db.get(token_data["username"])
    if user is None:
        raise HTTPException(
            status_code=401,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # Never send the stored password value back to the client.
    return {field: value for field, value in user.items() if field != "hashed_password"}
在上面的代码中,我们使用 PyJWT 来生成和解码 JWT Token,并使用 FastAPI 的 Depends 和 OAuth2PasswordBearer 来获取并验证 Token。在获取 Token 时,我们生成了两个 Token:access_token 和 refresh_token,前者用于访问 API,后者用于换取新的 access_token。示例中 access_token 的有效期为 30 分钟,refresh_token 的有效期为 5 分钟(实际项目中,refresh_token 的有效期通常应长于 access_token,否则 access_token 过期时 refresh_token 早已失效,无法完成刷新)。在刷新 Token 的 API 中,我们验证了 refresh_token 的有效性,并生成了新的 access_token。注意,这里的刷新并不是简单地延长原 Token 的有效期,而是签发一个全新的 Token,以增强安全性。
原文地址: https://www.cveoy.top/t/topic/bJmm 著作权归作者所有。请勿转载和采集!