test_webdav.py 19KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611
  1. # -*- coding: utf-8 -*-
  2. import io
  3. import pytest
  4. from sqlalchemy.exc import InvalidRequestError
  5. from tracim.lib.core.user import UserApi
  6. from tracim.tests import eq_
  7. from tracim.lib.core.notifications import DummyNotifier
  8. from tracim.lib.webdav.dav_provider import Provider
  9. from tracim.lib.webdav.resources import RootResource
  10. from tracim.models import Content
  11. from tracim.models import ContentRevisionRO
  12. from tracim.tests import StandardTest
  13. from tracim.fixtures.content import Content as ContentFixtures
  14. from tracim.fixtures.users_and_groups import Base as BaseFixture
  15. from wsgidav import util
  16. class TestWebDav(StandardTest):
  17. fixtures = [BaseFixture, ContentFixtures]
  18. def _get_provider(self, config):
  19. return Provider(
  20. show_archived=False,
  21. show_deleted=False,
  22. show_history=False,
  23. app_config=config,
  24. )
  25. def _get_environ(
  26. self,
  27. provider: Provider,
  28. username: str,
  29. ) -> dict:
  30. return {
  31. 'http_authenticator.username': username,
  32. 'http_authenticator.realm': '/',
  33. 'wsgidav.provider': provider,
  34. 'tracim_user': self._get_user(username),
  35. 'tracim_dbsession': self.session,
  36. }
  37. def _get_user(self, email):
  38. return UserApi(None,
  39. self.session,
  40. self.app_config
  41. ).get_one_by_email(email)
  42. def _put_new_text_file(
  43. self,
  44. provider,
  45. environ,
  46. file_path,
  47. file_content,
  48. ):
  49. # This part id a reproduction of
  50. # wsgidav.request_server.RequestServer#doPUT
  51. # Grab parent folder where create file
  52. parentRes = provider.getResourceInst(
  53. util.getUriParent(file_path),
  54. environ,
  55. )
  56. assert parentRes, 'we should found folder for {0}'.format(file_path)
  57. new_resource = parentRes.createEmptyResource(
  58. util.getUriName(file_path),
  59. )
  60. write_object = new_resource.beginWrite(
  61. contentType='application/octet-stream',
  62. )
  63. write_object.write(file_content)
  64. write_object.close()
  65. new_resource.endWrite(withErrors=False)
  66. # Now file should exist
  67. return provider.getResourceInst(
  68. file_path,
  69. environ,
  70. )
  71. def test_unit__get_root__ok(self):
  72. provider = self._get_provider(self.app_config)
  73. root = provider.getResourceInst(
  74. '/',
  75. self._get_environ(
  76. provider,
  77. 'bob@fsf.local',
  78. )
  79. )
  80. assert root, 'Path / should return a RootResource instance'
  81. assert isinstance(root, RootResource)
  82. def test_unit__list_workspaces_with_user__ok(self):
  83. provider = self._get_provider(self.app_config)
  84. root = provider.getResourceInst(
  85. '/',
  86. self._get_environ(
  87. provider,
  88. 'bob@fsf.local',
  89. )
  90. )
  91. assert root, 'Path / should return a RootResource instance'
  92. assert isinstance(root, RootResource), 'Path / should return a RootResource instance'
  93. children = root.getMemberList()
  94. eq_(
  95. 2,
  96. len(children),
  97. msg='RootResource should return 2 workspaces instead {0}'.format(
  98. len(children),
  99. )
  100. )
  101. workspaces_names = [w.name for w in children]
  102. assert 'Recipes' in workspaces_names, \
  103. 'Recipes should be in names ({0})'.format(
  104. workspaces_names,
  105. )
  106. assert 'Others' in workspaces_names, 'Others should be in names ({0})'.format(
  107. workspaces_names,
  108. )
  109. def test_unit__list_workspaces_with_admin__ok(self):
  110. provider = self._get_provider(self.app_config)
  111. root = provider.getResourceInst(
  112. '/',
  113. self._get_environ(
  114. provider,
  115. 'admin@admin.admin',
  116. )
  117. )
  118. assert root, 'Path / should return a RootResource instance'
  119. assert isinstance(root, RootResource), 'Path / should return a RootResource instance'
  120. children = root.getMemberList()
  121. eq_(
  122. 2,
  123. len(children),
  124. msg='RootResource should return 3 workspaces instead {0}'.format(
  125. len(children),
  126. )
  127. )
  128. workspaces_names = [w.name for w in children]
  129. assert 'Recipes' in workspaces_names, 'Recipes should be in names ({0})'.format(
  130. workspaces_names,
  131. )
  132. assert 'Business' in workspaces_names, 'Business should be in names ({0})'.format(
  133. workspaces_names,
  134. )
  135. def test_unit__list_workspace_folders__ok(self):
  136. provider = self._get_provider(self.app_config)
  137. Recipes = provider.getResourceInst(
  138. '/Recipes/',
  139. self._get_environ(
  140. provider,
  141. 'bob@fsf.local',
  142. )
  143. )
  144. assert Recipes, 'Path /Recipes should return a Wrkspace instance'
  145. children = Recipes.getMemberList()
  146. eq_(
  147. 2,
  148. len(children),
  149. msg='Recipes should list 2 folders instead {0}'.format(
  150. len(children),
  151. ),
  152. )
  153. folders_names = [f.name for f in children]
  154. assert 'Salads' in folders_names, 'Salads should be in names ({0})'.format(
  155. folders_names,
  156. )
  157. assert 'Desserts' in folders_names, 'Desserts should be in names ({0})'.format(
  158. folders_names,
  159. )
  160. def test_unit__list_content__ok(self):
  161. provider = self._get_provider(self.app_config)
  162. Salads = provider.getResourceInst(
  163. '/Recipes/Desserts',
  164. self._get_environ(
  165. provider,
  166. 'bob@fsf.local',
  167. )
  168. )
  169. assert Salads, 'Path /Salads should return a Wrkspace instance'
  170. children = Salads.getMemberList()
  171. eq_(
  172. 5,
  173. len(children),
  174. msg='Salads should list 5 Files instead {0}'.format(
  175. len(children),
  176. ),
  177. )
  178. content_names = [c.name for c in children]
  179. assert 'Brownie Recipe.html' in content_names, \
  180. 'Brownie Recipe.html should be in names ({0})'.format(
  181. content_names,
  182. )
  183. assert 'Best Cakes ʔ.html' in content_names,\
  184. 'Best Cakes ʔ.html should be in names ({0})'.format(
  185. content_names,
  186. )
  187. assert 'Apple_Pie.txt' in content_names,\
  188. 'Apple_Pie.txt should be in names ({0})'.format(content_names,)
  189. assert 'Fruits Desserts' in content_names, \
  190. 'Fruits Desserts should be in names ({0})'.format(
  191. content_names,
  192. )
  193. assert 'Tiramisu Recipe.html' in content_names,\
  194. 'Tiramisu Recipe.html should be in names ({0})'.format(
  195. content_names,
  196. )
  197. def test_unit__get_content__ok(self):
  198. provider = self._get_provider(self.app_config)
  199. pie = provider.getResourceInst(
  200. '/Recipes/Desserts/Apple_Pie.txt',
  201. self._get_environ(
  202. provider,
  203. 'bob@fsf.local',
  204. )
  205. )
  206. assert pie, 'Apple_Pie should be found'
  207. eq_('Apple_Pie.txt', pie.name)
  208. def test_unit__delete_content__ok(self):
  209. provider = self._get_provider(self.app_config)
  210. pie = provider.getResourceInst(
  211. '/Recipes/Desserts/Apple_Pie.txt',
  212. self._get_environ(
  213. provider,
  214. 'bob@fsf.local',
  215. )
  216. )
  217. content_pie = self.session.query(ContentRevisionRO) \
  218. .filter(Content.label == 'Apple_Pie') \
  219. .one() # It must exist only one revision, cf fixtures
  220. eq_(
  221. False,
  222. content_pie.is_deleted,
  223. msg='Content should not be deleted !'
  224. )
  225. content_pie_id = content_pie.content_id
  226. pie.delete()
  227. self.session.flush()
  228. content_pie = self.session.query(ContentRevisionRO) \
  229. .filter(Content.content_id == content_pie_id) \
  230. .order_by(Content.revision_id.desc()) \
  231. .first()
  232. eq_(
  233. True,
  234. content_pie.is_deleted,
  235. msg='Content should be deleted !'
  236. )
  237. result = provider.getResourceInst(
  238. '/Recipes/Desserts/Apple_Pie.txt',
  239. self._get_environ(
  240. provider,
  241. 'bob@fsf.local',
  242. )
  243. )
  244. eq_(None, result, msg='Result should be None instead {0}'.format(
  245. result
  246. ))
  247. def test_unit__create_content__ok(self):
  248. provider = self._get_provider(self.app_config)
  249. environ = self._get_environ(
  250. provider,
  251. 'bob@fsf.local',
  252. )
  253. result = provider.getResourceInst(
  254. '/Recipes/Salads/greek_salad.txt',
  255. environ,
  256. )
  257. eq_(None, result, msg='Result should be None instead {0}'.format(
  258. result
  259. ))
  260. result = self._put_new_text_file(
  261. provider,
  262. environ,
  263. '/Recipes/Salads/greek_salad.txt',
  264. b'Greek Salad\n',
  265. )
  266. assert result, 'Result should not be None instead {0}'.format(
  267. result
  268. )
  269. eq_(
  270. b'Greek Salad\n',
  271. result.content.depot_file.file.read(),
  272. msg='fiel content should be "Greek Salad\n" but it is {0}'.format(
  273. result.content.depot_file.file.read()
  274. )
  275. )
  276. def test_unit__create_delete_and_create_file__ok(self):
  277. provider = self._get_provider(self.app_config)
  278. environ = self._get_environ(
  279. provider,
  280. 'bob@fsf.local',
  281. )
  282. new_file = provider.getResourceInst(
  283. '/Recipes/Salads/greek_salad.txt',
  284. environ,
  285. )
  286. eq_(None, new_file, msg='Result should be None instead {0}'.format(
  287. new_file
  288. ))
  289. # create it
  290. new_file = self._put_new_text_file(
  291. provider,
  292. environ,
  293. '/Recipes/Salads/greek_salad.txt',
  294. b'Greek Salad\n',
  295. )
  296. assert new_file, 'Result should not be None instead {0}'.format(
  297. new_file
  298. )
  299. content_new_file = self.session.query(ContentRevisionRO) \
  300. .filter(Content.label == 'greek_salad') \
  301. .one() # It must exist only one revision
  302. eq_(
  303. False,
  304. content_new_file.is_deleted,
  305. msg='Content should not be deleted !'
  306. )
  307. content_new_file_id = content_new_file.content_id
  308. # Delete if
  309. new_file.delete()
  310. self.session.flush()
  311. content_pie = self.session.query(ContentRevisionRO) \
  312. .filter(Content.content_id == content_new_file_id) \
  313. .order_by(Content.revision_id.desc()) \
  314. .first()
  315. eq_(
  316. True,
  317. content_pie.is_deleted,
  318. msg='Content should be deleted !'
  319. )
  320. result = provider.getResourceInst(
  321. '/Recipes/Salads/greek_salad.txt',
  322. self._get_environ(
  323. provider,
  324. 'bob@fsf.local',
  325. )
  326. )
  327. eq_(None, result, msg='Result should be None instead {0}'.format(
  328. result
  329. ))
  330. # Then create it again
  331. new_file = self._put_new_text_file(
  332. provider,
  333. environ,
  334. '/Recipes/Salads/greek_salad.txt',
  335. b'greek_salad\n',
  336. )
  337. assert new_file, 'Result should not be None instead {0}'.format(
  338. new_file
  339. )
  340. # Previous file is still dleeted
  341. self.session.flush()
  342. content_pie = self.session.query(ContentRevisionRO) \
  343. .filter(Content.content_id == content_new_file_id) \
  344. .order_by(Content.revision_id.desc()) \
  345. .first()
  346. eq_(
  347. True,
  348. content_pie.is_deleted,
  349. msg='Content should be deleted !'
  350. )
  351. # And an other file exist for this name
  352. content_new_new_file = self.session.query(ContentRevisionRO) \
  353. .filter(Content.label == 'greek_salad') \
  354. .order_by(Content.revision_id.desc()) \
  355. .first()
  356. assert content_new_new_file.content_id != content_new_file_id,\
  357. 'Contents ids should not be same !'
  358. eq_(
  359. False,
  360. content_new_new_file.is_deleted,
  361. msg='Content should not be deleted !'
  362. )
  363. def test_unit__rename_content__ok(self):
  364. provider = self._get_provider(self.app_config)
  365. environ = self._get_environ(
  366. provider,
  367. 'bob@fsf.local',
  368. )
  369. pie = provider.getResourceInst(
  370. '/Recipes/Desserts/Apple_Pie.txt',
  371. environ,
  372. )
  373. content_pie = self.session.query(ContentRevisionRO) \
  374. .filter(Content.label == 'Apple_Pie') \
  375. .one() # It must exist only one revision, cf fixtures
  376. assert content_pie, 'Apple_Pie should be exist'
  377. content_pie_id = content_pie.content_id
  378. pie.moveRecursive('/Recipes/Desserts/Apple_Pie_RENAMED.txt')
  379. # Database content is renamed
  380. content_pie = self.session.query(ContentRevisionRO) \
  381. .filter(ContentRevisionRO.content_id == content_pie_id) \
  382. .order_by(ContentRevisionRO.revision_id.desc()) \
  383. .first()
  384. eq_(
  385. 'Apple_Pie_RENAMED',
  386. content_pie.label,
  387. msg='File should be labeled Apple_Pie_RENAMED, not {0}'.format(
  388. content_pie.label
  389. )
  390. )
  391. def test_unit__move_content__ok(self):
  392. provider = self._get_provider(self.app_config)
  393. environ = self._get_environ(
  394. provider,
  395. 'bob@fsf.local',
  396. )
  397. pie = provider.getResourceInst(
  398. '/Recipes/Desserts/Apple_Pie.txt',
  399. environ,
  400. )
  401. content_pie = self.session.query(ContentRevisionRO) \
  402. .filter(Content.label == 'Apple_Pie') \
  403. .one() # It must exist only one revision, cf fixtures
  404. assert content_pie, 'Apple_Pie should be exist'
  405. content_pie_id = content_pie.content_id
  406. content_pie_parent = content_pie.parent
  407. eq_(
  408. content_pie_parent.label,
  409. 'Desserts',
  410. msg='field parent should be Desserts',
  411. )
  412. pie.moveRecursive('/Recipes/Salads/Apple_Pie.txt') # move in f2
  413. # Database content is moved
  414. content_pie = self.session.query(ContentRevisionRO) \
  415. .filter(ContentRevisionRO.content_id == content_pie_id) \
  416. .order_by(ContentRevisionRO.revision_id.desc()) \
  417. .first()
  418. assert content_pie.parent.label != content_pie_parent.label,\
  419. 'file should be moved in Salads but is in {0}'.format(
  420. content_pie.parent.label
  421. )
  422. def test_unit__move_and_rename_content__ok(self):
  423. provider = self._get_provider(self.app_config)
  424. environ = self._get_environ(
  425. provider,
  426. 'bob@fsf.local',
  427. )
  428. pie = provider.getResourceInst(
  429. '/Recipes/Desserts/Apple_Pie.txt',
  430. environ,
  431. )
  432. content_pie = self.session.query(ContentRevisionRO) \
  433. .filter(Content.label == 'Apple_Pie') \
  434. .one() # It must exist only one revision, cf fixtures
  435. assert content_pie, 'Apple_Pie should be exist'
  436. content_pie_id = content_pie.content_id
  437. content_pie_parent = content_pie.parent
  438. eq_(
  439. content_pie_parent.label,
  440. 'Desserts',
  441. msg='field parent should be Desserts',
  442. )
  443. pie.moveRecursive('/Others/Infos/Apple_Pie_RENAMED.txt')
  444. # Database content is moved
  445. content_pie = self.session.query(ContentRevisionRO) \
  446. .filter(ContentRevisionRO.content_id == content_pie_id) \
  447. .order_by(ContentRevisionRO.revision_id.desc()) \
  448. .first()
  449. assert content_pie.parent.label != content_pie_parent.label,\
  450. 'file should be moved in Recipesf2 but is in {0}'.format(
  451. content_pie.parent.label
  452. )
  453. eq_(
  454. 'Apple_Pie_RENAMED',
  455. content_pie.label,
  456. msg='File should be labeled Apple_Pie_RENAMED, not {0}'.format(
  457. content_pie.label
  458. )
  459. )
  460. def test_unit__move_content__ok__another_workspace(self):
  461. provider = self._get_provider(self.app_config)
  462. environ = self._get_environ(
  463. provider,
  464. 'bob@fsf.local',
  465. )
  466. content_to_move_res = provider.getResourceInst(
  467. '/Recipes/Desserts/Apple_Pie.txt',
  468. environ,
  469. )
  470. content_to_move = self.session.query(ContentRevisionRO) \
  471. .filter(Content.label == 'Apple_Pie') \
  472. .one() # It must exist only one revision, cf fixtures
  473. assert content_to_move, 'Apple_Pie should be exist'
  474. content_to_move_id = content_to_move.content_id
  475. content_to_move_parent = content_to_move.parent
  476. eq_(
  477. content_to_move_parent.label,
  478. 'Desserts',
  479. msg='field parent should be Desserts',
  480. )
  481. content_to_move_res.moveRecursive('/Others/Infos/Apple_Pie.txt') # move in Business, f1
  482. # Database content is moved
  483. content_to_move = self.session.query(ContentRevisionRO) \
  484. .filter(ContentRevisionRO.content_id == content_to_move_id) \
  485. .order_by(ContentRevisionRO.revision_id.desc()) \
  486. .first()
  487. assert content_to_move.parent, 'Content should have a parent'
  488. assert content_to_move.parent.label == 'Infos',\
  489. 'file should be moved in Infos but is in {0}'.format(
  490. content_to_move.parent.label
  491. )
  492. def test_unit__update_content__ok(self):
  493. provider = self._get_provider(self.app_config)
  494. environ = self._get_environ(
  495. provider,
  496. 'bob@fsf.local',
  497. )
  498. result = provider.getResourceInst(
  499. '/Recipes/Salads/greek_salad.txt',
  500. environ,
  501. )
  502. eq_(None, result, msg='Result should be None instead {0}'.format(
  503. result
  504. ))
  505. result = self._put_new_text_file(
  506. provider,
  507. environ,
  508. '/Recipes/Salads/greek_salad.txt',
  509. b'hello\n',
  510. )
  511. assert result, 'Result should not be None instead {0}'.format(
  512. result
  513. )
  514. eq_(
  515. b'hello\n',
  516. result.content.depot_file.file.read(),
  517. msg='fiel content should be "hello\n" but it is {0}'.format(
  518. result.content.depot_file.file.read()
  519. )
  520. )
  521. # ReInit DummyNotifier counter
  522. DummyNotifier.send_count = 0
  523. # Update file content
  524. write_object = result.beginWrite(
  525. contentType='application/octet-stream',
  526. )
  527. write_object.write(b'An other line')
  528. write_object.close()
  529. result.endWrite(withErrors=False)
  530. eq_(
  531. 1,
  532. DummyNotifier.send_count,
  533. msg='DummyNotifier should send 1 mail, not {}'.format(
  534. DummyNotifier.send_count
  535. ),
  536. )