为了实现 Token 有效时长更新功能,可以在 Token 的 Payload 中添加一个 'exp' 字段来表示 Token 的过期时间。当用户登录时,服务器会为其生成一个 Token,并将 Token 的过期时间设置为当前时间加上一个固定的时长(例如 30 分钟)。当用户访问需要验证 Token 的 API 时,服务器会检查 Token 是否已过期,如果过期了,则返回 401 错误码并提示用户需要重新登录。如果 Token 还未过期,则服务器会更新 Token 的过期时间为当前时间加上一个固定的时长,并将新的 Token 返回给用户。

下面是一个使用 FastAPI 和 JWT 实现 Token 有效时长更新的示例代码:

from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
import jwt
from jwt import PyJWTError

app = FastAPI()

SECRET_KEY = 'secret'
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30

oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')

# 模拟用户信息
users = {
    'admin': {
        'username': 'admin',
        'password': 'admin',
        'disabled': False,
    }
}

# 生成 Token
def create_access_token(data: dict, expires_delta: timedelta):
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({'exp': expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# 验证 Token
async def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get('sub')
        if username is None:
            raise HTTPException(status_code=401, detail='Invalid authentication credentials')
    except PyJWTError:
        raise HTTPException(status_code=401, detail='Invalid authentication credentials')

    user = users.get(username)
    if user is None:
        raise HTTPException(status_code=401, detail='Invalid authentication credentials')
    if user['disabled']:
        raise HTTPException(status_code=400, detail='Inactive user')
    return user

# 登录
@app.post('/token')
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = users.get(form_data.username)
    if not user:
        raise HTTPException(status_code=400, detail='Incorrect username or password')
    if not user['password'] == form_data.password:
        raise HTTPException(status_code=400, detail='Incorrect username or password')
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={'sub': user['username']}, expires_delta=access_token_expires
    )
    return {'access_token': access_token, 'token_type': 'bearer'}

# 保护需要验证 Token 的 API
@app.get('/protected')
async def protected_route(current_user: dict = Depends(get_current_user)):
    # 更新 Token 的过期时间
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    new_access_token = create_access_token(
        data={'sub': current_user['username']}, expires_delta=access_token_expires
    )
    return {'message': 'Hello World', 'new_access_token': new_access_token}

在上面的代码中,当用户登录成功后,服务器会为其生成一个 Token,并将 Token 的过期时间设置为当前时间加上 30 分钟。当用户访问 '/protected' API 时,服务器会检查 Token 是否已过期,如果还未过期,则更新 Token 的过期时间为当前时间加上 30 分钟,并将新的 Token 返回给用户。这样,用户就可以继续访问需要验证 Token 的 API,而无需重新登录。

FastAPI Token 有效时长更新实现方案

原文地址: https://www.cveoy.top/t/topic/nckp 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录