You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					76 lines
				
				3.4 KiB
			
		
		
			
		
	
	
					76 lines
				
				3.4 KiB
			| 
								 
											2 years ago
										 
									 | 
							
								from fastapi import Depends
							 | 
						||
| 
								 
											1 year ago
										 
									 | 
							
								from typing import Optional
							 | 
						||
| 
								 
											2 years ago
										 
									 | 
							
								from module_admin.entity.vo.user_vo import CurrentUserModel
							 | 
						||
| 
								 | 
							
								from module_admin.service.login_service import LoginService
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class GetDataScope:
								    """
								    Build the data-permission filter expression for the current user.

								    Instances are callable and take the current user through FastAPI's
								    ``Depends(LoginService.get_current_user)``. Calling an instance returns
								    the *text* of a Python expression of the form ``or_(<cond>, <cond>, ...)``.

								    The generated text refers to the names ``select``, ``or_``, ``func``,
								    ``SysRoleDept`` and ``SysDept`` as well as to ``query_alias``; those names
								    must be resolvable wherever the text is evaluated.

								    NOTE(review): the aliases and ids are interpolated verbatim into source
								    text that is presumably evaluated by the caller -- the aliases must never
								    be derived from request input.
								    """

								    # Values compared against ``role.data_scope`` in ``__call__``.
								    DATA_SCOPE_ALL = '1'  # no restriction
								    DATA_SCOPE_CUSTOM = '2'  # departments linked to the role through SysRoleDept
								    DATA_SCOPE_DEPT = '3'  # the user's own department only
								    DATA_SCOPE_DEPT_AND_CHILD = '4'  # own department plus departments listing it in ``ancestors``
								    DATA_SCOPE_SELF = '5'  # rows whose user column equals the user's id

								    def __init__(
								        self,
								        query_alias: Optional[str] = '',
								        db_alias: Optional[str] = 'db',
								        user_alias: Optional[str] = 'user_id',
								        dept_alias: Optional[str] = 'dept_id',
								    ):
								        """
								        Configure which model and columns the generated expression refers to.

								        :param query_alias: name of the SQLAlchemy model of the queried table, defaults to ''
								        :param db_alias: alias of the ORM session object, defaults to 'db'
								        :param user_alias: name of the user id column, defaults to 'user_id'
								        :param dept_alias: name of the department id column, defaults to 'dept_id'
								        """
								        self.query_alias = query_alias
								        self.db_alias = db_alias
								        self.user_alias = user_alias
								        self.dept_alias = dept_alias

								    def __call__(self, current_user: CurrentUserModel = Depends(LoginService.get_current_user)) -> str:
								        """
								        Return the filter expression text matching the user's role data scopes.

								        One condition is produced per role and the distinct conditions are
								        combined with ``or_``. Admin users and users holding a role with
								        ``DATA_SCOPE_ALL`` get the always-true condition ``1 == 1``. Users
								        without any role, or with only unknown scopes, get ``1 == 0``.

								        :param current_user: the authenticated user, injected by FastAPI
								        :return: text of the form ``or_(<cond>, ...)``
								        """
								        # Admins are unrestricted regardless of their roles; deciding this
								        # before the loop also covers an admin that has no role at all.
								        if current_user.user.admin:
								            return 'or_(1 == 1)'

								        user_id = current_user.user.user_id
								        dept_id = current_user.user.dept_id
								        # A missing role collection is treated like an empty one.
								        roles = current_user.user.role or []
								        custom_data_scope_role_id_list = [
								            item.role_id for item in roles if item.data_scope == self.DATA_SCOPE_CUSTOM
								        ]
								        # With several custom-scope roles a single IN (...) over all of their
								        # ids is emitted; it does not depend on the role being visited, so it
								        # is built once up front.
								        merged_custom_sql = None
								        if len(custom_data_scope_role_id_list) > 1:
								            merged_custom_sql = f"{self.query_alias}.{self.dept_alias}.in_(select(SysRoleDept.dept_id).where(SysRoleDept.role_id.in_({custom_data_scope_role_id_list}))) if hasattr({self.query_alias}, '{self.dept_alias}') else 1 == 0"

								        param_sql_list = []
								        for role in roles:
								            if role.data_scope == self.DATA_SCOPE_ALL:
								                # One unrestricted role outweighs every other condition.
								                param_sql_list = ['1 == 1']
								                break
								            elif role.data_scope == self.DATA_SCOPE_CUSTOM:
								                if merged_custom_sql is not None:
								                    param_sql_list.append(merged_custom_sql)
								                else:
								                    param_sql_list.append(
								                        f"{self.query_alias}.{self.dept_alias}.in_(select(SysRoleDept.dept_id).where(SysRoleDept.role_id == {role.role_id})) if hasattr({self.query_alias}, '{self.dept_alias}') else 1 == 0"
								                    )
								            elif role.data_scope == self.DATA_SCOPE_DEPT:
								                param_sql_list.append(
								                    f"{self.query_alias}.{self.dept_alias} == {dept_id} if hasattr({self.query_alias}, '{self.dept_alias}') else 1 == 0"
								                )
								            elif role.data_scope == self.DATA_SCOPE_DEPT_AND_CHILD:
								                param_sql_list.append(
								                    f"{self.query_alias}.{self.dept_alias}.in_(select(SysDept.dept_id).where(or_(SysDept.dept_id == {dept_id}, func.find_in_set({dept_id}, SysDept.ancestors)))) if hasattr({self.query_alias}, '{self.dept_alias}') else 1 == 0"
								                )
								            elif role.data_scope == self.DATA_SCOPE_SELF:
								                param_sql_list.append(
								                    f"{self.query_alias}.{self.user_alias} == {user_id} if hasattr({self.query_alias}, '{self.user_alias}') else 1 == 0"
								                )
								            else:
								                # Unknown scope values grant nothing.
								                param_sql_list.append('1 == 0')

								        if not param_sql_list:
								            # No role at all: deny by default. An empty ``or_()`` would
								            # render as a blank clause, i.e. no restriction whatsoever.
								            param_sql_list.append('1 == 0')

								        # Drop duplicate conditions while keeping first-seen order.
								        param_sql_list = list(dict.fromkeys(param_sql_list))
								        param_sql = f"or_({', '.join(param_sql_list)})"

								        return param_sql
							 |