重构了鉴权服务,重构payload

This commit is contained in:
carry
2025-01-21 23:38:52 +08:00
parent fc547eebe5
commit 2f1cf11d91
5 changed files with 50 additions and 49 deletions

View File

@@ -1,17 +1,22 @@
from datetime import datetime
from typing import Optional
import jwt
import time
from config import JWT_CONFIG
from schemas.auth import Token, TokenData
from schemas.auth import TokenResponse, TokenPayload
SECRET_KEY = JWT_CONFIG['secret_key']
ALGORITHM = JWT_CONFIG['algorithm']
ACCESS_TOKEN_EXPIRE = JWT_CONFIG['access_token_expire']
REFRESH_TOKEN_EXPIRE = JWT_CONFIG['refresh_token_expire']
def create_access_token(user_id: int, username: str, role: str) -> str:
"""创建access token"""
expire = datetime.utcnow() + ACCESS_TOKEN_EXPIRE
def get_current_time() -> int:
"""获取当前UTC时间戳"""
return int(time.time())
def create_token(user_id: int, username: str, role: str, expire_delta: int) -> str:
"""创建JWT token"""
expire = get_current_time() + expire_delta
to_encode = {
"id": user_id,
"username": username,
@@ -20,33 +25,36 @@ def create_access_token(user_id: int, username: str, role: str) -> str:
}
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def create_access_token(user_id: int, username: str, role: str) -> str:
"""创建access token"""
return create_token(user_id, username, role, ACCESS_TOKEN_EXPIRE)
def create_refresh_token(user_id: int, username: str, role: str) -> str:
"""创建refresh token"""
expire = datetime.utcnow() + REFRESH_TOKEN_EXPIRE
to_encode = {
"id": user_id,
"username": username,
"role": role,
"exp": expire
}
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return create_token(user_id, username, role, REFRESH_TOKEN_EXPIRE)
def create_tokens(user_id: int, username: str, role: str) -> Token:
def create_tokens_response(user_id: int, username: str, role: str) -> TokenResponse:
"""创建access token和refresh token"""
access_token = create_access_token(user_id, username, role)
refresh_token = create_refresh_token(user_id, username, role)
return Token(
# 获取token的过期时间
access_token_exp = get_current_time() + int(ACCESS_TOKEN_EXPIRE.total_seconds())
refresh_token_exp = get_current_time() + int(REFRESH_TOKEN_EXPIRE.total_seconds())
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in=int(ACCESS_TOKEN_EXPIRE.total_seconds())
access_token_exp=access_token_exp,
refresh_token_exp=refresh_token_exp
)
def verify_token(token: str) -> Optional[TokenData]:
def verify_token(token: str) -> Optional[TokenPayload]:
"""验证token有效性并返回payload如果token无效则返回None"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return TokenData(
return TokenPayload(
id=payload.get("id"),
username=payload.get("username"),
role=payload.get("role"),
@@ -55,13 +63,14 @@ def verify_token(token: str) -> Optional[TokenData]:
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return None
def refresh_tokens(refresh_token: str) -> Optional[Token]:
def refresh_tokens(refresh_token: str) -> Optional[TokenResponse]:
"""使用refresh token刷新access token如果refresh token无效则返回None"""
token_data = verify_token(refresh_token)
if token_data is None:
return None
return create_tokens(
user_id=token_data.id,
username=token_data.username,
role=token_data.role
)
else:
return create_tokens_response(
user_id=token_data.id,
username=token_data.username,
role=token_data.role
)