diff --git a/vue-fastapi-backend/module_admin/controller/data_asset_controller.py b/vue-fastapi-backend/module_admin/controller/data_asset_controller.py index 65c3211..cbece15 100644 --- a/vue-fastapi-backend/module_admin/controller/data_asset_controller.py +++ b/vue-fastapi-backend/module_admin/controller/data_asset_controller.py @@ -43,6 +43,29 @@ async def get_data_asset_list( return ResponseUtil.success(model_content=data_asset_page_query_result) +@dataAssetController.get('/listWithColumns', response_model=PageResponseModel) +async def get_data_asset_list_with_columns( + request: Request, + data_asset_page_query: DataAssetPageQueryModel = Depends(DataAssetPageQueryModel.as_query), + query_db: AsyncSession = Depends(get_db), +): + """ + 获取数据资产信息列表 + + :param request: 请求对象 + :param data_asset_page_query: 查询参数 + :param query_db: 数据库会话 + :return: 数据资产信息列表 + """ + # 获取分页数据 + data_asset_page_query_result = await DataAssetService.get_data_asset_list_with_columns_services( + query_db, data_asset_page_query, is_page=True + ) + logger.info('获取数据资产信息列表成功') + + return ResponseUtil.success(data=data_asset_page_query_result) + + @dataAssetController.post('/batch') async def batch_process_data_asset( request: Request, diff --git a/vue-fastapi-backend/module_admin/dao/cdplb_dao.py b/vue-fastapi-backend/module_admin/dao/cdplb_dao.py index bea50de..d64e5c7 100644 --- a/vue-fastapi-backend/module_admin/dao/cdplb_dao.py +++ b/vue-fastapi-backend/module_admin/dao/cdplb_dao.py @@ -20,7 +20,8 @@ class CdplbDao: select(SysCdplb).where( SysCdplb.bath_obj_tab_name.like(f'%{cdplb_query.bathObjTabName}%') if cdplb_query.bathObjTabName else True, SysCdplb.bath_obj_fld_name.like(f'%{cdplb_query.bathObjFldName}%') if cdplb_query.bathObjFldName else True, - SysCdplb.pos_name.like(f'%{cdplb_query.pos_name}%') if cdplb_query.pos_name else True, + SysCdplb.ssys_id == cdplb_query.ssys_id if cdplb_query.ssys_id else True, + SysCdplb.mdl_name == cdplb_query.mdl_name if cdplb_query.mdl_name else True, SysCdplb.status == cdplb_query.status if cdplb_query.status else True ).distinct() ) @@ -44,6 +45,21 @@ class CdplbDao: ) return result + @classmethod + async def get_cdplb_by_col(cls, db: AsyncSession, ssys_id: int, mdl_name: str, bath_obj_tab_name: str, bath_obj_fld_name: str): + result = ( + ( + await db.execute( + select(SysCdplb).where(SysCdplb.ssys_id == ssys_id, + SysCdplb.mdl_name == mdl_name, + SysCdplb.bath_obj_tab_name == bath_obj_tab_name, + SysCdplb.bath_obj_fld_name == bath_obj_fld_name + ).distinct() + ) + ).scalars().first() + ) + return result + @classmethod async def update_cdplb(cls, db: AsyncSession, saveObj: dict): await db.execute(update(SysCdplb), [saveObj]) diff --git a/vue-fastapi-backend/module_admin/dao/data_asset_dao.py b/vue-fastapi-backend/module_admin/dao/data_asset_dao.py index 929f037..d25ea8e 100644 --- a/vue-fastapi-backend/module_admin/dao/data_asset_dao.py +++ b/vue-fastapi-backend/module_admin/dao/data_asset_dao.py @@ -1,4 +1,4 @@ -from sqlalchemy import select, update, delete, insert,func ,Integer +from sqlalchemy import select, update, delete, insert, func, Integer, text from sqlalchemy.ext.asyncio import AsyncSession from module_admin.entity.do.data_ast_content_do import DataAssetInfo from module_admin.entity.do.dataast_do import DataAssetInfoAppr @@ -38,6 +38,28 @@ class DataAssetDao: data_asset_list = await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page) return data_asset_list + @classmethod + async def getColumnsByTable(cls, db: AsyncSession, tableId: int): + sql = text("select a.onum as onum," + "a.ssys_id as ssys_id," + " a.mdl_name as mdl_name," + "a.fld_eng_name as fld_eng_name," + "case when a.fld_cn_name is null then b.fld_crrct_name" + " when a.fld_cn_name is not null then a.fld_cn_name end as fld_cn_name," + "a.fld_type as fld_type," + "case when a.pk_flag is null then b.crrct_pk_flag " + "when a.pk_flag is not null then a.pk_flag end as pk_flag" + " from t_metadata_fld_tab_extract_info a " + "left join t_metadata_fld_supp_info b " + "on a.ssys_id = b.ssys_id and a.mdl_name=b.mdl_name and" + " a.tab_eng_name=b.tab_eng_name and a.fld_eng_name=b.fld_eng_name " + "left join t_metadata_extract_info c " + "on a.ssys_id = c.ssys_id and a.mdl_name=c.mdl_name and a.tab_eng_name=c.tab_eng_name " + "where c.onum = :ssysId order by a.fld_no") + result = (await db.execute(sql, {"ssysId": tableId})) + data = [dict(row._mapping) for row in result] + return data + @classmethod async def get_data_asset_by_ast_no(cls, db: AsyncSession, ast_no: str): """ diff --git a/vue-fastapi-backend/module_admin/entity/do/dataint_do.py b/vue-fastapi-backend/module_admin/entity/do/dataint_do.py index 9d1fc8e..81359f9 100644 --- a/vue-fastapi-backend/module_admin/entity/do/dataint_do.py +++ b/vue-fastapi-backend/module_admin/entity/do/dataint_do.py @@ -52,9 +52,8 @@ class SysCdplb(Base): onum = Column(String(50, collation='utf8_general_ci'), primary_key=True, comment='序号') bath_obj_tab_name = Column(String(500, collation='utf8_general_ci'), comment='批量对象表名') bath_obj_fld_name = Column(String(500, collation='utf8_general_ci'), comment='批里对象字段名') - freq = Column(String(500, collation='utf8_general_ci'), comment='频次') - pos = Column(String(500, collation='utf8_general_ci'), comment='词性') - pos_name = Column(String(500, collation='utf8_general_ci'), comment='词性名称') + ssys_id = Column(Integer, comment='系统ID') + mdl_name = Column(String(50, collation='utf8_general_ci'), comment='模式名称') status = Column(String(1, collation='utf8_general_ci'), comment='状态(0正常 1停用)') create_by = Column(String(64, collation='utf8_general_ci'), comment='创建者') create_time = Column(DateTime, comment='创建时间') diff --git a/vue-fastapi-backend/module_admin/entity/vo/dataint_vo.py b/vue-fastapi-backend/module_admin/entity/vo/dataint_vo.py index fcc6f2a..e99b06b 100644 --- a/vue-fastapi-backend/module_admin/entity/vo/dataint_vo.py +++ b/vue-fastapi-backend/module_admin/entity/vo/dataint_vo.py @@ -55,21 +55,25 @@ class CdplbPageObject(BaseModel): page_size: int bathObjTabName: Optional[str] = None bathObjFldName: Optional[str] = None - pos_name: Optional[str] = None + ssys_id: Optional[int] = None + mdl_name: Optional[str] = None status: Optional[str] = None -class SaveCdplbModel(BaseModel): +class SaveCdplbObj(BaseModel): model_config = ConfigDict(alias_generator=to_camel, from_attributes=True) onum: Optional[str] = None bath_obj_tab_name: Optional[str] = None bath_obj_fld_name: Optional[str] = None - freq: Optional[str] = None - pos: Optional[str] = None - pos_name: Optional[str] = None + ssys_id: Optional[int] = None + mdl_name: Optional[str] = None status: Optional[str] = None +class SaveCdplbModel(BaseModel): + list: List[Union[SaveCdplbObj, None]] + + @as_query class SscfPageObject(BaseModel): model_config = ConfigDict(alias_generator=to_camel, from_attributes=True) @@ -93,6 +97,7 @@ class SaveSscfModel(BaseModel): type: Optional[str] = None supp_expl: Optional[str] = None + @as_query class VecsetPageObject(BaseModel): model_config = ConfigDict(alias_generator=to_camel, from_attributes=True) @@ -126,4 +131,3 @@ class TreeOperateModel(BaseModel): onum: Optional[str] = None dasset_code: Optional[str] = None dasset_name: Optional[str] = None - diff --git a/vue-fastapi-backend/module_admin/service/cdplb_service.py b/vue-fastapi-backend/module_admin/service/cdplb_service.py index bb232a8..9011516 100644 --- a/vue-fastapi-backend/module_admin/service/cdplb_service.py +++ b/vue-fastapi-backend/module_admin/service/cdplb_service.py @@ -9,11 +9,13 @@ from fastapi import UploadFile from sqlalchemy.ext.asyncio import AsyncSession from module_admin.entity.vo.common_vo import CrudResponseModel from module_admin.entity.vo.user_vo import CurrentUserModel -from module_admin.entity.vo.dataint_vo import CdplbPageObject, SaveCdplbModel +from module_admin.entity.vo.dataint_vo import CdplbPageObject, SaveCdplbModel, SaveCdplbObj from module_admin.entity.do.dataint_do import SysCdplb from module_admin.dao.cdplb_dao import CdplbDao from exceptions.exception import ServiceException, ServiceWarning +from config.env import AppConfig from datetime import datetime +import requests from utils.common_util import * @@ -29,24 +31,27 @@ class CdplbService: @classmethod async def save_cdplb(cls, result_db: AsyncSession, saveCdplbModel: SaveCdplbModel, current_user: CurrentUserModel): - if saveCdplbModel.onum is None: - # add - saveCdplbModel.onum = uuid.uuid4() - addObj = SysCdplb(**saveCdplbModel.model_dump()) - addObj.create_by = current_user.user.user_name - addObj.create_time = datetime.now() - addObj.update_by = current_user.user.user_name - addObj.update_time = datetime.now() - await CdplbDao.insert_cdplb(result_db, addObj) - else: - # update - oldObj = await CdplbDao.get_cdplb_by_id(result_db, saveCdplbModel.onum) - if oldObj is None: - raise ServiceException(message='所改对象不存在') - saveObj = saveCdplbModel.model_dump(exclude_unset=True) - saveObj['update_by'] = current_user.user.user_name - saveObj['update_time'] = datetime.now() - await CdplbDao.update_cdplb(result_db, saveObj) + if saveCdplbModel.list and len(saveCdplbModel.list) > 0: + for item in saveCdplbModel.list: + if item.onum is None: + oldObj = await CdplbDao.get_cdplb_by_col(result_db, item.ssys_id, item.mdl_name, item.bath_obj_tab_name, item.bath_obj_fld_name) + if oldObj is None: + addObj = SysCdplb(**item.model_dump()) + addObj.onum = uuid.uuid4() + addObj.status = '1' + addObj.create_by = current_user.user.user_name + addObj.create_time = datetime.now() + addObj.update_by = current_user.user.user_name + addObj.update_time = datetime.now() + await CdplbDao.insert_cdplb(result_db, addObj) + else: + oldObj = await CdplbDao.get_cdplb_by_id(result_db, item.onum) + if oldObj is None: + raise ServiceException(message='所改对象不存在') + saveObj = item.model_dump(exclude_unset=True) + saveObj['update_by'] = current_user.user.user_name + saveObj['update_time'] = datetime.now() + await CdplbDao.update_cdplb(result_db, saveObj) await result_db.commit() return CrudResponseModel(is_success=True, message='操作成功') @@ -62,7 +67,7 @@ class CdplbService: 获取元数据导入模板service :return: 元数据导入模板excel的二进制数据 """ - table_header_list = ['批量对象表名', '批里对象字段名', '频次', '词性', '词性名称', '状态'] + table_header_list = ['系统名称', '模式名称', '批量对象表名', '批里对象字段名', '状态'] selector_header_list = ['状态'] option_list = [{'状态': ['正常', '停用']}] @@ -84,12 +89,21 @@ class CdplbService: file: UploadFile, overWrite: bool, current_user: CurrentUserModel): + url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources?pageNo=1&pageSize=100' + headers = {'dashUserName': current_user.user.user_name, 'dashPassword': current_user.user.password} + response = requests.get(url, headers=headers, verify=False) + dataSourceList = [] + if response.reason == 'OK': + response_text = response.text + data = json.loads(response_text) + dataSourceList = data["data"]["totalList"] + else: + raise ServiceException(message=f'系统异常,获取数据源失败') table_header_dict = { + '系统名称': 'ssys_cd', + '模式名称': 'mdl_name', '批量对象表名': 'bath_obj_tab_name', '批里对象字段名': 'bath_obj_fld_name', - '频次': 'freq', - '词性': 'pos', - '词性名称': 'pos_name', '状态': 'status' } contents = await file.read() @@ -108,6 +122,7 @@ class CdplbService: df.rename(columns=table_header_dict, inplace=True) for index, row in df.iterrows(): noneValid = '' + ssysId = next((item["id"] for item in dataSourceList if item["name"] == row['ssys_cd']), None) if row['bath_obj_tab_name'] is None or len(row['bath_obj_tab_name']) == 0: noneValid += "批量对象表名不能为空" if row['bath_obj_fld_name'] is None or len(row['bath_obj_fld_name']) == 0: @@ -115,40 +130,40 @@ class CdplbService: noneValid += ",批里对象字段名不能为空" else: noneValid += "批里对象字段名不能为空" - if row['freq'] is None or len(row['freq']) == 0: - if len(noneValid) > 0: - noneValid += ",频次不能为空" - else: - noneValid += "频次不能为空" - if row['pos'] is None or len(row['pos']) == 0: + if row['ssys_cd'] is None or len(row['ssys_cd']) == 0: if len(noneValid) > 0: - noneValid += ",词性不能为空" + noneValid += ",系统不能为空" else: - noneValid += "词性不能为空" - if row['pos_name'] is None or len(row['pos_name']) == 0: + noneValid += "系统不能为空" + if row['mdl_name'] is None or len(row['mdl_name']) == 0: if len(noneValid) > 0: - noneValid += ",词性名称不能为空" + noneValid += ",模式不能为空" else: - noneValid += "词性名称不能为空" + noneValid += "模式不能为空" if row['status'] is None or len(row['status']) == 0: if len(noneValid) > 0: noneValid += ",状态不能为空" else: noneValid += "状态不能为空" + if ssysId is None: + if len(noneValid) > 0: + noneValid += ",系统不存在,为无效系统" + else: + noneValid += "系统不存在,为无效系统" if len(noneValid) > 0: result_list['rows'].append({ "row": index + 2, "errorInfo": noneValid }) continue - cdplb = SaveCdplbModel() + cdplb = SaveCdplbObj() + cdplb.ssys_id = ssysId + cdplb.mdl_name = row['mdl_name'] cdplb.bath_obj_tab_name = row['bath_obj_tab_name'] cdplb.bath_obj_fld_name = row['bath_obj_fld_name'] - cdplb.freq = row['freq'] - cdplb.pos = row['pos'] - cdplb.pos_name = row['pos_name'] cdplb.status = '1' if row['status'] == '正常' else '0' - await cls.save_cdplb(result_db, cdplb, current_user) + mdl = SaveCdplbModel(list=[cdplb]) + await cls.save_cdplb(result_db, mdl, current_user) result_list['successCount'] += 1 return result_list diff --git a/vue-fastapi-backend/module_admin/service/data_asset_service.py b/vue-fastapi-backend/module_admin/service/data_asset_service.py index f367866..8372b68 100644 --- a/vue-fastapi-backend/module_admin/service/data_asset_service.py +++ b/vue-fastapi-backend/module_admin/service/data_asset_service.py @@ -26,6 +26,25 @@ class DataAssetService: data_asset_list_result = await DataAssetDao.get_data_asset_list(query_db, query_object, is_page) return data_asset_list_result + @classmethod + async def get_data_asset_list_with_columns_services( + cls, query_db: AsyncSession, query_object: DataAssetPageQueryModel, is_page: bool = False + ): + """ + 获取数据资产信息列表service + + :param query_db: orm对象 + :param query_object: 查询参数对象 + :param is_page: 是否开启分页 + :return: 数据资产信息列表对象 + """ + data_asset_list_result = await DataAssetDao.get_data_asset_list(query_db, query_object, is_page) + if data_asset_list_result and len(data_asset_list_result.rows) > 0: + for table in data_asset_list_result.rows: + columns = await DataAssetDao.getColumnsByTable(query_db, table['astNo']) + table['children'] = columns + return data_asset_list_result + @classmethod async def batch_process_data_asset_services( cls, query_db: AsyncSession, batch_object: DataAssetBatchModel diff --git a/vue-fastapi-frontend/src/api/dataAsset/assetDetail.js b/vue-fastapi-frontend/src/api/dataAsset/assetDetail.js index 91b047b..b6a498f 100644 --- a/vue-fastapi-frontend/src/api/dataAsset/assetDetail.js +++ b/vue-fastapi-frontend/src/api/dataAsset/assetDetail.js @@ -8,6 +8,14 @@ export function getSearch(params) { }) } +export function getSearchWithColumns(params) { + return request({ + url: '/default-api/system/dataAsset/listWithColumns', + method: 'get', + params, + }) +} + export function batch(data) { return request({ url: '/default-api/system/dataAsset/batch', diff --git a/vue-fastapi-frontend/src/views/dataint/cypz/cdplb.vue b/vue-fastapi-frontend/src/views/dataint/cypz/cdplb.vue index c3a4a39..6a4b1c8 100644 --- a/vue-fastapi-frontend/src/views/dataint/cypz/cdplb.vue +++ b/vue-fastapi-frontend/src/views/dataint/cypz/cdplb.vue @@ -1,15 +1,20 @@ \ No newline at end of file