Browse Source

style: 使用ruff格式化字典管理模块,优化导入

master
insistence 7 months ago
parent
commit
eac920fcae
  1. 147
      ruoyi-fastapi-backend/module_admin/controller/dict_controller.py
  2. 150
      ruoyi-fastapi-backend/module_admin/dao/dict_dao.py
  3. 10
      ruoyi-fastapi-backend/module_admin/entity/do/dict_do.py
  4. 20
      ruoyi-fastapi-backend/module_admin/entity/vo/dict_vo.py
  5. 135
      ruoyi-fastapi-backend/module_admin/service/dict_service.py

147
ruoyi-fastapi-backend/module_admin/controller/dict_controller.py

@ -1,34 +1,58 @@
from fastapi import APIRouter
from fastapi import Depends
from datetime import datetime
from fastapi import APIRouter, Depends, Request
from pydantic_validation_decorator import ValidateFields
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from config.enums import BusinessType
from config.get_db import get_db
from module_admin.service.login_service import LoginService, CurrentUserModel
from module_admin.service.dict_service import *
from module_admin.aspect.interface_auth import CheckUserInterfaceAuth
from module_admin.annotation.log_annotation import log_decorator
from config.enums import BusinessType
from utils.response_util import *
from utils.log_util import *
from utils.page_util import PageResponseModel
from module_admin.aspect.interface_auth import CheckUserInterfaceAuth
from module_admin.entity.vo.dict_vo import (
DeleteDictDataModel,
DeleteDictTypeModel,
DictDataModel,
DictDataPageQueryModel,
DictTypeModel,
DictTypePageQueryModel,
)
from module_admin.entity.vo.user_vo import CurrentUserModel
from module_admin.service.dict_service import DictDataService, DictTypeService
from module_admin.service.login_service import LoginService
from utils.common_util import bytes2file_response
from utils.log_util import logger
from utils.page_util import PageResponseModel
from utils.response_util import ResponseUtil
dictController = APIRouter(prefix='/system/dict', dependencies=[Depends(LoginService.get_current_user)])
@dictController.get("/type/list", response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('system:dict:list'))])
async def get_system_dict_type_list(request: Request, dict_type_page_query: DictTypePageQueryModel = Depends(DictTypePageQueryModel.as_query), query_db: AsyncSession = Depends(get_db)):
@dictController.get(
'/type/list', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('system:dict:list'))]
)
async def get_system_dict_type_list(
request: Request,
dict_type_page_query: DictTypePageQueryModel = Depends(DictTypePageQueryModel.as_query),
query_db: AsyncSession = Depends(get_db),
):
# 获取分页数据
dict_type_page_query_result = await DictTypeService.get_dict_type_list_services(query_db, dict_type_page_query, is_page=True)
dict_type_page_query_result = await DictTypeService.get_dict_type_list_services(
query_db, dict_type_page_query, is_page=True
)
logger.info('获取成功')
return ResponseUtil.success(model_content=dict_type_page_query_result)
@dictController.post("/type", dependencies=[Depends(CheckUserInterfaceAuth('system:dict:add'))])
@dictController.post('/type', dependencies=[Depends(CheckUserInterfaceAuth('system:dict:add'))])
@ValidateFields(validate_model='add_dict_type')
@log_decorator(title='字典管理', business_type=BusinessType.INSERT)
async def add_system_dict_type(request: Request, add_dict_type: DictTypeModel, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user)):
async def add_system_dict_type(
request: Request,
add_dict_type: DictTypeModel,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
add_dict_type.create_by = current_user.user.user_name
add_dict_type.create_time = datetime.now()
add_dict_type.update_by = current_user.user.user_name
@ -39,10 +63,15 @@ async def add_system_dict_type(request: Request, add_dict_type: DictTypeModel, q
return ResponseUtil.success(msg=add_dict_type_result.message)
@dictController.put("/type", dependencies=[Depends(CheckUserInterfaceAuth('system:dict:edit'))])
@dictController.put('/type', dependencies=[Depends(CheckUserInterfaceAuth('system:dict:edit'))])
@ValidateFields(validate_model='edit_dict_type')
@log_decorator(title='字典管理', business_type=BusinessType.UPDATE)
async def edit_system_dict_type(request: Request, edit_dict_type: DictTypeModel, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user)):
async def edit_system_dict_type(
request: Request,
edit_dict_type: DictTypeModel,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
edit_dict_type.update_by = current_user.user.user_name
edit_dict_type.update_time = datetime.now()
edit_dict_type_result = await DictTypeService.edit_dict_type_services(request, query_db, edit_dict_type)
@ -51,7 +80,7 @@ async def edit_system_dict_type(request: Request, edit_dict_type: DictTypeModel,
return ResponseUtil.success(msg=edit_dict_type_result.message)
@dictController.delete("/type/refreshCache", dependencies=[Depends(CheckUserInterfaceAuth('system:dict:remove'))])
@dictController.delete('/type/refreshCache', dependencies=[Depends(CheckUserInterfaceAuth('system:dict:remove'))])
@log_decorator(title='字典管理', business_type=BusinessType.UPDATE)
async def refresh_system_dict(request: Request, query_db: AsyncSession = Depends(get_db)):
refresh_dict_result = await DictTypeService.refresh_sys_dict_services(request, query_db)
@ -60,7 +89,7 @@ async def refresh_system_dict(request: Request, query_db: AsyncSession = Depends
return ResponseUtil.success(msg=refresh_dict_result.message)
@dictController.delete("/type/{dict_ids}", dependencies=[Depends(CheckUserInterfaceAuth('system:dict:remove'))])
@dictController.delete('/type/{dict_ids}', dependencies=[Depends(CheckUserInterfaceAuth('system:dict:remove'))])
@log_decorator(title='字典管理', business_type=BusinessType.DELETE)
async def delete_system_dict_type(request: Request, dict_ids: str, query_db: AsyncSession = Depends(get_db)):
delete_dict_type = DeleteDictTypeModel(dictIds=dict_ids)
@ -70,15 +99,19 @@ async def delete_system_dict_type(request: Request, dict_ids: str, query_db: Asy
return ResponseUtil.success(msg=delete_dict_type_result.message)
@dictController.get("/type/optionselect", response_model=List[DictTypeModel])
@dictController.get('/type/optionselect', response_model=List[DictTypeModel])
async def query_system_dict_type_options(request: Request, query_db: AsyncSession = Depends(get_db)):
dict_type_query_result = await DictTypeService.get_dict_type_list_services(query_db, DictTypePageQueryModel(**dict()), is_page=False)
logger.info(f'获取成功')
dict_type_query_result = await DictTypeService.get_dict_type_list_services(
query_db, DictTypePageQueryModel(**dict()), is_page=False
)
logger.info('获取成功')
return ResponseUtil.success(data=dict_type_query_result)
@dictController.get("/type/{dict_id}", response_model=DictTypeModel, dependencies=[Depends(CheckUserInterfaceAuth('system:dict:query'))])
@dictController.get(
'/type/{dict_id}', response_model=DictTypeModel, dependencies=[Depends(CheckUserInterfaceAuth('system:dict:query'))]
)
async def query_detail_system_dict_type(request: Request, dict_id: int, query_db: AsyncSession = Depends(get_db)):
dict_type_detail_result = await DictTypeService.dict_type_detail_services(query_db, dict_id)
logger.info(f'获取dict_id为{dict_id}的信息成功')
@ -86,39 +119,60 @@ async def query_detail_system_dict_type(request: Request, dict_id: int, query_db
return ResponseUtil.success(data=dict_type_detail_result)
@dictController.post("/type/export", dependencies=[Depends(CheckUserInterfaceAuth('system:dict:export'))])
@dictController.post('/type/export', dependencies=[Depends(CheckUserInterfaceAuth('system:dict:export'))])
@log_decorator(title='字典管理', business_type=BusinessType.EXPORT)
async def export_system_dict_type_list(request: Request, dict_type_page_query: DictTypePageQueryModel = Depends(DictTypePageQueryModel.as_form), query_db: AsyncSession = Depends(get_db)):
async def export_system_dict_type_list(
request: Request,
dict_type_page_query: DictTypePageQueryModel = Depends(DictTypePageQueryModel.as_form),
query_db: AsyncSession = Depends(get_db),
):
# 获取全量数据
dict_type_query_result = await DictTypeService.get_dict_type_list_services(query_db, dict_type_page_query, is_page=False)
dict_type_query_result = await DictTypeService.get_dict_type_list_services(
query_db, dict_type_page_query, is_page=False
)
dict_type_export_result = await DictTypeService.export_dict_type_list_services(dict_type_query_result)
logger.info('导出成功')
return ResponseUtil.streaming(data=bytes2file_response(dict_type_export_result))
@dictController.get("/data/type/{dict_type}")
@dictController.get('/data/type/{dict_type}')
async def query_system_dict_type_data(request: Request, dict_type: str, query_db: AsyncSession = Depends(get_db)):
# 获取全量数据
dict_data_query_result = await DictDataService.query_dict_data_list_from_cache_services(request.app.state.redis, dict_type)
dict_data_query_result = await DictDataService.query_dict_data_list_from_cache_services(
request.app.state.redis, dict_type
)
logger.info('获取成功')
return ResponseUtil.success(data=dict_data_query_result)
@dictController.get("/data/list", response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('system:dict:list'))])
async def get_system_dict_data_list(request: Request, dict_data_page_query: DictDataPageQueryModel = Depends(DictDataPageQueryModel.as_query), query_db: AsyncSession = Depends(get_db)):
@dictController.get(
'/data/list', response_model=PageResponseModel, dependencies=[Depends(CheckUserInterfaceAuth('system:dict:list'))]
)
async def get_system_dict_data_list(
request: Request,
dict_data_page_query: DictDataPageQueryModel = Depends(DictDataPageQueryModel.as_query),
query_db: AsyncSession = Depends(get_db),
):
# 获取分页数据
dict_data_page_query_result = await DictDataService.get_dict_data_list_services(query_db, dict_data_page_query, is_page=True)
dict_data_page_query_result = await DictDataService.get_dict_data_list_services(
query_db, dict_data_page_query, is_page=True
)
logger.info('获取成功')
return ResponseUtil.success(model_content=dict_data_page_query_result)
@dictController.post("/data", dependencies=[Depends(CheckUserInterfaceAuth('system:dict:add'))])
@dictController.post('/data', dependencies=[Depends(CheckUserInterfaceAuth('system:dict:add'))])
@ValidateFields(validate_model='add_dict_data')
@log_decorator(title='字典管理', business_type=BusinessType.INSERT)
async def add_system_dict_data(request: Request, add_dict_data: DictDataModel, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user)):
async def add_system_dict_data(
request: Request,
add_dict_data: DictDataModel,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
add_dict_data.create_by = current_user.user.user_name
add_dict_data.create_time = datetime.now()
add_dict_data.update_by = current_user.user.user_name
@ -129,10 +183,15 @@ async def add_system_dict_data(request: Request, add_dict_data: DictDataModel, q
return ResponseUtil.success(msg=add_dict_data_result.message)
@dictController.put("/data", dependencies=[Depends(CheckUserInterfaceAuth('system:dict:edit'))])
@dictController.put('/data', dependencies=[Depends(CheckUserInterfaceAuth('system:dict:edit'))])
@ValidateFields(validate_model='edit_dict_data')
@log_decorator(title='字典管理', business_type=BusinessType.UPDATE)
async def edit_system_dict_data(request: Request, edit_dict_data: DictDataModel, query_db: AsyncSession = Depends(get_db), current_user: CurrentUserModel = Depends(LoginService.get_current_user)):
async def edit_system_dict_data(
request: Request,
edit_dict_data: DictDataModel,
query_db: AsyncSession = Depends(get_db),
current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
edit_dict_data.update_by = current_user.user.user_name
edit_dict_data.update_time = datetime.now()
edit_dict_data_result = await DictDataService.edit_dict_data_services(request, query_db, edit_dict_data)
@ -141,7 +200,7 @@ async def edit_system_dict_data(request: Request, edit_dict_data: DictDataModel,
return ResponseUtil.success(msg=edit_dict_data_result.message)
@dictController.delete("/data/{dict_codes}", dependencies=[Depends(CheckUserInterfaceAuth('system:dict:remove'))])
@dictController.delete('/data/{dict_codes}', dependencies=[Depends(CheckUserInterfaceAuth('system:dict:remove'))])
@log_decorator(title='字典管理', business_type=BusinessType.DELETE)
async def delete_system_dict_data(request: Request, dict_codes: str, query_db: AsyncSession = Depends(get_db)):
delete_dict_data = DeleteDictDataModel(dictCodes=dict_codes)
@ -151,7 +210,11 @@ async def delete_system_dict_data(request: Request, dict_codes: str, query_db: A
return ResponseUtil.success(msg=delete_dict_data_result.message)
@dictController.get("/data/{dict_code}", response_model=DictDataModel, dependencies=[Depends(CheckUserInterfaceAuth('system:dict:query'))])
@dictController.get(
'/data/{dict_code}',
response_model=DictDataModel,
dependencies=[Depends(CheckUserInterfaceAuth('system:dict:query'))],
)
async def query_detail_system_dict_data(request: Request, dict_code: int, query_db: AsyncSession = Depends(get_db)):
detail_dict_data_result = await DictDataService.dict_data_detail_services(query_db, dict_code)
logger.info(f'获取dict_code为{dict_code}的信息成功')
@ -159,11 +222,17 @@ async def query_detail_system_dict_data(request: Request, dict_code: int, query_
return ResponseUtil.success(data=detail_dict_data_result)
@dictController.post("/data/export", dependencies=[Depends(CheckUserInterfaceAuth('system:dict:export'))])
@dictController.post('/data/export', dependencies=[Depends(CheckUserInterfaceAuth('system:dict:export'))])
@log_decorator(title='字典管理', business_type=BusinessType.EXPORT)
async def export_system_dict_data_list(request: Request, dict_data_page_query: DictDataPageQueryModel = Depends(DictDataPageQueryModel.as_form), query_db: AsyncSession = Depends(get_db)):
async def export_system_dict_data_list(
request: Request,
dict_data_page_query: DictDataPageQueryModel = Depends(DictDataPageQueryModel.as_form),
query_db: AsyncSession = Depends(get_db),
):
# 获取全量数据
dict_data_query_result = await DictDataService.get_dict_data_list_services(query_db, dict_data_page_query, is_page=False)
dict_data_query_result = await DictDataService.get_dict_data_list_services(
query_db, dict_data_page_query, is_page=False
)
dict_data_export_result = await DictDataService.export_dict_data_list_services(dict_data_query_result)
logger.info('导出成功')

150
ruoyi-fastapi-backend/module_admin/dao/dict_dao.py

@ -1,10 +1,10 @@
from sqlalchemy import select, update, delete, and_, func
from datetime import datetime, time
from sqlalchemy import and_, delete, func, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.do.dict_do import SysDictType, SysDictData
from module_admin.entity.vo.dict_vo import *
from utils.time_format_util import list_format_datetime
from module_admin.entity.vo.dict_vo import DictDataModel, DictDataPageQueryModel, DictTypeModel, DictTypePageQueryModel
from utils.page_util import PageUtil
from datetime import datetime, time
from utils.time_format_util import list_format_datetime
class DictTypeDao:
@ -20,10 +20,7 @@ class DictTypeDao:
:param dict_id: 字典类型id
:return: 字典类型信息对象
"""
dict_type_info = (await db.execute(
select(SysDictType)
.where(SysDictType.dict_id == dict_id)
)).scalars().first()
dict_type_info = (await db.execute(select(SysDictType).where(SysDictType.dict_id == dict_id))).scalars().first()
return dict_type_info
@ -35,11 +32,18 @@ class DictTypeDao:
:param dict_type: 字典类型参数对象
:return: 字典类型信息对象
"""
dict_type_info = (await db.execute(
select(SysDictType)
.where(SysDictType.dict_type == dict_type.dict_type if dict_type.dict_type else True,
SysDictType.dict_name == dict_type.dict_name if dict_type.dict_name else True)
)).scalars().first()
dict_type_info = (
(
await db.execute(
select(SysDictType).where(
SysDictType.dict_type == dict_type.dict_type if dict_type.dict_type else True,
SysDictType.dict_name == dict_type.dict_name if dict_type.dict_name else True,
)
)
)
.scalars()
.first()
)
return dict_type_info
@ -50,9 +54,7 @@ class DictTypeDao:
:param db: orm对象
:return: 字典类型信息列表对象
"""
dict_type_info = (await db.execute(
select(SysDictType)
)).scalars().all()
dict_type_info = (await db.execute(select(SysDictType))).scalars().all()
return list_format_datetime(dict_type_info)
@ -65,15 +67,21 @@ class DictTypeDao:
:param is_page: 是否开启分页
:return: 字典类型列表信息对象
"""
query = select(SysDictType) \
.where(SysDictType.dict_name.like(f'%{query_object.dict_name}%') if query_object.dict_name else True,
SysDictType.dict_type.like(f'%{query_object.dict_type}%') if query_object.dict_type else True,
SysDictType.status == query_object.status if query_object.status else True,
SysDictType.create_time.between(
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(00, 00, 00)),
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)))
if query_object.begin_time and query_object.end_time else True) \
query = (
select(SysDictType)
.where(
SysDictType.dict_name.like(f'%{query_object.dict_name}%') if query_object.dict_name else True,
SysDictType.dict_type.like(f'%{query_object.dict_type}%') if query_object.dict_type else True,
SysDictType.status == query_object.status if query_object.status else True,
SysDictType.create_time.between(
datetime.combine(datetime.strptime(query_object.begin_time, '%Y-%m-%d'), time(00, 00, 00)),
datetime.combine(datetime.strptime(query_object.end_time, '%Y-%m-%d'), time(23, 59, 59)),
)
if query_object.begin_time and query_object.end_time
else True,
)
.distinct()
)
dict_type_list = await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
return dict_type_list
@ -100,10 +108,7 @@ class DictTypeDao:
:param dict_type: 需要更新的字典类型字典
:return:
"""
await db.execute(
update(SysDictType),
[dict_type]
)
await db.execute(update(SysDictType), [dict_type])
@classmethod
async def delete_dict_type_dao(cls, db: AsyncSession, dict_type: DictTypeModel):
@ -113,10 +118,7 @@ class DictTypeDao:
:param dict_type: 字典类型对象
:return:
"""
await db.execute(
delete(SysDictType)
.where(SysDictType.dict_id.in_([dict_type.dict_id]))
)
await db.execute(delete(SysDictType).where(SysDictType.dict_id.in_([dict_type.dict_id])))
class DictDataDao:
@ -132,10 +134,9 @@ class DictDataDao:
:param dict_code: 字典数据id
:return: 字典数据信息对象
"""
dict_data_info = (await db.execute(
select(SysDictData)
.where(SysDictData.dict_code == dict_code)
)).scalars().first()
dict_data_info = (
(await db.execute(select(SysDictData).where(SysDictData.dict_code == dict_code))).scalars().first()
)
return dict_data_info
@ -147,12 +148,19 @@ class DictDataDao:
:param dict_data: 字典数据参数对象
:return: 字典数据信息对象
"""
dict_data_info = (await db.execute(
select(SysDictData)
.where(SysDictData.dict_type == dict_data.dict_type,
SysDictData.dict_label == dict_data.dict_label,
SysDictData.dict_value == dict_data.dict_value)
)).scalars().first()
dict_data_info = (
(
await db.execute(
select(SysDictData).where(
SysDictData.dict_type == dict_data.dict_type,
SysDictData.dict_label == dict_data.dict_label,
SysDictData.dict_value == dict_data.dict_value,
)
)
)
.scalars()
.first()
)
return dict_data_info
@ -165,12 +173,16 @@ class DictDataDao:
:param is_page: 是否开启分页
:return: 字典数据列表信息对象
"""
query = select(SysDictData) \
.where(SysDictData.dict_type == query_object.dict_type if query_object.dict_type else True,
SysDictData.dict_label.like(f'%{query_object.dict_label}%') if query_object.dict_label else True,
SysDictData.status == query_object.status if query_object.status else True)\
.order_by(SysDictData.dict_sort)\
query = (
select(SysDictData)
.where(
SysDictData.dict_type == query_object.dict_type if query_object.dict_type else True,
SysDictData.dict_label.like(f'%{query_object.dict_label}%') if query_object.dict_label else True,
SysDictData.status == query_object.status if query_object.status else True,
)
.order_by(SysDictData.dict_sort)
.distinct()
)
dict_data_list = await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)
return dict_data_list
@ -183,14 +195,24 @@ class DictDataDao:
:param dict_type: 字典类型
:return: 字典数据列表信息对象
"""
dict_data_list = (await db.execute(
select(SysDictData)
.select_from(SysDictType)
.where(SysDictType.dict_type == dict_type if dict_type else True, SysDictType.status == '0')
.join(SysDictData, and_(SysDictType.dict_type == SysDictData.dict_type, SysDictData.status == '0'), isouter=True)
.order_by(SysDictData.dict_sort)
.distinct()
)).scalars().all()
dict_data_list = (
(
await db.execute(
select(SysDictData)
.select_from(SysDictType)
.where(SysDictType.dict_type == dict_type if dict_type else True, SysDictType.status == '0')
.join(
SysDictData,
and_(SysDictType.dict_type == SysDictData.dict_type, SysDictData.status == '0'),
isouter=True,
)
.order_by(SysDictData.dict_sort)
.distinct()
)
)
.scalars()
.all()
)
return dict_data_list
@ -216,10 +238,7 @@ class DictDataDao:
:param dict_data: 需要更新的字典数据字典
:return:
"""
await db.execute(
update(SysDictData),
[dict_data]
)
await db.execute(update(SysDictData), [dict_data])
@classmethod
async def delete_dict_data_dao(cls, db: AsyncSession, dict_data: DictDataModel):
@ -229,10 +248,7 @@ class DictDataDao:
:param dict_data: 字典数据对象
:return:
"""
await db.execute(
delete(SysDictData)
.where(SysDictData.dict_code.in_([dict_data.dict_code]))
)
await db.execute(delete(SysDictData).where(SysDictData.dict_code.in_([dict_data.dict_code])))
@classmethod
async def count_dict_data_dao(cls, db: AsyncSession, dict_type: str):
@ -242,10 +258,8 @@ class DictDataDao:
:param dict_type: 字典类型
:return: 字典类型关联的字典数据数量
"""
dict_data_count = (await db.execute(
select(func.count('*'))
.select_from(SysDictData)
.where(SysDictData.dict_type == dict_type)
)).scalar()
dict_data_count = (
await db.execute(select(func.count('*')).select_from(SysDictData).where(SysDictData.dict_type == dict_type))
).scalar()
return dict_data_count

10
ruoyi-fastapi-backend/module_admin/entity/do/dict_do.py

@ -1,12 +1,13 @@
from sqlalchemy import Column, Integer, String, DateTime, UniqueConstraint
from config.database import Base
from datetime import datetime
from sqlalchemy import Column, DateTime, Integer, String, UniqueConstraint
from config.database import Base
class SysDictType(Base):
"""
字典类型表
"""
__tablename__ = 'sys_dict_type'
dict_id = Column(Integer, primary_key=True, autoincrement=True, comment='字典主键')
@ -19,15 +20,14 @@ class SysDictType(Base):
update_time = Column(DateTime, nullable=True, default=datetime.now(), comment='更新时间')
remark = Column(String(500), nullable=True, default='', comment='备注')
__table_args__ = (
UniqueConstraint('dict_type', name='uq_sys_dict_type_dict_type'),
)
__table_args__ = (UniqueConstraint('dict_type', name='uq_sys_dict_type_dict_type'),)
class SysDictData(Base):
"""
字典数据表
"""
__tablename__ = 'sys_dict_data'
dict_code = Column(Integer, primary_key=True, autoincrement=True, comment='字典编码')

20
ruoyi-fastapi-backend/module_admin/entity/vo/dict_vo.py

@ -1,15 +1,16 @@
from datetime import datetime
from pydantic import BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel
from pydantic_validation_decorator import NotBlank, Pattern, Size
from typing import Union, Optional, List, Literal
from datetime import datetime
from module_admin.annotation.pydantic_annotation import as_query, as_form
from typing import Literal, Optional
from module_admin.annotation.pydantic_annotation import as_form, as_query
class DictTypeModel(BaseModel):
"""
字典类型表对应pydantic模型
"""
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True)
dict_id: Optional[int] = Field(default=None, description='字典主键')
@ -29,7 +30,11 @@ class DictTypeModel(BaseModel):
@NotBlank(field_name='dict_type', message='字典类型不能为空')
@Size(field_name='dict_type', min_length=0, max_length=100, message='字典类型类型长度不能超过100个字符')
@Pattern(field_name='dict_type', regexp='^[a-z][a-z0-9_]*$', message='字典类型必须以字母开头,且只能为(小写字母,数字,下滑线)')
@Pattern(
field_name='dict_type',
regexp='^[a-z][a-z0-9_]*$',
message='字典类型必须以字母开头,且只能为(小写字母,数字,下滑线)',
)
def get_dict_type(self):
return self.dict_type
@ -42,6 +47,7 @@ class DictDataModel(BaseModel):
"""
字典数据表对应pydantic模型
"""
model_config = ConfigDict(alias_generator=to_camel, from_attributes=True)
dict_code: Optional[int] = Field(default=None, description='字典编码')
@ -89,6 +95,7 @@ class DictTypeQueryModel(DictTypeModel):
"""
字典类型管理不分页查询模型
"""
begin_time: Optional[str] = Field(default=None, description='开始时间')
end_time: Optional[str] = Field(default=None, description='结束时间')
@ -99,6 +106,7 @@ class DictTypePageQueryModel(DictTypeQueryModel):
"""
字典类型管理分页查询模型
"""
page_num: int = Field(default=1, description='当前页码')
page_size: int = Field(default=10, description='每页记录数')
@ -107,6 +115,7 @@ class DeleteDictTypeModel(BaseModel):
"""
删除字典类型模型
"""
model_config = ConfigDict(alias_generator=to_camel)
dict_ids: str = Field(description='需要删除的字典主键')
@ -116,6 +125,7 @@ class DictDataQueryModel(DictDataModel):
"""
字典数据管理不分页查询模型
"""
begin_time: Optional[str] = Field(default=None, description='开始时间')
end_time: Optional[str] = Field(default=None, description='结束时间')
@ -126,6 +136,7 @@ class DictDataPageQueryModel(DictDataQueryModel):
"""
字典数据管理分页查询模型
"""
page_num: int = Field(default=1, description='当前页码')
page_size: int = Field(default=10, description='每页记录数')
@ -134,6 +145,7 @@ class DeleteDictDataModel(BaseModel):
"""
删除字典数据模型
"""
model_config = ConfigDict(alias_generator=to_camel)
dict_codes: str = Field(description='需要删除的字典编码')

135
ruoyi-fastapi-backend/module_admin/service/dict_service.py

@ -1,11 +1,21 @@
from fastapi import Request
import json
from module_admin.dao.dict_dao import *
from module_admin.entity.vo.common_vo import CrudResponseModel
from fastapi import Request
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from config.constant import CommonConstant
from config.env import RedisInitKeyConfig
from exceptions.exception import ServiceException
from utils.common_util import export_list2excel, CamelCaseUtil
from module_admin.dao.dict_dao import DictDataDao, DictTypeDao
from module_admin.entity.vo.common_vo import CrudResponseModel
from module_admin.entity.vo.dict_vo import (
DeleteDictDataModel,
DeleteDictTypeModel,
DictDataModel,
DictDataPageQueryModel,
DictTypeModel,
DictTypePageQueryModel,
)
from utils.common_util import CamelCaseUtil, export_list2excel
class DictTypeService:
@ -14,7 +24,9 @@ class DictTypeService:
"""
@classmethod
async def get_dict_type_list_services(cls, query_db: AsyncSession, query_object: DictTypePageQueryModel, is_page: bool = False):
async def get_dict_type_list_services(
cls, query_db: AsyncSession, query_object: DictTypePageQueryModel, is_page: bool = False
):
"""
获取字典类型列表信息service
:param query_db: orm对象
@ -35,7 +47,9 @@ class DictTypeService:
:return: 校验结果
"""
dict_id = -1 if page_object.dict_id is None else page_object.dict_id
dict_type = await DictTypeDao.get_dict_type_detail_by_info(query_db, DictTypeModel(dictType=page_object.dict_type))
dict_type = await DictTypeDao.get_dict_type_detail_by_info(
query_db, DictTypeModel(dictType=page_object.dict_type)
)
if dict_type and dict_type.dict_id != dict_id:
return CommonConstant.NOT_UNIQUE
return CommonConstant.UNIQUE
@ -55,7 +69,9 @@ class DictTypeService:
try:
await DictTypeDao.add_dict_type_dao(query_db, page_object)
await query_db.commit()
await request.app.state.redis.set(f"{RedisInitKeyConfig.SYS_DICT.get('key')}:{page_object.dict_type}", '')
await request.app.state.redis.set(
f"{RedisInitKeyConfig.SYS_DICT.get('key')}:{page_object.dict_type}", ''
)
result = dict(is_success=True, message='新增成功')
except Exception as e:
await query_db.rollback()
@ -83,13 +99,20 @@ class DictTypeService:
dict_data_list = await DictDataDao.get_dict_data_list(query_db, query_dict_data, is_page=False)
if dict_type_info.dict_type != page_object.dict_type:
for dict_data in dict_data_list:
edit_dict_data = DictDataModel(dictCode=dict_data.dict_code, dictType=page_object.dict_type, updateBy=page_object.update_by).model_dump(exclude_unset=True)
edit_dict_data = DictDataModel(
dictCode=dict_data.dict_code,
dictType=page_object.dict_type,
updateBy=page_object.update_by,
).model_dump(exclude_unset=True)
await DictDataDao.edit_dict_data_dao(query_db, edit_dict_data)
await DictTypeDao.edit_dict_type_dao(query_db, edit_dict_type)
await query_db.commit()
if dict_type_info.dict_type != page_object.dict_type:
dict_data = [CamelCaseUtil.transform_result(row) for row in dict_data_list if row]
await request.app.state.redis.set(f"{RedisInitKeyConfig.SYS_DICT.get('key')}:{page_object.dict_type}", json.dumps(dict_data, ensure_ascii=False, default=str))
await request.app.state.redis.set(
f"{RedisInitKeyConfig.SYS_DICT.get('key')}:{page_object.dict_type}",
json.dumps(dict_data, ensure_ascii=False, default=str),
)
return CrudResponseModel(is_success=True, message='更新成功')
except Exception as e:
await query_db.rollback()
@ -98,7 +121,9 @@ class DictTypeService:
raise ServiceException(message='字典类型不存在')
@classmethod
async def delete_dict_type_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteDictTypeModel):
async def delete_dict_type_services(
cls, request: Request, query_db: AsyncSession, page_object: DeleteDictTypeModel
):
"""
删除字典类型信息service
:param request: Request对象
@ -151,15 +176,15 @@ class DictTypeService:
"""
# 创建一个映射字典,将英文键映射到中文键
mapping_dict = {
"dictId": "字典编号",
"dictName": "字典名称",
"dictType": "字典类型",
"status": "状态",
"createBy": "创建者",
"createTime": "创建时间",
"updateBy": "更新者",
"updateTime": "更新时间",
"remark": "备注",
'dictId': '字典编号',
'dictName': '字典名称',
'dictType': '字典类型',
'status': '状态',
'createBy': '创建者',
'createTime': '创建时间',
'updateBy': '更新者',
'updateTime': '更新时间',
'remark': '备注',
}
data = dict_type_list
@ -169,7 +194,9 @@ class DictTypeService:
item['status'] = '正常'
else:
item['status'] = '停用'
new_data = [{mapping_dict.get(key): value for key, value in item.items() if mapping_dict.get(key)} for item in data]
new_data = [
{mapping_dict.get(key): value for key, value in item.items() if mapping_dict.get(key)} for item in data
]
binary_data = export_list2excel(new_data)
return binary_data
@ -194,7 +221,9 @@ class DictDataService:
"""
@classmethod
async def get_dict_data_list_services(cls, query_db: AsyncSession, query_object: DictDataPageQueryModel, is_page: bool = False):
async def get_dict_data_list_services(
cls, query_db: AsyncSession, query_object: DictDataPageQueryModel, is_page: bool = False
):
"""
获取字典数据列表信息service
:param query_db: orm对象
@ -236,7 +265,10 @@ class DictDataService:
dict_type = dict_type_obj.dict_type
dict_data_list = await DictDataDao.query_dict_data_list(query_db, dict_type)
dict_data = [CamelCaseUtil.transform_result(row) for row in dict_data_list if row]
await redis.set(f"{RedisInitKeyConfig.SYS_DICT.get('key')}:{dict_type}", json.dumps(dict_data, ensure_ascii=False, default=str))
await redis.set(
f"{RedisInitKeyConfig.SYS_DICT.get('key')}:{dict_type}",
json.dumps(dict_data, ensure_ascii=False, default=str),
)
@classmethod
async def query_dict_data_list_from_cache_services(cls, redis, dict_type: str):
@ -277,13 +309,18 @@ class DictDataService:
:return: 新增字典数据校验结果
"""
if not await cls.check_dict_data_unique_services(query_db, page_object):
raise ServiceException(message=f'新增字典数据{page_object.dict_label}失败,{page_object.dict_type}下已存在该字典数据')
raise ServiceException(
message=f'新增字典数据{page_object.dict_label}失败,{page_object.dict_type}下已存在该字典数据'
)
else:
try:
await DictDataDao.add_dict_data_dao(query_db, page_object)
await query_db.commit()
dict_data_list = await cls.query_dict_data_list_services(query_db, page_object.dict_type)
await request.app.state.redis.set(f"{RedisInitKeyConfig.SYS_DICT.get('key')}:{page_object.dict_type}", json.dumps(CamelCaseUtil.transform_result(dict_data_list), ensure_ascii=False, default=str))
await request.app.state.redis.set(
f"{RedisInitKeyConfig.SYS_DICT.get('key')}:{page_object.dict_type}",
json.dumps(CamelCaseUtil.transform_result(dict_data_list), ensure_ascii=False, default=str),
)
return CrudResponseModel(is_success=True, message='新增成功')
except Exception as e:
await query_db.rollback()
@ -302,13 +339,18 @@ class DictDataService:
dict_data_info = await cls.dict_data_detail_services(query_db, page_object.dict_code)
if dict_data_info.dict_code:
if not await cls.check_dict_data_unique_services(query_db, page_object):
raise ServiceException(message=f'新增字典数据{page_object.dict_label}失败,{page_object.dict_type}下已存在该字典数据')
raise ServiceException(
message=f'新增字典数据{page_object.dict_label}失败,{page_object.dict_type}下已存在该字典数据'
)
else:
try:
await DictDataDao.edit_dict_data_dao(query_db, edit_data_type)
await query_db.commit()
dict_data_list = await cls.query_dict_data_list_services(query_db, page_object.dict_type)
await request.app.state.redis.set(f"{RedisInitKeyConfig.SYS_DICT.get('key')}:{page_object.dict_type}", json.dumps(CamelCaseUtil.transform_result(dict_data_list), ensure_ascii=False, default=str))
await request.app.state.redis.set(
f"{RedisInitKeyConfig.SYS_DICT.get('key')}:{page_object.dict_type}",
json.dumps(CamelCaseUtil.transform_result(dict_data_list), ensure_ascii=False, default=str),
)
return CrudResponseModel(is_success=True, message='更新成功')
except Exception as e:
await query_db.rollback()
@ -317,7 +359,9 @@ class DictDataService:
raise ServiceException(message='字典数据不存在')
@classmethod
async def delete_dict_data_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteDictDataModel):
async def delete_dict_data_services(
cls, request: Request, query_db: AsyncSession, page_object: DeleteDictDataModel
):
"""
删除字典数据信息service
:param request: Request对象
@ -336,7 +380,10 @@ class DictDataService:
await query_db.commit()
for dict_type in list(set(delete_dict_type_list)):
dict_data_list = await cls.query_dict_data_list_services(query_db, dict_type)
await request.app.state.redis.set(f"{RedisInitKeyConfig.SYS_DICT.get('key')}:{dict_type}", json.dumps(CamelCaseUtil.transform_result(dict_data_list), ensure_ascii=False, default=str))
await request.app.state.redis.set(
f"{RedisInitKeyConfig.SYS_DICT.get('key')}:{dict_type}",
json.dumps(CamelCaseUtil.transform_result(dict_data_list), ensure_ascii=False, default=str),
)
return CrudResponseModel(is_success=True, message='删除成功')
except Exception as e:
await query_db.rollback()
@ -369,20 +416,20 @@ class DictDataService:
"""
# 创建一个映射字典,将英文键映射到中文键
mapping_dict = {
"dictCode": "字典编码",
"dictSort": "字典标签",
"dictLabel": "字典键值",
"dictValue": "字典排序",
"dictType": "字典类型",
"cssClass": "样式属性",
"listClass": "表格回显样式",
"isDefault": "是否默认",
"status": "状态",
"createBy": "创建者",
"createTime": "创建时间",
"updateBy": "更新者",
"updateTime": "更新时间",
"remark": "备注",
'dictCode': '字典编码',
'dictSort': '字典标签',
'dictLabel': '字典键值',
'dictValue': '字典排序',
'dictType': '字典类型',
'cssClass': '样式属性',
'listClass': '表格回显样式',
'isDefault': '是否默认',
'status': '状态',
'createBy': '创建者',
'createTime': '创建时间',
'updateBy': '更新者',
'updateTime': '更新时间',
'remark': '备注',
}
data = dict_data_list
@ -396,7 +443,9 @@ class DictDataService:
item['isDefault'] = ''
else:
item['isDefault'] = ''
new_data = [{mapping_dict.get(key): value for key, value in item.items() if mapping_dict.get(key)} for item in data]
new_data = [
{mapping_dict.get(key): value for key, value in item.items() if mapping_dict.get(key)} for item in data
]
binary_data = export_list2excel(new_data)
return binary_data

Loading…
Cancel
Save