You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

209 lines
7.9 KiB

from sqlalchemy import select, update, delete, insert, func, Integer, text
10 months ago
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.do.data_ast_content_do import DataAssetInfo
from module_admin.entity.do.dataast_do import DataAssetInfoAppr
10 months ago
from module_admin.entity.vo.data_asset_vo import DataAssetItemModel, DataAssetPageQueryModel
from utils.page_util import PageUtil
from typing import List, Dict, Any
class DataAssetDao:
"""
数据资产信息模块数据库操作层
"""
@classmethod
async def get_data_asset_list(cls, db: AsyncSession, query_object: DataAssetPageQueryModel, is_page: bool = False):
"""
根据查询参数获取数据资产信息列表
:param db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 数据资产信息列表对象
"""
query = (
select(DataAssetInfo)
.where(
DataAssetInfo.ast_no == query_object.ast_no if query_object.ast_no else True,
DataAssetInfo.data_ast_eng_name.like(f'%{query_object.data_ast_eng_name}%') if query_object.data_ast_eng_name else True,
DataAssetInfo.data_ast_cn_name.like(f'%{query_object.data_ast_cn_name}%') if query_object.data_ast_cn_name else True,
DataAssetInfo.data_ast_type == query_object.data_ast_type if query_object.data_ast_type else True,
DataAssetInfo.data_ast_stat == query_object.data_ast_stat if query_object.data_ast_stat else True,
DataAssetInfo.data_ast_src == query_object.data_ast_src if query_object.data_ast_src else True,
)
.order_by(DataAssetInfo.data_ast_estb_time.desc())
)
data_asset_list = await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
return data_asset_list
@classmethod
async def getColumnsByTable(cls, db: AsyncSession, tableId: int):
sql = text("select a.onum as onum,"
"a.ssys_id as ssys_id,"
" a.mdl_name as mdl_name,"
"a.fld_eng_name as fld_eng_name,"
"case when a.fld_cn_name is null then b.fld_crrct_name"
" when a.fld_cn_name is not null then a.fld_cn_name end as fld_cn_name,"
"a.fld_type as fld_type,"
"case when a.pk_flag is null then b.crrct_pk_flag "
"when a.pk_flag is not null then a.pk_flag end as pk_flag"
" from t_metadata_fld_tab_extract_info a "
"left join t_metadata_fld_supp_info b "
"on a.ssys_id = b.ssys_id and a.mdl_name=b.mdl_name and"
" a.tab_eng_name=b.tab_eng_name and a.fld_eng_name=b.fld_eng_name "
"left join t_metadata_extract_info c "
"on a.ssys_id = c.ssys_id and a.mdl_name=c.mdl_name and a.tab_eng_name=c.tab_eng_name "
"where c.onum = :ssysId order by a.fld_no")
result = (await db.execute(sql, {"ssysId": tableId}))
data = [dict(row._mapping) for row in result]
return data
10 months ago
@classmethod
async def get_data_asset_by_ast_no(cls, db: AsyncSession, ast_no: str):
"""
根据资产编号获取数据资产信息
:param db: orm对象
:param ast_no: 资产编号
:return: 数据资产信息对象
"""
data_asset = (
await db.execute(select(DataAssetInfo).where(DataAssetInfo.ast_no == ast_no))
).scalars().first()
return data_asset
@classmethod
async def insert_data_asset(cls, db: AsyncSession, data_asset: Dict[str, Any]):
"""
插入数据资产信息
:param db: orm对象
:param data_asset: 数据资产信息字典
:return: 插入的数据资产对象
"""
db_data_asset = DataAssetInfo(**data_asset)
db.add(db_data_asset)
await db.flush()
return db_data_asset
@classmethod
async def update_data_asset(cls, db: AsyncSession, ast_no: str, data_asset: Dict[str, Any]):
"""
更新数据资产信息
:param db: orm对象
:param ast_no: 资产编号
:param data_asset: 数据资产信息字典
:return: 更新结果
"""
result = await db.execute(
update(DataAssetInfo)
.where(DataAssetInfo.ast_no == ast_no)
.values(**data_asset)
)
return result.rowcount
10 months ago
@classmethod
async def delete_data_asset(cls, db: AsyncSession, ast_no: str):
"""
删除数据资产信息
:param db: orm对象
:param ast_no: 资产编号
:return: 删除结果
"""
# 更新t_data_ast_info_appr表中对应记录的状态为0(无效)
await db.execute(
update(DataAssetInfoAppr)
.where(DataAssetInfoAppr.ast_no == ast_no)
.values(data_ast_stat='0')
)
# 删除t_data_ast_info表中的记录
10 months ago
result = await db.execute(
delete(DataAssetInfo)
.where(DataAssetInfo.ast_no == ast_no)
)
# 提交事务
await db.commit()
return result.rowcount
10 months ago
@classmethod
async def get_data_asset_sources(cls, db: AsyncSession):
"""
获取所有数据资产来源列表
:param db: orm对象
:return: 数据资产来源列表
"""
# 使用distinct查询不重复的数据资产来源
result = await db.execute(
select(DataAssetInfo.data_ast_src)
.distinct()
.where(DataAssetInfo.data_ast_src.is_not(None))
.order_by(DataAssetInfo.data_ast_src)
)
sources = result.scalars().all()
return sources
@classmethod
async def search_data_assets(cls, db: AsyncSession, search_params: dict, is_page: bool = False, page_num: int = 1, page_size: int = 10):
"""
综合查询数据资产信息
:param db: orm对象
:param search_params: 查询参数字典
:param is_page: 是否分页
:param page_num: 页码
:param page_size: 每页大小
:return: 查询结果
"""
query = select(DataAssetInfo)
# 构建查询条件
conditions = []
# 名称查询(支持英文名称和中文名称)
if search_params.get('name'):
name_keyword = f"%{search_params['name']}%"
conditions.append(
(DataAssetInfo.data_ast_eng_name.like(name_keyword)) |
(DataAssetInfo.data_ast_cn_name.like(name_keyword))
)
# 数据资产类型
if search_params.get('data_ast_type'):
conditions.append(DataAssetInfo.data_ast_type == search_params['data_ast_type'])
# 资产应用场景
if search_params.get('data_ast_screen'):
conditions.append(DataAssetInfo.data_ast_screen == search_params['data_ast_screen'])
# 数据资产标签
if search_params.get('data_ast_clas'):
conditions.append(DataAssetInfo.data_ast_clas.like(f"%{search_params['data_ast_clas']}%"))
# 数据资产来源
if search_params.get('data_ast_src'):
if search_params['data_ast_src'] == '表数据资产':
conditions.append(
DataAssetInfo.data_ast_src.op("regexp")("^[0-9]+$")
)
else:
conditions.append(DataAssetInfo.data_ast_src == search_params['data_ast_src'])
10 months ago
# 将所有条件添加到查询
for condition in conditions:
query = query.where(condition)
# 排序
query = query.order_by(DataAssetInfo.data_ast_estb_time.desc())
# 分页查询
result = await PageUtil.paginate(db, query, page_num, page_size, is_page)
return result