diff --git a/vue-fastapi-backend/module_admin/entity/vo/metasecurity_vo.py b/vue-fastapi-backend/module_admin/entity/vo/metasecurity_vo.py index aba191f..52a16e4 100644 --- a/vue-fastapi-backend/module_admin/entity/vo/metasecurity_vo.py +++ b/vue-fastapi-backend/module_admin/entity/vo/metasecurity_vo.py @@ -88,7 +88,6 @@ class MetaSecurityApiModel(BaseModel): dbRId: Optional[int] = None username: Optional[str] = Field(default=None, description='用户名称') password: Optional[str] = Field(default=None, description='用户密码') - tableName: Optional[str] = Field(default=None, description='表名') sqlStr: Optional[str] = Field(default=None, description='sql') @NotBlank(field_name='username', message='用户名称不能为空') @@ -98,14 +97,10 @@ class MetaSecurityApiModel(BaseModel): @NotBlank(field_name='password', message='用户密码不能为空') def get_password(self): return self.password - @NotBlank(field_name='tableName', message='表名不能为空') - def get_tableName(self): - return self.tableName @NotBlank(field_name='sqlStr', message='sql不能为空') def get_sqlStr(self): return self.username def validate_fields(self): self.get_username() self.get_password() - self.get_tableName() self.get_sqlStr() diff --git a/vue-fastapi-backend/module_admin/service/metasecurity_service.py b/vue-fastapi-backend/module_admin/service/metasecurity_service.py index 6ff7298..f038fe5 100644 --- a/vue-fastapi-backend/module_admin/service/metasecurity_service.py +++ b/vue-fastapi-backend/module_admin/service/metasecurity_service.py @@ -12,7 +12,12 @@ from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import text +from config.env import AppConfig +import requests + import json +import re +from decimal import Decimal class MetaSecurityService: """ @@ -249,7 +254,7 @@ class MetaSecurityService: raise e else: raise ServiceException(message='传入列配置ID为空') - + @classmethod async def delete_meta_security_row_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaSecurityModel): """ @@ -286,54 +291,64 @@ class MetaSecurityService: raise ServiceException(data='', message='用户密码错误!') query_user = await UserDao.get_user_by_id(query_db, user_id=user[0].user_id) role_id_list = [item.role_id for item in query_user.get('user_role_info')] - #2.查询用户及该用户角色下的所有行列配置 - # 列配置 - user_col_list=await MetaSecurityDao.get_api_col_list(query_db, page_object.dbRId,page_object.tableName, '0',user[0].user_id) - role_col_list = [] - for role_id in role_id_list: - role_cols = await MetaSecurityDao.get_api_col_list( - query_db, page_object.dbRId, page_object.tableName, '1', role_id - ) - role_col_list.extend(role_cols) # 将每个角色的列配置合并到列表中 + #2.测试数据源连接是否正常 + # mysql + # dataParams ={"user":"dbf","password":"1q2w3e4r","address":"jdbc:mysql://47.113.147.166:3306","database":"dash_test_w","jdbcUrl":"jdbc:mysql://47.113.147.166:3306/dash_test_w","driverClassName":"com.mysql.cj.jdbc.Driver","validationQuery":"select 1"} + # postgresql + # dataParams ={"user":"testuser","password":"testpd","address":"jdbc:postgresql://47.121.207.11:5432","database":"zx2","jdbcUrl":"jdbc:postgresql://47.121.207.11:5432/zx2","driverClassName":"org.postgresql.Driver","validationQuery":"select version()"} + dsDataResource=await get_data_source_tree(request,page_object) + # dbConnent= cls.get_db_engine("postgresql",dataParams]) + dbConnent= cls.get_db_engine(dsDataResource["type"],dsDataResource["connectionParams"]) + #3.执行原始sql + result = await cls.execute_sql(dbConnent, page_object.sqlStr,"原始") + #4.获取sql中涉及的表名 + sqlTableNames =await cls.get_tables_from_sql(page_object.sqlStr) + #5.根据表名获取数据库中的字段名 + table_columns = await cls.get_columns_from_tables(dbConnent, sqlTableNames,dsDataResource["type"],) - # 行配置 - user_row_list=await MetaSecurityDao.get_api_row_list(query_db, page_object.dbRId,page_object.tableName, '0',user[0].user_id) - role_row_list = [] - for role_id in role_id_list: - role_rows = await MetaSecurityDao.get_api_row_list( - query_db, page_object.dbRId, page_object.tableName, '1', role_id - ) - role_row_list.extend(role_rows) # 将每个角色的行配置合并到列表中 - result = { - "user_col_list": user_col_list, - "role_col_list": role_col_list, - "role_row_list": role_row_list, - "user_row_list": user_row_list - } - # return result - #3.根据行列配置控制原始sql - #4.测试数据源连接是否正常 - dataParams ={"user":"dbf","password":"1q2w3e4r","address":"jdbc:mysql://47.113.147.166:3306","database":"dash_test_w","jdbcUrl":"jdbc:mysql://47.113.147.166:3306/dash_test_w","driverClassName":"com.mysql.cj.jdbc.Driver","validationQuery":"select 1"} - dbConnent = cls.get_db_engine('mysql',dataParams) - query = "SELECT * FROM msq_table_constraints" - result = await cls.execute_sql(dbConnent, query) - return result - #5.执行原始sql - #6.执行控制后的sql - #7.执行结果 + #6.查询用户及该用户角色下的所有行列配置 + tablesRowCol = {} + + # 遍历每个表名,获取对应的配置 + for table_name in sqlTableNames: + table_configs = await get_table_configs(query_db, page_object, user, role_id_list, table_name) + tablesRowCol[table_name] = table_configs + + # 返回最终的结果字典 + ctrSqlDict = await generate_sql(tablesRowCol,table_columns) + oldStrSql= page_object.sqlStr + #7.根据行列配置控制原始sql + newStrSql =await replace_table_with_subquery(ctrSqlDict,oldStrSql) + #8.执行结果 + result = await cls.execute_sql(dbConnent, page_object.sqlStr,"控制后") + resultDict={ + "ctrlSql": newStrSql, + "data": result, + "tablesRowCol":tablesRowCol + } + return resultDict + + def get_db_engine(db_type: str, db_params: dict): try: - if db_type == "mysql": - address = db_params['address'] - if address.startswith("jdbc:mysql://"): - # 去掉 jdbc:mysql:// 部分 - address = address[len("jdbc:mysql://"):] - db_params['address'] = address + address = db_params['address'] + jdbc_prefixes = { + "jdbc:mysql://": len("jdbc:mysql://"), + "jdbc:postgresql://": len("jdbc:postgresql://") + } + + # Check and remove the matching prefix + for prefix, length in jdbc_prefixes.items(): + if address.startswith(prefix): + address = address[length:] + db_params['address']=address + break # Once the correct prefix is found, exit the loop + if db_type.lower() == "mysql": conn_str=f"mysql+aiomysql://{db_params['user']}:{db_params['password']}@{db_params['address']}/{db_params['database']}" print(f"数据库连接字符串: {conn_str}") # 输出调试信息 return create_async_engine(conn_str) - elif db_type == "postgresql": + elif db_type.lower() == "postgresql": return create_async_engine(f"postgresql+asyncpg://{db_params['user']}:{db_params['password']}@{db_params['address']}/{db_params['database']}") # 你可以根据需求添加更多数据库类型 else: @@ -366,8 +381,213 @@ class MetaSecurityService: # 将每一行转化为字典,键为列名 result_dict = [dict(zip(columns, row)) for row in rows] - + # # 使用 convert_decimal 处理数据 + # result_dict = [convert_decimal(row) for row in result_dict] # 转换为 JSON 字符串 - return json.dumps(result_dict, ensure_ascii=False, indent=4) + return result_dict except SQLAlchemyError as e: - raise RuntimeError(f"{sql_type}执行 SQL 查询时发生错误: {e}") \ No newline at end of file + raise RuntimeError(f"{sql_type}执行 SQL 查询时发生错误: {e}") + async def get_tables_from_sql(sql_query:str): + table_pattern = r"(FROM|JOIN|INTO|UPDATE)\s+([a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_]+))" + + table_matches = re.findall(table_pattern, sql_query, re.IGNORECASE) + + table_names = [match[1] for match in table_matches] + table_names = [match[1].split('.')[-1] for match in table_matches] # `split('.')[-1]` 取最后一部分,即表名 + + return table_names + + @classmethod + async def get_columns_from_tables(cls, dbConnent, table_names, db_type: str): + """查询每个表的字段信息,根据数据库类型调整查询""" + columns = {} + query="" + for table_name in table_names: + if db_type.lower() == "postgresql": + # PostgreSQL: 使用 information_schema.columns 查询字段 + query= f"SELECT column_name FROM information_schema.columns WHERE table_name ='{table_name}'" + elif db_type.lower() == "mysql": + # MySQL: 使用 INFORMATION_SCHEMA.COLUMNS 查询字段 + query= f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME ='{table_name}'" + else: + raise ValueError(f"暂不支持数据库类型: {db_type}") + + # Execute the query for the specific table + result = await cls.execute_sql(dbConnent, query, "字段查询") + + # 将结果转换为字典格式 {table_name: ['column1', 'column2', ...]} + columns[table_name] = [row["column_name"] for row in result] + + return columns + +def convert_decimal(obj): + if isinstance(obj, Decimal): + return float(obj) # 或者 str(obj) 来保留精度 + elif isinstance(obj, dict): + # 递归处理字典中的每个值 + return {key: convert_decimal(value) for key, value in obj.items()} + elif isinstance(obj, list): + # 递归处理列表中的每个元素 + return [convert_decimal(item) for item in obj] + return obj # 返回非 Decimal、dict 或 list 的值 +async def get_table_configs(query_db, page_object, user, role_id_list, table_name): + # 获取用户的列配置 + user_col_list = await MetaSecurityDao.get_api_col_list( + query_db, page_object.dbRId, table_name, '0', user[0].user_id + ) + + # 获取角色的列配置 + role_col_list = [] + for role_id in role_id_list: + role_cols = await MetaSecurityDao.get_api_col_list( + query_db, page_object.dbRId, table_name, '1', role_id + ) + role_col_list.extend(role_cols) # 将每个角色的列配置合并到列表中 + + # 获取用户的行配置 + user_row_list = await MetaSecurityDao.get_api_row_list( + query_db, page_object.dbRId, table_name, '0', user[0].user_id + ) + + # 获取角色的行配置 + role_row_list = [] + for role_id in role_id_list: + role_rows = await MetaSecurityDao.get_api_row_list( + query_db, page_object.dbRId, table_name, '1', role_id + ) + role_row_list.extend(role_rows) # 将每个角色的行配置合并到列表中 + return { + "user_col_list": user_col_list, + "role_col_list": role_col_list, + "role_row_list": role_row_list, + "user_row_list": user_row_list + } +async def generate_sql(tablesRowCol:dict, table_columns:dict): + sql_queries = {} + + # 1. 列控制 + # 遍历每个表 + for table_name, config in tablesRowCol.items(): + # 获取该表的字段名 + columns = {col.lower(): col for col in table_columns[table_name]} # 将字段名转为小写 + + # 初始化 SELECT 部分:用字典存储字段名,值是 null 字段名 + select_columns = {col: f"null {col}" for col in columns} + + # 处理角色列配置 + for col in config["role_col_list"]: + # If dbCName is "ALL", handle it as a special case + if col.dbCName == "ALL": + if col.ctrl_type == '0': # If ctrl_type is '0', prefix all columns with null + for db_column in columns: # Assuming 'user' is the table name + select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀 + elif col.ctrl_type == '1': # If ctrl_type is '1', use actual column names + for db_column in columns: + select_columns[db_column] = db_column # 使用实际字段名 + else: + # Handle specific columns listed in dbCName + db_columns = [db_column.strip().lower() for db_column in col.dbCName.split(",")] + for db_column in db_columns: + db_column = db_column.strip() + if db_column in columns: # Check if the column exists in the table + if col.ctrl_type == '0': # If ctrl_type is '0', prefix with null + select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀 + elif col.ctrl_type == '1': # If ctrl_type is '1', use actual column name + select_columns[db_column] = db_column # 使用实际字段名 + # 处理用户列配置 + for col in config["user_col_list"]: + if col.dbCName == "ALL": # 如果 dbCName 为 "ALL" + if col.ctrl_type == "0": # ctrlType 为 0,字符串字段 + for db_column in columns: # 对所有字段加上 null + select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀 + elif col.ctrl_type == "1": # ctrlType 为 1,实际数据库字段 + for db_column in columns: # 使用实际字段名,不加 null + select_columns[db_column] = db_column # 使用实际字段名 + else: # 处理 dbCName 不为 "ALL" 的情况 + db_columns = [db_column.strip().lower() for db_column in col.dbCName.split(",")] + for db_column in db_columns: + db_column = db_column.strip() + if db_column in columns: + if col.ctrl_type == "0": + select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀 + elif col.ctrl_type == "1": + select_columns[db_column] = db_column # 使用实际字段名 + # 生成 SQL 查询 + sql_queries[table_name] = f"SELECT {', '.join(select_columns.values())} FROM {table_name}" + # 2.行控制 + select_rows={} + # 处理角色行配置 + for row in config["role_row_list"]: + # 仅仅对固定值有效,不加行限制 + if row.ctrl_value == "ALL" and row.ctrl_type == '0': + # 控制方式 --固定值 + select_rows[row.dbCName] = "" + else: + if row.ctrl_type == '0': + # row.ctrl_value 是逗号分隔的字符串时,改为 IN 语句 + if "," in row.ctrl_value: + # 将 ctrl_value 按逗号分割,并用单引号包裹每个值 + values = [f"'{value.strip()}'" for value in row.ctrl_value.split(",")] + select_rows[row.dbCName] = f"{row.dbCName} IN ({', '.join(values)})" + else: + select_rows[row.dbCName] = f"{row.dbCName} = '{row.ctrl_value}'" + if row.ctrl_type == '1': + tab_col_value=row.ctrl_value.split(".") + if len(tab_col_value) != 2: + raise RuntimeError(f"{row.dbCName}字段控制类型为表字段,未维护正确的值") + select_rows[row.dbCName] = f"{row.dbCName} in (select {tab_col_value[1]} from {tab_col_value[0]})" + # 处理用户行配置 + for row in config["user_row_list"]: + # 仅仅对固定值有效,不加行限制 + if row.ctrl_value == "ALL" and row.ctrl_type == '0': + # 控制方式 --固定值 + select_rows[row.dbCName] = "" + else: + if row.ctrl_type == '0': + # row.obj_value 是逗号分隔的字符串时,改为 IN 语句 + if "," in row.ctrl_value: + # 将 obj_value 按逗号分割,并用单引号包裹每个值 + values = [f"'{value.strip()}'" for value in row.ctrl_value.split(",")] + select_rows[row.dbCName] = f"{row.dbCName} IN ({', '.join(values)})" + else: + select_rows[row.dbCName] = f"{row.dbCName} = '{row.ctrl_value}'" + if row.ctrl_type == '1': + tab_col_value=row.ctrl_value.split(".") + if len(tab_col_value) != 2: + raise RuntimeError(f"{row.dbCName}字段控制类型为表字段,未维护正确的值") + select_rows[row.dbCName] = f"{row.dbCName} in (select {tab_col_value[1]} from {tab_col_value[0]})" + if select_rows: + where_conditions = " AND ".join(select_rows.values()) + sql_queries[table_name] += " WHERE " + where_conditions + return sql_queries + +async def replace_table_with_subquery(ctrSqlDict, oldStrSql): + # 遍历 ctrSqlDict 并替换 SQL 查询中的表名 + for table_name, subquery in ctrSqlDict.items(): + # 创建一个正则表达式,匹配原始 SQL 中的表名(注意大小写问题,正则会忽略大小写) + # 匹配类似 "模式名.tab1" 或 "tab1" 的表名 + table_name_pattern = r'\b(?:[a-zA-Z_][a-zA-Z0-9_]*\.)?' + re.escape(table_name) + r'\b' + + # 替换原始 SQL 中的表名为对应的子查询 + oldStrSql = re.sub(table_name_pattern, f"({subquery})", oldStrSql, flags=re.IGNORECASE) + + return oldStrSql + +async def get_data_source_tree(request: Request, current_user: MetaSecurityApiModel): + + url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources/withpwdlist?pageNo=1&pageSize=100' + headers = {'dashUserName': current_user.username ,'dashPassword': current_user.password,} + response = requests.get(url, headers=headers) + if response.reason == 'OK': + response_text = response.text + data = json.loads(response_text) + total_list = data["data"]["totalList"] + # 解析 connectionParams 字符串为字典 + for item in total_list: + if item["id"]==current_user.dbRId: + item["connectionParams"] = json.loads(item["connectionParams"]) + return item + raise Exception(f'根据数据源ID:{current_user.dbRId}获取数据源信息失败,状态: {response.reason}') + + else: + raise Exception(f'根据数据源ID:{current_user.dbRId}获取数据源信息失败,状态: {response.reason}') diff --git a/vue-fastapi-backend/requirements.txt b/vue-fastapi-backend/requirements.txt index c09d7b8..a45da77 100644 --- a/vue-fastapi-backend/requirements.txt +++ b/vue-fastapi-backend/requirements.txt @@ -15,3 +15,5 @@ redis==5.0.7 requests==2.32.3 SQLAlchemy[asyncio]==2.0.31 user-agents==2.2.0 +asyncpg== 0.30.0 +aiomysql==0.2.0 \ No newline at end of file diff --git a/vue-fastapi-backend/sql/t_metadata_extract_info_tt.sql b/vue-fastapi-backend/sql/t_metadata_extract_info_tt.sql new file mode 100644 index 0000000..0fbe6ae --- /dev/null +++ b/vue-fastapi-backend/sql/t_metadata_extract_info_tt.sql @@ -0,0 +1,37 @@ +/* + Navicat Premium Data Transfer + + Source Server : 治理 + Source Server Type : MySQL + Source Server Version : 80041 (8.0.41-0ubuntu0.22.04.1) + Source Host : 47.113.147.166:3306 + Source Schema : dash_test_w + + Target Server Type : MySQL + Target Server Version : 80041 (8.0.41-0ubuntu0.22.04.1) + File Encoding : 65001 + + Date: 09/02/2025 23:39:48 +*/ + +SET NAMES utf8mb4; +SET FOREIGN_KEY_CHECKS = 0; + +-- ---------------------------- +-- Table structure for t_metadata_extract_info_tt +-- ---------------------------- +DROP TABLE IF EXISTS `t_metadata_extract_info_tt`; +CREATE TABLE `t_metadata_extract_info_tt` ( + `extract_ver_num` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL, + `ver_desc` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '版本描述', + `ssys_cd` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '源系统代码', + `data_whs_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '数据库名称', + `mdl_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '模式名称', + `tab_no` int NULL DEFAULT NULL COMMENT '表编号', + `tab_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '表类型', + `tab_eng_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '表英文名称', + `tab_cn_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '表中文名称', + `tab_rec_num` int NULL DEFAULT NULL COMMENT '表记录数' +) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic; + +SET FOREIGN_KEY_CHECKS = 1;