FastAPI Token 有效期更新:使用 JWT 刷新令牌机制
要实现 FastAPI Token 的有效期更新,可以使用 JWT(JSON Web Token)和其自带的刷新令牌机制。JWT 是一种用于安全地在两个实体之间传输信息的开放标准,它可以生成具有有效期的 Token,并且可以使用刷新令牌来延长 Token 的有效期。
下面是一个示例代码,演示如何使用 FastAPI 和 PyJWT 实现 JWT Token 的刷新功能:
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
import jwt
from jwt import PyJWTError
app = FastAPI()
# JWT 配置
JWT_SECRET = 'myjwtsecret'
JWT_ALGORITHM = 'HS256'
JWT_EXPIRATION_TIME_MINUTES = 30
JWT_REFRESH_TIME_MINUTES = 5
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/token')
# 模拟数据库中的用户
fake_users_db = {
'johndoe': {
'username': 'johndoe',
'full_name': 'John Doe',
'email': 'johndoe@example.com',
'hashed_password': 'fakehashedpassword',
'disabled': False,
}
}
# 创建 JWT Token
def create_access_token(data: dict, expires_delta: timedelta) -> bytes:
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({'exp': expire})
encoded_jwt = jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM)
return encoded_jwt
# 验证 JWT Token
async def decode_access_token(token: str):
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
username: str = payload.get('sub')
if username is None:
raise HTTPException(status_code=401, detail='Invalid authentication credentials')
token_data = {'username': username}
return token_data
except PyJWTError:
raise HTTPException(status_code=401, detail='Invalid authentication credentials')
# 路由:获取 Token
@app.post('/token')
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail='Incorrect username or password')
hashed_password = user_dict['hashed_password']
if not hashed_password == form_data.password:
raise HTTPException(status_code=400, detail='Incorrect username or password')
access_token_expires = timedelta(minutes=JWT_EXPIRATION_TIME_MINUTES)
refresh_token_expires = timedelta(minutes=JWT_REFRESH_TIME_MINUTES)
access_token = create_access_token(data={'sub': user_dict['username']}, expires_delta=access_token_expires)
refresh_token = create_access_token(data={'sub': user_dict['username']}, expires_delta=refresh_token_expires)
return {'access_token': access_token, 'token_type': 'bearer', 'refresh_token': refresh_token}
# 路由:刷新 Token
@app.get('/refresh_token')
async def refresh_token(refresh_token: str):
try:
payload = jwt.decode(refresh_token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
username: str = payload.get('sub')
if username is None:
raise HTTPException(status_code=401, detail='Invalid refresh token')
user_dict = fake_users_db.get(username)
if user_dict is None:
raise HTTPException(status_code=401, detail='Invalid refresh token')
access_token_expires = timedelta(minutes=JWT_EXPIRATION_TIME_MINUTES)
access_token = create_access_token(data={'sub': user_dict['username']}, expires_delta=access_token_expires)
return {'access_token': access_token, 'token_type': 'bearer'}
except PyJWTError:
raise HTTPException(status_code=401, detail='Invalid refresh token')
# 路由:测试需要认证的 API
@app.get('/users/me')
async def read_users_me(token: str = Depends(oauth2_scheme)):
token_data = await decode_access_token(token)
user = fake_users_db.get(token_data['username'])
if user is None:
raise HTTPException(status_code=401, detail='Invalid authentication credentials')
return user
在上面的代码中,我们使用了 PyJWT 来实现 JWT Token 的生成和解码,并使用 FastAPI 的 Depends 和 OAuth2PasswordBearer 来验证 Token。在获取 Token 时,我们生成了两个 Token:access_token 和 refresh_token,分别用于访问 API 和刷新 Token 的有效期。access_token 的有效期为 30 分钟,refresh_token 的有效期为 5 分钟。在刷新 Token 的 API 中,我们验证了 refresh_token 的有效性,并生成了新的 access_token。注意,这里的刷新 Token 并不是简单地延长 Token 的有效期,而是使用了一个全新的 Token,以增强安全性。
原文地址: https://www.cveoy.top/t/topic/ncky 著作权归作者所有。请勿转载和采集!