|
|
@ -1602,7 +1602,21 @@ class DataStdService: |
|
|
|
) |
|
|
|
|
|
|
|
return binary_data |
|
|
|
@staticmethod |
|
|
|
async def get_dict_import_template_services(): |
|
|
|
""" |
|
|
|
获取用户导入模板service |
|
|
|
|
|
|
|
:return: 用户导入模板excel的二进制数据 |
|
|
|
""" |
|
|
|
header_list = ['字典归属', '来源系统', '数据字典类型', '数据字典编号', '字典英文名', '字典中文名', '字典业务定义', '数据类型', '数据标准'] |
|
|
|
selector_header_list = ['字典归属', '数据字典类型'] |
|
|
|
option_list = [{'字典归属': ['公司级', '系统级']}, {'数据字典类型': ['基础数据', '指标数据']}] |
|
|
|
binary_data = get_excel_template( |
|
|
|
header_list=header_list, selector_header_list=selector_header_list, option_list=option_list |
|
|
|
) |
|
|
|
|
|
|
|
return binary_data |
|
|
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
@ -1757,19 +1771,145 @@ class DataStdService: |
|
|
|
except Exception as e: |
|
|
|
await query_db.rollback() |
|
|
|
raise ServiceException(message=f"导入失败:{str(e)}") |
|
|
|
|
|
|
|
@classmethod |
|
|
|
async def batch_import_dict_services( |
|
|
|
cls, |
|
|
|
request: Request, |
|
|
|
query_db: AsyncSession, |
|
|
|
file: UploadFile, |
|
|
|
current_user: CurrentUserModel |
|
|
|
): |
|
|
|
# Step 1: 读取 Excel |
|
|
|
content = await file.read() |
|
|
|
try: |
|
|
|
df = pd.read_excel(io.BytesIO(content)) |
|
|
|
except Exception: |
|
|
|
raise HTTPException(status_code=400, detail="Excel 文件解析失败") |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
async def get_dict_import_template_services(): |
|
|
|
""" |
|
|
|
获取用户导入模板service |
|
|
|
if df.empty: |
|
|
|
raise HTTPException(status_code=400, detail="导入文件内容为空") |
|
|
|
std_type_mapping = { |
|
|
|
'基础数据': '0', |
|
|
|
'指标数据': '1' |
|
|
|
} |
|
|
|
std_vest_mapping = { |
|
|
|
'公司级': 'company', |
|
|
|
'系统级': 'sys' |
|
|
|
} |
|
|
|
# 获取全量标准代码 |
|
|
|
std_list = await DataStdDao.get_std_main_list_import(query_db) |
|
|
|
|
|
|
|
:return: 用户导入模板excel的二进制数据 |
|
|
|
""" |
|
|
|
header_list = ['部门编号', '登录名称', '用户名称', '用户邮箱', '手机号码', '用户性别', '帐号状态'] |
|
|
|
selector_header_list = ['用户性别', '帐号状态'] |
|
|
|
option_list = [{'用户性别': ['男', '女', '未知']}, {'帐号状态': ['正常', '停用']}] |
|
|
|
binary_data = get_excel_template( |
|
|
|
header_list=header_list, selector_header_list=selector_header_list, option_list=option_list |
|
|
|
) |
|
|
|
# Step 2: 表头映射(中文转英文字段名) |
|
|
|
header_dict = { |
|
|
|
'字典归属': 'data_dict_vest', |
|
|
|
'来源系统': 'source_system', |
|
|
|
'数据字典类型': 'dict_type', |
|
|
|
'数据字典编号': 'dict_no', |
|
|
|
'字典英文名': 'dict_name_en', |
|
|
|
'字典中文名': 'dict_name_zh', |
|
|
|
'字典业务定义': 'dict_definition', |
|
|
|
'数据类型': 'data_type', |
|
|
|
'数据标准': 'data_std' |
|
|
|
} |
|
|
|
|
|
|
|
df.rename(columns=header_dict, inplace=True) |
|
|
|
# ds系统列表 |
|
|
|
data_tree_result = await MetataskService.get_data_source_tree( request,current_user) |
|
|
|
|
|
|
|
# Step 3: 生成统一 flowId |
|
|
|
batch_flow_id = str(uuid.uuid4()) |
|
|
|
|
|
|
|
try: |
|
|
|
for idx, row in df.iterrows(): |
|
|
|
# VO 数据校验 |
|
|
|
try: |
|
|
|
input_code_no = row.get('data_std') |
|
|
|
matched_code = next((item for item in std_list if item.data_std_no == input_code_no), None) |
|
|
|
|
|
|
|
if input_code_no and not matched_code: |
|
|
|
raise HTTPException(status_code=400, detail=f"第 {idx + 2} 行导入失败,数据标准 [{input_code_no}] 在系统中不存在") |
|
|
|
# 获取来源系统字段 |
|
|
|
input_src_sys = row.get("source_system") |
|
|
|
matched_source = next((ds for ds in data_tree_result if ds.name == input_src_sys or str(ds.id) == str(input_src_sys)), None) |
|
|
|
|
|
|
|
# 如果能匹配到系统,则使用 id;否则判断是否为空 |
|
|
|
if matched_source: |
|
|
|
src_sys_id = matched_source.id |
|
|
|
elif not input_src_sys or str(input_src_sys).strip() == "": |
|
|
|
src_sys_id = 10000 |
|
|
|
else: |
|
|
|
raise HTTPException(status_code=400, detail=f"第 {idx + 2} 行导入失败,来源系统 [{input_src_sys}] 不存在") |
|
|
|
|
|
|
|
# 如果存在,使用对应 onum 作为 cdId |
|
|
|
cd_id = matched_code.data_std_no if matched_code else None |
|
|
|
vo = DataStdDictModel( |
|
|
|
dataDictVest=std_vest_mapping.get(row.get('data_dict_vest'), 'company'), # 默认转为 'company', |
|
|
|
srcSys=str(src_sys_id), |
|
|
|
dataDictNo=row.get('dict_no'), |
|
|
|
dataDictCnName=row.get('dict_name_zh'), |
|
|
|
dataDictEngName=row.get('dict_name_en'), |
|
|
|
dataDictBusiMean=row.get('dict_definition'), |
|
|
|
dataDictType=std_type_mapping.get(row.get('dict_type'), '1'), # 默认转为 '1', |
|
|
|
dataDictDataType=str(row.get('data_type')), |
|
|
|
dataStdNo=cd_id, |
|
|
|
dataDictStat="1", # 固定值 |
|
|
|
) |
|
|
|
except Exception as ve: |
|
|
|
raise HTTPException(status_code=400, detail=f"第 {idx + 2} 行数据校验失败: {ve}") |
|
|
|
|
|
|
|
# 构造查询对象并检查是否存在 |
|
|
|
query_obj = DataStdDictModel(dataDictNo=vo.data_dict_no) |
|
|
|
exist_model = await DataStdDao.get_data_dict_by_info(query_db, query_obj) |
|
|
|
|
|
|
|
# 构造审批模型(无论新增/修改) |
|
|
|
model = DataStdDictModel(**vo.model_dump(exclude_unset=True, by_alias=True)) |
|
|
|
model.create_by = current_user.user.user_name |
|
|
|
|
|
|
|
model.create_time = datetime.now() |
|
|
|
|
|
|
|
if exist_model: |
|
|
|
# === 修改审批逻辑 === |
|
|
|
# 标准正在审批中则抛错 |
|
|
|
wating_list = await DataStdDao.check_std_dict_waiting(exist_model.onum, query_db) |
|
|
|
if wating_list: |
|
|
|
raise ServiceException(message=f"第 {idx + 2} 行数据字典正在审批中,请等待审批完成") |
|
|
|
|
|
|
|
last_appr = await DataStdDao.get_last_std_dict_appr_by_id(query_db, exist_model.onum) |
|
|
|
model.onum = exist_model.onum |
|
|
|
|
|
|
|
return binary_data |
|
|
|
appr_model = DataStdDictApprModel(**model.model_dump(exclude_unset=True, by_alias=True)) |
|
|
|
appr_model.changeType = "edit" |
|
|
|
appr_model.onum = str(uuid.uuid4()) |
|
|
|
appr_model.oldInstId = model.onum |
|
|
|
appr_model.compareId = last_appr.onum if last_appr else model.onum |
|
|
|
appr_model.approStatus = "waiting" |
|
|
|
appr_model.flowId = batch_flow_id |
|
|
|
else: |
|
|
|
# === 新增审批逻辑 === |
|
|
|
model.onum = str(uuid.uuid4()) |
|
|
|
appr_model = DataStdDictApprModel(**model.model_dump(exclude_unset=True, by_alias=True)) |
|
|
|
appr_model.changeType = "add" |
|
|
|
appr_model.compareId = model.onum |
|
|
|
appr_model.oldInstId = model.onum |
|
|
|
appr_model.oldInstId = model.onum |
|
|
|
appr_model.approStatus = "waiting" |
|
|
|
appr_model.flowId = batch_flow_id |
|
|
|
|
|
|
|
# 保存审批数据 |
|
|
|
await DataStdDao.add_std_dict_appr(query_db, appr_model) |
|
|
|
|
|
|
|
# 全部处理完成后统一发起审批流程 |
|
|
|
apply_model = ApplyModel() |
|
|
|
apply_model.businessType = "dataStdDict" |
|
|
|
apply_model.businessId = batch_flow_id |
|
|
|
apply_model.applicant = current_user.user.user_name |
|
|
|
await ApprovalService.apply_services(query_db, apply_model, 'dataStdDict') |
|
|
|
|
|
|
|
await query_db.commit() |
|
|
|
return CrudResponseModel(is_success=True, message="批量导入字典成功") |
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
await query_db.rollback() |
|
|
|
raise ServiceException(message=f"导入失败:{str(e)}") |
|
|
|
|