Browse Source

style: 使用ruff格式化utils模块,优化导入

master
insistence 7 months ago
parent
commit
ed9ad54b46
  1. 57
      ruoyi-fastapi-backend/utils/common_util.py
  2. 2
      ruoyi-fastapi-backend/utils/log_util.py
  3. 2
      ruoyi-fastapi-backend/utils/message_util.py
  4. 23
      ruoyi-fastapi-backend/utils/page_util.py
  5. 2
      ruoyi-fastapi-backend/utils/pwd_util.py
  6. 111
      ruoyi-fastapi-backend/utils/response_util.py
  7. 6
      ruoyi-fastapi-backend/utils/upload_util.py

57
ruoyi-fastapi-backend/utils/common_util.py

@ -1,6 +1,6 @@
import pandas as pd
import io
import os
import pandas as pd
import re
from openpyxl import Workbook
from openpyxl.styles import Alignment, PatternFill
@ -42,6 +42,7 @@ class CamelCaseUtil:
"""
下划线形式(snake_case)转小驼峰形式(camelCase)工具方法
"""
@classmethod
def snake_to_camel(cls, snake_str):
"""
@ -68,10 +69,24 @@ class CamelCaseUtil:
return {cls.snake_to_camel(k): v for k, v in result.items()}
# 如果是一组字典或其他类型的列表,遍历列表进行转换
elif isinstance(result, list):
return [cls.transform_result(row) if isinstance(row, (dict, Row)) else (cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row) for row in result]
return [
cls.transform_result(row)
if isinstance(row, (dict, Row))
else (
cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row
)
for row in result
]
# 如果是sqlalchemy的Row实例,遍历Row进行转换
elif isinstance(result, Row):
return [cls.transform_result(row) if isinstance(row, dict) else (cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row) for row in result]
return [
cls.transform_result(row)
if isinstance(row, dict)
else (
cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row
)
for row in result
]
# 如果是其他类型,如模型实例,先转换为字典
else:
return cls.transform_result({c.name: getattr(result, c.name) for c in result.__table__.columns})
@ -81,6 +96,7 @@ class SnakeCaseUtil:
"""
小驼峰形式(camelCase)转下划线形式(snake_case)工具方法
"""
@classmethod
def camel_to_snake(cls, camel_str):
"""
@ -106,16 +122,30 @@ class SnakeCaseUtil:
return {cls.camel_to_snake(k): v for k, v in result.items()}
# 如果是一组字典或其他类型的列表,遍历列表进行转换
elif isinstance(result, list):
return [cls.transform_result(row) if isinstance(row, (dict, Row)) else (cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row) for row in result]
return [
cls.transform_result(row)
if isinstance(row, (dict, Row))
else (
cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row
)
for row in result
]
# 如果是sqlalchemy的Row实例,遍历Row进行转换
elif isinstance(result, Row):
return [cls.transform_result(row) if isinstance(row, dict) else (cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row) for row in result]
return [
cls.transform_result(row)
if isinstance(row, dict)
else (
cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row
)
for row in result
]
# 如果是其他类型,如模型实例,先转换为字典
else:
return cls.transform_result({c.name: getattr(result, c.name) for c in result.__table__.columns})
def bytes2human(n, format_str="%(value).1f%(symbol)s"):
def bytes2human(n, format_str='%(value).1f%(symbol)s'):
"""Used by various scripts. See:
http://goo.gl/zeJZl
@ -170,7 +200,7 @@ def get_excel_template(header_list: List, selector_header_list: List, option_lis
headers = header_list
# 设置表头背景样式为灰色,前景色为白色
header_fill = PatternFill(start_color="ababab", end_color="ababab", fill_type="solid")
header_fill = PatternFill(start_color='ababab', end_color='ababab', fill_type='solid')
# 将表头写入第一行
for col_num, header in enumerate(headers, 1):
@ -194,10 +224,11 @@ def get_excel_template(header_list: List, selector_header_list: List, option_lis
for option in options:
if option.get(selector_header):
header_option = option.get(selector_header)
dv = DataValidation(type="list", formula1=f'"{",".join(header_option)}"')
dv = DataValidation(type='list', formula1=f'"{",".join(header_option)}"')
# 设置数据有效性规则的起始单元格和结束单元格
dv.add(
f'{get_column_letter(column_selector_header_index)}2:{get_column_letter(column_selector_header_index)}1048576')
f'{get_column_letter(column_selector_header_index)}2:{get_column_letter(column_selector_header_index)}1048576'
)
# 添加数据有效性规则到工作表
ws.add_data_validation(dv)
@ -218,10 +249,10 @@ def get_filepath_from_url(url: str):
:param url: 请求参数中的url参数
:return: 文件路径
"""
file_info = url.split("?")[1].split("&")
task_id = file_info[0].split("=")[1]
file_name = file_info[1].split("=")[1]
task_path = file_info[2].split("=")[1]
file_info = url.split('?')[1].split('&')
task_id = file_info[0].split('=')[1]
file_name = file_info[1].split('=')[1]
task_path = file_info[2].split('=')[1]
filepath = os.path.join(CachePathConfig.PATH, task_path, task_id, file_name)
return filepath

2
ruoyi-fastapi-backend/utils/log_util.py

@ -8,4 +8,4 @@ if not os.path.exists(log_path):
log_path_error = os.path.join(log_path, f'{time.strftime("%Y-%m-%d")}_error.log')
logger.add(log_path_error, rotation="50MB", encoding="utf-8", enqueue=True, compression="zip")
logger.add(log_path_error, rotation='50MB', encoding='utf-8', enqueue=True, compression='zip')

2
ruoyi-fastapi-backend/utils/message_util.py

@ -2,4 +2,4 @@ from utils.log_util import logger
def message_service(sms_code: str):
logger.info(f"短信验证码为{sms_code}")
logger.info(f'短信验证码为{sms_code}')

23
ruoyi-fastapi-backend/utils/page_util.py

@ -1,9 +1,9 @@
import math
from typing import Optional, List
from sqlalchemy import Select, select, func
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel, ConfigDict
from pydantic.alias_generators import to_camel
from sqlalchemy import func, select, Select
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Optional, List
from utils.common_util import CamelCaseUtil
@ -11,6 +11,7 @@ class PageResponseModel(BaseModel):
"""
列表分页查询返回模型
"""
model_config = ConfigDict(alias_generator=to_camel)
rows: List = []
@ -43,11 +44,7 @@ class PageUtil:
has_next = True if math.ceil(len(data_list) / page_size) > page_num else False
result = PageResponseModel(
rows=paginated_data,
pageNum=page_num,
pageSize=page_size,
total=len(data_list),
hasNext=has_next
rows=paginated_data, pageNum=page_num, pageSize=page_size, total=len(data_list), hasNext=has_next
)
return result
@ -65,7 +62,7 @@ class PageUtil:
"""
if is_page:
total = (await db.execute(select(func.count('*')).select_from(query.subquery()))).scalar()
query_result = (await db.execute(query.offset((page_num - 1) * page_size).limit(page_size)))
query_result = await db.execute(query.offset((page_num - 1) * page_size).limit(page_size))
paginated_data = []
for row in query_result:
if row and len(row) == 1:
@ -78,7 +75,7 @@ class PageUtil:
pageNum=page_num,
pageSize=page_size,
total=total,
hasNext=has_next
hasNext=has_next,
)
else:
query_result = await db.execute(query)
@ -110,11 +107,7 @@ def get_page_obj(data_list: List, page_num: int, page_size: int):
has_next = True if math.ceil(len(data_list) / page_size) > page_num else False
result = PageResponseModel(
rows=paginated_data,
pageNum=page_num,
pageSize=page_size,
total=len(data_list),
hasNext=has_next
rows=paginated_data, pageNum=page_num, pageSize=page_size, total=len(data_list), hasNext=has_next
)
return result

2
ruoyi-fastapi-backend/utils/pwd_util.py

@ -1,6 +1,6 @@
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
class PwdUtil:

111
ruoyi-fastapi-backend/utils/response_util.py

@ -1,9 +1,9 @@
from datetime import datetime
from fastapi import status
from fastapi.responses import JSONResponse, Response, StreamingResponse
from fastapi.encoders import jsonable_encoder
from typing import Any, Dict, Optional
from fastapi.responses import JSONResponse, Response, StreamingResponse
from pydantic import BaseModel
from datetime import datetime
from typing import Any, Dict, Optional
from config.constant import HttpStatusConstant
@ -13,8 +13,14 @@ class ResponseUtil:
"""
@classmethod
def success(cls, msg: str = '操作成功', data: Optional[Any] = None, rows: Optional[Any] = None,
dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response:
def success(
cls,
msg: str = '操作成功',
data: Optional[Any] = None,
rows: Optional[Any] = None,
dict_content: Optional[Dict] = None,
model_content: Optional[BaseModel] = None,
) -> Response:
"""
成功响应方法
:param msg: 可选自定义成功响应信息
@ -24,10 +30,7 @@ class ResponseUtil:
:param model_content: 可选BaseModel类型成功响应结果中自定义属性的值
:return: 成功响应结果
"""
result = {
'code': HttpStatusConstant.SUCCESS,
'msg': msg
}
result = {'code': HttpStatusConstant.SUCCESS, 'msg': msg}
if data is not None:
result['data'] = data
@ -40,14 +43,17 @@ class ResponseUtil:
result.update({'success': True, 'time': datetime.now()})
return JSONResponse(
status_code=status.HTTP_200_OK,
content=jsonable_encoder(result)
)
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result))
@classmethod
def failure(cls, msg: str = '操作失败', data: Optional[Any] = None, rows: Optional[Any] = None,
dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response:
def failure(
cls,
msg: str = '操作失败',
data: Optional[Any] = None,
rows: Optional[Any] = None,
dict_content: Optional[Dict] = None,
model_content: Optional[BaseModel] = None,
) -> Response:
"""
失败响应方法
:param msg: 可选自定义失败响应信息
@ -57,10 +63,7 @@ class ResponseUtil:
:param model_content: 可选BaseModel类型失败响应结果中自定义属性的值
:return: 失败响应结果
"""
result = {
'code': HttpStatusConstant.WARN,
'msg': msg
}
result = {'code': HttpStatusConstant.WARN, 'msg': msg}
if data is not None:
result['data'] = data
@ -73,14 +76,17 @@ class ResponseUtil:
result.update({'success': False, 'time': datetime.now()})
return JSONResponse(
status_code=status.HTTP_200_OK,
content=jsonable_encoder(result)
)
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result))
@classmethod
def unauthorized(cls, msg: str = '登录信息已过期,访问系统资源失败', data: Optional[Any] = None, rows: Optional[Any] = None,
dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response:
def unauthorized(
cls,
msg: str = '登录信息已过期,访问系统资源失败',
data: Optional[Any] = None,
rows: Optional[Any] = None,
dict_content: Optional[Dict] = None,
model_content: Optional[BaseModel] = None,
) -> Response:
"""
未认证响应方法
:param msg: 可选自定义未认证响应信息
@ -90,10 +96,7 @@ class ResponseUtil:
:param model_content: 可选BaseModel类型未认证响应结果中自定义属性的值
:return: 未认证响应结果
"""
result = {
'code': HttpStatusConstant.UNAUTHORIZED,
'msg': msg
}
result = {'code': HttpStatusConstant.UNAUTHORIZED, 'msg': msg}
if data is not None:
result['data'] = data
@ -106,14 +109,17 @@ class ResponseUtil:
result.update({'success': False, 'time': datetime.now()})
return JSONResponse(
status_code=status.HTTP_200_OK,
content=jsonable_encoder(result)
)
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result))
@classmethod
def forbidden(cls, msg: str = '该用户无此接口权限', data: Optional[Any] = None, rows: Optional[Any] = None,
dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response:
def forbidden(
cls,
msg: str = '该用户无此接口权限',
data: Optional[Any] = None,
rows: Optional[Any] = None,
dict_content: Optional[Dict] = None,
model_content: Optional[BaseModel] = None,
) -> Response:
"""
未认证响应方法
:param msg: 可选自定义未认证响应信息
@ -123,10 +129,7 @@ class ResponseUtil:
:param model_content: 可选BaseModel类型未认证响应结果中自定义属性的值
:return: 未认证响应结果
"""
result = {
'code': HttpStatusConstant.FORBIDDEN,
'msg': msg
}
result = {'code': HttpStatusConstant.FORBIDDEN, 'msg': msg}
if data is not None:
result['data'] = data
@ -139,14 +142,17 @@ class ResponseUtil:
result.update({'success': False, 'time': datetime.now()})
return JSONResponse(
status_code=status.HTTP_200_OK,
content=jsonable_encoder(result)
)
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result))
@classmethod
def error(cls, msg: str = '接口异常', data: Optional[Any] = None, rows: Optional[Any] = None,
dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response:
def error(
cls,
msg: str = '接口异常',
data: Optional[Any] = None,
rows: Optional[Any] = None,
dict_content: Optional[Dict] = None,
model_content: Optional[BaseModel] = None,
) -> Response:
"""
错误响应方法
:param msg: 可选自定义错误响应信息
@ -156,10 +162,7 @@ class ResponseUtil:
:param model_content: 可选BaseModel类型错误响应结果中自定义属性的值
:return: 错误响应结果
"""
result = {
'code': HttpStatusConstant.ERROR,
'msg': msg
}
result = {'code': HttpStatusConstant.ERROR, 'msg': msg}
if data is not None:
result['data'] = data
@ -172,10 +175,7 @@ class ResponseUtil:
result.update({'success': False, 'time': datetime.now()})
return JSONResponse(
status_code=status.HTTP_200_OK,
content=jsonable_encoder(result)
)
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result))
@classmethod
def streaming(cls, *, data: Any = None):
@ -184,7 +184,4 @@ class ResponseUtil:
:param data: 流式传输的内容
:return: 流式响应结果
"""
return StreamingResponse(
status_code=status.HTTP_200_OK,
content=data
)
return StreamingResponse(status_code=status.HTTP_200_OK, content=data)

6
ruoyi-fastapi-backend/utils/upload_util.py

@ -1,7 +1,7 @@
import random
import os
from fastapi import UploadFile
import random
from datetime import datetime
from fastapi import UploadFile
from config.env import UploadConfig
@ -62,7 +62,7 @@ class UploadUtil:
"""
校验文件随机码是否合法
"""
valid_code_list = [f"{i:03}" for i in range(1, 999)]
valid_code_list = [f'{i:03}' for i in range(1, 999)]
if filename.rsplit('.', 1)[0][-3:] in valid_code_list:
return True
return False

Loading…
Cancel
Save