You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
285 lines
12 KiB
285 lines
12 KiB
from sqlalchemy import delete, select, update, desc,or_
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from module_admin.entity.do.datastd_do import DataStdCode,DataStdDict
|
|
from module_admin.entity.vo.datastd_vo import DataStdCodeModel,DataStdDictModel
|
|
from module_admin.entity.do.dept_do import SysDept
|
|
from utils.page_util import PageUtil
|
|
from sqlalchemy.orm import aliased
|
|
|
|
|
|
class DataStdDao:
|
|
|
|
# ----------------------------------------------------------------数据标准模块----------------------------------------------------------------------------------------------------
|
|
|
|
@classmethod
|
|
async def get_std_code_list(cls, db: AsyncSession, query_object: DataStdCodeModel, is_page: bool = False):
|
|
"""
|
|
获取 DataStdCode 的列表信息,支持模糊查询和分页
|
|
:param db: ORM对象
|
|
:param query_object: 查询参数对象
|
|
:param is_page: 是否开启分页
|
|
:return: 列表信息
|
|
"""
|
|
# 构建查询条件
|
|
filters = []
|
|
if query_object.code_name:
|
|
filters.append(DataStdCode.code_name.like(f"%{query_object.code_name}%"))
|
|
if query_object.code_num:
|
|
filters.append( or_(
|
|
DataStdCode.code_name.like(f"%{query_object.code_num}%"),
|
|
DataStdCode.code_num.like(f"%{query_object.code_num}%")
|
|
))
|
|
if query_object.code_status:
|
|
filters.append(DataStdCode.code_status==query_object.code_status)
|
|
if query_object.sys_id:
|
|
filters.append(DataStdCode.sys_id==query_object.sys_id)
|
|
if query_object.code_type:
|
|
filters.append(DataStdCode.code_type==query_object.code_type)
|
|
|
|
if query_object.class_id:
|
|
filters.append(DataStdCode.class_id==query_object.class_id)
|
|
if query_object.class_id=='codeItem' and not query_object.parent_id:
|
|
filters.append(1==2)
|
|
if query_object.parent_id:
|
|
filters.append(DataStdCode.parent_id==query_object.parent_id)
|
|
# 构建查询语句
|
|
query = (
|
|
select(DataStdCode)
|
|
.where(*filters)
|
|
.order_by(desc(DataStdCode.create_time)) # 按创建时间降序排序
|
|
)
|
|
|
|
# 分页处理
|
|
col_list = await PageUtil.paginate(
|
|
db, query, query_object.page_num, query_object.page_size, is_page
|
|
)
|
|
return col_list
|
|
@classmethod
|
|
async def get_std_code_map_list(cls, db: AsyncSession, query_object: DataStdCodeModel, is_page: bool = False):
|
|
# 构建查询条件
|
|
filters = []
|
|
# 系统级代码
|
|
c1 = aliased(DataStdCode) # 当前 codeItem 表(c1)
|
|
c2 = aliased(DataStdCode) # 别名 c2 表示父级(parent)
|
|
c3 = aliased(DataStdCode) # 别名 c3 表示映射的代码
|
|
c4 = aliased(DataStdCode) # 别名 c4 表示映射父级代码
|
|
if query_object.parent_id:
|
|
if query_object.code_type =='sys':
|
|
filters.append(c1.parent_id == query_object.parent_id)
|
|
else:
|
|
filters.append(c3.parent_id == query_object.parent_id)
|
|
# 公司级代码
|
|
# 2. 创建别名并进行连接
|
|
filters.append(c1.class_id == 'codeItem') # class_id 为 'codeItem'
|
|
# 3. 构建基础查询,使用连接和别名
|
|
query = (
|
|
select(
|
|
c2.sys_id.label("target_sys_id"), # 父级系统编号
|
|
c2.sys_name.label("target_sys_name"), # 父级系统名称
|
|
c2.code_num.label("target_code_num"), # 父级代码编号
|
|
c2.code_name.label("target_code_name"), # 父级代码名称
|
|
c1.code_num.label("target_code_num_item"), # 当前 codeItem 的编号
|
|
c1.code_name.label("target_code_name_item"), # 当前 codeItem 的名称
|
|
c4.code_num.label("resource_code_num"), # 映射的父级代码编号
|
|
c4.code_name.label("resource_code_name"), # 映射的父级代码名称
|
|
c3.code_num.label("resource_code_num_item"), # 映射的父级代码编号
|
|
c3.code_name.label("resource_code_name_item") # 映射的父级代码名称
|
|
)
|
|
.select_from(c1) # 从 c1 开始查询
|
|
.join(c2, c1.parent_id == c2.id, isouter=True) # 连接 c2
|
|
.join(c3, c1.code_map_id == c3.id, isouter=True) # 连接 c3
|
|
.join(c4, c3.parent_id == c4.id, isouter=True) # 连接 c4
|
|
.where(*filters) # 使用所有过滤条件
|
|
.order_by(desc(c1.create_time)) # 按照创建时间降序排列
|
|
)
|
|
# 4. 分页逻辑
|
|
col_list = await PageUtil.paginate(
|
|
db, query, query_object.page_num, query_object.page_size, is_page
|
|
)
|
|
return col_list
|
|
@classmethod
|
|
async def get_data_code_list_by_info(cls, db: AsyncSession, query_object: DataStdCodeModel):
|
|
List = (
|
|
await db.execute(
|
|
select(DataStdCode)
|
|
.where(
|
|
DataStdCode.code_name == query_object.code_name if query_object.code_name else True,
|
|
DataStdCode.code_status == query_object.code_status if query_object.code_status else True,
|
|
DataStdCode.sys_id == query_object.sys_id if query_object.sys_id else True,
|
|
DataStdCode.code_type == query_object.code_type if query_object.code_type else True,
|
|
DataStdCode.parent_id == query_object.parent_id if query_object.parent_id else True,
|
|
DataStdCode.class_id == query_object.class_id if query_object.class_id else True
|
|
)
|
|
)
|
|
).scalars().all()
|
|
return List
|
|
@classmethod
|
|
async def get_data_code_by_info(cls, db: AsyncSession, query_object: DataStdCodeModel):
|
|
List = (
|
|
await db.execute(
|
|
select(DataStdCode)
|
|
.where(
|
|
DataStdCode.code_name == query_object.code_name if query_object.code_name else True,
|
|
DataStdCode.code_num == query_object.code_num if query_object.code_num else True,
|
|
DataStdCode.code_status == query_object.code_status if query_object.code_status else True,
|
|
DataStdCode.sys_id == query_object.sys_id if query_object.sys_id else True,
|
|
DataStdCode.code_type == query_object.code_type if query_object.code_type else True,
|
|
DataStdCode.parent_id == query_object.parent_id if query_object.parent_id else True,
|
|
DataStdCode.class_id == query_object.class_id if query_object.class_id else True
|
|
)
|
|
)
|
|
).scalars().first()
|
|
return List
|
|
@classmethod
|
|
async def get_std_code_by_id(cls, db: AsyncSession, Id: str):
|
|
col = (
|
|
await db.execute(
|
|
select(DataStdCode)
|
|
.where(DataStdCode.id == Id)
|
|
)
|
|
).scalars().first()
|
|
return col
|
|
|
|
|
|
@classmethod
|
|
async def add_std_code(cls, db: AsyncSession,model:DataStdCodeModel):
|
|
col = DataStdCode(
|
|
**model.model_dump()
|
|
)
|
|
db.add(col)
|
|
await db.flush()
|
|
return col
|
|
|
|
@classmethod
|
|
async def delete_std_code(cls, db: AsyncSession, Id: str):
|
|
await db.execute(delete(DataStdCode).where(DataStdCode.id == Id))
|
|
|
|
|
|
@classmethod
|
|
async def update_std_code(cls, db: AsyncSession, update_data: DataStdCodeModel):
|
|
|
|
await db.execute(update(DataStdCode), [update_data])
|
|
await db.flush()
|
|
|
|
@classmethod
|
|
async def check_code_num_exists(cls, query_db: AsyncSession, code_num: str) -> bool:
|
|
"""
|
|
检查标准代码或代码项的code_num是否已经存在
|
|
"""
|
|
# 查询标准代码表中是否存在相同的code_num
|
|
result = await query_db.execute(select(DataStdCodeModel).filter(DataStdCodeModel.code_num == code_num))
|
|
existing_code = result.scalar_one_or_none()
|
|
|
|
# 如果不存在,则检查代码项表中是否存在相同的code_num
|
|
if not existing_code:
|
|
result = await query_db.execute(select(DataStdCode).filter(DataStdCodeModel.code_num == code_num))
|
|
existing_code_item = result.scalar_one_or_none()
|
|
return existing_code_item is not None
|
|
|
|
return True
|
|
# ----------------------------------------------------------------数据字典----------------------------------------------------------------------------------------------------
|
|
@classmethod
|
|
async def get_std_dict_list(cls, db: AsyncSession, query_object: DataStdDictModel, is_page: bool = False):
|
|
"""
|
|
获取 DataStdDict 的列表信息,支持模糊查询和分页
|
|
:param db: ORM对象
|
|
:param query_object: 查询参数对象
|
|
:param is_page: 是否开启分页
|
|
:return: 列表信息
|
|
"""
|
|
# 构建查询条件
|
|
filters = []
|
|
if query_object.dict_name:
|
|
filters.append(DataStdDict.dict_name.like(f"%{query_object.dict_name}%"))
|
|
if query_object.dict_code:
|
|
filters.append(DataStdDict.dict_code.like(f"%{query_object.dict_code}%"))
|
|
if query_object.dict_level:
|
|
filters.append(DataStdDict.dict_level == query_object.dict_level)
|
|
if query_object.sys_id:
|
|
filters.append(DataStdDict.sys_id == query_object.sys_id)
|
|
if query_object.dict_type:
|
|
filters.append(DataStdDict.dict_type == query_object.dict_type)
|
|
if query_object.dict_status:
|
|
filters.append(DataStdDict.dict_status == query_object.dict_status)
|
|
c1 = aliased(SysDept) # 技术部门
|
|
c2 = aliased(SysDept) # 业务部门
|
|
# 构建查询语句
|
|
query = (
|
|
select(DataStdDict.id,
|
|
DataStdDict.create_by,
|
|
DataStdDict.create_time,
|
|
DataStdDict.update_by,
|
|
DataStdDict.update_time,
|
|
DataStdDict.dict_num,
|
|
DataStdDict.dict_code,
|
|
DataStdDict.dict_name,
|
|
DataStdDict.dict_level,
|
|
DataStdDict.dict_type,
|
|
DataStdDict.sys_name,
|
|
DataStdDict.sys_id,
|
|
DataStdDict.dict_menu,
|
|
DataStdDict.data_type,
|
|
DataStdDict.std_code,
|
|
DataStdDict.std_name,
|
|
DataStdDict.dict_status,
|
|
DataStdDict.buss_dept_id,
|
|
DataStdDict.tech_dept_id,
|
|
DataStdDict.buss_user,
|
|
DataStdDict.tech_user,
|
|
c1.dept_name.label("tech_dept_name"),
|
|
c2.dept_name.label("buss_dept_name")).select_from(DataStdDict)
|
|
.join(c1,c1.dept_id ==DataStdDict.tech_dept_id, isouter=True)
|
|
.join(c2,c2.dept_id ==DataStdDict.buss_dept_id, isouter=True)
|
|
.where(*filters)
|
|
.order_by(desc(DataStdDict.create_time)) # 按创建时间降序排序
|
|
)
|
|
|
|
# 分页处理
|
|
col_list = await PageUtil.paginate(
|
|
db, query, query_object.page_num, query_object.page_size, is_page
|
|
)
|
|
|
|
return col_list
|
|
@classmethod
|
|
async def add_std_dict(cls, db: AsyncSession,model:DataStdDictModel):
|
|
col = DataStdDict(
|
|
**model.model_dump()
|
|
)
|
|
db.add(col)
|
|
await db.flush()
|
|
return col
|
|
|
|
@classmethod
|
|
async def delete_std_dict(cls, db: AsyncSession, Id: str):
|
|
await db.execute(delete(DataStdDict).where(DataStdDict.id == Id))
|
|
|
|
@classmethod
|
|
async def update_std_dict(cls, db: AsyncSession, update_data: DataStdDictModel):
|
|
|
|
await db.execute(update(DataStdDict), [update_data])
|
|
await db.flush()
|
|
|
|
@classmethod
|
|
async def get_std_dict_by_id(cls, db: AsyncSession, Id: str):
|
|
col = (
|
|
await db.execute(
|
|
select(DataStdDict)
|
|
.where(DataStdDict.id == Id)
|
|
)
|
|
).scalars().first()
|
|
return col
|
|
@classmethod
|
|
async def get_data_dict_by_info(cls, db: AsyncSession, query_object: DataStdDictModel):
|
|
List = (
|
|
await db.execute(
|
|
select(DataStdDict)
|
|
.where(
|
|
DataStdDict.dict_name == query_object.dict_name if query_object.dict_name else True,
|
|
DataStdDict.dict_num == query_object.dict_num if query_object.dict_num else True,
|
|
DataStdDict.dict_status == query_object.dict_status if query_object.dict_status else True,
|
|
DataStdDict.sys_id == query_object.sys_id if query_object.sys_id else True,
|
|
DataStdDict.dict_type == query_object.dict_type if query_object.dict_type else True,
|
|
)
|
|
)
|
|
).scalars().first()
|
|
return List
|