You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

202 lines
7.4 KiB

1 week ago
from sqlalchemy import delete, func, select, update, desc
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.do.metasecurity_do import MetaSecurityCol, MetaSecurityRow
from module_admin.entity.vo.metasecurity_vo import MetaSecurityColModel, MetaSecurityRowModel
from utils.page_util import PageUtil
1 week ago
from module_admin.entity.do.meta_do import MetadataExtractInfo, MetadataSuppInfo
class MetaSecurityDao:
"""
数据源安全管理模块数据库操作层
"""
@classmethod
async def get_meta_security_col_list(cls, db: AsyncSession, query_object: MetaSecurityColModel, is_page: bool = False):
"""
获取 MetaSecurityCol 的列表信息支持模糊查询和分页
:param db: ORM对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 列表信息
"""
# 构建查询条件
filters = []
if query_object.dbCName:
filters.append(MetaSecurityCol.dbCName.like(f"%{query_object.dbCName}%"))
if query_object.obj_value:
filters.append(MetaSecurityCol.obj_value.like(f"%{query_object.obj_value}%"))
if query_object.dbTName:
filters.append(MetaSecurityCol.dbTName.like(f"%{query_object.dbTName}%"))
if query_object.obj_name:
filters.append(MetaSecurityCol.obj_name.like(f"%{query_object.obj_name}%"))
if query_object.dbSName:
filters.append(MetaSecurityCol.dbSName.like(f"%{query_object.dbSName}%"))
if query_object.dbRID:
filters.append(MetaSecurityCol.dbRID==query_object.dbRID)
# 构建查询语句
query = (
select(MetaSecurityCol)
.where(*filters)
.order_by(desc(MetaSecurityCol.create_time)) # 按创建时间降序排序
)
# 分页处理
col_list = await PageUtil.paginate(
db, query, query_object.page_num, query_object.page_size, is_page
)
return col_list
@classmethod
async def get_meta_security_row_list(cls, db: AsyncSession, query_object: MetaSecurityRowModel, is_page: bool = False):
"""
获取 MetaSecurityRow 的列表信息支持模糊查询和分页
:param db: ORM对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 列表信息
"""
# 构建查询条件
filters = []
if query_object.dbCName:
filters.append(MetaSecurityRow.dbCName.like(f"%{query_object.dbCName}%"))
if query_object.obj_value:
filters.append(MetaSecurityRow.obj_value.like(f"%{query_object.obj_value}%"))
if query_object.dbTName:
filters.append(MetaSecurityRow.dbTName.like(f"%{query_object.dbTName}%"))
if query_object.dbSName:
filters.append(MetaSecurityRow.dbSName.like(f"%{query_object.dbSName}%"))
if query_object.obj_name:
filters.append(MetaSecurityRow.obj_name.like(f"%{query_object.obj_name}%"))
if query_object.dbRID:
filters.append(MetaSecurityRow.dbRID==query_object.dbRID)
# 构建查询语句
query = (
select(MetaSecurityRow)
.where(*filters)
.order_by(desc(MetaSecurityRow.create_time)) # 按创建时间降序排序
)
# 分页处理
row_list = await PageUtil.paginate(
db, query, query_object.page_num, query_object.page_size, is_page
)
return row_list
1 week ago
@classmethod
async def get_schema_by_system(cls, db: AsyncSession, ssys_id: int):
"""
获取指定系统下的模式名schema从两个表取合集去重
"""
if not ssys_id:
raise ValueError("系统名(ssys_id)为必填参数")
# 第一个表
q1 = (
select(MetadataExtractInfo.mdl_name.label("schema"))
.where(MetadataExtractInfo.ssys_id == ssys_id)
)
# 第二个表
q2 = (
select(MetadataSuppInfo.mdl_name.label("schema"))
.where(MetadataSuppInfo.ssys_id == ssys_id)
)
# UNION ALL + DISTINCT(正确写法)
union_sub = q1.union_all(q2).subquery()
union_query = (
select(func.distinct(union_sub.c.schema).label("schema"))
.order_by(union_sub.c.schema)
)
result = await db.execute(union_query)
return [row.schema for row in result.fetchall() if row.schema]
@classmethod
async def get_api_col_list(cls, db: AsyncSession, dbRCode: str,dbSName: str,dbTname:str,objType:str,objValue:str):
colList = (
await db.execute(
select(MetaSecurityCol)
.where(
(MetaSecurityCol.isStop == 0) &
(MetaSecurityCol.dbRName == dbRCode) &
(MetaSecurityCol.obj_type == objType) &
(MetaSecurityCol.dbSName == dbSName) &
(MetaSecurityCol.dbTName == dbTname) &
(MetaSecurityCol.obj_value == objValue)
)
)
).scalars().all()
return colList
@classmethod
async def get_api_row_list(cls, db: AsyncSession, dbRCode: str,dbSName: str,dbTname:str,objType:str,objValue:str):
colList = (
await db.execute(
select(MetaSecurityRow)
.where(
(MetaSecurityRow.isStop == 0) &
(MetaSecurityRow.dbRName == dbRCode) &
(MetaSecurityRow.obj_type == objType) &
(MetaSecurityRow.dbSName == dbSName) &
(MetaSecurityRow.dbTName == dbTname) &
(MetaSecurityRow.obj_value == objValue)
)
)
).scalars().all()
return colList
@classmethod
async def get_meta_security_col_by_id(cls, db: AsyncSession, colId: str):
col = (
await db.execute(
select(MetaSecurityCol)
.where(MetaSecurityCol.colId == colId)
)
).scalars().first()
return col
@classmethod
async def get_meta_security_row_by_id(cls, db: AsyncSession, rowId: str):
row = (
await db.execute(
select(MetaSecurityRow)
.where(MetaSecurityRow.rowId == rowId)
)
).scalars().first()
return row
@classmethod
async def add_meta_security_col(cls, db: AsyncSession,colModel:MetaSecurityColModel):
col = MetaSecurityCol(
**colModel.model_dump()
)
db.add(col)
await db.flush()
return col
@classmethod
async def add_meta_security_row(cls, db: AsyncSession,rowModel:MetaSecurityRowModel):
row = MetaSecurityRow(**rowModel.model_dump())
db.add(row)
await db.flush()
return row
@classmethod
async def delete_meta_security_col(cls, db: AsyncSession, colId: str):
await db.execute(delete(MetaSecurityCol).where(MetaSecurityCol.colId == colId))
@classmethod
async def delete_meta_security_row(cls, db: AsyncSession, rowId: str):
await db.execute(delete(MetaSecurityRow).where(MetaSecurityRow.rowId == rowId))
@classmethod
async def update_meta_security_col(cls, db: AsyncSession, update_data: MetaSecurityColModel):
await db.execute(update(MetaSecurityCol), [update_data])
await db.flush()
@classmethod
async def update_meta_security_row(cls, db: AsyncSession, update_data: MetaSecurityRowModel):
await db.execute(update(MetaSecurityRow), [update_data])
await db.flush()