You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

654 lines
30 KiB

from fastapi import Request
from sqlalchemy.ext.asyncio import AsyncSession
from exceptions.exception import ServiceException
from module_admin.dao.metaSecurity_dao import MetaSecurityDao
from module_admin.entity.vo.common_vo import CrudResponseModel
from module_admin.entity.vo.metasecurity_vo import MetaSecurityColModel, MetaSecurityRowModel,DeleteMetaSecurityModel,MetaSecurityApiModel
from utils.common_util import CamelCaseUtil
import uuid
from module_admin.dao.login_dao import login_by_account
from module_admin.dao.user_dao import UserDao
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import text
from config.env import AppConfig
import requests
2 months ago
from sqlalchemy.exc import OperationalError
import json
import re
from decimal import Decimal
class MetaSecurityService:
"""
数据源安全管理模块服务层
"""
@classmethod
async def get_meta_security_col_list_services(
cls, query_db: AsyncSession, query_object: MetaSecurityColModel, is_page: bool = False
):
"""
获取列配置列表信息service
:param query_db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 列配置列表信息对象
"""
col_list_result = await MetaSecurityDao.get_meta_security_col_list(query_db, query_object, is_page)
return col_list_result
@classmethod
async def get_meta_security_row_list_services(
cls, query_db: AsyncSession, query_object: MetaSecurityRowModel, is_page: bool = False
):
"""
获取行配置列表信息service
:param query_db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 行配置列表信息对象
"""
row_list_result = await MetaSecurityDao.get_meta_security_row_list(query_db, query_object, is_page)
return row_list_result
@classmethod
async def get_meta_security_col_by_id_services(cls, query_db: AsyncSession, colId: str):
"""
获取列配置详细信息service
:param query_db: orm对象
:param colId: 列配置ID
:return: 列配置详细信息对象
"""
col = await MetaSecurityDao.get_meta_security_col_by_id(query_db, colId)
if col:
result = MetaSecurityColModel(**CamelCaseUtil.transform_result(col))
else:
result = MetaSecurityColModel(**dict())
return result
@classmethod
async def get_meta_security_row_by_id_services(cls, query_db: AsyncSession, rowId: str):
"""
获取行配置详细信息service
:param query_db: orm对象
:param rowId: 行配置ID
:return: 行配置详细信息对象
"""
row = await MetaSecurityDao.get_meta_security_row_by_id(query_db, rowId)
if row:
result = MetaSecurityRowModel(**CamelCaseUtil.transform_result(row))
else:
result = MetaSecurityRowModel(**dict())
return result
@classmethod
async def add_meta_security_col_services(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityColModel):
"""
新增列配置服务
:param request: Request对象
:param query_db: orm对象
:param page_object: 新增的列配置对象
:return: 新增列配置校验结果
"""
try:
if isinstance(page_object.obj_value, str) and page_object.obj_value:
obj_values = page_object.obj_value.split(",")
obj_names = page_object.obj_name.split(",")
for value, name in zip(obj_values, obj_names):
# 创建新的 page_object 实例,避免修改原始对象
new_page_object = MetaSecurityColModel(**page_object.model_dump(by_alias=True))
new_page_object.obj_value = value.strip() # 去除空格并赋值
new_page_object.obj_name = name.strip() # 去除空格并赋值
new_page_object.colId = str(uuid.uuid4())
# 调用 DAO 方法插入数据
await MetaSecurityDao.add_meta_security_col(query_db, new_page_object)
await query_db.commit()
return CrudResponseModel(is_success=True, message='新增列配置成功')
except Exception as e:
await query_db.rollback()
raise e
@classmethod
async def add_meta_security_row_services(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityRowModel):
"""
新增行配置服务
:param request: Request对象
:param query_db: orm对象
:param page_object: 新增的行配置对象
:return: 新增行配置校验结果
"""
try:
if isinstance(page_object.obj_value, str) and page_object.obj_value:
obj_values = page_object.obj_value.split(",")
obj_names = page_object.obj_name.split(",")
for value, name in zip(obj_values, obj_names):
# 创建新的 page_object 实例,避免修改原始对象
new_page_object = MetaSecurityRowModel(**page_object.model_dump(by_alias=True))
new_page_object.obj_value = value.strip() # 去除空格并赋值
new_page_object.obj_name = name.strip() # 去除空格并赋值
new_page_object.rowId = str(uuid.uuid4())
# 调用 DAO 方法插入数据
await MetaSecurityDao.add_meta_security_row(query_db, new_page_object)
await query_db.commit()
# 缓存相关操作,如果需要
# await request.app.state.redis.set(...)
return CrudResponseModel(is_success=True, message='新增行配置成功')
except Exception as e:
await query_db.rollback()
raise e
@classmethod
async def col_detail_services(cls, query_db: AsyncSession, col: str):
"""
获取参数配置详细信息service
:param query_db: orm对象
:param config_id: 参数配置id
:return: 参数配置id对应的信息
"""
config = await MetaSecurityDao.get_meta_security_col_by_id(query_db, col)
if config:
result = MetaSecurityColModel(**CamelCaseUtil.transform_result(config))
else:
result = MetaSecurityColModel(**dict())
return result
@classmethod
async def row_detail_services(cls, query_db: AsyncSession, row_id: str):
"""
获取参数配置详细信息service
:param query_db: orm对象
:param config_id: 参数配置id
:return: 参数配置id对应的信息
"""
config = await MetaSecurityDao.get_meta_security_row_by_id(query_db, row_id)
if config:
result = MetaSecurityRowModel(**CamelCaseUtil.transform_result(config))
else:
result = MetaSecurityRowModel(**dict())
return result
@classmethod
async def edit_meta_security_col_services(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityColModel):
"""
编辑列配置服务
:param request: Request对象
:param query_db: orm对象
:param page_object: 编辑的列配置对象
:return: 编辑列配置校验结果
"""
edit_col = page_object.model_dump(exclude_unset=True)
col_info = await cls.get_meta_security_col_by_id_services(query_db, page_object.colId)
if col_info:
try:
await MetaSecurityDao.update_meta_security_col(query_db, edit_col)
await query_db.commit()
# 缓存更新,如果需要
# await request.app.state.redis.set(...)
return CrudResponseModel(is_success=True, message='编辑列配置成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message=f'列配置{page_object.colId}不存在')
@classmethod
async def edit_meta_security_row_services(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityRowModel):
"""
编辑行配置服务
:param request: Request对象
:param query_db: orm对象
:param page_object: 编辑的行配置对象
:return: 编辑行配置校验结果
"""
edit_row = page_object.model_dump(exclude_unset=True)
row_info = await cls.get_meta_security_row_by_id_services(query_db, page_object.rowId)
if row_info:
try:
await MetaSecurityDao.update_meta_security_row(query_db, edit_row)
await query_db.commit()
return CrudResponseModel(is_success=True, message='编辑行配置成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message=f'行配置{page_object.rowId}不存在')
@classmethod
async def delete_meta_security_col_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaSecurityModel):
"""
删除列配置服务
:param request: Request对象
:param query_db: orm对象
:param page_object: 删除列配置对象
:return: 删除列配置校验结果
"""
if page_object.metaSecurity_ids:
col_id_list = page_object.metaSecurity_ids.split(',')
try:
for col_id in col_id_list:
col_info = await cls.get_meta_security_col_by_id_services(query_db, col_id)
if col_info:
# 校验不能删除的系统内置列
await MetaSecurityDao.delete_meta_security_col(query_db, col_id)
await query_db.commit()
return CrudResponseModel(is_success=True, message='删除列配置成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='传入列配置ID为空')
@classmethod
async def delete_meta_security_row_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaSecurityModel):
"""
删除行配置服务
:param request: Request对象
:param query_db: orm对象
:param page_object: 删除行配置对象
:return: 删除行配置校验结果
"""
if page_object.metaSecurity_ids:
row_id_list = page_object.metaSecurity_ids.split(',')
try:
for row_id in row_id_list:
row_info = await cls.get_meta_security_row_by_id_services(query_db, row_id)
if row_info:
await MetaSecurityDao.delete_meta_security_row(query_db, row_id)
await query_db.commit()
return CrudResponseModel(is_success=True, message='删除行配置成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='传入行配置ID为空')
@classmethod
async def getMetaSercuitybysql(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityApiModel):
#1.校验用户
if not page_object.username:
raise ServiceException(data='', message='用户名不能为空!')
user = await login_by_account(query_db, page_object.username)
if not user:
raise ServiceException(data='', message='用户不存在')
if not page_object.password == user[0].password:
raise ServiceException(data='', message='用户密码错误!')
query_user = await UserDao.get_user_by_id(query_db, user_id=user[0].user_id)
role_id_list = [item.role_id for item in query_user.get('user_role_info')]
#2.测试数据源连接是否正常
# mysql
# dataParams ={"user":"dbf","password":"1q2w3e4r","address":"jdbc:mysql://47.113.147.166:3306","database":"dash_test_w","jdbcUrl":"jdbc:mysql://47.113.147.166:3306/dash_test_w","driverClassName":"com.mysql.cj.jdbc.Driver","validationQuery":"select 1"}
# postgresql
# dataParams ={"user":"testuser","password":"testpd","address":"jdbc:postgresql://47.121.207.11:5432","database":"zx2","jdbcUrl":"jdbc:postgresql://47.121.207.11:5432/zx2","driverClassName":"org.postgresql.Driver","validationQuery":"select version()"}
dsDataResource=await get_data_source_tree(request,page_object)
# dbConnent= cls.get_db_engine("postgresql",dataParams])
dbConnent= cls.get_db_engine(dsDataResource["type"],dsDataResource["connectionParams"])
2 months ago
# await test_connection(dbConnent)
#3.执行原始sql
result = await cls.execute_sql(dbConnent, page_object.sqlStr,"原始")
if 3 in role_id_list:
resultDict={
"ctrlSql": page_object.sqlStr,
"data": result,
"message":"数据安全管理员权限"
}
return resultDict
#4.获取sql中涉及的表名
sqlTableNames =await cls.get_tables_from_sql(page_object.sqlStr)
#5.根据表名获取数据库中的字段名
table_columns = await cls.get_columns_from_tables(dbConnent, sqlTableNames,dsDataResource["type"],)
#6.查询用户及该用户角色下的所有行列配置
tablesRowCol = {}
# 遍历每个表名,获取对应的配置
for table_name in sqlTableNames:
table_configs = await get_table_configs(query_db, page_object, user, role_id_list, table_name)
tablesRowCol[table_name] = table_configs
2 months ago
# 返回最终的结果字典
ctrSqlDict = await generate_sql(tablesRowCol,table_columns)
oldStrSql= page_object.sqlStr
#7.根据行列配置控制原始sql
newStrSql =await replace_table_with_subquery(ctrSqlDict,oldStrSql)
#8.执行结果
2 months ago
result = await cls.execute_sql(dbConnent, newStrSql,"控制后")
resultDict={
"ctrlSql": newStrSql,
"data": result,
"tablesRowCol":tablesRowCol
}
return resultDict
def get_db_engine(db_type: str, db_params: dict):
try:
address = db_params['address']
jdbc_prefixes = {
"jdbc:mysql://": len("jdbc:mysql://"),
"jdbc:postgresql://": len("jdbc:postgresql://")
}
# Check and remove the matching prefix
for prefix, length in jdbc_prefixes.items():
if address.startswith(prefix):
address = address[length:]
db_params['address']=address
break # Once the correct prefix is found, exit the loop
if db_type.lower() == "mysql":
conn_str=f"mysql+aiomysql://{db_params['user']}:{db_params['password']}@{db_params['address']}/{db_params['database']}"
print(f"数据库连接字符串: {conn_str}") # 输出调试信息
2 months ago
dbContent= create_async_engine(conn_str)
return dbContent
elif db_type.lower() == "postgresql":
2 months ago
dbContent= create_async_engine(f"postgresql+asyncpg://{db_params['user']}:{db_params['password']}@{db_params['address']}/{db_params['database']}")
return dbContent
else:
2 months ago
raise ValueError("不支持的数据库类型")
except SQLAlchemyError as e:
# 捕获SQLAlchemy相关的数据库连接错误
raise ConnectionError(f"数据库连接失败: {e}")
except Exception as e:
# 捕获其他非预期的错误
raise RuntimeError(f"连接过程中发生了未知错误: {e}")
@classmethod
async def execute_sql(cls, dbConnent, sql_query: str,sql_type: str):
# 创建异步会话
async with dbConnent.begin():
# 获取会话对象
async_session = sessionmaker(
dbConnent, class_=AsyncSession, expire_on_commit=False
)
async with async_session() as session:
try:
# 执行原始SQL查询
query = text(sql_query)
result = await session.execute(query)
# 获取所有结果
rows = result.fetchall()
# 获取列名
columns = result.keys()
# 将每一行转化为字典,键为列名
result_dict = [dict(zip(columns, row)) for row in rows]
# # 使用 convert_decimal 处理数据
# result_dict = [convert_decimal(row) for row in result_dict]
# 转换为 JSON 字符串
return result_dict
except SQLAlchemyError as e:
raise RuntimeError(f"{sql_type}执行 SQL 查询时发生错误: {e}")
async def get_tables_from_sql(sql_query:str):
table_pattern = r"(FROM|JOIN|INTO|UPDATE)\s+([a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_]+))"
table_matches = re.findall(table_pattern, sql_query, re.IGNORECASE)
table_names = [match[1] for match in table_matches]
table_names = [match[1].split('.')[-1] for match in table_matches] # `split('.')[-1]` 取最后一部分,即表名
return table_names
@classmethod
async def get_columns_from_tables(cls, dbConnent, table_names, db_type: str):
"""查询每个表的字段信息,根据数据库类型调整查询"""
columns = {}
query=""
for table_name in table_names:
if db_type.lower() == "postgresql":
# PostgreSQL: 使用 information_schema.columns 查询字段
query= f"SELECT column_name FROM information_schema.columns WHERE table_name ='{table_name}'"
elif db_type.lower() == "mysql":
# MySQL: 使用 INFORMATION_SCHEMA.COLUMNS 查询字段
query= f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME ='{table_name}'"
else:
raise ValueError(f"暂不支持数据库类型: {db_type}")
# Execute the query for the specific table
result = await cls.execute_sql(dbConnent, query, "字段查询")
# 将结果转换为字典格式 {table_name: ['column1', 'column2', ...]}
columns[table_name] = [row["column_name"] for row in result]
return columns
def convert_decimal(obj):
if isinstance(obj, Decimal):
return float(obj) # 或者 str(obj) 来保留精度
elif isinstance(obj, dict):
# 递归处理字典中的每个值
return {key: convert_decimal(value) for key, value in obj.items()}
elif isinstance(obj, list):
# 递归处理列表中的每个元素
return [convert_decimal(item) for item in obj]
return obj # 返回非 Decimal、dict 或 list 的值
async def get_table_configs(query_db, page_object, user, role_id_list, table_name):
# 获取用户的列配置
user_col_list = await MetaSecurityDao.get_api_col_list(
query_db, page_object.dbRId, table_name, '0', user[0].user_id
)
# 获取角色的列配置
role_col_list = []
for role_id in role_id_list:
role_cols = await MetaSecurityDao.get_api_col_list(
query_db, page_object.dbRId, table_name, '1', role_id
)
role_col_list.extend(role_cols) # 将每个角色的列配置合并到列表中
# 获取用户的行配置
user_row_list = await MetaSecurityDao.get_api_row_list(
query_db, page_object.dbRId, table_name, '0', user[0].user_id
)
# 获取角色的行配置
role_row_list = []
for role_id in role_id_list:
role_rows = await MetaSecurityDao.get_api_row_list(
query_db, page_object.dbRId, table_name, '1', role_id
)
role_row_list.extend(role_rows) # 将每个角色的行配置合并到列表中
2 months ago
isHave = any([
user_col_list,
role_col_list,
user_row_list,
role_row_list
])
return {
"user_col_list": user_col_list,
"role_col_list": role_col_list,
"role_row_list": role_row_list,
2 months ago
"user_row_list": user_row_list,
"isHave":isHave
}
async def generate_sql(tablesRowCol:dict, table_columns:dict):
sql_queries = {}
2 months ago
# 1. 列控制
# 遍历每个表
2 months ago
isHave=False
no_configTable_name=""
for table_name, table_configs in tablesRowCol.items():
if table_configs["isHave"]:
isHave=True
else:
no_configTable_name += table_name + ","
if not isHave:
no_configTable_name = no_configTable_name.rstrip(',')
raise ValueError(f"表:{no_configTable_name}均未配置行列数据安全")
for table_name, config in tablesRowCol.items():
# 获取该表的字段名
columns = {col.lower(): col for col in table_columns[table_name]} # 将字段名转为小写
# 初始化 SELECT 部分:用字典存储字段名,值是 null 字段名
select_columns = {col: f"null {col}" for col in columns}
# 处理角色列配置
for col in config["role_col_list"]:
# If dbCName is "ALL", handle it as a special case
if col.dbCName == "ALL":
if col.ctrl_type == '0': # If ctrl_type is '0', prefix all columns with null
for db_column in columns: # Assuming 'user' is the table name
select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀
elif col.ctrl_type == '1': # If ctrl_type is '1', use actual column names
for db_column in columns:
select_columns[db_column] = db_column # 使用实际字段名
else:
# Handle specific columns listed in dbCName
db_columns = [db_column.strip().lower() for db_column in col.dbCName.split(",")]
for db_column in db_columns:
db_column = db_column.strip()
if db_column in columns: # Check if the column exists in the table
if col.ctrl_type == '0': # If ctrl_type is '0', prefix with null
select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀
elif col.ctrl_type == '1': # If ctrl_type is '1', use actual column name
select_columns[db_column] = db_column # 使用实际字段名
# 处理用户列配置
for col in config["user_col_list"]:
if col.dbCName == "ALL": # 如果 dbCName 为 "ALL"
if col.ctrl_type == "0": # ctrlType 为 0,字符串字段
for db_column in columns: # 对所有字段加上 null
select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀
elif col.ctrl_type == "1": # ctrlType 为 1,实际数据库字段
for db_column in columns: # 使用实际字段名,不加 null
select_columns[db_column] = db_column # 使用实际字段名
else: # 处理 dbCName 不为 "ALL" 的情况
db_columns = [db_column.strip().lower() for db_column in col.dbCName.split(",")]
for db_column in db_columns:
db_column = db_column.strip()
if db_column in columns:
if col.ctrl_type == "0":
select_columns[db_column] = f"null {db_column}" # 仍然保留 null 前缀
elif col.ctrl_type == "1":
select_columns[db_column] = db_column # 使用实际字段名
# 生成 SQL 查询
sql_queries[table_name] = f"SELECT {', '.join(select_columns.values())} FROM {table_name}"
# 2.行控制
select_rows={}
# 处理角色行配置
for row in config["role_row_list"]:
# 仅仅对固定值有效,不加行限制
if row.ctrl_value == "ALL" and row.ctrl_type == '0':
# 控制方式 --固定值
select_rows[row.dbCName] = ""
else:
if row.ctrl_type == '0':
# row.ctrl_value 是逗号分隔的字符串时,改为 IN 语句
if "," in row.ctrl_value:
# 将 ctrl_value 按逗号分割,并用单引号包裹每个值
values = [f"'{value.strip()}'" for value in row.ctrl_value.split(",")]
select_rows[row.dbCName] = f"{row.dbCName} IN ({', '.join(values)})"
else:
select_rows[row.dbCName] = f"{row.dbCName} = '{row.ctrl_value}'"
if row.ctrl_type == '1':
tab_col_value=row.ctrl_value.split(".")
if len(tab_col_value) != 2:
raise RuntimeError(f"{row.dbCName}字段控制类型为表字段,未维护正确的值")
select_rows[row.dbCName] = f"{row.dbCName} in (select {tab_col_value[1]} from {tab_col_value[0]})"
# 处理用户行配置
for row in config["user_row_list"]:
# 仅仅对固定值有效,不加行限制
if row.ctrl_value == "ALL" and row.ctrl_type == '0':
# 控制方式 --固定值
select_rows[row.dbCName] = ""
else:
if row.ctrl_type == '0':
# row.obj_value 是逗号分隔的字符串时,改为 IN 语句
if "," in row.ctrl_value:
# 将 obj_value 按逗号分割,并用单引号包裹每个值
values = [f"'{value.strip()}'" for value in row.ctrl_value.split(",")]
select_rows[row.dbCName] = f"{row.dbCName} IN ({', '.join(values)})"
else:
select_rows[row.dbCName] = f"{row.dbCName} = '{row.ctrl_value}'"
if row.ctrl_type == '1':
tab_col_value=row.ctrl_value.split(".")
if len(tab_col_value) != 2:
raise RuntimeError(f"{row.dbCName}字段控制类型为表字段,未维护正确的值")
select_rows[row.dbCName] = f"{row.dbCName} in (select {tab_col_value[1]} from {tab_col_value[0]})"
2 months ago
if select_rows.values():
where_conditions = " AND ".join(select_rows.values())
2 months ago
if where_conditions:
sql_queries[table_name] += " WHERE " + where_conditions
return sql_queries
async def replace_table_with_subquery(ctrSqlDict, oldStrSql):
2 months ago
table_alias_map = {} # 存储表名和别名的映射
for table_name, subquery in ctrSqlDict.items():
# 构建正则表达式,匹配表名及可能的别名
pattern = (
r'(\b(?:[a-zA-Z_][a-zA-Z0-9_]*\.)?' # 匹配模式名(可选)
+ re.escape(table_name) # 转义表名
+ r'\b)' # 结束表名
r'(\s+(?:AS\s+)?(\w+))?' # 捕获别名部分(含 AS 或直接别名)
r'(?=\s*[\w\(\)]*)' # 确保后面是合法 SQL 语法,不是 SQL 关键字
)
def replace(match):
original_table = match.group(1) # 原始表名(可能含模式名)
alias_part = match.group(2) # 别名部分(含空格、AS 或直接别名)
alias_name = match.group(3) # 别名名称(无 AS 前缀)
2 months ago
if original_table not in table_alias_map:
# 处理表名后直接跟着 SQL 关键字的情况
sql_keywords = {"LIMIT", "WHERE", "ORDER", "GROUP", "HAVING", "JOIN", "ON", "USING", "UNION",
"EXCEPT", "INTERSECT", "FETCH", "OFFSET"}
if alias_name and alias_name.upper().split()[0] not in sql_keywords:
# 已存在别名,且别名后没有紧跟 SQL 关键字,保留原别名
replaced = f"({subquery}) {alias_part}"
table_alias_map[original_table] = alias_part
else:
# 无别名时,或者别名无效(如 LIMIT),添加默认别名
alias = original_table.split('.')[-1]
replaced = f"({subquery}) AS {alias}{alias_part}"
table_alias_map[original_table] = alias
else:
2 months ago
alias = table_alias_map[original_table]
replaced = f"{alias}" # 使用别名
2 months ago
return replaced
# 执行替换(忽略大小写)
oldStrSql = re.sub(pattern, replace, oldStrSql, flags=re.IGNORECASE)
return oldStrSql
async def get_data_source_tree(request: Request, current_user: MetaSecurityApiModel):
url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources/withpwdlist?pageNo=1&pageSize=100'
headers = {'dashUserName': current_user.username ,'dashPassword': current_user.password,}
response = requests.get(url, headers=headers, verify=False)
if response.reason == 'OK':
response_text = response.text
data = json.loads(response_text)
total_list = data["data"]["totalList"]
# 解析 connectionParams 字符串为字典
for item in total_list:
if item["id"]==current_user.dbRId:
item["connectionParams"] = json.loads(item["connectionParams"])
return item
raise Exception(f'根据数据源ID:{current_user.dbRId}获取数据源信息失败,状态: {response.reason}')
else:
raise Exception(f'根据数据源ID:{current_user.dbRId}获取数据源信息失败,状态: {response.reason}')
2 months ago
async def test_connection(db_content):
try:
# 尝试执行一个简单的查询来测试连接
async with db_content.connect() as connection:
# 这里执行一个简单的查询,例如“SELECT 1”
await connection.scalar("SELECT 1")
except Exception as e:
raise Exception("数据源连接失败") from e