from datetime import datetime

from sqlalchemy import delete, select, update, desc, or_, not_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import aliased

from module_admin.entity.do.datastd_do import (
    DataStdCode,
    DataStdDict,
    DataAstContent,
    DataAstContentRela,
    DataStdMain,
    DataStdMainAppr,
    DataStdDictAppr,
    DataStdCodeAppr,
)
from module_admin.entity.do.dept_do import SysDept
from module_admin.entity.vo.data_ast_content_vo import (
    DataCatalogPageQueryModel,
    DeleteDataCatalogModel,
    DataCatalogChild,
)
from module_admin.entity.vo.datastd_vo import (
    DataStdCodeModel,
    DataStdDictModel,
    DataStdMainModel,
    DataStdMainApprModel,
    DataStdDictApprModel,
    DataStdCodeApprModel,
)
from utils.page_util import PageUtil


class DataStdDao:
    """Database access layer for standard codes, data dictionaries,
    standard catalogs and data standards (including their approval copies).

    All methods are classmethods that receive the ``AsyncSession`` to work on;
    none of them commits, so transaction control stays with the caller.
    """

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _eq_filters(entity, query_object, field_names):
        """Return ``entity.<name> == value`` clauses for every truthy field.

        Falsy values (``None``, ``''``, ``0``) are skipped, i.e. they do not
        restrict the query.
        """
        clauses = []
        for field_name in field_names:
            value = getattr(query_object, field_name)
            if value:
                clauses.append(getattr(entity, field_name) == value)
        return clauses

    @staticmethod
    def _update_params(update_data):
        """Normalise an update payload to the dict SQLAlchemy's bulk UPDATE needs.

        Dicts are passed through unchanged; Pydantic models are dumped with
        ``exclude_unset=True`` so only explicitly provided fields are written.
        """
        if isinstance(update_data, dict):
            return update_data
        return update_data.model_dump(exclude_unset=True)

    @classmethod
    def _code_list_filters(cls, entity, query_object):
        """Build the shared filter list for the code / code-approval list queries.

        :param entity: ORM class or alias exposing the standard-code columns
        :param query_object: query parameters
        :return: list of SQLAlchemy criteria
        """
        filters = []
        if query_object.cd_val_cn_mean:
            filters.append(entity.cd_val_cn_mean.like(f"%{query_object.cd_val_cn_mean}%"))
        if query_object.cd_no:
            # cd_no is used as a keyword: it matches the code number OR the name.
            filters.append(
                or_(
                    entity.cd_val_cn_mean.like(f"%{query_object.cd_no}%"),
                    entity.cd_no.like(f"%{query_object.cd_no}%"),
                )
            )
        filters.extend(
            cls._eq_filters(entity, query_object, ("cd_val_stat", "src_sys", "cd_type", "class_id"))
        )
        if query_object.class_id == 'codeItem' and not query_object.parent_id:
            # Code items are only listed under a parent; without one, force an
            # empty result (a literal False is coerced to SQL false()).
            filters.append(False)
        filters.extend(cls._eq_filters(entity, query_object, ("parent_id",)))
        return filters

    # ------------------------------------------------------------------
    # Standard codes
    # ------------------------------------------------------------------
    @classmethod
    async def get_std_code_appr_list(cls, flowId:str,db: AsyncSession):
        """List approval rows of class ``"code"`` for a flow, newest first (unpaged)."""
        query = (
            select(DataStdCodeAppr)
            .where(DataStdCodeAppr.flowId == flowId, DataStdCodeAppr.class_id == "code")
            .order_by(desc(DataStdCodeAppr.create_time))
        )
        return await PageUtil.paginate(db, query, 0, 0, False)

    @classmethod
    async def get_std_code_appr_list_Flow(cls, flowId:str,db: AsyncSession):
        """List every code approval row of a flow (any class), newest first (unpaged)."""
        query = (
            select(DataStdCodeAppr)
            .where(DataStdCodeAppr.flowId == flowId)
            .order_by(desc(DataStdCodeAppr.create_time))
        )
        return await PageUtil.paginate(db, query, 0, 0, False)

    @classmethod
    async def get_std_code_list(cls, db: AsyncSession, query_object: DataStdCodeModel, is_page: bool = False):
        """
        Query standard codes together with ``code_map_cn``, the name of the code
        referenced by ``code_map_id`` (self outer join on the same table).

        :param db: ORM session
        :param query_object: query parameters
        :param is_page: whether to paginate
        :return: result list (paged or not, as produced by ``PageUtil.paginate``)
        """
        current = aliased(DataStdCode)  # the row being listed
        mapped = aliased(DataStdCode)  # the row referenced through code_map_id

        query = (
            select(
                current.onum,
                current.cd_no,
                current.cd_val_cn_mean,
                current.cd_type,
                current.cd_val_stat,
                current.src_sys,
                current.class_id,
                current.parent_id,
                current.code_map_id,
                mapped.cd_val_cn_mean.label("code_map_cn"),
            )
            .select_from(current)
            .outerjoin(mapped, current.code_map_id == mapped.onum)
            .where(*cls._code_list_filters(current, query_object))
            .order_by(desc(current.create_time))
        )
        return await PageUtil.paginate(
            db, query, query_object.page_num, query_object.page_size, is_page
        )

    @classmethod
    async def get_appr_std_code_list(cls, db: AsyncSession, query_object: DataStdCodeApprModel, is_page: bool = False):
        """
        List code approval rows with fuzzy matching and optional pagination.

        :param db: ORM session
        :param query_object: query parameters
        :param is_page: whether to paginate
        :return: result list
        """
        query = (
            select(DataStdCodeAppr)
            .where(*cls._code_list_filters(DataStdCodeAppr, query_object))
            .order_by(desc(DataStdCodeAppr.create_time))
        )
        return await PageUtil.paginate(
            db, query, query_object.page_num, query_object.page_size, is_page
        )

    @classmethod
    async def get_last_std_code_appr_by_id(cls,db: AsyncSession, Id: str ):
        """Return the most recently updated ``"succeed"`` approval row for the
        original code ``Id`` (matched on ``oldInstId``), or ``None``."""
        result = await db.execute(
            select(DataStdCodeAppr)
            .where(
                DataStdCodeAppr.oldInstId == Id,
                DataStdCodeAppr.approStatus == "succeed",
            )
            .order_by(DataStdCodeAppr.upd_time.desc())
            .limit(1)
        )
        return result.scalar_one_or_none()

    @classmethod
    async def get_std_code_map_list(cls, db: AsyncSession, query_object: DataStdCodeModel, is_page: bool = False):
        """
        List code items together with their parent code, the code item they map
        to (``code_map_id``) and that mapped item's parent.

        :param db: ORM session
        :param query_object: query parameters; ``parent_id`` filters on the
            item's own parent when ``cd_type == 'sys'``, otherwise on the
            mapped item's parent
        :param is_page: whether to paginate
        :return: result list
        """
        item = aliased(DataStdCode)  # the code item itself
        item_parent = aliased(DataStdCode)  # parent code of the item
        mapped_item = aliased(DataStdCode)  # item referenced via code_map_id
        mapped_parent = aliased(DataStdCode)  # parent code of the mapped item

        filters = []
        if query_object.parent_id:
            if query_object.cd_type == 'sys':
                filters.append(item.parent_id == query_object.parent_id)
            else:
                filters.append(mapped_item.parent_id == query_object.parent_id)
        filters.append(item.class_id == 'codeItem')

        query = (
            select(
                # NOTE(review): both target_sys_* labels read src_sys — confirm intended.
                item_parent.src_sys.label("target_sys_id"),
                item_parent.src_sys.label("target_sys_name"),
                item_parent.cd_no.label("target_code_num"),
                item_parent.cd_val_cn_mean.label("target_code_name"),
                item.cd_no.label("target_code_num_item"),
                item.cd_val_cn_mean.label("target_code_name_item"),
                mapped_parent.cd_no.label("resource_code_num"),
                mapped_parent.cd_val_cn_mean.label("resource_code_name"),
                mapped_item.cd_no.label("resource_code_num_item"),
                mapped_item.cd_val_cn_mean.label("resource_code_name_item"),
            )
            .select_from(item)
            .join(item_parent, item.parent_id == item_parent.onum, isouter=True)
            .join(mapped_item, item.code_map_id == mapped_item.onum, isouter=True)
            .join(mapped_parent, mapped_item.parent_id == mapped_parent.onum, isouter=True)
            .where(*filters)
            .order_by(desc(item.create_time))
        )
        return await PageUtil.paginate(
            db, query, query_object.page_num, query_object.page_size, is_page
        )

    @classmethod
    async def get_data_code_list_by_info(cls, db: AsyncSession, query_object: DataStdCodeModel):
        """Return all codes whose columns equal the truthy fields of ``query_object``."""
        filters = cls._eq_filters(
            DataStdCode,
            query_object,
            (
                "cd_val_cn_mean",
                "onum",
                "cd_val_stat",
                "src_sys",
                "cd_type",
                "code_map_id",
                "parent_id",
                "class_id",
            ),
        )
        result = await db.execute(select(DataStdCode).where(*filters))
        return result.scalars().all()

    @classmethod
    async def get_data_code_by_info(cls, db: AsyncSession, query_object: DataStdCodeModel):
        """Return the first code whose columns equal the truthy fields of
        ``query_object``, or ``None``."""
        filters = cls._eq_filters(
            DataStdCode,
            query_object,
            (
                "cd_val_cn_mean",
                "cd_no",
                "cd_val_stat",
                "src_sys",
                "cd_type",
                "parent_id",
                "class_id",
            ),
        )
        result = await db.execute(select(DataStdCode).where(*filters))
        return result.scalars().first()

    @classmethod
    async def get_std_code_by_id(cls, db: AsyncSession, Id: str):
        """Return the code with ``onum == Id``, or ``None``."""
        result = await db.execute(select(DataStdCode).where(DataStdCode.onum == Id))
        return result.scalars().first()

    @classmethod
    async def get_std_code_appr_by_id(cls, db: AsyncSession, Id: str):
        """Return the code approval row with ``onum == Id``, or ``None``."""
        result = await db.execute(select(DataStdCodeAppr).where(DataStdCodeAppr.onum == Id))
        return result.scalars().first()

    @classmethod
    async def add_std_code(cls, db: AsyncSession,model:DataStdCodeModel):
        """Insert a code built from ``model`` and flush; returns the ORM instance."""
        entity = DataStdCode(**model.model_dump())
        db.add(entity)
        await db.flush()
        return entity

    @classmethod
    async def add_std_code_appr(cls, db: AsyncSession,model:DataStdCodeApprModel):
        """Insert a code approval row built from ``model`` and flush; returns the ORM instance."""
        entity = DataStdCodeAppr(**model.model_dump())
        db.add(entity)
        await db.flush()
        return entity

    @classmethod
    async def delete_std_code(cls, db: AsyncSession, Id: str):
        """Delete the code with ``onum == Id``."""
        await db.execute(delete(DataStdCode).where(DataStdCode.onum == Id))

    @classmethod
    async def update_std_code_appr(cls, db: AsyncSession, update_data: DataStdCodeApprModel):
        """Bulk-update a code approval row by primary key.

        SQLAlchemy requires the parameter list to hold dicts, so a model is
        dumped first; the payload presumably carries the primary key — verify
        against callers.
        """
        await db.execute(update(DataStdCodeAppr), [cls._update_params(update_data)])
        await db.flush()

    @classmethod
    async def update_std_code(cls, db: AsyncSession, update_data: DataStdCodeModel):
        """Bulk-update a code by primary key (model or dict payload accepted)."""
        await db.execute(update(DataStdCode), [cls._update_params(update_data)])
        await db.flush()

    @classmethod
    async def check_code_num_exists(cls, query_db: AsyncSession, cd_no: str) -> bool:
        """
        Check whether any standard code or code item already uses ``cd_no``.

        Codes and code items live in the same ``DataStdCode`` table, so a single
        existence query covers both. The query selects from the ORM entity (not
        the Pydantic model) and tolerates duplicate rows.
        """
        result = await query_db.execute(
            select(DataStdCode.onum).where(DataStdCode.cd_no == cd_no).limit(1)
        )
        return result.first() is not None

    # ------------------------------------------------------------------
    # Data dictionary
    # ------------------------------------------------------------------
    @classmethod
    async def get_std_dict_list(cls, db: AsyncSession, query_object: DataStdDictModel, is_page: bool = False):
        """
        List data-dictionary entries with fuzzy name matching and optional pagination.

        :param db: ORM session
        :param query_object: query parameters
        :param is_page: whether to paginate
        :return: result list
        """
        filters = []
        if query_object.data_dict_cn_name:
            filters.append(DataStdDict.data_dict_cn_name.like(f"%{query_object.data_dict_cn_name}%"))
        if query_object.data_dict_eng_name:
            filters.append(DataStdDict.data_dict_eng_name.like(f"%{query_object.data_dict_eng_name}%"))
        filters.extend(
            cls._eq_filters(
                DataStdDict,
                query_object,
                ("data_dict_vest", "src_sys", "data_dict_data_type", "data_dict_stat"),
            )
        )

        query = (
            select(
                DataStdDict.onum,
                DataStdDict.create_by,
                DataStdDict.create_time,
                DataStdDict.upd_prsn,
                DataStdDict.upd_time,
                DataStdDict.data_dict_no,
                DataStdDict.data_dict_eng_name,
                DataStdDict.data_dict_cn_name,
                DataStdDict.data_dict_vest,
                DataStdDict.data_dict_data_type,
                DataStdDict.src_sys,
                DataStdDict.data_dict_busi_mean,
                DataStdDict.data_std_no,
                DataStdDict.data_dict_stat,
            )
            .select_from(DataStdDict)
            .where(*filters)
            .order_by(desc(DataStdDict.create_time))
        )
        return await PageUtil.paginate(
            db, query, query_object.page_num, query_object.page_size, is_page
        )

    @classmethod
    async def add_std_dict(cls, db: AsyncSession,model:DataStdDictModel):
        """Insert a dictionary entry built from ``model`` and flush; returns the ORM instance."""
        entity = DataStdDict(**model.model_dump())
        db.add(entity)
        await db.flush()
        return entity

    @classmethod
    async def delete_std_dict(cls, db: AsyncSession, Id: str):
        """Delete the dictionary entry with ``onum == Id``."""
        await db.execute(delete(DataStdDict).where(DataStdDict.onum == Id))

    @classmethod
    async def get_std_dict_list_all(cls, db: AsyncSession, query_object: DataStdDict):
        """List all dictionary entries, optionally fuzzy-filtered by
        ``data_std_no``, newest first (unpaged)."""
        filters = []
        if query_object.data_std_no:
            filters.append(DataStdDict.data_std_no.like(f"%{query_object.data_std_no}%"))
        query = select(DataStdDict).where(*filters).order_by(desc(DataStdDict.create_time))
        return await PageUtil.paginate(db, query, 0, 0, False)

    @classmethod
    async def update_std_dict(cls, db: AsyncSession, update_data: DataStdDictModel):
        """Bulk-update a dictionary entry by primary key (model or dict payload accepted)."""
        await db.execute(update(DataStdDict), [cls._update_params(update_data)])
        await db.flush()

    @classmethod
    async def get_std_dict_by_id(cls, db: AsyncSession, Id: str):
        """Return the dictionary entry with ``onum == Id``, or ``None``."""
        result = await db.execute(select(DataStdDict).where(DataStdDict.onum == Id))
        return result.scalars().first()

    @classmethod
    async def get_data_dict_by_info(cls, db: AsyncSession, query_object: DataStdDictModel):
        """Return the first dictionary entry whose columns equal the truthy
        fields of ``query_object``, or ``None``."""
        filters = cls._eq_filters(
            DataStdDict,
            query_object,
            (
                "data_dict_cn_name",
                "data_dict_no",
                "data_dict_stat",
                "data_std_no",
                "src_sys",
                "data_dict_data_type",
            ),
        )
        result = await db.execute(select(DataStdDict).where(*filters))
        return result.scalars().first()

    # ------------------------------------------------------------------
    # Data-standard catalog
    # ------------------------------------------------------------------
    @classmethod
    async def get_catalog_by_id(cls, db: AsyncSession, content_onum: int):
        """
        Fetch an active catalog (``content_stat == 1``) by id.

        :param db: ORM session
        :param content_onum: catalog id
        :return: catalog entity or ``None``
        """
        result = await db.execute(
            select(DataAstContent).where(
                DataAstContent.content_onum == content_onum,
                DataAstContent.content_stat == 1,
            )
        )
        return result.scalars().first()

    @classmethod
    async def get_catalog_detail_by_info(cls, db: AsyncSession, catalog: DataCatalogPageQueryModel):
        """
        Fetch the first active catalog matching the truthy fields of ``catalog``.

        :param db: ORM session
        :param catalog: catalog query parameters
        :return: catalog entity or ``None``
        """
        filters = cls._eq_filters(
            DataAstContent, catalog, ("content_name", "content_stat", "content_pic")
        )
        # Only active catalogs are ever returned, whatever the caller asked for.
        filters.append(DataAstContent.content_stat == 1)
        result = await db.execute(select(DataAstContent).where(*filters))
        return result.scalars().first()

    @classmethod
    async def update_leaf_node_flag(cls, db: AsyncSession):
        """
        Set ``leaf_node_flag = 1`` on every active catalog that is currently
        flagged 0 but has no active child catalog.
        """
        child = aliased(DataAstContent, name='t2')
        has_active_child = (
            select(1)
            .select_from(child)
            .where(
                child.supr_content_onum == DataAstContent.content_onum,
                child.content_stat == '1',
            )
            .exists()
        )
        # The candidate ids are wrapped in a named derived table before being
        # used by the UPDATE on the same table.
        candidates = (
            select(DataAstContent.content_onum)
            .where(
                DataAstContent.content_stat == '1',
                DataAstContent.leaf_node_flag == 0,
                not_(has_active_child),
            )
        ).alias('temp')
        stmt = (
            update(DataAstContent)
            .where(DataAstContent.content_onum.in_(select(candidates.c.content_onum)))
            .values(leaf_node_flag=1, upd_time=datetime.now())
        )
        await db.execute(stmt)

    @classmethod
    async def add_catalog_dao(cls, db: AsyncSession, catalog1: dict, catalog2: dict):
        """
        Insert a catalog and its child relations.

        :param db: ORM session
        :param catalog1: column values for the new catalog row
        :param catalog2: mapping whose optional ``children`` list holds relation
            data (dicts or ``DataCatalogChild`` models)
        :return: the newly created catalog entity
        :raises TypeError: if a child is neither a dict nor a ``DataCatalogChild``
        """
        db_catalog = DataAstContent(**catalog1)
        db.add(db_catalog)
        # Flush first so the generated content_onum is available for the children.
        await db.flush()

        for child in catalog2.get('children', []):
            if isinstance(child, DataCatalogChild):
                child_values = child.model_dump()
            elif isinstance(child, dict):
                child_values = dict(child)  # copy: never mutate the caller's data
            else:
                raise TypeError("不支持的子关系数据类型")
            child_values['content_onum'] = db_catalog.content_onum
            db.add(DataAstContentRela(**child_values))

        await db.flush()
        return db_catalog

    @classmethod
    async def edit_catalog_leaf_dao(cls, db: AsyncSession, catalog: dict):
        """
        Update only the ``leaf_node_flag`` of a catalog.

        :param db: ORM session
        :param catalog: dict with ``content_onum`` and ``leaf_node_flag``
        """
        stmt = (
            update(DataAstContent)
            .where(DataAstContent.content_onum == catalog['content_onum'])
            .values(leaf_node_flag=catalog['leaf_node_flag'])
        )
        await db.execute(stmt)

    @classmethod
    async def edit_catalog_child_dao(cls, db: AsyncSession, catalog: dict):
        """
        Update a catalog and upsert its child relations.

        Children carrying a truthy ``rela_onum`` are updated in place; the
        others are inserted under this catalog. Leaf flags are refreshed once
        at the end, as the other catalog mutators do.

        :param db: ORM session
        :param catalog: catalog values, optionally with a ``children`` list of dicts
        """
        content_onum = catalog['content_onum']
        await db.execute(
            update(DataAstContent)
            .where(DataAstContent.content_onum == content_onum)
            .values(
                content_name=catalog['content_name'],
                content_stat=catalog['content_stat'],
                content_intr=catalog['content_intr'],
                content_pic=catalog['content_pic'],
                supr_content_onum=catalog['supr_content_onum'],
                leaf_node_flag=catalog['leaf_node_flag'],
                upd_prsn=catalog['upd_prsn'],
                upd_time=datetime.now(),
            )
        )

        for child in catalog.get('children', []):
            rela_onum = child.get('rela_onum')
            if rela_onum:
                await db.execute(
                    update(DataAstContentRela)
                    .where(DataAstContentRela.rela_onum == rela_onum)
                    .values(
                        content_onum=child.get('content_onum'),
                        ast_onum=child.get('ast_onum'),
                        rela_type=child.get('rela_type'),
                        rela_eff_begn_date=child.get('rela_eff_begn_date'),
                        rela_eff_end_date=child.get('rela_eff_end_date'),
                        upd_prsn=child.get('upd_prsn'),
                    )
                )
            else:
                # Build from a copy so the caller's dict is left untouched.
                db.add(DataAstContentRela(**{**child, 'content_onum': content_onum}))

        await db.flush()
        # The refresh only reads DataAstContent, so one pass after the loop is
        # equivalent to repeating it per child.
        await cls.update_leaf_node_flag(db)

    @classmethod
    async def delete_catalog_dao(cls, db: AsyncSession, catalog: DeleteDataCatalogModel):
        """
        Soft-delete catalogs: set ``content_stat = 0`` on the catalogs and
        ``rela_status = 0`` on their relations.

        :param db: ORM session
        :param catalog: model whose ``content_onums`` is a comma-separated id list
        """
        content_onums = [
            onum.strip() for onum in catalog.content_onums.split(',') if onum.strip()
        ]
        now = datetime.now()
        await db.execute(
            update(DataAstContentRela)
            .where(DataAstContentRela.content_onum.in_(content_onums))
            .values(rela_status=0, rela_eff_end_date=now)
        )
        await db.execute(
            update(DataAstContent)
            .where(DataAstContent.content_onum.in_(content_onums))
            .values(content_stat=0, upd_time=now)
        )
        await cls.update_leaf_node_flag(db)

    @classmethod
    async def moved_catalog_instr_dao(cls, db: AsyncSession, moved_catalog_data: dict):
        """
        Move a catalog under a new parent.

        :param db: ORM session
        :param moved_catalog_data: dict with ``content_onum``,
            ``supr_content_onum`` (current parent) and ``supr_content_onum_after``
        """
        stmt = (
            update(DataAstContent)
            .where(
                DataAstContent.content_onum == moved_catalog_data['content_onum'],
                DataAstContent.supr_content_onum == moved_catalog_data['supr_content_onum'],
            )
            .values(
                supr_content_onum=moved_catalog_data['supr_content_onum_after'],
                upd_time=datetime.now(),
            )
        )
        await db.execute(stmt)
        await cls.update_leaf_node_flag(db)

    @classmethod
    async def merge_catalog_instr_dao(cls, db: AsyncSession, merge_catalog_data: dict):
        """
        Merge a catalog into another: re-point its active relations to
        ``content_onum_after`` and mark the source catalog inactive.

        :param db: ORM session
        :param merge_catalog_data: dict with ``content_onum``,
            ``supr_content_onum`` and ``content_onum_after``
        """
        # Criteria are passed separately so SQL ANDs them; a Python `and`
        # between column expressions would silently drop one of them.
        repoint_relations = (
            update(DataAstContentRela)
            .where(
                DataAstContentRela.content_onum == merge_catalog_data['content_onum'],
                DataAstContentRela.rela_status == 1,
            )
            .values(
                content_onum=merge_catalog_data['content_onum_after'],
                rela_eff_begn_date=datetime.now(),
            )
        )
        await db.execute(repoint_relations)

        retire_source = (
            update(DataAstContent)
            .where(
                DataAstContent.content_onum == merge_catalog_data['content_onum'],
                DataAstContent.supr_content_onum == merge_catalog_data['supr_content_onum'],
            )
            .values(content_stat='0')
        )
        await db.execute(retire_source)
        await cls.update_leaf_node_flag(db)

    @classmethod
    async def removerel_data_ast_catalog_dao(cls, db: AsyncSession, removerel_catalog_data: dict):
        """
        Change the status of one catalog/asset relation.

        :param db: ORM session
        :param removerel_catalog_data: dict with ``rela_onum``, ``content_onum``
            and the new ``rela_status``
        """
        stmt = (
            update(DataAstContentRela)
            .where(
                DataAstContentRela.rela_onum == removerel_catalog_data['rela_onum'],
                DataAstContentRela.content_onum == removerel_catalog_data['content_onum'],
            )
            .values(rela_status=removerel_catalog_data['rela_status'])
        )
        await db.execute(stmt)
        await cls.update_leaf_node_flag(db)

    @classmethod
    async def moverel_data_ast_catalog_dao(cls, db: AsyncSession, moverel_catalog_data: dict):
        """
        Move one catalog/asset relation to another catalog.

        :param db: ORM session
        :param moverel_catalog_data: dict with ``rela_onum``, ``content_onum``
            and ``content_onum_after``
        """
        stmt = (
            update(DataAstContentRela)
            .where(
                DataAstContentRela.rela_onum == moverel_catalog_data['rela_onum'],
                DataAstContentRela.content_onum == moverel_catalog_data['content_onum'],
            )
            .values(
                content_onum=moverel_catalog_data['content_onum_after'],
                rela_eff_end_date=datetime.now(),
            )
        )
        await db.execute(stmt)
        await cls.update_leaf_node_flag(db)

    @classmethod
    async def get_catalog_list(cls, db: AsyncSession, query_object: DataCatalogPageQueryModel, user_id: int, is_page: bool = False):
        """
        List active catalogs outer-joined with their active asset relations.

        Relations of catalog ``'2'`` are restricted to those whose ``upd_prsn``
        equals ``query_object.upd_prsn``; relations of every other catalog are
        included regardless of owner.

        :param db: ORM session
        :param query_object: paging/query parameters
        :param user_id: currently unused by the query
        :param is_page: whether to paginate
        :return: catalog list
        """
        # Each criterion is its own where() argument so all of them reach SQL;
        # combining column expressions with Python `and` drops one operand.
        owner_scoped = select(DataAstContentRela).where(
            DataAstContentRela.upd_prsn == query_object.upd_prsn,
            DataAstContentRela.content_onum == '2',
            DataAstContentRela.rela_status == '1',
        )
        shared = select(DataAstContentRela).where(
            DataAstContentRela.content_onum != '2',
            DataAstContentRela.rela_status == '1',
        )
        subquery_t1 = owner_scoped.union_all(shared).alias('subquery_t1')

        query = (
            select(
                DataAstContent.content_onum,
                DataAstContent.content_name,
                DataAstContent.content_stat,
                DataAstContent.content_intr,
                DataAstContent.content_pic,
                DataAstContent.supr_content_onum,
                DataAstContent.leaf_node_flag,
                DataAstContent.upd_prsn,
                DataAstContent.upd_time,
                subquery_t1.c.rela_onum,
                subquery_t1.c.ast_onum,
                subquery_t1.c.rela_type,
                subquery_t1.c.rela_eff_begn_date,
                subquery_t1.c.rela_eff_end_date,
                subquery_t1.c.upd_prsn,
            )
            .distinct()
            .select_from(DataAstContent)
            .outerjoin(subquery_t1, DataAstContent.content_onum == subquery_t1.c.content_onum)
            .where(DataAstContent.content_stat == 1)
            .order_by(DataAstContent.content_onum)
        )
        return await PageUtil.paginate(
            db, query, page_num=query_object.page_num, page_size=query_object.page_size, is_page=is_page
        )

    # ------------------------------------------------------------------
    # Data standards
    # ------------------------------------------------------------------
    @classmethod
    async def get_std_main_list(cls, db: AsyncSession, query_object: DataStdMainModel, is_page: bool = False):
        """List data standards with fuzzy name/number matching and optional pagination."""
        filters = []
        if query_object.data_std_cn_name:
            filters.append(DataStdMain.data_std_cn_name.like(f"%{query_object.data_std_cn_name}%"))
        if query_object.data_std_no:
            filters.append(DataStdMain.data_std_no.like(f"%{query_object.data_std_no}%"))
        filters.extend(
            cls._eq_filters(
                DataStdMain,
                query_object,
                ("src_sys", "data_std_type", "std_status", "belt_data_std_content"),
            )
        )
        query = select(DataStdMain).where(*filters).order_by(desc(DataStdMain.create_time))
        return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)

    @classmethod
    async def get_std_main_list_all(cls, db: AsyncSession, query_object: DataStdMainModel):
        """List all matching data standards, newest first (unpaged)."""
        filters = []
        if query_object.data_std_cn_name:
            filters.append(DataStdMain.data_std_cn_name.like(f"%{query_object.data_std_cn_name}%"))
        if query_object.data_std_no:
            filters.append(DataStdMain.data_std_no.like(f"%{query_object.data_std_no}%"))
        filters.extend(
            cls._eq_filters(
                DataStdMain, query_object, ("src_sys", "cd_id", "data_std_type", "std_status")
            )
        )
        query = select(DataStdMain).where(*filters).order_by(desc(DataStdMain.create_time))
        return await PageUtil.paginate(db, query, 0, 0, False)

    @classmethod
    async def add_std_main(cls, db: AsyncSession, model: DataStdMainModel):
        """Insert a data standard built from ``model`` and flush; returns the ORM instance."""
        entity = DataStdMain(**model.model_dump())
        db.add(entity)
        await db.flush()
        return entity

    @classmethod
    async def add_std_main_appr(cls, db: AsyncSession, model: DataStdMainApprModel):
        """Insert a data-standard approval row and flush; returns the ORM instance."""
        entity = DataStdMainAppr(**model.model_dump())
        db.add(entity)
        await db.flush()
        return entity

    @classmethod
    async def add_std_dict_appr(cls, db: AsyncSession, model: DataStdDictApprModel):
        """Insert a dictionary approval row and flush; returns the ORM instance."""
        entity = DataStdDictAppr(**model.model_dump())
        db.add(entity)
        await db.flush()
        return entity

    @classmethod
    async def delete_std_main(cls, db: AsyncSession, Id: str):
        """Delete the data standard with ``onum == Id``."""
        await db.execute(delete(DataStdMain).where(DataStdMain.onum == Id))

    @classmethod
    async def delete_std_dict_appr(cls, db: AsyncSession, Id: str):
        """Delete the dictionary approval row with ``onum == Id``."""
        await db.execute(delete(DataStdDictAppr).where(DataStdDictAppr.onum == Id))

    @classmethod
    async def update_std_main(cls, db: AsyncSession, update_data: DataStdMainModel):
        """Bulk-update a data standard by primary key using only the explicitly set fields."""
        await db.execute(update(DataStdMain), [cls._update_params(update_data)])
        await db.flush()

    @classmethod
    async def update_std_dict_appr(cls, db: AsyncSession, update_data: DataStdDictApprModel):
        """Bulk-update a dictionary approval row by primary key using only the explicitly set fields."""
        await db.execute(update(DataStdDictAppr), [cls._update_params(update_data)])
        await db.flush()

    @classmethod
    async def get_std_main_by_id(cls, db: AsyncSession, Id: str):
        """Return the data standard with ``onum == Id``, or ``None``."""
        result = await db.execute(select(DataStdMain).where(DataStdMain.onum == Id))
        return result.scalars().first()

    @classmethod
    async def get_data_main_by_info(cls, db: AsyncSession, query_object: DataStdMainModel):
        """Return the first data standard whose columns equal the truthy fields
        of ``query_object``, or ``None``."""
        filters = cls._eq_filters(
            DataStdMain,
            query_object,
            (
                "data_std_cn_name",
                "data_std_no",
                "src_sys",
                "data_std_type",
                "cd_id",
                "std_status",
            ),
        )
        result = await db.execute(select(DataStdMain).where(*filters))
        return result.scalars().first()

    @classmethod
    async def get_std_main_appr_list_all(cls, db: AsyncSession, query_object: DataStdMainApprModel):
        """List all matching data-standard approval rows, newest first (unpaged)."""
        filters = []
        if query_object.data_std_cn_name:
            filters.append(DataStdMainAppr.data_std_cn_name.like(f"%{query_object.data_std_cn_name}%"))
        if query_object.data_std_no:
            filters.append(DataStdMainAppr.data_std_no.like(f"%{query_object.data_std_no}%"))
        filters.extend(
            cls._eq_filters(
                DataStdMainAppr,
                query_object,
                ("src_sys", "cd_id", "flowId", "data_std_type", "std_status"),
            )
        )
        query = select(DataStdMainAppr).where(*filters).order_by(desc(DataStdMainAppr.create_time))
        return await PageUtil.paginate(db, query, 0, 0, False)

    @classmethod
    async def get_std_main_appr_list(cls, flowId:str,db: AsyncSession):
        """List the data-standard approval rows of a flow, newest first (unpaged)."""
        query = (
            select(DataStdMainAppr)
            .where(DataStdMainAppr.flowId == flowId)
            .order_by(desc(DataStdMainAppr.create_time))
        )
        return await PageUtil.paginate(db, query, 0, 0, False)

    @classmethod
    async def get_std_dict_appr_list(cls, flowId:str,db: AsyncSession):
        """List the dictionary approval rows of a flow, newest first (unpaged)."""
        query = (
            select(DataStdDictAppr)
            .where(DataStdDictAppr.flowId == flowId)
            .order_by(desc(DataStdDictAppr.create_time))
        )
        return await PageUtil.paginate(db, query, 0, 0, False)

    @classmethod
    async def get_std_main_appr_by_id(cls, db: AsyncSession, Id: str):
        """Return the data-standard approval row with ``onum == Id``, or ``None``."""
        result = await db.execute(select(DataStdMainAppr).where(DataStdMainAppr.onum == Id))
        return result.scalars().first()

    @classmethod
    async def get_std_dict_appr_by_id(cls, db: AsyncSession, Id: str):
        """Return the dictionary approval row with ``onum == Id``, or ``None``."""
        result = await db.execute(select(DataStdDictAppr).where(DataStdDictAppr.onum == Id))
        return result.scalars().first()

    @classmethod
    async def get_last_std_main_appr_by_id(cls,db: AsyncSession, Id: str ):
        """Return the most recently updated ``"succeed"`` approval row for the
        original data standard ``Id`` (matched on ``oldInstId``), or ``None``."""
        result = await db.execute(
            select(DataStdMainAppr)
            .where(
                DataStdMainAppr.oldInstId == Id,
                DataStdMainAppr.approStatus == "succeed",
            )
            .order_by(DataStdMainAppr.upd_time.desc())
            .limit(1)
        )
        return result.scalar_one_or_none()

    @classmethod
    async def get_last_std_dict_appr_by_id(cls,db: AsyncSession, Id: str ):
        """Return the most recently updated ``"succeed"`` approval row for the
        original dictionary entry ``Id`` (matched on ``oldInstId``), or ``None``."""
        result = await db.execute(
            select(DataStdDictAppr)
            .where(
                DataStdDictAppr.oldInstId == Id,
                DataStdDictAppr.approStatus == "succeed",
            )
            .order_by(DataStdDictAppr.upd_time.desc())
            .limit(1)
        )
        return result.scalar_one_or_none()

    @classmethod
    async def update_std_main_appr(cls, db: AsyncSession, update_data: DataStdMainApprModel):
        """Bulk-update a data-standard approval row by primary key using only the explicitly set fields."""
        await db.execute(update(DataStdMainAppr), [cls._update_params(update_data)])
        await db.flush()

    @classmethod
    async def check_std_main_waiting(cls, oldInstId: str, db: AsyncSession):
        """List approval rows of a data standard that are still ``"waiting"``
        or ``"pending"``, newest first (unpaged)."""
        query = (
            select(DataStdMainAppr)
            .where(
                DataStdMainAppr.oldInstId == oldInstId,
                or_(
                    DataStdMainAppr.approStatus == "waiting",
                    DataStdMainAppr.approStatus == "pending",
                ),
            )
            .order_by(desc(DataStdMainAppr.create_time))
        )
        return await PageUtil.paginate(db, query, 0, 0, False)

    @classmethod
    async def check_std_dict_waiting(cls, oldInstId: str, db: AsyncSession):
        """List approval rows of a dictionary entry that are still ``"waiting"``
        or ``"pending"``, newest first (unpaged)."""
        query = (
            select(DataStdDictAppr)
            .where(
                DataStdDictAppr.oldInstId == oldInstId,
                or_(
                    DataStdDictAppr.approStatus == "waiting",
                    DataStdDictAppr.approStatus == "pending",
                ),
            )
            .order_by(desc(DataStdDictAppr.create_time))
        )
        return await PageUtil.paginate(db, query, 0, 0, False)