You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1137 lines
39 KiB
1137 lines
39 KiB
from datetime import datetime
|
|
from sqlalchemy import case
|
|
from sqlalchemy.orm import aliased
|
|
from sqlalchemy import delete, func, not_, select, update, or_, and_, desc
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.exc import IntegrityError
|
|
from module_admin.entity.do.data_ast_content_do import DataAstContent,DataAstContentRela,DataAstInfo,DataAstBookmarkRela,DataAstIndx
|
|
from module_admin.entity.do.user_do import SysUser
|
|
from module_admin.entity.vo.data_ast_content_vo import DataCatalogPageQueryModel, DeleteDataCatalogModel,DataCatalogChild,DataAstBookmarkRelaRequest,DataAstIndxRequest,DataAstIndxResponse
|
|
from utils.page_util import PageUtil
|
|
from utils.log_util import logger
|
|
|
|
|
|
class DataCatalogDAO:
    """
    Database access layer for the data catalog management module.
    """
|
|
|
|
@classmethod
async def get_catalog_by_id(cls, db: AsyncSession, content_onum: int):
    """
    Fetch a single active catalog entry by its identifier.

    :param db: async ORM session
    :param content_onum: catalog ID to look up
    :return: first DataAstContent row with that ID and content_stat == 1, or None when no row matches
    """
    lookup = select(DataAstContent).where(
        DataAstContent.content_onum == content_onum,
        DataAstContent.content_stat == 1,
    )
    outcome = await db.execute(lookup)
    return outcome.scalars().first()
|
|
|
|
@classmethod
async def get_catalog_detail_by_info(cls, db: AsyncSession, catalog: DataCatalogPageQueryModel):
    """
    Look up one active catalog entry matching the supplied attributes.

    content_name, content_stat and content_pic each restrict the query only
    when the corresponding attribute on ``catalog`` is truthy; in every case
    the row must also have content_stat == 1.

    :param db: async ORM session
    :param catalog: query object carrying the optional filter values
    :return: first matching DataAstContent row, or None when nothing matches
    """
    # A literal True stands in for "no restriction" when a filter value is absent.
    by_name = (DataAstContent.content_name == catalog.content_name) if catalog.content_name else True
    by_stat = (DataAstContent.content_stat == catalog.content_stat) if catalog.content_stat else True
    by_pic = (DataAstContent.content_pic == catalog.content_pic) if catalog.content_pic else True

    lookup = select(DataAstContent).where(
        by_name,
        by_stat,
        by_pic,
        DataAstContent.content_stat == 1,
    )
    outcome = await db.execute(lookup)
    return outcome.scalars().first()
|
|
|
|
|
|
|
|
# @classmethod
|
|
# async def get_catalog_list(cls, db: AsyncSession, query_object: DataCatalogPageQueryModel, user_id: int, is_page: bool = False):
|
|
# """
|
|
# 根据查询参数获取数据资产目录列表
|
|
|
|
# :param db: 异步会话对象
|
|
# :param query_object: 分页查询参数对象
|
|
# :param is_page: 是否分页
|
|
# :return: 数据资产目录分页列表
|
|
# """
|
|
# 创建别名对象
|
|
# t1 = aliased(DataAstContentRela, name='t1')
|
|
# t2 = aliased(DataAstInfo, name='t2')
|
|
# t3 = aliased(DataAstBookmarkRela, name='t3')
|
|
|
|
# # 修改子查询部分
|
|
# subquery_t1 = (
|
|
# select(DataAstContentRela)
|
|
# .where(DataAstContentRela.upd_prsn == query_object.upd_prsn, DataAstContentRela.content_onum == '2' and DataAstContentRela.rela_status == '1')
|
|
# .union_all(
|
|
# select(DataAstContentRela)
|
|
# .where(DataAstContentRela.content_onum != '2' and DataAstContentRela.rela_status == '1')
|
|
# )
|
|
# ).alias('subquery_t1') # 为子查询分配唯一别名
|
|
|
|
# query = (
|
|
# select(
|
|
# DataAstContent.content_onum,
|
|
# DataAstContent.content_name,
|
|
# DataAstContent.content_stat,
|
|
# DataAstContent.content_intr,
|
|
# DataAstContent.content_pic,
|
|
# DataAstContent.supr_content_onum,
|
|
# DataAstContent.leaf_node_flag,
|
|
# DataAstContent.upd_prsn,
|
|
# DataAstContent.upd_time,
|
|
|
|
# subquery_t1.c.rela_onum, # 明确指定子查询的字段
|
|
# subquery_t1.c.ast_onum,
|
|
# subquery_t1.c.rela_type,
|
|
# subquery_t1.c.rela_eff_begn_date,
|
|
# subquery_t1.c.rela_eff_end_date,
|
|
# subquery_t1.c.upd_prsn,
|
|
|
|
# t2.data_ast_no,
|
|
# t2.data_ast_eng_name,
|
|
# t2.data_ast_cn_name,
|
|
# t2.data_ast_type,
|
|
# t2.data_ast_stat,
|
|
# t2.data_ast_desc,
|
|
# t2.data_ast_clas,
|
|
# t2.data_ast_cont,
|
|
# t2.data_ast_faq,
|
|
# t2.data_ast_estb_time,
|
|
# t2.data_ast_upd_time,
|
|
# t2.data_ast_src,
|
|
# t2.ast_no,
|
|
# t3.bookmark_orde,
|
|
# case(
|
|
# (t3.rela_onum.isnot(None), 1),
|
|
# else_=0
|
|
# ).label('bookmark_flag')
|
|
# )
|
|
# .distinct()
|
|
# .select_from(DataAstContent)
|
|
# .outerjoin(subquery_t1, DataAstContent.content_onum == subquery_t1.c.content_onum) # 明确使用子查询别名
|
|
# .outerjoin(t2, subquery_t1.c.ast_onum == t2.ast_no)
|
|
# .outerjoin(t3, and_(
|
|
# subquery_t1.c.ast_onum == t3.data_ast_no,
|
|
# t3.user_id == user_id
|
|
# ))
|
|
# .where(DataAstContent.content_stat == 1)
|
|
# .order_by(DataAstContent.content_onum)
|
|
# )
|
|
|
|
# # 使用分页工具进行查询
|
|
# data_ast_list = await PageUtil.paginate(
|
|
# db,
|
|
# query,
|
|
# page_num=query_object.page_num,
|
|
# page_size=query_object.page_size,
|
|
# is_page=is_page
|
|
# )
|
|
|
|
# return data_ast_list
|
|
|
|
@classmethod
async def get_catalog_list(cls, db: AsyncSession, query_object: DataCatalogPageQueryModel, user_id: int, user_name: str, is_page: bool = False):
    """
    List active catalog entries joined with their asset relations, asset details and bookmark state.

    :param db: async ORM session
    :param query_object: query object; only page_num and page_size are read here
    :param user_id: user ID used to match rows of the bookmark relation table
    :param user_name: user name used to restrict relations under catalog '2' to this user's own rows
    :param is_page: forwarded unchanged to PageUtil.paginate
    :return: the value returned by PageUtil.paginate for the assembled query
    """
    t1 = aliased(DataAstContentRela, name='t1')
    t2 = aliased(DataAstInfo, name='t2')
    t3 = aliased(DataAstBookmarkRela, name='t3')

    # Active relations: for catalog '2' only the rows maintained by this user,
    # for every other catalog all rows.
    # NOTE(review): catalog 2 looks like the per-user "my bookmarks" root — confirm.
    subquery_t1 = (
        select(t1)
        .where(
            t1.upd_prsn == user_name,
            t1.content_onum == '2',
            t1.rela_status == '1'
        )
        .union_all(
            select(t1)
            .where(
                t1.content_onum != '2',
                t1.rela_status == '1'
            )
        )
    ).alias('subquery_t1')

    # Relation IDs owned by this user that sit in catalog 2 or one of its direct children.
    subquery_t2 = (
        select(t1.rela_onum)
        .where(
            t1.rela_status == 1,
            t1.upd_prsn == user_name,
            t1.content_onum.in_(
                select(DataAstContent.content_onum)
                .where(
                    or_(
                        DataAstContent.supr_content_onum == 2,
                        DataAstContent.content_onum == 2
                    ),
                    DataAstContent.content_stat == 1,
                    DataAstContent.upd_prsn == user_name
                )
            )
        )
    ).alias('subquery_t2')

    query = (
        select(
            DataAstContent.content_onum,
            DataAstContent.content_name,
            DataAstContent.content_stat,
            DataAstContent.content_intr,
            DataAstContent.content_pic,
            DataAstContent.supr_content_onum,
            DataAstContent.leaf_node_flag,
            DataAstContent.upd_prsn,
            DataAstContent.upd_time,

            subquery_t1.c.rela_onum,
            subquery_t1.c.ast_onum,
            subquery_t1.c.rela_type,
            subquery_t1.c.rela_eff_begn_date,
            subquery_t1.c.rela_eff_end_date,
            # NOTE(review): shares the name upd_prsn with the catalog column above —
            # check how the consumer tells the two apart.
            subquery_t1.c.upd_prsn,

            t2.data_ast_no,
            t2.data_ast_eng_name,
            t2.data_ast_cn_name,
            t2.data_ast_type,
            t2.data_ast_stat,
            t2.data_ast_desc,
            t2.data_ast_clas,
            t2.data_ast_cont,
            t2.data_ast_faq,
            t2.data_ast_estb_time,
            t2.data_ast_upd_time,
            t2.data_ast_src,
            t2.ast_no,

            t3.bookmark_orde,
            # 1 when the outer join found a bookmark row for this user, else 0.
            case(
                (t3.rela_onum.isnot(None), 1),
                else_=0
            ).label('bookmark_flag'),
            # 1 when the relation is one of the user's own rows selected by subquery_t2, else 0.
            case(
                (subquery_t2.c.rela_onum.isnot(None), 1),
                else_=0
            ).label('sczc_flag')
        )
        .distinct()
        .select_from(DataAstContent)
        .outerjoin(subquery_t1, DataAstContent.content_onum == subquery_t1.c.content_onum)
        .outerjoin(t2, subquery_t1.c.ast_onum == t2.ast_no)
        .outerjoin(t3, and_(
            subquery_t1.c.ast_onum == t3.data_ast_no,
            t3.user_id == user_id
        ))
        .outerjoin(subquery_t2, subquery_t1.c.rela_onum == subquery_t2.c.rela_onum)
        .where(
            DataAstContent.content_stat == 1
        )
        .order_by(DataAstContent.content_onum)
    )

    data_ast_list = await PageUtil.paginate(
        db,
        query,
        page_num=query_object.page_num,
        page_size=query_object.page_size,
        is_page=is_page
    )
    return data_ast_list
|
|
|
|
|
|
@classmethod
async def add_catalog_dao(cls, db: AsyncSession, catalog1: dict, catalog2: dict):
    """
    Insert a catalog entry together with its child asset relations.

    :param db: async ORM session
    :param catalog1: column values for the new DataAstContent row
    :param catalog2: dict whose optional 'children' list holds the relations to create;
        each item is either a DataCatalogChild model or a plain dict
    :return: the newly added DataAstContent instance
    :raises TypeError: when a child is neither a DataCatalogChild nor a dict
    """
    parent = DataAstContent(**catalog1)
    db.add(parent)
    # Flush so the parent's content_onum is available for the children below.
    await db.flush()

    for entry in catalog2.get('children', []):
        if not isinstance(entry, (DataCatalogChild, dict)):
            raise TypeError("不支持的子关系数据类型")

        # Pydantic models are converted to dicts; dicts are used as they are.
        raw = entry.model_dump() if isinstance(entry, DataCatalogChild) else entry

        # Work on a copy and attach the child to the freshly created catalog.
        payload = {**raw, 'content_onum': parent.content_onum}
        db.add(DataAstContentRela(**payload))
        await db.flush()

    return parent
|
|
|
|
|
|
@classmethod
async def edit_catalog_leaf_dao(cls, db: AsyncSession, catalog: dict):
    """
    Update only the leaf_node_flag of one catalog entry.

    :param db: async ORM session
    :param catalog: dict providing 'content_onum' (row to update) and 'leaf_node_flag' (new value)
    :return: None
    """
    target_id = catalog['content_onum']
    flag_update = update(DataAstContent).where(DataAstContent.content_onum == target_id)
    flag_update = flag_update.values(leaf_node_flag=catalog['leaf_node_flag'])
    await db.execute(flag_update)
|
|
|
|
|
|
@classmethod
async def edit_catalog_child_dao(cls, db: AsyncSession, catalog: dict):
    """
    Update a catalog entry and create or update its child asset relations.

    :param db: async ORM session
    :param catalog: dict with the catalog columns to write ('content_onum', 'content_name',
        'content_stat', 'content_intr', 'content_pic', 'supr_content_onum', 'leaf_node_flag',
        'upd_prsn') and an optional 'children' list of relation dicts
    :return: None
    """
    content_onum = catalog['content_onum']

    stmt = (
        update(DataAstContent)
        .where(DataAstContent.content_onum == content_onum)
        .values(
            content_name=catalog['content_name'],
            content_stat=catalog['content_stat'],
            content_intr=catalog['content_intr'],
            content_pic=catalog['content_pic'],
            supr_content_onum=catalog['supr_content_onum'],
            leaf_node_flag=catalog['leaf_node_flag'],
            upd_prsn=catalog['upd_prsn'],
            upd_time=datetime.now()
        )
    )
    await db.execute(stmt)

    # A missing or None 'children' entry is treated as "no children".
    children = catalog.get('children') or []
    for child in children:
        rela_onum = child.get('rela_onum')
        if rela_onum:
            # The relation already exists: overwrite its columns.
            await db.execute(
                update(DataAstContentRela)
                .where(DataAstContentRela.rela_onum == rela_onum)
                .values(
                    content_onum=child.get('content_onum'),
                    ast_onum=child.get('ast_onum'),
                    rela_type=child.get('rela_type'),
                    rela_eff_begn_date=child.get('rela_eff_begn_date'),
                    rela_eff_end_date=child.get('rela_eff_end_date'),
                    upd_prsn=child.get('upd_prsn')
                )
            )
        else:
            # New relation: build it from a copy so the caller's dict is not modified.
            new_child = dict(child)
            new_child['content_onum'] = content_onum
            db.add(DataAstContentRela(**new_child))
            await db.flush()

    # The leaf-flag refresh only reads the catalog table, which the loop does not
    # touch, so running it once after all children are processed is sufficient.
    if children:
        await cls.update_leaf_node_flag(db)
|
|
|
|
|
|
|
|
|
|
@classmethod
async def edit_catalog_dao(cls, db: AsyncSession, catalog1: dict):
    """
    Overwrite the editable columns of one catalog entry and refresh leaf flags.

    :param db: async ORM session
    :param catalog1: dict with 'content_onum' (row to update) plus the new values for
        content_name, content_stat, content_intr, content_pic, supr_content_onum,
        leaf_node_flag and upd_prsn
    :return: None
    """
    target_id = catalog1['content_onum']

    copied_columns = (
        'content_name',
        'content_stat',
        'content_intr',
        'content_pic',
        'supr_content_onum',
        'leaf_node_flag',
        'upd_prsn',
    )
    new_values = {column: catalog1[column] for column in copied_columns}
    new_values['upd_time'] = datetime.now()

    await db.execute(
        update(DataAstContent)
        .where(DataAstContent.content_onum == target_id)
        .values(**new_values)
    )
    await cls.update_leaf_node_flag(db)
|
|
|
|
|
|
@classmethod
async def delete_catalog_dao(cls, db: AsyncSession, catalog: DeleteDataCatalogModel):
    """
    Soft-delete catalog entries and their asset relations.

    Relations of the given catalogs get rela_status 0 and an end date; the catalogs
    themselves get content_stat 0. Nothing is physically removed.

    :param db: async ORM session
    :param catalog: object whose content_onums attribute is a comma-separated string of catalog IDs
    :return: None
    """
    # Drop blank fragments (e.g. from "1,,2" or a trailing comma) so they never reach IN (...).
    content_onums = [part.strip() for part in catalog.content_onums.split(',') if part.strip()]
    logger.info(f"Updating DataAstContent with supr_content_onum in {content_onums}")

    await db.execute(
        update(DataAstContentRela)
        .where(DataAstContentRela.content_onum.in_(content_onums))
        .values(
            rela_status=0,
            rela_eff_end_date=datetime.now()
        )
    )
    await db.execute(
        update(DataAstContent)
        .where(DataAstContent.content_onum.in_(content_onums))
        .values(
            content_stat=0,
            upd_time=datetime.now()
        )
    )

    await cls.update_leaf_node_flag(db)
    # Report success only after the leaf-flag refresh has actually run.
    logger.info("更新叶子节点标志成功")
|
|
|
|
|
|
|
|
|
|
@classmethod
async def get_data_asset_catalog_tree(cls, db: AsyncSession):
    """
    Load the distinct active assets together with a flag telling whether each is linked to a catalog.

    :param db: async ORM session
    :return: list of rows (data_ast_src, data_ast_eng_name, data_ast_cn_name, ast_no, rel_status)
        where rel_status is 1 if an active relation references the asset and 0 otherwise
    """
    a = aliased(DataAstInfo, name='a')
    b = aliased(DataAstContentRela, name='b')

    # The outer join yields NULL for b.ast_onum when no active relation exists.
    linked_flag = case(
        (b.ast_onum.isnot(None), 1),
        else_=0
    ).label('rel_status')
    active_relation = and_(
        a.ast_no == b.ast_onum,
        b.rela_status == 1
    )

    tree_query = select(
        a.data_ast_src,
        a.data_ast_eng_name,
        a.data_ast_cn_name,
        a.ast_no,
        linked_flag
    ).distinct()
    tree_query = tree_query.select_from(a).outerjoin(b, active_relation)
    tree_query = tree_query.where(a.data_ast_stat == 1)

    outcome = await db.execute(tree_query)
    return outcome.fetchall()
|
|
|
|
|
|
|
|
@classmethod
async def moved_catalog_instr_dao(cls, db: AsyncSession, moved_catalog_data: dict):
    """
    Move a catalog entry under a different parent catalog and refresh leaf flags.

    :param db: async ORM session
    :param moved_catalog_data: dict with 'content_onum' (entry to move),
        'supr_content_onum' (current parent, must match) and
        'supr_content_onum_after' (new parent)
    :return: None
    """
    # The current parent is part of the filter, so a row whose parent differs is left untouched.
    is_target_row = (
        DataAstContent.content_onum == moved_catalog_data['content_onum'],
        DataAstContent.supr_content_onum == moved_catalog_data['supr_content_onum'],
    )
    reparent = update(DataAstContent).where(*is_target_row).values(
        supr_content_onum=moved_catalog_data['supr_content_onum_after'],
        upd_time=datetime.now()
    )

    await db.execute(reparent)
    await cls.update_leaf_node_flag(db)
|
|
|
|
|
|
|
|
@classmethod
async def merge_catalog_instr_dao(cls, db: AsyncSession, merge_catalog_data: dict):
    """
    Merge one catalog into another.

    Active asset relations of the source catalog are re-pointed to the target
    catalog, the source catalog is marked inactive (content_stat '0') and the
    leaf flags are refreshed.

    :param db: async ORM session
    :param merge_catalog_data: dict with 'content_onum' (source catalog),
        'content_onum_after' (target catalog) and 'supr_content_onum' (parent of the source)
    :return: None
    """
    # Both conditions are passed as separate where() arguments so they are AND-ed in SQL.
    # Python's `and` must not be used between SQL expressions: it evaluates their
    # truthiness in Python and hands only one operand to where().
    stmt1 = (
        update(DataAstContentRela)
        .where(
            DataAstContentRela.content_onum == merge_catalog_data['content_onum'],
            DataAstContentRela.rela_status == 1
        )
        .values(
            content_onum=merge_catalog_data['content_onum_after'],
            rela_eff_begn_date=datetime.now()
        )
    )
    await db.execute(stmt1)

    stmt2 = (
        update(DataAstContent)
        .where(
            DataAstContent.content_onum == merge_catalog_data['content_onum'],
            DataAstContent.supr_content_onum == merge_catalog_data['supr_content_onum']
        )
        .values(content_stat='0')
    )
    await db.execute(stmt2)

    await cls.update_leaf_node_flag(db)
|
|
|
|
@classmethod
async def removerel_data_ast_catalog_dao(cls, db: AsyncSession, removerel_catalog_data: dict):
    """
    Change the status of one asset relation inside a catalog and refresh leaf flags.

    :param db: async ORM session
    :param removerel_catalog_data: dict with 'rela_onum' and 'content_onum' identifying
        the relation, and 'rela_status' holding the status value to store
    :return: None
    """
    relation_filter = (
        DataAstContentRela.rela_onum == removerel_catalog_data['rela_onum'],
        DataAstContentRela.content_onum == removerel_catalog_data['content_onum'],
    )
    status_change = update(DataAstContentRela).where(*relation_filter).values(
        rela_status=removerel_catalog_data['rela_status']
    )

    await db.execute(status_change)
    await cls.update_leaf_node_flag(db)
|
|
|
|
|
|
@classmethod
async def moverel_data_ast_catalog_dao(cls, db: AsyncSession, moverel_catalog_data: dict):
    """
    Move one asset relation from its current catalog to another and refresh leaf flags.

    :param db: async ORM session
    :param moverel_catalog_data: dict with 'rela_onum' and 'content_onum' identifying
        the relation, and 'content_onum_after' naming the destination catalog
    :return: None
    """
    relation_filter = (
        DataAstContentRela.rela_onum == moverel_catalog_data['rela_onum'],
        DataAstContentRela.content_onum == moverel_catalog_data['content_onum'],
    )
    relocation = update(DataAstContentRela).where(*relation_filter).values(
        content_onum=moverel_catalog_data['content_onum_after'],
        rela_eff_end_date=datetime.now()
    )

    await db.execute(relocation)
    await cls.update_leaf_node_flag(db)
|
|
|
|
@classmethod
async def update_leaf_node_flag(cls, db: AsyncSession):
    """
    Set leaf_node_flag to 1 on every active catalog entry that has no active child.

    Only rows whose flag is currently 0 are considered; rows are never reset
    from 1 back to 0 by this method.

    :param db: async ORM session
    :return: None
    """
    child = aliased(DataAstContent, name='t2')

    # Correlated EXISTS: is there an active row whose parent is the outer catalog entry?
    has_active_child = (
        select(1)
        .select_from(child)
        .where(
            child.supr_content_onum == DataAstContent.content_onum,
            child.content_stat == '1'
        )
        .exists()
    )

    # The candidate IDs are wrapped in a named derived table before being used in IN (...).
    # NOTE(review): presumably needed because the UPDATE targets the same table it selects from — confirm.
    childless_ids = (
        select(DataAstContent.content_onum)
        .where(
            DataAstContent.content_stat == '1',
            DataAstContent.leaf_node_flag == 0,
            not_(has_active_child)
        )
    ).alias('temp')

    mark_as_leaf = update(DataAstContent).where(DataAstContent.content_onum.in_(childless_ids))
    mark_as_leaf = mark_as_leaf.values(leaf_node_flag=1, upd_time=datetime.now())
    await db.execute(mark_as_leaf)
|
|
|
|
|
|
@classmethod
async def delete_ast_book_mark_rela_dao(cls, db: AsyncSession, catalog: DataAstBookmarkRelaRequest, user_name: str, user_id: str):
    """
    Remove a bookmark: delete the user's bookmark row and the matching relation rows in the bookmark catalogs.

    The session is committed on success and rolled back on any error.

    :param db: async ORM session
    :param catalog: bookmark request; only catalog['rela_onum'] is read
        (NOTE(review): accessed by subscript, so callers presumably pass a dict — confirm)
    :param user_name: owner name used to filter relation and catalog rows
    :param user_id: user ID used to filter bookmark rows
    :return: {"is_success": True} on success, otherwise {"is_success": False, "message": ...}
    """
    try:
        logger.info(f"开始处理取消收藏请求,传入参数:catalog={catalog}, user_name={user_name}, user_id={user_id}")

        # Asset numbers referenced by the given active relation of this user.
        ast_onum_subquery = (
            select(DataAstContentRela.ast_onum)
            .where(
                DataAstContentRela.rela_onum == catalog['rela_onum'],
                DataAstContentRela.upd_prsn == user_name,
                DataAstContentRela.rela_status == '1'
            )
        ).subquery()

        # Catalog '2' and its direct children that are active and maintained by this user.
        # or_()/and_() are required here: Python's `or`/`and` cannot combine SQL expressions.
        content_onum_subquery = (
            select(DataAstContent.content_onum)
            .where(
                and_(
                    or_(
                        DataAstContent.content_onum == '2',
                        DataAstContent.supr_content_onum == '2'
                    ),
                    DataAstContent.content_stat == '1',
                    DataAstContent.upd_prsn == user_name
                )
            )
            .subquery()
        )
        logger.info(f"子查询SQL: {str(ast_onum_subquery),str(content_onum_subquery)}")

        stmt1 = (
            delete(DataAstContentRela)
            .where(
                DataAstContentRela.upd_prsn == user_name,
                DataAstContentRela.rela_status == 1,
                DataAstContentRela.content_onum.in_(content_onum_subquery),
                DataAstContentRela.ast_onum.in_(ast_onum_subquery)
            )
        )
        logger.info(f"删除语句1 SQL: {str(stmt1)}")

        stmt2 = (
            delete(DataAstBookmarkRela)
            .where(
                DataAstBookmarkRela.data_ast_no.in_(ast_onum_subquery),
                DataAstBookmarkRela.user_id == user_id
            )
        )
        logger.info(f"删除语句2 SQL: {str(stmt2)}")

        logger.info("开始执行删除操作...")

        # stmt2 runs first: its subquery reads relation rows that stmt1 may delete.
        await db.execute(stmt2)
        await db.execute(stmt1)
        await db.commit()

        logger.info(f"成功删除收藏关系")
        return {"is_success": True}

    except IntegrityError as ie:
        await db.rollback()
        logger.error(f"删除收藏关系失败(完整性错误): {str(ie)}")
        return {"is_success": False, "message": "数据库完整性错误"}
    except Exception as e:
        await db.rollback()
        logger.error(f"删除收藏关系失败: {str(e)}")
        return {"is_success": False, "message": "未知错误"}
|
|
|
|
|
|
|
|
|
|
@classmethod
async def delete_ast_book_mark_rela_by_content_onum(cls, db: AsyncSession, content_onum: int, user_id: int):
    """
    Delete a user's bookmark rows for every asset related to one catalog, then commit.

    :param db: async ORM session
    :param content_onum: catalog ID whose related assets are looked up
    :param user_id: user whose bookmark rows are deleted
    :return: None
    """
    t1 = aliased(DataAstContentRela, name='t1')
    t2 = aliased(DataAstInfo, name='t2')
    t3 = aliased(DataAstBookmarkRela, name='t3')

    belongs_to_user = and_(
        t2.data_ast_no == t3.data_ast_no,
        t3.user_id == user_id
    )

    # catalog -> relations -> asset info -> this user's bookmark rows
    bookmark_ids = select(t3.rela_onum).select_from(DataAstContent)
    bookmark_ids = bookmark_ids.outerjoin(t1, DataAstContent.content_onum == t1.content_onum)
    bookmark_ids = bookmark_ids.outerjoin(t2, t1.ast_onum == t2.ast_no)
    bookmark_ids = bookmark_ids.outerjoin(t3, belongs_to_user)
    bookmark_ids = bookmark_ids.where(DataAstContent.content_onum == content_onum).cte('cte')

    removal = delete(DataAstBookmarkRela).where(
        DataAstBookmarkRela.rela_onum.in_(select(bookmark_ids.c.rela_onum))
    )

    await db.execute(removal)
    await db.commit()
    logger.info(" 删除收藏关系,操作成功")
|
|
|
|
|
|
|
|
@classmethod
async def add_ast_book_mark_rela_dao(cls, db: AsyncSession, catalog: DataAstBookmarkRelaRequest, user_name: str):
    """
    Bookmark an asset for a user and attach it to a bookmark catalog.

    Creates one DataAstBookmarkRela row (ordered after the user's current
    bookmarks) and one active DataAstContentRela row, then flushes the session.
    The session is rolled back when an exception occurs.

    :param db: async ORM session
    :param catalog: bookmark request; the keys 'user_id', 'data_ast_no', 'content_onum',
        'rela_type', 'rela_eff_begn_date' and 'rela_eff_end_date' are read
        (NOTE(review): accessed by subscript, so callers presumably pass a dict — confirm)
    :param user_name: stored as upd_prsn on the new relation row
    :return: dict with 'is_success' and 'message'
    """
    try:
        logger.info(f"开始处理收藏请求,用户ID: {catalog['user_id']}, 资产编号: {catalog['data_ast_no']}")

        # Resolve the asset's ast_no now, so that a concrete value (not a SELECT
        # construct) is compared, logged and stored below.
        ast_no_result = await db.execute(
            select(DataAstInfo.ast_no)
            .where(DataAstInfo.data_ast_no == catalog['data_ast_no'])
        )
        para_ast_no = ast_no_result.scalar()
        if para_ast_no is None:
            logger.warning(f"资产 {catalog['data_ast_no']} 不存在")
            return {"is_success": False, "message": "资产不存在"}

        # 1. Refuse duplicates.
        logger.info("检查是否已存在相同收藏...")
        exists = await db.execute(
            select(DataAstBookmarkRela)
            .where(
                DataAstBookmarkRela.user_id == catalog['user_id'],
                DataAstBookmarkRela.data_ast_no == para_ast_no
            )
        )
        if exists.scalar():
            logger.warning(f"用户 {catalog['user_id']} 已收藏资产 {para_ast_no}")
            return {"is_success": False, "message": "该资产已被收藏"}

        # 2. Highest order number the user currently has (0 when there are no bookmarks).
        logger.info("获取当前最大顺序号...")
        max_order = await db.execute(
            select(func.max(DataAstBookmarkRela.bookmark_orde))
            .where(DataAstBookmarkRela.user_id == catalog['user_id'])
        )
        max_bookmark_orde = max_order.scalar() or 0
        logger.info(f"当前最大顺序号: {max_bookmark_orde}")

        # 3. The new bookmark goes to the end of the user's list.
        logger.info("设置新顺序号...")
        catalog_dict = {
            'user_id': catalog["user_id"],
            'data_ast_no': para_ast_no,
            'bookmark_orde': max_bookmark_orde + 1,
            'bookmark_time': datetime.now()
        }

        # 4. Bookmark row.
        logger.info("创建并保存新记录...")
        db_catalog = DataAstBookmarkRela(**catalog_dict)
        db.add(db_catalog)

        # 5. Relation row linking the asset to the chosen catalog.
        logger.info("添加收藏关系内容...")
        new_rela = DataAstContentRela(
            content_onum=catalog["content_onum"],
            ast_onum=para_ast_no,
            rela_type=catalog["rela_type"],
            rela_eff_begn_date=catalog["rela_eff_begn_date"],
            rela_eff_end_date=catalog["rela_eff_end_date"],
            upd_prsn=user_name,
            rela_status="1"
        )
        db.add(new_rela)

        # Only a flush happens here; no commit is issued in this method.
        logger.info("提交事务...")
        await db.flush()
        logger.info(f"用户 {catalog['user_id']} 收藏资产 {para_ast_no} 成功")
        return {"is_success": True, "message": "收藏成功"}

    except Exception as e:
        logger.error(f"收藏操作失败: {str(e)}", exc_info=True)
        await db.rollback()
        return {"is_success": False, "message": "收藏操作失败"}
|
|
|
|
@classmethod
async def get_data_ast_indx_list(cls, db: AsyncSession, query_object: DataAstIndxRequest):
    """
    Query asset indicator rows, optionally filtered by asset number, indicator number and indicator name.

    :param db: async ORM session
    :param query_object: request whose ast_no / indx_no (exact match) and indx_name
        (substring match) restrict the result only when they are truthy
    :return: list of row mappings with ast_no, indx_no, indx_name and indx_val
    """
    # A literal True stands in for "no restriction" when a filter value is absent.
    by_asset = (DataAstIndx.ast_no == query_object.ast_no) if query_object.ast_no else True
    by_number = (DataAstIndx.indx_no == query_object.indx_no) if query_object.indx_no else True
    if query_object.indx_name:
        by_name = DataAstIndx.indx_name.like(f"%{query_object.indx_name}%")
    else:
        by_name = True

    indicator_query = select(
        DataAstIndx.ast_no,
        DataAstIndx.indx_no,
        DataAstIndx.indx_name,
        DataAstIndx.indx_val
    ).where(by_asset, by_number, by_name)

    outcome = await db.execute(indicator_query)
    return outcome.mappings().all()
|
|
|
|
|
|
|
|
@classmethod
async def get_bookmark_folder_by_name(cls, db: AsyncSession, folder_name: str, user_name: str):
    """
    Find an active bookmark folder of a user by its name.

    :param db: async ORM session
    :param folder_name: folder name to match exactly
    :param user_name: owner of the folder (matched against upd_prsn)
    :return: first matching DataAstContent row directly under catalog 2, or None
    """
    criteria = [
        DataAstContent.content_name == folder_name,
        DataAstContent.upd_prsn == user_name,
        DataAstContent.supr_content_onum == 2,
        DataAstContent.content_stat == "1",
    ]
    outcome = await db.execute(select(DataAstContent).where(*criteria))
    return outcome.scalars().first()
|
|
|
|
|
|
@classmethod
async def get_bookmark_folder_by_id(cls, db: AsyncSession, content_onum: int, user_name: str):
    """
    Find an active bookmark folder of a user by its ID.

    :param db: async ORM session
    :param content_onum: folder (catalog) ID
    :param user_name: owner of the folder (matched against upd_prsn)
    :return: first matching DataAstContent row directly under catalog 2, or None
    """
    criteria = [
        DataAstContent.content_onum == content_onum,
        DataAstContent.upd_prsn == user_name,
        DataAstContent.supr_content_onum == 2,
        DataAstContent.content_stat == "1",
    ]
    outcome = await db.execute(select(DataAstContent).where(*criteria))
    return outcome.scalars().first()
|
|
|
|
|
|
@classmethod
async def check_folder_has_relations(cls, db: AsyncSession, content_onum: int):
    """
    Tell whether a catalog still has active asset relations.

    :param db: async ORM session
    :param content_onum: catalog ID to inspect
    :return: True when at least one relation with rela_status "1" references the catalog
    """
    active_relations = select(func.count(DataAstContentRela.rela_onum)).where(
        DataAstContentRela.content_onum == content_onum,
        DataAstContentRela.rela_status == "1"
    )
    outcome = await db.execute(active_relations)
    return outcome.scalar() > 0
|
|
|
|
@classmethod
async def get_bookmark_folders(cls, db: AsyncSession, user_name: str):
    """
    List the bookmark root catalog together with the user's own bookmark folders.

    :param db: async ORM session
    :param user_name: owner whose folders (direct children of catalog 2) are included
    :return: list of dicts with content_onum, content_name, content_intr, content_pic,
        upd_time, supr_content_onum and leaf_node_flag
    """
    folder_columns = (
        DataAstContent.content_onum,
        DataAstContent.content_name,
        DataAstContent.content_intr,
        DataAstContent.content_pic,
        DataAstContent.upd_time,
        DataAstContent.supr_content_onum,
        DataAstContent.leaf_node_flag
    )

    # Root catalog (ID 2) — not filtered by owner.
    root_part = select(*folder_columns).where(
        DataAstContent.content_onum == 2,
        DataAstContent.content_stat == "1"
    )

    # Folders the user maintains directly below the root.
    # NOTE(review): the ORDER BY is attached to this UNION member only — confirm the
    # target database honours it for the combined result.
    own_folders_part = (
        select(*folder_columns)
        .where(
            DataAstContent.upd_prsn == user_name,
            DataAstContent.supr_content_onum == 2,
            DataAstContent.content_stat == "1"
        )
        .order_by(DataAstContent.upd_time.desc())
    )

    outcome = await db.execute(root_part.union_all(own_folders_part))
    return list(map(dict, outcome.mappings().all()))
|
|
|
|
|
|
@classmethod
async def get_bookmark_folder_by_name_exclude_id(cls, db: AsyncSession, folder_name: str, user_name: str, exclude_id: int):
    """
    Find another active bookmark folder of the user that carries the given name.

    :param db: async ORM session
    :param folder_name: folder name to match exactly
    :param user_name: owner of the folder (matched against upd_prsn)
    :param exclude_id: catalog ID that must not be returned
    :return: first matching DataAstContent row directly under catalog 2, or None
    """
    criteria = [
        DataAstContent.content_name == folder_name,
        DataAstContent.upd_prsn == user_name,
        DataAstContent.supr_content_onum == 2,
        DataAstContent.content_stat == "1",
        DataAstContent.content_onum != exclude_id,
    ]
    outcome = await db.execute(select(DataAstContent).where(*criteria))
    return outcome.scalars().first()
|
|
|
|
|
|
|
|
@classmethod
async def is_bookmark_folder(cls, db: AsyncSession, content_onum: int, user_name: str = None):
    """
    Tell whether a catalog is the bookmark root or an active folder directly below it.

    :param db: async ORM session
    :param content_onum: catalog ID to check
    :param user_name: when given, the folder must also be maintained by this user
    :return: True for catalog 2 itself or for a matching active child of catalog 2
    """
    # Catalog 2 is the "my bookmarks" root; no database lookup is needed for it.
    if content_onum == 2:
        return True

    criteria = [
        DataAstContent.content_onum == content_onum,
        DataAstContent.supr_content_onum == 2,
        DataAstContent.content_stat == "1",
    ]
    folder_query = select(DataAstContent).where(*criteria)
    if user_name:
        # Ownership is only enforced when a user name was supplied.
        folder_query = folder_query.where(DataAstContent.upd_prsn == user_name)

    outcome = await db.execute(folder_query)
    found = outcome.scalars().first()
    return found is not None
|
|
|
|
@classmethod
async def move_bookmark_asset_dao(cls, db: AsyncSession, moverel_catalog_data: dict):
    """
    Move an active asset relation from one bookmark folder to another and refresh leaf flags.

    :param db: async ORM session
    :param moverel_catalog_data: dict with 'rela_onum' and 'content_onum' identifying the
        relation, 'content_onum_after' naming the destination folder and 'upd_prsn'
        to record on the moved row
    :return: None
    """
    active_relation = (
        DataAstContentRela.rela_onum == moverel_catalog_data['rela_onum'],
        DataAstContentRela.content_onum == moverel_catalog_data['content_onum'],
        DataAstContentRela.rela_status == "1",
    )
    relocation = update(DataAstContentRela).where(*active_relation).values(
        content_onum=moverel_catalog_data['content_onum_after'],
        upd_prsn=moverel_catalog_data['upd_prsn'],
        rela_eff_end_date=datetime.now()
    )

    await db.execute(relocation)
    await cls.update_leaf_node_flag(db)
|
|
|
|
|
|
@classmethod
async def delete_bookmark_folder_dao(cls, db: AsyncSession, content_onum: int, user_name: str,user_id:str):
    """
    Delete one of the user's bookmark folders together with its bookmarks.

    Bookmark rows for the folder's assets are deleted, the folder's active
    relations and the folder itself are marked inactive, leaf flags are
    refreshed and the session is committed. On any exception the session is
    rolled back so no partial change stays pending.

    :param db: async ORM session
    :param content_onum: ID of the bookmark folder
    :param user_name: owner name; the folder and its relations must be maintained by this user
    :param user_id: user ID used to select the bookmark rows to delete
    :return: dict with 'success' and 'message'
    """
    try:
        logger.info(f"开始删除用户 {user_name} 的收藏目录 {content_onum}")

        # 1. The folder must exist, be active, belong to the user and sit directly under catalog 2.
        folder_query = select(DataAstContent).where(
            DataAstContent.content_onum == content_onum,
            DataAstContent.upd_prsn == user_name,
            DataAstContent.supr_content_onum == 2,
            DataAstContent.content_stat == '1'
        )
        folder_result = await db.execute(folder_query)
        folder = folder_result.scalars().first()

        if not folder:
            logger.warning(f"未找到用户 {user_name} 的收藏目录 {content_onum} 或无权限删除")
            return {"success": False, "message": "收藏目录不存在或无权限删除"}

        # Assets referenced by the folder's active relations.
        ast_onum_subs = (
            select(DataAstContentRela.ast_onum)
            .where(
                DataAstContentRela.content_onum == content_onum,
                DataAstContentRela.rela_status == '1',
                DataAstContentRela.upd_prsn == user_name
            )
            .subquery()
        )

        # Bookmark rows are removed before the relations are deactivated,
        # because the subquery above only sees relations that are still active.
        delete_stmt = (
            delete(DataAstBookmarkRela)
            .where(
                DataAstBookmarkRela.data_ast_no.in_(ast_onum_subs),
                DataAstBookmarkRela.user_id == user_id
            )
        )
        await db.execute(delete_stmt)

        # 2. Deactivate the folder's relations.
        rela_update = await db.execute(
            update(DataAstContentRela)
            .where(
                DataAstContentRela.content_onum == content_onum,
                DataAstContentRela.upd_prsn == user_name,
                DataAstContentRela.rela_status == '1'
            )
            .values(
                rela_status='0',
                rela_eff_end_date=datetime.now()
            )
        )
        rela_count = rela_update.rowcount
        logger.info(f"已更新 {rela_count} 个资产关系状态")

        # 3. Deactivate the folder itself.
        await db.execute(
            update(DataAstContent)
            .where(
                DataAstContent.content_onum == content_onum,
                DataAstContent.upd_prsn == user_name,
                DataAstContent.content_stat == '1'
            )
            .values(
                content_stat='0',
                upd_time=datetime.now()
            )
        )

        # 4. Refresh leaf flags and persist everything.
        await cls.update_leaf_node_flag(db)
        await db.commit()

        logger.info(f"成功删除用户 {user_name} 的收藏目录 {content_onum}")
        return {"success": True, "message": "收藏目录删除成功"}

    except Exception as e:
        logger.error(f"删除收藏目录时发生错误: {str(e)}", exc_info=True)
        # Discard whatever was executed before the failure.
        await db.rollback()
        return {"success": False, "message": f"删除操作出错: {str(e)}"}
|