Browse Source

数据安全接口提交

master
si@aidatagov.com 2 months ago
parent
commit
e4d9405f0b
  1. 5
      vue-fastapi-backend/module_admin/entity/vo/metasecurity_vo.py
  2. 302
      vue-fastapi-backend/module_admin/service/metasecurity_service.py
  3. 2
      vue-fastapi-backend/requirements.txt
  4. 37
      vue-fastapi-backend/sql/t_metadata_extract_info_tt.sql

5
vue-fastapi-backend/module_admin/entity/vo/metasecurity_vo.py

@ -88,7 +88,6 @@ class MetaSecurityApiModel(BaseModel):
dbRId: Optional[int] = None
username: Optional[str] = Field(default=None, description='用户名称')
password: Optional[str] = Field(default=None, description='用户密码')
tableName: Optional[str] = Field(default=None, description='表名')
sqlStr: Optional[str] = Field(default=None, description='sql')
@NotBlank(field_name='username', message='用户名称不能为空')
@ -98,14 +97,10 @@ class MetaSecurityApiModel(BaseModel):
@NotBlank(field_name='password', message='用户密码不能为空')
def get_password(self):
return self.password
@NotBlank(field_name='tableName', message='表名不能为空')
def get_tableName(self):
return self.tableName
@NotBlank(field_name='sqlStr', message='sql不能为空')
def get_sqlStr(self):
return self.username
def validate_fields(self):
self.get_username()
self.get_password()
self.get_tableName()
self.get_sqlStr()

302
vue-fastapi-backend/module_admin/service/metasecurity_service.py

@ -12,7 +12,12 @@ from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import text
from config.env import AppConfig
import requests
import json
import re
from decimal import Decimal
class MetaSecurityService:
"""
@ -286,54 +291,64 @@ class MetaSecurityService:
raise ServiceException(data='', message='用户密码错误!')
query_user = await UserDao.get_user_by_id(query_db, user_id=user[0].user_id)
role_id_list = [item.role_id for item in query_user.get('user_role_info')]
#2.查询用户及该用户角色下的所有行列配置
# 列配置
user_col_list=await MetaSecurityDao.get_api_col_list(query_db, page_object.dbRId,page_object.tableName, '0',user[0].user_id)
role_col_list = []
for role_id in role_id_list:
role_cols = await MetaSecurityDao.get_api_col_list(
query_db, page_object.dbRId, page_object.tableName, '1', role_id
)
role_col_list.extend(role_cols) # 将每个角色的列配置合并到列表中
# 行配置
user_row_list=await MetaSecurityDao.get_api_row_list(query_db, page_object.dbRId,page_object.tableName, '0',user[0].user_id)
role_row_list = []
for role_id in role_id_list:
role_rows = await MetaSecurityDao.get_api_row_list(
query_db, page_object.dbRId, page_object.tableName, '1', role_id
)
role_row_list.extend(role_rows) # 将每个角色的行配置合并到列表中
result = {
"user_col_list": user_col_list,
"role_col_list": role_col_list,
"role_row_list": role_row_list,
"user_row_list": user_row_list
#2.测试数据源连接是否正常
# mysql
# dataParams ={"user":"dbf","password":"1q2w3e4r","address":"jdbc:mysql://47.113.147.166:3306","database":"dash_test_w","jdbcUrl":"jdbc:mysql://47.113.147.166:3306/dash_test_w","driverClassName":"com.mysql.cj.jdbc.Driver","validationQuery":"select 1"}
# postgresql
# dataParams ={"user":"testuser","password":"testpd","address":"jdbc:postgresql://47.121.207.11:5432","database":"zx2","jdbcUrl":"jdbc:postgresql://47.121.207.11:5432/zx2","driverClassName":"org.postgresql.Driver","validationQuery":"select version()"}
dsDataResource=await get_data_source_tree(request,page_object)
# dbConnent= cls.get_db_engine("postgresql",dataParams])
dbConnent= cls.get_db_engine(dsDataResource["type"],dsDataResource["connectionParams"])
#3.执行原始sql
result = await cls.execute_sql(dbConnent, page_object.sqlStr,"原始")
#4.获取sql中涉及的表名
sqlTableNames =await cls.get_tables_from_sql(page_object.sqlStr)
#5.根据表名获取数据库中的字段名
table_columns = await cls.get_columns_from_tables(dbConnent, sqlTableNames,dsDataResource["type"],)
#6.查询用户及该用户角色下的所有行列配置
tablesRowCol = {}
# 遍历每个表名,获取对应的配置
for table_name in sqlTableNames:
table_configs = await get_table_configs(query_db, page_object, user, role_id_list, table_name)
tablesRowCol[table_name] = table_configs
# 返回最终的结果字典
ctrSqlDict = await generate_sql(tablesRowCol,table_columns)
oldStrSql= page_object.sqlStr
#7.根据行列配置控制原始sql
newStrSql =await replace_table_with_subquery(ctrSqlDict,oldStrSql)
#8.执行结果
result = await cls.execute_sql(dbConnent, page_object.sqlStr,"控制后")
resultDict={
"ctrlSql": newStrSql,
"data": result,
"tablesRowCol":tablesRowCol
}
# return result
#3.根据行列配置控制原始sql
#4.测试数据源连接是否正常
dataParams ={"user":"dbf","password":"1q2w3e4r","address":"jdbc:mysql://47.113.147.166:3306","database":"dash_test_w","jdbcUrl":"jdbc:mysql://47.113.147.166:3306/dash_test_w","driverClassName":"com.mysql.cj.jdbc.Driver","validationQuery":"select 1"}
dbConnent = cls.get_db_engine('mysql',dataParams)
query = "SELECT * FROM msq_table_constraints"
result = await cls.execute_sql(dbConnent, query)
return result
#5.执行原始sql
#6.执行控制后的sql
#7.执行结果
return resultDict
def get_db_engine(db_type: str, db_params: dict):
try:
if db_type == "mysql":
address = db_params['address']
if address.startswith("jdbc:mysql://"):
# 去掉 jdbc:mysql:// 部分
address = address[len("jdbc:mysql://"):]
jdbc_prefixes = {
"jdbc:mysql://": len("jdbc:mysql://"),
"jdbc:postgresql://": len("jdbc:postgresql://")
}
# Check and remove the matching prefix
for prefix, length in jdbc_prefixes.items():
if address.startswith(prefix):
address = address[length:]
db_params['address']=address
break # Once the correct prefix is found, exit the loop
if db_type.lower() == "mysql":
conn_str=f"mysql+aiomysql://{db_params['user']}:{db_params['password']}@{db_params['address']}/{db_params['database']}"
print(f"数据库连接字符串: {conn_str}") # 输出调试信息
return create_async_engine(conn_str)
elif db_type == "postgresql":
elif db_type.lower() == "postgresql":
return create_async_engine(f"postgresql+asyncpg://{db_params['user']}:{db_params['password']}@{db_params['address']}/{db_params['database']}")
# 你可以根据需求添加更多数据库类型
else:
@ -366,8 +381,213 @@ class MetaSecurityService:
# 将每一行转化为字典,键为列名
result_dict = [dict(zip(columns, row)) for row in rows]
# # 使用 convert_decimal 处理数据
# result_dict = [convert_decimal(row) for row in result_dict]
# 转换为 JSON 字符串
return json.dumps(result_dict, ensure_ascii=False, indent=4)
return result_dict
except SQLAlchemyError as e:
raise RuntimeError(f"{sql_type}执行 SQL 查询时发生错误: {e}")
async def get_tables_from_sql(sql_query:str):
table_pattern = r"(FROM|JOIN|INTO|UPDATE)\s+([a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_]+))"
table_matches = re.findall(table_pattern, sql_query, re.IGNORECASE)
table_names = [match[1] for match in table_matches]
table_names = [match[1].split('.')[-1] for match in table_matches] # `split('.')[-1]` 取最后一部分,即表名
return table_names
@classmethod
async def get_columns_from_tables(cls, dbConnent, table_names, db_type: str):
"""查询每个表的字段信息,根据数据库类型调整查询"""
columns = {}
query=""
for table_name in table_names:
if db_type.lower() == "postgresql":
# PostgreSQL: 使用 information_schema.columns 查询字段
query= f"SELECT column_name FROM information_schema.columns WHERE table_name ='{table_name}'"
elif db_type.lower() == "mysql":
# MySQL: 使用 INFORMATION_SCHEMA.COLUMNS 查询字段
query= f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME ='{table_name}'"
else:
raise ValueError(f"暂不支持数据库类型: {db_type}")
# Execute the query for the specific table
result = await cls.execute_sql(dbConnent, query, "字段查询")
# 将结果转换为字典格式 {table_name: ['column1', 'column2', ...]}
columns[table_name] = [row["column_name"] for row in result]
return columns
def convert_decimal(obj):
if isinstance(obj, Decimal):
return float(obj) # 或者 str(obj) 来保留精度
elif isinstance(obj, dict):
# 递归处理字典中的每个值
return {key: convert_decimal(value) for key, value in obj.items()}
elif isinstance(obj, list):
# 递归处理列表中的每个元素
return [convert_decimal(item) for item in obj]
return obj # 返回非 Decimal、dict 或 list 的值
async def get_table_configs(query_db, page_object, user, role_id_list, table_name):
# 获取用户的列配置
user_col_list = await MetaSecurityDao.get_api_col_list(
query_db, page_object.dbRId, table_name, '0', user[0].user_id
)
# 获取角色的列配置
role_col_list = []
for role_id in role_id_list:
role_cols = await MetaSecurityDao.get_api_col_list(
query_db, page_object.dbRId, table_name, '1', role_id
)
role_col_list.extend(role_cols) # 将每个角色的列配置合并到列表中
# 获取用户的行配置
user_row_list = await MetaSecurityDao.get_api_row_list(
query_db, page_object.dbRId, table_name, '0', user[0].user_id
)
# 获取角色的行配置
role_row_list = []
for role_id in role_id_list:
role_rows = await MetaSecurityDao.get_api_row_list(
query_db, page_object.dbRId, table_name, '1', role_id
)
role_row_list.extend(role_rows) # 将每个角色的行配置合并到列表中
return {
"user_col_list": user_col_list,
"role_col_list": role_col_list,
"role_row_list": role_row_list,
"user_row_list": user_row_list
}
async def generate_sql(tablesRowCol:dict, table_columns:dict):
sql_queries = {}
# 1. 列控制
# 遍历每个表
for table_name, config in tablesRowCol.items():
# 获取该表的字段名
columns = {col.lower(): col for col in table_columns[table_name]} # 将字段名转为小写
# 初始化 SELECT 部分:用字典存储字段名,值是 null 字段名
select_columns = {col: f"null {col}" for col in columns}
# 处理角色列配置
for col in config["role_col_list"]:
# If dbCName is "ALL", handle it as a special case
if col.dbCName == "ALL":
if col.ctrl_type == '0': # If ctrl_type is '0', prefix all columns with null
for db_column in columns: # Assuming 'user' is the table name
select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀
elif col.ctrl_type == '1': # If ctrl_type is '1', use actual column names
for db_column in columns:
select_columns[db_column] = db_column # 使用实际字段名
else:
# Handle specific columns listed in dbCName
db_columns = [db_column.strip().lower() for db_column in col.dbCName.split(",")]
for db_column in db_columns:
db_column = db_column.strip()
if db_column in columns: # Check if the column exists in the table
if col.ctrl_type == '0': # If ctrl_type is '0', prefix with null
select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀
elif col.ctrl_type == '1': # If ctrl_type is '1', use actual column name
select_columns[db_column] = db_column # 使用实际字段名
# 处理用户列配置
for col in config["user_col_list"]:
if col.dbCName == "ALL": # 如果 dbCName 为 "ALL"
if col.ctrl_type == "0": # ctrlType 为 0,字符串字段
for db_column in columns: # 对所有字段加上 null
select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀
elif col.ctrl_type == "1": # ctrlType 为 1,实际数据库字段
for db_column in columns: # 使用实际字段名,不加 null
select_columns[db_column] = db_column # 使用实际字段名
else: # 处理 dbCName 不为 "ALL" 的情况
db_columns = [db_column.strip().lower() for db_column in col.dbCName.split(",")]
for db_column in db_columns:
db_column = db_column.strip()
if db_column in columns:
if col.ctrl_type == "0":
select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀
elif col.ctrl_type == "1":
select_columns[db_column] = db_column # 使用实际字段名
# 生成 SQL 查询
sql_queries[table_name] = f"SELECT {', '.join(select_columns.values())} FROM {table_name}"
# 2.行控制
select_rows={}
# 处理角色行配置
for row in config["role_row_list"]:
# 仅仅对固定值有效,不加行限制
if row.ctrl_value == "ALL" and row.ctrl_type == '0':
# 控制方式 --固定值
select_rows[row.dbCName] = ""
else:
if row.ctrl_type == '0':
# row.ctrl_value 是逗号分隔的字符串时,改为 IN 语句
if "," in row.ctrl_value:
# 将 ctrl_value 按逗号分割,并用单引号包裹每个值
values = [f"'{value.strip()}'" for value in row.ctrl_value.split(",")]
select_rows[row.dbCName] = f"{row.dbCName} IN ({', '.join(values)})"
else:
select_rows[row.dbCName] = f"{row.dbCName} = '{row.ctrl_value}'"
if row.ctrl_type == '1':
tab_col_value=row.ctrl_value.split(".")
if len(tab_col_value) != 2:
raise RuntimeError(f"{row.dbCName}字段控制类型为表字段,未维护正确的值")
select_rows[row.dbCName] = f"{row.dbCName} in (select {tab_col_value[1]} from {tab_col_value[0]})"
# 处理用户行配置
for row in config["user_row_list"]:
# 仅仅对固定值有效,不加行限制
if row.ctrl_value == "ALL" and row.ctrl_type == '0':
# 控制方式 --固定值
select_rows[row.dbCName] = ""
else:
if row.ctrl_type == '0':
# row.obj_value 是逗号分隔的字符串时,改为 IN 语句
if "," in row.ctrl_value:
# 将 obj_value 按逗号分割,并用单引号包裹每个值
values = [f"'{value.strip()}'" for value in row.ctrl_value.split(",")]
select_rows[row.dbCName] = f"{row.dbCName} IN ({', '.join(values)})"
else:
select_rows[row.dbCName] = f"{row.dbCName} = '{row.ctrl_value}'"
if row.ctrl_type == '1':
tab_col_value=row.ctrl_value.split(".")
if len(tab_col_value) != 2:
raise RuntimeError(f"{row.dbCName}字段控制类型为表字段,未维护正确的值")
select_rows[row.dbCName] = f"{row.dbCName} in (select {tab_col_value[1]} from {tab_col_value[0]})"
if select_rows:
where_conditions = " AND ".join(select_rows.values())
sql_queries[table_name] += " WHERE " + where_conditions
return sql_queries
async def replace_table_with_subquery(ctrSqlDict, oldStrSql):
# 遍历 ctrSqlDict 并替换 SQL 查询中的表名
for table_name, subquery in ctrSqlDict.items():
# 创建一个正则表达式,匹配原始 SQL 中的表名(注意大小写问题,正则会忽略大小写)
# 匹配类似 "模式名.tab1" 或 "tab1" 的表名
table_name_pattern = r'\b(?:[a-zA-Z_][a-zA-Z0-9_]*\.)?' + re.escape(table_name) + r'\b'
# 替换原始 SQL 中的表名为对应的子查询
oldStrSql = re.sub(table_name_pattern, f"({subquery})", oldStrSql, flags=re.IGNORECASE)
return oldStrSql
async def get_data_source_tree(request: Request, current_user: MetaSecurityApiModel):
url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources/withpwdlist?pageNo=1&pageSize=100'
headers = {'dashUserName': current_user.username ,'dashPassword': current_user.password,}
response = requests.get(url, headers=headers)
if response.reason == 'OK':
response_text = response.text
data = json.loads(response_text)
total_list = data["data"]["totalList"]
# 解析 connectionParams 字符串为字典
for item in total_list:
if item["id"]==current_user.dbRId:
item["connectionParams"] = json.loads(item["connectionParams"])
return item
raise Exception(f'根据数据源ID:{current_user.dbRId}获取数据源信息失败,状态: {response.reason}')
else:
raise Exception(f'根据数据源ID:{current_user.dbRId}获取数据源信息失败,状态: {response.reason}')

2
vue-fastapi-backend/requirements.txt

@ -15,3 +15,5 @@ redis==5.0.7
requests==2.32.3
SQLAlchemy[asyncio]==2.0.31
user-agents==2.2.0
asyncpg== 0.30.0
aiomysql==0.2.0

37
vue-fastapi-backend/sql/t_metadata_extract_info_tt.sql

@ -0,0 +1,37 @@
/*
Navicat Premium Data Transfer
Source Server :
Source Server Type : MySQL
Source Server Version : 80041 (8.0.41-0ubuntu0.22.04.1)
Source Host : 47.113.147.166:3306
Source Schema : dash_test_w
Target Server Type : MySQL
Target Server Version : 80041 (8.0.41-0ubuntu0.22.04.1)
File Encoding : 65001
Date: 09/02/2025 23:39:48
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for t_metadata_extract_info_tt
-- ----------------------------
DROP TABLE IF EXISTS `t_metadata_extract_info_tt`;
CREATE TABLE `t_metadata_extract_info_tt` (
`extract_ver_num` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`ver_desc` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '版本描述',
`ssys_cd` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '源系统代码',
`data_whs_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '数据库名称',
`mdl_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '模式名称',
`tab_no` int NULL DEFAULT NULL COMMENT '表编号',
`tab_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '表类型',
`tab_eng_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '表英文名称',
`tab_cn_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '表中文名称',
`tab_rec_num` int NULL DEFAULT NULL COMMENT '表记录数'
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
Loading…
Cancel
Save