@ -1257,6 +1257,16 @@ class DataStdService:
else :
raise ServiceException ( message = f ' 标准 { model . onum } 不存在 ' )
@classmethod
async def edit_std_main_status ( cls , query_db : AsyncSession , model : DataStdMainModel ) :
existing = await cls . get_std_main_by_id ( query_db , model . onum )
if existing :
existing . std_status = model . std_status
await DataStdDao . update_std_main ( query_db , existing )
await query_db . commit ( )
return CrudResponseModel ( is_success = True , message = ' 编辑标准成功 ' )
else :
raise ServiceException ( message = f ' 标准 { model . onum } 不存在 ' )
@classmethod
async def add_std_main_appr ( cls , query_db : AsyncSession , model : DataStdMainModel ) :
if not await cls . check_std_num_unique ( query_db , model ) :
raise ServiceException ( message = f " 标准编号 { model . data_std_no } 已存在 " )
@ -1767,15 +1777,19 @@ class DataStdService:
file : UploadFile ,
current_user : CurrentUserModel
) :
"""
批量导入标准信息
"""
# Step 1: 读取 Excel
content = await file . read ( )
try :
df = pd . read_excel ( io . BytesIO ( content ) )
except Exception :
raise HTTPException ( status_code = 400 , detail = " Excel 文件解析失败 " )
if df . empty :
raise HTTPException ( status_code = 400 , detail = " 导入文件内容为空 " )
std_type_mapping = {
' 基础数据 ' : ' 0 ' ,
' 指标数据 ' : ' 1 '
@ -1784,10 +1798,11 @@ class DataStdService:
' 公司级 ' : ' company ' ,
' 系统级 ' : ' sys '
}
# 获取全量标准代码
std_code_list = await DataStdDao . get_std_code_list_all ( query_db )
# Step 2: 表头映射(中文转英文字段名)
# Step 2: 表头映射
header_dict = {
' 标准归属 ' : ' data_std_vest ' ,
' 来源系统 ' : ' std_source_system ' ,
@ -1805,104 +1820,137 @@ class DataStdService:
' 业务认责人员 ' : ' biz_person ' ,
' 技术认责部门 ' : ' tech_dept ' ,
' 技术认责人员 ' : ' tech_person ' ,
}
df . rename ( columns = header_dict , inplace = True )
# ds系统列表
data_tree_result = await MetataskService . get_data_source_tree ( request , current_user )
# 获取 DS 系统列表
data_tree_result = await MetataskService . get_data_source_tree ( request , current_user )
# Step 3: 生成统一 flowId
batch_flow_id = str ( uuid . uuid4 ( ) )
# 统一字符串清洗函数
def safe_str ( val ) :
if pd . isna ( val ) :
return " "
return str ( val ) . strip ( )
try :
for idx , row in df . iterrows ( ) :
# VO 数据校验
try :
input_code_no = row . get ( ' code_no ' )
matched_code = next ( ( item for item in std_code_list if item . cd_no == input_code_no ) , None )
if input_code_no and not matched_code :
raise HTTPException ( status_code = 400 , detail = f " 第 { idx + 2 } 行导入失败,代码编号 [ { input_code_no } ] 在系统中不存在 " )
# 获取来源系统字段
input_src_sys = row . get ( " std_source_system " )
matched_source = next ( ( ds for ds in data_tree_result if ds . name == input_src_sys or str ( ds . id ) == str ( input_src_sys ) ) , None )
# === 获取标准目录树 ===
catalog_result = await cls . get_catalog_list_services ( query_db , query_object = None , user_id = current_user . user . user_id , is_page = True )
# 将目录树展开成 { "一级目录/二级目录": contentOnum } 结构
def build_catalog_mapping ( node , path = " " , mapping = None ) :
if mapping is None :
mapping = { }
current_path = f " { path } / { node [ ' contentName ' ] } " if path else node [ ' contentName ' ]
mapping [ current_path ] = node [ ' contentOnum ' ]
for child in node . get ( " children " , [ ] ) :
build_catalog_mapping ( child , current_path , mapping )
return mapping
catalog_mapping = build_catalog_mapping ( catalog_result . rows [ 0 ] )
# === 处理标准目录 ===
input_dir = row . get ( " std_dir " )
if not input_dir or str ( input_dir ) . strip ( ) == " " :
belt_content_onum = 2
else :
input_dir = input_dir . strip ( )
if input_dir not in catalog_mapping :
raise HTTPException ( status_code = 400 , detail = f " 第 { idx + 2 } 行导入失败,标准目录 [ { input_dir } ] 不存在 " )
belt_content_onum = catalog_mapping [ input_dir ]
# 如果能匹配到系统,则使用 id;否则判断是否为空
if matched_source :
src_sys_id = matched_source . id
elif not input_src_sys or str ( input_src_sys ) . strip ( ) == " " :
src_sys_id = 10000
else :
raise HTTPException ( status_code = 400 , detail = f " 第 { idx + 2 } 行导入失败,来源系统 [ { input_src_sys } ] 不存在 " )
def safe_str ( val ) :
if pd . isna ( val ) :
return " "
return str ( val ) . strip ( )
# 如果存在,使用对应 onum 作为 cdId
cd_id = matched_code . onum if matched_code else None
vo = DataStdMainModel (
dataStdVest = std_vest_mapping . get ( row . get ( ' data_std_vest ' ) , ' company ' ) , # 默认转为 'company',
srcSys = src_sys_id ,
dataStdNo = row . get ( ' data_std_no ' ) ,
dataStdCnName = row . get ( ' std_name_zh ' ) ,
dataStdEngName = row . get ( ' std_name_en ' ) ,
dataStdBusiDefn = row . get ( ' std_definition ' ) ,
dataStdType = std_type_mapping . get ( row . get ( ' std_type ' ) , ' 1 ' ) , # 默认转为 '1',
dataStdSrc = row . get ( ' std_source ' ) ,
dataClas = row . get ( ' data_category ' ) ,
dataSecLvl = str ( row . get ( ' security_level ' ) ) ,
cdId = cd_id ,
dataStdBusiOwnershipDept = safe_str ( row . get ( ' biz_dept ' ) ) ,
dataStdBusiOwnershipPrsn = safe_str ( row . get ( ' biz_person ' ) ) ,
dataStdItOwnershipDept = safe_str ( row . get ( ' tech_dept ' ) ) ,
dataStdItOwnershipPrsn = safe_str ( row . get ( ' tech_person ' ) ) ,
beltDataStdContent = belt_content_onum , # 固定值
)
# === 基础字段取值 ===
input_code_no = safe_str ( row . get ( ' code_no ' ) )
input_src_sys = safe_str ( row . get ( ' std_source_system ' ) )
input_std_vest = safe_str ( row . get ( ' data_std_vest ' ) )
input_std_type = safe_str ( row . get ( ' std_type ' ) )
input_dir = safe_str ( row . get ( ' std_dir ' ) )
input_data_std_no = safe_str ( row . get ( ' data_std_no ' ) )
input_std_name_zh = safe_str ( row . get ( ' std_name_zh ' ) )
input_std_name_en = safe_str ( row . get ( ' std_name_en ' ) )
input_std_definition = safe_str ( row . get ( ' std_definition ' ) )
input_std_source = safe_str ( row . get ( ' std_source ' ) )
input_data_category = safe_str ( row . get ( ' data_category ' ) )
input_security_level = safe_str ( row . get ( ' security_level ' ) )
input_biz_dept = safe_str ( row . get ( ' biz_dept ' ) )
input_biz_person = safe_str ( row . get ( ' biz_person ' ) )
input_tech_dept = safe_str ( row . get ( ' tech_dept ' ) )
input_tech_person = safe_str ( row . get ( ' tech_person ' ) )
# === 校验标准代码编号 ===
matched_code = next (
( item for item in std_code_list if item . cd_no == input_code_no ) ,
None
)
if input_code_no and not matched_code :
raise HTTPException ( status_code = 400 , detail = f " 第 { idx + 2 } 行导入失败:代码编号 [ { input_code_no } ] 在系统中不存在 " )
# === 来源系统匹配 ===
matched_source = next (
( ds for ds in data_tree_result if ds . name == input_src_sys or str ( ds . id ) == input_src_sys ) ,
None
)
# === 获取标准目录树 ===
catalog_result = await cls . get_catalog_list_services (
query_db ,
query_object = None ,
user_id = current_user . user . user_id ,
is_page = True
)
# 构建目录映射
def build_catalog_mapping ( node , path = " " , mapping = None ) :
if mapping is None :
mapping = { }
current_path = f " { path } / { node [ ' contentName ' ] } " if path else node [ ' contentName ' ]
mapping [ current_path ] = node [ ' contentOnum ' ]
for child in node . get ( " children " , [ ] ) :
build_catalog_mapping ( child , current_path , mapping )
return mapping
catalog_mapping = build_catalog_mapping ( catalog_result . rows [ 0 ] )
# === 处理标准目录 ===
if not input_dir :
belt_content_onum = 2
else :
if input_dir not in catalog_mapping :
raise HTTPException ( status_code = 400 , detail = f " 第 { idx + 2 } 行导入失败:标准目录 [ { input_dir } ] 不存在 " )
belt_content_onum = catalog_mapping [ input_dir ]
# === 来源系统 id 处理 ===
if matched_source :
src_sys_id = matched_source . id
elif not input_src_sys :
src_sys_id = 10000
else :
raise HTTPException ( status_code = 400 , detail = f " 第 { idx + 2 } 行导入失败:来源系统 [ { input_src_sys } ] 不存在 " )
# === cdId ===
cd_id = matched_code . onum if matched_code else None
# === 构造 VO ===
vo = DataStdMainModel (
dataStdVest = std_vest_mapping . get ( input_std_vest , ' company ' ) ,
srcSys = src_sys_id ,
dataStdNo = input_data_std_no ,
dataStdCnName = input_std_name_zh ,
dataStdEngName = input_std_name_en ,
dataStdBusiDefn = input_std_definition ,
dataStdType = std_type_mapping . get ( input_std_type , ' 1 ' ) ,
dataStdSrc = input_std_source ,
dataClas = input_data_category ,
dataSecLvl = input_security_level ,
cdId = cd_id ,
dataStdBusiOwnershipDept = input_biz_dept ,
dataStdBusiOwnershipPrsn = input_biz_person ,
dataStdItOwnershipDept = input_tech_dept ,
dataStdItOwnershipPrsn = input_tech_person ,
beltDataStdContent = belt_content_onum ,
)
except Exception as ve :
raise HTTPException ( status_code = 400 , detail = f " 第 { idx + 2 } 行数据校验失败: { ve } " )
# 构造查询对象并检查是否存在
raise HTTPException ( status_code = 400 , detail = f " 第 { idx + 2 } 行数据校验失败: { ve } " )
# === 查询是否存在 ===
query_obj = DataStdMainModel ( dataStdNo = vo . data_std_no )
exist_model = await DataStdDao . get_data_main_by_info ( query_db , query_obj )
# 构造审批模型(无论新增/修改)
# === 构造审批模型 ===
model = DataStdMainModel ( * * vo . model_dump ( exclude_unset = True , by_alias = True ) )
model . create_by = current_user . user . user_name
model . std_status = " 1 "
model . create_time = datetime . now ( )
if exist_model :
# === 修改审批逻辑 ===
# 标准正在审批中则抛错
# 修改审批逻辑
wating_list = await DataStdDao . check_std_main_waiting ( exist_model . onum , query_db )
if wating_list :
raise ServiceException ( message = f " 第 { idx + 2 } 行数据标准正在审批中,请等待审批完成 " )
last_appr = await DataStdDao . get_last_std_main_appr_by_id ( query_db , exist_model . onum )
model . onum = exist_model . onum
appr_model = DataStdMainApprModel ( * * model . model_dump ( exclude_unset = True , by_alias = True ) )
appr_model . changeType = " edit "
appr_model . onum = str ( uuid . uuid4 ( ) )
@ -1911,33 +1959,31 @@ class DataStdService:
appr_model . approStatus = " waiting "
appr_model . flowId = batch_flow_id
else :
# === 新增审批逻辑 ===
# 新增审批逻辑
model . onum = str ( uuid . uuid4 ( ) )
appr_model = DataStdMainApprModel ( * * model . model_dump ( exclude_unset = True , by_alias = True ) )
appr_model . changeType = " add "
appr_model . compareId = model . onum
appr_model . oldInstId = model . onum
appr_model . oldInstId = model . onum
appr_model . approStatus = " waiting "
appr_model . flowId = batch_flow_id
# 保存审批数据
await DataStdDao . add_std_main_appr ( query_db , appr_model )
# 全部处理完成后统一发起审批流程
# === 批量发起审批 ===
apply_model = ApplyModel ( )
apply_model . businessType = " dataStdMain "
apply_model . businessId = batch_flow_id
apply_model . applicant = current_user . user . user_name
await ApprovalService . apply_services ( query_db , apply_model , ' dataStdMain ' )
await query_db . commit ( )
return CrudResponseModel ( is_success = True , message = " 批量导入标准成功 " )
except Exception as e :
await query_db . rollback ( )
raise ServiceException ( message = f " 导入失败: { str ( e ) } " )
raise ServiceException ( message = f " 导入失败: { str ( e ) } " )
@classmethod
async def batch_import_dict_services (
cls ,
@ -1946,15 +1992,19 @@ class DataStdService:
file : UploadFile ,
current_user : CurrentUserModel
) :
"""
批量导入数据字典服务
"""
# Step 1: 读取 Excel
content = await file . read ( )
try :
df = pd . read_excel ( io . BytesIO ( content ) )
except Exception :
raise HTTPException ( status_code = 400 , detail = " Excel 文件解析失败 " )
if df . empty :
raise HTTPException ( status_code = 400 , detail = " 导入文件内容为空 " )
std_type_mapping = {
' 基础数据 ' : ' 0 ' ,
' 指标数据 ' : ' 1 '
@ -1963,10 +2013,11 @@ class DataStdService:
' 公司级 ' : ' company ' ,
' 系统级 ' : ' sys '
}
# 获取全量标准代码
# 获取全量标准主表数据
std_list = await DataStdDao . get_std_main_list_import ( query_db )
# Step 2: 表头映射(中文转 英文字段名)
# Step 2: 表头映射(中文 -> 英文字段名)
header_dict = {
' 字典归属 ' : ' data_dict_vest ' ,
' 来源系统 ' : ' source_system ' ,
@ -1978,72 +2029,75 @@ class DataStdService:
' 数据类型 ' : ' data_type ' ,
' 数据标准 ' : ' data_std '
}
df . rename ( columns = header_dict , inplace = True )
# ds系统列表
data_tree_result = await MetataskService . get_data_source_tree ( request , current_user )
# ds 系统树
data_tree_result = await MetataskService . get_data_source_tree ( request , current_user )
# Step 3: 生成统一 flowId
batch_flow_id = str ( uuid . uuid4 ( ) )
# 安全字符串函数
def safe_str ( val ) :
if pd . isna ( val ) :
return " "
return str ( val ) . strip ( )
try :
for idx , row in df . iterrows ( ) :
# VO 数据校验
try :
input_code_no = row . get ( ' data_std ' )
matched_code = next ( ( item for item in std_list if item . data_std_no == input_code_no ) , None )
if input_code_no and not matched_code :
raise HTTPException ( status_code = 400 , detail = f " 第 { idx + 2 } 行导入失败,数据标准 [ { input_code_no } ] 在系统中不存在 " )
# 获取来源系统字段
input_src_sys = row . get ( " source_system " )
matched_source = next ( ( ds for ds in data_tree_result if ds . name == input_src_sys or str ( ds . id ) == str ( input_src_sys ) ) , None )
# 如果能匹配到系统,则使用 id;否则判断是否为空
if matched_source :
src_sys_id = matched_source . id
elif not input_src_sys or str ( input_src_sys ) . strip ( ) == " " :
src_sys_id = 10000
else :
raise HTTPException ( status_code = 400 , detail = f " 第 { idx + 2 } 行导入失败,来源系统 [ { input_src_sys } ] 不存在 " )
# 如果存在,使用对应 onum 作为 cdId
cd_id = matched_code . data_std_no if matched_code else None
vo = DataStdDictModel (
dataDictVest = std_vest_mapping . get ( row . get ( ' data_dict_vest ' ) , ' company ' ) , # 默认转为 'company',
srcSys = str ( src_sys_id ) ,
dataDictNo = row . get ( ' dict_no ' ) ,
dataDictCnName = row . get ( ' dict_name_zh ' ) ,
dataDictEngName = row . get ( ' dict_name_en ' ) ,
dataDictBusiMean = row . get ( ' dict_definition ' ) ,
dataDictType = std_type_mapping . get ( row . get ( ' dict_type ' ) , ' 1 ' ) , # 默认转为 '1',
dataDictDataType = str ( row . get ( ' data_type ' ) ) ,
dataStdNo = cd_id ,
dataDictStat = " 1 " , # 固定值
)
# === 1. 校验数据标准 ===
input_code_no = safe_str ( row . get ( ' data_std ' ) )
matched_code = next ( ( item for item in std_list if item . data_std_no == input_code_no ) , None )
if input_code_no and not matched_code :
raise HTTPException ( status_code = 400 , detail = f " 第 { idx + 2 } 行导入失败,数据标准 [ { input_code_no } ] 在系统中不存在 " )
# === 2. 来源系统匹配 ===
input_src_sys = safe_str ( row . get ( " source_system " ) )
matched_source = next ( ( ds for ds in data_tree_result if ds . name == input_src_sys or str ( ds . id ) == str ( input_src_sys ) ) , None )
if matched_source :
src_sys_id = matched_source . id
elif not input_src_sys :
src_sys_id = 10000
else :
raise HTTPException ( status_code = 400 , detail = f " 第 { idx + 2 } 行导入失败,来源系统 [ { input_src_sys } ] 不存在 " )
# === 3. 构造 VO ===
cd_id = matched_code . data_std_no if matched_code else None
vo = DataStdDictModel (
dataDictVest = std_vest_mapping . get ( safe_str ( row . get ( ' data_dict_vest ' ) ) , ' company ' ) ,
srcSys = str ( src_sys_id ) ,
dataDictNo = safe_str ( row . get ( ' dict_no ' ) ) ,
dataDictCnName = safe_str ( row . get ( ' dict_name_zh ' ) ) ,
dataDictEngName = safe_str ( row . get ( ' dict_name_en ' ) ) ,
dataDictBusiMean = safe_str ( row . get ( ' dict_definition ' ) ) ,
dataDictType = std_type_mapping . get ( safe_str ( row . get ( ' dict_type ' ) ) , ' 1 ' ) ,
dataDictDataType = safe_ str( row . get ( ' data_type ' ) ) ,
dataStdNo = cd_id ,
dataDictStat = " 1 " ,
)
except Exception as ve :
raise HTTPException ( status_code = 400 , detail = f " 第 { idx + 2 } 行数据校验失败: { ve } " )
# 构造查询对象并检查是否存在
# === 4. 判断是否已存在 ===
query_obj = DataStdDictModel ( dataDictNo = vo . data_dict_no )
exist_model = await DataStdDao . get_data_dict_by_info ( query_db , query_obj )
# 构造审批模型(无论新增/修改)
# === 5. 构造审批数据 ===
model = DataStdDictModel ( * * vo . model_dump ( exclude_unset = True , by_alias = True ) )
model . create_by = current_user . user . user_name
model . create_time = datetime . now ( )
if exist_model :
# === 修改审批逻辑 ===
# 标准正在审批中则抛错
wating_list = await DataStdDao . check_std_dict_waiting ( exist_model . onum , query_db )
if wating_list :
# 修改审批逻辑
waiting_list = await DataStdDao . check_std_dict_waiting ( exist_model . onum , query_db )
if waiting_list :
raise ServiceException ( message = f " 第 { idx + 2 } 行数据字典正在审批中,请等待审批完成 " )
last_appr = await DataStdDao . get_last_std_dict_appr_by_id ( query_db , exist_model . onum )
model . onum = exist_model . onum
appr_model = DataStdDictApprModel ( * * model . model_dump ( exclude_unset = True , by_alias = True ) )
appr_model . changeType = " edit "
appr_model . onum = str ( uuid . uuid4 ( ) )
@ -2052,30 +2106,30 @@ class DataStdService:
appr_model . approStatus = " waiting "
appr_model . flowId = batch_flow_id
else :
# === 新增审批逻辑 ===
# 新增审批逻辑
model . onum = str ( uuid . uuid4 ( ) )
appr_model = DataStdDictApprModel ( * * model . model_dump ( exclude_unset = True , by_alias = True ) )
appr_model . changeType = " add "
appr_model . compareId = model . onum
appr_model . oldInstId = model . onum
appr_model . oldInstId = model . onum
appr_model . approStatus = " waiting "
appr_model . flowId = batch_flow_id
# 保存审批数据
await DataStdDao . add_std_dict_appr ( query_db , appr_model )
# 全部处理完成后统一发起审批流程
# === 6. 发起审批流程 ===
apply_model = ApplyModel ( )
apply_model . businessType = " dataStdDict "
apply_model . businessId = batch_flow_id
apply_model . applicant = current_user . user . user_name
await ApprovalService . apply_services ( query_db , apply_model , ' dataStdDict ' )
await query_db . commit ( )
return CrudResponseModel ( is_success = True , message = " 批量导入字典成功 " )
except Exception as e :
await query_db . rollback ( )
raise ServiceException ( message = f " 导入失败: { str ( e ) } " )