test_webdav.py 21KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660
  1. # -*- coding: utf-8 -*-
  2. import io
  3. import pytest
  4. from sqlalchemy.exc import InvalidRequestError
  5. from wsgidav.wsgidav_app import DEFAULT_CONFIG
  6. from tracim import WebdavAppFactory
  7. from tracim.lib.core.user import UserApi
  8. from tracim.lib.webdav import TracimDomainController
  9. from tracim.tests import eq_
  10. from tracim.lib.core.notifications import DummyNotifier
  11. from tracim.lib.webdav.dav_provider import Provider
  12. from tracim.lib.webdav.resources import RootResource
  13. from tracim.models import Content
  14. from tracim.models import ContentRevisionRO
  15. from tracim.tests import StandardTest
  16. from tracim.fixtures.content import Content as ContentFixtures
  17. from tracim.fixtures.users_and_groups import Base as BaseFixture
  18. from wsgidav import util
  19. from unittest.mock import MagicMock
  20. class TestWebdavFactory(StandardTest):
  21. def test_unit__initConfig__ok__nominal_case(self):
  22. """
  23. Check if config is correctly modify for wsgidav using mocked
  24. wsgidav and tracim conf (as dict)
  25. :return:
  26. """
  27. tracim_settings = {
  28. 'sqlalchemy.url': 'sqlite:///:memory:',
  29. 'user.auth_token.validity': '604800',
  30. 'depot_storage_dir': '/tmp/test/depot',
  31. 'depot_storage_name': 'test',
  32. 'preview_cache_dir': '/tmp/test/preview_cache',
  33. 'wsgidav.config_path': 'development.ini'
  34. }
  35. wsgidav_setting = DEFAULT_CONFIG.copy()
  36. wsgidav_setting.update(
  37. {
  38. 'root_path': '',
  39. 'acceptbasic': True,
  40. 'acceptdigest': False,
  41. 'defaultdigest': False,
  42. }
  43. )
  44. mock = MagicMock()
  45. mock._initConfig = WebdavAppFactory._initConfig
  46. mock._readConfigFile.return_value = wsgidav_setting
  47. mock._get_tracim_settings.return_value = tracim_settings
  48. config = mock._initConfig(mock)
  49. assert config
  50. assert config['acceptbasic'] is True
  51. assert config['acceptdigest'] is False
  52. assert config['defaultdigest'] is False
  53. # TODO - G.M - 25-05-2018 - Better check for middleware stack config
  54. assert 'middleware_stack' in config
  55. assert len(config['middleware_stack']) == 7
  56. assert 'root_path' in config
  57. assert 'provider_mapping' in config
  58. assert config['root_path'] in config['provider_mapping']
  59. assert isinstance(config['provider_mapping'][config['root_path']], Provider) # nopep8
  60. assert 'domaincontroller' in config
  61. assert isinstance(config['domaincontroller'], TracimDomainController)
  62. class TestWebDav(StandardTest):
  63. fixtures = [BaseFixture, ContentFixtures]
  64. def _get_provider(self, config):
  65. return Provider(
  66. show_archived=False,
  67. show_deleted=False,
  68. show_history=False,
  69. app_config=config,
  70. )
  71. def _get_environ(
  72. self,
  73. provider: Provider,
  74. username: str,
  75. ) -> dict:
  76. return {
  77. 'http_authenticator.username': username,
  78. 'http_authenticator.realm': '/',
  79. 'wsgidav.provider': provider,
  80. 'tracim_user': self._get_user(username),
  81. 'tracim_dbsession': self.session,
  82. }
  83. def _get_user(self, email):
  84. return UserApi(None,
  85. self.session,
  86. self.app_config
  87. ).get_one_by_email(email)
  88. def _put_new_text_file(
  89. self,
  90. provider,
  91. environ,
  92. file_path,
  93. file_content,
  94. ):
  95. # This part id a reproduction of
  96. # wsgidav.request_server.RequestServer#doPUT
  97. # Grab parent folder where create file
  98. parentRes = provider.getResourceInst(
  99. util.getUriParent(file_path),
  100. environ,
  101. )
  102. assert parentRes, 'we should found folder for {0}'.format(file_path)
  103. new_resource = parentRes.createEmptyResource(
  104. util.getUriName(file_path),
  105. )
  106. write_object = new_resource.beginWrite(
  107. contentType='application/octet-stream',
  108. )
  109. write_object.write(file_content)
  110. write_object.close()
  111. new_resource.endWrite(withErrors=False)
  112. # Now file should exist
  113. return provider.getResourceInst(
  114. file_path,
  115. environ,
  116. )
  117. def test_unit__get_root__ok(self):
  118. provider = self._get_provider(self.app_config)
  119. root = provider.getResourceInst(
  120. '/',
  121. self._get_environ(
  122. provider,
  123. 'bob@fsf.local',
  124. )
  125. )
  126. assert root, 'Path / should return a RootResource instance'
  127. assert isinstance(root, RootResource)
  128. def test_unit__list_workspaces_with_user__ok(self):
  129. provider = self._get_provider(self.app_config)
  130. root = provider.getResourceInst(
  131. '/',
  132. self._get_environ(
  133. provider,
  134. 'bob@fsf.local',
  135. )
  136. )
  137. assert root, 'Path / should return a RootResource instance'
  138. assert isinstance(root, RootResource), 'Path / should return a RootResource instance'
  139. children = root.getMemberList()
  140. eq_(
  141. 2,
  142. len(children),
  143. msg='RootResource should return 2 workspaces instead {0}'.format(
  144. len(children),
  145. )
  146. )
  147. workspaces_names = [w.name for w in children]
  148. assert 'w1' in workspaces_names, \
  149. 'w1 should be in names ({0})'.format(
  150. workspaces_names,
  151. )
  152. assert 'w2' in workspaces_names, 'w2 should be in names ({0})'.format(
  153. workspaces_names,
  154. )
  155. def test_unit__list_workspaces_with_admin__ok(self):
  156. provider = self._get_provider(self.app_config)
  157. root = provider.getResourceInst(
  158. '/',
  159. self._get_environ(
  160. provider,
  161. 'admin@admin.admin',
  162. )
  163. )
  164. assert root, 'Path / should return a RootResource instance'
  165. assert isinstance(root, RootResource), 'Path / should return a RootResource instance'
  166. children = root.getMemberList()
  167. eq_(
  168. 2,
  169. len(children),
  170. msg='RootResource should return 2 workspaces instead {0}'.format(
  171. len(children),
  172. )
  173. )
  174. workspaces_names = [w.name for w in children]
  175. assert 'w1' in workspaces_names, 'w1 should be in names ({0})'.format(
  176. workspaces_names,
  177. )
  178. assert 'w3' in workspaces_names, 'w3 should be in names ({0})'.format(
  179. workspaces_names,
  180. )
  181. def test_unit__list_workspace_folders__ok(self):
  182. provider = self._get_provider(self.app_config)
  183. w1 = provider.getResourceInst(
  184. '/w1/',
  185. self._get_environ(
  186. provider,
  187. 'bob@fsf.local',
  188. )
  189. )
  190. assert w1, 'Path /w1 should return a Wrkspace instance'
  191. children = w1.getMemberList()
  192. eq_(
  193. 2,
  194. len(children),
  195. msg='w1 should list 2 folders instead {0}'.format(
  196. len(children),
  197. ),
  198. )
  199. folders_names = [f.name for f in children]
  200. assert 'w1f1' in folders_names, 'w1f1 should be in names ({0})'.format(
  201. folders_names,
  202. )
  203. assert 'w1f2' in folders_names, 'w1f2 should be in names ({0})'.format(
  204. folders_names,
  205. )
  206. def test_unit__list_content__ok(self):
  207. provider = self._get_provider(self.app_config)
  208. w1f1 = provider.getResourceInst(
  209. '/w1/w1f1',
  210. self._get_environ(
  211. provider,
  212. 'bob@fsf.local',
  213. )
  214. )
  215. assert w1f1, 'Path /w1f1 should return a Wrkspace instance'
  216. children = w1f1.getMemberList()
  217. eq_(
  218. 5,
  219. len(children),
  220. msg='w1f1 should list 5 folders instead {0}'.format(
  221. len(children),
  222. ),
  223. )
  224. content_names = [c.name for c in children]
  225. assert 'w1f1p1.html' in content_names, \
  226. 'w1f1.html should be in names ({0})'.format(
  227. content_names,
  228. )
  229. assert 'w1f1t1.html' in content_names,\
  230. 'w1f1t1.html should be in names ({0})'.format(
  231. content_names,
  232. )
  233. assert 'w1f1d1.txt' in content_names,\
  234. 'w1f1d1.txt should be in names ({0})'.format(content_names,)
  235. assert 'w1f1f1' in content_names, \
  236. 'w1f1f1 should be in names ({0})'.format(
  237. content_names,
  238. )
  239. assert 'w1f1d2.html' in content_names,\
  240. 'w1f1d2.html should be in names ({0})'.format(
  241. content_names,
  242. )
  243. def test_unit__get_content__ok(self):
  244. provider = self._get_provider(self.app_config)
  245. w1f1d1 = provider.getResourceInst(
  246. '/w1/w1f1/w1f1d1.txt',
  247. self._get_environ(
  248. provider,
  249. 'bob@fsf.local',
  250. )
  251. )
  252. assert w1f1d1, 'w1f1d1 should be found'
  253. eq_('w1f1d1.txt', w1f1d1.name)
  254. def test_unit__delete_content__ok(self):
  255. provider = self._get_provider(self.app_config)
  256. w1f1d1 = provider.getResourceInst(
  257. '/w1/w1f1/w1f1d1.txt',
  258. self._get_environ(
  259. provider,
  260. 'bob@fsf.local',
  261. )
  262. )
  263. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  264. .filter(Content.label == 'w1f1d1') \
  265. .one() # It must exist only one revision, cf fixtures
  266. eq_(
  267. False,
  268. content_w1f1d1.is_deleted,
  269. msg='Content should not be deleted !'
  270. )
  271. content_w1f1d1_id = content_w1f1d1.content_id
  272. w1f1d1.delete()
  273. self.session.flush()
  274. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  275. .filter(Content.content_id == content_w1f1d1_id) \
  276. .order_by(Content.revision_id.desc()) \
  277. .first()
  278. eq_(
  279. True,
  280. content_w1f1d1.is_deleted,
  281. msg='Content should be deleted !'
  282. )
  283. result = provider.getResourceInst(
  284. '/w1/w1f1/w1f1d1.txt',
  285. self._get_environ(
  286. provider,
  287. 'bob@fsf.local',
  288. )
  289. )
  290. eq_(None, result, msg='Result should be None instead {0}'.format(
  291. result
  292. ))
  293. def test_unit__create_content__ok(self):
  294. provider = self._get_provider(self.app_config)
  295. environ = self._get_environ(
  296. provider,
  297. 'bob@fsf.local',
  298. )
  299. result = provider.getResourceInst(
  300. '/w1/w1f1/new_file.txt',
  301. environ,
  302. )
  303. eq_(None, result, msg='Result should be None instead {0}'.format(
  304. result
  305. ))
  306. result = self._put_new_text_file(
  307. provider,
  308. environ,
  309. '/w1/w1f1/new_file.txt',
  310. b'hello\n',
  311. )
  312. assert result, 'Result should not be None instead {0}'.format(
  313. result
  314. )
  315. eq_(
  316. b'hello\n',
  317. result.content.depot_file.file.read(),
  318. msg='fiel content should be "hello\n" but it is {0}'.format(
  319. result.content.depot_file.file.read()
  320. )
  321. )
  322. def test_unit__create_delete_and_create_file__ok(self):
  323. provider = self._get_provider(self.app_config)
  324. environ = self._get_environ(
  325. provider,
  326. 'bob@fsf.local',
  327. )
  328. new_file = provider.getResourceInst(
  329. '/w1/w1f1/new_file.txt',
  330. environ,
  331. )
  332. eq_(None, new_file, msg='Result should be None instead {0}'.format(
  333. new_file
  334. ))
  335. # create it
  336. new_file = self._put_new_text_file(
  337. provider,
  338. environ,
  339. '/w1/w1f1/new_file.txt',
  340. b'hello\n',
  341. )
  342. assert new_file, 'Result should not be None instead {0}'.format(
  343. new_file
  344. )
  345. content_new_file = self.session.query(ContentRevisionRO) \
  346. .filter(Content.label == 'new_file') \
  347. .one() # It must exist only one revision
  348. eq_(
  349. False,
  350. content_new_file.is_deleted,
  351. msg='Content should not be deleted !'
  352. )
  353. content_new_file_id = content_new_file.content_id
  354. # Delete if
  355. new_file.delete()
  356. self.session.flush()
  357. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  358. .filter(Content.content_id == content_new_file_id) \
  359. .order_by(Content.revision_id.desc()) \
  360. .first()
  361. eq_(
  362. True,
  363. content_w1f1d1.is_deleted,
  364. msg='Content should be deleted !'
  365. )
  366. result = provider.getResourceInst(
  367. '/w1/w1f1/new_file.txt',
  368. self._get_environ(
  369. provider,
  370. 'bob@fsf.local',
  371. )
  372. )
  373. eq_(None, result, msg='Result should be None instead {0}'.format(
  374. result
  375. ))
  376. # Then create it again
  377. new_file = self._put_new_text_file(
  378. provider,
  379. environ,
  380. '/w1/w1f1/new_file.txt',
  381. b'hello\n',
  382. )
  383. assert new_file, 'Result should not be None instead {0}'.format(
  384. new_file
  385. )
  386. # Previous file is still dleeted
  387. self.session.flush()
  388. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  389. .filter(Content.content_id == content_new_file_id) \
  390. .order_by(Content.revision_id.desc()) \
  391. .first()
  392. eq_(
  393. True,
  394. content_w1f1d1.is_deleted,
  395. msg='Content should be deleted !'
  396. )
  397. # And an other file exist for this name
  398. content_new_new_file = self.session.query(ContentRevisionRO) \
  399. .filter(Content.label == 'new_file') \
  400. .order_by(Content.revision_id.desc()) \
  401. .first()
  402. assert content_new_new_file.content_id != content_new_file_id,\
  403. 'Contents ids should not be same !'
  404. eq_(
  405. False,
  406. content_new_new_file.is_deleted,
  407. msg='Content should not be deleted !'
  408. )
  409. def test_unit__rename_content__ok(self):
  410. provider = self._get_provider(self.app_config)
  411. environ = self._get_environ(
  412. provider,
  413. 'bob@fsf.local',
  414. )
  415. w1f1d1 = provider.getResourceInst(
  416. '/w1/w1f1/w1f1d1.txt',
  417. environ,
  418. )
  419. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  420. .filter(Content.label == 'w1f1d1') \
  421. .one() # It must exist only one revision, cf fixtures
  422. assert content_w1f1d1, 'w1f1d1 should be exist'
  423. content_w1f1d1_id = content_w1f1d1.content_id
  424. w1f1d1.moveRecursive('/w1/w1f1/w1f1d1_RENAMED.txt')
  425. # Database content is renamed
  426. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  427. .filter(ContentRevisionRO.content_id == content_w1f1d1_id) \
  428. .order_by(ContentRevisionRO.revision_id.desc()) \
  429. .first()
  430. eq_(
  431. 'w1f1d1_RENAMED',
  432. content_w1f1d1.label,
  433. msg='File should be labeled w1f1d1_RENAMED, not {0}'.format(
  434. content_w1f1d1.label
  435. )
  436. )
  437. def test_unit__move_content__ok(self):
  438. provider = self._get_provider(self.app_config)
  439. environ = self._get_environ(
  440. provider,
  441. 'bob@fsf.local',
  442. )
  443. w1f1d1 = provider.getResourceInst(
  444. '/w1/w1f1/w1f1d1.txt',
  445. environ,
  446. )
  447. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  448. .filter(Content.label == 'w1f1d1') \
  449. .one() # It must exist only one revision, cf fixtures
  450. assert content_w1f1d1, 'w1f1d1 should be exist'
  451. content_w1f1d1_id = content_w1f1d1.content_id
  452. content_w1f1d1_parent = content_w1f1d1.parent
  453. eq_(
  454. content_w1f1d1_parent.label,
  455. 'w1f1',
  456. msg='field parent should be w1f1',
  457. )
  458. w1f1d1.moveRecursive('/w1/w1f2/w1f1d1.txt') # move in f2
  459. # Database content is moved
  460. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  461. .filter(ContentRevisionRO.content_id == content_w1f1d1_id) \
  462. .order_by(ContentRevisionRO.revision_id.desc()) \
  463. .first()
  464. assert content_w1f1d1.parent.label != content_w1f1d1_parent.label,\
  465. 'file should be moved in w1f2 but is in {0}'.format(
  466. content_w1f1d1.parent.label
  467. )
  468. def test_unit__move_and_rename_content__ok(self):
  469. provider = self._get_provider(self.app_config)
  470. environ = self._get_environ(
  471. provider,
  472. 'bob@fsf.local',
  473. )
  474. w1f1d1 = provider.getResourceInst(
  475. '/w1/w1f1/w1f1d1.txt',
  476. environ,
  477. )
  478. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  479. .filter(Content.label == 'w1f1d1') \
  480. .one() # It must exist only one revision, cf fixtures
  481. assert content_w1f1d1, 'w1f1d1 should be exist'
  482. content_w1f1d1_id = content_w1f1d1.content_id
  483. content_w1f1d1_parent = content_w1f1d1.parent
  484. eq_(
  485. content_w1f1d1_parent.label,
  486. 'w1f1',
  487. msg='field parent should be w1f1',
  488. )
  489. w1f1d1.moveRecursive('/w1/w1f2/w1f1d1_RENAMED.txt')
  490. # Database content is moved
  491. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  492. .filter(ContentRevisionRO.content_id == content_w1f1d1_id) \
  493. .order_by(ContentRevisionRO.revision_id.desc()) \
  494. .first()
  495. assert content_w1f1d1.parent.label != content_w1f1d1_parent.label,\
  496. 'file should be moved in w1f2 but is in {0}'.format(
  497. content_w1f1d1.parent.label
  498. )
  499. eq_(
  500. 'w1f1d1_RENAMED',
  501. content_w1f1d1.label,
  502. msg='File should be labeled w1f1d1_RENAMED, not {0}'.format(
  503. content_w1f1d1.label
  504. )
  505. )
  506. def test_unit__move_content__ok__another_workspace(self):
  507. provider = self._get_provider(self.app_config)
  508. environ = self._get_environ(
  509. provider,
  510. 'bob@fsf.local',
  511. )
  512. content_to_move_res = provider.getResourceInst(
  513. '/w1/w1f1/w1f1d1.txt',
  514. environ,
  515. )
  516. content_to_move = self.session.query(ContentRevisionRO) \
  517. .filter(Content.label == 'w1f1d1') \
  518. .one() # It must exist only one revision, cf fixtures
  519. assert content_to_move, 'w1f1d1 should be exist'
  520. content_to_move_id = content_to_move.content_id
  521. content_to_move_parent = content_to_move.parent
  522. eq_(
  523. content_to_move_parent.label,
  524. 'w1f1',
  525. msg='field parent should be w1f1',
  526. )
  527. content_to_move_res.moveRecursive('/w2/w2f1/w1f1d1.txt') # move in w2, f1
  528. # Database content is moved
  529. content_to_move = self.session.query(ContentRevisionRO) \
  530. .filter(ContentRevisionRO.content_id == content_to_move_id) \
  531. .order_by(ContentRevisionRO.revision_id.desc()) \
  532. .first()
  533. assert content_to_move.parent, 'Content should have a parent'
  534. assert content_to_move.parent.label == 'w2f1',\
  535. 'file should be moved in w2f1 but is in {0}'.format(
  536. content_to_move.parent.label
  537. )
  538. def test_unit__update_content__ok(self):
  539. provider = self._get_provider(self.app_config)
  540. environ = self._get_environ(
  541. provider,
  542. 'bob@fsf.local',
  543. )
  544. result = provider.getResourceInst(
  545. '/w1/w1f1/new_file.txt',
  546. environ,
  547. )
  548. eq_(None, result, msg='Result should be None instead {0}'.format(
  549. result
  550. ))
  551. result = self._put_new_text_file(
  552. provider,
  553. environ,
  554. '/w1/w1f1/new_file.txt',
  555. b'hello\n',
  556. )
  557. assert result, 'Result should not be None instead {0}'.format(
  558. result
  559. )
  560. eq_(
  561. b'hello\n',
  562. result.content.depot_file.file.read(),
  563. msg='fiel content should be "hello\n" but it is {0}'.format(
  564. result.content.depot_file.file.read()
  565. )
  566. )
  567. # ReInit DummyNotifier counter
  568. DummyNotifier.send_count = 0
  569. # Update file content
  570. write_object = result.beginWrite(
  571. contentType='application/octet-stream',
  572. )
  573. write_object.write(b'An other line')
  574. write_object.close()
  575. result.endWrite(withErrors=False)
  576. eq_(
  577. 1,
  578. DummyNotifier.send_count,
  579. msg='DummyNotifier should send 1 mail, not {}'.format(
  580. DummyNotifier.send_count
  581. ),
  582. )