You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

1137 lines
39 KiB

from datetime import datetime
from sqlalchemy import case
from sqlalchemy.orm import aliased
from sqlalchemy import delete, func, not_, select, update, or_, and_, desc
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
from module_admin.entity.do.data_ast_content_do import DataAstContent,DataAstContentRela,DataAstInfo,DataAstBookmarkRela,DataAstIndx
from module_admin.entity.do.user_do import SysUser
from module_admin.entity.vo.data_ast_content_vo import DataCatalogPageQueryModel, DeleteDataCatalogModel,DataCatalogChild,DataAstBookmarkRelaRequest,DataAstIndxRequest,DataAstIndxResponse
from utils.page_util import PageUtil
from utils.log_util import logger
class DataCatalogDAO:
"""
数据目录管理模块数据库操作层
"""
@classmethod
async def get_catalog_by_id(cls, db: AsyncSession, content_onum: int):
    """
    Fetch a single catalog row by its ID.

    Only rows whose ``content_stat`` equals 1 are considered.

    :param db: ORM session object
    :param content_onum: catalog ID
    :return: the first matching ``DataAstContent`` object, or ``None`` when nothing matches
    """
    stmt = select(DataAstContent).where(
        DataAstContent.content_onum == content_onum,
        DataAstContent.content_stat == 1,
    )
    result = await db.execute(stmt)
    return result.scalars().first()
@classmethod
async def get_catalog_detail_by_info(cls, db: AsyncSession, catalog: DataCatalogPageQueryModel):
    """
    Look up one catalog row matching the fields supplied in ``catalog``.

    ``content_name``, ``content_stat`` and ``content_pic`` are each applied as an
    equality filter only when the corresponding attribute is truthy. In addition,
    ``content_stat == 1`` is always required.

    :param db: ORM session object
    :param catalog: catalog query parameter object
    :return: the first matching ``DataAstContent`` object, or ``None``
    """
    # A falsy query field collapses its filter to the literal True (no restriction).
    name_filter = (DataAstContent.content_name == catalog.content_name) if catalog.content_name else True
    stat_filter = (DataAstContent.content_stat == catalog.content_stat) if catalog.content_stat else True
    pic_filter = (DataAstContent.content_pic == catalog.content_pic) if catalog.content_pic else True

    stmt = select(DataAstContent).where(
        name_filter,
        stat_filter,
        pic_filter,
        DataAstContent.content_stat == 1,
    )
    result = await db.execute(stmt)
    return result.scalars().first()
# @classmethod
# async def get_catalog_list(cls, db: AsyncSession, query_object: DataCatalogPageQueryModel, user_id: int, is_page: bool = False):
# """
# 根据查询参数获取数据资产目录列表
# :param db: 异步会话对象
# :param query_object: 分页查询参数对象
# :param is_page: 是否分页
# :return: 数据资产目录分页列表
# """
# 创建别名对象
# t1 = aliased(DataAstContentRela, name='t1')
# t2 = aliased(DataAstInfo, name='t2')
# t3 = aliased(DataAstBookmarkRela, name='t3')
# # 修改子查询部分
# subquery_t1 = (
# select(DataAstContentRela)
# .where(DataAstContentRela.upd_prsn == query_object.upd_prsn, DataAstContentRela.content_onum == '2' and DataAstContentRela.rela_status == '1')
# .union_all(
# select(DataAstContentRela)
# .where(DataAstContentRela.content_onum != '2' and DataAstContentRela.rela_status == '1')
# )
# ).alias('subquery_t1') # 为子查询分配唯一别名
# query = (
# select(
# DataAstContent.content_onum,
# DataAstContent.content_name,
# DataAstContent.content_stat,
# DataAstContent.content_intr,
# DataAstContent.content_pic,
# DataAstContent.supr_content_onum,
# DataAstContent.leaf_node_flag,
# DataAstContent.upd_prsn,
# DataAstContent.upd_time,
# subquery_t1.c.rela_onum, # 明确指定子查询的字段
# subquery_t1.c.ast_onum,
# subquery_t1.c.rela_type,
# subquery_t1.c.rela_eff_begn_date,
# subquery_t1.c.rela_eff_end_date,
# subquery_t1.c.upd_prsn,
# t2.data_ast_no,
# t2.data_ast_eng_name,
# t2.data_ast_cn_name,
# t2.data_ast_type,
# t2.data_ast_stat,
# t2.data_ast_desc,
# t2.data_ast_clas,
# t2.data_ast_cont,
# t2.data_ast_faq,
# t2.data_ast_estb_time,
# t2.data_ast_upd_time,
# t2.data_ast_src,
# t2.ast_no,
# t3.bookmark_orde,
# case(
# (t3.rela_onum.isnot(None), 1),
# else_=0
# ).label('bookmark_flag')
# )
# .distinct()
# .select_from(DataAstContent)
# .outerjoin(subquery_t1, DataAstContent.content_onum == subquery_t1.c.content_onum) # 明确使用子查询别名
# .outerjoin(t2, subquery_t1.c.ast_onum == t2.ast_no)
# .outerjoin(t3, and_(
# subquery_t1.c.ast_onum == t3.data_ast_no,
# t3.user_id == user_id
# ))
# .where(DataAstContent.content_stat == 1)
# .order_by(DataAstContent.content_onum)
# )
# # 使用分页工具进行查询
# data_ast_list = await PageUtil.paginate(
# db,
# query,
# page_num=query_object.page_num,
# page_size=query_object.page_size,
# is_page=is_page
# )
# return data_ast_list
@classmethod
async def get_catalog_list(
    cls,
    db: AsyncSession,
    query_object: DataCatalogPageQueryModel,
    user_id: int,
    user_name: str,
    is_page: bool = False,
):
    """
    Query the catalog list joined with asset relations, asset details and bookmark data.

    :param db: async session object
    :param query_object: query parameters; ``page_num`` and ``page_size`` are read from it
    :param user_id: user ID matched against the bookmark table's ``user_id``
    :param user_name: user name matched against ``upd_prsn`` for relations and
        catalogs under catalog 2
    :param is_page: forwarded to ``PageUtil.paginate`` to switch paging on or off
    :return: whatever ``PageUtil.paginate`` returns for the built query
    """
    rela = aliased(DataAstContentRela, name='t1')
    asset = aliased(DataAstInfo, name='t2')
    bookmark = aliased(DataAstBookmarkRela, name='t3')

    # subquery_t1: relations with rela_status '1'. Those under catalog '2' are
    # limited to rows last updated by this user; every other catalog contributes
    # all of its relations.
    own_relas_under_2 = select(rela).where(
        rela.upd_prsn == user_name,
        rela.content_onum == '2',
        rela.rela_status == '1',
    )
    relas_elsewhere = select(rela).where(
        rela.content_onum != '2',
        rela.rela_status == '1',
    )
    subquery_t1 = own_relas_under_2.union_all(relas_elsewhere).alias('subquery_t1')

    # subquery_t2: IDs of this user's relations that sit in catalog 2 or in one
    # of its direct child catalogs owned by the user. Drives ``sczc_flag``.
    user_catalogs_under_2 = select(DataAstContent.content_onum).where(
        or_(
            DataAstContent.supr_content_onum == 2,
            DataAstContent.content_onum == 2,
        ),
        DataAstContent.content_stat == 1,
        DataAstContent.upd_prsn == user_name,
    )
    subquery_t2 = (
        select(rela.rela_onum)
        .where(
            rela.rela_status == 1,
            rela.upd_prsn == user_name,
            rela.content_onum.in_(user_catalogs_under_2),
        )
        .alias('subquery_t2')
    )

    # Flags are 1 when the corresponding outer join found a row, otherwise 0.
    bookmark_flag = case((bookmark.rela_onum.isnot(None), 1), else_=0).label('bookmark_flag')
    sczc_flag = case((subquery_t2.c.rela_onum.isnot(None), 1), else_=0).label('sczc_flag')

    columns = [
        # catalog columns
        DataAstContent.content_onum,
        DataAstContent.content_name,
        DataAstContent.content_stat,
        DataAstContent.content_intr,
        DataAstContent.content_pic,
        DataAstContent.supr_content_onum,
        DataAstContent.leaf_node_flag,
        DataAstContent.upd_prsn,
        DataAstContent.upd_time,
        # relation columns (from the union subquery)
        subquery_t1.c.rela_onum,
        subquery_t1.c.ast_onum,
        subquery_t1.c.rela_type,
        subquery_t1.c.rela_eff_begn_date,
        subquery_t1.c.rela_eff_end_date,
        subquery_t1.c.upd_prsn,
        # asset columns
        asset.data_ast_no,
        asset.data_ast_eng_name,
        asset.data_ast_cn_name,
        asset.data_ast_type,
        asset.data_ast_stat,
        asset.data_ast_desc,
        asset.data_ast_clas,
        asset.data_ast_cont,
        asset.data_ast_faq,
        asset.data_ast_estb_time,
        asset.data_ast_upd_time,
        asset.data_ast_src,
        asset.ast_no,
        # bookmark columns and derived flags
        bookmark.bookmark_orde,
        bookmark_flag,
        sczc_flag,
    ]

    bookmark_join = and_(
        subquery_t1.c.ast_onum == bookmark.data_ast_no,
        bookmark.user_id == user_id,
    )
    query = (
        select(*columns)
        .distinct()
        .select_from(DataAstContent)
        .outerjoin(subquery_t1, DataAstContent.content_onum == subquery_t1.c.content_onum)
        .outerjoin(asset, subquery_t1.c.ast_onum == asset.ast_no)
        .outerjoin(bookmark, bookmark_join)
        .outerjoin(subquery_t2, subquery_t1.c.rela_onum == subquery_t2.c.rela_onum)
        .where(DataAstContent.content_stat == 1)
        .order_by(DataAstContent.content_onum)
    )

    return await PageUtil.paginate(
        db,
        query,
        page_num=query_object.page_num,
        page_size=query_object.page_size,
        is_page=is_page,
    )
@classmethod
async def add_catalog_dao(cls, db: AsyncSession, catalog1: dict, catalog2: dict):
    """
    Insert a catalog row together with its child relation rows.

    :param db: ORM session object
    :param catalog1: column values used to build the new ``DataAstContent`` row
    :param catalog2: dict whose optional ``children`` entry lists the relations to
        attach; each item is a ``DataCatalogChild`` instance or a plain dict
    :return: the newly added ``DataAstContent`` object
    :raises TypeError: when a child is neither a ``DataCatalogChild`` nor a dict
    """
    db_catalog = DataAstContent(**catalog1)
    db.add(db_catalog)
    # Flush before building children: they read db_catalog.content_onum
    # (presumably populated by the database on insert).
    await db.flush()

    # ``children`` may be absent or explicitly None; treat both as "no children".
    children = catalog2.get('children') or []
    for child in children:
        if isinstance(child, DataCatalogChild):
            child_values = child.model_dump()
        elif isinstance(child, dict):
            # Copy so the caller's dict is not modified.
            child_values = dict(child)
        else:
            raise TypeError("不支持的子关系数据类型")
        child_values['content_onum'] = db_catalog.content_onum
        db.add(DataAstContentRela(**child_values))

    # One flush for all children instead of one round-trip per child.
    if children:
        await db.flush()
    return db_catalog
@classmethod
async def edit_catalog_leaf_dao(cls, db: AsyncSession, catalog: dict):
    """
    Update only the ``leaf_node_flag`` of one catalog row.

    :param db: ORM session object
    :param catalog: dict providing ``content_onum`` (row to update) and
        ``leaf_node_flag`` (new value)
    :return: None
    """
    target_onum = catalog['content_onum']
    new_flag = catalog['leaf_node_flag']
    await db.execute(
        update(DataAstContent)
        .where(DataAstContent.content_onum == target_onum)
        .values(leaf_node_flag=new_flag)
    )
@classmethod
async def edit_catalog_child_dao(cls, db: AsyncSession, catalog: dict):
    """
    Update a catalog row and synchronise its child relation rows.

    Children that carry a truthy ``rela_onum`` are updated in place. The others
    are inserted as new relations under this catalog.

    :param db: ORM session object
    :param catalog: dict with the catalog column values plus an optional
        ``children`` list of dict-like relation records
    :return: None
    """
    content_onum = catalog['content_onum']
    await db.execute(
        update(DataAstContent)
        .where(DataAstContent.content_onum == content_onum)
        .values(
            content_name=catalog['content_name'],
            content_stat=catalog['content_stat'],
            content_intr=catalog['content_intr'],
            content_pic=catalog['content_pic'],
            supr_content_onum=catalog['supr_content_onum'],
            leaf_node_flag=catalog['leaf_node_flag'],
            upd_prsn=catalog['upd_prsn'],
            upd_time=datetime.now(),
        )
    )

    # ``children`` may be absent or explicitly None; treat both as "no children".
    children = catalog.get('children') or []
    for child in children:
        rela_onum = child.get('rela_onum')
        if rela_onum:
            # Existing relation: overwrite its columns.
            await db.execute(
                update(DataAstContentRela)
                .where(DataAstContentRela.rela_onum == rela_onum)
                .values(
                    content_onum=child.get('content_onum'),
                    ast_onum=child.get('ast_onum'),
                    rela_type=child.get('rela_type'),
                    rela_eff_begn_date=child.get('rela_eff_begn_date'),
                    rela_eff_end_date=child.get('rela_eff_end_date'),
                    upd_prsn=child.get('upd_prsn'),
                )
            )
        else:
            # New relation: work on a copy so the caller's dict is left untouched.
            new_rela = dict(child)
            new_rela['content_onum'] = content_onum
            db.add(DataAstContentRela(**new_rela))
            await db.flush()

    # The leaf-flag refresh reads only the catalog table, which the loop does
    # not touch, so running it once after the loop gives the same result as
    # running it after every child.
    if children:
        await cls.update_leaf_node_flag(db)
@classmethod
async def edit_catalog_dao(cls, db: AsyncSession, catalog1: dict):
    """
    Update the columns of one catalog row, then refresh leaf-node flags.

    :param db: ORM session object
    :param catalog1: dict with ``content_onum`` (row to update) and the new column values
    :return: None
    """
    content_onum = catalog1['content_onum']
    new_values = {
        'content_name': catalog1['content_name'],
        'content_stat': catalog1['content_stat'],
        'content_intr': catalog1['content_intr'],
        'content_pic': catalog1['content_pic'],
        'supr_content_onum': catalog1['supr_content_onum'],
        'leaf_node_flag': catalog1['leaf_node_flag'],
        'upd_prsn': catalog1['upd_prsn'],
        'upd_time': datetime.now(),
    }
    stmt = (
        update(DataAstContent)
        .where(DataAstContent.content_onum == content_onum)
        .values(**new_values)
    )
    await db.execute(stmt)
    await cls.update_leaf_node_flag(db)
@classmethod
async def delete_catalog_dao(cls, db: AsyncSession, catalog: DeleteDataCatalogModel):
    """
    Soft-delete catalogs and the relations attached to them.

    Relations get ``rela_status=0`` and an end date. Catalogs get
    ``content_stat=0`` (0 marks a row as voided). Leaf-node flags are refreshed
    afterwards.

    :param db: ORM session object
    :param catalog: object whose ``content_onums`` is a comma-separated list of catalog IDs
    :return: None
    """
    # Drop blank entries and surrounding whitespace so inputs such as "", "1,,2"
    # or "1, 2" do not put empty or padded IDs into the IN list.
    raw_ids = (catalog.content_onums or '').split(',')
    content_onums = [part.strip() for part in raw_ids if part.strip()]
    logger.info(f"Updating DataAstContent with supr_content_onum in {content_onums}")

    now = datetime.now()
    await db.execute(
        update(DataAstContentRela)
        .where(DataAstContentRela.content_onum.in_(content_onums))
        .values(rela_status=0, rela_eff_end_date=now)
    )
    await db.execute(
        update(DataAstContent)
        .where(DataAstContent.content_onum.in_(content_onums))
        .values(content_stat=0, upd_time=now)
    )

    await cls.update_leaf_node_flag(db)
    # Report success only after the refresh has actually run.
    logger.info("更新叶子节点标志成功")
@classmethod
async def get_data_asset_catalog_tree(cls, db: AsyncSession):
    """
    Fetch the rows used to build the data asset tree.

    :param db: async session object
    :return: distinct rows of (data_ast_src, data_ast_eng_name, data_ast_cn_name,
        ast_no, rel_status) for assets with ``data_ast_stat == 1``
    """
    asset = aliased(DataAstInfo, name='a')
    rela = aliased(DataAstContentRela, name='b')

    # rel_status is 1 when a relation with rela_status 1 joined to the asset, else 0.
    rel_status = case((rela.ast_onum.isnot(None), 1), else_=0).label('rel_status')
    join_condition = and_(asset.ast_no == rela.ast_onum, rela.rela_status == 1)

    stmt = (
        select(
            asset.data_ast_src,
            asset.data_ast_eng_name,
            asset.data_ast_cn_name,
            asset.ast_no,
            rel_status,
        )
        .distinct()
        .select_from(asset)
        .outerjoin(rela, join_condition)
        .where(asset.data_ast_stat == 1)
    )
    tree_rows = (await db.execute(stmt)).fetchall()
    return tree_rows
@classmethod
async def moved_catalog_instr_dao(cls, db: AsyncSession, moved_catalog_data: dict):
    """
    Move a catalog under a different parent, then refresh leaf-node flags.

    :param db: ORM session object
    :param moved_catalog_data: dict with ``content_onum`` (catalog to move),
        ``supr_content_onum`` (current parent, must match) and
        ``supr_content_onum_after`` (new parent)
    :return: None
    """
    is_target = DataAstContent.content_onum == moved_catalog_data['content_onum']
    has_old_parent = DataAstContent.supr_content_onum == moved_catalog_data['supr_content_onum']
    new_parent = moved_catalog_data['supr_content_onum_after']

    move_stmt = (
        update(DataAstContent)
        .where(is_target, has_old_parent)
        .values(supr_content_onum=new_parent, upd_time=datetime.now())
    )
    await db.execute(move_stmt)
    await cls.update_leaf_node_flag(db)
@classmethod
async def merge_catalog_instr_dao(cls, db: AsyncSession, merge_catalog_data: dict):
    """
    Merge one catalog into another.

    Relations with ``rela_status == 1`` under the source catalog are re-pointed
    to the target catalog. The source catalog is then marked with
    ``content_stat='0'`` and leaf-node flags are refreshed.

    :param db: ORM session object
    :param merge_catalog_data: dict with ``content_onum`` (source catalog),
        ``supr_content_onum`` (source's parent, must match) and
        ``content_onum_after`` (target catalog)
    :return: None
    """
    # Both conditions are passed as separate where() arguments so SQLAlchemy
    # ANDs them. A Python ``and`` between two column expressions evaluates the
    # first one's truthiness and returns a single operand, which drops the
    # rela_status filter from the generated SQL.
    stmt1 = (
        update(DataAstContentRela)
        .where(
            DataAstContentRela.content_onum == merge_catalog_data['content_onum'],
            DataAstContentRela.rela_status == 1,
        )
        .values(
            content_onum=merge_catalog_data['content_onum_after'],
            rela_eff_begn_date=datetime.now(),
        )
    )
    await db.execute(stmt1)

    stmt2 = (
        update(DataAstContent)
        .where(
            DataAstContent.content_onum == merge_catalog_data['content_onum'],
            DataAstContent.supr_content_onum == merge_catalog_data['supr_content_onum'],
        )
        .values(content_stat='0')
    )
    await db.execute(stmt2)
    await cls.update_leaf_node_flag(db)
@classmethod
async def removerel_data_ast_catalog_dao(cls, db: AsyncSession, removerel_catalog_data: dict):
    """
    Set the status of one asset relation, then refresh leaf-node flags.

    :param db: ORM session object
    :param removerel_catalog_data: dict with ``rela_onum`` and ``content_onum``
        (identify the relation) and ``rela_status`` (value to store)
    :return: None
    """
    is_relation = DataAstContentRela.rela_onum == removerel_catalog_data['rela_onum']
    in_catalog = DataAstContentRela.content_onum == removerel_catalog_data['content_onum']
    new_status = removerel_catalog_data['rela_status']

    await db.execute(
        update(DataAstContentRela)
        .where(is_relation, in_catalog)
        .values(rela_status=new_status)
    )
    await cls.update_leaf_node_flag(db)
@classmethod
async def moverel_data_ast_catalog_dao(cls, db: AsyncSession, moverel_catalog_data: dict):
    """
    Move one asset relation to another catalog, then refresh leaf-node flags.

    The relation's ``rela_eff_end_date`` is set to the current time as part of the move.

    :param db: ORM session object
    :param moverel_catalog_data: dict with ``rela_onum`` and ``content_onum``
        (identify the relation) and ``content_onum_after`` (destination catalog)
    :return: None
    """
    is_relation = DataAstContentRela.rela_onum == moverel_catalog_data['rela_onum']
    in_catalog = DataAstContentRela.content_onum == moverel_catalog_data['content_onum']
    destination = moverel_catalog_data['content_onum_after']

    await db.execute(
        update(DataAstContentRela)
        .where(is_relation, in_catalog)
        .values(content_onum=destination, rela_eff_end_date=datetime.now())
    )
    await cls.update_leaf_node_flag(db)
@classmethod
async def update_leaf_node_flag(cls, db: AsyncSession):
    """
    Refresh ``leaf_node_flag`` on catalog rows.

    Every catalog with ``content_stat == '1'`` and ``leaf_node_flag == 0`` that
    has no child catalog with ``content_stat == '1'`` gets ``leaf_node_flag=1``
    and a new ``upd_time``.

    :param db: ORM session object
    :return: None
    """
    child = aliased(DataAstContent, name='t2')

    # Correlated EXISTS: is there a child catalog with content_stat '1' under this one?
    has_active_child = (
        select(1)
        .select_from(child)
        .where(
            child.supr_content_onum == DataAstContent.content_onum,
            child.content_stat == '1',
        )
        .exists()
    )

    # NOTE(review): the candidate query is wrapped in an alias named 'temp' before
    # being used in IN, presumably so the UPDATE does not select directly from
    # its own target table. Confirm before simplifying.
    candidates = (
        select(DataAstContent.content_onum)
        .where(
            DataAstContent.content_stat == '1',
            DataAstContent.leaf_node_flag == 0,
            not_(has_active_child),
        )
    ).alias('temp')

    flag_stmt = (
        update(DataAstContent)
        .where(DataAstContent.content_onum.in_(candidates))
        .values(leaf_node_flag=1, upd_time=datetime.now())
    )
    await db.execute(flag_stmt)
@classmethod
async def delete_ast_book_mark_rela_dao(cls, db: AsyncSession, catalog: DataAstBookmarkRelaRequest, user_name: str, user_id: str):
    """
    Remove a bookmark: delete the bookmark row and the matching catalog relation rows.

    The asset is identified through the relation ``catalog['rela_onum']`` owned by
    ``user_name``. Relations are deleted only from catalog 2 or its direct child
    catalogs owned by ``user_name``. The transaction is committed on success and
    rolled back on failure.

    :param db: ORM session object
    :param catalog: bookmark request; read with item access (``catalog['rela_onum']``),
        so callers presumably pass a dict-like object
    :param user_name: user name matched against ``upd_prsn``
    :param user_id: user ID matched against the bookmark table's ``user_id``
    :return: ``{"is_success": True}`` on success, otherwise a dict with
        ``is_success`` False and a ``message``
    """
    try:
        logger.info(f"开始处理取消收藏请求,传入参数:catalog={catalog}, user_name={user_name}, user_id={user_id}")

        # Asset number(s) referenced by the relation being un-bookmarked.
        ast_onum_subquery = (
            select(DataAstContentRela.ast_onum)
            .where(
                DataAstContentRela.rela_onum == catalog['rela_onum'],
                DataAstContentRela.upd_prsn == user_name,
                DataAstContentRela.rela_status == '1'
            )
        ).subquery()

        # Catalog 2 and its direct children that belong to this user.
        # (A previous duplicate of this subquery, built with Python ``or`` and
        # immediately overwritten, has been removed.)
        content_onum_subquery = (
            select(DataAstContent.content_onum)
            .where(
                and_(
                    or_(
                        DataAstContent.content_onum == '2',
                        DataAstContent.supr_content_onum == '2'
                    ),
                    DataAstContent.content_stat == '1',
                    DataAstContent.upd_prsn == user_name
                )
            )
            .subquery()
        )
        logger.info(f"子查询SQL: {str(ast_onum_subquery),str(content_onum_subquery)}")

        stmt1 = (
            delete(DataAstContentRela)
            .where(
                DataAstContentRela.upd_prsn == user_name,
                DataAstContentRela.rela_status == 1,
                DataAstContentRela.content_onum.in_(content_onum_subquery),
                DataAstContentRela.ast_onum.in_(ast_onum_subquery)
            )
        )
        logger.info(f"删除语句1 SQL: {str(stmt1)}")

        stmt2 = (
            delete(DataAstBookmarkRela)
            .where(
                DataAstBookmarkRela.data_ast_no.in_(ast_onum_subquery),
                DataAstBookmarkRela.user_id == user_id
            )
        )
        logger.info(f"删除语句2 SQL: {str(stmt2)}")

        logger.info("开始执行删除操作...")
        # Order matters: stmt2 resolves the asset through the relation rows
        # that stmt1 is about to delete, so it has to run first.
        await db.execute(stmt2)
        await db.execute(stmt1)
        await db.commit()
        logger.info("成功删除收藏关系")
        return {"is_success": True}
    except IntegrityError as ie:
        await db.rollback()
        logger.error(f"删除收藏关系失败(完整性错误): {str(ie)}")
        return {"is_success": False, "message": "数据库完整性错误"}
    except Exception as e:
        await db.rollback()
        logger.error(f"删除收藏关系失败: {str(e)}")
        return {"is_success": False, "message": "未知错误"}
@classmethod
async def delete_ast_book_mark_rela_by_content_onum(cls, db: AsyncSession, content_onum: int, user_id: int):
    """
    Delete a user's bookmark rows for every asset related to one catalog, then commit.

    NOTE(review): bookmarks are matched on ``relation.ast_onum == bookmark.data_ast_no``.
    This is the same key the catalog list query and the other bookmark DAOs in
    this class use. The earlier version joined on ``DataAstInfo.data_ast_no``
    instead, which looks inconsistent with how bookmark rows are written.
    Confirm against the table definitions.

    :param db: ORM session object
    :param content_onum: catalog ID
    :param user_id: user ID
    :return: None
    """
    t1 = aliased(DataAstContentRela, name='t1')
    t3 = aliased(DataAstBookmarkRela, name='t3')

    # Bookmark row IDs of this user for the assets related to the catalog.
    # Outer joins may yield NULL IDs; those never match in the IN test below.
    cte = (
        select(t3.rela_onum)
        .select_from(DataAstContent)
        .outerjoin(t1, DataAstContent.content_onum == t1.content_onum)
        .outerjoin(t3, and_(
            t1.ast_onum == t3.data_ast_no,
            t3.user_id == user_id
        ))
        .where(DataAstContent.content_onum == content_onum)
    ).cte('cte')

    stmt = (
        delete(DataAstBookmarkRela)
        .where(DataAstBookmarkRela.rela_onum.in_(select(cte.c.rela_onum)))
    )
    await db.execute(stmt)
    await db.commit()
    logger.info(" 删除收藏关系,操作成功")
@classmethod
async def add_ast_book_mark_rela_dao(cls, db: AsyncSession, catalog: DataAstBookmarkRelaRequest, user_name: str):
    """
    Bookmark an asset: add a bookmark row and a catalog relation row.

    The pending rows are flushed but not committed here. On any exception the
    session is rolled back.

    :param db: ORM session object
    :param catalog: bookmark request; read with item access (``user_id``,
        ``data_ast_no``, ``content_onum``, ``rela_type``, ``rela_eff_begn_date``,
        ``rela_eff_end_date``), so callers presumably pass a dict-like object
    :param user_name: user name stored as ``upd_prsn`` on the new relation
    :return: dict with ``is_success`` and ``message``
    """
    try:
        logger.info(f"开始处理收藏请求,用户ID: {catalog['user_id']}, 资产编号: {catalog['data_ast_no']}")

        # Resolve the asset's ast_no up front. Executing the lookup yields a
        # plain value, so the log lines show the number rather than SQL text and
        # the compare/insert below do not rely on implicit subquery coercion.
        ast_no_result = await db.execute(
            select(DataAstInfo.ast_no)
            .where(DataAstInfo.data_ast_no == catalog['data_ast_no'])
        )
        para_ast_no = ast_no_result.scalars().first()
        if para_ast_no is None:
            # Without an asset there is nothing meaningful to bookmark.
            logger.warning(f"未找到资产 {catalog['data_ast_no']},无法收藏")
            return {"is_success": False, "message": "收藏操作失败"}

        # 1. Reject duplicates.
        logger.info("检查是否已存在相同收藏...")
        exists = await db.execute(
            select(DataAstBookmarkRela)
            .where(
                DataAstBookmarkRela.user_id == catalog['user_id'],
                DataAstBookmarkRela.data_ast_no == para_ast_no
            )
        )
        if exists.scalar():
            logger.warning(f"用户 {catalog['user_id']} 已收藏资产 {para_ast_no}")
            return {"is_success": False, "message": "该资产已被收藏"}

        # 2. The next order number is the user's current maximum + 1 (1 when none exist).
        logger.info("获取当前最大顺序号...")
        max_order = await db.execute(
            select(func.max(DataAstBookmarkRela.bookmark_orde))
            .where(DataAstBookmarkRela.user_id == catalog['user_id'])
        )
        max_bookmark_orde = max_order.scalar() or 0
        logger.info(f"当前最大顺序号: {max_bookmark_orde}")

        # 3. Bookmark row.
        logger.info("设置新顺序号...")
        logger.info("创建并保存新记录...")
        db.add(DataAstBookmarkRela(
            user_id=catalog["user_id"],
            data_ast_no=para_ast_no,
            bookmark_orde=max_bookmark_orde + 1,
            bookmark_time=datetime.now(),
        ))

        # 4. Relation row linking the asset to the chosen catalog.
        logger.info("添加收藏关系内容...")
        db.add(DataAstContentRela(
            content_onum=catalog["content_onum"],
            ast_onum=para_ast_no,
            rela_type=catalog["rela_type"],
            rela_eff_begn_date=catalog["rela_eff_begn_date"],
            rela_eff_end_date=catalog["rela_eff_end_date"],
            upd_prsn=user_name,
            rela_status="1"
        ))

        logger.info("提交事务...")
        await db.flush()
        logger.info(f"用户 {catalog['user_id']} 收藏资产 {para_ast_no} 成功")
        return {"is_success": True, "message": "收藏成功"}
    except Exception as e:
        logger.error(f"收藏操作失败: {str(e)}", exc_info=True)
        await db.rollback()
        return {"is_success": False, "message": "收藏操作失败"}
@classmethod
async def get_data_ast_indx_list(cls, db: AsyncSession, query_object: DataAstIndxRequest):
    """
    Query asset indicator rows using the optional filters in ``query_object``.

    ``ast_no`` and ``indx_no`` filter by equality and ``indx_name`` by substring
    (LIKE). Each filter is applied only when its attribute is truthy.

    :param db: async session object
    :param query_object: query parameters
    :return: list of row mappings with keys ast_no, indx_no, indx_name, indx_val
    """
    # A falsy query field collapses its filter to the literal True (no restriction).
    ast_filter = (DataAstIndx.ast_no == query_object.ast_no) if query_object.ast_no else True
    indx_no_filter = (DataAstIndx.indx_no == query_object.indx_no) if query_object.indx_no else True
    name_filter = DataAstIndx.indx_name.like(f"%{query_object.indx_name}%") if query_object.indx_name else True

    stmt = select(
        DataAstIndx.ast_no,
        DataAstIndx.indx_no,
        DataAstIndx.indx_name,
        DataAstIndx.indx_val,
    ).where(ast_filter, indx_no_filter, name_filter)

    indicator_rows = (await db.execute(stmt)).mappings().all()
    return indicator_rows
@classmethod
async def get_bookmark_folder_by_name(cls, db: AsyncSession, folder_name: str, user_name: str):
    """
    Find a user's bookmark folder by name.

    Matches catalogs directly under catalog 2 with ``content_stat == "1"`` whose
    ``upd_prsn`` is ``user_name``.

    :param db: database session
    :param folder_name: folder name
    :param user_name: user name
    :return: the first matching ``DataAstContent`` object, or ``None``
    """
    stmt = select(DataAstContent).where(
        DataAstContent.content_name == folder_name,
        DataAstContent.upd_prsn == user_name,
        DataAstContent.supr_content_onum == 2,
        DataAstContent.content_stat == "1",
    )
    found = await db.execute(stmt)
    return found.scalars().first()
@classmethod
async def get_bookmark_folder_by_id(cls, db: AsyncSession, content_onum: int, user_name: str):
    """
    Find a user's bookmark folder by catalog ID.

    Matches catalogs directly under catalog 2 with ``content_stat == "1"`` whose
    ``upd_prsn`` is ``user_name``.

    :param db: database session
    :param content_onum: catalog ID
    :param user_name: user name
    :return: the first matching ``DataAstContent`` object, or ``None``
    """
    stmt = select(DataAstContent).where(
        DataAstContent.content_onum == content_onum,
        DataAstContent.upd_prsn == user_name,
        DataAstContent.supr_content_onum == 2,
        DataAstContent.content_stat == "1",
    )
    found = await db.execute(stmt)
    return found.scalars().first()
@classmethod
async def check_folder_has_relations(cls, db: AsyncSession, content_onum: int):
    """
    Tell whether a catalog has any asset relation with ``rela_status == "1"``.

    :param db: database session
    :param content_onum: catalog ID
    :return: True when at least one such relation exists, otherwise False
    """
    count_stmt = select(func.count(DataAstContentRela.rela_onum)).where(
        DataAstContentRela.content_onum == content_onum,
        DataAstContentRela.rela_status == "1",
    )
    relation_total = (await db.execute(count_stmt)).scalar()
    return relation_total > 0
@classmethod
async def get_bookmark_folders(cls, db: AsyncSession, user_name: str):
    """
    List the bookmark root folder together with the user's own bookmark folders.

    :param db: database session
    :param user_name: user name
    :return: list of dicts with keys content_onum, content_name, content_intr,
        content_pic, upd_time, supr_content_onum, leaf_node_flag
    """
    folder_columns = (
        DataAstContent.content_onum,
        DataAstContent.content_name,
        DataAstContent.content_intr,
        DataAstContent.content_pic,
        DataAstContent.upd_time,
        DataAstContent.supr_content_onum,
        DataAstContent.leaf_node_flag,
    )

    # Root folder: catalog 2 itself.
    root_folder = select(*folder_columns).where(
        DataAstContent.content_onum == 2,
        DataAstContent.content_stat == "1",
    )
    # Sub-folders: this user's catalogs directly under catalog 2.
    # NOTE(review): the ORDER BY is attached to this member of the UNION ALL,
    # not to the combined query. Whether it affects the final row order
    # depends on the database; confirm.
    user_folders = (
        select(*folder_columns)
        .where(
            DataAstContent.upd_prsn == user_name,
            DataAstContent.supr_content_onum == 2,
            DataAstContent.content_stat == "1",
        )
        .order_by(DataAstContent.upd_time.desc())
    )

    combined_query = root_folder.union_all(user_folders)
    result = await db.execute(combined_query)
    return [dict(folder) for folder in result.mappings().all()]
@classmethod
async def get_bookmark_folder_by_name_exclude_id(cls, db: AsyncSession, folder_name: str, user_name: str, exclude_id: int):
    """
    Find a user's bookmark folder by name, ignoring one specific catalog ID.

    Matches catalogs directly under catalog 2 with ``content_stat == "1"`` whose
    ``upd_prsn`` is ``user_name`` and whose ID differs from ``exclude_id``.

    :param db: database session
    :param folder_name: folder name
    :param user_name: user name
    :param exclude_id: catalog ID to leave out of the search
    :return: the first matching ``DataAstContent`` object, or ``None``
    """
    stmt = select(DataAstContent).where(
        DataAstContent.content_name == folder_name,
        DataAstContent.upd_prsn == user_name,
        DataAstContent.supr_content_onum == 2,
        DataAstContent.content_stat == "1",
        DataAstContent.content_onum != exclude_id,
    )
    found = await db.execute(stmt)
    return found.scalars().first()
@classmethod
async def is_bookmark_folder(cls, db: AsyncSession, content_onum: int, user_name: str = None):
    """
    Tell whether a catalog is the bookmark root (catalog 2) or one of its direct children.

    :param db: database session
    :param content_onum: catalog ID
    :param user_name: when given, the child folder must also have this ``upd_prsn``
    :return: True for catalog 2 or a matching child with ``content_stat == "1"``,
        otherwise False
    """
    # Catalog 2 is the "My Bookmarks" root; no query needed.
    if content_onum == 2:
        return True

    conditions = [
        DataAstContent.content_onum == content_onum,
        DataAstContent.supr_content_onum == 2,
        DataAstContent.content_stat == "1",
    ]
    if user_name:
        conditions.append(DataAstContent.upd_prsn == user_name)

    found = await db.execute(select(DataAstContent).where(*conditions))
    return found.scalars().first() is not None
@classmethod
async def move_bookmark_asset_dao(cls, db: AsyncSession, moverel_catalog_data: dict):
    """
    Move one asset relation with ``rela_status == "1"`` to another bookmark
    folder, then refresh leaf-node flags.

    :param db: ORM session object
    :param moverel_catalog_data: dict with ``rela_onum`` and ``content_onum``
        (identify the relation), ``content_onum_after`` (destination folder) and
        ``upd_prsn`` (stored on the moved relation)
    :return: None
    """
    is_relation = DataAstContentRela.rela_onum == moverel_catalog_data['rela_onum']
    in_folder = DataAstContentRela.content_onum == moverel_catalog_data['content_onum']
    is_status_one = DataAstContentRela.rela_status == "1"

    move_stmt = (
        update(DataAstContentRela)
        .where(is_relation, in_folder, is_status_one)
        .values(
            content_onum=moverel_catalog_data['content_onum_after'],
            upd_prsn=moverel_catalog_data['upd_prsn'],
            rela_eff_end_date=datetime.now(),
        )
    )
    await db.execute(move_stmt)
    await cls.update_leaf_node_flag(db)
@classmethod
async def delete_bookmark_folder_dao(cls, db: AsyncSession, content_onum: int, user_name: str,user_id:str):
    """
    Delete one of a user's bookmark folders together with its bookmarks and relations.

    Steps: verify the folder belongs to the user and sits directly under
    catalog 2; delete the user's bookmark rows for assets in the folder; mark
    the folder's relations and the folder itself with status '0'; refresh
    leaf-node flags; commit. On any exception the session is rolled back.

    :param db: ORM session object
    :param content_onum: bookmark folder ID
    :param user_name: user name, used for the ownership check
    :param user_id: user ID matched against the bookmark table's ``user_id``
    :return: dict with ``success`` and ``message``
    """
    try:
        logger.info(f"开始删除用户 {user_name} 的收藏目录 {content_onum}")

        # 1. The folder must exist, belong to this user and be a child of catalog 2.
        folder_result = await db.execute(
            select(DataAstContent).where(
                DataAstContent.content_onum == content_onum,
                DataAstContent.upd_prsn == user_name,
                DataAstContent.supr_content_onum == 2,
                DataAstContent.content_stat == '1'
            )
        )
        folder = folder_result.scalars().first()
        if not folder:
            logger.warning(f"未找到用户 {user_name} 的收藏目录 {content_onum} 或无权限删除")
            return {"success": False, "message": "收藏目录不存在或无权限删除"}

        # 2. Delete bookmark rows for the assets currently related to the folder.
        #    This must run before the relations are marked with status '0',
        #    because the subquery selects relations with status '1'.
        ast_onum_subs = (
            select(DataAstContentRela.ast_onum)
            .where(
                DataAstContentRela.content_onum == content_onum,
                DataAstContentRela.rela_status == '1',
                DataAstContentRela.upd_prsn == user_name
            )
            .subquery()
        )
        await db.execute(
            delete(DataAstBookmarkRela)
            .where(
                DataAstBookmarkRela.data_ast_no.in_(ast_onum_subs),
                DataAstBookmarkRela.user_id == user_id
            )
        )

        # 3. Mark the folder's relations with status '0'.
        now = datetime.now()
        rela_update = await db.execute(
            update(DataAstContentRela)
            .where(
                DataAstContentRela.content_onum == content_onum,
                DataAstContentRela.upd_prsn == user_name,
                DataAstContentRela.rela_status == '1'
            )
            .values(rela_status='0', rela_eff_end_date=now)
        )
        rela_count = rela_update.rowcount
        logger.info(f"已更新 {rela_count} 个资产关系状态")

        # 4. Mark the folder itself with status '0'.
        await db.execute(
            update(DataAstContent)
            .where(
                DataAstContent.content_onum == content_onum,
                DataAstContent.upd_prsn == user_name,
                DataAstContent.content_stat == '1'
            )
            .values(content_stat='0', upd_time=now)
        )

        # 5. Refresh leaf-node flags and persist everything.
        await cls.update_leaf_node_flag(db)
        await db.commit()
        logger.info(f"成功删除用户 {user_name} 的收藏目录 {content_onum}")
        return {"success": True, "message": "收藏目录删除成功"}
    except Exception as e:
        logger.error(f"删除收藏目录时发生错误: {str(e)}", exc_info=True)
        # Discard the partially applied statements so the session stays usable,
        # as the other bookmark DAOs in this class do.
        await db.rollback()
        return {"success": False, "message": f"删除操作出错: {str(e)}"}