You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							1346 lines
						
					
					
						
							53 KiB
						
					
					
				
			
		
		
		
			
			
			
				
					
				
				
					
				
			
		
		
	
	
							1346 lines
						
					
					
						
							53 KiB
						
					
					
				| from sqlalchemy import delete, select, update, desc, or_, not_, and_ | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from module_admin.entity.do.datastd_do import DataStdCode, DataStdDict, DataAstContent, DataAstContentRela, DataStdMain, \ | |
|     DataStdMainAppr, DataStdDictAppr, DataStdCodeAppr | |
| from module_admin.entity.vo.datastd_vo import DataStdCodeModel, DataStdDictModel, DataStdMainModel, \ | |
|     DataStdMainApprModel, DataStdDictApprModel, DataStdCodeApprModel, StdDictNoPageParam | |
| from module_admin.entity.do.dept_do import SysDept | |
| from utils.page_util import PageUtil | |
| from sqlalchemy.orm import aliased | |
| from module_admin.entity.vo.data_ast_content_vo import DataCatalogPageQueryModel, DeleteDataCatalogModel, \ | |
|     DataCatalogChild | |
| from datetime import datetime | |
| from module_admin.entity.do.approval_do import FlowApproval, FlowConfig | |
| from module_admin.entity.do.meta_do   import MetadataFldSuppInfo | |
| 
 | |
| class DataStdDao: | |
| 
 | |
|     # ----------------------------------------------------------------数据标准模块---------------------------------------------------------------------------------------------------- | |
|     @classmethod | |
|     async def get_std_code_appr_list(cls, flowId: str, db: AsyncSession): | |
|         filters = [] | |
|         filters.append(DataStdCodeAppr.flowId == flowId) | |
|         filters.append(DataStdCodeAppr.class_id == "code") | |
|         query = select(DataStdCodeAppr).where(*filters).order_by(desc(DataStdCodeAppr.create_time)) | |
|         return await PageUtil.paginate(db, query, 0, 0, False) | |
| 
 | |
|     @classmethod | |
|     async def get_std_code_appr_list_Flow(cls, flowId: str, db: AsyncSession): | |
|         filters = [] | |
|         filters.append(DataStdCodeAppr.flowId == flowId) | |
|         query = select(DataStdCodeAppr).where(*filters).order_by(desc(DataStdCodeAppr.create_time)) | |
|         return await PageUtil.paginate(db, query, 0, 0, False) | |
| 
 | |
|     @classmethod | |
|     async def get_std_code_list(cls, db: AsyncSession, query_object: DataStdCodeModel, is_page: bool = False): | |
|         """ | |
|         查询 DataStdCode 列表,带出 code_map_cn 字段(通过 code_map_id 关联同表获取 cd_val_cn_mean) | |
|         :param db: ORM 会话对象 | |
|         :param query_object: 查询条件对象 | |
|         :param is_page: 是否分页 | |
|         :return: 列表结果(分页或不分页) | |
|         """ | |
|         # 构建别名 | |
|         c1 = aliased(DataStdCode)  # 当前记录 | |
|         c2 = aliased(DataStdCode)  # 关联 code_map_id | |
| 
 | |
|         # 构建过滤条件 | |
|         filters = [] | |
|         if query_object.cd_val_cn_mean: | |
|             filters.append(c1.cd_val_cn_mean.like(f"%{query_object.cd_val_cn_mean}%")) | |
|         if query_object.cd_no: | |
|             filters.append(or_( | |
|                 c1.cd_val_cn_mean.like(f"%{query_object.cd_no}%"), | |
|                 c1.cd_no.like(f"%{query_object.cd_no}%") | |
|             )) | |
|         if query_object.cd_val_stat: | |
|             filters.append(c1.cd_val_stat == query_object.cd_val_stat) | |
|         if query_object.src_sys: | |
|             filters.append(c1.src_sys == query_object.src_sys) | |
|         if query_object.cd_type: | |
|             filters.append(c1.cd_type == query_object.cd_type) | |
|         if query_object.class_id: | |
|             filters.append(c1.class_id == query_object.class_id) | |
|             if query_object.class_id == 'codeItem' and not query_object.parent_id: | |
|                 filters.append(1 == 2) | |
|         if query_object.parent_id: | |
|             filters.append(c1.parent_id == query_object.parent_id) | |
| 
 | |
|         # 构建查询 | |
|         query = ( | |
|             select( | |
|                 c1.onum,  # 当前 code 数据的 Id | |
|                 c1.cd_no,  # 当前代码编号 | |
|                 c1.cd_val_cn_mean,  # 当前代码值 | |
|                 c1.cd_type,  # 当前代码类型 | |
|                 c1.cd_val_stat,  # 当前代码状态 | |
|                 c1.src_sys,  # 当前归属系统 | |
|                 c1.class_id,  # 当前代码类别 | |
|                 c1.parent_id,  # 当前代码父 id | |
|                 c1.code_map_id,  # 当前代码映射 Id | |
|                 c2.cd_val_cn_mean.label("code_map_cn")  # 映射名称 | |
|             ) | |
|             .select_from(c1) | |
|             .outerjoin(c2, c1.code_map_id == c2.onum) | |
|             .where(*filters) | |
|         ) | |
|         if query_object.parent_id: | |
|             query = query.order_by(c1.cd_no) | |
|         else: | |
|             query = query.order_by(desc(c1.create_time)) | |
|         # 分页处理 | |
|         result = await PageUtil.paginate( | |
|             db, query, query_object.page_num, query_object.page_size, is_page | |
|         ) | |
|         return result | |
| 
 | |
|     @classmethod | |
|     async def get_appr_std_code_list(cls, db: AsyncSession, query_object: DataStdCodeApprModel, is_page: bool = False): | |
|         """ | |
|      获取 DataStdCode 的列表信息,支持模糊查询和分页 | |
|      :param db: ORM对象 | |
|      :param query_object: 查询参数对象 | |
|      :param is_page: 是否开启分页 | |
|      :return: 列表信息 | |
|      """ | |
|         # 构建查询条件 | |
|         filters = [] | |
|         if query_object.cd_val_cn_mean: | |
|             filters.append(DataStdCodeAppr.cd_val_cn_mean.like(f"%{query_object.cd_val_cn_mean}%")) | |
|         if query_object.cd_no: | |
|             filters.append(or_( | |
|                 DataStdCodeAppr.cd_val_cn_mean.like(f"%{query_object.cd_no}%"), | |
|                 DataStdCodeAppr.cd_no.like(f"%{query_object.cd_no}%") | |
|             )) | |
|         if query_object.cd_val_stat: | |
|             filters.append(DataStdCodeAppr.cd_val_stat == query_object.cd_val_stat) | |
|         if query_object.src_sys: | |
|             filters.append(DataStdCodeAppr.src_sys == query_object.src_sys) | |
|         if query_object.cd_type: | |
|             filters.append(DataStdCodeAppr.cd_type == query_object.cd_type) | |
| 
 | |
|         if query_object.class_id: | |
|             filters.append(DataStdCodeAppr.class_id == query_object.class_id) | |
|             if query_object.class_id == 'codeItem' and not query_object.parent_id: | |
|                 filters.append(1 == 2) | |
|         if query_object.parent_id: | |
|             filters.append(DataStdCodeAppr.parent_id == query_object.parent_id) | |
|             # 构建查询语句 | |
|         query = ( | |
|             select(DataStdCodeAppr) | |
|             .where(*filters) | |
|             .order_by(desc(DataStdCodeAppr.create_time))  # 按创建时间降序排序 | |
|         ) | |
| 
 | |
|         # 分页处理 | |
|         col_list = await PageUtil.paginate( | |
|             db, query, query_object.page_num, query_object.page_size, is_page | |
|         ) | |
|         return col_list | |
| 
 | |
|     @classmethod | |
|     async def get_last_std_code_appr_by_id(cls, db: AsyncSession, Id: str): | |
|         result = await db.execute( | |
|             select(DataStdCodeAppr) | |
|             .where( | |
|                 DataStdCodeAppr.oldInstId == Id, | |
|                 DataStdCodeAppr.approStatus == "succeed" | |
|             ) | |
|             .order_by(DataStdCodeAppr.upd_time.desc()) | |
|             .limit(1) | |
|         ) | |
|         return result.scalar_one_or_none() | |
| 
 | |
|     @classmethod | |
|     async def get_std_code_list_all(cls, query_db: AsyncSession): | |
|         """ | |
|         获取所有标准代码数据列表(仅有效的) | |
|         :param session: 异步数据库会话 | |
|         :return: List[DataStdCode] | |
|         """ | |
|         stmt = select(DataStdCode).where(DataStdCode.class_id == 'code')  # 只查有效的 | |
|         result = await query_db.execute(stmt) | |
|         return result.scalars().all() | |
| 
 | |
|     @classmethod | |
|     async def get_std_main_list_import(cls, query_db: AsyncSession): | |
|         """ | |
|         获取所有标准代码数据列表(仅有效的) | |
|         :param session: 异步数据库会话 | |
|         :return: List[DataStdCode] | |
|         """ | |
|         stmt = select(DataStdMain).where(1 == 1)  # 只查有效的 | |
|         result = await query_db.execute(stmt) | |
|         return result.scalars().all() | |
| 
 | |
|     @classmethod | |
|     async def get_std_code_map_list(cls, db: AsyncSession, query_object: DataStdCodeModel, is_page: bool = False): | |
|         # 构建查询条件 | |
|         filters = [] | |
|         # 系统级代码 | |
|         c1 = aliased(DataStdCode)  # 当前 codeItem 表(c1) | |
|         c2 = aliased(DataStdCode)  # 别名 c2 表示父级(parent) | |
|         c3 = aliased(DataStdCode)  # 别名 c3 表示映射的代码 | |
|         c4 = aliased(DataStdCode)  # 别名 c4 表示映射父级代码 | |
|         if query_object.parent_id: | |
|             if query_object.cd_type == 'sys': | |
|                 filters.append(c1.parent_id == query_object.parent_id) | |
|             else: | |
|                 filters.append(c3.parent_id == query_object.parent_id) | |
|                 # 公司级代码 | |
|         # 2. 创建别名并进行连接 | |
|         filters.append(c1.class_id == 'codeItem')  # class_id 为 'codeItem' | |
|         # 3. 构建基础查询,使用连接和别名 | |
|         query = ( | |
|             select( | |
|                 c2.src_sys.label("target_sys_id"),  # 父级系统编号 | |
|                 c2.src_sys.label("target_sys_name"),  # 父级系统名称 | |
|                 c2.cd_no.label("target_code_num"),  # 父级代码编号 | |
|                 c2.cd_val_cn_mean.label("target_code_name"),  # 父级代码名称 | |
|                 c1.cd_no.label("target_code_num_item"),  # 当前 codeItem 的编号 | |
|                 c1.cd_val_cn_mean.label("target_code_name_item"),  # 当前 codeItem 的名称 | |
|                 c4.cd_no.label("resource_code_num"),  # 映射的父级代码编号 | |
|                 c4.cd_val_cn_mean.label("resource_code_name"),  # 映射的父级代码名称 | |
|                 c3.cd_no.label("resource_code_num_item"),  # 映射的父级代码编号 | |
|                 c3.cd_val_cn_mean.label("resource_code_name_item")  # 映射的父级代码名称 | |
|             ) | |
|             .select_from(c1)  # 从 c1 开始查询 | |
|             .join(c2, c1.parent_id == c2.onum, isouter=True)  # 连接 c2 | |
|             .join(c3, c1.code_map_id == c3.onum, isouter=True)  # 连接 c3 | |
|             .join(c4, c3.parent_id == c4.onum, isouter=True)  # 连接 c4 | |
|             .where(*filters)  # 使用所有过滤条件 | |
|             .order_by(desc(c1.create_time))  # 按照创建时间降序排列 | |
|         ) | |
|         # 4. 分页逻辑 | |
|         col_list = await PageUtil.paginate( | |
|             db, query, query_object.page_num, query_object.page_size, is_page | |
|         ) | |
|         return col_list | |
| 
 | |
|     @classmethod | |
|     async def get_std_main_map_list(cls, db: AsyncSession, query_object: DataStdDictModel, is_page: bool = False): | |
|         # 构建查询条件 | |
|         filters = [] | |
|         # 系统级代码 | |
|         c1 = aliased(DataStdMain)  # | |
|         c2 = aliased(DataStdDict)  # | |
| 
 | |
|         filters.append(c1.onum == query_object.onum) | |
| 
 | |
|         # 3. 构建基础查询,使用连接和别名 | |
|         query = ( | |
|             select( | |
|                 c1.data_std_no.label("data_std_no"),  # 标准编号 | |
|                 c1.data_std_eng_name.label("data_std_eng_name"),  # 标准英文名称 | |
|                 c1.data_std_cn_name.label("data_std_cn_name"),  # 标准中文名 | |
|                 c2.data_dict_no.label("data_dict_no"),  # 字典编号 | |
|                 c2.data_dict_eng_name.label("data_dict_eng_name"),  # 字典英文名称 | |
|                 c2.data_dict_cn_name.label("data_dict_cn_name"),  # 字典中文名 | |
|             ) | |
|             .select_from(c1)  # 从 c1 开始查询 | |
|             .join(c2, c1.data_std_no == c2.data_std_no, isouter=True)  # 连接 c2 | |
| 
 | |
|             .where(*filters)  # 使用所有过滤条件 | |
|             .order_by(desc(c1.create_time))  # 按照创建时间降序排列 | |
|         ) | |
|         # 4. 分页逻辑 | |
|         col_list = await PageUtil.paginate( | |
|             db, query, query_object.page_num, query_object.page_size, is_page | |
|         ) | |
|         return col_list | |
|     @classmethod | |
|     async def get_std_dict_map_list(cls, db: AsyncSession, query_object: DataStdDictModel, is_page: bool = False): | |
|         # 构建查询条件 | |
|         filters = [] | |
|         # 系统级代码 | |
|         c1 = aliased(DataStdDict)  # | |
|         c2 = aliased(MetadataFldSuppInfo)  # | |
| 
 | |
|         filters.append(c1.onum == query_object.onum) | |
| 
 | |
|         # 3. 构建基础查询,使用连接和别名 | |
|         query = ( | |
|             select( | |
|                 c1.data_dict_no.label("data_dict_no"),  # 字典编号 | |
|                 c1.data_dict_cn_name.label("data_dict_cn_name"),  # 字典中文名 | |
|                 c1.data_dict_eng_name.label("data_dict_eng_name"),  # 字典英文名称 | |
|                 c2.mdl_name.label("mdl_name"),  # 元数据模式 | |
|                 c2.ssys_cd.label("ssys_cd"),  # 元数据表名 | |
|                 c2.tab_eng_name.label("tab_eng_name"),  # 元数据表名 | |
|                 c2.fld_eng_name.label("fld_eng_name"),  # 元数据字段 | |
|                 c2.fld_crrct_name.label("fld_crrct_name"),  # 元数据字段中文名 | |
| 
 | |
|             ) | |
|             .select_from(c1)  # 从 c1 开始查询 | |
|             .join(c2, c1.onum == c2.data_dict_id, isouter=True)  # 连接 c2 | |
| 
 | |
|             .where(*filters)  # 使用所有过滤条件 | |
|             .order_by(desc(c1.create_time))  # 按照创建时间降序排列 | |
|         ) | |
|         # 4. 分页逻辑 | |
|         col_list = await PageUtil.paginate( | |
|             db, query, query_object.page_num, query_object.page_size, is_page | |
|         ) | |
|         return col_list | |
| 
 | |
|     @classmethod | |
|     async def get_data_code_list_by_info(cls, db: AsyncSession, query_object: DataStdCodeModel): | |
|         List = ( | |
|             await db.execute( | |
|                 select(DataStdCode) | |
|                 .where( | |
|                     DataStdCode.cd_val_cn_mean == query_object.cd_val_cn_mean if query_object.cd_val_cn_mean else True, | |
|                     DataStdCode.onum == query_object.onum if query_object.onum else True, | |
|                     DataStdCode.cd_val_stat == query_object.cd_val_stat if query_object.cd_val_stat else True, | |
|                     DataStdCode.src_sys == query_object.src_sys if query_object.src_sys else True, | |
|                     DataStdCode.cd_type == query_object.cd_type if query_object.cd_type else True, | |
|                     DataStdCode.code_map_id == query_object.code_map_id if query_object.code_map_id else True, | |
|                     DataStdCode.parent_id == query_object.parent_id if query_object.parent_id else True, | |
|                     DataStdCode.class_id == query_object.class_id if query_object.class_id else True | |
|                 ) | |
|             ) | |
|         ).scalars().all() | |
|         return List | |
| 
 | |
|     @classmethod | |
|     async def get_data_code_by_info(cls, db: AsyncSession, query_object: DataStdCodeModel): | |
|         List = ( | |
|             await db.execute( | |
|                 select(DataStdCode) | |
|                 .where( | |
|                     DataStdCode.cd_val_cn_mean == query_object.cd_val_cn_mean if query_object.cd_val_cn_mean else True, | |
|                     DataStdCode.cd_no == query_object.cd_no if query_object.cd_no else True, | |
|                     DataStdCode.cd_val_stat == query_object.cd_val_stat if query_object.cd_val_stat else True, | |
|                     DataStdCode.src_sys == query_object.src_sys if query_object.src_sys else True, | |
|                     DataStdCode.cd_type == query_object.cd_type if query_object.cd_type else True, | |
|                     DataStdCode.parent_id == query_object.parent_id if query_object.parent_id else True, | |
|                     DataStdCode.class_id == query_object.class_id if query_object.class_id else True | |
|                 ) | |
|             ) | |
|         ).scalars().first() | |
|         return List | |
| 
 | |
|     @classmethod | |
|     async def get_std_code_by_id(cls, db: AsyncSession, Id: str): | |
|         col = ( | |
|             await db.execute( | |
|                 select(DataStdCode) | |
|                 .where(DataStdCode.onum == Id) | |
|             ) | |
|         ).scalars().first() | |
|         return col | |
| 
 | |
|     @classmethod | |
|     async def get_std_code_appr_by_id(cls, db: AsyncSession, Id: str): | |
|         col = ( | |
|             await db.execute( | |
|                 select(DataStdCodeAppr) | |
|                 .where(DataStdCodeAppr.onum == Id) | |
|             ) | |
|         ).scalars().first() | |
|         return col | |
| 
 | |
|     @classmethod | |
|     async def add_std_code(cls, db: AsyncSession, model: DataStdCodeModel): | |
|         col = DataStdCode( | |
|             **model.model_dump() | |
|         ) | |
|         db.add(col) | |
|         await db.flush() | |
|         return col | |
| 
 | |
|     @classmethod | |
|     async def add_std_code_appr(cls, db: AsyncSession, model: DataStdCodeApprModel): | |
|         col = DataStdCodeAppr( | |
|             **model.model_dump() | |
|         ) | |
|         db.add(col) | |
|         await db.flush() | |
|         return col | |
| 
 | |
|     @classmethod | |
|     async def delete_std_code(cls, db: AsyncSession, Id: str): | |
|         await db.execute(delete(DataStdCode).where(DataStdCode.onum == Id)) | |
| 
 | |
|     @classmethod | |
|     async def update_std_code_appr(cls, db: AsyncSession, update_data: DataStdCodeApprModel): | |
| 
 | |
|         await db.execute(update(DataStdCodeAppr), [update_data]) | |
|         await db.flush() | |
| 
 | |
|     @classmethod | |
|     async def update_std_code(cls, db: AsyncSession, update_data: DataStdCodeModel): | |
| 
 | |
|         await db.execute(update(DataStdCode), [update_data]) | |
|         await db.flush() | |
| 
 | |
|     @classmethod | |
|     async def check_code_num_exists(cls, query_db: AsyncSession, cd_no: str) -> bool: | |
|         """ | |
|         检查标准代码或代码项的code_num是否已经存在 | |
|         """ | |
|         # 查询标准代码表中是否存在相同的code_num | |
|         result = await query_db.execute(select(DataStdCodeModel).filter(DataStdCodeModel.cd_no == cd_no)) | |
|         existing_code = result.scalar_one_or_none() | |
| 
 | |
|         # 如果不存在,则检查代码项表中是否存在相同的code_num | |
|         if not existing_code: | |
|             result = await query_db.execute(select(DataStdCode).filter(DataStdCodeModel.cd_no == cd_no)) | |
|             existing_code_item = result.scalar_one_or_none() | |
|             return existing_code_item is not None | |
| 
 | |
|         return True | |
| 
 | |
|     # ----------------------------------------------------------------数据字典---------------------------------------------------------------------------------------------------- | |
|     @classmethod | |
|     async def get_std_dict_list(cls, db: AsyncSession, query_object: DataStdDictModel, is_page: bool = False): | |
|         """ | |
|         获取 DataStdDict 的列表信息,支持模糊查询和分页 | |
|         :param db: ORM对象 | |
|         :param query_object: 查询参数对象 | |
|         :param is_page: 是否开启分页 | |
|         :return: 列表信息 | |
|         """ | |
|         # 构建查询条件 | |
|         filters = [] | |
|         if query_object.data_dict_cn_name: | |
|             filters.append(or_( | |
|                 DataStdDict.data_dict_cn_name.like(f"%{query_object.data_dict_cn_name}%"), | |
|                 DataStdDict.data_dict_eng_name.like(f"%{query_object.data_dict_cn_name}%") | |
|             )) | |
|         if query_object.data_dict_no: | |
|             filters.append(DataStdDict.data_dict_no.like(f"%{query_object.data_dict_no}%")) | |
|         if query_object.data_dict_eng_name: | |
|             filters.append(DataStdDict.data_dict_eng_name.like(f"%{query_object.data_dict_eng_name}%")) | |
|         if query_object.data_dict_vest: | |
|             filters.append(DataStdDict.data_dict_vest == query_object.data_dict_vest) | |
|         if query_object.src_sys: | |
|             filters.append(DataStdDict.src_sys == query_object.src_sys) | |
|         if query_object.data_dict_data_type: | |
|             filters.append(DataStdDict.data_dict_data_type == query_object.data_dict_data_type) | |
|         if query_object.data_dict_stat: | |
|             filters.append(DataStdDict.data_dict_stat == query_object.data_dict_stat) | |
|         c1 = aliased(DataStdDict)  # 技术部门 | |
|         # 构建查询语句 | |
|         query = ( | |
|             select(DataStdDict.onum, | |
|                    DataStdDict.create_by, | |
|                    DataStdDict.create_time, | |
|                    DataStdDict.upd_prsn, | |
|                    DataStdDict.upd_time, | |
|                    DataStdDict.data_dict_no, | |
|                    DataStdDict.data_dict_eng_name, | |
|                    DataStdDict.data_dict_cn_name, | |
|                    DataStdDict.data_dict_vest, | |
|                    DataStdDict.data_dict_type, | |
|                    DataStdDict.data_dict_data_type, | |
|                    DataStdDict.src_sys, | |
|                    DataStdDict.src_sys, | |
|                    DataStdDict.data_dict_busi_mean, | |
|                    DataStdDict.data_dict_data_type, | |
|                    DataStdDict.data_std_no, | |
|                    DataStdDict.data_dict_stat, | |
|                    DataStdMain.data_std_cn_name.label("data_std_cn_name"), | |
|                    DataStdMain.data_std_it_ownership_dept.label("data_std_it_ownership_dept"), | |
|                    DataStdMain.data_std_busi_ownership_dept.label("data_std_busi_ownership_dept"), | |
|                    DataStdMain.data_std_it_ownership_prsn.label("data_std_it_ownership_prsn"), | |
|                    DataStdMain.data_std_busi_ownership_prsn.label("data_std_busi_ownership_prsn") | |
| 
 | |
|                    ) | |
|             .outerjoin(DataStdMain, DataStdDict.data_std_no == DataStdMain.data_std_no)  # 正确方向的左连接 | |
| 
 | |
|             .where(*filters) | |
|             .order_by(desc(DataStdDict.create_time))  # 按创建时间降序排序 | |
|         ) | |
|         # 分页处理 | |
|         col_list = await PageUtil.paginate( | |
|             db, query, query_object.page_num, query_object.page_size, is_page | |
|         ) | |
| 
 | |
|         return col_list | |
| 
 | |
|     @classmethod | |
|     async def get_std_dict_list_no_page(cls, db: AsyncSession, query: StdDictNoPageParam): | |
|         """ | |
|         获取 DataStdDict 的列表信息,支持模糊查询和分页 | |
|         :param db: ORM对象 | |
|         :param query_object: 查询参数对象 | |
|         :param is_page: 是否开启分页 | |
|         :return: 列表信息 | |
|         """ | |
|         # 构建查询条件 | |
|         filters = [] | |
|         # filters.append(or_( | |
|         #     DataStdDict.data_dict_cn_name.like(f"%{query.queryStr}%"), | |
|         #     DataStdDict.data_dict_eng_name.like(f"%{query.queryStr}%") | |
|         # )) | |
|         filters.append(DataStdDict.src_sys == query.ssysCd) | |
|         # c1 = aliased(DataStdDict)  # 技术部门 | |
|         # 构建查询语句 | |
|         query = ( | |
|             select(DataStdDict.onum, | |
|                    DataStdDict.data_dict_no, | |
|                    DataStdDict.data_dict_eng_name, | |
|                    DataStdDict.data_dict_cn_name) | |
|             .where(*filters) | |
|             .order_by(desc(DataStdDict.create_time))  # 按创建时间降序排序 | |
|         ) | |
|         list = (await db.execute(query)).fetchall() | |
|         return list | |
| 
 | |
|     @classmethod | |
|     async def add_std_dict(cls, db: AsyncSession, model: DataStdDictModel): | |
|         col = DataStdDict( | |
|             **model.model_dump() | |
|         ) | |
|         db.add(col) | |
|         await db.flush() | |
|         return col | |
| 
 | |
|     @classmethod | |
|     async def delete_std_dict(cls, db: AsyncSession, Id: str): | |
|         await db.execute(delete(DataStdDict).where(DataStdDict.onum == Id)) | |
| 
 | |
|     @classmethod | |
|     async def get_std_dict_list_all(cls, db: AsyncSession, query_object: DataStdDict): | |
|         filters = [] | |
|         if query_object.data_std_no: | |
|             filters.append(DataStdDict.data_std_no.like(f"%{query_object.data_std_no}%")) | |
|         query = select(DataStdDict).where(*filters).order_by(desc(DataStdDict.create_time)) | |
|         return await PageUtil.paginate(db, query, 0, 0, False) | |
| 
 | |
|     @classmethod | |
|     async def get_data_dict_list_by_info(cls, db: AsyncSession, query_object: DataStdDictModel): | |
|         List = ( | |
|             await db.execute( | |
|                 select(DataStdDict) | |
|                 .where( | |
|                     DataStdDict.data_std_no == query_object.data_std_no if query_object.data_std_no else True | |
|                 ) | |
|             ) | |
|         ).scalars().all() | |
|         return List | |
|     @classmethod | |
|     async def get_data_main_list_by_info(cls, db: AsyncSession, query_object: DataStdMain): | |
|         stmt = select(DataStdMain) | |
|      | |
|         # 动态拼接条件 | |
|         if query_object.data_std_no: | |
|             stmt = stmt.where(DataStdMain.data_std_no == query_object.data_std_no) | |
|      | |
|         if query_object.company_level_data_std_no: | |
|             stmt = stmt.where(DataStdMain.company_level_data_std_no == query_object.company_level_data_std_no) | |
|      | |
|         result = await db.execute(stmt) | |
|         return result.scalars().all() | |
| 
 | |
| 
 | |
|     @classmethod | |
|     async def get_meta_field_list_by_onum(cls, db: AsyncSession, onum: str): | |
| 
 | |
|         List = ( | |
|             await db.execute( | |
|                 select(MetadataFldSuppInfo) | |
|                 .where( | |
|                         MetadataFldSuppInfo.data_dict_id == onum if onum else False | |
|                 ) | |
|             ) | |
|         ).scalars().all() | |
|         return List     | |
|     @classmethod | |
|     async def update_std_dict(cls, db: AsyncSession, update_data: DataStdDictModel): | |
| 
 | |
|         await db.execute(update(DataStdDict), [update_data]) | |
|         await db.flush() | |
| 
 | |
|     @classmethod | |
|     async def get_std_dict_by_id(cls, db: AsyncSession, Id: str): | |
|         col = ( | |
|             await db.execute( | |
|                 select(DataStdDict) | |
|                 .where(DataStdDict.onum == Id) | |
|             ) | |
|         ).scalars().first() | |
|         return col | |
| 
 | |
|     @classmethod | |
|     async def get_data_dict_by_info(cls, db: AsyncSession, query_object: DataStdDictModel): | |
|         List = ( | |
|             await db.execute( | |
|                 select(DataStdDict) | |
|                 .where( | |
|                     DataStdDict.data_dict_cn_name == query_object.data_dict_cn_name if query_object.data_dict_cn_name else True, | |
|                     DataStdDict.data_dict_no == query_object.data_dict_no if query_object.data_dict_no else True, | |
|                     DataStdDict.data_dict_stat == query_object.data_dict_stat if query_object.data_dict_stat else True, | |
|                     DataStdDict.data_std_no == query_object.data_std_no if query_object.data_std_no else True, | |
|                     DataStdDict.src_sys == query_object.src_sys if query_object.src_sys else True, | |
|                     DataStdDict.data_dict_data_type == query_object.data_dict_data_type if query_object.data_dict_data_type else True, | |
|                 ) | |
|             ) | |
|         ).scalars().first() | |
|         return List | |
| 
 | |
|     @classmethod | |
|     async def get_data_dict_by_id(cls, db: AsyncSession, id: str): | |
|         List = ( | |
|             await db.execute( | |
|                 select(DataStdDict) | |
|                 .where( | |
|                     DataStdDict.onum == id | |
|                 ) | |
|             ) | |
|         ).scalars().first() | |
|         return List | |
| 
 | |
|     @classmethod | |
|     async def get_data_dict_by_code(cls, db: AsyncSession, code: str): | |
|         List = ( | |
|             await db.execute( | |
|                 select(DataStdDict) | |
|                 .where( | |
|                     DataStdDict.data_dict_no == code | |
|                 ) | |
|             ) | |
|         ).scalars().first() | |
|         return List | |
| 
 | |
|     @classmethod | |
|     async def get_data_sec_lvl_by_dict_id(cls, db: AsyncSession, id: str): | |
|         List = ( | |
|             await db.execute( | |
|                 select(DataStdMain.data_sec_lvl).select_from(DataStdMain) | |
|                 .join( | |
|                     DataStdDict, | |
|                     and_( | |
|                         DataStdDict.data_std_no == DataStdMain.data_std_no, | |
|                         ), isouter=True | |
|                 ) | |
|                 .where( | |
|                     DataStdDict.onum == id | |
|                 ) | |
|             ) | |
|         ).scalars().first() | |
|         return List | |
| 
 | |
|     # 数据标准目录 | |
| 
 | |
|     @classmethod | |
|     async def get_catalog_by_id(cls, db: AsyncSession, content_onum: int): | |
|         """ | |
|         根据目录ID获取目录详细信息 | |
|  | |
|         :param db: orm对象 | |
|         :param content_onum: 目录ID | |
|         :return: 目录信息对象 | |
|         """ | |
|         catalog_info = ( | |
|             (await db.execute(select(DataAstContent).where(DataAstContent.content_onum == content_onum, | |
|                                                            DataAstContent.content_stat == 1))) | |
|             .scalars() | |
|             .first() | |
|         ) | |
| 
 | |
|         return catalog_info | |
| 
 | |
|     @classmethod | |
|     async def get_catalog_detail_by_info(cls, db: AsyncSession, catalog: DataCatalogPageQueryModel): | |
|         """ | |
|         根据目录参数获取目录信息 | |
|  | |
|         :param db: orm对象 | |
|         :param catalog: 目录参数对象 | |
|         :return: 目录信息对象 | |
|         """ | |
|         catalog_info = ( | |
|             ( | |
|                 await db.execute( | |
|                     select(DataAstContent).where( | |
|                         DataAstContent.content_name == catalog.content_name if catalog.content_name else True, | |
|                         DataAstContent.content_stat == catalog.content_stat if catalog.content_stat else True, | |
|                         DataAstContent.content_pic == catalog.content_pic if catalog.content_pic else True, | |
|                         DataAstContent.content_stat == 1, | |
|                     ) | |
|                 ) | |
|             ) | |
|             .scalars() | |
|             .first() | |
|         ) | |
| 
 | |
|         return catalog_info | |
| 
 | |
|     @classmethod | |
|     async def update_leaf_node_flag(cls, db: AsyncSession): | |
|         """ | |
|         更新leaf_node_flag字段 | |
|         """ | |
|         # 创建别名对象 | |
|         t2 = aliased(DataAstContent, name='t2')  # 正确使用aliased创建别名 | |
|         subquery = ( | |
|             select(DataAstContent.content_onum) | |
|             .where( | |
|                 DataAstContent.content_stat == '1', | |
|                 DataAstContent.leaf_node_flag == 0, | |
|                 not_( | |
|                     select(1) | |
|                     .select_from(t2)  # 使用别名后的表 | |
|                     .where( | |
|                         t2.supr_content_onum == DataAstContent.content_onum, | |
|                         t2.content_stat == '1' | |
|                     ) | |
|                     .exists()  # 添加exists()方法 | |
|                 ) | |
|             ) | |
|         ).alias('temp') | |
| 
 | |
|         stmt = ( | |
|             update(DataAstContent) | |
|             .where(DataAstContent.content_onum.in_(subquery)) | |
|             .values(leaf_node_flag=1, upd_time=datetime.now()) | |
|         ) | |
|         await db.execute(stmt) | |
| 
 | |
|     @classmethod | |
|     async def add_catalog_dao(cls, db: AsyncSession, catalog1: dict, catalog2: dict): | |
|         """ | |
|         新增目录数据库操作 | |
|  | |
|         :param db: orm对象 | |
|         :param catalog: 目录对象 | |
|         :return: | |
|         """ | |
|         db_catalog = DataAstContent(**catalog1) | |
|         db.add(db_catalog) | |
|         await db.flush() | |
| 
 | |
|         # 处理子关系(统一转换为 ORM 模型) | |
|         for child in catalog2.get('children', []): | |
|             # 如果是 Pydantic 模型实例,先转换为字典 | |
|             if isinstance(child, DataCatalogChild): | |
|                 child_dict = child.model_dump() | |
|             elif isinstance(child, dict): | |
|                 child_dict = child | |
|             else: | |
|                 raise TypeError("不支持的子关系数据类型") | |
| 
 | |
|             # 创建 ORM 模型实例 | |
|             processed_child = dict(child_dict) | |
|             processed_child['content_onum'] = db_catalog.content_onum | |
|             db_child = DataAstContentRela(**processed_child) | |
| 
 | |
|             db.add(db_child) | |
|             await db.flush() | |
| 
 | |
|         return db_catalog | |
| 
 | |
|     @classmethod | |
|     async def edit_catalog_leaf_dao(cls, db: AsyncSession, catalog: dict): | |
|         """ | |
|         编辑叶子节点目录数据库操作 | |
|  | |
|         :param db: orm对象 | |
|         :param catalog: 需要更新的目录字典 | |
|         :return: | |
|         """ | |
|         content_onum = catalog['content_onum'] | |
|         stmt = ( | |
|             update(DataAstContent) | |
|             .where(DataAstContent.content_onum == content_onum) | |
|             .values( | |
|                 leaf_node_flag=catalog['leaf_node_flag'] | |
|             ) | |
|         ) | |
| 
 | |
|         await db.execute(stmt) | |
| 
 | |
|     @classmethod | |
|     async def edit_catalog_child_dao(cls, db: AsyncSession, catalog: dict): | |
|         """ | |
|         编辑目录数据库操作 | |
|  | |
|         :param db: orm对象 | |
|         :param catalog: 需要更新的目录字典 | |
|         :return: | |
|         """ | |
|         content_onum = catalog['content_onum'] | |
| 
 | |
|         stmt = ( | |
|             update(DataAstContent) | |
|             .where(DataAstContent.content_onum == content_onum) | |
|             .values( | |
|                 content_name=catalog['content_name'], | |
|                 content_stat=catalog['content_stat'], | |
|                 content_intr=catalog['content_intr'], | |
|                 content_pic=catalog['content_pic'], | |
|                 supr_content_onum=catalog['supr_content_onum'], | |
|                 leaf_node_flag=catalog['leaf_node_flag'], | |
|                 upd_prsn=catalog['upd_prsn'], | |
|                 upd_time=datetime.now() | |
|             )) | |
| 
 | |
|         await db.execute(stmt) | |
| 
 | |
|         # 处理子关系 | |
|         for child in catalog.get('children', []): | |
|             rela_onum = child.get('rela_onum') | |
|             if rela_onum: | |
|                 st = ( | |
|                     update(DataAstContentRela) | |
|                     .where(DataAstContentRela.rela_onum == rela_onum) | |
|                     .values( | |
|                         content_onum=child.get('content_onum'), | |
|                         ast_onum=child.get('ast_onum'), | |
|                         rela_type=child.get('rela_type'), | |
|                         rela_eff_begn_date=child.get('rela_eff_begn_date'), | |
|                         rela_eff_end_date=child.get('rela_eff_end_date'), | |
|                         upd_prsn=child.get('upd_prsn')) | |
|                 ) | |
| 
 | |
|                 await db.execute(st) | |
|                 await cls.update_leaf_node_flag(db) | |
|             else: | |
|                 child['content_onum'] = content_onum | |
|                 db_child = DataAstContentRela(**child) | |
|                 db.add(db_child) | |
|                 await db.flush() | |
|                 await cls.update_leaf_node_flag(db) | |
| 
 | |
|     @classmethod | |
|     async def delete_catalog_dao(cls, db: AsyncSession, catalog: DeleteDataCatalogModel): | |
|         """ | |
|         删除目录数据库操作 | |
|  | |
|         :param db: orm对象 | |
|         :param catalog: 目录对象 | |
|         :content_stat=0 作废 | |
|         :return: | |
|         """ | |
|         content_onums = catalog.content_onums.split(',') | |
|         await db.execute( | |
|             update(DataAstContentRela) | |
|             .where(DataAstContentRela.content_onum.in_(content_onums)) | |
|             .values( | |
|                 rela_status=0, | |
|                 rela_eff_end_date=datetime.now() | |
|             ) | |
|         ) | |
|         await db.execute( | |
|             update(DataAstContent) | |
|             .where(DataAstContent.content_onum.in_(content_onums)) | |
|             .values( | |
|                 content_stat=0, | |
|                 upd_time=datetime.now() | |
|             ) | |
|         ) | |
| 
 | |
|         await cls.update_leaf_node_flag(db) | |
| 
 | |
|     @classmethod | |
|     async def moved_catalog_instr_dao(cls, db: AsyncSession, moved_catalog_data: dict): | |
|         """ | |
|         编辑目录数据库操作 | |
|  | |
|         :param db: orm对象 | |
|         :param catalog: 需要更新的目录字典 | |
|         :return: | |
|         """ | |
|         # content_onum = moved_catalog_data['content_onum'] | |
| 
 | |
|         stmt = ( | |
|             update(DataAstContent) | |
|             .where(DataAstContent.content_onum == moved_catalog_data['content_onum'], | |
|                    DataAstContent.supr_content_onum == moved_catalog_data['supr_content_onum']) | |
|             .values( | |
|                 supr_content_onum=moved_catalog_data['supr_content_onum_after'], | |
|                 upd_time=datetime.now() | |
|             )) | |
| 
 | |
|         await db.execute(stmt) | |
|         await cls.update_leaf_node_flag(db) | |
| 
 | |
|     @classmethod | |
|     async def merge_catalog_instr_dao(cls, db: AsyncSession, merge_catalog_data: dict): | |
|         """ | |
|         编辑目录数据库操作 | |
|  | |
|         :param db: orm对象 | |
|         :param catalog: 需要更新的目录字典 | |
|         :return: | |
|         """ | |
| 
 | |
|         # stmt = ( | |
|         #     update(DataAstContent) | |
|         #     .where(DataAstContent.content_onum == merge_catalog_data['content_onum'] , DataAstContent.supr_content_onum == merge_catalog_data['supr_content_onum']) | |
|         #     .values( | |
|         #         content_onum=merge_catalog_data['content_onum_after'], | |
|         #         supr_content_onum=merge_catalog_data['supr_content_onum_after'], | |
|         #         upd_time=datetime.now() | |
|         # ) ) | |
| 
 | |
|         # await db.execute(stmt) | |
|         stmt1 = ( | |
|             update(DataAstContentRela) | |
|             .where(DataAstContentRela.content_onum == merge_catalog_data[ | |
|                 'content_onum'] and DataAstContentRela.rela_status == 1) | |
|             .values( | |
|                 content_onum=merge_catalog_data['content_onum_after'], | |
|                 rela_eff_begn_date=datetime.now() | |
|             ) | |
|         ) | |
|         await db.execute(stmt1) | |
| 
 | |
|         stmt2 = ( | |
|             update(DataAstContent) | |
|             .where(DataAstContent.content_onum == merge_catalog_data['content_onum'], | |
|                    DataAstContent.supr_content_onum == merge_catalog_data['supr_content_onum']) | |
|             .values(content_stat='0') | |
|         ) | |
|         await db.execute(stmt2) | |
| 
 | |
|         await cls.update_leaf_node_flag(db) | |
| 
 | |
|     @classmethod | |
|     async def removerel_data_ast_catalog_dao(cls, db: AsyncSession, removerel_catalog_data: dict): | |
|         """ | |
|         编辑资产关系数据库操作  | |
|  | |
|         :param db: orm对象 | |
|         :param catalog: 需要更新的目录字典  | |
|         :return: | |
|         """ | |
| 
 | |
|         stmt = ( | |
|             update(DataAstContentRela) | |
|             .where(DataAstContentRela.rela_onum == removerel_catalog_data['rela_onum'], | |
|                    DataAstContentRela.content_onum == removerel_catalog_data['content_onum']) | |
|             .values( | |
|                 rela_status=removerel_catalog_data['rela_status'] | |
|             )) | |
| 
 | |
|         await db.execute(stmt) | |
|         await cls.update_leaf_node_flag(db) | |
| 
 | |
|     @classmethod | |
|     async def moverel_data_ast_catalog_dao(cls, db: AsyncSession, moverel_catalog_data: dict): | |
|         """ | |
|         编辑资产关系数据库操作  | |
|  | |
|         :param db: orm对象 | |
|         :param catalog: 需要更新的目录字典  | |
|         :return: | |
|         """ | |
| 
 | |
|         stmt = ( | |
|             update(DataAstContentRela) | |
|             .where(DataAstContentRela.rela_onum == moverel_catalog_data['rela_onum'], | |
|                    DataAstContentRela.content_onum == moverel_catalog_data['content_onum']) | |
|             .values( | |
|                 content_onum=moverel_catalog_data['content_onum_after'], | |
|                 rela_eff_end_date=datetime.now() | |
|             )) | |
| 
 | |
|         await db.execute(stmt) | |
|         await cls.update_leaf_node_flag(db) | |
| 
 | |
|     @classmethod | |
|     async def get_catalog_list(cls, db: AsyncSession, query_object: DataCatalogPageQueryModel, user_id: int, | |
|                                is_page: bool = False): | |
|         """ | |
|         根据查询参数获取数据资产目录列表 | |
|          | |
|         :param db: 异步会话对象 | |
|         :param query_object: 分页查询参数对象 | |
|         :param is_page: 是否分页 | |
|         :return: 数据资产目录分页列表 | |
|         """ | |
| 
 | |
|         # 修改子查询部分 | |
|         subquery_t1 = ( | |
|             select(DataAstContentRela) | |
|             .where( | |
|                    DataAstContentRela.content_onum == '2' and DataAstContentRela.rela_status == '1') | |
|             .union_all( | |
|                 select(DataAstContentRela) | |
|                 .where(DataAstContentRela.content_onum != '2' and DataAstContentRela.rela_status == '1') | |
|             ) | |
|         ).alias('subquery_t1')  # 为子查询分配唯一别名 | |
| 
 | |
|         query = ( | |
|             select( | |
|                 DataAstContent.content_onum, | |
|                 DataAstContent.content_name, | |
|                 DataAstContent.content_stat, | |
|                 DataAstContent.content_intr, | |
|                 DataAstContent.content_pic, | |
|                 DataAstContent.supr_content_onum, | |
|                 DataAstContent.leaf_node_flag, | |
|                 DataAstContent.upd_prsn, | |
|                 DataAstContent.upd_time, | |
| 
 | |
|                 subquery_t1.c.rela_onum,  # 明确指定子查询的字段 | |
|                 subquery_t1.c.ast_onum, | |
|                 subquery_t1.c.rela_type, | |
|                 subquery_t1.c.rela_eff_begn_date, | |
|                 subquery_t1.c.rela_eff_end_date, | |
|                 subquery_t1.c.upd_prsn, | |
|             ) | |
|             .distinct() | |
|             .select_from(DataAstContent) | |
|             .outerjoin(subquery_t1, DataAstContent.content_onum == subquery_t1.c.content_onum)  # 明确使用子查询别名 | |
|             .where(DataAstContent.content_stat == 1) | |
|             .order_by(DataAstContent.content_onum) | |
|         ) | |
| 
 | |
|         # 使用分页工具进行查询 | |
|         data_ast_list = await PageUtil.paginate( | |
|             db, | |
|             query, | |
|             page_num=1, | |
|             page_size=100000, | |
|             is_page=is_page | |
|         ) | |
| 
 | |
|         return data_ast_list | |
| 
 | |
|     # ------------------------------------------------------------数据标准(658行)------------------------------------------------------------------------------------------------- | |
|     @classmethod | |
|     async def get_std_main_list(cls, db: AsyncSession, query_object: DataStdMainModel, is_page: bool = False): | |
|      filters = [] | |
|   | |
|      if query_object.data_std_cn_name: | |
|          filters.append(DataStdMain.data_std_cn_name.like(f"%{query_object.data_std_cn_name}%")) | |
|      if query_object.data_std_eng_name: | |
|          filters.append( | |
|              or_( | |
|                  DataStdMain.data_std_cn_name.like(f"%{query_object.data_std_eng_name}%"), | |
|                  DataStdMain.data_std_eng_name.like(f"%{query_object.data_std_eng_name}%") | |
|              ) | |
|          ) | |
|      if query_object.data_std_busi_defn: | |
|          filters.append(DataStdMain.data_std_busi_defn.like(f"%{query_object.data_std_busi_defn}%")) | |
|      if query_object.data_std_no: | |
|          filters.append(DataStdMain.data_std_no.like(f"%{query_object.data_std_no}%")) | |
|      if query_object.src_sys: | |
|          filters.append(DataStdMain.src_sys == query_object.src_sys) | |
|      if query_object.data_std_type: | |
|          filters.append(DataStdMain.data_std_type == query_object.data_std_type) | |
|      if query_object.std_status: | |
|          filters.append(DataStdMain.std_status == query_object.std_status) | |
|   | |
|      # === 新增:目录过滤 === | |
|      if query_object.belt_data_std_content: | |
|          # 定义递归CTE | |
|          content_alias = aliased(DataAstContent)  # 假设SQLAlchemy模型叫 DataStdContent | |
|          cte = ( | |
|              select(DataAstContent.content_onum) | |
|              .where(DataAstContent.content_onum == query_object.belt_data_std_content) | |
|              .cte(name="content_cte", recursive=True) | |
|          ) | |
|   | |
|          cte = cte.union_all( | |
|              select(content_alias.content_onum) | |
|              .where(content_alias.supr_content_onum == cte.c.content_onum) | |
|          )  | |
|         # 只取叶子节点 | |
|          filters.append( | |
|              DataStdMain.belt_data_std_content.in_( | |
|                  select(DataAstContent.content_onum) | |
|                  .where(DataAstContent.content_onum == cte.c.content_onum) | |
|              ) | |
|          )          | |
|      # === 查询 === | |
|      query = ( | |
|          select( | |
|              DataStdMain.onum, | |
|              DataStdMain.create_by, | |
|              DataStdMain.create_time, | |
|              DataStdMain.upd_prsn, | |
|              DataStdMain.upd_time, | |
|              DataStdMain.data_std_no, | |
|              DataStdMain.data_std_eng_name, | |
|              DataStdMain.data_std_cn_name, | |
|              DataStdMain.data_std_type, | |
|              DataStdMain.data_sec_lvl, | |
|              DataStdMain.src_sys, | |
|              DataStdMain.data_std_vest, | |
|              DataStdMain.data_std_busi_defn, | |
|              DataStdMain.cd_id, | |
|              DataStdMain.std_status, | |
|              DataStdMain.data_std_busi_ownership_dept, | |
|              DataStdMain.data_std_it_ownership_dept, | |
|              DataStdMain.data_std_busi_ownership_prsn, | |
|              DataStdMain.data_std_it_ownership_prsn, | |
|              DataStdMain.belt_data_std_content, | |
|              DataStdMain.data_std_src, | |
|              DataStdMain.data_clas, | |
|              DataStdMain.typical_fld, | |
|              DataStdCode.cd_no.label("cd_no") | |
|          ) | |
|          .outerjoin( | |
|              DataStdCode, | |
|              DataStdMain.cd_id == DataStdCode.onum | |
|          ) | |
|          .where(*filters) | |
|          .order_by(desc(DataStdMain.create_time)) | |
|      ) | |
|   | |
|      return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page) | |
|   | |
|     @classmethod | |
|     async def get_std_main_list_all(cls, db: AsyncSession, query_object: DataStdMainModel): | |
|         filters = [] | |
| 
 | |
|         if query_object.data_std_cn_name: | |
|             filters.append(DataStdMain.data_std_cn_name.like(f"%{query_object.data_std_cn_name}%")) | |
|         if query_object.data_std_no: | |
|             filters.append(DataStdMain.data_std_no.like(f"%{query_object.data_std_no}%")) | |
|         if query_object.src_sys: | |
|             filters.append(DataStdMain.src_sys == query_object.src_sys) | |
|         if query_object.company_level_data_std_no: | |
|             filters.append(DataStdMain.company_level_data_std_no == query_object.company_level_data_std_no) | |
|         if query_object.data_std_vest: | |
|             filters.append(DataStdMain.data_std_vest == query_object.data_std_vest) | |
|         if query_object.cd_id: | |
|             filters.append(DataStdMain.cd_id == query_object.cd_id) | |
|         if query_object.data_std_type: | |
|             filters.append(DataStdMain.data_std_type == query_object.data_std_type) | |
|         if query_object.std_status: | |
|             filters.append(DataStdMain.std_status == query_object.std_status) | |
|         query = ( | |
|             select( | |
|                 DataStdMain.onum, | |
|                 DataStdMain.create_by, | |
|                 DataStdMain.create_time, | |
|                 DataStdMain.upd_prsn, | |
|                 DataStdMain.upd_time, | |
|                 DataStdMain.data_std_no, | |
|                 DataStdMain.data_std_eng_name, | |
|                 DataStdMain.data_std_cn_name, | |
|                 DataStdMain.data_std_type, | |
|                 DataStdMain.data_sec_lvl, | |
|                 DataStdMain.src_sys, | |
|                 DataStdMain.data_std_vest, | |
|                 DataStdMain.data_std_busi_defn, | |
|                 DataStdMain.cd_id, | |
|                 DataStdMain.std_status, | |
|                 DataStdMain.data_std_busi_ownership_dept, | |
|                 DataStdMain.data_std_it_ownership_dept, | |
|                 DataStdMain.data_std_busi_ownership_prsn, | |
|                 DataStdMain.data_std_it_ownership_prsn, | |
|                 DataStdMain.belt_data_std_content, | |
|                 DataStdMain.data_std_src, | |
|                 DataStdMain.data_clas, | |
|                 DataStdMain.typical_fld, | |
|                 DataStdCode.cd_no.label("cd_no")  # 关联查询的cd_no | |
|             ) | |
|             .outerjoin(  # 使用left outer join确保即使没有关联code也能返回主表数据 | |
|                 DataStdCode, | |
|                 DataStdMain.cd_id == DataStdCode.onum | |
|             ) | |
|             .where(*filters) | |
|             .order_by(desc(DataStdMain.create_time)) | |
|         ) | |
|         return await PageUtil.paginate(db, query, 0, 0, False) | |
| 
 | |
|     @classmethod | |
|     async def add_std_main(cls, db: AsyncSession, model: DataStdMainModel): | |
|         col = DataStdMain(**model.model_dump()) | |
|         db.add(col) | |
|         await db.flush() | |
|         return col | |
| 
 | |
|     @classmethod | |
|     async def add_std_main_appr(cls, db: AsyncSession, model: DataStdMainApprModel): | |
|         col = DataStdMainAppr(**model.model_dump()) | |
|         db.add(col) | |
|         await db.flush() | |
|         return col | |
| 
 | |
|     @classmethod | |
|     async def add_std_dict_appr(cls, db: AsyncSession, model: DataStdDictApprModel): | |
|         col = DataStdDictAppr(**model.model_dump()) | |
|         db.add(col) | |
|         await db.flush() | |
|         return col | |
| 
 | |
|     @classmethod | |
|     async def delete_std_main(cls, db: AsyncSession, Id: str): | |
|         await db.execute(delete(DataStdMain).where(DataStdMain.onum == Id)) | |
| 
 | |
|     @classmethod | |
|     async def delete_std_dict_appr(cls, db: AsyncSession, Id: str): | |
|         await db.execute(delete(DataStdDictAppr).where(DataStdDictAppr.onum == Id)) | |
| 
 | |
|     @classmethod | |
|     async def update_std_main(cls, db: AsyncSession, update_data: DataStdMainModel): | |
|         update_dict = update_data.model_dump(exclude_unset=True) | |
|         await db.execute(update(DataStdMain), [update_dict]) | |
|         await db.flush() | |
| 
 | |
|     @classmethod | |
|     async def update_std_dict_appr(cls, db: AsyncSession, update_data: DataStdDictApprModel): | |
|         update_dict = update_data.model_dump(exclude_unset=True) | |
|         await db.execute(update(DataStdDictAppr), [update_dict]) | |
|         await db.flush() | |
| 
 | |
|     @classmethod | |
|     async def get_std_main_by_id(cls, db: AsyncSession, Id: str): | |
|         col = await db.execute(select(DataStdMain).where(DataStdMain.onum == Id)) | |
|         return col.scalars().first() | |
| 
 | |
|     @classmethod | |
|     async def get_data_main_by_info(cls, db: AsyncSession, query_object: DataStdMainModel): | |
|         col = await db.execute( | |
|             select(DataStdMain).where( | |
|                 DataStdMain.data_std_cn_name == query_object.data_std_cn_name if query_object.data_std_cn_name else True, | |
|                 DataStdMain.data_std_no == query_object.data_std_no if query_object.data_std_no else True, | |
|                 DataStdMain.data_std_no == query_object.data_std_no if query_object.data_std_no else True, | |
|                 DataStdMain.src_sys == query_object.src_sys if query_object.src_sys else True, | |
|                 DataStdMain.data_std_type == query_object.data_std_type if query_object.data_std_type else True, | |
|                 DataStdMain.cd_id == query_object.cd_id if query_object.cd_id else True, | |
|                 DataStdMain.std_status == query_object.std_status if query_object.std_status else True | |
|             ) | |
|         ) | |
|         return col.scalars().first() | |
| 
 | |
|     @classmethod | |
|     async def get_std_main_appr_list_all(cls, db: AsyncSession, query_object: DataStdMainApprModel): | |
|         filters = [] | |
| 
 | |
|         if query_object.data_std_cn_name: | |
|             filters.append(DataStdMainAppr.data_std_cn_name.like(f"%{query_object.data_std_cn_name}%")) | |
|         if query_object.data_std_no: | |
|             filters.append(DataStdMainAppr.data_std_no.like(f"%{query_object.data_std_no}%")) | |
|         if query_object.src_sys: | |
|             filters.append(DataStdMainAppr.src_sys == query_object.src_sys) | |
|         if query_object.cd_id: | |
|             filters.append(DataStdMainAppr.cd_id == query_object.cd_id) | |
|         if query_object.flowId: | |
|             filters.append(DataStdMainAppr.flowId == query_object.flowId) | |
|         if query_object.data_std_type: | |
|             filters.append(DataStdMainAppr.data_std_type == query_object.data_std_type) | |
|         if query_object.std_status: | |
|             filters.append(DataStdMainAppr.std_status == query_object.std_status) | |
|         query = select(DataStdMainAppr).where(*filters).order_by(desc(DataStdMainAppr.create_time)) | |
|         return await PageUtil.paginate(db, query, 0, 0, False) | |
| 
 | |
|     @classmethod | |
|     async def get_std_main_appr_list(cls, flowId: str, db: AsyncSession): | |
|         filters = [] | |
|         filters.append(DataStdMainAppr.flowId == flowId) | |
|         query = select(DataStdMainAppr).where(*filters).order_by(desc(DataStdMainAppr.create_time)) | |
|         return await PageUtil.paginate(db, query, 0, 0, False) | |
| 
 | |
|     @classmethod | |
|     async def get_std_dict_appr_list(cls, flowId: str, db: AsyncSession): | |
|         filters = [] | |
|         filters.append(DataStdDictAppr.flowId == flowId) | |
|         query = select(DataStdDictAppr).where(*filters).order_by(desc(DataStdDictAppr.create_time)) | |
|         return await PageUtil.paginate(db, query, 0, 0, False) | |
| 
 | |
|     @classmethod | |
|     async def get_std_main_appr_by_id(cls, db: AsyncSession, Id: str): | |
|         col = await db.execute(select(DataStdMainAppr).where(DataStdMainAppr.onum == Id)) | |
|         return col.scalars().first() | |
| 
 | |
|     @classmethod | |
|     async def get_std_dict_appr_by_id(cls, db: AsyncSession, Id: str): | |
|         col = await db.execute(select(DataStdDictAppr).where(DataStdDictAppr.onum == Id)) | |
|         return col.scalars().first() | |
| 
 | |
|     @classmethod | |
|     async def get_last_std_main_appr_by_id(cls, db: AsyncSession, Id: str): | |
|         result = await db.execute( | |
|             select(DataStdMainAppr) | |
|             .where( | |
|                 DataStdMainAppr.oldInstId == Id, | |
|                 DataStdMainAppr.approStatus == "succeed" | |
|             ) | |
|             .order_by(DataStdMainAppr.upd_time.desc()) | |
|             .limit(1) | |
|         ) | |
|         return result.scalar_one_or_none() | |
| 
 | |
|     @classmethod | |
|     async def get_last_std_dict_appr_by_id(cls, db: AsyncSession, Id: str): | |
|         result = await db.execute( | |
|             select(DataStdDictAppr) | |
|             .where( | |
|                 DataStdDictAppr.oldInstId == Id, | |
|                 DataStdDictAppr.approStatus == "succeed" | |
|             ) | |
|             .order_by(DataStdDictAppr.upd_time.desc()) | |
|             .limit(1) | |
|         ) | |
|         return result.scalar_one_or_none() | |
| 
 | |
|     @classmethod | |
|     async def update_std_main_appr(cls, db: AsyncSession, update_data: DataStdMainApprModel): | |
|         update_dict = update_data.model_dump(exclude_unset=True) | |
|         await db.execute(update(DataStdMainAppr), [update_dict]) | |
|         await db.flush() | |
| 
 | |
|     @classmethod | |
|     async def check_std_main_waiting(cls, oldInstId: str, db: AsyncSession): | |
|         """ | |
|         检查 std_main_appr 的第一条记录是否存在待审批状态(pending 或 waiting) | |
|         """ | |
|         # 1. 查询 DataStdMainAppr 中的所有记录(按时间倒序) | |
|         query = ( | |
|             select(DataStdMainAppr) | |
|             .where(DataStdMainAppr.oldInstId == oldInstId) | |
|             .order_by(desc(DataStdMainAppr.create_time)) | |
|         ) | |
|         result = await db.execute(query) | |
|         first_record = result.scalars().first() | |
| 
 | |
|         if not first_record: | |
|             return None  # 或 raise 自定义异常,例如“未找到审批记录” | |
| 
 | |
|         # 2. 获取 instid,去 FlowApproval 中查询是否存在 pending 或 waiting 状态 | |
|         instid = first_record.flowId | |
| 
 | |
|         approval_query = ( | |
|             select(FlowApproval) | |
|             .where( | |
|                 FlowApproval.businessId == instid, | |
|                 FlowApproval.status.in_(["pending", "waiting"]) | |
|             ) | |
|         ) | |
|         approval_result = await db.execute(approval_query) | |
|         approval = approval_result.scalars().first() | |
| 
 | |
|         # 返回 FlowApproval 对象或布尔值 | |
|         return approval  # 或 return approval is not None | |
| 
 | |
|     @classmethod | |
|     async def check_std_dict_waiting(cls, oldInstId: str, db: AsyncSession): | |
|         """ | |
|         检查 std_main_appr 的第一条记录是否存在待审批状态(pending 或 waiting) | |
|         """ | |
|         # 1. 查询 DataStdDictAppr 中的所有记录(按时间倒序) | |
|         query = ( | |
|             select(DataStdDictAppr) | |
|             .where(DataStdDictAppr.oldInstId == oldInstId) | |
|             .order_by(desc(DataStdDictAppr.create_time)) | |
|         ) | |
|         result = await db.execute(query) | |
|         first_record = result.scalars().first() | |
| 
 | |
|         if not first_record: | |
|             return None  # 或 raise 自定义异常,例如“未找到审批记录” | |
| 
 | |
|         # 2. 获取 instid,去 FlowApproval 中查询是否存在 pending 或 waiting 状态 | |
|         instid = first_record.flowId | |
| 
 | |
|         approval_query = ( | |
|             select(FlowApproval) | |
|             .where( | |
|                 FlowApproval.businessId == instid, | |
|                 FlowApproval.status.in_(["pending", "waiting"]) | |
|             ) | |
|         ) | |
|         approval_result = await db.execute(approval_query) | |
|         approval = approval_result.scalars().first() | |
| 
 | |
|         # 返回 FlowApproval 对象或布尔值 | |
|         return approval  # 或 return approval is not None
 | |
| 
 |