|  |  |  | import io | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import pandas as pd | 
					
						
							|  |  |  | import re | 
					
						
							|  |  |  | from openpyxl import Workbook | 
					
						
							|  |  |  | from openpyxl.styles import Alignment, PatternFill | 
					
						
							|  |  |  | from openpyxl.utils import get_column_letter | 
					
						
							|  |  |  | from openpyxl.worksheet.datavalidation import DataValidation | 
					
						
							|  |  |  | from sqlalchemy.engine.row import Row | 
					
						
							|  |  |  | from typing import Any, Dict, List, Literal, Union | 
					
						
							|  |  |  | from config.database import Base | 
					
						
							|  |  |  | from config.env import CachePathConfig | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def worship(): | 
					
						
							|  |  |  |     print("""
 | 
					
						
							|  |  |  | //////////////////////////////////////////////////////////////////// | 
					
						
							|  |  |  | //                          _ooOoo_                               // | 
					
						
							|  |  |  | //                         o8888888o                              // | 
					
						
							|  |  |  | //                         88" . "88                              // | 
					
						
							|  |  |  | //                         (| ^_^ |)                              // | 
					
						
							|  |  |  | //                         O\  =  /O                              // | 
					
						
							|  |  |  | //                      ____/`---'\____                           // | 
					
						
							|  |  |  | //                    .'  \\|     |//  `.                         // | 
					
						
							|  |  |  | //                   /  \\|||  :  |||//  \                        // | 
					
						
							|  |  |  | //                  /  _||||| -:- |||||-  \                       // | 
					
						
							|  |  |  | //                  |   | \\\  -  /// |   |                       // | 
					
						
							|  |  |  | //                  | \_|  ''\---/''  |   |                       // | 
					
						
							|  |  |  | //                  \  .-\__  `-`  ___/-. /                       // | 
					
						
							|  |  |  | //                ___`. .'  /--.--\  `. . ___                     // | 
					
						
							|  |  |  | //              ."" '<  `.___\_<|>_/___.'  >'"".                  // | 
					
						
							|  |  |  | //            | | :  `- \`.;`\ _ /`;.`/ - ` : | |                 // | 
					
						
							|  |  |  | //            \  \ `-.   \_ __\ /__ _/   .-` /  /                 // | 
					
						
							|  |  |  | //      ========`-.____`-.___\_____/___.-`____.-'========         // | 
					
						
							|  |  |  | //                           `=---='                              // | 
					
						
							|  |  |  | //      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^        // | 
					
						
							|  |  |  | //             佛祖保佑       永不宕机      永无BUG                 // | 
					
						
							|  |  |  | //////////////////////////////////////////////////////////////////// | 
					
						
							|  |  |  |     """)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class SqlalchemyUtil: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     sqlalchemy工具类 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def base_to_dict( | 
					
						
							|  |  |  |             cls, obj: Union[Base, Dict], | 
					
						
							|  |  |  |             transform_case: Literal['no_case', 'snake_to_camel', 'camel_to_snake'] = 'no_case' | 
					
						
							|  |  |  |     ): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         将sqlalchemy模型对象转换为字典 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         :param obj: sqlalchemy模型对象或普通字典 | 
					
						
							|  |  |  |         :param transform_case: 转换得到的结果形式,可选的有'no_case'(不转换)、'snake_to_camel'(下划线转小驼峰)、'camel_to_snake'(小驼峰转下划线),默认为'no_case' | 
					
						
							|  |  |  |         :return: 字典结果 | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         if isinstance(obj, Base): | 
					
						
							|  |  |  |             base_dict = obj.__dict__.copy() | 
					
						
							|  |  |  |             base_dict.pop('_sa_instance_state', None) | 
					
						
							|  |  |  |         elif isinstance(obj, dict): | 
					
						
							|  |  |  |             base_dict = obj.copy() | 
					
						
							|  |  |  |         if transform_case == 'snake_to_camel': | 
					
						
							|  |  |  |             return {CamelCaseUtil.snake_to_camel(k): v for k, v in base_dict.items()} | 
					
						
							|  |  |  |         elif transform_case == 'camel_to_snake': | 
					
						
							|  |  |  |             return {SnakeCaseUtil.camel_to_snake(k): v for k, v in base_dict.items()} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return base_dict | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def serialize_result( | 
					
						
							|  |  |  |             cls, result: Any, transform_case: Literal['no_case', 'snake_to_camel', 'camel_to_snake'] = 'no_case' | 
					
						
							|  |  |  |     ): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         将sqlalchemy查询结果序列化 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         :param result: sqlalchemy查询结果 | 
					
						
							|  |  |  |         :param transform_case: 转换得到的结果形式,可选的有'no_case'(不转换)、'snake_to_camel'(下划线转小驼峰)、'camel_to_snake'(小驼峰转下划线),默认为'no_case' | 
					
						
							|  |  |  |         :return: 序列化结果 | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         if isinstance(result, (Base, dict)): | 
					
						
							|  |  |  |             return cls.base_to_dict(result, transform_case) | 
					
						
							|  |  |  |         elif isinstance(result, list): | 
					
						
							|  |  |  |             return [cls.serialize_result(row, transform_case) for row in result] | 
					
						
							|  |  |  |         elif isinstance(result, Row): | 
					
						
							|  |  |  |             if all([isinstance(row, Base) for row in result]): | 
					
						
							|  |  |  |                 return [cls.base_to_dict(row, transform_case) for row in result] | 
					
						
							|  |  |  |             elif any([isinstance(row, Base) for row in result]): | 
					
						
							|  |  |  |                 return [cls.serialize_result(row, transform_case) for row in result] | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 result_dict = result._asdict() | 
					
						
							|  |  |  |                 if transform_case == 'snake_to_camel': | 
					
						
							|  |  |  |                     return {CamelCaseUtil.snake_to_camel(k): v for k, v in result_dict.items()} | 
					
						
							|  |  |  |                 elif transform_case == 'camel_to_snake': | 
					
						
							|  |  |  |                     return {SnakeCaseUtil.camel_to_snake(k): v for k, v in result_dict.items()} | 
					
						
							|  |  |  |                 return result_dict | 
					
						
							|  |  |  |         return result | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class CamelCaseUtil: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     下划线形式(snake_case)转小驼峰形式(camelCase)工具方法 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def snake_to_camel(cls, snake_str: str): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         下划线形式字符串(snake_case)转换为小驼峰形式字符串(camelCase) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         :param snake_str: 下划线形式字符串 | 
					
						
							|  |  |  |         :return: 小驼峰形式字符串 | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         # 分割字符串 | 
					
						
							|  |  |  |         words = snake_str.split('_') | 
					
						
							|  |  |  |         # 小驼峰命名,第一个词首字母小写,其余词首字母大写 | 
					
						
							|  |  |  |         return words[0] + ''.join(word.capitalize() for word in words[1:]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def transform_result(cls, result: Any): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         针对不同类型将下划线形式(snake_case)批量转换为小驼峰形式(camelCase)方法 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         :param result: 输入数据 | 
					
						
							|  |  |  |         :return: 小驼峰形式结果 | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         return SqlalchemyUtil.serialize_result(result=result, transform_case='snake_to_camel') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class SnakeCaseUtil: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     小驼峰形式(camelCase)转下划线形式(snake_case)工具方法 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def camel_to_snake(cls, camel_str: str): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         小驼峰形式字符串(camelCase)转换为下划线形式字符串(snake_case) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         :param camel_str: 小驼峰形式字符串 | 
					
						
							|  |  |  |         :return: 下划线形式字符串 | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         # 在大写字母前添加一个下划线,然后将整个字符串转为小写 | 
					
						
							|  |  |  |         words = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', camel_str) | 
					
						
							|  |  |  |         return re.sub('([a-z0-9])([A-Z])', r'\1_\2', words).lower() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @classmethod | 
					
						
							|  |  |  |     def transform_result(cls, result: Any): | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         针对不同类型将下划线形式(snake_case)批量转换为小驼峰形式(camelCase)方法 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         :param result: 输入数据 | 
					
						
							|  |  |  |         :return: 小驼峰形式结果 | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         return SqlalchemyUtil.serialize_result(result=result, transform_case='camel_to_snake') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def bytes2human(n, format_str='%(value).1f%(symbol)s'): | 
					
						
							|  |  |  |     """Used by various scripts. See:
 | 
					
						
							|  |  |  |     http://goo.gl/zeJZl | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     >>> bytes2human(10000) | 
					
						
							|  |  |  |     '9.8K' | 
					
						
							|  |  |  |     >>> bytes2human(100001221) | 
					
						
							|  |  |  |     '95.4M' | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     symbols = ('B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB') | 
					
						
							|  |  |  |     prefix = {} | 
					
						
							|  |  |  |     for i, s in enumerate(symbols[1:]): | 
					
						
							|  |  |  |         prefix[s] = 1 << (i + 1) * 10 | 
					
						
							|  |  |  |     for symbol in reversed(symbols[1:]): | 
					
						
							|  |  |  |         if n >= prefix[symbol]: | 
					
						
							|  |  |  |             value = float(n) / prefix[symbol] | 
					
						
							|  |  |  |             return format_str % locals() | 
					
						
							|  |  |  |     return format_str % dict(symbol=symbols[0], value=n) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def bytes2file_response(bytes_info): | 
					
						
							|  |  |  |     yield bytes_info | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def export_list2excel(list_data: List): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     工具方法:将需要导出的list数据转化为对应excel的二进制数据 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     :param list_data: 数据列表 | 
					
						
							|  |  |  |     :return: 字典信息对应excel的二进制数据 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     df = pd.DataFrame(list_data) | 
					
						
							|  |  |  |     binary_data = io.BytesIO() | 
					
						
							|  |  |  |     df.to_excel(binary_data, index=False, engine='openpyxl') | 
					
						
							|  |  |  |     binary_data = binary_data.getvalue() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return binary_data | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_excel_template(header_list: List, selector_header_list: List, option_list: List[dict]): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     工具方法:将需要导出的list数据转化为对应excel的二进制数据 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     :param header_list: 表头数据列表 | 
					
						
							|  |  |  |     :param selector_header_list: 需要设置为选择器格式的表头数据列表 | 
					
						
							|  |  |  |     :param option_list: 选择器格式的表头预设的选项列表 | 
					
						
							|  |  |  |     :return: 模板excel的二进制数据 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     # 创建Excel工作簿 | 
					
						
							|  |  |  |     wb = Workbook() | 
					
						
							|  |  |  |     # 选择默认的活动工作表 | 
					
						
							|  |  |  |     ws = wb.active | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 设置表头文字 | 
					
						
							|  |  |  |     headers = header_list | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 设置表头背景样式为灰色,前景色为白色 | 
					
						
							|  |  |  |     header_fill = PatternFill(start_color='ababab', end_color='ababab', fill_type='solid') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 将表头写入第一行 | 
					
						
							|  |  |  |     for col_num, header in enumerate(headers, 1): | 
					
						
							|  |  |  |         cell = ws.cell(row=1, column=col_num) | 
					
						
							|  |  |  |         cell.value = header | 
					
						
							|  |  |  |         cell.fill = header_fill | 
					
						
							|  |  |  |         # 设置列宽度为16 | 
					
						
							|  |  |  |         ws.column_dimensions[chr(64 + col_num)].width = 12 | 
					
						
							|  |  |  |         # 设置水平居中对齐 | 
					
						
							|  |  |  |         cell.alignment = Alignment(horizontal='center') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 设置选择器的预设选项 | 
					
						
							|  |  |  |     options = option_list | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 获取selector_header的字母索引 | 
					
						
							|  |  |  |     for selector_header in selector_header_list: | 
					
						
							|  |  |  |         column_selector_header_index = headers.index(selector_header) + 1 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # 创建数据有效性规则 | 
					
						
							|  |  |  |         header_option = [] | 
					
						
							|  |  |  |         for option in options: | 
					
						
							|  |  |  |             if option.get(selector_header): | 
					
						
							|  |  |  |                 header_option = option.get(selector_header) | 
					
						
							|  |  |  |         dv = DataValidation(type='list', formula1=f'"{",".join(header_option)}"') | 
					
						
							|  |  |  |         # 设置数据有效性规则的起始单元格和结束单元格 | 
					
						
							|  |  |  |         dv.add( | 
					
						
							|  |  |  |             f'{get_column_letter(column_selector_header_index)}2:{get_column_letter(column_selector_header_index)}1048576' | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         # 添加数据有效性规则到工作表 | 
					
						
							|  |  |  |         ws.add_data_validation(dv) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 保存Excel文件为字节类型的数据 | 
					
						
							|  |  |  |     file = io.BytesIO() | 
					
						
							|  |  |  |     wb.save(file) | 
					
						
							|  |  |  |     file.seek(0) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 读取字节数据 | 
					
						
							|  |  |  |     excel_data = file.getvalue() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return excel_data | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_excel_template_with_sheets(sheet_configs: List[Dict],  # 每个Sheet的配置(包含表头、选择器等) | 
					
						
							|  |  |  |                                    option_list: List[dict]  # 全局选择器选项(所有Sheet共享) | 
					
						
							|  |  |  |                                    ): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     生成多Sheet页的Excel模板(支持下拉选择器) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     :param sheet_configs: 每个Sheet的配置,格式示例: | 
					
						
							|  |  |  |         [ | 
					
						
							|  |  |  |             { | 
					
						
							|  |  |  |                 "sheet_name": "Sheet1",                  # Sheet名称 | 
					
						
							|  |  |  |                 "header_list": ["姓名", "性别", "城市"],  # 表头 | 
					
						
							|  |  |  |                 "selector_header_list": ["性别", "城市"], # 需要下拉选择的表头 | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |             { | 
					
						
							|  |  |  |                 "sheet_name": "Sheet2", | 
					
						
							|  |  |  |                 "header_list": ["产品", "类别", "价格"], | 
					
						
							|  |  |  |                 "selector_header_list": ["类别"], | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |     :param option_list: 下拉选项配置,格式示例: | 
					
						
							|  |  |  |         [ | 
					
						
							|  |  |  |             {"性别": ["男", "女"], "城市": ["北京", "上海"]},  # Sheet1的下拉选项 | 
					
						
							|  |  |  |             {"类别": ["电子产品", "食品", "服装"]}            # Sheet2的下拉选项 | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  |     :return: Excel文件的二进制数据 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     wb = Workbook() | 
					
						
							|  |  |  |     # 删除默认创建的Sheet(如果不需要) | 
					
						
							|  |  |  |     if "Sheet" in wb.sheetnames: | 
					
						
							|  |  |  |         wb.remove(wb["Sheet"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 遍历每个Sheet的配置 | 
					
						
							|  |  |  |     for sheet_config in sheet_configs: | 
					
						
							|  |  |  |         sheet_name = sheet_config["sheet_name"] | 
					
						
							|  |  |  |         header_list = sheet_config["header_list"] | 
					
						
							|  |  |  |         selector_header_list = sheet_config["selector_header_list"] | 
					
						
							|  |  |  |         # 创建Sheet | 
					
						
							|  |  |  |         ws = wb.create_sheet(title=sheet_name) | 
					
						
							|  |  |  |         # 设置表头样式 | 
					
						
							|  |  |  |         header_fill = PatternFill(start_color="ababab", end_color="ababab", fill_type="solid") | 
					
						
							|  |  |  |         # 写入表头 | 
					
						
							|  |  |  |         for col_num, header in enumerate(header_list, 1): | 
					
						
							|  |  |  |             cell = ws.cell(row=1, column=col_num) | 
					
						
							|  |  |  |             cell.value = header | 
					
						
							|  |  |  |             cell.fill = header_fill | 
					
						
							|  |  |  |             cell.alignment = Alignment(horizontal="center") | 
					
						
							|  |  |  |             ws.column_dimensions[get_column_letter(col_num)].width = 12 | 
					
						
							|  |  |  |         # 设置下拉选择器 | 
					
						
							|  |  |  |         for selector_header in selector_header_list: | 
					
						
							|  |  |  |             if selector_header not in header_list: | 
					
						
							|  |  |  |                 continue  # 防止配置错误 | 
					
						
							|  |  |  |             column_index = header_list.index(selector_header) + 1 | 
					
						
							|  |  |  |             column_letter = get_column_letter(column_index) | 
					
						
							|  |  |  |             # 从option_list中获取当前Sheet的选项(假设option_list顺序与sheet_configs一致) | 
					
						
							|  |  |  |             sheet_options = option_list[sheet_configs.index(sheet_config)] | 
					
						
							|  |  |  |             selector_options = sheet_options.get(selector_header, []) | 
					
						
							|  |  |  |             if selector_options: | 
					
						
							|  |  |  |                 dv = DataValidation( | 
					
						
							|  |  |  |                     type="list", | 
					
						
							|  |  |  |                     formula1=f'"{",".join(selector_options)}"',  # 下拉选项(逗号分隔) | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  |                 # 应用到整个列(从第2行开始) | 
					
						
							|  |  |  |                 dv.add(f"{column_letter}2:{column_letter}1048576") | 
					
						
							|  |  |  |                 ws.add_data_validation(dv) | 
					
						
							|  |  |  |     # 保存为二进制数据 | 
					
						
							|  |  |  |     file = io.BytesIO() | 
					
						
							|  |  |  |     wb.save(file) | 
					
						
							|  |  |  |     file.seek(0) | 
					
						
							|  |  |  |     # 读取字节数据 | 
					
						
							|  |  |  |     excel_data = file.getvalue() | 
					
						
							|  |  |  |     return excel_data | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_filepath_from_url(url: str): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     工具方法:根据请求参数获取文件路径 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     :param url: 请求参数中的url参数 | 
					
						
							|  |  |  |     :return: 文件路径 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     file_info = url.split('?')[1].split('&') | 
					
						
							|  |  |  |     task_id = file_info[0].split('=')[1] | 
					
						
							|  |  |  |     file_name = file_info[1].split('=')[1] | 
					
						
							|  |  |  |     task_path = file_info[2].split('=')[1] | 
					
						
							|  |  |  |     filepath = os.path.join(CachePathConfig.PATH, task_path, task_id, file_name) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return filepath |