user.py 6.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. # -*- coding: utf-8 -*-
  2. import threading
  3. from smtplib import SMTPException
  4. import transaction
  5. import typing as typing
  6. from tracim.exceptions import NotificationNotSend
  7. from tracim.lib.mail_notifier.notifier import get_email_manager
  8. from sqlalchemy.orm import Session
  9. from tracim import CFG
  10. from tracim.models.auth import User, Group
  11. from sqlalchemy.orm.exc import NoResultFound
  12. from tracim.exceptions import WrongUserPassword, UserNotExist
  13. from tracim.exceptions import AuthenticationFailed
  14. from tracim.models.context_models import UserInContext
  class UserApi(object):
      """
      Read, create and update ``User`` rows through a SQLAlchemy session.
      """

      def __init__(
              self,
              current_user: typing.Optional[User],
              session: Session,
              config: CFG,
      ) -> None:
          """
          :param current_user: user returned by get_current_user(), may be
          None
          :param session: database session used for every query and flush
          :param config: configuration handed to UserInContext and to the
          email manager, and read for USER_AUTH_TOKEN_VALIDITY
          """
          self._session = session
          self._user = current_user
          self._config = config

      def _base_query(self):
          """
          Return the query on User every getter starts from.
          """
          return self._session.query(User)

      def get_user_with_context(self, user: User) -> UserInContext:
          """
          Return UserInContext object from User
          :param user: user to wrap
          :return: UserInContext built with this api's session and config
          """
          return UserInContext(
              user=user,
              dbsession=self._session,
              config=self._config,
          )

      # Getters

      def get_one(self, user_id: int) -> User:
          """
          Get one user by user id
          :param user_id: id of the user
          :return: one user
          :raise UserNotExist: if no user has this id
          """
          try:
              return self._base_query().filter(User.user_id == user_id).one()
          except NoResultFound as exc:
              raise UserNotExist() from exc

      def get_one_by_email(self, email: str) -> User:
          """
          Get one user by email
          :param email: Email of the user
          :return: one user
          :raise UserNotExist: if no user has this email
          """
          try:
              return self._base_query().filter(User.email == email).one()
          except NoResultFound as exc:
              raise UserNotExist() from exc

      # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
      def get_one_by_id(self, id: int) -> User:
          """
          Alias of get_one().
          :param id: id of the user
          :return: one user
          :raise UserNotExist: if no user has this id
          """
          return self.get_one(user_id=id)

      def get_current_user(self) -> User:
          """
          Get current_user
          :return: the user given at construction
          :raise UserNotExist: if this api was built without a current user
          """
          if not self._user:
              raise UserNotExist()
          return self._user

      def get_all(self) -> typing.Iterable[User]:
          """
          Get all users, ordered by display name.
          """
          return self._base_query().order_by(User.display_name).all()

      # Check methods

      def user_with_email_exists(self, email: str) -> bool:
          """
          Tell whether a user with this email exists.
          :param email: email to look for
          :return: True if get_one_by_email() finds a user, False if it
          raises UserNotExist
          """
          try:
              self.get_one_by_email(email)
          except UserNotExist:
              # Only "not found" means the email is unused; any other error
              # (database failure, interruption, ...) propagates instead of
              # being reported as "no such user".
              return False
          return True

      def authenticate_user(self, email: str, password: str) -> User:
          """
          Authenticate user with email and password, raise
          AuthenticationFailed if incorrect.
          :param email: email of the user
          :param password: cleartext password of the user
          :return: User who was authenticated.
          :raise AuthenticationFailed: if the email is unknown or the
          password does not validate
          """
          try:
              user = self.get_one_by_email(email)
              if not user.validate_password(password):
                  raise WrongUserPassword()
          except (WrongUserPassword, NoResultFound, UserNotExist) as exc:
              # Unknown email and wrong password are reported the same way.
              raise AuthenticationFailed() from exc
          return user

      # Actions

      def update(
              self,
              user: User,
              name: str=None,
              email: str=None,
              password: str=None,
              timezone: str='',
              do_save=True,
      ) -> None:
          """
          Update fields of a user.
          :param user: user to modify
          :param name: new display name, left unchanged if None
          :param email: new email, left unchanged if None
          :param password: new password, left unchanged if None
          :param timezone: new timezone, always assigned (see note below)
          :param do_save: if True, flush the session after the update
          """
          if name is not None:
              user.display_name = name
          if email is not None:
              user.email = email
          if password is not None:
              user.password = password
          # NOTE(review): unlike the other fields, timezone is overwritten
          # even when the caller omits it (default ''). Kept as is so the
          # public default does not change; confirm this is intended.
          user.timezone = timezone
          if do_save:
              self.save(user)

      def create_user(
              self,
              email: str,
              password: str = None,
              name: str = None,
              timezone: str = '',
              groups: typing.Optional[typing.Iterable] = None,
              do_save: bool=True,
              do_notify: bool=True,
      ) -> User:
          """
          Create a user, fill its fields and optionally notify it by email.
          :param email: email of the new user
          :param password: password of the new user, also given to the
          account creation notification
          :param name: display name of the new user
          :param timezone: timezone of the new user
          :param groups: groups appended to the new user, none if omitted
          :param do_save: if True, flush the session before returning
          :param do_notify: if True, send the account creation notification
          :return: the new user
          :raise NotificationNotSend: if sending the notification raises
          SMTPException; this happens before the do_save flush
          """
          new_user = self.create_minimal_user(email, groups, save_now=False)
          self.update(
              user=new_user,
              name=name,
              email=email,
              password=password,
              timezone=timezone,
              do_save=False,
          )
          if do_notify:
              try:
                  email_manager = get_email_manager(
                      self._config,
                      self._session,
                  )
                  email_manager.notify_created_account(
                      new_user,
                      password=password
                  )
              except SMTPException as exc:
                  raise NotificationNotSend() from exc
          if do_save:
              self.save(new_user)
          return new_user

      def create_minimal_user(
              self,
              email,
              groups: typing.Optional[typing.Iterable] = None,
              save_now=False
      ) -> User:
          """
          Previous create_user method
          :param email: email of the new user
          :param groups: groups appended to the new user, none if omitted
          :param save_now: if True, flush the session once the user is added
          :return: the new user, already added to the session
          """
          user = User()
          user.email = email
          # None stands for "no group": avoids a mutable default argument.
          for group in groups or ():
              user.groups.append(group)
          self._session.add(user)
          if save_now:
              self._session.flush()
          return user

      def save(self, user: User) -> None:
          """
          Flush the session. The user argument itself is not used.
          """
          self._session.flush()

      def execute_created_user_actions(self, created_user: User) -> None:
          """
          Execute actions when user just been created
          :param created_user: the user that was just created
          :return:
          """
          # NOTE: Cyclic import
          # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
          # from tracim.lib.calendar import CalendarManager
          # from tracim.model.organisational import UserCalendar

          # TODO - G.M - 04-04-2018 - [auth]
          # Check if this is already needed with
          # new auth system
          created_user.ensure_auth_token(
              session=self._session,
              validity_seconds=self._config.USER_AUTH_TOKEN_VALIDITY
          )

          # Ensure database is up-to-date
          self._session.flush()
          transaction.commit()

          # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
          # calendar_manager = CalendarManager(created_user)
          # calendar_manager.create_then_remove_fake_event(
          #     calendar_class=UserCalendar,
          #     related_object_id=created_user.user_id,
          # )