|
@ -9,6 +9,12 @@ class GetDataScope: |
|
|
获取当前用户数据权限对应的查询sql语句 |
|
|
获取当前用户数据权限对应的查询sql语句 |
|
|
""" |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
DATA_SCOPE_ALL = '1' |
|
|
|
|
|
DATA_SCOPE_CUSTOM = '2' |
|
|
|
|
|
DATA_SCOPE_DEPT = '3' |
|
|
|
|
|
DATA_SCOPE_DEPT_AND_CHILD = '4' |
|
|
|
|
|
DATA_SCOPE_SELF = '5' |
|
|
|
|
|
|
|
|
def __init__( |
|
|
def __init__( |
|
|
self, |
|
|
self, |
|
|
query_alias: Optional[str] = '', |
|
|
query_alias: Optional[str] = '', |
|
@ -16,6 +22,14 @@ class GetDataScope: |
|
|
user_alias: Optional[str] = 'user_id', |
|
|
user_alias: Optional[str] = 'user_id', |
|
|
dept_alias: Optional[str] = 'dept_id', |
|
|
dept_alias: Optional[str] = 'dept_id', |
|
|
): |
|
|
): |
|
|
|
|
|
""" |
|
|
|
|
|
获取当前用户数据权限对应的查询sql语句 |
|
|
|
|
|
|
|
|
|
|
|
:param query_alias: 所要查询表对应的sqlalchemy模型名称,默认为'' |
|
|
|
|
|
:param db_alias: orm对象别名,默认为'db' |
|
|
|
|
|
:param user_alias: 用户id字段别名,默认为'user_id' |
|
|
|
|
|
:param dept_alias: 部门id字段别名,默认为'dept_id' |
|
|
|
|
|
""" |
|
|
self.query_alias = query_alias |
|
|
self.query_alias = query_alias |
|
|
self.db_alias = db_alias |
|
|
self.db_alias = db_alias |
|
|
self.user_alias = user_alias |
|
|
self.user_alias = user_alias |
|
@ -24,23 +38,37 @@ class GetDataScope: |
|
|
def __call__(self, current_user: CurrentUserModel = Depends(LoginService.get_current_user)): |
|
|
def __call__(self, current_user: CurrentUserModel = Depends(LoginService.get_current_user)): |
|
|
user_id = current_user.user.user_id |
|
|
user_id = current_user.user.user_id |
|
|
dept_id = current_user.user.dept_id |
|
|
dept_id = current_user.user.dept_id |
|
|
role_datascope_list = [ |
|
|
custom_data_scope_role_id_list = [ |
|
|
dict(role_id=item.role_id, data_scope=int(item.data_scope)) for item in current_user.user.role |
|
|
item.role_id for item in current_user.user.role if item.data_scope == self.DATA_SCOPE_CUSTOM |
|
|
] |
|
|
] |
|
|
max_data_scope_dict = min(role_datascope_list, key=lambda x: x['data_scope']) |
|
|
param_sql_list = [] |
|
|
max_role_id = max_data_scope_dict['role_id'] |
|
|
for role in current_user.user.role: |
|
|
max_data_scope = max_data_scope_dict['data_scope'] |
|
|
if current_user.user.admin or role.data_scope == self.DATA_SCOPE_ALL: |
|
|
if self.query_alias == '' or max_data_scope == 1 or user_id == 1: |
|
|
param_sql_list = ['1 == 1'] |
|
|
param_sql = '1 == 1' |
|
|
break |
|
|
elif max_data_scope == 2: |
|
|
elif role.data_scope == self.DATA_SCOPE_CUSTOM: |
|
|
param_sql = f"{self.query_alias}.{self.dept_alias}.in_(select(SysRoleDept.dept_id).where(SysRoleDept.role_id == {max_role_id})) if hasattr({self.query_alias}, '{self.dept_alias}') else 1 == 0" |
|
|
if len(custom_data_scope_role_id_list) > 1: |
|
|
elif max_data_scope == 3: |
|
|
param_sql_list.append( |
|
|
param_sql = f"{self.query_alias}.{self.dept_alias} == {dept_id} if hasattr({self.query_alias}, '{self.dept_alias}') else 1 == 0" |
|
|
f"{self.query_alias}.{self.dept_alias}.in_(select(SysRoleDept.dept_id).where(SysRoleDept.role_id.in_({custom_data_scope_role_id_list}))) if hasattr({self.query_alias}, '{self.dept_alias}') else 1 == 0" |
|
|
elif max_data_scope == 4: |
|
|
) |
|
|
param_sql = f"{self.query_alias}.{self.dept_alias}.in_(select(SysDept.dept_id).where(or_(SysDept.dept_id == {dept_id}, func.find_in_set({dept_id}, SysDept.ancestors)))) if hasattr({self.query_alias}, '{self.dept_alias}') else 1 == 0" |
|
|
else: |
|
|
elif max_data_scope == 5: |
|
|
param_sql_list.append( |
|
|
param_sql = f"{self.query_alias}.{self.user_alias} == {user_id} if hasattr({self.query_alias}, '{self.user_alias}') else 1 == 0" |
|
|
f"{self.query_alias}.{self.dept_alias}.in_(select(SysRoleDept.dept_id).where(SysRoleDept.role_id == {role.role_id})) if hasattr({self.query_alias}, '{self.dept_alias}') else 1 == 0" |
|
|
else: |
|
|
) |
|
|
param_sql = '1 == 0' |
|
|
elif role.data_scope == self.DATA_SCOPE_DEPT: |
|
|
|
|
|
param_sql_list.append( |
|
|
|
|
|
f"{self.query_alias}.{self.dept_alias} == {dept_id} if hasattr({self.query_alias}, '{self.dept_alias}') else 1 == 0" |
|
|
|
|
|
) |
|
|
|
|
|
elif role.data_scope == self.DATA_SCOPE_DEPT_AND_CHILD: |
|
|
|
|
|
param_sql_list.append( |
|
|
|
|
|
f"{self.query_alias}.{self.dept_alias}.in_(select(SysDept.dept_id).where(or_(SysDept.dept_id == {dept_id}, func.find_in_set({dept_id}, SysDept.ancestors)))) if hasattr({self.query_alias}, '{self.dept_alias}') else 1 == 0" |
|
|
|
|
|
) |
|
|
|
|
|
elif role.data_scope == self.DATA_SCOPE_SELF: |
|
|
|
|
|
param_sql_list.append( |
|
|
|
|
|
f"{self.query_alias}.{self.user_alias} == {user_id} if hasattr({self.query_alias}, '{self.user_alias}') else 1 == 0" |
|
|
|
|
|
) |
|
|
|
|
|
else: |
|
|
|
|
|
param_sql_list.append('1 == 0') |
|
|
|
|
|
param_sql = f"or_({', '.join(param_sql_list)})" |
|
|
|
|
|
|
|
|
return param_sql |
|
|
return param_sql |
|
|