from sqlalchemy import or_, func from sqlalchemy.orm import Session,aliased from sqlalchemy.sql.expression import text from module_admin.entity.do.dasset_do import SysDasset from module_admin.entity.vo.dasset_vo import DassetModel, DassetResponse, CrudDassetResponse from utils.time_format_util import list_format_datetime class DassetDao: """ 数据资产管理模块数据库操作层 """ @classmethod def get_dasset_by_id(cls, db: Session, dasset_id: int): """ 根据数据资产id获取在用数据资产信息 :param db: orm对象 :param dasset_id: 数据资产id :return: 在用数据资产信息对象 """ dasset_info = db.query(SysDasset) \ .filter(SysDasset.dasset_id == dasset_id, SysDasset.status == 0, SysDasset.del_flag == 0) \ .first() return dasset_info @classmethod def get_dasset_by_id_for_list(cls, db: Session, dasset_id: int): """ 用于获取数据资产列表的工具方法 :param db: orm对象 :param dasset_id: 数据资产id :return: 数据资产id对应的信息对象 """ dasset_info = db.query(SysDasset) \ .filter(SysDasset.dasset_id == dasset_id, SysDasset.del_flag == 0) \ .first() return dasset_info @classmethod def get_dasset_detail_by_id(cls, db: Session, dasset_id: int): """ 根据数据资产id获取数据资产详细信息 :param db: orm对象 :param dasset_id: 数据资产id :return: 数据资产信息对象 """ dasset_info = db.query(SysDasset) \ .filter(SysDasset.dasset_id == dasset_id, SysDasset.del_flag == 0) \ .first() return dasset_info @classmethod def get_dasset_detail_by_info(cls, db: Session, dasset: DassetModel): """ 根据数据资产参数获取数据资产信息 :param db: orm对象 :param dasset: 数据资产参数对象 :return: 数据资产信息对象 """ dasset_info = db.query(SysDasset) \ .filter(SysDasset.dasset_parent_id == dasset.dasset_parent_id if dasset.dasset_parent_id else True, SysDasset.dasset_name == dasset.dasset_name if dasset.dasset_name else True) \ .first() return dasset_info @classmethod def get_dasset_info_for_edit_option(cls, db: Session, dasset_info: DassetModel, data_scope_sql: str): """ 获取数据资产编辑对应的在用数据资产列表信息 :param db: orm对象 :param dasset_info: 数据资产对象 :param data_scope_sql: 数据权限对应的查询sql语句 :return: 数据资产列表信息 """ dasset_result = db.query(SysDasset) \ .filter(SysDasset.dasset_id != dasset_info.dasset_id, SysDasset.dasset_parent_id != dasset_info.dasset_id, SysDasset.del_flag == 0, SysDasset.status == 0, eval(data_scope_sql)) \ .order_by(SysDasset.dasset_order_num) \ .distinct().all() return list_format_datetime(dasset_result) @classmethod def get_children_dasset(cls, db: Session, dasset_id: int): """ 根据数据资产id查询当前数据资产的子数据资产列表信息 :param db: orm对象 :param dasset_id: 数据资产id :return: 子数据资产信息列表 """ dasset_result = db.query(SysDasset) \ .filter(SysDasset.dasset_parent_id == dasset_id, SysDasset.del_flag == 0) \ .all() return list_format_datetime(dasset_result) @classmethod def get_dasset_all_ancestors(cls, db: Session): """ 获取所有数据资产的ancestors信息 :param db: orm对象 :return: ancestors信息列表 """ dasset_ancestors = db.query(SysDasset.dasset_ancestors)\ .filter(SysDasset.del_flag == 0)\ .all() return dasset_ancestors @classmethod def get_dasset_list_for_tree(cls, db: Session, dasset_info: DassetModel, data_scope_sql: str): """ 获取所有在用数据资产列表信息 :param db: orm对象 :param dasset_info: 数据资产对象 :param data_scope_sql: 数据权限对应的查询sql语句 :return: 在用数据资产列表信息 """ dasset_result = db.query(SysDasset) \ .filter(SysDasset.status == 0, SysDasset.del_flag == 0, #SysDasset.dasset_area != '语句配置', SysDasset.dasset_name.like(f'%{dasset_info.dasset_name}%') if dasset_info.dasset_name else True, eval(data_scope_sql)) \ .order_by(SysDasset.dasset_order_num) \ .distinct().all() return list_format_datetime(dasset_result) @classmethod def get_dasset_list(cls, db: Session, page_object: DassetModel, data_scope_sql: str): """ 根据查询参数获取数据资产列表信息 :param db: orm对象 :param page_object: 不分页查询参数对象 :param data_scope_sql: 数据权限对应的查询sql语句 :return: 数据资产列表信息对象 """ dasset_result = db.query(SysDasset) \ .filter(SysDasset.del_flag == 0, SysDasset.status == page_object.status if page_object.status else True, SysDasset.dasset_name.like(f'%{page_object.dasset_name}%') if page_object.dasset_name else True, eval(data_scope_sql)) \ .order_by(SysDasset.dasset_order_num) \ .distinct().all() result = dict( rows=list_format_datetime(dasset_result), ) return DassetResponse(**result) @classmethod def add_dasset_dao(cls, db: Session, dasset: DassetModel): """ 新增数据资产数据库操作 :param db: orm对象 :param dasset: 数据资产对象 :return: 新增校验结果 """ db_dasset = SysDasset(**dasset.dict()) db.add(db_dasset) db.commit() db.flush() # 构建原生SQL更新语句 update_sql = """ update sys_dasset a join ( select row_number() over(PARTITION BY a.dasset_parent_id order by dasset_id ) as s1, a.* from sys_dasset a ) b on a.dasset_id = b.dasset_id set a.dasset_order_num = b.s1 """ # 执行原生SQL db.execute(text(update_sql)) db.commit() db.flush() return db_dasset @classmethod def edit_dasset_dao(cls, db: Session, dasset: dict): """ 编辑数据资产数据库操作 :param db: orm对象 :param dasset: 需要更新的数据资产字典 :return: 编辑校验结果 """ db.query(SysDasset) \ .filter(SysDasset.dasset_id == dasset.get('dasset_id')) \ .update(dasset) @classmethod def delete_dasset_dao(cls, db: Session, dasset: DassetModel): """ 删除数据资产数据库操作 :param db: orm对象 :param dasset: 数据资产对象 :return: #.update({SysDasset.del_flag: '2', SysDasset.update_by: dasset.update_by, SysDasset.update_time: dasset.update_time}) """ db.query(SysDasset) \ .filter(SysDasset.dasset_id == dasset.dasset_id) \ .delete() @classmethod def get_dasset_detail_by_dname(cls, db: Session, dasset_name: str): """ 根据数据资产名称查询当前数据资产的子数据资产列表信息 :param db: orm对象 :param dasset_id: 数据资产name :return: 数据资产信息对象 """ dasset_info = db.query(SysDasset) \ .filter(SysDasset.dasset_name == dasset_name, SysDasset.del_flag == 0) \ .first() return dasset_info