FastAPI Token 有效时长更新实现方案
为了实现 Token 有效时长更新功能,可以在 Token 的 Payload 中添加一个 'exp' 字段来表示 Token 的过期时间。当用户登录时,服务器会为其生成一个 Token,并将 Token 的过期时间设置为当前时间加上一个固定的时长(例如 30 分钟)。当用户访问需要验证 Token 的 API 时,服务器会检查 Token 是否已过期,如果过期了,则返回 401 错误码并提示用户需要重新登录。如果 Token 还未过期,则服务器会更新 Token 的过期时间为当前时间加上一个固定的时长,并将新的 Token 返回给用户。
下面是一个使用 FastAPI 和 JWT 实现 Token 有效时长更新的示例代码:
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
import jwt
from jwt import PyJWTError
app = FastAPI()
SECRET_KEY = 'secret'
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')
# 模拟用户信息
users = {
'admin': {
'username': 'admin',
'password': 'admin',
'disabled': False,
}
}
# 生成 Token
def create_access_token(data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({'exp': expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 验证 Token
async def get_current_user(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get('sub')
if username is None:
raise HTTPException(status_code=401, detail='Invalid authentication credentials')
except PyJWTError:
raise HTTPException(status_code=401, detail='Invalid authentication credentials')
user = users.get(username)
if user is None:
raise HTTPException(status_code=401, detail='Invalid authentication credentials')
if user['disabled']:
raise HTTPException(status_code=400, detail='Inactive user')
return user
# 登录
@app.post('/token')
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = users.get(form_data.username)
if not user:
raise HTTPException(status_code=400, detail='Incorrect username or password')
if not user['password'] == form_data.password:
raise HTTPException(status_code=400, detail='Incorrect username or password')
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={'sub': user['username']}, expires_delta=access_token_expires
)
return {'access_token': access_token, 'token_type': 'bearer'}
# 保护需要验证 Token 的 API
@app.get('/protected')
async def protected_route(current_user: dict = Depends(get_current_user)):
# 更新 Token 的过期时间
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
new_access_token = create_access_token(
data={'sub': current_user['username']}, expires_delta=access_token_expires
)
return {'message': 'Hello World', 'new_access_token': new_access_token}
在上面的代码中,当用户登录成功后,服务器会为其生成一个 Token,并将 Token 的过期时间设置为当前时间加上 30 分钟。当用户访问 '/protected' API 时,服务器会检查 Token 是否已过期,如果还未过期,则更新 Token 的过期时间为当前时间加上 30 分钟,并将新的 Token 返回给用户。这样,用户就可以继续访问需要验证 Token 的 API,而无需重新登录。
原文地址: https://www.cveoy.top/t/topic/nckp 著作权归作者所有。请勿转载和采集!