import io
import json
import uuid
from datetime import datetime
from typing import List, Optional

import pandas as pd
from fastapi import UploadFile
from sqlalchemy.ext.asyncio import AsyncSession

from module_admin.entity.vo.common_vo import CrudResponseModel
from module_admin.entity.vo.user_vo import CurrentUserModel
from module_admin.entity.vo.dataint_vo import FccbdPageObject, SaveFccbdModel
from module_admin.entity.do.dataint_do import SysFccbd
from module_admin.dao.fccbd_dao import FccbdDao
from exceptions.exception import ServiceException, ServiceWarning
# NOTE(review): CamelCaseUtil and get_excel_template_with_sheets are not
# imported explicitly anywhere in this file, so they presumably come from this
# star import. It is kept last, as in the original, so any name it re-exports
# shadows the imports above exactly as before.
from utils.common_util import *


class FccbdService:
    """
    Service layer for the word-segmentation supplementary dictionary
    (sheet / feature name "分词词典补充").
    """

    # Sheet name used by the generated template and required by the importer.
    _SHEET_NAME = '分词词典补充'

    # Excel column header -> model field name, in template column order.
    _HEADER_TO_FIELD = {
        '词语': 'term',
        '频次': 'freq',
        '词性': 'pos',
        '词性名称': 'pos_name',
        '状态': 'status',
    }

    # Values offered by the template's drop-down for the status column.
    _STATUS_OPTIONS = ['正常', '停用']

    @classmethod
    async def get_fccbd_list_services(cls, result_db: AsyncSession, fccbd_query: FccbdPageObject,
                                      current_user: CurrentUserModel):
        """
        Query dictionary entries and return them with camelCase keys.

        :param result_db: async ORM session
        :param fccbd_query: query / paging object forwarded to the DAO
        :param current_user: current user (not used by this method)
        :return: DAO result passed through CamelCaseUtil.transform_result
        """
        result = await FccbdDao.get_fccbd_list(result_db, fccbd_query)
        return CamelCaseUtil.transform_result(result)

    @classmethod
    async def save_fccbd(cls, result_db: AsyncSession, saveFccbdModel: SaveFccbdModel,
                         current_user: CurrentUserModel) -> CrudResponseModel:
        """
        Insert a new dictionary entry or update an existing one, then commit.

        An entry whose ``onum`` is None is inserted with a freshly generated
        id; otherwise the entry with that ``onum`` is updated with the fields
        that were explicitly set on the model.

        :param result_db: async ORM session
        :param saveFccbdModel: entry data; ``onum`` decides insert vs. update
        :param current_user: current user, recorded as creator / updater
        :return: success response
        :raises ServiceException: when updating an entry that does not exist
        """
        operator = current_user.user.user_name
        # One timestamp so create_time and update_time agree on insert.
        now = datetime.now()
        try:
            if saveFccbdModel.onum is None:
                # Insert path.
                # NOTE(review): this stores a uuid.UUID object, not a string;
                # confirm the model / column accept it (delete_fccbd takes
                # List[str]).
                saveFccbdModel.onum = uuid.uuid4()
                new_entry = SysFccbd(**saveFccbdModel.model_dump())
                new_entry.create_by = operator
                new_entry.create_time = now
                new_entry.update_by = operator
                new_entry.update_time = now
                await FccbdDao.insert_fccbd(result_db, new_entry)
            else:
                # Update path.
                existing = await FccbdDao.get_fccbd_by_id(result_db, saveFccbdModel.onum)
                if existing is None:
                    raise ServiceException(message='所改对象不存在')
                # Only send fields the caller actually set.
                changes = saveFccbdModel.model_dump(exclude_unset=True)
                changes['update_by'] = operator
                changes['update_time'] = now
                await FccbdDao.update_fccbd(result_db, changes)
            await result_db.commit()
        except Exception:
            # Leave the session usable for the caller, then propagate.
            await result_db.rollback()
            raise
        return CrudResponseModel(is_success=True, message='操作成功')

    @classmethod
    async def delete_fccbd(cls, db: AsyncSession, array: List[str]) -> CrudResponseModel:
        """
        Delete the given dictionary entries and commit.

        :param db: async ORM session
        :param array: identifiers forwarded to FccbdDao.delete_fccbd
        :return: success response
        """
        try:
            await FccbdDao.delete_fccbd(db, array)
            await db.commit()
        except Exception:
            await db.rollback()
            raise
        return CrudResponseModel(is_success=True, message='操作成功')

    @staticmethod
    async def get_import_template_services():
        """
        Build the Excel import template for the segmentation dictionary.

        The template has a single sheet whose headers are the keys of
        ``_HEADER_TO_FIELD`` and whose status column is a drop-down.

        :return: whatever get_excel_template_with_sheets returns
                 (presumably the workbook's binary data)
        """
        status_header = '状态'
        sheet_config = dict(
            sheet_name=FccbdService._SHEET_NAME,
            header_list=list(FccbdService._HEADER_TO_FIELD),
            data_list=[],
            selector_header_list=[status_header],
        )
        option_list = [{status_header: list(FccbdService._STATUS_OPTIONS)}]
        return get_excel_template_with_sheets([sheet_config], option_list)

    @staticmethod
    def _missing_field_messages(row) -> List[str]:
        """Return one "<header>不能为空" message per required field that is empty in *row*."""
        return [
            f"{header}不能为空"
            for header, field in FccbdService._HEADER_TO_FIELD.items()
            if not row[field]
        ]

    @classmethod
    async def batch_import_fccbd_data(cls, result_db: AsyncSession, file: UploadFile, overWrite: bool,
                                      current_user: CurrentUserModel) -> dict:
        """
        Import dictionary entries from an uploaded Excel workbook.

        Only the first sheet is read, and only when it is named
        "分词词典补充"; otherwise an empty result is returned. Every row with
        all five fields filled in is saved (and committed) via ``save_fccbd``;
        rows with empty fields are reported instead.

        :param result_db: async ORM session
        :param file: uploaded Excel file
        :param overWrite: accepted for interface compatibility; not used here
        :param current_user: current user, forwarded to ``save_fccbd``
        :return: ``{"rows": [{"row": <excel row no.>, "errorInfo": <msg>}, ...],
                 "successCount": <int>}``
        """
        try:
            contents = await file.read()
        finally:
            # Close the upload even if reading fails.
            await file.close()

        result_list = {
            "rows": [],
            "successCount": 0
        }

        with pd.ExcelFile(io.BytesIO(contents)) as excel_file:
            first_sheet = excel_file.sheet_names[0]
            if first_sheet != cls._SHEET_NAME:
                return result_list
            # Read everything as text and keep empty cells as '' so the
            # emptiness checks below work on plain strings.
            df = excel_file.parse(sheet_name=first_sheet, dtype=str, keep_default_na=False, na_values=[])

        df = df.rename(columns=cls._HEADER_TO_FIELD)

        for index, row in df.iterrows():
            problems = cls._missing_field_messages(row)
            if problems:
                result_list['rows'].append({
                    # +2: one for the header row, one for 1-based numbering.
                    "row": index + 2,
                    "errorInfo": ",".join(problems)
                })
                continue

            fccbd = SaveFccbdModel()
            fccbd.term = row['term']
            fccbd.freq = row['freq']
            fccbd.pos = row['pos']
            fccbd.pos_name = row['pos_name']
            # The template's drop-down offers '正常' / '停用'; the raw code '1'
            # is still accepted for backward compatibility.
            # NOTE(review): assumes '1' means disabled ('停用') and '0' means
            # normal ('正常') - confirm against the status dictionary.
            fccbd.status = '1' if row['status'] in ('1', '停用') else '0'
            await cls.save_fccbd(result_db, fccbd, current_user)
            result_list['successCount'] += 1

        return result_list