|
@@ -1,12 +1,12 @@
|
1
|
1
|
# -*- coding: utf-8 -*-
|
|
2
|
+from typing import Dict
|
|
3
|
+
|
2
|
4
|
from sqlalchemy import and_
|
3
|
5
|
from tg.configuration.auth import TGAuthMetadata
|
4
|
6
|
|
5
|
7
|
from tracim.lib.auth.base import Auth
|
6
|
8
|
from tracim.model import DBSession, User
|
7
|
9
|
|
8
|
|
-# TODO : temporary fix to update DB, to remove
|
9
|
|
-import transaction
|
10
|
10
|
|
11
|
11
|
class InternalAuth(Auth):
|
12
|
12
|
|
|
@@ -29,24 +29,35 @@ class InternalApplicationAuthMetadata(TGAuthMetadata):
|
29
|
29
|
def __init__(self, sa_auth):
|
30
|
30
|
self.sa_auth = sa_auth
|
31
|
31
|
|
32
|
|
- def authenticate(self, environ, identity, allow_auth_token: bool=False):
|
33
|
|
- user = self.sa_auth.dbsession.query(self.sa_auth.user_class).filter(and_(
|
34
|
|
- self.sa_auth.user_class.is_active == True,
|
35
|
|
- self.sa_auth.user_class.email == identity['login']
|
36
|
|
- )).first()
|
37
|
|
-
|
38
|
|
- if user and user.validate_password(identity['password']):
|
39
|
|
- if not user.webdav_left_digest_response_hash:
|
40
|
|
- user.webdav_left_digest_response_hash = '%s:/:%s' % (identity['login'], identity['password'])
|
41
|
|
- DBSession.flush()
|
42
|
|
- # TODO : temporary fix to update DB, to remove
|
43
|
|
- transaction.commit()
|
44
|
|
- return identity['login']
|
45
|
|
-
|
46
|
|
- if user and allow_auth_token:
|
47
|
|
- user.ensure_auth_token()
|
48
|
|
- if user.auth_token == identity['password']:
|
49
|
|
- return identity['login']
|
|
32
|
+ def authenticate(
|
|
33
|
+ self,
|
|
34
|
+ environ: Dict[str, str],
|
|
35
|
+ identity: Dict[str, str],
|
|
36
|
+ allow_auth_token: bool = False,
|
|
37
|
+ ) -> str:
|
|
38
|
+ """
|
|
39
|
+ Authenticates using given credentials.
|
|
40
|
+
|
|
41
|
+ Checks password first then auth token if allowed.
|
|
42
|
+ :param environ:
|
|
43
|
+ :param identity: The given credentials to authenticate.
|
|
44
|
+ :param allow_auth_token: The indicator of auth token use.
|
|
45
|
+ :return: The given login or an empty string if auth failed.
|
|
46
|
+ """
|
|
47
|
+ result = ''
|
|
48
|
+ user = self.sa_auth.dbsession \
|
|
49
|
+ .query(self.sa_auth.user_class) \
|
|
50
|
+ .filter(self.sa_auth.user_class.is_active.is_(True)) \
|
|
51
|
+ .filter(self.sa_auth.user_class.email == identity['login']) \
|
|
52
|
+ .first()
|
|
53
|
+ if user:
|
|
54
|
+ if user.validate_password(identity['password']):
|
|
55
|
+ result = identity['login']
|
|
56
|
+ if allow_auth_token:
|
|
57
|
+ user.ensure_auth_token()
|
|
58
|
+ if user.auth_token == identity['password']:
|
|
59
|
+ result = identity['login']
|
|
60
|
+ return result
|
50
|
61
|
|
51
|
62
|
def get_user(self, identity, userid):
|
52
|
63
|
return self.sa_auth.dbsession.query(self.sa_auth.user_class).filter(
|