workspace.py 9.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. # -*- coding: utf-8 -*-
  2. import typing
  3. from sqlalchemy.orm import Query
  4. from sqlalchemy.orm import Session
  5. from sqlalchemy.orm.exc import NoResultFound
  6. from tracim_backend.config import CFG
  7. from tracim_backend.exceptions import EmptyLabelNotAllowed
  8. from tracim_backend.exceptions import WorkspaceNotFound
  9. from tracim_backend.lib.utils.translation import fake_translator as _
  10. from tracim_backend.lib.core.userworkspace import RoleApi
  11. from tracim_backend.models.auth import Group
  12. from tracim_backend.models.auth import User
  13. from tracim_backend.models.context_models import WorkspaceInContext
  14. from tracim_backend.models.data import UserRoleInWorkspace
  15. from tracim_backend.models.data import Workspace
  16. __author__ = 'damien'
  17. class WorkspaceApi(object):
  18. def __init__(
  19. self,
  20. session: Session,
  21. current_user: User,
  22. config: CFG,
  23. force_role: bool=False
  24. ):
  25. """
  26. :param current_user: Current user of context
  27. :param force_role: If True, app role in queries even if admin
  28. """
  29. self._session = session
  30. self._user = current_user
  31. self._config = config
  32. self._force_role = force_role
  33. def _base_query_without_roles(self):
  34. return self._session.query(Workspace).filter(Workspace.is_deleted == False)
  35. def _base_query(self):
  36. if not self._force_role and self._user.profile.id>=Group.TIM_ADMIN:
  37. return self._base_query_without_roles()
  38. return self._session.query(Workspace).\
  39. join(Workspace.roles).\
  40. filter(UserRoleInWorkspace.user_id == self._user.user_id).\
  41. filter(Workspace.is_deleted == False)
  42. def get_workspace_with_context(
  43. self,
  44. workspace: Workspace
  45. ) -> WorkspaceInContext:
  46. """
  47. Return WorkspaceInContext object from Workspace
  48. """
  49. workspace = WorkspaceInContext(
  50. workspace=workspace,
  51. dbsession=self._session,
  52. config=self._config,
  53. )
  54. return workspace
  55. def create_workspace(
  56. self,
  57. label: str='',
  58. description: str='',
  59. calendar_enabled: bool=False,
  60. save_now: bool=False,
  61. ) -> Workspace:
  62. if not label:
  63. raise EmptyLabelNotAllowed('Workspace label cannot be empty')
  64. workspace = Workspace()
  65. workspace.label = label
  66. workspace.description = description
  67. workspace.calendar_enabled = calendar_enabled
  68. # By default, we force the current user to be the workspace manager
  69. # And to receive email notifications
  70. role_api = RoleApi(
  71. session=self._session,
  72. current_user=self._user,
  73. config=self._config
  74. )
  75. role = role_api.create_one(
  76. self._user,
  77. workspace,
  78. UserRoleInWorkspace.WORKSPACE_MANAGER,
  79. with_notif=True,
  80. )
  81. self._session.add(workspace)
  82. self._session.add(role)
  83. if save_now:
  84. self._session.flush()
  85. # TODO - G.M - 28-03-2018 - [Calendar] Reenable calendar stuff
  86. # if calendar_enabled:
  87. # self._ensure_calendar_exist(workspace)
  88. # else:
  89. # self._disable_calendar(workspace)
  90. return workspace
  91. def update_workspace(
  92. self,
  93. workspace: Workspace,
  94. label: str,
  95. description: str,
  96. save_now: bool=False,
  97. ) -> Workspace:
  98. """
  99. Update workspace
  100. :param workspace: workspace to update
  101. :param label: new label of workspace
  102. :param description: new description
  103. :param save_now: database flush
  104. :return: updated workspace
  105. """
  106. if not label:
  107. raise EmptyLabelNotAllowed('Workspace label cannot be empty')
  108. workspace.label = label
  109. workspace.description = description
  110. if save_now:
  111. self.save(workspace)
  112. return workspace
  113. def get_one(self, id):
  114. try:
  115. return self._base_query().filter(Workspace.workspace_id == id).one()
  116. except NoResultFound as exc:
  117. raise WorkspaceNotFound('workspace {} does not exist or not visible for user'.format(id)) from exc # nopep8
  118. def get_one_by_label(self, label: str) -> Workspace:
  119. try:
  120. return self._base_query().filter(Workspace.label == label).one()
  121. except NoResultFound as exc:
  122. raise WorkspaceNotFound('workspace {} does not exist or not visible for user'.format(id)) from exc # nopep8
  123. """
  124. def get_one_for_current_user(self, id):
  125. return self._base_query().filter(Workspace.workspace_id==id).\
  126. session.query(ZKContact).filter(ZKContact.groups.any(ZKGroup.id.in_([1,2,3])))
  127. filter(sqla.).one()
  128. """
  129. def get_all(self):
  130. return self._base_query().all()
  131. def get_all_for_user(self, user: User, ignored_ids=None):
  132. workspaces = []
  133. for role in user.roles:
  134. if not role.workspace.is_deleted:
  135. if not ignored_ids:
  136. workspaces.append(role.workspace)
  137. elif role.workspace.workspace_id not in ignored_ids:
  138. workspaces.append(role.workspace)
  139. else:
  140. pass # do not return workspace
  141. workspaces.sort(key=lambda workspace: workspace.label.lower())
  142. return workspaces
  143. def get_all_manageable(self) -> typing.List[Workspace]:
  144. """Get all workspaces the current user has manager rights on."""
  145. workspaces = [] # type: typing.List[Workspace]
  146. if self._user.profile.id == Group.TIM_ADMIN:
  147. workspaces = self._base_query().order_by(Workspace.label).all()
  148. elif self._user.profile.id == Group.TIM_MANAGER:
  149. workspaces = self._base_query() \
  150. .filter(
  151. UserRoleInWorkspace.role ==
  152. UserRoleInWorkspace.WORKSPACE_MANAGER
  153. ) \
  154. .order_by(Workspace.label) \
  155. .all()
  156. return workspaces
  157. def disable_notifications(self, user: User, workspace: Workspace):
  158. for role in user.roles:
  159. if role.workspace==workspace:
  160. role.do_notify = False
  161. def enable_notifications(self, user: User, workspace: Workspace):
  162. for role in user.roles:
  163. if role.workspace==workspace:
  164. role.do_notify = True
  165. def get_notifiable_roles(self, workspace: Workspace) -> [UserRoleInWorkspace]:
  166. roles = []
  167. for role in workspace.roles:
  168. if role.do_notify==True \
  169. and role.user!=self._user \
  170. and role.user.is_active:
  171. roles.append(role)
  172. return roles
  173. def save(self, workspace: Workspace):
  174. self._session.flush()
  175. def delete_one(self, workspace_id, flush=True):
  176. workspace = self.get_one(workspace_id)
  177. workspace.is_deleted = True
  178. if flush:
  179. self._session.flush()
  180. def restore_one(self, workspace_id, flush=True):
  181. workspace = self._session.query(Workspace)\
  182. .filter(Workspace.is_deleted==True)\
  183. .filter(Workspace.workspace_id==workspace_id).one()
  184. workspace.is_deleted = False
  185. if flush:
  186. self._session.flush()
  187. return workspace
  188. def execute_created_workspace_actions(self, workspace: Workspace) -> None:
  189. pass
  190. # TODO - G.M - 28-03-2018 - [Calendar] Re-enable this calendar stuff
  191. # self.ensure_calendar_exist(workspace)
  192. # TODO - G.M - 28-03-2018 - [Calendar] Re-enable this calendar stuff
  193. # def ensure_calendar_exist(self, workspace: Workspace) -> None:
  194. # # Note: Cyclic imports
  195. # from tracim.lib.calendar import CalendarManager
  196. # from tracim.model.organisational import WorkspaceCalendar
  197. #
  198. # calendar_manager = CalendarManager(self._user)
  199. #
  200. # try:
  201. # calendar_manager.enable_calendar_file(
  202. # calendar_class=WorkspaceCalendar,
  203. # related_object_id=workspace.workspace_id,
  204. # raise_=True,
  205. # )
  206. # # If previous calendar file no exist, calendar must be created
  207. # except FileNotFoundError:
  208. # self._user.ensure_auth_token()
  209. #
  210. # # Ensure database is up-to-date
  211. # self.session.flush()
  212. # transaction.commit()
  213. #
  214. # calendar_manager.create_then_remove_fake_event(
  215. # calendar_class=WorkspaceCalendar,
  216. # related_object_id=workspace.workspace_id,
  217. # )
  218. #
  219. # def disable_calendar(self, workspace: Workspace) -> None:
  220. # # Note: Cyclic imports
  221. # from tracim.lib.calendar import CalendarManager
  222. # from tracim.model.organisational import WorkspaceCalendar
  223. #
  224. # calendar_manager = CalendarManager(self._user)
  225. # calendar_manager.disable_calendar_file(
  226. # calendar_class=WorkspaceCalendar,
  227. # related_object_id=workspace.workspace_id,
  228. # raise_=False,
  229. # )
  230. def get_base_query(self) -> Query:
  231. return self._base_query()
  232. def generate_label(self) -> str:
  233. """
  234. :return: Generated workspace label
  235. """
  236. query = self._base_query_without_roles() \
  237. .filter(Workspace.label.ilike('{0}%'.format(
  238. _('Workspace'),
  239. )))
  240. return _('Workspace {}').format(
  241. query.count() + 1,
  242. )
  243. class UnsafeWorkspaceApi(WorkspaceApi):
  244. def _base_query(self):
  245. return self.session.query(Workspace).filter(Workspace.is_deleted==False)