import io import json import uuid from typing import Optional, List import pandas as pd from fastapi import UploadFile from sqlalchemy.ext.asyncio import AsyncSession from module_admin.entity.vo.common_vo import CrudResponseModel from module_admin.entity.vo.user_vo import CurrentUserModel from module_admin.entity.vo.dataint_vo import CdplbPageObject, SaveCdplbModel, SaveCdplbObj from module_admin.entity.do.dataint_do import SysCdplb from module_admin.dao.cdplb_dao import CdplbDao from exceptions.exception import ServiceException, ServiceWarning from config.env import AppConfig from datetime import datetime import requests from utils.common_util import * class CdplbService: """ 智能问答服务层 """ @classmethod async def get_cdplb_list_services(cls, result_db: AsyncSession, cdplb_query: CdplbPageObject, current_user: CurrentUserModel): result = await CdplbDao.get_cdplb_list(result_db, cdplb_query) return CamelCaseUtil.transform_result(result) @classmethod async def save_cdplb(cls, result_db: AsyncSession, saveCdplbModel: SaveCdplbModel, current_user: CurrentUserModel): if saveCdplbModel.list and len(saveCdplbModel.list) > 0: for item in saveCdplbModel.list: if item.onum is None: oldObj = await CdplbDao.get_cdplb_by_col(result_db, item.ssys_id, item.mdl_name, item.bath_obj_tab_name, item.bath_obj_fld_name) if oldObj is None: addObj = SysCdplb(**item.model_dump()) addObj.onum = uuid.uuid4() addObj.status = '1' addObj.create_by = current_user.user.user_name addObj.create_time = datetime.now() addObj.update_by = current_user.user.user_name addObj.update_time = datetime.now() await CdplbDao.insert_cdplb(result_db, addObj) else: oldObj = await CdplbDao.get_cdplb_by_id(result_db, item.onum) if oldObj is None: raise ServiceException(message='所改对象不存在') saveObj = item.model_dump(exclude_unset=True) saveObj['update_by'] = current_user.user.user_name saveObj['update_time'] = datetime.now() await CdplbDao.update_cdplb(result_db, saveObj) await result_db.commit() return CrudResponseModel(is_success=True, message='操作成功') @classmethod async def delete_cdplb(cls, db: AsyncSession, array: List[str]): await CdplbDao.delete_cdplb(db, array) await db.commit() return CrudResponseModel(is_success=True, message='操作成功') @staticmethod async def get_import_template_services(): """ 获取元数据导入模板service :return: 元数据导入模板excel的二进制数据 """ table_header_list = ['系统名称', '模式名称', '批量对象表名', '批里对象字段名', '状态'] selector_header_list = ['状态'] option_list = [{'状态': ['正常', '停用']}] sheet_config1 = dict( sheet_name="词典批量补充", header_list=table_header_list, data_list=[], selector_header_list=selector_header_list ) sheet_configs = [sheet_config1] binary_data = get_excel_template_with_sheets( sheet_configs, # 每个Sheet的配置(包含表头、选择器等) option_list ) return binary_data @classmethod async def batch_import_cdplb_data(cls, result_db: AsyncSession, file: UploadFile, overWrite: bool, current_user: CurrentUserModel): url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources?pageNo=1&pageSize=100' headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password} response = requests.get(url, headers=headers, verify=False) dataSourceList = [] if response.reason == 'OK': response_text = response.text data = json.loads(response_text) dataSourceList = data["data"]["totalList"] else: raise ServiceException(message=f'系统异常,获取数据源失败') table_header_dict = { '系统名称': 'ssys_cd', '模式名称': 'mdl_name', '批量对象表名': 'bath_obj_tab_name', '批里对象字段名': 'bath_obj_fld_name', '状态': 'status' } contents = await file.read() excel_file = pd.ExcelFile(io.BytesIO(contents)) await file.close() # 获取所有sheet名称 sheet_names = excel_file.sheet_names # 逐个读取 tableSheet = sheet_names[0] result_list = { "rows": [], "successCount": 0 } if tableSheet == '词典批量补充': df = excel_file.parse(sheet_name=tableSheet, dtype=str, keep_default_na=False, na_values=[]) df.rename(columns=table_header_dict, inplace=True) for index, row in df.iterrows(): noneValid = '' ssysId = next((item["id"] for item in dataSourceList if item["name"] == row['ssys_cd']), None) if row['bath_obj_tab_name'] is None or len(row['bath_obj_tab_name']) == 0: noneValid += "批量对象表名不能为空" if row['bath_obj_fld_name'] is None or len(row['bath_obj_fld_name']) == 0: if len(noneValid) > 0: noneValid += ",批量对象字段名不能为空" else: noneValid += "批量对象字段名不能为空" if row['ssys_cd'] is None or len(row['ssys_cd']) == 0: if len(noneValid) > 0: noneValid += ",系统不能为空" else: noneValid += "系统不能为空" if row['mdl_name'] is None or len(row['mdl_name']) == 0: if len(noneValid) > 0: noneValid += ",模式不能为空" else: noneValid += "模式不能为空" if row['status'] is None or len(row['status']) == 0: if len(noneValid) > 0: noneValid += ",状态不能为空" else: noneValid += "状态不能为空" if ssysId is None: if len(noneValid) > 0: noneValid += ",系统不存在,为无效系统" else: noneValid += "系统不存在,为无效系统" if len(noneValid) > 0: result_list['rows'].append({ "row": index + 2, "errorInfo": noneValid }) continue cdplb = SaveCdplbObj() cdplb.ssys_id = ssysId cdplb.mdl_name = row['mdl_name'] cdplb.bath_obj_tab_name = row['bath_obj_tab_name'] cdplb.bath_obj_fld_name = row['bath_obj_fld_name'] cdplb.status = '1' if row['status'] == '正常' else '0' mdl = SaveCdplbModel(list=[cdplb]) await cls.save_cdplb(result_db, mdl, current_user) result_list['successCount'] += 1 return result_list