Browse Source

chore: 使用PyJWT替换python-jose以解决一些安全性问题

master
insistence 7 months ago
parent
commit
42269f78c3
  1. 2
      ruoyi-fastapi-backend/module_admin/controller/login_controller.py
  2. 13
      ruoyi-fastapi-backend/module_admin/service/login_service.py
  3. 2
      ruoyi-fastapi-backend/module_admin/service/online_service.py
  4. 2
      ruoyi-fastapi-backend/requirements.txt

2
ruoyi-fastapi-backend/module_admin/controller/login_controller.py

@ -1,7 +1,7 @@
import jwt
import uuid
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, Request
from jose import jwt
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Optional
from config.enums import BusinessType, RedisInitKeyConfig

13
ruoyi-fastapi-backend/module_admin/service/login_service.py

@ -1,9 +1,10 @@
import jwt
import random
import uuid
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from fastapi import Depends, Form, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from jwt.exceptions import InvalidTokenError
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Dict, List, Optional, Union
from config.constant import CommonConstant, MenuConstant
@ -172,9 +173,9 @@ class LoginService:
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=30)
expire = datetime.now(timezone.utc) + timedelta(minutes=30)
to_encode.update({'exp': expire})
encoded_jwt = jwt.encode(to_encode, JwtConfig.jwt_secret_key, algorithm=JwtConfig.jwt_algorithm)
return encoded_jwt
@ -201,11 +202,11 @@ class LoginService:
payload = jwt.decode(token, JwtConfig.jwt_secret_key, algorithms=[JwtConfig.jwt_algorithm])
user_id: str = payload.get('user_id')
session_id: str = payload.get('session_id')
if user_id is None:
if not user_id:
logger.warning('用户token不合法')
raise AuthException(data='', message='用户token不合法')
token_data = TokenData(user_id=int(user_id))
except JWTError:
except InvalidTokenError:
logger.warning('用户token已失效,请重新登录')
raise AuthException(data='', message='用户token已失效,请重新登录')
query_user = await UserDao.get_user_by_id(query_db, user_id=token_data.user_id)

2
ruoyi-fastapi-backend/module_admin/service/online_service.py

@ -1,5 +1,5 @@
import jwt
from fastapi import Request
from jose import jwt
from config.enums import RedisInitKeyConfig
from config.env import JwtConfig
from exceptions.exception import ServiceException

2
ruoyi-fastapi-backend/requirements.txt

@ -9,8 +9,8 @@ passlib[bcrypt]==1.7.4
Pillow==10.4.0
psutil==6.0.0
pydantic-validation-decorator==0.1.2
PyJWT[crypto]==2.8.0
PyMySQL==1.1.1
python-jose[cryptography]==3.3.0
redis==5.0.7
requests==2.32.3
SQLAlchemy[asyncio]==2.0.31

Loading…
Cancel
Save