user.py 7.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. # -*- coding: utf-8 -*-
  2. from smtplib import SMTPException
  3. import transaction
  4. import typing as typing
  5. from sqlalchemy.orm import Session
  6. from sqlalchemy.orm.exc import NoResultFound
  7. from tracim import CFG
  8. from tracim.models.auth import User
  9. from tracim.models.auth import Group
  10. from tracim.exceptions import WrongUserPassword
  11. from tracim.exceptions import UserDoesNotExist
  12. from tracim.exceptions import AuthenticationFailed
  13. from tracim.exceptions import NotificationNotSend
  14. from tracim.exceptions import UserNotActive
  15. from tracim.models.context_models import UserInContext
  16. from tracim.lib.mail_notifier.notifier import get_email_manager
  17. class UserApi(object):
  18. def __init__(
  19. self,
  20. current_user: typing.Optional[User],
  21. session: Session,
  22. config: CFG,
  23. ) -> None:
  24. self._session = session
  25. self._user = current_user
  26. self._config = config
  27. def _base_query(self):
  28. return self._session.query(User)
  29. def get_user_with_context(self, user: User) -> UserInContext:
  30. """
  31. Return UserInContext object from User
  32. """
  33. user = UserInContext(
  34. user=user,
  35. dbsession=self._session,
  36. config=self._config,
  37. )
  38. return user
  39. # Getters
  40. def get_one(self, user_id: int) -> User:
  41. """
  42. Get one user by user id
  43. """
  44. try:
  45. user = self._base_query().filter(User.user_id == user_id).one()
  46. except NoResultFound as exc:
  47. raise UserDoesNotExist('User "{}" not found in database'.format(user_id)) from exc # nopep8
  48. return user
  49. def get_one_by_email(self, email: str) -> User:
  50. """
  51. Get one user by email
  52. :param email: Email of the user
  53. :return: one user
  54. """
  55. try:
  56. user = self._base_query().filter(User.email == email).one()
  57. except NoResultFound as exc:
  58. raise UserDoesNotExist('User "{}" not found in database'.format(email)) from exc # nopep8
  59. return user
  60. # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
  61. def get_one_by_id(self, id: int) -> User:
  62. return self.get_one(user_id=id)
  63. def get_current_user(self) -> User:
  64. """
  65. Get current_user
  66. """
  67. if not self._user:
  68. raise UserDoesNotExist('There is no current user')
  69. return self._user
  70. def get_all(self) -> typing.Iterable[User]:
  71. return self._session.query(User).order_by(User.display_name).all()
  72. # Check methods
  73. def user_with_email_exists(self, email: str) -> bool:
  74. try:
  75. self.get_one_by_email(email)
  76. return True
  77. # TODO - G.M - 09-04-2018 - Better exception
  78. except:
  79. return False
  80. def authenticate_user(self, email: str, password: str) -> User:
  81. """
  82. Authenticate user with email and password, raise AuthenticationFailed
  83. if uncorrect.
  84. :param email: email of the user
  85. :param password: cleartext password of the user
  86. :return: User who was authenticated.
  87. """
  88. try:
  89. user = self.get_one_by_email(email)
  90. if not user.is_active:
  91. raise UserNotActive('User "{}" is not active'.format(email))
  92. if user.validate_password(password):
  93. return user
  94. else:
  95. raise WrongUserPassword('User "{}" password is incorrect'.format(email)) # nopep8
  96. except (WrongUserPassword, UserDoesNotExist) as exc:
  97. raise AuthenticationFailed('User "{}" authentication failed'.format(email)) from exc # nopep8
  98. # Actions
  99. def update(
  100. self,
  101. user: User,
  102. name: str=None,
  103. email: str=None,
  104. password: str=None,
  105. timezone: str=None,
  106. groups: typing.Optional[typing.List[Group]]=None,
  107. do_save=True,
  108. ) -> User:
  109. if name is not None:
  110. user.display_name = name
  111. if email is not None:
  112. user.email = email
  113. if password is not None:
  114. user.password = password
  115. if timezone is not None:
  116. user.timezone = timezone
  117. if groups is not None:
  118. # INFO - G.M - 2018-07-18 - Delete old groups
  119. for group in user.groups:
  120. if group not in groups:
  121. user.groups.remove(group)
  122. # INFO - G.M - 2018-07-18 - add new groups
  123. for group in groups:
  124. if group not in user.groups:
  125. user.groups.append(group)
  126. if do_save:
  127. self.save(user)
  128. return user
  129. def create_user(
  130. self,
  131. email,
  132. password: str = None,
  133. name: str = None,
  134. timezone: str = '',
  135. groups=[],
  136. do_save: bool=True,
  137. do_notify: bool=True,
  138. ) -> User:
  139. new_user = self.create_minimal_user(email, groups, save_now=False)
  140. self.update(
  141. user=new_user,
  142. name=name,
  143. email=email,
  144. password=password,
  145. timezone=timezone,
  146. do_save=False,
  147. )
  148. if do_notify:
  149. try:
  150. email_manager = get_email_manager(self._config, self._session)
  151. email_manager.notify_created_account(
  152. new_user,
  153. password=password
  154. )
  155. except SMTPException as e:
  156. raise NotificationNotSend()
  157. if do_save:
  158. self.save(new_user)
  159. return new_user
  160. def create_minimal_user(
  161. self,
  162. email,
  163. groups=[],
  164. save_now=False
  165. ) -> User:
  166. """Previous create_user method"""
  167. user = User()
  168. user.email = email
  169. for group in groups:
  170. user.groups.append(group)
  171. self._session.add(user)
  172. if save_now:
  173. self._session.flush()
  174. return user
  175. def enable(self, user: User, do_save=False):
  176. user.is_active = True
  177. if do_save:
  178. self.save(user)
  179. def disable(self, user:User, do_save=False):
  180. user.is_active = False
  181. if do_save:
  182. self.save(user)
  183. def save(self, user: User):
  184. self._session.flush()
  185. def execute_created_user_actions(self, created_user: User) -> None:
  186. """
  187. Execute actions when user just been created
  188. :return:
  189. """
  190. # NOTE: Cyclic import
  191. # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
  192. #from tracim.lib.calendar import CalendarManager
  193. #from tracim.model.organisational import UserCalendar
  194. # TODO - G.M - 04-04-2018 - [auth]
  195. # Check if this is already needed with
  196. # new auth system
  197. created_user.ensure_auth_token(
  198. session=self._session,
  199. validity_seconds=self._config.USER_AUTH_TOKEN_VALIDITY
  200. )
  201. # Ensure database is up-to-date
  202. self._session.flush()
  203. transaction.commit()
  204. # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
  205. # calendar_manager = CalendarManager(created_user)
  206. # calendar_manager.create_then_remove_fake_event(
  207. # calendar_class=UserCalendar,
  208. # related_object_id=created_user.user_id,
  209. # )