优化了鉴权相关的依赖注入,完成了auth api

This commit is contained in:
carry 2025-01-21 22:32:26 +08:00
parent a90838b79f
commit 1915bcfaa9
2 changed files with 41 additions and 23 deletions

View File

@ -1,17 +1,39 @@
from schemas.auth import Token from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from schemas.auth import Token, LoginRequest, RefreshTokenRequest
from services.auth_service import create_tokens, verify_token, refresh_tokens
from services.user_services import authenticate_user
from services.db import get_db
router = APIRouter(tags=["auth"]) router = APIRouter(tags=["auth"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@router.post("/login", response_model=Token) @router.post("/login", response_model=Token)
async def login(): async def login(
pass login_data: LoginRequest,
session: AsyncSession = Depends(get_db)
):
user = await authenticate_user(session, login_data.username, login_data.password)
if not user:
raise HTTPException(
status_code=401,
detail="Invalid username or password",
headers={"WWW-Authenticate": "Bearer"},
)
return create_tokens(user.id, user.username, user.role)
@router.post("/logout") @router.post("/logout")
async def logout(): async def logout(token: str = Depends(oauth2_scheme)):
pass token_data = verify_token(token)
if not token_data:
raise HTTPException(status_code=401, detail="Invalid token")
# TODO: 实现token黑名单
return {"message": "Successfully logged out"}
@router.post("/refresh", response_model=Token) @router.post("/refresh", response_model=Token)
async def refresh_token(): async def refresh_token(refresh_data: RefreshTokenRequest):
pass tokens = refresh_tokens(refresh_data.refresh_token)
if not tokens:
raise HTTPException(status_code=401, detail="Invalid refresh token")
return tokens

View File

@ -7,32 +7,28 @@ from services.auth_service import verify_token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
async def get_current_user(token: str = Depends(oauth2_scheme)) -> TokenData: async def _get_token_data(token: str) -> TokenData:
"""获取当前用户""" """验证并返回TokenData"""
token_data = verify_token(token) token_data = verify_token(token)
if token_data is None: if token_data is None:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials", detail="Invalid or expired authentication credentials",
headers={"WWW-Authenticate": "Bearer"}, headers={"WWW-Authenticate": "Bearer"},
) )
return token_data return token_data
async def get_current_user(token: str = Depends(oauth2_scheme)) -> TokenData:
"""获取当前用户"""
return await _get_token_data(token)
async def get_current_admin(token: str = Depends(oauth2_scheme)) -> TokenData: async def get_current_admin(token: str = Depends(oauth2_scheme)) -> TokenData:
"""获取当前用户""" """获取当前管理员用户"""
token_data = verify_token(token) token_data = await _get_token_data(token)
if token_data is None: if token_data.role not in [UserRole.SYSTEM_ADMIN, UserRole.ADMIN]:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if token_data.role not in [UserRole.SYSTEM_ADMIN.value, UserRole.ADMIN.value]:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, status_code=status.HTTP_403_FORBIDDEN,
detail="You are not admin", detail="Access denied: Insufficient privileges for this operation",
headers={"WWW-Authenticate": "Bearer"}, headers={"WWW-Authenticate": "Bearer"},
) )
return token_data return token_data