You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

166 lines
6.0 KiB

from sqlalchemy import select, update, delete, insert
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.do.data_ast_content_do import DataAssetInfo
from module_admin.entity.vo.data_asset_vo import DataAssetItemModel, DataAssetPageQueryModel
from utils.page_util import PageUtil
from typing import List, Dict, Any
class DataAssetDao:
    """
    Database access layer for the data asset information module.

    Every method is a classmethod that runs on the ``AsyncSession`` passed in.
    None of the methods commit; transaction control is left to the caller.
    """

    @classmethod
    async def get_data_asset_list(cls, db: AsyncSession, query_object: DataAssetPageQueryModel, is_page: bool = False):
        """
        Fetch data asset records matching the given query object.

        Only attributes of ``query_object`` that are truthy become filters:
        ``ast_no``, ``data_ast_type``, ``data_ast_stat`` and ``data_ast_src``
        are compared for equality, while ``data_ast_eng_name`` and
        ``data_ast_cn_name`` are matched as substrings via ``LIKE``.
        Results are ordered by ``data_ast_estb_time`` descending.

        :param db: ORM session
        :param query_object: query parameter object (also supplies page_num / page_size)
        :param is_page: whether pagination is enabled
        :return: whatever ``PageUtil.paginate`` returns for the built query
        """
        # Collect only the active filters instead of passing literal ``True``
        # placeholders to ``where()``; the selected rows are the same.
        conditions = []
        if query_object.ast_no:
            conditions.append(DataAssetInfo.ast_no == query_object.ast_no)
        # NOTE(review): '%' and '_' inside the user-supplied names act as LIKE
        # wildcards because the value is not escaped - confirm this is intended.
        if query_object.data_ast_eng_name:
            conditions.append(DataAssetInfo.data_ast_eng_name.like(f'%{query_object.data_ast_eng_name}%'))
        if query_object.data_ast_cn_name:
            conditions.append(DataAssetInfo.data_ast_cn_name.like(f'%{query_object.data_ast_cn_name}%'))
        if query_object.data_ast_type:
            conditions.append(DataAssetInfo.data_ast_type == query_object.data_ast_type)
        if query_object.data_ast_stat:
            conditions.append(DataAssetInfo.data_ast_stat == query_object.data_ast_stat)
        if query_object.data_ast_src:
            conditions.append(DataAssetInfo.data_ast_src == query_object.data_ast_src)

        query = (
            select(DataAssetInfo)
            .where(*conditions)
            .order_by(DataAssetInfo.data_ast_estb_time.desc())
        )
        return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)

    @classmethod
    async def get_data_asset_by_ast_no(cls, db: AsyncSession, ast_no: str):
        """
        Fetch a single data asset record by its asset number.

        :param db: ORM session
        :param ast_no: asset number
        :return: the first matching ``DataAssetInfo`` row, or ``None`` when nothing matches
        """
        result = await db.execute(
            select(DataAssetInfo).where(DataAssetInfo.ast_no == ast_no)
        )
        return result.scalars().first()

    @classmethod
    async def insert_data_asset(cls, db: AsyncSession, data_asset: Dict[str, Any]) -> DataAssetInfo:
        """
        Insert a new data asset record.

        The new object is added to the session and flushed, but not committed.

        :param db: ORM session
        :param data_asset: field values passed as keyword arguments to ``DataAssetInfo``
        :return: the newly created ``DataAssetInfo`` object
        """
        db_data_asset = DataAssetInfo(**data_asset)
        db.add(db_data_asset)
        await db.flush()
        return db_data_asset

    @classmethod
    async def update_data_asset(cls, db: AsyncSession, ast_no: str, data_asset: Dict[str, Any]) -> int:
        """
        Update the data asset record identified by an asset number.

        :param db: ORM session
        :param ast_no: asset number of the record to update
        :param data_asset: column values to write
        :return: number of rows reported as affected; 0 when ``data_asset`` is empty
        """
        # An UPDATE without any SET values is not a meaningful statement, so
        # skip the round trip entirely when there is nothing to write.
        if not data_asset:
            return 0

        result = await db.execute(
            update(DataAssetInfo)
            .where(DataAssetInfo.ast_no == ast_no)
            .values(**data_asset)
        )
        return result.rowcount

    @classmethod
    async def delete_data_asset(cls, db: AsyncSession, ast_no: str) -> int:
        """
        Delete the data asset record identified by an asset number.

        :param db: ORM session
        :param ast_no: asset number of the record to delete
        :return: number of rows reported as affected
        """
        result = await db.execute(
            delete(DataAssetInfo)
            .where(DataAssetInfo.ast_no == ast_no)
        )
        return result.rowcount

    @classmethod
    async def get_data_asset_sources(cls, db: AsyncSession):
        """
        List the distinct data asset sources.

        :param db: ORM session
        :return: distinct non-NULL ``data_ast_src`` values, sorted ascending
        """
        result = await db.execute(
            select(DataAssetInfo.data_ast_src)
            .distinct()
            .where(DataAssetInfo.data_ast_src.is_not(None))
            .order_by(DataAssetInfo.data_ast_src)
        )
        return result.scalars().all()

    @classmethod
    async def search_data_assets(cls, db: AsyncSession, search_params: dict, is_page: bool = False, page_num: int = 1, page_size: int = 10):
        """
        Combined search over data asset records.

        Recognised keys of ``search_params`` (each applied only when truthy):
        ``name`` (substring match on either the English or the Chinese name),
        ``data_ast_type``, ``data_ast_screen`` and ``data_ast_src`` (equality),
        and ``data_ast_clas`` (substring match).
        Results are ordered by ``data_ast_estb_time`` descending.

        :param db: ORM session
        :param search_params: dictionary of search parameters; ``None`` means no filters
        :param is_page: whether pagination is enabled
        :param page_num: page number
        :param page_size: page size
        :return: whatever ``PageUtil.paginate`` returns for the built query
        """
        # Tolerate a missing parameter dict instead of failing on ``.get``.
        search_params = search_params or {}

        conditions = []
        # Name search covers both the English and the Chinese name.
        if search_params.get('name'):
            name_keyword = f"%{search_params['name']}%"
            conditions.append(
                (DataAssetInfo.data_ast_eng_name.like(name_keyword)) |
                (DataAssetInfo.data_ast_cn_name.like(name_keyword))
            )
        # Data asset type
        if search_params.get('data_ast_type'):
            conditions.append(DataAssetInfo.data_ast_type == search_params['data_ast_type'])
        # Asset application scenario
        if search_params.get('data_ast_screen'):
            conditions.append(DataAssetInfo.data_ast_screen == search_params['data_ast_screen'])
        # Data asset tag
        if search_params.get('data_ast_clas'):
            conditions.append(DataAssetInfo.data_ast_clas.like(f"%{search_params['data_ast_clas']}%"))
        # Data asset source
        if search_params.get('data_ast_src'):
            conditions.append(DataAssetInfo.data_ast_src == search_params['data_ast_src'])

        query = (
            select(DataAssetInfo)
            .where(*conditions)
            .order_by(DataAssetInfo.data_ast_estb_time.desc())
        )
        return await PageUtil.paginate(db, query, page_num, page_size, is_page)