import asyncio
import functools
import json
import re
import uuid
from decimal import Decimal
from urllib.parse import quote

import requests
from fastapi import Request
from sqlalchemy import text
from sqlalchemy.exc import OperationalError
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

from config.env import AppConfig
from exceptions.exception import ServiceException
from module_admin.dao.login_dao import login_by_account
from module_admin.dao.metaSecurity_dao import MetaSecurityDao
from module_admin.dao.user_dao import UserDao
from module_admin.entity.vo.common_vo import CrudResponseModel
from module_admin.entity.vo.metasecurity_vo import (
    DeleteMetaSecurityModel,
    MetaSecurityApiModel,
    MetaSecurityColModel,
    MetaSecurityRowModel,
)
from utils.common_util import CamelCaseUtil

# Role id whose members bypass row/column control (the response message labels
# it "数据安全管理员权限").
_SECURITY_ADMIN_ROLE_ID = 3

# Upper bound, in seconds, for the data-source lookup HTTP call.
_DS_REQUEST_TIMEOUT_SECONDS = 30

# JDBC prefixes stripped from the data-source address before building a URL.
_JDBC_PREFIXES = ("jdbc:mysql://", "jdbc:postgresql://")

# SQLAlchemy async driver name per supported database type.
_ASYNC_DRIVERS = {
    "mysql": "mysql+aiomysql",
    "postgresql": "postgresql+asyncpg",
}

# Column-listing query per database type. The column is aliased so the result
# key is ``column_name`` in both dialects, and the table name is bound.
_COLUMN_QUERIES = {
    "postgresql": (
        "SELECT column_name AS column_name FROM information_schema.columns "
        "WHERE table_name = :table_name"
    ),
    "mysql": (
        "SELECT COLUMN_NAME AS column_name FROM INFORMATION_SCHEMA.COLUMNS "
        "WHERE TABLE_NAME = :table_name"
    ),
}

# Words that may directly follow a table reference and must never be taken
# for a table alias.
_NON_ALIAS_KEYWORDS = frozenset({
    "LIMIT", "WHERE", "ORDER", "GROUP", "HAVING", "JOIN", "ON", "USING",
    "UNION", "EXCEPT", "INTERSECT", "FETCH", "OFFSET",
    "LEFT", "RIGHT", "INNER", "OUTER", "CROSS", "FULL", "NATURAL",
    "FOR", "WINDOW",
})


class MetaSecurityService:
    """
    Service layer of the data-source security management module.
    """

    @classmethod
    async def get_meta_security_col_list_services(
        cls, query_db: AsyncSession, query_object: MetaSecurityColModel, is_page: bool = False
    ):
        """
        Return the column-rule list.

        :param query_db: ORM session
        :param query_object: query parameters
        :param is_page: whether paging is enabled
        :return: whatever the DAO returns for the column-rule list
        """
        return await MetaSecurityDao.get_meta_security_col_list(query_db, query_object, is_page)

    @classmethod
    async def get_meta_security_row_list_services(
        cls, query_db: AsyncSession, query_object: MetaSecurityRowModel, is_page: bool = False
    ):
        """
        Return the row-rule list.

        :param query_db: ORM session
        :param query_object: query parameters
        :param is_page: whether paging is enabled
        :return: whatever the DAO returns for the row-rule list
        """
        return await MetaSecurityDao.get_meta_security_row_list(query_db, query_object, is_page)

    @classmethod
    async def get_meta_security_col_by_id_services(cls, query_db: AsyncSession, colId: str):
        """
        Return one column rule as a model.

        :param query_db: ORM session
        :param colId: column-rule id
        :return: populated MetaSecurityColModel, or an empty one when the id is unknown
        """
        col = await MetaSecurityDao.get_meta_security_col_by_id(query_db, colId)
        if not col:
            return MetaSecurityColModel()
        return MetaSecurityColModel(**CamelCaseUtil.transform_result(col))

    @classmethod
    async def get_meta_security_row_by_id_services(cls, query_db: AsyncSession, rowId: str):
        """
        Return one row rule as a model.

        :param query_db: ORM session
        :param rowId: row-rule id
        :return: populated MetaSecurityRowModel, or an empty one when the id is unknown
        """
        row = await MetaSecurityDao.get_meta_security_row_by_id(query_db, rowId)
        if not row:
            return MetaSecurityRowModel()
        return MetaSecurityRowModel(**CamelCaseUtil.transform_result(row))

    @classmethod
    async def _add_split_entries(cls, query_db, page_object, model_cls, id_field, dao_add):
        """Insert one record per comma-separated ``obj_value``/``obj_name`` pair, then commit."""
        try:
            if isinstance(page_object.obj_value, str) and page_object.obj_value:
                obj_values = page_object.obj_value.split(",")
                obj_names = page_object.obj_name.split(",")
                # zip() stops at the shorter list, so unpaired items are ignored.
                for value, name in zip(obj_values, obj_names):
                    # Work on a copy so the caller's object is never modified.
                    entry = model_cls(**page_object.model_dump(by_alias=True))
                    entry.obj_value = value.strip()
                    entry.obj_name = name.strip()
                    setattr(entry, id_field, str(uuid.uuid4()))
                    await dao_add(query_db, entry)
            await query_db.commit()
        except Exception:
            await query_db.rollback()
            raise

    @classmethod
    async def add_meta_security_col_services(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityColModel):
        """
        Create column rules, one per comma-separated object in ``page_object``.

        :param request: Request object (unused here)
        :param query_db: ORM session
        :param page_object: column rule to create
        :return: CrudResponseModel describing the outcome
        """
        await cls._add_split_entries(
            query_db, page_object, MetaSecurityColModel, 'colId', MetaSecurityDao.add_meta_security_col
        )
        return CrudResponseModel(is_success=True, message='新增列配置成功')

    @classmethod
    async def add_meta_security_row_services(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityRowModel):
        """
        Create row rules, one per comma-separated object in ``page_object``.

        :param request: Request object (unused here)
        :param query_db: ORM session
        :param page_object: row rule to create
        :return: CrudResponseModel describing the outcome
        """
        await cls._add_split_entries(
            query_db, page_object, MetaSecurityRowModel, 'rowId', MetaSecurityDao.add_meta_security_row
        )
        return CrudResponseModel(is_success=True, message='新增行配置成功')

    @classmethod
    async def col_detail_services(cls, query_db: AsyncSession, col: str):
        """
        Return the detail of one column rule.

        :param query_db: ORM session
        :param col: column-rule id
        :return: populated MetaSecurityColModel, or an empty one when the id is unknown
        """
        return await cls.get_meta_security_col_by_id_services(query_db, col)

    @classmethod
    async def row_detail_services(cls, query_db: AsyncSession, row_id: str):
        """
        Return the detail of one row rule.

        :param query_db: ORM session
        :param row_id: row-rule id
        :return: populated MetaSecurityRowModel, or an empty one when the id is unknown
        """
        return await cls.get_meta_security_row_by_id_services(query_db, row_id)

    @classmethod
    async def edit_meta_security_col_services(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityColModel):
        """
        Update a column rule.

        :param request: Request object (unused here)
        :param query_db: ORM session
        :param page_object: column rule carrying the new values
        :return: CrudResponseModel describing the outcome
        :raises ServiceException: when the column rule does not exist
        """
        edit_col = page_object.model_dump(exclude_unset=True)
        # Check the raw DAO row: the detail service always returns a model
        # instance (an empty one when nothing is found), so its truthiness
        # cannot signal a missing record.
        existing = await MetaSecurityDao.get_meta_security_col_by_id(query_db, page_object.colId)
        if not existing:
            raise ServiceException(message=f'列配置{page_object.colId}不存在')
        try:
            await MetaSecurityDao.update_meta_security_col(query_db, edit_col)
            await query_db.commit()
        except Exception:
            await query_db.rollback()
            raise
        return CrudResponseModel(is_success=True, message='编辑列配置成功')

    @classmethod
    async def edit_meta_security_row_services(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityRowModel):
        """
        Update a row rule.

        :param request: Request object (unused here)
        :param query_db: ORM session
        :param page_object: row rule carrying the new values
        :return: CrudResponseModel describing the outcome
        :raises ServiceException: when the row rule does not exist
        """
        edit_row = page_object.model_dump(exclude_unset=True)
        # See edit_meta_security_col_services: the raw DAO row is the only
        # reliable existence signal.
        existing = await MetaSecurityDao.get_meta_security_row_by_id(query_db, page_object.rowId)
        if not existing:
            raise ServiceException(message=f'行配置{page_object.rowId}不存在')
        try:
            await MetaSecurityDao.update_meta_security_row(query_db, edit_row)
            await query_db.commit()
        except Exception:
            await query_db.rollback()
            raise
        return CrudResponseModel(is_success=True, message='编辑行配置成功')

    @classmethod
    async def delete_meta_security_col_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaSecurityModel):
        """
        Delete the column rules whose ids are listed, comma-separated, in ``page_object``.

        Ids that match no record are skipped.

        :param request: Request object (unused here)
        :param query_db: ORM session
        :param page_object: object carrying the ids to delete
        :return: CrudResponseModel describing the outcome
        :raises ServiceException: when no id was supplied
        """
        if not page_object.metaSecurity_ids:
            raise ServiceException(message='传入列配置ID为空')
        try:
            for col_id in page_object.metaSecurity_ids.split(','):
                if await MetaSecurityDao.get_meta_security_col_by_id(query_db, col_id):
                    await MetaSecurityDao.delete_meta_security_col(query_db, col_id)
            await query_db.commit()
        except Exception:
            await query_db.rollback()
            raise
        return CrudResponseModel(is_success=True, message='删除列配置成功')

    @classmethod
    async def delete_meta_security_row_services(cls, request: Request, query_db: AsyncSession, page_object: DeleteMetaSecurityModel):
        """
        Delete the row rules whose ids are listed, comma-separated, in ``page_object``.

        Ids that match no record are skipped.

        :param request: Request object (unused here)
        :param query_db: ORM session
        :param page_object: object carrying the ids to delete
        :return: CrudResponseModel describing the outcome
        :raises ServiceException: when no id was supplied
        """
        if not page_object.metaSecurity_ids:
            raise ServiceException(message='传入行配置ID为空')
        try:
            for row_id in page_object.metaSecurity_ids.split(','):
                if await MetaSecurityDao.get_meta_security_row_by_id(query_db, row_id):
                    await MetaSecurityDao.delete_meta_security_row(query_db, row_id)
            await query_db.commit()
        except Exception:
            await query_db.rollback()
            raise
        return CrudResponseModel(is_success=True, message='删除行配置成功')

    @classmethod
    async def getMetaSercuitybysql(cls, request: Request, query_db: AsyncSession, page_object: MetaSecurityApiModel):
        """
        Run ``page_object.sqlStr`` against a data source under row/column control.

        Steps: authenticate the caller, resolve the data source, then either
        run the SQL unchanged (members of role 3) or rewrite every referenced
        table into a restricted sub-query before running it.

        :param request: Request object
        :param query_db: ORM session of the application database
        :param page_object: API payload (credentials, data-source name, SQL, paging)
        :return: dict with ``ctrlSql``, ``data`` and either ``message`` or ``tablesRowCol``
        :raises ServiceException: when authentication fails
        """
        # 1. Authenticate the caller.
        if not page_object.username:
            raise ServiceException(data='', message='用户名不能为空!')
        user = await login_by_account(query_db, page_object.username)
        if not user:
            raise ServiceException(data='', message='用户不存在')
        # NOTE(review): compares the submitted value with the stored one
        # verbatim - confirm which representation (hash or plain text) both
        # sides are expected to carry.
        if not page_object.password == user[0].password:
            raise ServiceException(data='', message='用户密码错误!')
        query_user = await UserDao.get_user_by_id(query_db, user_id=user[0].user_id)
        role_id_list = [item.role_id for item in query_user.get('user_role_info')]

        # 2. Resolve the data source and build an engine for it.
        dsDataResource = await get_data_source_tree(request, page_object)
        dbConnent = cls.get_db_engine(dsDataResource["type"], dsDataResource["connectionParams"])
        try:
            # 3. Role 3 bypasses row/column control. The raw SQL is executed
            #    on this path only, so a restricted caller never triggers an
            #    unrestricted statement.
            if _SECURITY_ADMIN_ROLE_ID in role_id_list:
                result = await cls.execute_sql(dbConnent, page_object.sqlStr, "原始")
                return {
                    "ctrlSql": page_object.sqlStr,
                    "data": result,
                    "message": "数据安全管理员权限"
                }

            # 4. Tables referenced by the SQL.
            sqlTableNames = await cls.get_tables_from_sql(page_object.sqlStr)
            # 5. Columns of those tables in the target database.
            table_columns = await cls.get_columns_from_tables(dbConnent, sqlTableNames, dsDataResource["type"])
            # 6. Row/column rules of the user and of each of the user's roles.
            tablesRowCol = {}
            for table_name in sqlTableNames:
                tablesRowCol[table_name] = await get_table_configs(
                    query_db, page_object, user, role_id_list, table_name
                )
            ctrSqlDict = await generate_sql(tablesRowCol, table_columns)

            oldStrSql = page_object.sqlStr
            if page_object.isPage:
                oldStrSql = generate_pagination_sql(page_object, dsDataResource["type"])
            # 7. Replace every table with its restricted sub-query.
            newStrSql = await replace_table_with_subquery(ctrSqlDict, oldStrSql)
            # 8. Run the controlled SQL.
            result = await cls.execute_sql(dbConnent, newStrSql, "控制后")
            return {
                "ctrlSql": newStrSql,
                "data": result,
                "tablesRowCol": tablesRowCol
            }
        finally:
            # The engine is created per call; dispose it so its connection
            # pool does not outlive the request.
            await dbConnent.dispose()

    @staticmethod
    def get_db_engine(db_type: str, db_params: dict):
        """
        Build an async SQLAlchemy engine for a MySQL or PostgreSQL data source.

        :param db_type: database type, case-insensitive ("mysql" or "postgresql")
        :param db_params: dict with ``user``, ``password``, ``address`` and ``database``;
            ``address`` may carry a ``jdbc:mysql://`` / ``jdbc:postgresql://`` prefix
        :return: the async engine
        :raises ConnectionError: when SQLAlchemy rejects the engine configuration
        :raises RuntimeError: on any other failure (including an unsupported type)
        """
        try:
            address = db_params['address']
            # Strip the JDBC prefix into a local value; the caller's dict is
            # left untouched.
            for prefix in _JDBC_PREFIXES:
                if address.startswith(prefix):
                    address = address[len(prefix):]
                    break
            driver = _ASYNC_DRIVERS.get(db_type.lower())
            if driver is None:
                raise ValueError("不支持的数据库类型")
            # Percent-encode the credentials so characters such as '@', ':'
            # or '/' cannot corrupt the URL. The URL is deliberately not
            # printed or logged: it contains the password.
            user = quote(str(db_params['user']), safe='')
            password = quote(str(db_params['password']), safe='')
            return create_async_engine(f"{driver}://{user}:{password}@{address}/{db_params['database']}")
        except SQLAlchemyError as e:
            raise ConnectionError(f"数据库连接失败: {e}") from e
        except Exception as e:
            raise RuntimeError(f"连接过程中发生了未知错误: {e}") from e

    @classmethod
    async def execute_sql(cls, dbConnent, sql_query: str, sql_type: str, params=None):
        """
        Execute a SQL statement and return its rows as dicts.

        :param dbConnent: async engine to run on
        :param sql_query: SQL text, optionally containing ``:name`` placeholders
        :param sql_type: label put in front of the error message
        :param params: optional bound-parameter values for the placeholders
        :return: list of ``{column name: value}`` dicts, one per row
        :raises RuntimeError: when SQLAlchemy reports an error while executing
        """
        # Opened before the try block, as in the original flow, so a failure
        # to connect is raised as-is rather than wrapped below.
        async with dbConnent.begin():
            async with AsyncSession(bind=dbConnent, expire_on_commit=False) as session:
                try:
                    result = await session.execute(text(sql_query), params)
                    rows = result.fetchall()
                    columns = result.keys()
                    return [dict(zip(columns, row)) for row in rows]
                except SQLAlchemyError as e:
                    raise RuntimeError(f"{sql_type}执行 SQL 查询时发生错误: {e}") from e

    @staticmethod
    async def get_tables_from_sql(sql_query: str):
        """
        Extract the table names referenced after FROM / JOIN / INTO / UPDATE.

        Only schema-qualified references (``schema.table``) are matched; the
        schema part is dropped. Each name is returned once, in order of first
        appearance.

        :param sql_query: SQL text to scan
        :return: list of bare table names
        """
        table_pattern = r"(FROM|JOIN|INTO|UPDATE)\s+([a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_]+))"
        table_matches = re.findall(table_pattern, sql_query, re.IGNORECASE)
        # split('.')[-1] keeps the last part, i.e. the table name.
        names = (match[1].split('.')[-1] for match in table_matches)
        return list(dict.fromkeys(names))

    @classmethod
    async def get_columns_from_tables(cls, dbConnent, table_names, db_type: str):
        """
        Look up the column names of each table in the target database.

        :param dbConnent: async engine of the target database
        :param table_names: iterable of bare table names
        :param db_type: database type, case-insensitive ("mysql" or "postgresql")
        :return: dict ``{table_name: [column, ...]}``
        :raises ValueError: when ``db_type`` is unsupported and a table has to be queried
        """
        columns = {}
        for table_name in table_names:
            query = _COLUMN_QUERIES.get(db_type.lower())
            if query is None:
                raise ValueError(f"暂不支持数据库类型: {db_type}")
            # NOTE(review): the lookup filters on table name only, so
            # same-named tables in different schemas/databases are merged.
            result = await cls.execute_sql(dbConnent, query, "字段查询", {"table_name": table_name})
            columns[table_name] = [row["column_name"] for row in result]
        return columns


def convert_decimal(obj):
    """
    Recursively replace ``Decimal`` values with ``float``.

    :param obj: a Decimal, dict, list or any other value
    :return: the converted structure; other values are returned unchanged
    """
    if isinstance(obj, Decimal):
        return float(obj)  # use str(obj) instead if precision must be kept
    if isinstance(obj, dict):
        return {key: convert_decimal(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_decimal(item) for item in obj]
    return obj


async def get_table_configs(query_db, page_object, user, role_id_list, table_name):
    """
    Collect the row and column rules applying to one table.

    Object type '0' is queried with the user id and '1' with each role id.

    :param query_db: ORM session
    :param page_object: API payload; ``dbRCode`` selects the data source
    :param user: login lookup result; ``user[0].user_id`` is used
    :param role_id_list: role ids of the user
    :param table_name: table to look the rules up for
    :return: dict with the four rule lists and ``isHave`` (True when any list is non-empty)
    """
    user_id = user[0].user_id
    user_col_list = await MetaSecurityDao.get_api_col_list(
        query_db, page_object.dbRCode, table_name, '0', user_id
    )
    role_col_list = []
    for role_id in role_id_list:
        role_col_list.extend(
            await MetaSecurityDao.get_api_col_list(query_db, page_object.dbRCode, table_name, '1', role_id)
        )
    user_row_list = await MetaSecurityDao.get_api_row_list(
        query_db, page_object.dbRCode, table_name, '0', user_id
    )
    role_row_list = []
    for role_id in role_id_list:
        role_row_list.extend(
            await MetaSecurityDao.get_api_row_list(query_db, page_object.dbRCode, table_name, '1', role_id)
        )
    return {
        "user_col_list": user_col_list,
        "role_col_list": role_col_list,
        "role_row_list": role_row_list,
        "user_row_list": user_row_list,
        "isHave": any([user_col_list, role_col_list, user_row_list, role_row_list])
    }


def _quote_sql_literal(value):
    """Wrap a value in single quotes, doubling any embedded single quote."""
    # NOTE(review): quote doubling only; a MySQL server running with
    # backslash escapes enabled may need further handling.
    return "'" + str(value).replace("'", "''") + "'"


def _apply_column_rules(select_columns, column_rules):
    """Apply column rules in order: ctrl_type '0' hides a column, '1' exposes it."""
    for rule in column_rules:
        if rule.dbCName == "ALL":
            targets = list(select_columns)
        else:
            targets = [name.strip().lower() for name in rule.dbCName.split(",")]
        for name in targets:
            if name not in select_columns:
                continue  # the rule names a column the table does not have
            if rule.ctrl_type == '0':
                select_columns[name] = f"null {name}"
            elif rule.ctrl_type == '1':
                select_columns[name] = name


def _collect_row_conditions(conditions, row_rules):
    """Record one WHERE condition per controlled column; later rules override earlier ones."""
    for rule in row_rules:
        if rule.ctrl_type == '0':  # fixed value(s)
            if rule.ctrl_value == "ALL":
                # "ALL" lifts the restriction on this column.
                conditions[rule.dbCName] = ""
            elif "," in rule.ctrl_value:
                values = [_quote_sql_literal(value.strip()) for value in rule.ctrl_value.split(",")]
                conditions[rule.dbCName] = f"{rule.dbCName} IN ({', '.join(values)})"
            else:
                conditions[rule.dbCName] = f"{rule.dbCName} = {_quote_sql_literal(rule.ctrl_value)}"
        elif rule.ctrl_type == '1':  # values taken from "table.column"
            tab_col_value = rule.ctrl_value.split(".")
            if len(tab_col_value) != 2:
                raise RuntimeError(f"{rule.dbCName}字段控制类型为表字段,未维护正确的值")
            conditions[rule.dbCName] = (
                f"{rule.dbCName} in (select {tab_col_value[1]} from {tab_col_value[0]})"
            )


async def generate_sql(tablesRowCol: dict, table_columns: dict):
    """
    Build, for every table, the restricted SELECT that enforces its rules.

    Every column starts hidden (selected as ``null <column>``). Role rules are
    applied first and user rules second, so user rules win on conflict.

    :param tablesRowCol: ``{table_name: config}`` as produced by get_table_configs
    :param table_columns: ``{table_name: [column, ...]}``
    :return: ``{table_name: "SELECT ... FROM table [WHERE ...]"}``
    :raises ValueError: when none of the tables has any rule configured
    :raises RuntimeError: when a table-column row rule is not of the form ``table.column``
    """
    if not any(config["isHave"] for config in tablesRowCol.values()):
        no_configTable_name = ",".join(tablesRowCol)
        raise ValueError(f"表:{no_configTable_name}均未配置行列数据安全")

    sql_queries = {}
    for table_name, config in tablesRowCol.items():
        # 1. Column control, keyed by lower-cased column name.
        select_columns = {col.lower(): f"null {col.lower()}" for col in table_columns[table_name]}
        _apply_column_rules(select_columns, config["role_col_list"])
        _apply_column_rules(select_columns, config["user_col_list"])
        query = f"SELECT {', '.join(select_columns.values())} FROM {table_name}"

        # 2. Row control.
        select_rows = {}
        _collect_row_conditions(select_rows, config["role_row_list"])
        _collect_row_conditions(select_rows, config["user_row_list"])
        # Skip the empty "ALL" entries; joining them would emit "WHERE  AND ...".
        where_conditions = " AND ".join(condition for condition in select_rows.values() if condition)
        if where_conditions:
            query += " WHERE " + where_conditions
        sql_queries[table_name] = query
    return sql_queries


async def replace_table_with_subquery(ctrSqlDict, oldStrSql):
    """
    Replace table references in a SQL string with their restricted sub-queries.

    The first occurrence of a table becomes ``(<subquery>)`` followed by the
    alias already present, or by ``AS <table>`` when there is none. Later
    occurrences of the same reference are replaced by that alias.

    :param ctrSqlDict: ``{table_name: subquery}`` as produced by generate_sql
    :param oldStrSql: SQL text to rewrite
    :return: the rewritten SQL text
    """
    table_alias_map = {}  # matched table reference -> alias name
    for table_name, subquery in ctrSqlDict.items():
        pattern = (
            r'(\b(?:[a-zA-Z_][a-zA-Z0-9_]*\.)?'  # optional schema name
            + re.escape(table_name)
            + r'\b)'
            r'(\s+(?:AS\s+)?(\w+))?'  # optional alias part (with or without AS)
        )

        def replace(match, subquery=subquery):
            original_table = match.group(1)   # table as written (may include the schema)
            alias_part = match.group(2) or ""  # whitespace + optional AS + following word
            alias_name = match.group(3)        # the following word alone
            if original_table in table_alias_map:
                # Keep the trailing text: it may be a keyword such as WHERE.
                return f"{table_alias_map[original_table]}{alias_part}"
            if alias_name and alias_name.upper() not in _NON_ALIAS_KEYWORDS:
                # A real alias follows the table: keep it.
                table_alias_map[original_table] = alias_name
                return f"({subquery}) {alias_part}"
            # No alias, or the next word is a SQL keyword: add a default alias
            # and keep whatever followed the table name.
            alias = original_table.split('.')[-1]
            table_alias_map[original_table] = alias
            return f"({subquery}) AS {alias}{alias_part}"

        oldStrSql = re.sub(pattern, replace, oldStrSql, flags=re.IGNORECASE)
    return oldStrSql


async def get_data_source_tree(request: Request, current_user: MetaSecurityApiModel):
    """
    Fetch the data source named ``current_user.dbRCode`` from the scheduler service.

    :param request: Request object (unused here)
    :param current_user: API payload carrying the credentials and the data-source name
    :return: the data-source dict, with ``connectionParams`` parsed into a dict
    :raises Exception: when the service does not answer 200 or no data source matches
    """
    url = f'{AppConfig.ds_server_url}/dolphinscheduler/datasources/withpwdlist?pageNo=1&pageSize=100'
    headers = {'dashUserName': current_user.username, 'dashPassword': current_user.password}
    # requests is blocking: run it in the default executor so the event loop
    # keeps serving other requests, and bound the wait with a timeout.
    # NOTE(review): verify=False disables TLS certificate verification.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None,
        functools.partial(
            requests.get, url, headers=headers, verify=False, timeout=_DS_REQUEST_TIMEOUT_SECONDS
        ),
    )
    if response.status_code == 200:
        data = json.loads(response.text)
        # NOTE(review): only the first page (100 entries) is inspected.
        for item in data["data"]["totalList"]:
            if item["name"] == current_user.dbRCode:
                item["connectionParams"] = json.loads(item["connectionParams"])
                return item
    raise Exception(f'根据数据源ID:{current_user.dbRCode}获取数据源信息失败,状态: {response.reason}')


async def test_connection(db_content):
    """
    Check that a connection can be opened by running ``SELECT 1``.

    :param db_content: async engine to test
    :raises Exception: when the connection or the query fails
    """
    try:
        async with db_content.connect() as connection:
            # SQLAlchemy 2.0 requires an executable construct, not a raw string.
            await connection.scalar(text("SELECT 1"))
    except Exception as e:
        raise Exception("数据源连接失败") from e


def generate_pagination_sql(page_object: MetaSecurityApiModel, db_type: str) -> str:
    """
    Append paging to ``page_object.sqlStr`` for the given database type.

    :param page_object: object carrying ``sqlStr``, ``pageNum`` (default 1) and ``pageSize`` (default 10)
    :param db_type: database type, case-insensitive
    :return: the SQL statement with paging applied
    :raises ValueError: when the database type is not supported
    """
    # Coerce to int so only plain numbers are ever embedded in the SQL text.
    page_num = int(page_object.pageNum or 1)
    page_size = int(page_object.pageSize or 10)
    offset = (page_num - 1) * page_size  # rows to skip
    oldStrSql = page_object.sqlStr
    db_type = db_type.upper()
    if db_type in ("MYSQL", "POSTGRESQL"):
        newStrSql = f"{oldStrSql} LIMIT {page_size} OFFSET {offset}"
    elif db_type == "SQLSERVER":
        newStrSql = f"{oldStrSql} ORDER BY id OFFSET {offset} ROWS FETCH NEXT {page_size} ROWS ONLY"
    elif db_type == "ORACLE":
        newStrSql = f"""
        SELECT * FROM (
            SELECT a.*, ROWNUM rnum FROM (
                {oldStrSql} ORDER BY id
            ) a WHERE ROWNUM <= {offset + page_size}
        ) WHERE rnum > {offset}
        """
    else:
        raise ValueError(f"不支持的数据库类型: {db_type}")
    return newStrSql