You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
166 lines
6.0 KiB
166 lines
6.0 KiB
from sqlalchemy import select, update, delete, insert
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from module_admin.entity.do.data_ast_content_do import DataAssetInfo
|
|
from module_admin.entity.vo.data_asset_vo import DataAssetItemModel, DataAssetPageQueryModel
|
|
from utils.page_util import PageUtil
|
|
from typing import List, Dict, Any
|
|
|
|
|
|
class DataAssetDao:
    """
    Database access layer for the data asset information module.

    Every method takes the session to use as ``db`` and never commits;
    transaction control is left to the caller.
    """

    @classmethod
    async def get_data_asset_list(cls, db: AsyncSession, query_object: DataAssetPageQueryModel, is_page: bool = False):
        """
        Fetch the data assets that match the given query parameters.

        A filter is applied only when the corresponding attribute of
        ``query_object`` is truthy. Results are ordered by creation time,
        newest first.

        :param db: ORM session
        :param query_object: query parameter object (filters plus page_num / page_size)
        :param is_page: whether pagination is enabled
        :return: the value produced by ``PageUtil.paginate`` for the built query
        """
        # Collect only the active filters instead of passing a literal ``True``
        # placeholder to ``where()`` for every unused one.
        filters = []
        if query_object.ast_no:
            filters.append(DataAssetInfo.ast_no == query_object.ast_no)
        if query_object.data_ast_eng_name:
            filters.append(DataAssetInfo.data_ast_eng_name.like(f'%{query_object.data_ast_eng_name}%'))
        if query_object.data_ast_cn_name:
            filters.append(DataAssetInfo.data_ast_cn_name.like(f'%{query_object.data_ast_cn_name}%'))
        if query_object.data_ast_type:
            filters.append(DataAssetInfo.data_ast_type == query_object.data_ast_type)
        if query_object.data_ast_stat:
            filters.append(DataAssetInfo.data_ast_stat == query_object.data_ast_stat)
        if query_object.data_ast_src:
            filters.append(DataAssetInfo.data_ast_src == query_object.data_ast_src)

        query = (
            select(DataAssetInfo)
            .where(*filters)
            .order_by(DataAssetInfo.data_ast_estb_time.desc())
        )

        data_asset_list = await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
        return data_asset_list

    @classmethod
    async def get_data_asset_by_ast_no(cls, db: AsyncSession, ast_no: str):
        """
        Fetch a single data asset by its asset number.

        :param db: ORM session
        :param ast_no: asset number
        :return: the first matching DataAssetInfo row, or None when there is no match
        """
        result = await db.execute(select(DataAssetInfo).where(DataAssetInfo.ast_no == ast_no))
        return result.scalars().first()

    @classmethod
    async def insert_data_asset(cls, db: AsyncSession, data_asset: Dict[str, Any]):
        """
        Insert a new data asset.

        :param db: ORM session
        :param data_asset: dictionary of DataAssetInfo field values
        :return: the newly added DataAssetInfo object
        """
        db_data_asset = DataAssetInfo(**data_asset)
        db.add(db_data_asset)
        # Flush sends the pending INSERT now; committing is left to the caller.
        await db.flush()
        return db_data_asset

    @classmethod
    async def update_data_asset(cls, db: AsyncSession, ast_no: str, data_asset: Dict[str, Any]):
        """
        Update the data asset identified by ``ast_no``.

        :param db: ORM session
        :param ast_no: asset number of the row to update
        :param data_asset: dictionary of column values to set
        :return: number of rows matched by the UPDATE (0 when ``data_asset`` is empty)
        """
        # An UPDATE with nothing to assign is not a meaningful statement;
        # treat it as a no-op instead of sending it to the database.
        if not data_asset:
            return 0

        result = await db.execute(
            update(DataAssetInfo)
            .where(DataAssetInfo.ast_no == ast_no)
            .values(**data_asset)
        )
        return result.rowcount

    @classmethod
    async def delete_data_asset(cls, db: AsyncSession, ast_no: str):
        """
        Delete the data asset identified by ``ast_no``.

        :param db: ORM session
        :param ast_no: asset number of the row to delete
        :return: number of rows matched by the DELETE
        """
        result = await db.execute(
            delete(DataAssetInfo)
            .where(DataAssetInfo.ast_no == ast_no)
        )
        return result.rowcount

    @classmethod
    async def get_data_asset_sources(cls, db: AsyncSession):
        """
        List every distinct data asset source.

        :param db: ORM session
        :return: distinct non-NULL ``data_ast_src`` values, sorted ascending
        """
        result = await db.execute(
            select(DataAssetInfo.data_ast_src)
            .distinct()
            .where(DataAssetInfo.data_ast_src.is_not(None))
            .order_by(DataAssetInfo.data_ast_src)
        )
        sources = result.scalars().all()
        return sources

    @classmethod
    async def search_data_assets(cls, db: AsyncSession, search_params: dict, is_page: bool = False, page_num: int = 1, page_size: int = 10):
        """
        Combined search over data assets.

        Recognised keys of ``search_params`` (each one is ignored when missing
        or falsy): ``name`` (substring match on the English or Chinese name),
        ``data_ast_type``, ``data_ast_screen``, ``data_ast_clas`` (substring
        match) and ``data_ast_src``. Results are ordered by creation time,
        newest first.

        :param db: ORM session
        :param search_params: dictionary of search parameters; None means no filters
        :param is_page: whether pagination is enabled
        :param page_num: page number
        :param page_size: page size
        :return: the value produced by ``PageUtil.paginate`` for the built query
        """
        # Tolerate a missing parameter dict rather than failing on ``.get``.
        search_params = search_params or {}

        conditions = []

        # Name search covers both the English and the Chinese name.
        name = search_params.get('name')
        if name:
            name_keyword = f"%{name}%"
            conditions.append(
                (DataAssetInfo.data_ast_eng_name.like(name_keyword))
                | (DataAssetInfo.data_ast_cn_name.like(name_keyword))
            )

        # Asset type.
        data_ast_type = search_params.get('data_ast_type')
        if data_ast_type:
            conditions.append(DataAssetInfo.data_ast_type == data_ast_type)

        # Asset application scenario.
        data_ast_screen = search_params.get('data_ast_screen')
        if data_ast_screen:
            conditions.append(DataAssetInfo.data_ast_screen == data_ast_screen)

        # Asset tag (substring match).
        data_ast_clas = search_params.get('data_ast_clas')
        if data_ast_clas:
            conditions.append(DataAssetInfo.data_ast_clas.like(f"%{data_ast_clas}%"))

        # Asset source.
        data_ast_src = search_params.get('data_ast_src')
        if data_ast_src:
            conditions.append(DataAssetInfo.data_ast_src == data_ast_src)

        query = (
            select(DataAssetInfo)
            .where(*conditions)
            .order_by(DataAssetInfo.data_ast_estb_time.desc())
        )

        result = await PageUtil.paginate(db, query, page_num, page_size, is_page)
        return result
|