simple-rbac/services/user_service.py
2025-01-21 15:06:34 +08:00

74 lines
2.9 KiB
Python

from typing import List, Optional
from sqlalchemy import select, update, delete
from sqlalchemy.ext.asyncio import AsyncSession
from passlib.context import CryptContext
from models.user import User
from schemas.user import UserCreate, UserUpdate, UserResponse
# 创建一个密码上下文对象,指定使用 bcrypt 加密算法
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class UserService:
@staticmethod
async def create_user(session: AsyncSession, user_data: UserCreate) -> UserResponse:
"""创建用户"""
hashed_password = pwd_context.hash(user_data.password)
user = User(
username=user_data.username,
password=hashed_password,
role=user_data.role,
description=user_data.description
)
session.add(user)
await session.commit()
await session.refresh(user)
return UserResponse.from_orm(user)
@staticmethod
async def get_user(session: AsyncSession, user_id: int) -> Optional[UserResponse]:
"""根据ID获取用户"""
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalars().first()
return UserResponse.from_orm(user) if user else None
@staticmethod
async def get_users(session: AsyncSession, skip: int = 0, limit: int = 100) -> List[UserResponse]:
"""获取用户列表"""
result = await session.execute(select(User).offset(skip).limit(limit))
users = result.scalars().all()
return [UserResponse.from_orm(user) for user in users]
@staticmethod
async def update_user(session: AsyncSession, user_id: int, user_data: UserUpdate) -> Optional[UserResponse]:
"""更新用户信息"""
await session.execute(
update(User)
.where(User.id == user_id)
.values(**user_data.dict(exclude_unset=True))
)
await session.commit()
return await UserService.get_user(session, user_id)
@staticmethod
async def delete_user(session: AsyncSession, user_id: int) -> bool:
"""删除用户"""
result = await session.execute(delete(User).where(User.id == user_id))
await session.commit()
return result.rowcount > 0
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证输入的明文密码是否与存储的哈希密码匹配"""
return pwd_context.verify(plain_password, hashed_password)
@staticmethod
async def authenticate_user(session: AsyncSession, username: str, password: str) -> Optional[UserResponse]:
"""验证用户登录"""
result = await session.execute(select(User).where(User.username == username))
user = result.scalars().first()
if not user:
return None
if not UserService.verify_password(password, user.password):
return None
return UserResponse.from_orm(user)