import io import os import pandas as pd import re from openpyxl import Workbook from openpyxl.styles import Alignment, PatternFill from openpyxl.utils import get_column_letter from openpyxl.worksheet.datavalidation import DataValidation from sqlalchemy.engine.row import Row from typing import Any, Dict, List, Literal, Union from config.database import Base from config.env import CachePathConfig def worship(): print(""" //////////////////////////////////////////////////////////////////// // _ooOoo_ // // o8888888o // // 88" . "88 // // (| ^_^ |) // // O\ = /O // // ____/`---'\____ // // .' \\| |// `. // // / \\||| : |||// \ // // / _||||| -:- |||||- \ // // | | \\\ - /// | | // // | \_| ''\---/'' | | // // \ .-\__ `-` ___/-. / // // ___`. .' /--.--\ `. . ___ // // ."" '< `.___\_<|>_/___.' >'"". // // | | : `- \`.;`\ _ /`;.`/ - ` : | | // // \ \ `-. \_ __\ /__ _/ .-` / / // // ========`-.____`-.___\_____/___.-`____.-'======== // // `=---=' // // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ // // 佛祖保佑 永不宕机 永无BUG // //////////////////////////////////////////////////////////////////// """) class SqlalchemyUtil: """ sqlalchemy工具类 """ @classmethod def base_to_dict( cls, obj: Union[Base, Dict], transform_case: Literal['no_case', 'snake_to_camel', 'camel_to_snake'] = 'no_case' ): """ 将sqlalchemy模型对象转换为字典 :param obj: sqlalchemy模型对象或普通字典 :param transform_case: 转换得到的结果形式,可选的有'no_case'(不转换)、'snake_to_camel'(下划线转小驼峰)、'camel_to_snake'(小驼峰转下划线),默认为'no_case' :return: 字典结果 """ if isinstance(obj, Base): base_dict = obj.__dict__.copy() base_dict.pop('_sa_instance_state', None) elif isinstance(obj, dict): base_dict = obj.copy() if transform_case == 'snake_to_camel': return {CamelCaseUtil.snake_to_camel(k): v for k, v in base_dict.items()} elif transform_case == 'camel_to_snake': return {SnakeCaseUtil.camel_to_snake(k): v for k, v in base_dict.items()} return base_dict @classmethod def serialize_result( cls, result: Any, transform_case: Literal['no_case', 'snake_to_camel', 'camel_to_snake'] = 'no_case' ): """ 将sqlalchemy查询结果序列化 :param result: sqlalchemy查询结果 :param transform_case: 转换得到的结果形式,可选的有'no_case'(不转换)、'snake_to_camel'(下划线转小驼峰)、'camel_to_snake'(小驼峰转下划线),默认为'no_case' :return: 序列化结果 """ if isinstance(result, (Base, dict)): return cls.base_to_dict(result, transform_case) elif isinstance(result, list): return [cls.serialize_result(row, transform_case) for row in result] elif isinstance(result, Row): if all([isinstance(row, Base) for row in result]): return [cls.base_to_dict(row, transform_case) for row in result] elif any([isinstance(row, Base) for row in result]): return [cls.serialize_result(row, transform_case) for row in result] else: result_dict = result._asdict() if transform_case == 'snake_to_camel': return {CamelCaseUtil.snake_to_camel(k): v for k, v in result_dict.items()} elif transform_case == 'camel_to_snake': return {SnakeCaseUtil.camel_to_snake(k): v for k, v in result_dict.items()} return result_dict return result class CamelCaseUtil: """ 下划线形式(snake_case)转小驼峰形式(camelCase)工具方法 """ @classmethod def snake_to_camel(cls, snake_str: str): """ 下划线形式字符串(snake_case)转换为小驼峰形式字符串(camelCase) :param snake_str: 下划线形式字符串 :return: 小驼峰形式字符串 """ # 分割字符串 words = snake_str.split('_') # 小驼峰命名,第一个词首字母小写,其余词首字母大写 return words[0] + ''.join(word.capitalize() for word in words[1:]) @classmethod def transform_result(cls, result: Any): """ 针对不同类型将下划线形式(snake_case)批量转换为小驼峰形式(camelCase)方法 :param result: 输入数据 :return: 小驼峰形式结果 """ return SqlalchemyUtil.serialize_result(result=result, transform_case='snake_to_camel') class SnakeCaseUtil: """ 小驼峰形式(camelCase)转下划线形式(snake_case)工具方法 """ @classmethod def camel_to_snake(cls, camel_str: str): """ 小驼峰形式字符串(camelCase)转换为下划线形式字符串(snake_case) :param camel_str: 小驼峰形式字符串 :return: 下划线形式字符串 """ # 在大写字母前添加一个下划线,然后将整个字符串转为小写 words = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', camel_str) return re.sub('([a-z0-9])([A-Z])', r'\1_\2', words).lower() @classmethod def transform_result(cls, result: Any): """ 针对不同类型将下划线形式(snake_case)批量转换为小驼峰形式(camelCase)方法 :param result: 输入数据 :return: 小驼峰形式结果 """ return SqlalchemyUtil.serialize_result(result=result, transform_case='camel_to_snake') def bytes2human(n, format_str='%(value).1f%(symbol)s'): """Used by various scripts. See: http://goo.gl/zeJZl >>> bytes2human(10000) '9.8K' >>> bytes2human(100001221) '95.4M' """ symbols = ('B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB') prefix = {} for i, s in enumerate(symbols[1:]): prefix[s] = 1 << (i + 1) * 10 for symbol in reversed(symbols[1:]): if n >= prefix[symbol]: value = float(n) / prefix[symbol] return format_str % locals() return format_str % dict(symbol=symbols[0], value=n) def bytes2file_response(bytes_info): yield bytes_info def export_list2excel(list_data: List): """ 工具方法:将需要导出的list数据转化为对应excel的二进制数据 :param list_data: 数据列表 :return: 字典信息对应excel的二进制数据 """ df = pd.DataFrame(list_data) binary_data = io.BytesIO() df.to_excel(binary_data, index=False, engine='openpyxl') binary_data = binary_data.getvalue() return binary_data def get_excel_template(header_list: List, selector_header_list: List, option_list: List[dict]): """ 工具方法:将需要导出的list数据转化为对应excel的二进制数据 :param header_list: 表头数据列表 :param selector_header_list: 需要设置为选择器格式的表头数据列表 :param option_list: 选择器格式的表头预设的选项列表 :return: 模板excel的二进制数据 """ # 创建Excel工作簿 wb = Workbook() # 选择默认的活动工作表 ws = wb.active # 设置表头文字 headers = header_list # 设置表头背景样式为灰色,前景色为白色 header_fill = PatternFill(start_color='ababab', end_color='ababab', fill_type='solid') # 将表头写入第一行 for col_num, header in enumerate(headers, 1): cell = ws.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill # 设置列宽度为16 ws.column_dimensions[chr(64 + col_num)].width = 12 # 设置水平居中对齐 cell.alignment = Alignment(horizontal='center') # 设置选择器的预设选项 options = option_list # 获取selector_header的字母索引 for selector_header in selector_header_list: column_selector_header_index = headers.index(selector_header) + 1 # 创建数据有效性规则 header_option = [] for option in options: if option.get(selector_header): header_option = option.get(selector_header) dv = DataValidation(type='list', formula1=f'"{",".join(header_option)}"') # 设置数据有效性规则的起始单元格和结束单元格 dv.add( f'{get_column_letter(column_selector_header_index)}2:{get_column_letter(column_selector_header_index)}1048576' ) # 添加数据有效性规则到工作表 ws.add_data_validation(dv) # 保存Excel文件为字节类型的数据 file = io.BytesIO() wb.save(file) file.seek(0) # 读取字节数据 excel_data = file.getvalue() return excel_data def get_excel_template_with_sheets(sheet_configs: List[Dict], # 每个Sheet的配置(包含表头、选择器等) option_list: List[dict] # 全局选择器选项(所有Sheet共享) ): """ 生成多Sheet页的Excel模板(支持下拉选择器) :param sheet_configs: 每个Sheet的配置,格式示例: [ { "sheet_name": "Sheet1", # Sheet名称 "header_list": ["姓名", "性别", "城市"], # 表头 "selector_header_list": ["性别", "城市"], # 需要下拉选择的表头 }, { "sheet_name": "Sheet2", "header_list": ["产品", "类别", "价格"], "selector_header_list": ["类别"], } ] :param option_list: 下拉选项配置,格式示例: [ {"性别": ["男", "女"], "城市": ["北京", "上海"]}, # Sheet1的下拉选项 {"类别": ["电子产品", "食品", "服装"]} # Sheet2的下拉选项 ] :return: Excel文件的二进制数据 """ wb = Workbook() # 删除默认创建的Sheet(如果不需要) if "Sheet" in wb.sheetnames: wb.remove(wb["Sheet"]) # 遍历每个Sheet的配置 for sheet_config in sheet_configs: sheet_name = sheet_config["sheet_name"] header_list = sheet_config["header_list"] selector_header_list = sheet_config["selector_header_list"] # 创建Sheet ws = wb.create_sheet(title=sheet_name) # 设置表头样式 header_fill = PatternFill(start_color="ababab", end_color="ababab", fill_type="solid") # 写入表头 for col_num, header in enumerate(header_list, 1): cell = ws.cell(row=1, column=col_num) cell.value = header cell.fill = header_fill cell.alignment = Alignment(horizontal="center") ws.column_dimensions[get_column_letter(col_num)].width = 12 # 设置下拉选择器 for selector_header in selector_header_list: if selector_header not in header_list: continue # 防止配置错误 column_index = header_list.index(selector_header) + 1 column_letter = get_column_letter(column_index) # 从option_list中获取当前Sheet的选项(假设option_list顺序与sheet_configs一致) sheet_options = option_list[sheet_configs.index(sheet_config)] selector_options = sheet_options.get(selector_header, []) if selector_options: dv = DataValidation( type="list", formula1=f'"{",".join(selector_options)}"', # 下拉选项(逗号分隔) ) # 应用到整个列(从第2行开始) dv.add(f"{column_letter}2:{column_letter}1048576") ws.add_data_validation(dv) # 保存为二进制数据 file = io.BytesIO() wb.save(file) file.seek(0) # 读取字节数据 excel_data = file.getvalue() return excel_data def get_filepath_from_url(url: str): """ 工具方法:根据请求参数获取文件路径 :param url: 请求参数中的url参数 :return: 文件路径 """ file_info = url.split('?')[1].split('&') task_id = file_info[0].split('=')[1] file_name = file_info[1].split('=')[1] task_path = file_info[2].split('=')[1] filepath = os.path.join(CachePathConfig.PATH, task_path, task_id, file_name) return filepath