user.py 5.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. # -*- coding: utf-8 -*-
  2. import threading
  3. import transaction
  4. import typing as typing
  5. from sqlalchemy.orm import Session
  6. from tracim import CFG
  7. from tracim.models.auth import User, Group
  8. from sqlalchemy.orm.exc import NoResultFound
  9. from tracim.exceptions import WrongUserPassword, UserNotExist
  10. from tracim.exceptions import AuthenticationFailed
  11. from tracim.models.context_models import UserInContext
  12. class UserApi(object):
  13. def __init__(
  14. self,
  15. current_user: typing.Optional[User],
  16. session: Session,
  17. config: CFG,
  18. ) -> None:
  19. self._session = session
  20. self._user = current_user
  21. self._config = config
  22. def _base_query(self):
  23. return self._session.query(User)
  24. def get_user_with_context(self, user: User) -> UserInContext:
  25. """
  26. Return UserInContext object from User
  27. """
  28. user = UserInContext(
  29. user=user,
  30. dbsession=self._session,
  31. config=self._config,
  32. )
  33. return user
  34. # Getters
  35. def get_one(self, user_id: int) -> User:
  36. """
  37. Get one user by user id
  38. """
  39. try:
  40. user = self._base_query().filter(User.user_id == user_id).one()
  41. except NoResultFound:
  42. raise UserNotExist()
  43. return user
  44. def get_one_by_email(self, email: str) -> User:
  45. """
  46. Get one user by email
  47. :param email: Email of the user
  48. :return: one user
  49. """
  50. try:
  51. user = self._base_query().filter(User.email == email).one()
  52. except NoResultFound:
  53. raise UserNotExist()
  54. return user
  55. # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
  56. def get_one_by_id(self, id: int) -> User:
  57. return self.get_one(user_id=id)
  58. def get_current_user(self) -> User:
  59. """
  60. Get current_user
  61. """
  62. if not self._user:
  63. raise UserNotExist()
  64. return self._user
  65. def get_all(self) -> typing.Iterable[User]:
  66. return self._session.query(User).order_by(User.display_name).all()
  67. # Check methods
  68. def user_with_email_exists(self, email: str) -> bool:
  69. try:
  70. self.get_one_by_email(email)
  71. return True
  72. # TODO - G.M - 09-04-2018 - Better exception
  73. except:
  74. return False
  75. def authenticate_user(self, email: str, password: str) -> User:
  76. """
  77. Authenticate user with email and password, raise AuthenticationFailed
  78. if uncorrect.
  79. :param email: email of the user
  80. :param password: cleartext password of the user
  81. :return: User who was authenticated.
  82. """
  83. try:
  84. user = self.get_one_by_email(email)
  85. if user.validate_password(password):
  86. return user
  87. else:
  88. raise WrongUserPassword()
  89. except (WrongUserPassword, NoResultFound, UserNotExist):
  90. raise AuthenticationFailed()
  91. def can_see_private_info_of_user(self, user: User):
  92. """
  93. Return boolean wheter current api user has right
  94. to see private information of a user.
  95. :param user:
  96. :return:
  97. """
  98. return self._user and (
  99. self._user.user_id == user.user_id or
  100. self._user.profile.id >= Group.TIM_ADMIN)
  101. # Actions
  102. def update(
  103. self,
  104. user: User,
  105. name: str=None,
  106. email: str=None,
  107. do_save=True,
  108. timezone: str='',
  109. ) -> None:
  110. if name is not None:
  111. user.display_name = name
  112. if email is not None:
  113. user.email = email
  114. user.timezone = timezone
  115. if do_save:
  116. self.save(user)
  117. def create_user(self, email=None, groups=[], save_now=False) -> User:
  118. user = User()
  119. if email:
  120. user.email = email
  121. for group in groups:
  122. user.groups.append(group)
  123. self._session.add(user)
  124. if save_now:
  125. self._session.flush()
  126. return user
  127. def save(self, user: User):
  128. self._session.flush()
  129. def execute_created_user_actions(self, created_user: User) -> None:
  130. """
  131. Execute actions when user just been created
  132. :return:
  133. """
  134. # NOTE: Cyclic import
  135. # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
  136. #from tracim.lib.calendar import CalendarManager
  137. #from tracim.model.organisational import UserCalendar
  138. # TODO - G.M - 04-04-2018 - [auth]
  139. # Check if this is already needed with
  140. # new auth system
  141. created_user.ensure_auth_token(
  142. session=self._session,
  143. validity_seconds=self._config.USER_AUTH_TOKEN_VALIDITY
  144. )
  145. # Ensure database is up-to-date
  146. self._session.flush()
  147. transaction.commit()
  148. # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
  149. # calendar_manager = CalendarManager(created_user)
  150. # calendar_manager.create_then_remove_fake_event(
  151. # calendar_class=UserCalendar,
  152. # related_object_id=created_user.user_id,
  153. # )