user.py 5.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. # -*- coding: utf-8 -*-
  2. import threading
  3. import transaction
  4. import typing as typing
  5. from tracim.models.auth import User
  6. # TODO - G.M -28-03-2018 - [CurrentUser] Check if "current user" stuff is always needed for tracimv2
  7. # CURRENT_USER_WEB = 'WEB'
  8. # CURRENT_USER_WSGIDAV = 'WSGIDAV'
  9. class UserApi(object):
  10. def __init__(self, current_user: User, session, config):
  11. self._session = session
  12. self._user = current_user
  13. self._config = config
  14. def get_all(self):
  15. return self._session.query(User).order_by(User.display_name).all()
  16. def _base_query(self):
  17. return self._session.query(User)
  18. def get_one(self, user_id: int):
  19. return self._base_query().filter(User.user_id==user_id).one()
  20. def get_one_by_email(self, email: str):
  21. return self._base_query().filter(User.email==email).one()
  22. def get_one_by_id(self, id: int) -> User:
  23. return self._base_query().filter(User.user_id==id).one()
  24. def update(
  25. self,
  26. user: User,
  27. name: str=None,
  28. email: str=None,
  29. do_save=True,
  30. timezone: str='',
  31. ):
  32. if name is not None:
  33. user.display_name = name
  34. if email is not None:
  35. user.email = email
  36. user.timezone = timezone
  37. if do_save:
  38. self.save(user)
  39. if email and self._user and user.user_id==self._user.user_id:
  40. pass
  41. # this is required for the _session to keep on being up-to-date
  42. # TODO - G.M - 28-03-2018 - [CurrentUser] Check for pyramid equivalent
  43. # tg.request.identity['repoze.who.userid'] = email
  44. # tg.auth_force_login(email)
  45. def user_with_email_exists(self, email: str):
  46. try:
  47. self.get_one_by_email(email)
  48. return True
  49. except:
  50. return False
  51. def create_user(self, email=None, groups=[], save_now=False) -> User:
  52. user = User()
  53. if email:
  54. user.email = email
  55. for group in groups:
  56. user.groups.append(group)
  57. self._session.add(user)
  58. if save_now:
  59. self._session.flush()
  60. return user
  61. def save(self, user: User):
  62. self._session.flush()
  63. def execute_created_user_actions(self, created_user: User) -> None:
  64. """
  65. Execute actions when user just been created
  66. :return:
  67. """
  68. # NOTE: Cyclic import
  69. # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
  70. #from tracim.lib.calendar import CalendarManager
  71. #from tracim.model.organisational import UserCalendar
  72. created_user.ensure_auth_token(dbsession=self._session, validity_seconds=self._config.USER_AUTH_TOKEN_VALIDITY)
  73. # Ensure database is up-to-date
  74. self._session.flush()
  75. transaction.commit()
  76. # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
  77. # calendar_manager = CalendarManager(created_user)
  78. # calendar_manager.create_then_remove_fake_event(
  79. # calendar_class=UserCalendar,
  80. # related_object_id=created_user.user_id,
  81. # )
  82. # TODO - G.M - 28-03-2018 - [CurrentUser] Check for pyramid equivalent
  83. # class CurrentUserGetterInterface(object):
  84. # def get_current_user(self) -> typing.Union[None, User]:
  85. # raise NotImplementedError()
  86. #
  87. #
  88. # class BaseCurrentUserGetter(CurrentUserGetterInterface):
  89. # def __init__(self) -> None:
  90. # self.api = UserApi(None)
  91. # class WebCurrentUserGetter(BaseCurrentUserGetter):
  92. # def get_current_user(self) -> typing.Union[None, User]:
  93. # # HACK - D.A. - 2015-09-02
  94. # # In tests, the tg.request.identity may not be set
  95. # # (this is a buggy case, but for now this is how the software is;)
  96. # if tg.request is not None:
  97. # if hasattr(tg.request, 'identity'):
  98. # if tg.request.identity is not None:
  99. # return self.api.get_one_by_email(
  100. # tg.request.identity['repoze.who.userid'],
  101. # )
  102. #
  103. # return None
  104. # TODO - G.M - 28-03-2018 - [Webdav] Reenable Webdav stuff
  105. # class WsgidavCurrentUserGetter(BaseCurrentUserGetter):
  106. # def get_current_user(self) -> typing.Union[None, User]:
  107. # if hasattr(cherrypy.request, 'current_user_email'):
  108. # return self.api.get_one_by_email(
  109. # cherrypy.request.current_user_email,
  110. # )
  111. #
  112. # return None
  113. # class CurrentUserGetterApi(object):
  114. # thread_local = threading.local()
  115. # matches = {
  116. # CURRENT_USER_WEB: WebCurrentUserGetter,
  117. # CURRENT_USER_WSGIDAV: WsgidavCurrentUserGetter,
  118. # }
  119. # default = CURRENT_USER_WEB
  120. #
  121. # @classmethod
  122. # def get_current_user(cls) -> User:
  123. # try:
  124. # return cls.thread_local.getter.get_current_user()
  125. # except AttributeError:
  126. # return cls.factory(cls.default).get_current_user()
  127. #
  128. # @classmethod
  129. # def set_thread_local_getter(cls, name) -> None:
  130. # if not hasattr(cls.thread_local, 'getter'):
  131. # cls.thread_local.getter = cls.factory(name)
  132. #
  133. # @classmethod
  134. # def factory(cls, name: str) -> CurrentUserGetterInterface:
  135. # return cls.matches[name]()