workspace.py 8.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. # -*- coding: utf-8 -*-
  2. import typing
  3. from sqlalchemy.orm import Query
  4. from sqlalchemy.orm import Session
  5. from tracim import CFG
  6. from tracim.lib.utils.translation import fake_translator as _
  7. from tracim.lib.core.userworkspace import RoleApi
  8. from tracim.models.auth import Group
  9. from tracim.models.auth import User
  10. from tracim.models.context_models import WorkspaceInContext
  11. from tracim.models.data import UserRoleInWorkspace
  12. from tracim.models.data import Workspace
  13. __author__ = 'damien'
  14. class WorkspaceApi(object):
  15. def __init__(
  16. self,
  17. session: Session,
  18. current_user: User,
  19. config: CFG,
  20. force_role: bool=False
  21. ):
  22. """
  23. :param current_user: Current user of context
  24. :param force_role: If True, app role in queries even if admin
  25. """
  26. self._session = session
  27. self._user = current_user
  28. self._config = config
  29. self._force_role = force_role
  30. def _base_query_without_roles(self):
  31. return self._session.query(Workspace).filter(Workspace.is_deleted == False)
  32. def _base_query(self):
  33. if not self._force_role and self._user.profile.id>=Group.TIM_ADMIN:
  34. return self._base_query_without_roles()
  35. return self._session.query(Workspace).\
  36. join(Workspace.roles).\
  37. filter(UserRoleInWorkspace.user_id == self._user.user_id).\
  38. filter(Workspace.is_deleted == False)
  39. def get_workspace_with_context(
  40. self,
  41. workspace: Workspace
  42. ) -> WorkspaceInContext:
  43. """
  44. Return WorkspaceInContext object from Workspace
  45. """
  46. workspace = WorkspaceInContext(
  47. workspace=workspace,
  48. dbsession=self._session,
  49. config=self._config,
  50. )
  51. return workspace
  52. def create_workspace(
  53. self,
  54. label: str='',
  55. description: str='',
  56. calendar_enabled: bool=False,
  57. save_now: bool=False,
  58. ) -> Workspace:
  59. if not label:
  60. label = self.generate_label()
  61. workspace = Workspace()
  62. workspace.label = label
  63. workspace.description = description
  64. workspace.calendar_enabled = calendar_enabled
  65. # By default, we force the current user to be the workspace manager
  66. # And to receive email notifications
  67. role_api = RoleApi(
  68. session=self._session,
  69. current_user=self._user,
  70. config=self._config
  71. )
  72. role = role_api.create_one(
  73. self._user,
  74. workspace,
  75. UserRoleInWorkspace.WORKSPACE_MANAGER,
  76. with_notif=True,
  77. )
  78. self._session.add(workspace)
  79. self._session.add(role)
  80. if save_now:
  81. self._session.flush()
  82. # TODO - G.M - 28-03-2018 - [Calendar] Reenable calendar stuff
  83. # if calendar_enabled:
  84. # self._ensure_calendar_exist(workspace)
  85. # else:
  86. # self._disable_calendar(workspace)
  87. return workspace
  88. def get_one(self, id):
  89. return self._base_query().filter(Workspace.workspace_id == id).one()
  90. def get_one_by_label(self, label: str) -> Workspace:
  91. return self._base_query().filter(Workspace.label == label).one()
  92. """
  93. def get_one_for_current_user(self, id):
  94. return self._base_query().filter(Workspace.workspace_id==id).\
  95. session.query(ZKContact).filter(ZKContact.groups.any(ZKGroup.id.in_([1,2,3])))
  96. filter(sqla.).one()
  97. """
  98. def get_all(self):
  99. return self._base_query().all()
  100. def get_all_for_user(self, user: User, ignored_ids=None):
  101. workspaces = []
  102. for role in user.roles:
  103. if not role.workspace.is_deleted:
  104. if not ignored_ids:
  105. workspaces.append(role.workspace)
  106. elif role.workspace.workspace_id not in ignored_ids:
  107. workspaces.append(role.workspace)
  108. else:
  109. pass # do not return workspace
  110. workspaces.sort(key=lambda workspace: workspace.label.lower())
  111. return workspaces
  112. def get_all_manageable(self) -> typing.List[Workspace]:
  113. """Get all workspaces the current user has manager rights on."""
  114. workspaces = [] # type: typing.List[Workspace]
  115. if self._user.profile.id == Group.TIM_ADMIN:
  116. workspaces = self._base_query().order_by(Workspace.label).all()
  117. elif self._user.profile.id == Group.TIM_MANAGER:
  118. workspaces = self._base_query() \
  119. .filter(
  120. UserRoleInWorkspace.role ==
  121. UserRoleInWorkspace.WORKSPACE_MANAGER
  122. ) \
  123. .order_by(Workspace.label) \
  124. .all()
  125. return workspaces
  126. def disable_notifications(self, user: User, workspace: Workspace):
  127. for role in user.roles:
  128. if role.workspace==workspace:
  129. role.do_notify = False
  130. def enable_notifications(self, user: User, workspace: Workspace):
  131. for role in user.roles:
  132. if role.workspace==workspace:
  133. role.do_notify = True
  134. def get_notifiable_roles(self, workspace: Workspace) -> [UserRoleInWorkspace]:
  135. roles = []
  136. for role in workspace.roles:
  137. if role.do_notify==True \
  138. and role.user!=self._user \
  139. and role.user.is_active:
  140. roles.append(role)
  141. return roles
  142. def save(self, workspace: Workspace):
  143. self._session.flush()
  144. def delete_one(self, workspace_id, flush=True):
  145. workspace = self.get_one(workspace_id)
  146. workspace.is_deleted = True
  147. if flush:
  148. self._session.flush()
  149. def restore_one(self, workspace_id, flush=True):
  150. workspace = self._session.query(Workspace)\
  151. .filter(Workspace.is_deleted==True)\
  152. .filter(Workspace.workspace_id==workspace_id).one()
  153. workspace.is_deleted = False
  154. if flush:
  155. self._session.flush()
  156. return workspace
  157. def execute_created_workspace_actions(self, workspace: Workspace) -> None:
  158. pass
  159. # TODO - G.M - 28-03-2018 - [Calendar] Re-enable this calendar stuff
  160. # self.ensure_calendar_exist(workspace)
  161. # TODO - G.M - 28-03-2018 - [Calendar] Re-enable this calendar stuff
  162. # def ensure_calendar_exist(self, workspace: Workspace) -> None:
  163. # # Note: Cyclic imports
  164. # from tracim.lib.calendar import CalendarManager
  165. # from tracim.model.organisational import WorkspaceCalendar
  166. #
  167. # calendar_manager = CalendarManager(self._user)
  168. #
  169. # try:
  170. # calendar_manager.enable_calendar_file(
  171. # calendar_class=WorkspaceCalendar,
  172. # related_object_id=workspace.workspace_id,
  173. # raise_=True,
  174. # )
  175. # # If previous calendar file no exist, calendar must be created
  176. # except FileNotFoundError:
  177. # self._user.ensure_auth_token()
  178. #
  179. # # Ensure database is up-to-date
  180. # self.session.flush()
  181. # transaction.commit()
  182. #
  183. # calendar_manager.create_then_remove_fake_event(
  184. # calendar_class=WorkspaceCalendar,
  185. # related_object_id=workspace.workspace_id,
  186. # )
  187. #
  188. # def disable_calendar(self, workspace: Workspace) -> None:
  189. # # Note: Cyclic imports
  190. # from tracim.lib.calendar import CalendarManager
  191. # from tracim.model.organisational import WorkspaceCalendar
  192. #
  193. # calendar_manager = CalendarManager(self._user)
  194. # calendar_manager.disable_calendar_file(
  195. # calendar_class=WorkspaceCalendar,
  196. # related_object_id=workspace.workspace_id,
  197. # raise_=False,
  198. # )
  199. def get_base_query(self) -> Query:
  200. return self._base_query()
  201. def generate_label(self) -> str:
  202. """
  203. :return: Generated workspace label
  204. """
  205. query = self._base_query_without_roles() \
  206. .filter(Workspace.label.ilike('{0}%'.format(
  207. _('Workspace'),
  208. )))
  209. return _('Workspace {}').format(
  210. query.count() + 1,
  211. )
  212. class UnsafeWorkspaceApi(WorkspaceApi):
  213. def _base_query(self):
  214. return self.session.query(Workspace).filter(Workspace.is_deleted==False)