|
@@ -1,146 +0,0 @@
|
1
|
|
-# -*- coding: utf-8 -*-
|
2
|
|
-import typing
|
3
|
|
-
|
4
|
|
-from pyramid.interfaces import IAuthorizationPolicy
|
5
|
|
-from zope.interface import implementer
|
6
|
|
-
|
7
|
|
-try:
|
8
|
|
- from json.decoder import JSONDecodeError
|
9
|
|
-except ImportError: # python3.4
|
10
|
|
- JSONDecodeError = ValueError
|
11
|
|
-from sqlalchemy.orm.exc import NoResultFound
|
12
|
|
-
|
13
|
|
-from pyramid.request import Request
|
14
|
|
-
|
15
|
|
-from tracim.models.auth import User
|
16
|
|
-from tracim.models.data import Workspace
|
17
|
|
-from tracim.lib.core.user import UserApi
|
18
|
|
-from tracim.lib.core.workspace import WorkspaceApi
|
19
|
|
-
|
20
|
|
-from tracim.exceptions import NotAuthentificated
|
21
|
|
-from tracim.exceptions import WorkspaceNotFound
|
22
|
|
-from tracim.exceptions import InsufficientUserWorkspaceRole
|
23
|
|
-BASIC_AUTH_WEBUI_REALM = "tracim"
|
24
|
|
-TRACIM_DEFAULT_PERM = 'tracim'
|
25
|
|
-
|
26
|
|
-
|
27
|
|
-def get_safe_user(
|
28
|
|
- request: Request,
|
29
|
|
-) -> User:
|
30
|
|
- """
|
31
|
|
- Get current pyramid authenticated user from request
|
32
|
|
- :param request: pyramid request
|
33
|
|
- :return: current authenticated user
|
34
|
|
- """
|
35
|
|
- app_config = request.registry.settings['CFG']
|
36
|
|
- uapi = UserApi(None, session=request.dbsession, config=app_config)
|
37
|
|
- user = None
|
38
|
|
- try:
|
39
|
|
- login = request.authenticated_userid
|
40
|
|
- if not login:
|
41
|
|
- raise NotAuthentificated('not authenticated user_id,'
|
42
|
|
- 'Failed Authentification ?')
|
43
|
|
- user = uapi.get_one_by_email(login)
|
44
|
|
- except NoResultFound:
|
45
|
|
- raise NotAuthentificated('User not found')
|
46
|
|
- return user
|
47
|
|
-
|
48
|
|
-
|
49
|
|
-def get_workspace(user: User, request: Request) -> typing.Optional[Workspace]:
|
50
|
|
- """
|
51
|
|
- Get current workspace from request
|
52
|
|
- :param user: User who want to check the workspace
|
53
|
|
- :param request: pyramid request
|
54
|
|
- :return:
|
55
|
|
- """
|
56
|
|
- workspace_id = ''
|
57
|
|
- try:
|
58
|
|
- if 'workspace_id' not in request.json_body:
|
59
|
|
- return None
|
60
|
|
- workspace_id = request.json_body['workspace_id']
|
61
|
|
- wapi = WorkspaceApi(current_user=user, session=request.dbsession)
|
62
|
|
- workspace = wapi.get_one(workspace_id)
|
63
|
|
- except JSONDecodeError:
|
64
|
|
- raise WorkspaceNotFound('Bad json body')
|
65
|
|
- except NoResultFound:
|
66
|
|
- raise WorkspaceNotFound(
|
67
|
|
- 'Workspace {} does not exist '
|
68
|
|
- 'or is not visible for this user'.format(workspace_id)
|
69
|
|
- )
|
70
|
|
- return workspace
|
71
|
|
-
|
72
|
|
-###
|
73
|
|
-# BASIC AUTH
|
74
|
|
-###
|
75
|
|
-
|
76
|
|
-
|
77
|
|
-def basic_auth_check_credentials(
|
78
|
|
- login: str,
|
79
|
|
- cleartext_password: str,
|
80
|
|
- request: 'TracimRequest'
|
81
|
|
-) -> typing.Optional[list]:
|
82
|
|
- """
|
83
|
|
- Check credential for pyramid basic_auth
|
84
|
|
- :param login: login of user
|
85
|
|
- :param cleartext_password: user password in cleartext
|
86
|
|
- :param request: Pyramid request
|
87
|
|
- :return: None if auth failed, list of permissions if auth succeed
|
88
|
|
- """
|
89
|
|
-
|
90
|
|
- # Do not accept invalid user
|
91
|
|
- user = _get_basic_auth_unsafe_user(request)
|
92
|
|
- if not user \
|
93
|
|
- or user.email != login \
|
94
|
|
- or not user.validate_password(cleartext_password):
|
95
|
|
- return None
|
96
|
|
- return []
|
97
|
|
-
|
98
|
|
-
|
99
|
|
-def _get_basic_auth_unsafe_user(
|
100
|
|
- request: Request,
|
101
|
|
-) -> typing.Optional[User]:
|
102
|
|
- """
|
103
|
|
- :param request: pyramid request
|
104
|
|
- :return: User or None
|
105
|
|
- """
|
106
|
|
- app_config = request.registry.settings['CFG']
|
107
|
|
- uapi = UserApi(None, session=request.dbsession, config=app_config)
|
108
|
|
- try:
|
109
|
|
- login = request.unauthenticated_userid
|
110
|
|
- if not login:
|
111
|
|
- return None
|
112
|
|
- user = uapi.get_one_by_email(login)
|
113
|
|
- except NoResultFound:
|
114
|
|
- return None
|
115
|
|
- return user
|
116
|
|
-
|
117
|
|
-####
|
118
|
|
-
|
119
|
|
-
|
120
|
|
-def require_workspace_role(minimal_required_role):
|
121
|
|
- def decorator(func):
|
122
|
|
-
|
123
|
|
- def wrapper(self, request: 'TracimRequest'):
|
124
|
|
- user = request.current_user
|
125
|
|
- workspace = request.current_workspace
|
126
|
|
- if workspace.get_user_role(user) >= minimal_required_role:
|
127
|
|
- return func(self, request)
|
128
|
|
- raise InsufficientUserWorkspaceRole()
|
129
|
|
-
|
130
|
|
- return wrapper
|
131
|
|
- return decorator
|
132
|
|
-
|
133
|
|
-###
|
134
|
|
-
|
135
|
|
-
|
136
|
|
-@implementer(IAuthorizationPolicy)
|
137
|
|
-class AcceptAllAuthorizationPolicy(object):
|
138
|
|
- """
|
139
|
|
- Simple AuthorizationPolicy to avoid trouble with pyramid.
|
140
|
|
- Acceot any request.
|
141
|
|
- """
|
142
|
|
- def permits(self, context, principals, permision):
|
143
|
|
- return True
|
144
|
|
-
|
145
|
|
- def principals_allowed_by_permission(self, context, permission):
|
146
|
|
- raise NotImplementedError()
|