user.py 8.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. # -*- coding: utf-8 -*-
  2. import threading
  3. from smtplib import SMTPException
  4. import transaction
  5. import typing as typing
  6. from tracim.exceptions import NotificationNotSend
  7. from tracim.exceptions import EmailValidationFailed
  8. from tracim.lib.mail_notifier.notifier import get_email_manager
  9. from sqlalchemy.orm import Session
  10. from tracim import CFG
  11. from tracim.models.auth import User
  12. from tracim.models.auth import Group
  13. from sqlalchemy.orm.exc import NoResultFound
  14. from tracim.exceptions import UserDoesNotExist
  15. from tracim.exceptions import WrongUserPassword
  16. from tracim.exceptions import AuthenticationFailed
  17. from tracim.models.context_models import UserInContext
  18. from tracim.models.context_models import TypeUser
  19. class UserApi(object):
  20. def __init__(
  21. self,
  22. current_user: typing.Optional[User],
  23. session: Session,
  24. config: CFG,
  25. ) -> None:
  26. self._session = session
  27. self._user = current_user
  28. self._config = config
  29. def _base_query(self):
  30. return self._session.query(User)
  31. def get_user_with_context(self, user: User) -> UserInContext:
  32. """
  33. Return UserInContext object from User
  34. """
  35. user = UserInContext(
  36. user=user,
  37. dbsession=self._session,
  38. config=self._config,
  39. )
  40. return user
  41. # Getters
  42. def get_one(self, user_id: int) -> User:
  43. """
  44. Get one user by user id
  45. """
  46. try:
  47. user = self._base_query().filter(User.user_id == user_id).one()
  48. except NoResultFound as exc:
  49. raise UserDoesNotExist('User "{}" not found in database'.format(user_id)) from exc # nopep8
  50. return user
  51. def get_one_by_email(self, email: str) -> User:
  52. """
  53. Get one user by email
  54. :param email: Email of the user
  55. :return: one user
  56. """
  57. try:
  58. user = self._base_query().filter(User.email == email).one()
  59. except NoResultFound as exc:
  60. raise UserDoesNotExist('User "{}" not found in database'.format(email)) from exc # nopep8
  61. return user
  62. def get_one_by_public_name(self, public_name: str) -> User:
  63. """
  64. Get one user by public_name
  65. """
  66. try:
  67. user = self._base_query().filter(User.display_name == public_name).one()
  68. except NoResultFound as exc:
  69. raise UserDoesNotExist('User "{}" not found in database'.format(public_name)) from exc # nopep8
  70. return user
  71. # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
  72. def get_one_by_id(self, id: int) -> User:
  73. return self.get_one(user_id=id)
  74. def get_current_user(self) -> User:
  75. """
  76. Get current_user
  77. """
  78. if not self._user:
  79. raise UserDoesNotExist('There is no current user')
  80. return self._user
  81. def get_all(self) -> typing.Iterable[User]:
  82. return self._session.query(User).order_by(User.display_name).all()
  83. def find(
  84. self,
  85. user_id: int=None,
  86. email: str=None,
  87. public_name: str=None
  88. ) -> typing.Tuple[TypeUser, User]:
  89. """
  90. Find existing user from all theses params.
  91. Check is made in this order: user_id, email, public_name
  92. If no user found raise UserDoesNotExist exception
  93. """
  94. user = None
  95. if user_id:
  96. try:
  97. user = self.get_one(user_id)
  98. return TypeUser.USER_ID, user
  99. except UserDoesNotExist:
  100. pass
  101. if email:
  102. try:
  103. user = self.get_one_by_email(email)
  104. return TypeUser.EMAIL, user
  105. except UserDoesNotExist:
  106. pass
  107. if public_name:
  108. try:
  109. user = self.get_one_by_public_name(public_name)
  110. return TypeUser.PUBLIC_NAME, user
  111. except UserDoesNotExist:
  112. pass
  113. raise UserDoesNotExist('User not found with any of given params.')
  114. # Check methods
  115. def user_with_email_exists(self, email: str) -> bool:
  116. try:
  117. self.get_one_by_email(email)
  118. return True
  119. # TODO - G.M - 09-04-2018 - Better exception
  120. except:
  121. return False
  122. def authenticate_user(self, email: str, password: str) -> User:
  123. """
  124. Authenticate user with email and password, raise AuthenticationFailed
  125. if uncorrect.
  126. :param email: email of the user
  127. :param password: cleartext password of the user
  128. :return: User who was authenticated.
  129. """
  130. try:
  131. user = self.get_one_by_email(email)
  132. if user.validate_password(password):
  133. return user
  134. else:
  135. raise WrongUserPassword('User "{}" password is incorrect'.format(email)) # nopep8
  136. except (WrongUserPassword, UserDoesNotExist) as exc:
  137. raise AuthenticationFailed('User "{}" authentication failed'.format(email)) from exc # nopep8
  138. # Actions
  139. def _check_email(self, email: str) -> bool:
  140. # TODO - G.M - 2018-07-05 - find a better way to check email
  141. if not email:
  142. return False
  143. email = email.split('@')
  144. if len(email) != 2:
  145. return False
  146. return True
  147. def update(
  148. self,
  149. user: User,
  150. name: str=None,
  151. email: str=None,
  152. password: str=None,
  153. timezone: str='',
  154. do_save=True,
  155. ) -> None:
  156. if name is not None:
  157. user.display_name = name
  158. if email is not None:
  159. email_exist = self._check_email(email)
  160. if not email_exist:
  161. raise EmailValidationFailed('Email given form {} is uncorrect'.format(email)) # nopep8
  162. user.email = email
  163. if password is not None:
  164. user.password = password
  165. user.timezone = timezone
  166. if do_save:
  167. self.save(user)
  168. def create_user(
  169. self,
  170. email,
  171. password: str = None,
  172. name: str = None,
  173. timezone: str = '',
  174. groups=[],
  175. do_save: bool=True,
  176. do_notify: bool=True,
  177. ) -> User:
  178. new_user = self.create_minimal_user(email, groups, save_now=False)
  179. self.update(
  180. user=new_user,
  181. name=name,
  182. email=email,
  183. password=password,
  184. timezone=timezone,
  185. do_save=False,
  186. )
  187. if do_notify:
  188. try:
  189. email_manager = get_email_manager(self._config, self._session)
  190. email_manager.notify_created_account(
  191. new_user,
  192. password=password
  193. )
  194. except SMTPException as e:
  195. raise NotificationNotSend()
  196. if do_save:
  197. self.save(new_user)
  198. return new_user
  199. def create_minimal_user(
  200. self,
  201. email,
  202. groups=[],
  203. save_now=False
  204. ) -> User:
  205. """Previous create_user method"""
  206. user = User()
  207. email_exist = self._check_email(email)
  208. if not email_exist:
  209. raise EmailValidationFailed('Email given form {} is uncorrect'.format(email)) # nopep8
  210. user.email = email
  211. user.display_name = email.split('@')[0]
  212. for group in groups:
  213. user.groups.append(group)
  214. self._session.add(user)
  215. if save_now:
  216. self._session.flush()
  217. return user
  218. def save(self, user: User):
  219. self._session.flush()
  220. def execute_created_user_actions(self, created_user: User) -> None:
  221. """
  222. Execute actions when user just been created
  223. :return:
  224. """
  225. # NOTE: Cyclic import
  226. # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
  227. #from tracim.lib.calendar import CalendarManager
  228. #from tracim.model.organisational import UserCalendar
  229. # TODO - G.M - 04-04-2018 - [auth]
  230. # Check if this is already needed with
  231. # new auth system
  232. created_user.ensure_auth_token(
  233. session=self._session,
  234. validity_seconds=self._config.USER_AUTH_TOKEN_VALIDITY
  235. )
  236. # Ensure database is up-to-date
  237. self._session.flush()
  238. transaction.commit()
  239. # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
  240. # calendar_manager = CalendarManager(created_user)
  241. # calendar_manager.create_then_remove_fake_event(
  242. # calendar_class=UserCalendar,
  243. # related_object_id=created_user.user_id,
  244. # )