dav_provider.py 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. # coding: utf8
  2. import re
  3. from os.path import basename, dirname
  4. from sqlalchemy.orm.exc import NoResultFound
  5. from tracim import CFG
  6. from tracim.lib.webdav.utils import transform_to_bdd, HistoryType, \
  7. SpecialFolderExtension
  8. from wsgidav.dav_provider import DAVProvider
  9. from wsgidav.lock_manager import LockManager
  10. from tracim.lib.webdav.lock_storage import LockStorage
  11. from tracim.lib.core.content import ContentApi
  12. from tracim.lib.core.content import ContentRevisionRO
  13. from tracim.lib.core.workspace import WorkspaceApi
  14. from tracim.lib.webdav import resources
  15. from tracim.lib.webdav.utils import normpath
  16. from tracim.models.data import ContentType, Content, Workspace
  17. class Provider(DAVProvider):
    """
    This class supplies wsgidav with the _DAVResource objects it asks for. Wsgidav then uses these resources to
    carry out the requested actions and to send information back to the client.
    """
  22. def __init__(
  23. self,
  24. app_config: CFG,
  25. show_history=True,
  26. show_deleted=True,
  27. show_archived=True,
  28. manage_locks=True,
  29. ):
  30. super(Provider, self).__init__()
  31. if manage_locks:
  32. self.lockManager = LockManager(LockStorage())
  33. self.app_config = app_config
  34. self._show_archive = show_archived
  35. self._show_delete = show_deleted
  36. self._show_history = show_history
  37. def show_history(self):
  38. return self._show_history
  39. def show_delete(self):
  40. return self._show_delete
  41. def show_archive(self):
  42. return self._show_archive
    #########################################################
    # Methods overridden from DAVProvider
  45. def getResourceInst(self, path: str, environ: dict):
  46. """
  47. Called by wsgidav whenever a request is called to get the _DAVResource corresponding to the path
  48. """
  49. user = environ['tracim_user']
  50. session = environ['tracim_dbsession']
  51. if not self.exists(path, environ):
  52. return None
  53. path = normpath(path)
  54. root_path = environ['http_authenticator.realm']
  55. # If the requested path is the root, then we return a RootResource resource
  56. if path == root_path:
  57. return resources.RootResource(path, environ, user=user, session=session)
  58. workspace_api = WorkspaceApi(
  59. current_user=user,
  60. session=session,
  61. config=self.app_config,
  62. )
  63. workspace = self.get_workspace_from_path(path, workspace_api)
  64. # If the request path is in the form root/name, then we return a WorkspaceResource resource
  65. parent_path = dirname(path)
  66. if parent_path == root_path:
  67. if not workspace:
  68. return None
  69. return resources.WorkspaceResource(
  70. path=path,
  71. environ=environ,
  72. workspace=workspace,
  73. user=user,
  74. session=session,
  75. )
  76. # And now we'll work on the path to establish which type or resource is requested
  77. content_api = ContentApi(
  78. current_user=user,
  79. session=session,
  80. config=self.app_config,
  81. show_archived=False, # self._show_archive,
  82. show_deleted=False, # self._show_delete
  83. )
  84. content = self.get_content_from_path(
  85. path=path,
  86. content_api=content_api,
  87. workspace=workspace
  88. )
  89. # Easy cases : path either end with /.deleted, /.archived or /.history, then we return corresponding resources
  90. if path.endswith(SpecialFolderExtension.Archived) and self._show_archive:
  91. return resources.ArchivedFolderResource(path, environ, workspace, content)
  92. if path.endswith(SpecialFolderExtension.Deleted) and self._show_delete:
  93. return resources.DeletedFolderResource(path, environ, workspace, content)
  94. if path.endswith(SpecialFolderExtension.History) and self._show_history:
  95. is_deleted_folder = re.search(r'/\.deleted/\.history$', path) is not None
  96. is_archived_folder = re.search(r'/\.archived/\.history$', path) is not None
  97. type = HistoryType.Deleted if is_deleted_folder \
  98. else HistoryType.Archived if is_archived_folder \
  99. else HistoryType.Standard
  100. return resources.HistoryFolderResource(path, environ, workspace, content, type)
  101. # Now that's more complicated, we're trying to find out if the path end with /.history/file_name
  102. is_history_file_folder = re.search(r'/\.history/([^/]+)$', path) is not None
  103. if is_history_file_folder and self._show_history:
  104. return resources.HistoryFileFolderResource(
  105. path=path,
  106. environ=environ,
  107. content=content
  108. )
  109. # And here next step :
  110. is_history_file = re.search(r'/\.history/[^/]+/\((\d+) - [a-zA-Z]+\) .+', path) is not None
  111. if self._show_history and is_history_file:
  112. revision_id = re.search(r'/\.history/[^/]+/\((\d+) - [a-zA-Z]+\) ([^/].+)$', path).group(1)
  113. content_revision = content_api.get_one_revision(revision_id)
  114. content = self.get_content_from_revision(content_revision, content_api)
  115. if content.type == ContentType.File:
  116. return resources.HistoryFileResource(path, environ, content, content_revision)
  117. else:
  118. return resources.HistoryOtherFile(path, environ, content, content_revision)
  119. # And if we're still going, the client is asking for a standard Folder/File/Page/Thread so we check the type7
  120. # and return the corresponding resource
  121. if content is None:
  122. return None
  123. if content.type == ContentType.Folder:
  124. return resources.FolderResource(
  125. path=path,
  126. environ=environ,
  127. workspace=content.workspace,
  128. content=content,
  129. session=session,
  130. user=user,
  131. )
  132. elif content.type == ContentType.File:
  133. return resources.FileResource(
  134. path=path,
  135. environ=environ,
  136. content=content,
  137. session=session,
  138. user=user
  139. )
  140. else:
  141. return resources.OtherFileResource(
  142. path=path,
  143. environ=environ,
  144. content=content,
  145. session=session,
  146. user=user,
  147. )
  148. def exists(self, path, environ) -> bool:
  149. """
  150. Called by wsgidav to check if a certain path is linked to a _DAVResource
  151. """
  152. path = normpath(path)
  153. working_path = self.reduce_path(path)
  154. root_path = environ['http_authenticator.realm']
  155. parent_path = dirname(working_path)
  156. user = environ['tracim_user']
  157. session = environ['tracim_dbsession']
  158. if path == root_path:
  159. return True
  160. workspace = self.get_workspace_from_path(
  161. path,
  162. WorkspaceApi(
  163. current_user=user,
  164. session=session,
  165. config=self.app_config,
  166. )
  167. )
  168. if parent_path == root_path or workspace is None:
  169. return workspace is not None
  170. # TODO bastien: Arnaud avait mis a True, verif le comportement
  171. # lorsque l'on explore les dossiers archive et deleted
  172. content_api = ContentApi(
  173. current_user=user,
  174. session=session,
  175. config=self.app_config,
  176. show_archived=False,
  177. show_deleted=False
  178. )
  179. revision_id = re.search(r'/\.history/[^/]+/\((\d+) - [a-zA-Z]+\) ([^/].+)$', path)
  180. is_archived = self.is_path_archive(path)
  181. is_deleted = self.is_path_delete(path)
  182. if revision_id:
  183. revision_id = revision_id.group(1)
  184. content = content_api.get_one_revision(revision_id)
  185. else:
  186. content = self.get_content_from_path(working_path, content_api, workspace)
  187. return content is not None \
  188. and content.is_deleted == is_deleted \
  189. and content.is_archived == is_archived
  190. def is_path_archive(self, path):
  191. """
  192. This function will check if a given path is linked to a file that's archived or not. We're checking if the
  193. given path end with one of these string :
  194. ex:
  195. - /a/b/.archived/my_file
  196. - /a/b/.archived/.history/my_file
  197. - /a/b/.archived/.history/my_file/(3615 - edition) my_file
  198. """
  199. return re.search(
  200. r'/\.archived/(\.history/)?(?!\.history)[^/]*(/\.)?(history|deleted|archived)?$',
  201. path
  202. ) is not None
  203. def is_path_delete(self, path):
  204. """
  205. This function will check if a given path is linked to a file that's deleted or not. We're checking if the
  206. given path end with one of these string :
  207. ex:
  208. - /a/b/.deleted/my_file
  209. - /a/b/.deleted/.history/my_file
  210. - /a/b/.deleted/.history/my_file/(3615 - edition) my_file
  211. """
  212. return re.search(
  213. r'/\.deleted/(\.history/)?(?!\.history)[^/]*(/\.)?(history|deleted|archived)?$',
  214. path
  215. ) is not None
  216. def reduce_path(self, path: str) -> str:
  217. """
  218. As we use the given path to request the database
  219. ex: if the path is /a/b/.deleted/c/.archived, we're trying to get the archived content of the 'c' resource,
  220. we need to keep the path /a/b/c
  221. ex: if the path is /a/b/.history/my_file, we're trying to get the history of the file my_file, thus we need
  222. the path /a/b/my_file
  223. ex: if the path is /a/b/.history/my_file/(1985 - edition) my_old_name, we're looking for,
  224. thus we remove all useless information
  225. """
  226. path = re.sub(r'/\.archived', r'', path)
  227. path = re.sub(r'/\.deleted', r'', path)
  228. path = re.sub(r'/\.history/[^/]+/(\d+)-.+', r'/\1', path)
  229. path = re.sub(r'/\.history/([^/]+)', r'/\1', path)
  230. path = re.sub(r'/\.history', r'', path)
  231. return path
  232. def get_content_from_path(self, path, content_api: ContentApi, workspace: Workspace) -> Content:
  233. """
  234. Called whenever we want to get the Content item from the database for a given path
  235. """
  236. path = self.reduce_path(path)
  237. parent_path = dirname(path)
  238. relative_parents_path = parent_path[len(workspace.label)+1:]
  239. parents = relative_parents_path.split('/')
  240. try:
  241. parents.remove('')
  242. except ValueError:
  243. pass
  244. parents = [transform_to_bdd(x) for x in parents]
  245. try:
  246. return content_api.get_one_by_label_and_parent_labels(
  247. content_label=transform_to_bdd(basename(path)),
  248. content_parent_labels=parents,
  249. workspace=workspace,
  250. )
  251. except NoResultFound:
  252. return None
  253. def get_content_from_revision(self, revision: ContentRevisionRO, api: ContentApi) -> Content:
  254. try:
  255. return api.get_one(revision.content_id, ContentType.Any)
  256. except NoResultFound:
  257. return None
  258. def get_parent_from_path(self, path, api: ContentApi, workspace) -> Content:
  259. return self.get_content_from_path(dirname(path), api, workspace)
  260. def get_workspace_from_path(self, path: str, api: WorkspaceApi) -> Workspace:
  261. try:
  262. return api.get_one_by_label(transform_to_bdd(path.split('/')[1]))
  263. except NoResultFound:
  264. return None