"""Service layer for SysCdplb records: list, save, delete, import template and Excel batch import."""

import asyncio
import io
import json
import uuid
from datetime import datetime
from typing import Optional, List

import pandas as pd
import requests
from fastapi import UploadFile
from sqlalchemy.ext.asyncio import AsyncSession

from module_admin.entity.vo.common_vo import CrudResponseModel
from module_admin.entity.vo.user_vo import CurrentUserModel
from module_admin.entity.vo.dataint_vo import CdplbPageObject, SaveCdplbModel, SaveCdplbObj
from module_admin.entity.do.dataint_do import SysCdplb
from module_admin.dao.cdplb_dao import CdplbDao
from exceptions.exception import ServiceException, ServiceWarning
from config.env import AppConfig
from utils.common_util import *


class CdplbService:
    """
    Service layer for ``SysCdplb`` records.

    Provides paged listing, save (insert or update), delete, generation of the
    Excel import template, and batch import from an uploaded Excel workbook.
    """

    # Sheet name shared by the generated template and the importer.
    _IMPORT_SHEET_NAME = '词典批量补充'

    # Template column header -> DataFrame column name used while importing.
    # The key order is also the column order of the generated template.
    # NOTE(review): '批里' looks like a typo for '批量'; it is left unchanged
    # because templates that were already handed out carry this exact header.
    _IMPORT_HEADER_MAP = {
        '系统名称': 'ssys_cd',
        '模式名称': 'mdl_name',
        '批量对象表名': 'bath_obj_tab_name',
        '批里对象字段名': 'bath_obj_fld_name',
        '状态': 'status',
    }

    # Seconds to wait for the data-source endpoint before giving up.
    _DS_REQUEST_TIMEOUT = 30

    @classmethod
    async def get_cdplb_list_services(
        cls, result_db: AsyncSession, cdplb_query: CdplbPageObject, current_user: CurrentUserModel
    ):
        """
        Query records through ``CdplbDao.get_cdplb_list``.

        :param result_db: async ORM session
        :param cdplb_query: query object handed to the DAO unchanged
        :param current_user: current user (not used by this method)
        :return: the DAO result passed through ``CamelCaseUtil.transform_result``
        """
        rows = await CdplbDao.get_cdplb_list(result_db, cdplb_query)
        return CamelCaseUtil.transform_result(rows)

    @classmethod
    async def save_cdplb(
        cls, result_db: AsyncSession, saveCdplbModel: SaveCdplbModel, current_user: CurrentUserModel
    ):
        """
        Insert or update every entry of ``saveCdplbModel.list``, then commit.

        An entry without ``onum`` is inserted, unless
        ``CdplbDao.get_cdplb_by_col`` returns a record for its
        ``(ssys_id, mdl_name, bath_obj_tab_name, bath_obj_fld_name)``; in that
        case the entry is skipped. An entry with ``onum`` updates the record
        with the fields that were explicitly set on the entry.

        :param result_db: async ORM session
        :param saveCdplbModel: wrapper whose ``list`` holds the entries to save
        :param current_user: current user; the user name is written to the audit columns
        :return: ``CrudResponseModel`` reporting success
        :raises ServiceException: when an entry has an ``onum`` for which
            ``CdplbDao.get_cdplb_by_id`` returns nothing
        """
        for item in saveCdplbModel.list or []:
            if item.onum is None:
                duplicate = await CdplbDao.get_cdplb_by_col(
                    result_db, item.ssys_id, item.mdl_name, item.bath_obj_tab_name, item.bath_obj_fld_name
                )
                if duplicate is not None:
                    # NOTE(review): an already existing record is skipped silently,
                    # so callers cannot tell a skipped entry from an inserted one.
                    continue
                operator = current_user.user.user_name
                # One timestamp so create_time and update_time are identical on insert.
                now = datetime.now()
                new_row = SysCdplb(**item.model_dump())
                new_row.onum = uuid.uuid4()
                # Default to '1' only when the entry brings no status of its own;
                # forcing '1' would discard the status chosen by the caller
                # (for example the one mapped from the import sheet).
                if not new_row.status:
                    new_row.status = '1'
                new_row.create_by = operator
                new_row.create_time = now
                new_row.update_by = operator
                new_row.update_time = now
                await CdplbDao.insert_cdplb(result_db, new_row)
            else:
                current = await CdplbDao.get_cdplb_by_id(result_db, item.onum)
                if current is None:
                    raise ServiceException(message='所改对象不存在')
                # Only explicitly set fields are sent to the DAO.
                changes = item.model_dump(exclude_unset=True)
                changes['update_by'] = current_user.user.user_name
                changes['update_time'] = datetime.now()
                await CdplbDao.update_cdplb(result_db, changes)
        await result_db.commit()
        return CrudResponseModel(is_success=True, message='操作成功')

    @classmethod
    async def delete_cdplb(cls, db: AsyncSession, array: List[str]):
        """
        Delete records through ``CdplbDao.delete_cdplb`` and commit.

        :param db: async ORM session
        :param array: values handed to the DAO unchanged
            (presumably the ``onum`` ids to delete - confirm against the DAO)
        :return: ``CrudResponseModel`` reporting success
        """
        await CdplbDao.delete_cdplb(db, array)
        await db.commit()
        return CrudResponseModel(is_success=True, message='操作成功')

    @staticmethod
    async def get_import_template_services():
        """
        Build the Excel import template for the batch-import sheet.

        The template has a single, empty sheet with the import headers; the
        status column is declared as a selector offering '正常' and '停用'.

        :return: whatever ``get_excel_template_with_sheets`` returns
            (the original documentation describes it as the binary Excel data)
        """
        status_header = '状态'
        sheet_config = {
            'sheet_name': CdplbService._IMPORT_SHEET_NAME,
            'header_list': list(CdplbService._IMPORT_HEADER_MAP),
            'data_list': [],
            'selector_header_list': [status_header],
        }
        option_list = [{status_header: ['正常', '停用']}]
        return get_excel_template_with_sheets([sheet_config], option_list)

    @staticmethod
    def _fetch_data_sources(current_user: CurrentUserModel) -> list:
        """Return the ``totalList`` of the first page (100 entries) of the data-source endpoint."""
        # NOTE(review): only page 1 with pageSize=100 is requested, so data
        # sources beyond the first 100 cannot be matched.
        url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources?pageNo=1&pageSize=100'
        # NOTE(review): the user's password is forwarded in a header while TLS
        # verification is disabled (verify=False) - confirm this is intended.
        headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password}
        try:
            response = requests.get(
                url, headers=headers, verify=False, timeout=CdplbService._DS_REQUEST_TIMEOUT
            )
            if response.status_code != 200:
                raise ServiceException(message='系统异常,获取数据源失败')
            payload = response.json()
            sources = payload["data"]["totalList"]
        except (requests.RequestException, ValueError, KeyError, TypeError) as exc:
            # Network failure, invalid JSON or an unexpected payload shape.
            raise ServiceException(message='系统异常,获取数据源失败') from exc
        return sources or []

    @staticmethod
    def _clean_cell(value) -> str:
        """Return a cell value as a stripped string; a missing value becomes ''."""
        return '' if value is None else str(value).strip()

    @staticmethod
    def _collect_row_errors(values: dict, source_id) -> List[str]:
        """Return the validation messages for one import row, in reporting order."""
        required = (
            ('bath_obj_tab_name', '批量对象表名不能为空'),
            ('bath_obj_fld_name', '批里对象字段名不能为空'),
            ('ssys_cd', '系统不能为空'),
            ('mdl_name', '模式不能为空'),
            ('status', '状态不能为空'),
        )
        errors = [message for column, message in required if not values[column]]
        # Report an unknown system only when a name was given; a blank cell is
        # already covered by the required-field message above.
        if values['ssys_cd'] and source_id is None:
            errors.append('系统不存在,为无效系统')
        return errors

    @classmethod
    async def batch_import_cdplb_data(cls,
                                      result_db: AsyncSession,
                                      file: UploadFile,
                                      overWrite: bool,
                                      current_user: CurrentUserModel) -> dict:
        """
        Import records from the first sheet of an uploaded Excel workbook.

        The system name of each row is resolved to a data-source id using the
        list fetched from the data-source endpoint. Rows that fail validation
        are reported and skipped; every other row is saved through
        ``save_cdplb`` (one call, and therefore one commit, per row).

        If the first sheet is not named '词典批量补充' nothing is imported and
        the empty result is returned.

        :param result_db: async ORM session
        :param file: uploaded Excel workbook
        :param overWrite: accepted for interface compatibility; not used by
            this implementation
        :param current_user: current user; used for the data-source request
            and for the audit columns written by ``save_cdplb``
        :return: ``{"rows": [{"row": n, "errorInfo": msg}, ...], "successCount": k}``
            where ``n`` is the data-frame position + 2 (presumably the sheet
            row number, with the header on row 1)
        :raises ServiceException: when the data sources cannot be fetched
        """
        # requests is synchronous: run it in the default executor so the event
        # loop is not blocked while waiting for the remote server.
        loop = asyncio.get_running_loop()
        data_sources = await loop.run_in_executor(None, cls._fetch_data_sources, current_user)

        # Build the name -> id lookup once; setdefault keeps the first match,
        # like a linear search from the start of the list would.
        source_id_by_name = {}
        for source in data_sources:
            source_id_by_name.setdefault(source.get("name"), source.get("id"))

        try:
            contents = await file.read()
        finally:
            # Close the upload even when reading fails.
            await file.close()

        result_list = {
            "rows": [],
            "successCount": 0
        }
        with pd.ExcelFile(io.BytesIO(contents)) as workbook:
            first_sheet = workbook.sheet_names[0]
            if first_sheet != cls._IMPORT_SHEET_NAME:
                return result_list
            # Read everything as text and keep empty cells as '' instead of NaN.
            df = workbook.parse(sheet_name=first_sheet, dtype=str, keep_default_na=False, na_values=[])
        df = df.rename(columns=cls._IMPORT_HEADER_MAP)

        columns = tuple(cls._IMPORT_HEADER_MAP.values())
        for row_number, (_, row) in enumerate(df.iterrows(), start=2):
            # row.get() turns a column missing from the sheet into an empty
            # value, so it is reported per row instead of raising KeyError.
            values = {column: cls._clean_cell(row.get(column)) for column in columns}
            source_id = source_id_by_name.get(values['ssys_cd'])

            errors = cls._collect_row_errors(values, source_id)
            if errors:
                result_list['rows'].append({
                    "row": row_number,
                    "errorInfo": ",".join(errors)
                })
                continue

            entry = SaveCdplbObj()
            entry.ssys_id = source_id
            entry.mdl_name = values['mdl_name']
            entry.bath_obj_tab_name = values['bath_obj_tab_name']
            entry.bath_obj_fld_name = values['bath_obj_fld_name']
            # Anything other than '正常' is stored as '0'.
            entry.status = '1' if values['status'] == '正常' else '0'
            await cls.save_cdplb(result_db, SaveCdplbModel(list=[entry]), current_user)
            # NOTE(review): save_cdplb skips already existing records silently,
            # so such rows are counted as successful here as well.
            result_list['successCount'] += 1
        return result_list