user.py 14KB


  1. # -*- coding: utf-8 -*-
  2. from smtplib import SMTPException
  3. import transaction
  4. import typing as typing
  5. from sqlalchemy import or_
  6. from sqlalchemy.orm import Session
  7. from sqlalchemy.orm import Query
  8. from sqlalchemy.orm.exc import NoResultFound
  9. from tracim_backend.config import CFG
  10. from tracim_backend.models.auth import User
  11. from tracim_backend.models.auth import Group
  12. from tracim_backend.exceptions import NoUserSetted
  13. from tracim_backend.exceptions import TooShortAutocompleteString
  14. from tracim_backend.exceptions import PasswordDoNotMatch
  15. from tracim_backend.exceptions import EmailValidationFailed
  16. from tracim_backend.exceptions import UserDoesNotExist
  17. from tracim_backend.exceptions import WrongUserPassword
  18. from tracim_backend.exceptions import AuthenticationFailed
  19. from tracim_backend.exceptions import NotificationNotSend
  20. from tracim_backend.exceptions import UserNotActive
  21. from tracim_backend.models.context_models import UserInContext
  22. from tracim_backend.lib.mail_notifier.notifier import get_email_manager
  23. from tracim_backend.models.context_models import TypeUser
  24. from tracim_backend.models.data import UserRoleInWorkspace
  25. class UserApi(object):
  26. def __init__(
  27. self,
  28. current_user: typing.Optional[User],
  29. session: Session,
  30. config: CFG,
  31. ) -> None:
  32. self._session = session
  33. self._user = current_user
  34. self._config = config
  35. def _base_query(self):
  36. return self._session.query(User)
  37. def get_user_with_context(self, user: User) -> UserInContext:
  38. """
  39. Return UserInContext object from User
  40. """
  41. user = UserInContext(
  42. user=user,
  43. dbsession=self._session,
  44. config=self._config,
  45. )
  46. return user
  47. # Getters
  48. def get_one(self, user_id: int) -> User:
  49. """
  50. Get one user by user id
  51. """
  52. try:
  53. user = self._base_query().filter(User.user_id == user_id).one()
  54. except NoResultFound as exc:
  55. raise UserDoesNotExist('User "{}" not found in database'.format(user_id)) from exc # nopep8
  56. return user
  57. def get_one_by_email(self, email: str) -> User:
  58. """
  59. Get one user by email
  60. :param email: Email of the user
  61. :return: one user
  62. """
  63. try:
  64. user = self._base_query().filter(User.email == email).one()
  65. except NoResultFound as exc:
  66. raise UserDoesNotExist('User "{}" not found in database'.format(email)) from exc # nopep8
  67. return user
  68. def get_one_by_public_name(self, public_name: str) -> User:
  69. """
  70. Get one user by public_name
  71. """
  72. try:
  73. user = self._base_query().filter(User.display_name == public_name).one()
  74. except NoResultFound as exc:
  75. raise UserDoesNotExist('User "{}" not found in database'.format(public_name)) from exc # nopep8
  76. return user
  77. # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
  78. def get_one_by_id(self, id: int) -> User:
  79. return self.get_one(user_id=id)
  80. def get_current_user(self) -> User:
  81. """
  82. Get current_user
  83. """
  84. if not self._user:
  85. raise UserDoesNotExist('There is no current user')
  86. return self._user
  87. def _get_all_query(self) -> Query:
  88. return self._session.query(User).order_by(User.display_name)
  89. def get_all(self) -> typing.Iterable[User]:
  90. return self._get_all_query().all()
  91. def get_known_user(
  92. self,
  93. acp: str,
  94. ) -> typing.Iterable[User]:
  95. """
  96. Return list of know user by current UserApi user.
  97. :param acp: autocomplete filter by name/email
  98. :return: List of found users
  99. """
  100. if len(acp) < 2:
  101. raise TooShortAutocompleteString(
  102. '"{acp}" is a too short string, acp string need to have more than one character'.format(acp=acp) # nopep8
  103. )
  104. query = self._get_all_query()
  105. query = query.filter(or_(User.display_name.ilike('%{}%'.format(acp)), User.email.ilike('%{}%'.format(acp)))) # nopep8
  106. # INFO - G.M - 2018-07-27 - if user is set and is simple user, we
  107. # should show only user in same workspace as user
  108. if self._user and self._user.profile.id <= Group.TIM_USER:
  109. user_workspaces_id_query = self._session.\
  110. query(UserRoleInWorkspace.workspace_id).\
  111. distinct(UserRoleInWorkspace.workspace_id).\
  112. filter(UserRoleInWorkspace.user_id == self._user.user_id)
  113. users_in_workspaces = self._session.\
  114. query(UserRoleInWorkspace.user_id).\
  115. distinct(UserRoleInWorkspace.user_id).\
  116. filter(UserRoleInWorkspace.workspace_id.in_(user_workspaces_id_query.subquery())).subquery() # nopep8
  117. query = query.filter(User.user_id.in_(users_in_workspaces))
  118. return query.all()
  119. def find(
  120. self,
  121. user_id: int=None,
  122. email: str=None,
  123. public_name: str=None
  124. ) -> typing.Tuple[TypeUser, User]:
  125. """
  126. Find existing user from all theses params.
  127. Check is made in this order: user_id, email, public_name
  128. If no user found raise UserDoesNotExist exception
  129. """
  130. user = None
  131. if user_id:
  132. try:
  133. user = self.get_one(user_id)
  134. return TypeUser.USER_ID, user
  135. except UserDoesNotExist:
  136. pass
  137. if email:
  138. try:
  139. user = self.get_one_by_email(email)
  140. return TypeUser.EMAIL, user
  141. except UserDoesNotExist:
  142. pass
  143. if public_name:
  144. try:
  145. user = self.get_one_by_public_name(public_name)
  146. return TypeUser.PUBLIC_NAME, user
  147. except UserDoesNotExist:
  148. pass
  149. raise UserDoesNotExist('User not found with any of given params.')
  150. # Check methods
  151. def user_with_email_exists(self, email: str) -> bool:
  152. try:
  153. self.get_one_by_email(email)
  154. return True
  155. # TODO - G.M - 09-04-2018 - Better exception
  156. except:
  157. return False
  158. def authenticate_user(self, email: str, password: str) -> User:
  159. """
  160. Authenticate user with email and password, raise AuthenticationFailed
  161. if uncorrect.
  162. :param email: email of the user
  163. :param password: cleartext password of the user
  164. :return: User who was authenticated.
  165. """
  166. try:
  167. user = self.get_one_by_email(email)
  168. if not user.is_active:
  169. raise UserNotActive('User "{}" is not active'.format(email))
  170. if user.validate_password(password):
  171. return user
  172. else:
  173. raise WrongUserPassword('User "{}" password is incorrect'.format(email)) # nopep8
  174. except (WrongUserPassword, UserDoesNotExist) as exc:
  175. raise AuthenticationFailed('User "{}" authentication failed'.format(email)) from exc # nopep8
  176. # Actions
  177. def set_password(
  178. self,
  179. user: User,
  180. loggedin_user_password: str,
  181. new_password: str,
  182. new_password2: str,
  183. do_save: bool=True
  184. ):
  185. """
  186. Set User password if loggedin user password is correct
  187. and both new_password are the same.
  188. :param user: User who need password changed
  189. :param loggedin_user_password: cleartext password of logged user (not
  190. same as user)
  191. :param new_password: new password for user
  192. :param new_password2: should be same as new_password
  193. :param do_save: should we save new user password ?
  194. :return:
  195. """
  196. if not self._user:
  197. raise NoUserSetted('Current User should be set in UserApi to use this method') # nopep8
  198. if not self._user.validate_password(loggedin_user_password): # nopep8
  199. raise WrongUserPassword(
  200. 'Wrong password for authenticated user {}'. format(self._user.user_id) # nopep8
  201. )
  202. if new_password != new_password2:
  203. raise PasswordDoNotMatch('Passwords given are different')
  204. self.update(
  205. user=user,
  206. password=new_password,
  207. do_save=do_save,
  208. )
  209. if do_save:
  210. # TODO - G.M - 2018-07-24 - Check why commit is needed here
  211. self.save(user)
  212. return user
  213. def set_email(
  214. self,
  215. user: User,
  216. loggedin_user_password: str,
  217. email: str,
  218. do_save: bool = True
  219. ):
  220. """
  221. Set email address of user if loggedin user password is correct
  222. :param user: User who need email changed
  223. :param loggedin_user_password: cleartext password of logged user (not
  224. same as user)
  225. :param email:
  226. :param do_save:
  227. :return:
  228. """
  229. if not self._user:
  230. raise NoUserSetted('Current User should be set in UserApi to use this method') # nopep8
  231. if not self._user.validate_password(loggedin_user_password): # nopep8
  232. raise WrongUserPassword(
  233. 'Wrong password for authenticated user {}'. format(self._user.user_id) # nopep8
  234. )
  235. self.update(
  236. user=user,
  237. email=email,
  238. do_save=do_save,
  239. )
  240. return user
  241. def _check_email(self, email: str) -> bool:
  242. # TODO - G.M - 2018-07-05 - find a better way to check email
  243. if not email:
  244. return False
  245. email = email.split('@')
  246. if len(email) != 2:
  247. return False
  248. return True
  249. def update(
  250. self,
  251. user: User,
  252. name: str=None,
  253. email: str=None,
  254. password: str=None,
  255. timezone: str=None,
  256. groups: typing.Optional[typing.List[Group]]=None,
  257. do_save=True,
  258. ) -> User:
  259. if name is not None:
  260. user.display_name = name
  261. if email is not None:
  262. email_exist = self._check_email(email)
  263. if not email_exist:
  264. raise EmailValidationFailed('Email given form {} is uncorrect'.format(email)) # nopep8
  265. user.email = email
  266. if password is not None:
  267. user.password = password
  268. if timezone is not None:
  269. user.timezone = timezone
  270. if groups is not None:
  271. # INFO - G.M - 2018-07-18 - Delete old groups
  272. for group in user.groups:
  273. if group not in groups:
  274. user.groups.remove(group)
  275. # INFO - G.M - 2018-07-18 - add new groups
  276. for group in groups:
  277. if group not in user.groups:
  278. user.groups.append(group)
  279. if do_save:
  280. self.save(user)
  281. return user
  282. def create_user(
  283. self,
  284. email,
  285. password: str = None,
  286. name: str = None,
  287. timezone: str = '',
  288. groups=[],
  289. do_save: bool=True,
  290. do_notify: bool=True,
  291. ) -> User:
  292. new_user = self.create_minimal_user(email, groups, save_now=False)
  293. self.update(
  294. user=new_user,
  295. name=name,
  296. email=email,
  297. password=password,
  298. timezone=timezone,
  299. do_save=False,
  300. )
  301. if do_notify:
  302. try:
  303. email_manager = get_email_manager(self._config, self._session)
  304. email_manager.notify_created_account(
  305. new_user,
  306. password=password
  307. )
  308. except SMTPException as e:
  309. raise NotificationNotSend()
  310. if do_save:
  311. self.save(new_user)
  312. return new_user
  313. def create_minimal_user(
  314. self,
  315. email,
  316. groups=[],
  317. save_now=False
  318. ) -> User:
  319. """Previous create_user method"""
  320. user = User()
  321. email_exist = self._check_email(email)
  322. if not email_exist:
  323. raise EmailValidationFailed('Email given form {} is uncorrect'.format(email)) # nopep8
  324. user.email = email
  325. user.display_name = email.split('@')[0]
  326. for group in groups:
  327. user.groups.append(group)
  328. self._session.add(user)
  329. if save_now:
  330. self._session.flush()
  331. return user
  332. def enable(self, user: User, do_save=False):
  333. user.is_active = True
  334. if do_save:
  335. self.save(user)
  336. def disable(self, user:User, do_save=False):
  337. user.is_active = False
  338. if do_save:
  339. self.save(user)
  340. def save(self, user: User):
  341. self._session.flush()
  342. def execute_created_user_actions(self, created_user: User) -> None:
  343. """
  344. Execute actions when user just been created
  345. :return:
  346. """
  347. # NOTE: Cyclic import
  348. # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
  349. #from tracim.lib.calendar import CalendarManager
  350. #from tracim.model.organisational import UserCalendar
  351. # TODO - G.M - 04-04-2018 - [auth]
  352. # Check if this is already needed with
  353. # new auth system
  354. created_user.ensure_auth_token(
  355. session=self._session,
  356. validity_seconds=self._config.USER_AUTH_TOKEN_VALIDITY
  357. )
  358. # Ensure database is up-to-date
  359. self._session.flush()
  360. transaction.commit()
  361. # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
  362. # calendar_manager = CalendarManager(created_user)
  363. # calendar_manager.create_then_remove_fake_event(
  364. # calendar_class=UserCalendar,
  365. # related_object_id=created_user.user_id,
  366. # )