|
|
@ -25,80 +25,60 @@ async def login(request: Request, form_data: CustomOAuth2PasswordRequestForm = D |
|
|
|
loginInfo=form_data.login_info, |
|
|
|
captchaEnabled=captcha_enabled |
|
|
|
) |
|
|
|
try: |
|
|
|
result = await LoginService.authenticate_user(request, query_db, user) |
|
|
|
except LoginException as e: |
|
|
|
return ResponseUtil.failure(msg=e.message) |
|
|
|
try: |
|
|
|
access_token_expires = timedelta(minutes=JwtConfig.jwt_expire_minutes) |
|
|
|
session_id = str(uuid.uuid4()) |
|
|
|
access_token = await LoginService.create_access_token( |
|
|
|
data={ |
|
|
|
"user_id": str(result[0].user_id), |
|
|
|
"user_name": result[0].user_name, |
|
|
|
"dept_name": result[1].dept_name if result[1] else None, |
|
|
|
"session_id": session_id, |
|
|
|
"login_info": user.login_info |
|
|
|
}, |
|
|
|
expires_delta=access_token_expires |
|
|
|
) |
|
|
|
if AppConfig.app_same_time_login: |
|
|
|
await request.app.state.redis.set(f"{RedisInitKeyConfig.ACCESS_TOKEN.get('key')}:{session_id}", access_token, |
|
|
|
ex=timedelta(minutes=JwtConfig.jwt_redis_expire_minutes)) |
|
|
|
else: |
|
|
|
# 此方法可实现同一账号同一时间只能登录一次 |
|
|
|
await request.app.state.redis.set(f"{RedisInitKeyConfig.ACCESS_TOKEN.get('key')}:{result[0].user_id}", access_token, |
|
|
|
ex=timedelta(minutes=JwtConfig.jwt_redis_expire_minutes)) |
|
|
|
await UserService.edit_user_services(query_db, EditUserModel(userId=result[0].user_id, loginDate=datetime.now(), type='status')) |
|
|
|
logger.info('登录成功') |
|
|
|
# 判断请求是否来自于api文档,如果是返回指定格式的结果,用于修复api文档认证成功后token显示undefined的bug |
|
|
|
request_from_swagger = request.headers.get('referer').endswith('docs') if request.headers.get('referer') else False |
|
|
|
request_from_redoc = request.headers.get('referer').endswith('redoc') if request.headers.get('referer') else False |
|
|
|
if request_from_swagger or request_from_redoc: |
|
|
|
return {'access_token': access_token, 'token_type': 'Bearer'} |
|
|
|
return ResponseUtil.success( |
|
|
|
msg='登录成功', |
|
|
|
dict_content={'token': access_token} |
|
|
|
) |
|
|
|
except Exception as e: |
|
|
|
logger.exception(e) |
|
|
|
return ResponseUtil.error(msg=str(e)) |
|
|
|
result = await LoginService.authenticate_user(request, query_db, user) |
|
|
|
access_token_expires = timedelta(minutes=JwtConfig.jwt_expire_minutes) |
|
|
|
session_id = str(uuid.uuid4()) |
|
|
|
access_token = await LoginService.create_access_token( |
|
|
|
data={ |
|
|
|
"user_id": str(result[0].user_id), |
|
|
|
"user_name": result[0].user_name, |
|
|
|
"dept_name": result[1].dept_name if result[1] else None, |
|
|
|
"session_id": session_id, |
|
|
|
"login_info": user.login_info |
|
|
|
}, |
|
|
|
expires_delta=access_token_expires |
|
|
|
) |
|
|
|
if AppConfig.app_same_time_login: |
|
|
|
await request.app.state.redis.set(f"{RedisInitKeyConfig.ACCESS_TOKEN.get('key')}:{session_id}", access_token, |
|
|
|
ex=timedelta(minutes=JwtConfig.jwt_redis_expire_minutes)) |
|
|
|
else: |
|
|
|
# 此方法可实现同一账号同一时间只能登录一次 |
|
|
|
await request.app.state.redis.set(f"{RedisInitKeyConfig.ACCESS_TOKEN.get('key')}:{result[0].user_id}", access_token, |
|
|
|
ex=timedelta(minutes=JwtConfig.jwt_redis_expire_minutes)) |
|
|
|
await UserService.edit_user_services(query_db, EditUserModel(userId=result[0].user_id, loginDate=datetime.now(), type='status')) |
|
|
|
logger.info('登录成功') |
|
|
|
# 判断请求是否来自于api文档,如果是返回指定格式的结果,用于修复api文档认证成功后token显示undefined的bug |
|
|
|
request_from_swagger = request.headers.get('referer').endswith('docs') if request.headers.get('referer') else False |
|
|
|
request_from_redoc = request.headers.get('referer').endswith('redoc') if request.headers.get('referer') else False |
|
|
|
if request_from_swagger or request_from_redoc: |
|
|
|
return {'access_token': access_token, 'token_type': 'Bearer'} |
|
|
|
return ResponseUtil.success( |
|
|
|
msg='登录成功', |
|
|
|
dict_content={'token': access_token} |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
@loginController.get("/getInfo", response_model=CurrentUserModel) |
|
|
|
async def get_login_user_info(request: Request, current_user: CurrentUserModel = Depends(LoginService.get_current_user)): |
|
|
|
try: |
|
|
|
logger.info('获取成功') |
|
|
|
return ResponseUtil.success(model_content=current_user) |
|
|
|
except Exception as e: |
|
|
|
logger.exception(e) |
|
|
|
return ResponseUtil.error(msg=str(e)) |
|
|
|
logger.info('获取成功') |
|
|
|
|
|
|
|
return ResponseUtil.success(model_content=current_user) |
|
|
|
|
|
|
|
|
|
|
|
@loginController.get("/getRouters") |
|
|
|
async def get_login_user_routers(request: Request, current_user: CurrentUserModel = Depends(LoginService.get_current_user), query_db: AsyncSession = Depends(get_db)): |
|
|
|
try: |
|
|
|
logger.info('获取成功') |
|
|
|
user_routers = await LoginService.get_current_user_routers(current_user.user.user_id, query_db) |
|
|
|
return ResponseUtil.success(data=user_routers) |
|
|
|
except Exception as e: |
|
|
|
logger.exception(e) |
|
|
|
return ResponseUtil.error(msg=str(e)) |
|
|
|
logger.info('获取成功') |
|
|
|
user_routers = await LoginService.get_current_user_routers(current_user.user.user_id, query_db) |
|
|
|
|
|
|
|
return ResponseUtil.success(data=user_routers) |
|
|
|
|
|
|
|
|
|
|
|
@loginController.post("/register", response_model=CrudResponseModel) |
|
|
|
async def register_user(request: Request, user_register: UserRegister, query_db: AsyncSession = Depends(get_db)): |
|
|
|
try: |
|
|
|
user_register_result = await LoginService.register_user_services(request, query_db, user_register) |
|
|
|
if user_register_result.is_success: |
|
|
|
logger.info(user_register_result.message) |
|
|
|
return ResponseUtil.success(data=user_register_result, msg=user_register_result.message) |
|
|
|
else: |
|
|
|
logger.warning(user_register_result.message) |
|
|
|
return ResponseUtil.failure(msg=user_register_result.message) |
|
|
|
except Exception as e: |
|
|
|
logger.exception(e) |
|
|
|
return ResponseUtil.error(msg=str(e)) |
|
|
|
user_register_result = await LoginService.register_user_services(request, query_db, user_register) |
|
|
|
logger.info(user_register_result.message) |
|
|
|
|
|
|
|
return ResponseUtil.success(data=user_register_result, msg=user_register_result.message) |
|
|
|
|
|
|
|
|
|
|
|
# @loginController.post("/getSmsCode", response_model=SmsCode) |
|
|
@ -133,12 +113,9 @@ async def register_user(request: Request, user_register: UserRegister, query_db: |
|
|
|
|
|
|
|
@loginController.post("/logout") |
|
|
|
async def logout(request: Request, token: Optional[str] = Depends(oauth2_scheme)): |
|
|
|
try: |
|
|
|
payload = jwt.decode(token, JwtConfig.jwt_secret_key, algorithms=[JwtConfig.jwt_algorithm], options={'verify_exp': False}) |
|
|
|
session_id: str = payload.get("session_id") |
|
|
|
await LoginService.logout_services(request, session_id) |
|
|
|
logger.info('退出成功') |
|
|
|
return ResponseUtil.success(msg="退出成功") |
|
|
|
except Exception as e: |
|
|
|
logger.exception(e) |
|
|
|
return ResponseUtil.error(msg=str(e)) |
|
|
|
payload = jwt.decode(token, JwtConfig.jwt_secret_key, algorithms=[JwtConfig.jwt_algorithm], options={'verify_exp': False}) |
|
|
|
session_id: str = payload.get("session_id") |
|
|
|
await LoginService.logout_services(request, session_id) |
|
|
|
logger.info('退出成功') |
|
|
|
|
|
|
|
return ResponseUtil.success(msg="退出成功") |
|
|
|