You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

163 lines
5.7 KiB

from sqlalchemy import delete, select, update, desc
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.do.datastd_do import DataStdCode, DataStdCodeItem
from module_admin.entity.vo.datastd_vo import DataStdCodeModel,DataStdCodeItemModel
from utils.page_util import PageUtil
class DataStdDao:
"""
数据标准模块
"""
@classmethod
async def get_std_code_list(cls, db: AsyncSession, query_object: DataStdCodeModel, is_page: bool = False):
"""
获取 DataStdCode 的列表信息,支持模糊查询和分页
:param db: ORM对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 列表信息
"""
# 构建查询条件
filters = []
if query_object.code_name:
filters.append(DataStdCode.code_name.like(f"%{query_object.code_name}%"))
if query_object.code_status:
filters.append(DataStdCode.code_status==query_object.code_status)
if query_object.sys_id:
filters.append(DataStdCode.sys_id==query_object.sys_id)
if query_object.code_type:
filters.append(DataStdCode.code_type==query_object.code_type)
# 构建查询语句
query = (
select(DataStdCode)
.where(*filters)
.order_by(desc(DataStdCode.create_time)) # 按创建时间降序排序
)
# 分页处理
col_list = await PageUtil.paginate(
db, query, query_object.page_num, query_object.page_size, is_page
)
return col_list
@classmethod
async def get_std_code_item_list(cls, db: AsyncSession, query_object: DataStdCodeItemModel, is_page: bool = False):
"""
获取 DataStdCodeItem 的列表信息,支持模糊查询和分页
:param db: ORM对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 列表信息
"""
# 构建查询条件
filters = []
if query_object.code_name:
filters.append(DataStdCodeItem.code_name.like(f"%{query_object.code_name}%") or DataStdCodeItem.code_num.like(f"%{query_object.code_name}%"))
if query_object.code_status:
filters.append(DataStdCodeItem.code_status==query_object.code_status)
if query_object.code_mean:
filters.append(DataStdCodeItem.code_mean==query_object.code_mean)
if query_object.parent_id:
filters.append(DataStdCodeItem.parent_id==query_object.parent_id)
else :
filters.append(1==2)
# 构建查询语句
query = (
select(DataStdCodeItem)
.where(*filters)
.order_by(desc(DataStdCodeItem.create_time)) # 按创建时间降序排序
)
# 分页处理
col_list = await PageUtil.paginate(
db, query, query_object.page_num, query_object.page_size, is_page
)
return col_list
@classmethod
async def get_data_code_list(cls, db: AsyncSession, query_object: DataStdCodeModel):
List = (
await db.execute(
select(DataStdCode)
.where(
DataStdCode.code_name == query_object.code_name if query_object.code_name else True,
DataStdCode.code_status == query_object.code_status if query_object.code_status else True,
DataStdCode.sys_id == query_object.sys_id if query_object.sys_id else True,
DataStdCode.code_type == query_object.code_type if query_object.code_type else True
)
)
).scalars().all()
return List
@classmethod
async def get_data_code_item_list(cls, db: AsyncSession, query_object: DataStdCodeItemModel):
List = (
await db.execute(
select(DataStdCodeItem)
.where(
DataStdCodeItem.code_name == query_object.code_name if query_object.code_name else True,
DataStdCodeItem.code_status == query_object.code_status if query_object.code_status else True,
)
)
).scalars().all()
return List
@classmethod
async def get_std_code_by_id(cls, db: AsyncSession, Id: str):
col = (
await db.execute(
select(DataStdCode)
.where(DataStdCode.id == Id)
)
).scalars().first()
return col
@classmethod
async def get_std_code_item_by_id(cls, db: AsyncSession, Id: str):
col = (
await db.execute(
select(DataStdCodeItem)
.where(DataStdCodeItem.id == Id)
)
).scalars().first()
return col
@classmethod
async def add_std_code(cls, db: AsyncSession,model:DataStdCodeModel):
col = DataStdCode(
**model.model_dump()
)
db.add(col)
await db.flush()
return col
@classmethod
async def add_std_code_item(cls, db: AsyncSession,model:DataStdCodeItemModel):
col = DataStdCodeItem(
**model.model_dump()
)
db.add(col)
await db.flush()
return col
@classmethod
async def delete_std_code(cls, db: AsyncSession, Id: str):
await db.execute(delete(DataStdCode).where(DataStdCode.id == Id))
@classmethod
async def delete_std_code_Item(cls, db: AsyncSession, Id: str):
await db.execute(delete(DataStdCodeItem).where(DataStdCodeItem.id == Id))
@classmethod
async def update_std_code(cls, db: AsyncSession, update_data: DataStdCodeModel):
await db.execute(update(DataStdCode), [update_data])
await db.flush()
@classmethod
async def update_std_code_Item(cls, db: AsyncSession, update_data: DataStdCodeItemModel):
await db.execute(update(DataStdCodeItem), [update_data])
await db.flush()