import io import json import uuid from typing import Optional, List import pandas as pd from fastapi import UploadFile from sqlalchemy.ext.asyncio import AsyncSession from module_admin.entity.vo.common_vo import CrudResponseModel from module_admin.entity.vo.user_vo import CurrentUserModel from module_admin.entity.vo.dataint_vo import SscfPageObject, SaveSscfModel, TreeOperateModel from module_admin.entity.do.dataint_do import SysSscf from module_admin.dao.sscf_dao import SscfDao from exceptions.exception import ServiceException, ServiceWarning from datetime import datetime from utils.common_util import * class SscfService: """ 智能问答服务层 """ @classmethod async def get_sscf_list_services(cls, result_db: AsyncSession, sscf_query: SscfPageObject, current_user: CurrentUserModel): result = await SscfDao.get_sscf_list(result_db, sscf_query) return CamelCaseUtil.transform_result(result) @classmethod async def get_dasset_tree_services(cls, result_db: AsyncSession, current_user: CurrentUserModel): result = await SscfDao.get_dasset_tree(result_db) return CamelCaseUtil.transform_result(result) @classmethod async def save_dasset_tree_services(cls, result_db: AsyncSession, treeOperate: TreeOperateModel, current_user: CurrentUserModel): tree = await SscfDao.get_dasset_tree_by_code(result_db, treeOperate.dasset_code) if treeOperate.operate == 'ADD': if tree: raise ServiceException(message="已存在code为:"+treeOperate.dasset_code+"的目录,无法重复新增,请修改code后新增") else: await SscfDao.insert_dasset_tree(result_db, treeOperate) await result_db.commit() if treeOperate.operate == 'UPDATE': if tree: if tree.onum == treeOperate.onum: treeObj = dict( onum=treeOperate.onum, dasset_code=treeOperate.dasset_code, dasset_name=treeOperate.dasset_name ) await SscfDao.save_dasset_tree(result_db, treeObj) await result_db.commit() else: raise ServiceException(message="已存在code为:"+treeOperate.dasset_code+"的目录") else: treeObj = dict( onum=treeOperate.onum, dasset_code=treeOperate.dasset_code, dasset_name=treeOperate.dasset_name ) await SscfDao.save_dasset_tree(result_db, treeObj) await result_db.commit() if treeOperate.operate == 'DELETE': if tree: await SscfDao.delete_dasset_tree(result_db, treeOperate.onum) await result_db.commit() else: raise ServiceException(mssage="不存在code为:"+treeOperate.dasset_code+"的目录") return CrudResponseModel(is_success=True, message='操作成功') @classmethod async def save_sscf(cls, result_db: AsyncSession, saveSscfModel: SaveSscfModel, current_user: CurrentUserModel): if saveSscfModel.onum is None: # add saveSscfModel.onum = uuid.uuid4() addObj = SysSscf(**saveSscfModel.model_dump()) addObj.create_by = current_user.user.user_name addObj.create_time = datetime.now() addObj.update_by = current_user.user.user_name addObj.update_time = datetime.now() await SscfDao.insert_sscf(result_db, addObj) else: # update oldObj = await SscfDao.get_sscf_by_id(result_db, saveSscfModel.onum) if oldObj is None: raise ServiceException(message='所改对象不存在') saveObj = saveSscfModel.model_dump(exclude_unset=True) saveObj['update_by'] = current_user.user.user_name saveObj['update_time'] = datetime.now() await SscfDao.update_sscf(result_db, saveObj) await result_db.commit() return CrudResponseModel(is_success=True, message='操作成功') @classmethod async def delete_sscf(cls, db: AsyncSession, array: List[str]): await SscfDao.delete_sscf(db, array) await db.commit() return CrudResponseModel(is_success=True, message='操作成功') @staticmethod async def get_import_template_services(): """ 获取元数据导入模板service :return: 元数据导入模板excel的二进制数据 """ table_header_list = ['归属资产名', '关键词', '算法', '按顺序匹配标志', '整句', '类型', '补充说明', '状态'] selector_header_list = ['算法', '按顺序匹配标志', '整句', '类型', '状态'] option_list = [{'算法': ['包含', '等于'], '按顺序匹配标志': ['是', '否'], '整句': ['是', '否'], '类型': ['补充', '替换'], '状态': ['正常', '停用']}] sheet_config1 = dict( sheet_name="短句配置", header_list=table_header_list, data_list=[], selector_header_list=selector_header_list ) sheet_configs = [sheet_config1] binary_data = get_excel_template_with_sheets( sheet_configs, # 每个Sheet的配置(包含表头、选择器等) option_list ) return binary_data @classmethod async def batch_import_sscf_data(cls, result_db: AsyncSession, file: UploadFile, overWrite: bool, current_user: CurrentUserModel): table_header_dict = { '归属资产名': 'dassetName', '关键词': 'keyword', '算法': 'algorithm', '按顺序匹配标志': 'order', '整句': 'wholeSentence', '类型': 'type', '补充说明': 'suppExpl', '状态': 'status' } contents = await file.read() excel_file = pd.ExcelFile(io.BytesIO(contents)) await file.close() # 获取所有sheet名称 sheet_names = excel_file.sheet_names # 逐个读取 tableSheet = sheet_names[0] result_list = { "rows": [], "successCount": 0 } if tableSheet == '短句配置': df = excel_file.parse(sheet_name=tableSheet, dtype=str, keep_default_na=False, na_values=[]) df.rename(columns=table_header_dict, inplace=True) for index, row in df.iterrows(): noneValid = '' if row['dassetName'] is None or len(row['dassetName']) == 0: noneValid += "归属资产名不能为空" if row['keyword'] is None or len(row['keyword']) == 0: if len(noneValid) > 0: noneValid += ",关键词不能为空" else: noneValid += "关键词不能为空" if row['algorithm'] is None or len(row['algorithm']) == 0: if len(noneValid) > 0: noneValid += ",算法不能为空" else: noneValid += "算法不能为空" if row['order'] is None or len(row['order']) == 0: if len(noneValid) > 0: noneValid += ",按顺序匹配标志不能为空" else: noneValid += "按顺序匹配标志不能为空" if row['wholeSentence'] is None or len(row['wholeSentence']) == 0: if len(noneValid) > 0: noneValid += ",整句不能为空" else: noneValid += "整句不能为空" if row['type'] is None or len(row['type']) == 0: if len(noneValid) > 0: noneValid += ",类型不能为空" else: noneValid += "类型不能为空" if row['status'] is None or len(row['status']) == 0: if len(noneValid) > 0: noneValid += ",状态不能为空" else: noneValid += "状态不能为空" dasset = await SscfDao.getDassetIdByName(result_db, row['dassetName']) if dasset is None: if len(noneValid) > 0: noneValid += ",归属资产名为无效名称(找不到对应资产目录)" else: noneValid += "归属资产名为无效名称(找不到对应资产目录)" if len(noneValid) > 0: result_list['rows'].append({ "row": index + 2, "errorInfo": noneValid }) continue sscf = SaveSscfModel() sscf.dasset_id = dasset.onum, sscf.keyword = row['keyword'] sscf.algorithm = row['algorithm'] sscf.order = 'True' if row['order'] == '是' else 'False' sscf.whole_sentence = 'True' if row['wholeSentence'] == '是' else 'False' sscf.type = row['type'] sscf.supp_expl = row['suppExpl'] sscf.status = '1' if row['status'] == '正常' else '0' oldSscf =await SscfDao.get_oldSscf(result_db, sscf) if oldSscf is not None: sscf.onum = oldSscf.onum await cls.save_sscf(result_db, sscf, current_user) result_list['successCount'] += 1 return result_list