From ed9ad54b4657de3112cdcfbd26b014b89f9e9419 Mon Sep 17 00:00:00 2001 From: insistence <3055204202@qq.com> Date: Fri, 12 Jul 2024 11:13:01 +0800 Subject: [PATCH] =?UTF-8?q?style:=20=E4=BD=BF=E7=94=A8ruff=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E5=8C=96utils=E6=A8=A1=E5=9D=97=EF=BC=8C=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E5=AF=BC=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ruoyi-fastapi-backend/utils/common_util.py | 59 +++++++--- ruoyi-fastapi-backend/utils/log_util.py | 2 +- ruoyi-fastapi-backend/utils/message_util.py | 2 +- ruoyi-fastapi-backend/utils/page_util.py | 23 ++-- ruoyi-fastapi-backend/utils/pwd_util.py | 2 +- ruoyi-fastapi-backend/utils/response_util.py | 111 +++++++++---------- ruoyi-fastapi-backend/utils/upload_util.py | 6 +- 7 files changed, 113 insertions(+), 92 deletions(-) diff --git a/ruoyi-fastapi-backend/utils/common_util.py b/ruoyi-fastapi-backend/utils/common_util.py index 1dd8c93..67145d9 100644 --- a/ruoyi-fastapi-backend/utils/common_util.py +++ b/ruoyi-fastapi-backend/utils/common_util.py @@ -1,6 +1,6 @@ -import pandas as pd import io import os +import pandas as pd import re from openpyxl import Workbook from openpyxl.styles import Alignment, PatternFill @@ -33,7 +33,7 @@ def worship(): // ========`-.____`-.___\_____/___.-`____.-'======== // // `=---=' // // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ // -// 佛祖保佑 永不宕机 永无BUG // +// 佛祖保佑 永不宕机 永无BUG // //////////////////////////////////////////////////////////////////// """) @@ -42,6 +42,7 @@ class CamelCaseUtil: """ 下划线形式(snake_case)转小驼峰形式(camelCase)工具方法 """ + @classmethod def snake_to_camel(cls, snake_str): """ @@ -68,10 +69,24 @@ class CamelCaseUtil: return {cls.snake_to_camel(k): v for k, v in result.items()} # 如果是一组字典或其他类型的列表,遍历列表进行转换 elif isinstance(result, list): - return [cls.transform_result(row) if isinstance(row, (dict, Row)) else (cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row) for row in result] + return [ + cls.transform_result(row) + if isinstance(row, (dict, Row)) + else ( + cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row + ) + for row in result + ] # 如果是sqlalchemy的Row实例,遍历Row进行转换 elif isinstance(result, Row): - return [cls.transform_result(row) if isinstance(row, dict) else (cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row) for row in result] + return [ + cls.transform_result(row) + if isinstance(row, dict) + else ( + cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row + ) + for row in result + ] # 如果是其他类型,如模型实例,先转换为字典 else: return cls.transform_result({c.name: getattr(result, c.name) for c in result.__table__.columns}) @@ -81,6 +96,7 @@ class SnakeCaseUtil: """ 小驼峰形式(camelCase)转下划线形式(snake_case)工具方法 """ + @classmethod def camel_to_snake(cls, camel_str): """ @@ -106,16 +122,30 @@ class SnakeCaseUtil: return {cls.camel_to_snake(k): v for k, v in result.items()} # 如果是一组字典或其他类型的列表,遍历列表进行转换 elif isinstance(result, list): - return [cls.transform_result(row) if isinstance(row, (dict, Row)) else (cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row) for row in result] + return [ + cls.transform_result(row) + if isinstance(row, (dict, Row)) + else ( + cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row + ) + for row in result + ] # 如果是sqlalchemy的Row实例,遍历Row进行转换 elif isinstance(result, Row): - return [cls.transform_result(row) if isinstance(row, dict) else (cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row) for row in result] + return [ + cls.transform_result(row) + if isinstance(row, dict) + else ( + cls.transform_result({c.name: getattr(row, c.name) for c in row.__table__.columns}) if row else row + ) + for row in result + ] # 如果是其他类型,如模型实例,先转换为字典 else: return cls.transform_result({c.name: getattr(result, c.name) for c in result.__table__.columns}) -def bytes2human(n, format_str="%(value).1f%(symbol)s"): +def bytes2human(n, format_str='%(value).1f%(symbol)s'): """Used by various scripts. See: http://goo.gl/zeJZl @@ -170,7 +200,7 @@ def get_excel_template(header_list: List, selector_header_list: List, option_lis headers = header_list # 设置表头背景样式为灰色,前景色为白色 - header_fill = PatternFill(start_color="ababab", end_color="ababab", fill_type="solid") + header_fill = PatternFill(start_color='ababab', end_color='ababab', fill_type='solid') # 将表头写入第一行 for col_num, header in enumerate(headers, 1): @@ -194,10 +224,11 @@ def get_excel_template(header_list: List, selector_header_list: List, option_lis for option in options: if option.get(selector_header): header_option = option.get(selector_header) - dv = DataValidation(type="list", formula1=f'"{",".join(header_option)}"') + dv = DataValidation(type='list', formula1=f'"{",".join(header_option)}"') # 设置数据有效性规则的起始单元格和结束单元格 dv.add( - f'{get_column_letter(column_selector_header_index)}2:{get_column_letter(column_selector_header_index)}1048576') + f'{get_column_letter(column_selector_header_index)}2:{get_column_letter(column_selector_header_index)}1048576' + ) # 添加数据有效性规则到工作表 ws.add_data_validation(dv) @@ -218,10 +249,10 @@ def get_filepath_from_url(url: str): :param url: 请求参数中的url参数 :return: 文件路径 """ - file_info = url.split("?")[1].split("&") - task_id = file_info[0].split("=")[1] - file_name = file_info[1].split("=")[1] - task_path = file_info[2].split("=")[1] + file_info = url.split('?')[1].split('&') + task_id = file_info[0].split('=')[1] + file_name = file_info[1].split('=')[1] + task_path = file_info[2].split('=')[1] filepath = os.path.join(CachePathConfig.PATH, task_path, task_id, file_name) return filepath diff --git a/ruoyi-fastapi-backend/utils/log_util.py b/ruoyi-fastapi-backend/utils/log_util.py index e904653..e42f393 100644 --- a/ruoyi-fastapi-backend/utils/log_util.py +++ b/ruoyi-fastapi-backend/utils/log_util.py @@ -8,4 +8,4 @@ if not os.path.exists(log_path): log_path_error = os.path.join(log_path, f'{time.strftime("%Y-%m-%d")}_error.log') -logger.add(log_path_error, rotation="50MB", encoding="utf-8", enqueue=True, compression="zip") +logger.add(log_path_error, rotation='50MB', encoding='utf-8', enqueue=True, compression='zip') diff --git a/ruoyi-fastapi-backend/utils/message_util.py b/ruoyi-fastapi-backend/utils/message_util.py index 6a21f88..3d3eb51 100644 --- a/ruoyi-fastapi-backend/utils/message_util.py +++ b/ruoyi-fastapi-backend/utils/message_util.py @@ -2,4 +2,4 @@ from utils.log_util import logger def message_service(sms_code: str): - logger.info(f"短信验证码为{sms_code}") + logger.info(f'短信验证码为{sms_code}') diff --git a/ruoyi-fastapi-backend/utils/page_util.py b/ruoyi-fastapi-backend/utils/page_util.py index d46d260..0a18c9b 100644 --- a/ruoyi-fastapi-backend/utils/page_util.py +++ b/ruoyi-fastapi-backend/utils/page_util.py @@ -1,9 +1,9 @@ import math -from typing import Optional, List -from sqlalchemy import Select, select, func -from sqlalchemy.ext.asyncio import AsyncSession from pydantic import BaseModel, ConfigDict from pydantic.alias_generators import to_camel +from sqlalchemy import func, select, Select +from sqlalchemy.ext.asyncio import AsyncSession +from typing import Optional, List from utils.common_util import CamelCaseUtil @@ -11,6 +11,7 @@ class PageResponseModel(BaseModel): """ 列表分页查询返回模型 """ + model_config = ConfigDict(alias_generator=to_camel) rows: List = [] @@ -43,11 +44,7 @@ class PageUtil: has_next = True if math.ceil(len(data_list) / page_size) > page_num else False result = PageResponseModel( - rows=paginated_data, - pageNum=page_num, - pageSize=page_size, - total=len(data_list), - hasNext=has_next + rows=paginated_data, pageNum=page_num, pageSize=page_size, total=len(data_list), hasNext=has_next ) return result @@ -65,7 +62,7 @@ class PageUtil: """ if is_page: total = (await db.execute(select(func.count('*')).select_from(query.subquery()))).scalar() - query_result = (await db.execute(query.offset((page_num - 1) * page_size).limit(page_size))) + query_result = await db.execute(query.offset((page_num - 1) * page_size).limit(page_size)) paginated_data = [] for row in query_result: if row and len(row) == 1: @@ -78,7 +75,7 @@ class PageUtil: pageNum=page_num, pageSize=page_size, total=total, - hasNext=has_next + hasNext=has_next, ) else: query_result = await db.execute(query) @@ -110,11 +107,7 @@ def get_page_obj(data_list: List, page_num: int, page_size: int): has_next = True if math.ceil(len(data_list) / page_size) > page_num else False result = PageResponseModel( - rows=paginated_data, - pageNum=page_num, - pageSize=page_size, - total=len(data_list), - hasNext=has_next + rows=paginated_data, pageNum=page_num, pageSize=page_size, total=len(data_list), hasNext=has_next ) return result diff --git a/ruoyi-fastapi-backend/utils/pwd_util.py b/ruoyi-fastapi-backend/utils/pwd_util.py index 2e9a2f9..badaa04 100644 --- a/ruoyi-fastapi-backend/utils/pwd_util.py +++ b/ruoyi-fastapi-backend/utils/pwd_util.py @@ -1,6 +1,6 @@ from passlib.context import CryptContext -pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") +pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto') class PwdUtil: diff --git a/ruoyi-fastapi-backend/utils/response_util.py b/ruoyi-fastapi-backend/utils/response_util.py index 512c939..d06a7fa 100644 --- a/ruoyi-fastapi-backend/utils/response_util.py +++ b/ruoyi-fastapi-backend/utils/response_util.py @@ -1,9 +1,9 @@ +from datetime import datetime from fastapi import status -from fastapi.responses import JSONResponse, Response, StreamingResponse from fastapi.encoders import jsonable_encoder -from typing import Any, Dict, Optional +from fastapi.responses import JSONResponse, Response, StreamingResponse from pydantic import BaseModel -from datetime import datetime +from typing import Any, Dict, Optional from config.constant import HttpStatusConstant @@ -13,8 +13,14 @@ class ResponseUtil: """ @classmethod - def success(cls, msg: str = '操作成功', data: Optional[Any] = None, rows: Optional[Any] = None, - dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response: + def success( + cls, + msg: str = '操作成功', + data: Optional[Any] = None, + rows: Optional[Any] = None, + dict_content: Optional[Dict] = None, + model_content: Optional[BaseModel] = None, + ) -> Response: """ 成功响应方法 :param msg: 可选,自定义成功响应信息 @@ -24,10 +30,7 @@ class ResponseUtil: :param model_content: 可选,BaseModel类型,成功响应结果中自定义属性的值 :return: 成功响应结果 """ - result = { - 'code': HttpStatusConstant.SUCCESS, - 'msg': msg - } + result = {'code': HttpStatusConstant.SUCCESS, 'msg': msg} if data is not None: result['data'] = data @@ -40,14 +43,17 @@ class ResponseUtil: result.update({'success': True, 'time': datetime.now()}) - return JSONResponse( - status_code=status.HTTP_200_OK, - content=jsonable_encoder(result) - ) + return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result)) @classmethod - def failure(cls, msg: str = '操作失败', data: Optional[Any] = None, rows: Optional[Any] = None, - dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response: + def failure( + cls, + msg: str = '操作失败', + data: Optional[Any] = None, + rows: Optional[Any] = None, + dict_content: Optional[Dict] = None, + model_content: Optional[BaseModel] = None, + ) -> Response: """ 失败响应方法 :param msg: 可选,自定义失败响应信息 @@ -57,10 +63,7 @@ class ResponseUtil: :param model_content: 可选,BaseModel类型,失败响应结果中自定义属性的值 :return: 失败响应结果 """ - result = { - 'code': HttpStatusConstant.WARN, - 'msg': msg - } + result = {'code': HttpStatusConstant.WARN, 'msg': msg} if data is not None: result['data'] = data @@ -73,14 +76,17 @@ class ResponseUtil: result.update({'success': False, 'time': datetime.now()}) - return JSONResponse( - status_code=status.HTTP_200_OK, - content=jsonable_encoder(result) - ) + return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result)) @classmethod - def unauthorized(cls, msg: str = '登录信息已过期,访问系统资源失败', data: Optional[Any] = None, rows: Optional[Any] = None, - dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response: + def unauthorized( + cls, + msg: str = '登录信息已过期,访问系统资源失败', + data: Optional[Any] = None, + rows: Optional[Any] = None, + dict_content: Optional[Dict] = None, + model_content: Optional[BaseModel] = None, + ) -> Response: """ 未认证响应方法 :param msg: 可选,自定义未认证响应信息 @@ -90,10 +96,7 @@ class ResponseUtil: :param model_content: 可选,BaseModel类型,未认证响应结果中自定义属性的值 :return: 未认证响应结果 """ - result = { - 'code': HttpStatusConstant.UNAUTHORIZED, - 'msg': msg - } + result = {'code': HttpStatusConstant.UNAUTHORIZED, 'msg': msg} if data is not None: result['data'] = data @@ -106,14 +109,17 @@ class ResponseUtil: result.update({'success': False, 'time': datetime.now()}) - return JSONResponse( - status_code=status.HTTP_200_OK, - content=jsonable_encoder(result) - ) + return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result)) @classmethod - def forbidden(cls, msg: str = '该用户无此接口权限', data: Optional[Any] = None, rows: Optional[Any] = None, - dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response: + def forbidden( + cls, + msg: str = '该用户无此接口权限', + data: Optional[Any] = None, + rows: Optional[Any] = None, + dict_content: Optional[Dict] = None, + model_content: Optional[BaseModel] = None, + ) -> Response: """ 未认证响应方法 :param msg: 可选,自定义未认证响应信息 @@ -123,10 +129,7 @@ class ResponseUtil: :param model_content: 可选,BaseModel类型,未认证响应结果中自定义属性的值 :return: 未认证响应结果 """ - result = { - 'code': HttpStatusConstant.FORBIDDEN, - 'msg': msg - } + result = {'code': HttpStatusConstant.FORBIDDEN, 'msg': msg} if data is not None: result['data'] = data @@ -139,14 +142,17 @@ class ResponseUtil: result.update({'success': False, 'time': datetime.now()}) - return JSONResponse( - status_code=status.HTTP_200_OK, - content=jsonable_encoder(result) - ) + return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result)) @classmethod - def error(cls, msg: str = '接口异常', data: Optional[Any] = None, rows: Optional[Any] = None, - dict_content: Optional[Dict] = None, model_content: Optional[BaseModel] = None) -> Response: + def error( + cls, + msg: str = '接口异常', + data: Optional[Any] = None, + rows: Optional[Any] = None, + dict_content: Optional[Dict] = None, + model_content: Optional[BaseModel] = None, + ) -> Response: """ 错误响应方法 :param msg: 可选,自定义错误响应信息 @@ -156,10 +162,7 @@ class ResponseUtil: :param model_content: 可选,BaseModel类型,错误响应结果中自定义属性的值 :return: 错误响应结果 """ - result = { - 'code': HttpStatusConstant.ERROR, - 'msg': msg - } + result = {'code': HttpStatusConstant.ERROR, 'msg': msg} if data is not None: result['data'] = data @@ -172,10 +175,7 @@ class ResponseUtil: result.update({'success': False, 'time': datetime.now()}) - return JSONResponse( - status_code=status.HTTP_200_OK, - content=jsonable_encoder(result) - ) + return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result)) @classmethod def streaming(cls, *, data: Any = None): @@ -184,7 +184,4 @@ class ResponseUtil: :param data: 流式传输的内容 :return: 流式响应结果 """ - return StreamingResponse( - status_code=status.HTTP_200_OK, - content=data - ) + return StreamingResponse(status_code=status.HTTP_200_OK, content=data) diff --git a/ruoyi-fastapi-backend/utils/upload_util.py b/ruoyi-fastapi-backend/utils/upload_util.py index dc71255..99b00bb 100644 --- a/ruoyi-fastapi-backend/utils/upload_util.py +++ b/ruoyi-fastapi-backend/utils/upload_util.py @@ -1,7 +1,7 @@ -import random import os -from fastapi import UploadFile +import random from datetime import datetime +from fastapi import UploadFile from config.env import UploadConfig @@ -62,7 +62,7 @@ class UploadUtil: """ 校验文件随机码是否合法 """ - valid_code_list = [f"{i:03}" for i in range(1, 999)] + valid_code_list = [f'{i:03}' for i in range(1, 999)] if filename.rsplit('.', 1)[0][-3:] in valid_code_list: return True return False