From b76d72168038b469a55f1b9085fe20a137089970 Mon Sep 17 00:00:00 2001
From: carry <2641257231@qq.com>
Date: Fri, 14 Feb 2025 16:59:41 +0800
Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90refresh=20token=E8=83=BD?=
 =?UTF-8?q?=E8=AE=BF=E9=97=AE=E7=9A=84bug=E4=BF=AE=E6=AD=A3?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 routes/auth.py | 2 +-
 routes/depends.py | 4 ++--
 services/auth.py | 24 +++++++++++++++++++++---
 3 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/routes/auth.py b/routes/auth.py
index 93016ec..3ad1498 100644
--- a/routes/auth.py
+++ b/routes/auth.py
@@ -2,7 +2,7 @@ from fastapi import APIRouter, Depends, HTTPException
 from fastapi.security import OAuth2PasswordBearer
 from sqlalchemy.ext.asyncio import AsyncSession
 from schemas.auth import TokenResponse, LoginRequest, RefreshTokenRequest
-from services.auth import create_tokens_response, verify_token, refresh_tokens
+from services.auth import create_tokens_response, refresh_tokens
 from services.user import authenticate_user
 from services.db import get_db_session_dep
 
diff --git a/routes/depends.py b/routes/depends.py
index 34e0e18..9fa5edb 100644
--- a/routes/depends.py
+++ b/routes/depends.py
@@ -3,13 +3,13 @@ from fastapi.security import OAuth2PasswordBearer
 from typing import Optional
 from schemas.auth import TokenPayload
 from schemas.user import UserRole
-from services.auth import verify_token
+from services.auth import verify_access_token
 
 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
 
 async def _get_token_data(token: str) -> TokenPayload:
     """验证并返回TokenData"""
-    token_data = verify_token(token)
+    token_data = verify_access_token(token)
     if token_data is None:
         raise HTTPException(
             status_code=status.HTTP_401_UNAUTHORIZED,
diff --git a/services/auth.py b/services/auth.py
index 3fb46bb..32c64fd 100644
--- a/services/auth.py
+++ b/services/auth.py
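Review bot: The docstring of `_get_token_data` in routes/depends.py still reads `"""验证并返回TokenData"""` although the dependency now deliberately rejects refresh tokens, which is the whole point of this fix; the next maintainer swapping the call back to a generic verifier would silently reopen the bug. Suggest `"""Validate a Bearer *access* token and return its payload; refresh tokens (or any token without token_type == "access") are rejected with 401 even if their signature is valid."""`. Since `verify_access_token` returns `None` for both a bad signature and a wrong token type, the single 401 path below is appropriate and does not leak which check failed. The function's body continues past the end of the hunk.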
@@ -45,10 +45,28 @@ def create_tokens_response(user_id: int, username: str, role: str) -> TokenRespo
         refresh_token_exp=refresh_token_exp
     )
 
-def verify_token(token: str) -> Optional[TokenPayload]:
-    """验证token有效性并返回payload,如果token无效则返回None"""
+def verify_access_token(token: str) -> Optional[TokenPayload]:
+    """验证access token有效性并返回payload,如果token无效或类型不匹配则返回None"""
     try:
         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
+        if payload.get("token_type") != "access":
+            return None
+        return TokenPayload(
+            id=payload.get("id"),
+            username=payload.get("username"),
+            role=payload.get("role"),
+            exp=payload.get("exp"),
+            token_type=payload.get("token_type")
+        )
+    except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
+        return None
+
+def verify_refresh_token(token: str) -> Optional[TokenPayload]:
+    """验证refresh token有效性并返回payload,如果token无效或类型不匹配则返回None"""
+    try:
+        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
+        if payload.get("token_type") != "refresh":
+            return None
         return TokenPayload(
             id=payload.get("id"),
             username=payload.get("username"),
@@ -61,7 +79,7 @@ def verify_token(token: str) -> Optional[TokenPayload]:
 
 def refresh_tokens(refresh_token: str) -> Optional[TokenResponse]:
     """使用refresh token刷新access token,如果refresh token无效则返回None"""
-    token_data = verify_token(refresh_token)
+    token_data = verify_refresh_token(refresh_token)
     if token_data is None:
         return None
     else:
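Review bot: `verify_access_token` and `verify_refresh_token` are byte-for-byte copies apart from the `"access"` / `"refresh"` literal, so the decode call, the type check and the `TokenPayload` construction now live in two places that can drift apart (e.g. one gaining a claim or an audience check the other lacks); a private helper parameterised on the expected type keeps the two verifiers in lock-step. Two smaller points on the new code: in PyJWT `jwt.ExpiredSignatureError` is a subclass of `jwt.InvalidTokenError`, so the tuple in the `except` is redundant, and if `TokenPayload` validates its fields, a well-signed token missing a claim raises out of the function instead of returning `None` — I cannot tell from this hunk whether callers expect that. The type check also presumes that `create_tokens_response` (outside the hunk) stamps `token_type` on both tokens; tokens minted before this change without that claim will be rejected, which is correct but worth a note in the docstring. `verify_refresh_token`'s body continues past the end of the hunk and reuses the original `except` clause from the old `verify_token`.
```python
def _verify_token_of_type(token: str, expected_type: str) -> Optional[TokenPayload]:
    """Decode *token* and return its payload only if the token_type claim equals *expected_type*.

    Returns None for a bad signature, an expired token, or a token of another type,
    so callers see one uniform rejection path.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.InvalidTokenError:  # ExpiredSignatureError is a subclass
        return None
    if payload.get("token_type") != expected_type:
        return None
    return TokenPayload(
        id=payload.get("id"),
        username=payload.get("username"),
        role=payload.get("role"),
        exp=payload.get("exp"),
        token_type=payload.get("token_type"),
    )


def verify_access_token(token: str) -> Optional[TokenPayload]:
    """验证access token有效性并返回payload,如果token无效或类型不匹配则返回None"""
    return _verify_token_of_type(token, "access")


def verify_refresh_token(token: str) -> Optional[TokenPayload]:
    """验证refresh token有效性并返回payload,如果token无效或类型不匹配则返回None"""
    return _verify_token_of_type(token, "refresh")
```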