sql_dav_provider.py 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. # coding: utf8
  2. import re
  3. from os.path import basename, dirname
  4. from sqlalchemy.orm.exc import NoResultFound
  5. from tracim.lib.webdav.utils import transform_to_bdd
  6. from wsgidav.dav_provider import DAVProvider
  7. from wsgidav.lock_manager import LockManager
  8. from tracim.lib.webdav import HistoryType, SpecialFolderExtension
  9. from tracim.lib.webdav import sql_resources
  10. from tracim.lib.webdav.lock_storage import LockStorage
  11. from tracim.lib.content import ContentApi
  12. from tracim.lib.content import ContentRevisionRO
  13. from tracim.lib.user import UserApi
  14. from tracim.lib.workspace import WorkspaceApi
  15. from tracim.model.data import Content, Workspace
  16. from tracim.model.data import ContentType
  17. from tracim.lib.webdav.utils import normpath
  18. class Provider(DAVProvider):
  19. """
  20. This class provides wsgidav with _DAVResource objects. Wsgidav then uses them to execute actions and send
  21. information to the client.
  22. """
  def __init__(self, show_history=True, show_deleted=True, show_archived=True, manage_locks=True):
      """
      Initialise the provider and record which special folders may be served.

      :param show_history: kept in self._show_history
      :param show_deleted: kept in self._show_delete
      :param show_archived: kept in self._show_archive
      :param manage_locks: when truthy, a LockManager built on a LockStorage is attached as self.lockManager
      """
      super(Provider, self).__init__()

      # Visibility flags, later read by the show_*() accessors and by getResourceInst()
      self._show_history = show_history
      self._show_delete = show_deleted
      self._show_archive = show_archived

      # Lock handling is optional: without it no lockManager attribute is set here
      if manage_locks:
          self.lockManager = LockManager(LockStorage())
  30. def show_history(self):
  31. return self._show_history
  32. def show_delete(self):
  33. return self._show_delete
  34. def show_archive(self):
  35. return self._show_archive
  36. #########################################################
  37. # Methods overridden from DAVProvider
  def getResourceInst(self, path: str, environ: dict):
      """
      Called by wsgidav whenever a request is made, to get the _DAVResource corresponding to the path.

      :param path: requested path; it is normalised with normpath() before being analysed
      :param environ: request environ; 'http_authenticator.realm' is read as the root path and
          'http_authenticator.username' as the email of the requesting user
      :return: the matching sql_resources object, or None when the path does not exist or when no content
          could be found for it
      """
      if not self.exists(path, environ):
          return None

      path = normpath(path)
      root_path = environ['http_authenticator.realm']

      # If the requested path is the root, then we return a Root resource
      if path == root_path:
          return sql_resources.Root(path, environ)

      user = UserApi(None).get_one_by_email(environ['http_authenticator.username'])
      workspace_api = WorkspaceApi(user)
      workspace = self.get_workspace_from_path(path, workspace_api)

      # If the request path is in the form root/name, then we return a Workspace resource
      parent_path = dirname(path)
      if parent_path == root_path:
          if not workspace:
              return None
          return sql_resources.Workspace(path, environ, workspace)

      # And now we'll work on the path to establish which type of resource is requested.
      # NOTE: the provider's own _show_archive / _show_delete flags are not forwarded here, the content API
      # is always built with both flags off.
      content_api = ContentApi(
          user,
          show_archived=False,
          show_deleted=False,
      )

      content = self.get_content_from_path(
          path=path,
          content_api=content_api,
          workspace=workspace
      )

      # Easy cases: the path ends with /.archived, /.deleted or /.history, then we return the corresponding
      # special folder resource
      if path.endswith(SpecialFolderExtension.Archived) and self._show_archive:
          return sql_resources.ArchivedFolder(path, environ, workspace, content)

      if path.endswith(SpecialFolderExtension.Deleted) and self._show_delete:
          return sql_resources.DeletedFolder(path, environ, workspace, content)

      if path.endswith(SpecialFolderExtension.History) and self._show_history:
          is_deleted_folder = re.search(r'/\.deleted/\.history$', path) is not None
          is_archived_folder = re.search(r'/\.archived/\.history$', path) is not None

          # Named history_type so that the builtin type() is not shadowed
          if is_deleted_folder:
              history_type = HistoryType.Deleted
          elif is_archived_folder:
              history_type = HistoryType.Archived
          else:
              history_type = HistoryType.Standard

          return sql_resources.HistoryFolder(path, environ, workspace, content, history_type)

      # Now that's more complicated, we're trying to find out if the path ends with /.history/file_name
      is_history_file_folder = re.search(r'/\.history/([^/]+)$', path) is not None

      if is_history_file_folder and self._show_history:
          return sql_resources.HistoryFileFolder(
              path=path,
              environ=environ,
              content=content
          )

      # Next step: a revision inside a history folder, i.e. /.history/file_name/(id - type) name.
      # The pattern is searched once and the match object reused, so the revision id is never read from a
      # failed match. It is the same pattern as the one used by exists().
      history_file_match = re.search(r'/\.history/[^/]+/\((\d+) - [a-zA-Z]+\) ([^/].+)$', path)

      if self._show_history and history_file_match is not None:
          revision_id = history_file_match.group(1)
          content_revision = content_api.get_one_revision(revision_id)
          content = self.get_content_from_revision(content_revision, content_api)

          # get_content_from_revision() returns None when the content cannot be found
          if content is None:
              return None

          if content.type == ContentType.File:
              return sql_resources.HistoryFile(path, environ, content, content_revision)
          return sql_resources.HistoryOtherFile(path, environ, content, content_revision)

      # And if we're still going, the client is asking for a standard Folder/File/Page/Thread so we check the
      # type and return the corresponding resource
      if content is None:
          return None

      if content.type == ContentType.Folder:
          return sql_resources.Folder(path, environ, content.workspace, content)
      if content.type == ContentType.File:
          return sql_resources.File(path, environ, content)
      return sql_resources.OtherFile(path, environ, content)
  def exists(self, path, environ) -> bool:
      """
      Called by wsgidav to check whether a given path is linked to a _DAVResource.

      The root path always exists. A path directly under the root exists when its workspace is found. Any
      other path exists when a content (or a revision, for history file paths) is found and its deleted and
      archived states agree with what the path announces.
      """
      path = normpath(path)
      working_path = self.reduce_path(path)
      root_path = environ['http_authenticator.realm']

      if path == root_path:
          return True

      user = UserApi(None).get_one_by_email(environ['http_authenticator.username'])
      workspace = self.get_workspace_from_path(path, WorkspaceApi(user))

      # Without a workspace nothing can exist below the root
      if workspace is None:
          return False

      # The reduced path sits directly under the root: the workspace itself is enough
      if dirname(working_path) == root_path:
          return True

      # TODO bastien: Arnaud had set these flags to True, check the behaviour
      # when browsing the archived and deleted folders
      content_api = ContentApi(user, show_archived=False, show_deleted=False)

      revision_match = re.search(r'/\.history/[^/]+/\((\d+) - [a-zA-Z]+\) ([^/].+)$', path)
      expect_archived = self.is_path_archive(path)
      expect_deleted = self.is_path_delete(path)

      if revision_match:
          # History file: the revision is fetched by the id captured from its name
          content = content_api.get_one_revision(revision_match.group(1))
      else:
          content = self.get_content_from_path(working_path, content_api, workspace)

      if content is None:
          return False

      return content.is_deleted == expect_deleted \
          and content.is_archived == expect_archived
  def is_path_archive(self, path):
      """
      Tell whether the given path designates something placed directly in an .archived special folder.

      The path has to end with '/.archived/' followed by at most one name, which may be preceded by
      '.history/' and followed by a special folder suffix.
      ex:
          - /a/b/.archived/my_file
          - /a/b/.archived/.history/my_file
          - /a/b/.archived/my_file/.history

      NOTE(review): a revision path such as /a/b/.archived/.history/my_file/(3615 - edition) my_file is
      not matched by this pattern - confirm whether that is intended.
      """
      archived_pattern = r'/\.archived/(\.history/)?(?!\.history)[^/]*(/\.)?(history|deleted|archived)?$'
      found = re.search(archived_pattern, path)
      return found is not None
  def is_path_delete(self, path):
      """
      Tell whether the given path designates something placed directly in a .deleted special folder.

      The path has to end with '/.deleted/' followed by at most one name, which may be preceded by
      '.history/' and followed by a special folder suffix.
      ex:
          - /a/b/.deleted/my_file
          - /a/b/.deleted/.history/my_file
          - /a/b/.deleted/my_file/.history

      NOTE(review): a revision path such as /a/b/.deleted/.history/my_file/(3615 - edition) my_file is
      not matched by this pattern - confirm whether that is intended.
      """
      deleted_pattern = r'/\.deleted/(\.history/)?(?!\.history)[^/]*(/\.)?(history|deleted|archived)?$'
      found = re.search(deleted_pattern, path)
      return found is not None
  def reduce_path(self, path: str) -> str:
      """
      Strip the special folder segments from a path so that the result can be used to query the database.

      Every '/.archived' and '/.deleted' occurrence is removed first, then the '.history' segments are
      dropped while the name that follows them is kept.
      ex:
          - /a/b/.deleted/c/.archived gives /a/b/c
          - /a/b/.history/my_file gives /a/b/my_file
          - /a/b/.history gives /a/b

      NOTE(review): the '/.history/<name>/<digits>-...' rule collapses such a tail to '/<digits>'. A
      revision written as '(1985 - edition) my_old_name' does not match it, so
      /a/b/.history/my_file/(1985 - edition) my_old_name gives /a/b/my_file/(1985 - edition) my_old_name.
      Confirm which form is intended.
      """
      # Applied in order: later rules rely on the earlier ones having run
      substitutions = (
          (r'/\.archived', r''),
          (r'/\.deleted', r''),
          (r'/\.history/[^/]+/(\d+)-.+', r'/\1'),
          (r'/\.history/([^/]+)', r'/\1'),
          (r'/\.history', r''),
      )
      for pattern, replacement in substitutions:
          path = re.sub(pattern, replacement, path)
      return path
  def get_content_from_path(self, path, content_api: ContentApi, workspace: Workspace) -> Content:
      """
      Fetch from the database the Content item designated by the given path.

      The path is reduced with reduce_path(); its last component is used as the content label and the
      components of its parent directory, past the workspace part, as the parent labels. All labels go
      through transform_to_bdd() before the lookup.
      Returns None when the lookup raises NoResultFound.
      """
      reduced_path = self.reduce_path(path)

      # Skip the first len(workspace.label) + 1 characters of the parent directory
      # (presumably the leading '/<workspace name>' - verify against callers)
      parents_part = dirname(reduced_path)[len(workspace.label) + 1:]
      parent_labels = parents_part.split('/')

      # Only the first empty component is dropped
      if '' in parent_labels:
          parent_labels.remove('')

      parent_labels = [transform_to_bdd(label) for label in parent_labels]

      try:
          content = content_api.get_one_by_label_and_parent_labels(
              content_label=transform_to_bdd(basename(reduced_path)),
              content_parent_labels=parent_labels,
              workspace=workspace,
          )
      except NoResultFound:
          content = None

      return content
  def get_content_from_revision(self, revision: ContentRevisionRO, api: ContentApi) -> Content:
      """
      Fetch the Content identified by revision.content_id, whatever its type (ContentType.Any).
      Returns None when the lookup raises NoResultFound.
      """
      try:
          content = api.get_one(revision.content_id, ContentType.Any)
      except NoResultFound:
          return None
      else:
          return content
  def get_parent_from_path(self, path, api: ContentApi, workspace) -> Content:
      """
      Fetch the Content item designated by the parent directory of the given path.
      The lookup itself is delegated to get_content_from_path().
      """
      parent_path = dirname(path)
      return self.get_content_from_path(parent_path, api, workspace)
  def get_workspace_from_path(self, path: str, api: WorkspaceApi) -> Workspace:
      """
      Fetch the workspace whose label is the path segment following the first '/'.
      The segment goes through transform_to_bdd() before the lookup.
      Returns None when the lookup raises NoResultFound.
      """
      # For '/name/...' the split gives ['', 'name', ...], hence index 1
      workspace_segment = path.split('/')[1]

      try:
          workspace = api.get_one_by_label(transform_to_bdd(workspace_segment))
      except NoResultFound:
          workspace = None

      return workspace