test_webdav.py 21KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655
  1. # -*- coding: utf-8 -*-
  2. import io
  3. import pytest
  4. from sqlalchemy.exc import InvalidRequestError
  5. from wsgidav.wsgidav_app import DEFAULT_CONFIG
  6. from tracim_backend import WebdavAppFactory
  7. from tracim_backend.lib.core.user import UserApi
  8. from tracim_backend.lib.webdav import TracimDomainController
  9. from tracim_backend.tests import eq_
  10. from tracim_backend.lib.core.notifications import DummyNotifier
  11. from tracim_backend.lib.webdav.dav_provider import Provider
  12. from tracim_backend.lib.webdav.resources import RootResource
  13. from tracim_backend.models import Content
  14. from tracim_backend.models import ContentRevisionRO
  15. from tracim_backend.tests import StandardTest
  16. from tracim_backend.fixtures.content import Content as ContentFixtures
  17. from tracim_backend.fixtures.users_and_groups import Base as BaseFixture
  18. from wsgidav import util
  19. from unittest.mock import MagicMock
  class TestWebdavFactory(StandardTest):
      """Unit tests for the wsgidav configuration built by ``WebdavAppFactory``."""

      config_section = 'webdav_test'

      def test_unit__initConfig__ok__nominal_case(self):
          """
          Check that ``_initConfig`` returns a correctly adjusted wsgidav
          configuration when the wsgidav config and the tracim settings are
          both supplied through a mock (as dicts).
          """
          # wsgidav defaults, overridden with the values this test checks.
          overrides = {
              'root_path': '',
              'acceptbasic': True,
              'acceptdigest': False,
              'defaultdigest': False,
          }
          wsgidav_config = DEFAULT_CONFIG.copy()
          wsgidav_config.update(overrides)

          # Only ``_initConfig`` is the real implementation; the two methods
          # feeding it its inputs are mocked.
          factory = MagicMock()
          factory._initConfig = WebdavAppFactory._initConfig
          factory._readConfigFile.return_value = wsgidav_config
          factory._get_tracim_settings.return_value = self.settings

          config = factory._initConfig(factory)
          assert config

          expected_flags = (
              ('acceptbasic', True),
              ('acceptdigest', False),
              ('defaultdigest', False),
          )
          for option, expected in expected_flags:
              assert config[option] is expected

          # TODO - G.M - 25-05-2018 - Better check for middleware stack config
          assert 'middleware_stack' in config
          assert len(config['middleware_stack']) == 7

          assert 'root_path' in config
          assert 'provider_mapping' in config
          root_path = config['root_path']
          provider_mapping = config['provider_mapping']
          assert root_path in provider_mapping
          assert isinstance(provider_mapping[root_path], Provider)

          assert 'domaincontroller' in config
          assert isinstance(config['domaincontroller'], TracimDomainController)
  56. class TestWebDav(StandardTest):
  57. fixtures = [BaseFixture, ContentFixtures]
  def _get_provider(self, config):
      """
      Build a webdav ``Provider`` with ``show_archived``, ``show_deleted``
      and ``show_history`` all set to ``False``.

      :param config: value passed to the provider as ``app_config``
      :return: the new ``Provider`` instance
      """
      provider_options = {
          'show_archived': False,
          'show_deleted': False,
          'show_history': False,
          'app_config': config,
      }
      return Provider(**provider_options)
  def _get_environ(
      self,
      provider: Provider,
      username: str,
  ) -> dict:
      """
      Build the environ dict passed to ``provider.getResourceInst``.

      :param provider: provider stored under ``wsgidav.provider``
      :param username: login stored under ``http_authenticator.username``;
          also used to load the user stored under ``tracim_user``
      :return: the environ dict
      """
      environ = {}
      environ['http_authenticator.username'] = username
      environ['http_authenticator.realm'] = '/'
      environ['wsgidav.provider'] = provider
      environ['tracim_user'] = self._get_user(username)
      environ['tracim_dbsession'] = self.session
      return environ
  def _get_user(self, email):
      """
      Look a user up by email through ``UserApi``.

      :param email: email passed to ``UserApi.get_one_by_email``
      :return: whatever ``get_one_by_email`` returns for that email
      """
      user_api = UserApi(None, self.session, self.app_config)
      return user_api.get_one_by_email(email)
  def _put_new_text_file(
      self,
      provider,
      environ,
      file_path,
      file_content,
  ):
      """
      Create a new file through the provider, the way a webdav PUT would.

      This is a reproduction of
      ``wsgidav.request_server.RequestServer#doPUT``.

      :param provider: provider used to resolve resources
      :param environ: environ dict passed to every resource lookup
      :param file_path: full webdav path of the file to create
      :param file_content: data written into the new file
      :return: result of looking ``file_path`` up again after the write
      """
      # Resolve the folder in which the file is going to be created.
      parent_uri = util.getUriParent(file_path)
      parent_resource = provider.getResourceInst(parent_uri, environ)
      assert parent_resource, \
          'we should found folder for {0}'.format(file_path)

      # Create an empty resource, then stream the content into it.
      created = parent_resource.createEmptyResource(util.getUriName(file_path))
      stream = created.beginWrite(contentType='application/octet-stream')
      stream.write(file_content)
      stream.close()
      created.endWrite(withErrors=False)

      # The file should exist now.
      return provider.getResourceInst(file_path, environ)
  def test_unit__get_root__ok(self):
      """Path ``/`` resolves to a ``RootResource`` for bob@fsf.local."""
      provider = self._get_provider(self.app_config)
      environ = self._get_environ(provider, 'bob@fsf.local')
      root = provider.getResourceInst('/', environ)
      assert root, 'Path / should return a RootResource instance'
      assert isinstance(root, RootResource)
  def test_unit__list_workspaces_with_user__ok(self):
      """The root lists workspaces Recipes and Others for bob@fsf.local."""
      provider = self._get_provider(self.app_config)
      environ = self._get_environ(provider, 'bob@fsf.local')
      root = provider.getResourceInst('/', environ)
      assert root, 'Path / should return a RootResource instance'
      assert isinstance(root, RootResource), \
          'Path / should return a RootResource instance'

      children = root.getMemberList()
      count_message = 'RootResource should return 2 workspaces instead {0}'.format(
          len(children),
      )
      eq_(2, len(children), msg=count_message)

      workspaces_names = [workspace.name for workspace in children]
      assert 'Recipes' in workspaces_names, \
          'Recipes should be in names ({0})'.format(workspaces_names)
      assert 'Others' in workspaces_names, \
          'Others should be in names ({0})'.format(workspaces_names)
  def test_unit__list_workspaces_with_admin__ok(self):
      """The root lists workspaces Recipes and Business for admin@admin.admin."""
      provider = self._get_provider(self.app_config)
      root = provider.getResourceInst(
          '/',
          self._get_environ(
              provider,
              'admin@admin.admin',
          )
      )
      assert root, 'Path / should return a RootResource instance'
      assert isinstance(root, RootResource), \
          'Path / should return a RootResource instance'

      children = root.getMemberList()
      # The message used to announce 3 workspaces while the check expects 2;
      # it now states the count that is actually asserted.
      eq_(
          2,
          len(children),
          msg='RootResource should return 2 workspaces instead {0}'.format(
              len(children),
          )
      )

      workspaces_names = [w.name for w in children]
      assert 'Recipes' in workspaces_names, \
          'Recipes should be in names ({0})'.format(workspaces_names)
      assert 'Business' in workspaces_names, \
          'Business should be in names ({0})'.format(workspaces_names)
  def test_unit__list_workspace_folders__ok(self):
      """``/Recipes/`` lists the two folders Salads and Desserts."""
      provider = self._get_provider(self.app_config)
      environ = self._get_environ(provider, 'bob@fsf.local')
      recipes = provider.getResourceInst('/Recipes/', environ)
      assert recipes, 'Path /Recipes should return a Wrkspace instance'

      children = recipes.getMemberList()
      count_message = 'Recipes should list 2 folders instead {0}'.format(
          len(children),
      )
      eq_(2, len(children), msg=count_message)

      folders_names = [folder.name for folder in children]
      assert 'Salads' in folders_names, \
          'Salads should be in names ({0})'.format(folders_names)
      assert 'Desserts' in folders_names, \
          'Desserts should be in names ({0})'.format(folders_names)
  def test_unit__list_content__ok(self):
      """``/Recipes/Desserts`` lists its five expected children."""
      provider = self._get_provider(self.app_config)
      # The resource under test is the Desserts folder; the local name and
      # the failure messages used to call it "Salads".
      desserts = provider.getResourceInst(
          '/Recipes/Desserts',
          self._get_environ(
              provider,
              'bob@fsf.local',
          )
      )
      assert desserts, 'Path /Recipes/Desserts should return a Folder instance'

      children = desserts.getMemberList()
      eq_(
          5,
          len(children),
          msg='Desserts should list 5 Files instead {0}'.format(
              len(children),
          ),
      )

      content_names = [c.name for c in children]
      expected_names = (
          'Brownie Recipe.html',
          'Best Cakesʔ.html',
          'Apple_Pie.txt',
          'Fruits Desserts',
          'Tiramisu Recipe.html',
      )
      for expected in expected_names:
          assert expected in content_names, \
              '{0} should be in names ({1})'.format(expected, content_names)
  def test_unit__get_content__ok(self):
      """``/Recipes/Desserts/Apple_Pie.txt`` resolves to a resource of that name."""
      provider = self._get_provider(self.app_config)
      environ = self._get_environ(provider, 'bob@fsf.local')
      pie = provider.getResourceInst('/Recipes/Desserts/Apple_Pie.txt', environ)
      assert pie, 'Apple_Pie should be found'
      eq_('Apple_Pie.txt', pie.name)
  def test_unit__delete_content__ok(self):
      """
      Deleting a file through webdav flags its latest revision as deleted
      and makes its path resolve to ``None``.
      """
      pie_path = '/Recipes/Desserts/Apple_Pie.txt'
      provider = self._get_provider(self.app_config)
      pie = provider.getResourceInst(
          pie_path,
          self._get_environ(provider, 'bob@fsf.local'),
      )

      # Only one revision must exist for this content, cf fixtures.
      revision = (
          self.session.query(ContentRevisionRO)
          .filter(Content.label == 'Apple_Pie')
          .one()
      )
      eq_(False, revision.is_deleted, msg='Content should not be deleted !')
      content_pie_id = revision.content_id

      pie.delete()
      self.session.flush()

      # The most recent revision of the same content must now be deleted.
      latest_revision = (
          self.session.query(ContentRevisionRO)
          .filter(Content.content_id == content_pie_id)
          .order_by(Content.revision_id.desc())
          .first()
      )
      eq_(True, latest_revision.is_deleted, msg='Content should be deleted!')

      # And the path must not resolve any more.
      result = provider.getResourceInst(
          pie_path,
          self._get_environ(provider, 'bob@fsf.local'),
      )
      eq_(None, result, msg='Result should be None instead {0}'.format(result))
  def test_unit__create_content__ok(self):
      """
      A file put on a free path can be resolved afterwards and holds the
      written bytes.
      """
      provider = self._get_provider(self.app_config)
      environ = self._get_environ(
          provider,
          'bob@fsf.local',
      )

      # The path must be free before the file is created.
      result = provider.getResourceInst(
          '/Recipes/Salads/greek_salad.txt',
          environ,
      )
      eq_(None, result, msg='Result should be None instead {0}'.format(
          result
      ))

      expected_content = b'Greek Salad\n'
      result = self._put_new_text_file(
          provider,
          environ,
          '/Recipes/Salads/greek_salad.txt',
          expected_content,
      )
      assert result, 'Result should not be None instead {0}'.format(
          result
      )

      # Read the stored file once, so the failure message shows exactly the
      # bytes that were compared (it was previously built from a second,
      # independent read).
      stored_content = result.content.depot_file.file.read()
      eq_(
          expected_content,
          stored_content,
          msg='file content should be {0!r} but it is {1!r}'.format(
              expected_content,
              stored_content,
          )
      )
  def test_unit__create_delete_and_create_file__ok(self):
      """
      Create a file, delete it, then create a file on the same path again:
      the first content stays deleted and a distinct, non-deleted content
      exists under the same label.
      """
      salad_path = '/Recipes/Salads/greek_salad.txt'
      provider = self._get_provider(self.app_config)
      environ = self._get_environ(provider, 'bob@fsf.local')

      # The path must be free to start with.
      new_file = provider.getResourceInst(salad_path, environ)
      eq_(None, new_file, msg='Result should be None instead {0}'.format(
          new_file
      ))

      # Create it.
      new_file = self._put_new_text_file(
          provider,
          environ,
          salad_path,
          b'Greek Salad\n',
      )
      assert new_file, 'Result should not be None instead {0}'.format(new_file)

      # Only one revision must exist at this point.
      first_revision = (
          self.session.query(ContentRevisionRO)
          .filter(Content.label == 'greek_salad')
          .one()
      )
      eq_(False, first_revision.is_deleted, msg='Content should not be deleted!')
      content_new_file_id = first_revision.content_id

      # Delete it.
      new_file.delete()
      self.session.flush()
      revision_after_delete = (
          self.session.query(ContentRevisionRO)
          .filter(Content.content_id == content_new_file_id)
          .order_by(Content.revision_id.desc())
          .first()
      )
      eq_(True, revision_after_delete.is_deleted, msg='Content should be deleted!')

      result = provider.getResourceInst(
          salad_path,
          self._get_environ(provider, 'bob@fsf.local'),
      )
      eq_(None, result, msg='Result should be None instead {0}'.format(result))

      # Then create it again.
      new_file = self._put_new_text_file(
          provider,
          environ,
          salad_path,
          b'greek_salad\n',
      )
      assert new_file, 'Result should not be None instead {0}'.format(new_file)

      # The previous content is still deleted...
      self.session.flush()
      revision_after_recreate = (
          self.session.query(ContentRevisionRO)
          .filter(Content.content_id == content_new_file_id)
          .order_by(Content.revision_id.desc())
          .first()
      )
      eq_(True, revision_after_recreate.is_deleted, msg='Content should be deleted!')

      # ...and another content now exists under the same label.
      content_new_new_file = (
          self.session.query(ContentRevisionRO)
          .filter(Content.label == 'greek_salad')
          .order_by(Content.revision_id.desc())
          .first()
      )
      assert content_new_new_file.content_id != content_new_file_id, \
          'Contents ids should not be same!'
      eq_(
          False,
          content_new_new_file.is_deleted,
          msg='Content should not be deleted!'
      )
  def test_unit__rename_content__ok(self):
      """Moving a file to a new name in the same folder changes its label."""
      provider = self._get_provider(self.app_config)
      environ = self._get_environ(provider, 'bob@fsf.local')
      pie = provider.getResourceInst('/Recipes/Desserts/Apple_Pie.txt', environ)

      # Only one revision must exist for this content, cf fixtures.
      original_revision = (
          self.session.query(ContentRevisionRO)
          .filter(Content.label == 'Apple_Pie')
          .one()
      )
      assert original_revision, 'Apple_Pie should be exist'
      content_pie_id = original_revision.content_id

      pie.moveRecursive('/Recipes/Desserts/Apple_Pie_RENAMED.txt')

      # The latest revision in database carries the new label.
      renamed_revision = (
          self.session.query(ContentRevisionRO)
          .filter(ContentRevisionRO.content_id == content_pie_id)
          .order_by(ContentRevisionRO.revision_id.desc())
          .first()
      )
      label_message = 'File should be labeled Apple_Pie_RENAMED, not {0}'.format(
          renamed_revision.label
      )
      eq_('Apple_Pie_RENAMED', renamed_revision.label, msg=label_message)
  def test_unit__move_content__ok(self):
      """
      Moving a file from ``/Recipes/Desserts`` to ``/Recipes/Salads``
      re-parents its content under the Salads folder.
      """
      provider = self._get_provider(self.app_config)
      environ = self._get_environ(
          provider,
          'bob@fsf.local',
      )
      pie = provider.getResourceInst(
          '/Recipes/Desserts/Apple_Pie.txt',
          environ,
      )

      content_pie = self.session.query(ContentRevisionRO) \
          .filter(Content.label == 'Apple_Pie') \
          .one()  # It must exist only one revision, cf fixtures
      assert content_pie, 'Apple_Pie should be exist'
      content_pie_id = content_pie.content_id
      content_pie_parent = content_pie.parent
      eq_(
          content_pie_parent.label,
          'Desserts',
          msg='field parent should be Desserts',
      )

      pie.moveRecursive('/Recipes/Salads/Apple_Pie.txt')  # move to Salads

      # Database content is moved
      content_pie = self.session.query(ContentRevisionRO) \
          .filter(ContentRevisionRO.content_id == content_pie_id) \
          .order_by(ContentRevisionRO.revision_id.desc()) \
          .first()
      assert content_pie.parent, 'Content should have a parent'
      # Check the actual destination, as the message states, rather than
      # only "not the original folder".
      assert content_pie.parent.label == 'Salads', \
          'file should be moved in Salads but is in {0}'.format(
              content_pie.parent.label
          )
  def test_unit__move_and_rename_content__ok(self):
      """
      Moving a file to ``/Others/Infos`` under a new name both re-parents
      its content under Infos and changes its label.
      """
      provider = self._get_provider(self.app_config)
      environ = self._get_environ(
          provider,
          'bob@fsf.local',
      )
      pie = provider.getResourceInst(
          '/Recipes/Desserts/Apple_Pie.txt',
          environ,
      )

      content_pie = self.session.query(ContentRevisionRO) \
          .filter(Content.label == 'Apple_Pie') \
          .one()  # It must exist only one revision, cf fixtures
      assert content_pie, 'Apple_Pie should be exist'
      content_pie_id = content_pie.content_id
      content_pie_parent = content_pie.parent
      eq_(
          content_pie_parent.label,
          'Desserts',
          msg='field parent should be Desserts',
      )

      pie.moveRecursive('/Others/Infos/Apple_Pie_RENAMED.txt')

      # Database content is moved
      content_pie = self.session.query(ContentRevisionRO) \
          .filter(ContentRevisionRO.content_id == content_pie_id) \
          .order_by(ContentRevisionRO.revision_id.desc()) \
          .first()
      assert content_pie.parent, 'Content should have a parent'
      # The destination folder is Infos; the message used to name a stale
      # "Recipesf2" target and the check only excluded the original folder.
      assert content_pie.parent.label == 'Infos', \
          'file should be moved in Infos but is in {0}'.format(
              content_pie.parent.label
          )
      eq_(
          'Apple_Pie_RENAMED',
          content_pie.label,
          msg='File should be labeled Apple_Pie_RENAMED, not {0}'.format(
              content_pie.label
          )
      )
  def test_unit__move_content__ok__another_workspace(self):
      """
      Moving a file from ``/Recipes/Desserts`` to ``/Others/Infos``
      re-parents its content under the Infos folder.
      """
      provider = self._get_provider(self.app_config)
      environ = self._get_environ(provider, 'bob@fsf.local')
      content_to_move_res = provider.getResourceInst(
          '/Recipes/Desserts/Apple_Pie.txt',
          environ,
      )

      # Only one revision must exist for this content, cf fixtures.
      revision_before = (
          self.session.query(ContentRevisionRO)
          .filter(Content.label == 'Apple_Pie')
          .one()
      )
      assert revision_before, 'Apple_Pie should be exist'
      content_to_move_id = revision_before.content_id
      original_parent = revision_before.parent
      eq_(original_parent.label, 'Desserts', msg='field parent should be Desserts')

      # Move into folder Infos, under /Others.
      content_to_move_res.moveRecursive('/Others/Infos/Apple_Pie.txt')

      # The latest revision in database points at the new parent.
      revision_after = (
          self.session.query(ContentRevisionRO)
          .filter(ContentRevisionRO.content_id == content_to_move_id)
          .order_by(ContentRevisionRO.revision_id.desc())
          .first()
      )
      assert revision_after.parent, 'Content should have a parent'
      assert revision_after.parent.label == 'Infos', \
          'file should be moved in Infos but is in {0}'.format(
              revision_after.parent.label
          )
  532. def test_unit__update_content__ok(self):
  533. provider = self._get_provider(self.app_config)
  534. environ = self._get_environ(
  535. provider,
  536. 'bob@fsf.local',
  537. )
  538. result = provider.getResourceInst(
  539. '/Recipes/Salads/greek_salad.txt',
  540. environ,
  541. )
  542. eq_(None, result, msg='Result should be None instead {0}'.format(
  543. result
  544. ))
  545. result = self._put_new_text_file(
  546. provider,
  547. environ,
  548. '/Recipes/Salads/greek_salad.txt',
  549. b'hello\n',
  550. )
  551. assert result, 'Result should not be None instead {0}'.format(
  552. result
  553. )
  554. eq_(
  555. b'hello\n',
  556. result.content.depot_file.file.read(),
  557. msg='fiel content should be "hello\n" but it is {0}'.format(
  558. result.content.depot_file.file.read()
  559. )
  560. )
  561. # ReInit DummyNotifier counter
  562. DummyNotifier.send_count = 0
  563. # Update file content
  564. write_object = result.beginWrite(
  565. contentType='application/octet-stream',
  566. )
  567. write_object.write(b'An other line')
  568. write_object.close()
  569. result.endWrite(withErrors=False)
  570. eq_(
  571. 1,
  572. DummyNotifier.send_count,
  573. msg='DummyNotifier should send 1 mail, not {}'.format(
  574. DummyNotifier.send_count
  575. ),
  576. )