test_webdav.py 19KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. # -*- coding: utf-8 -*-
  2. import io
  3. from tracim.lib.core.user import UserApi
  4. from tracim.tests import eq_
  5. from tracim.lib.core.notifications import DummyNotifier
  6. from tracim.lib.webdav.dav_provider import Provider
  7. from tracim.lib.webdav.resources import Root
  8. from tracim.models import Content
  9. from tracim.models import ContentRevisionRO
  10. from tracim.tests import StandardTest
  11. from tracim.fixtures.content import Content as ContentFixtures
  12. from tracim.fixtures.users_and_groups import Base as BaseFixture
  13. from wsgidav import util
  14. class TestWebDav(StandardTest):
  15. fixtures = [BaseFixture, ContentFixtures]
  16. def _get_provider(self, config):
  17. return Provider(
  18. show_archived=False,
  19. show_deleted=False,
  20. show_history=False,
  21. app_config=config,
  22. )
  23. def _get_environ(
  24. self,
  25. provider: Provider,
  26. username: str,
  27. ) -> dict:
  28. return {
  29. 'http_authenticator.username': username,
  30. 'http_authenticator.realm': '/',
  31. 'wsgidav.provider': provider,
  32. 'tracim_user': self._get_user(username),
  33. 'tracim_dbsession': self.session,
  34. }
  35. def _get_user(self, email):
  36. return UserApi(None,
  37. self.session,
  38. self.app_config
  39. ).get_one_by_email(email)
  40. def _put_new_text_file(
  41. self,
  42. provider,
  43. environ,
  44. file_path,
  45. file_content,
  46. ):
  47. # This part id a reproduction of
  48. # wsgidav.request_server.RequestServer#doPUT
  49. # Grab parent folder where create file
  50. parentRes = provider.getResourceInst(
  51. util.getUriParent(file_path),
  52. environ,
  53. )
  54. assert parentRes, 'we should found folder for {0}'.format(file_path)
  55. new_resource = parentRes.createEmptyResource(
  56. util.getUriName(file_path),
  57. )
  58. write_object = new_resource.beginWrite(
  59. contentType='application/octet-stream',
  60. )
  61. write_object.write(file_content)
  62. write_object.close()
  63. new_resource.endWrite(withErrors=False)
  64. # Now file should exist
  65. return provider.getResourceInst(
  66. file_path,
  67. environ,
  68. )
  69. def test_unit__get_root__ok(self):
  70. provider = self._get_provider(self.app_config)
  71. root = provider.getResourceInst(
  72. '/',
  73. self._get_environ(
  74. provider,
  75. 'bob@fsf.local',
  76. )
  77. )
  78. assert root, 'Path / should return a Root instance'
  79. assert isinstance(root, Root)
  80. def test_unit__list_workspaces_with_user__ok(self):
  81. provider = self._get_provider(self.app_config)
  82. root = provider.getResourceInst(
  83. '/',
  84. self._get_environ(
  85. provider,
  86. 'bob@fsf.local',
  87. )
  88. )
  89. assert root, 'Path / should return a Root instance'
  90. assert isinstance(root, Root), 'Path / should return a Root instance'
  91. children = root.getMemberList()
  92. eq_(
  93. 2,
  94. len(children),
  95. msg='Root should return 2 workspaces instead {0}'.format(
  96. len(children),
  97. )
  98. )
  99. workspaces_names = [w.name for w in children]
  100. assert 'w1' in workspaces_names, \
  101. 'w1 should be in names ({0})'.format(
  102. workspaces_names,
  103. )
  104. assert 'w2' in workspaces_names, 'w2 should be in names ({0})'.format(
  105. workspaces_names,
  106. )
  107. def test_unit__list_workspaces_with_admin__ok(self):
  108. provider = self._get_provider(self.app_config)
  109. root = provider.getResourceInst(
  110. '/',
  111. self._get_environ(
  112. provider,
  113. 'admin@admin.admin',
  114. )
  115. )
  116. assert root, 'Path / should return a Root instance'
  117. assert isinstance(root, Root), 'Path / should return a Root instance'
  118. children = root.getMemberList()
  119. eq_(
  120. 2,
  121. len(children),
  122. msg='Root should return 2 workspaces instead {0}'.format(
  123. len(children),
  124. )
  125. )
  126. workspaces_names = [w.name for w in children]
  127. assert 'w1' in workspaces_names, 'w1 should be in names ({0})'.format(
  128. workspaces_names,
  129. )
  130. assert 'w3' in workspaces_names, 'w3 should be in names ({0})'.format(
  131. workspaces_names,
  132. )
  133. def test_unit__list_workspace_folders__ok(self):
  134. provider = self._get_provider(self.app_config)
  135. w1 = provider.getResourceInst(
  136. '/w1/',
  137. self._get_environ(
  138. provider,
  139. 'bob@fsf.local',
  140. )
  141. )
  142. assert w1, 'Path /w1 should return a Wrkspace instance'
  143. children = w1.getMemberList()
  144. eq_(
  145. 2,
  146. len(children),
  147. msg='w1 should list 2 folders instead {0}'.format(
  148. len(children),
  149. ),
  150. )
  151. folders_names = [f.name for f in children]
  152. assert 'w1f1' in folders_names, 'w1f1 should be in names ({0})'.format(
  153. folders_names,
  154. )
  155. assert 'w1f2' in folders_names, 'w1f2 should be in names ({0})'.format(
  156. folders_names,
  157. )
  158. def test_unit__list_content__ok(self):
  159. provider = self._get_provider(self.app_config)
  160. w1f1 = provider.getResourceInst(
  161. '/w1/w1f1',
  162. self._get_environ(
  163. provider,
  164. 'bob@fsf.local',
  165. )
  166. )
  167. assert w1f1, 'Path /w1f1 should return a Wrkspace instance'
  168. children = w1f1.getMemberList()
  169. eq_(
  170. 5,
  171. len(children),
  172. msg='w1f1 should list 5 folders instead {0}'.format(
  173. len(children),
  174. ),
  175. )
  176. content_names = [c.name for c in children]
  177. assert 'w1f1p1.html' in content_names, \
  178. 'w1f1.html should be in names ({0})'.format(
  179. content_names,
  180. )
  181. assert 'w1f1t1.html' in content_names,\
  182. 'w1f1t1.html should be in names ({0})'.format(
  183. content_names,
  184. )
  185. assert 'w1f1d1.txt' in content_names,\
  186. 'w1f1d1.txt should be in names ({0})'.format(content_names,)
  187. assert 'w1f1f1' in content_names, \
  188. 'w1f1f1 should be in names ({0})'.format(
  189. content_names,
  190. )
  191. assert 'w1f1d2.html' in content_names,\
  192. 'w1f1d2.html should be in names ({0})'.format(
  193. content_names,
  194. )
  195. def test_unit__get_content__ok(self):
  196. provider = self._get_provider(self.app_config)
  197. w1f1d1 = provider.getResourceInst(
  198. '/w1/w1f1/w1f1d1.txt',
  199. self._get_environ(
  200. provider,
  201. 'bob@fsf.local',
  202. )
  203. )
  204. assert w1f1d1, 'w1f1d1 should be found'
  205. eq_('w1f1d1.txt', w1f1d1.name)
  206. def test_unit__delete_content__ok(self):
  207. provider = self._get_provider(self.app_config)
  208. w1f1d1 = provider.getResourceInst(
  209. '/w1/w1f1/w1f1d1.txt',
  210. self._get_environ(
  211. provider,
  212. 'bob@fsf.local',
  213. )
  214. )
  215. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  216. .filter(Content.label == 'w1f1d1') \
  217. .one() # It must exist only one revision, cf fixtures
  218. eq_(
  219. False,
  220. content_w1f1d1.is_deleted,
  221. msg='Content should not be deleted !'
  222. )
  223. content_w1f1d1_id = content_w1f1d1.content_id
  224. w1f1d1.delete()
  225. self.session.flush()
  226. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  227. .filter(Content.content_id == content_w1f1d1_id) \
  228. .order_by(Content.revision_id.desc()) \
  229. .first()
  230. eq_(
  231. True,
  232. content_w1f1d1.is_deleted,
  233. msg='Content should be deleted !'
  234. )
  235. result = provider.getResourceInst(
  236. '/w1/w1f1/w1f1d1.txt',
  237. self._get_environ(
  238. provider,
  239. 'bob@fsf.local',
  240. )
  241. )
  242. eq_(None, result, msg='Result should be None instead {0}'.format(
  243. result
  244. ))
  245. def test_unit__create_content__ok(self):
  246. provider = self._get_provider(self.app_config)
  247. environ = self._get_environ(
  248. provider,
  249. 'bob@fsf.local',
  250. )
  251. result = provider.getResourceInst(
  252. '/w1/w1f1/new_file.txt',
  253. environ,
  254. )
  255. eq_(None, result, msg='Result should be None instead {0}'.format(
  256. result
  257. ))
  258. result = self._put_new_text_file(
  259. provider,
  260. environ,
  261. '/w1/w1f1/new_file.txt',
  262. b'hello\n',
  263. )
  264. assert result, 'Result should not be None instead {0}'.format(
  265. result
  266. )
  267. eq_(
  268. b'hello\n',
  269. result.content.depot_file.file.read(),
  270. msg='fiel content should be "hello\n" but it is {0}'.format(
  271. result.content.depot_file.file.read()
  272. )
  273. )
  274. def test_unit__create_delete_and_create_file__ok(self):
  275. provider = self._get_provider(self.app_config)
  276. environ = self._get_environ(
  277. provider,
  278. 'bob@fsf.local',
  279. )
  280. new_file = provider.getResourceInst(
  281. '/w1/w1f1/new_file.txt',
  282. environ,
  283. )
  284. eq_(None, new_file, msg='Result should be None instead {0}'.format(
  285. new_file
  286. ))
  287. # create it
  288. new_file = self._put_new_text_file(
  289. provider,
  290. environ,
  291. '/w1/w1f1/new_file.txt',
  292. b'hello\n',
  293. )
  294. assert new_file, 'Result should not be None instead {0}'.format(
  295. new_file
  296. )
  297. content_new_file = self.session.query(ContentRevisionRO) \
  298. .filter(Content.label == 'new_file') \
  299. .one() # It must exist only one revision
  300. eq_(
  301. False,
  302. content_new_file.is_deleted,
  303. msg='Content should not be deleted !'
  304. )
  305. content_new_file_id = content_new_file.content_id
  306. # Delete if
  307. new_file.delete()
  308. self.session.flush()
  309. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  310. .filter(Content.content_id == content_new_file_id) \
  311. .order_by(Content.revision_id.desc()) \
  312. .first()
  313. eq_(
  314. True,
  315. content_w1f1d1.is_deleted,
  316. msg='Content should be deleted !'
  317. )
  318. result = provider.getResourceInst(
  319. '/w1/w1f1/new_file.txt',
  320. self._get_environ(
  321. provider,
  322. 'bob@fsf.local',
  323. )
  324. )
  325. eq_(None, result, msg='Result should be None instead {0}'.format(
  326. result
  327. ))
  328. # Then create it again
  329. new_file = self._put_new_text_file(
  330. provider,
  331. environ,
  332. '/w1/w1f1/new_file.txt',
  333. b'hello\n',
  334. )
  335. assert new_file, 'Result should not be None instead {0}'.format(
  336. new_file
  337. )
  338. # Previous file is still dleeted
  339. self.session.flush()
  340. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  341. .filter(Content.content_id == content_new_file_id) \
  342. .order_by(Content.revision_id.desc()) \
  343. .first()
  344. eq_(
  345. True,
  346. content_w1f1d1.is_deleted,
  347. msg='Content should be deleted !'
  348. )
  349. # And an other file exist for this name
  350. content_new_new_file = self.session.query(ContentRevisionRO) \
  351. .filter(Content.label == 'new_file') \
  352. .order_by(Content.revision_id.desc()) \
  353. .first()
  354. assert content_new_new_file.content_id != content_new_file_id,\
  355. 'Contents ids should not be same !'
  356. eq_(
  357. False,
  358. content_new_new_file.is_deleted,
  359. msg='Content should not be deleted !'
  360. )
  361. def test_unit__rename_content__ok(self):
  362. provider = self._get_provider(self.app_config)
  363. environ = self._get_environ(
  364. provider,
  365. 'bob@fsf.local',
  366. )
  367. w1f1d1 = provider.getResourceInst(
  368. '/w1/w1f1/w1f1d1.txt',
  369. environ,
  370. )
  371. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  372. .filter(Content.label == 'w1f1d1') \
  373. .one() # It must exist only one revision, cf fixtures
  374. assert content_w1f1d1, 'w1f1d1 should be exist'
  375. content_w1f1d1_id = content_w1f1d1.content_id
  376. w1f1d1.moveRecursive('/w1/w1f1/w1f1d1_RENAMED.txt')
  377. # Database content is renamed
  378. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  379. .filter(ContentRevisionRO.content_id == content_w1f1d1_id) \
  380. .order_by(ContentRevisionRO.revision_id.desc()) \
  381. .first()
  382. eq_(
  383. 'w1f1d1_RENAMED',
  384. content_w1f1d1.label,
  385. msg='File should be labeled w1f1d1_RENAMED, not {0}'.format(
  386. content_w1f1d1.label
  387. )
  388. )
  389. def test_unit__move_content__ok(self):
  390. provider = self._get_provider(self.app_config)
  391. environ = self._get_environ(
  392. provider,
  393. 'bob@fsf.local',
  394. )
  395. w1f1d1 = provider.getResourceInst(
  396. '/w1/w1f1/w1f1d1.txt',
  397. environ,
  398. )
  399. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  400. .filter(Content.label == 'w1f1d1') \
  401. .one() # It must exist only one revision, cf fixtures
  402. assert content_w1f1d1, 'w1f1d1 should be exist'
  403. content_w1f1d1_id = content_w1f1d1.content_id
  404. content_w1f1d1_parent = content_w1f1d1.parent
  405. eq_(
  406. content_w1f1d1_parent.label,
  407. 'w1f1',
  408. msg='field parent should be w1f1',
  409. )
  410. w1f1d1.moveRecursive('/w1/w1f2/w1f1d1.txt') # move in f2
  411. # Database content is moved
  412. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  413. .filter(ContentRevisionRO.content_id == content_w1f1d1_id) \
  414. .order_by(ContentRevisionRO.revision_id.desc()) \
  415. .first()
  416. assert content_w1f1d1.parent.label != content_w1f1d1_parent.label,\
  417. 'file should be moved in w1f2 but is in {0}'.format(
  418. content_w1f1d1.parent.label
  419. )
  420. def test_unit__move_and_rename_content__ok(self):
  421. provider = self._get_provider(self.app_config)
  422. environ = self._get_environ(
  423. provider,
  424. 'bob@fsf.local',
  425. )
  426. w1f1d1 = provider.getResourceInst(
  427. '/w1/w1f1/w1f1d1.txt',
  428. environ,
  429. )
  430. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  431. .filter(Content.label == 'w1f1d1') \
  432. .one() # It must exist only one revision, cf fixtures
  433. assert content_w1f1d1, 'w1f1d1 should be exist'
  434. content_w1f1d1_id = content_w1f1d1.content_id
  435. content_w1f1d1_parent = content_w1f1d1.parent
  436. eq_(
  437. content_w1f1d1_parent.label,
  438. 'w1f1',
  439. msg='field parent should be w1f1',
  440. )
  441. w1f1d1.moveRecursive('/w1/w1f2/w1f1d1_RENAMED.txt')
  442. # Database content is moved
  443. content_w1f1d1 = self.session.query(ContentRevisionRO) \
  444. .filter(ContentRevisionRO.content_id == content_w1f1d1_id) \
  445. .order_by(ContentRevisionRO.revision_id.desc()) \
  446. .first()
  447. assert content_w1f1d1.parent.label != content_w1f1d1_parent.label,\
  448. 'file should be moved in w1f2 but is in {0}'.format(
  449. content_w1f1d1.parent.label
  450. )
  451. eq_(
  452. 'w1f1d1_RENAMED',
  453. content_w1f1d1.label,
  454. msg='File should be labeled w1f1d1_RENAMED, not {0}'.format(
  455. content_w1f1d1.label
  456. )
  457. )
  458. def test_unit__move_content__ok__another_workspace(self):
  459. provider = self._get_provider(self.app_config)
  460. environ = self._get_environ(
  461. provider,
  462. 'bob@fsf.local',
  463. )
  464. content_to_move_res = provider.getResourceInst(
  465. '/w1/w1f1/w1f1d1.txt',
  466. environ,
  467. )
  468. content_to_move = self.session.query(ContentRevisionRO) \
  469. .filter(Content.label == 'w1f1d1') \
  470. .one() # It must exist only one revision, cf fixtures
  471. assert content_to_move, 'w1f1d1 should be exist'
  472. content_to_move_id = content_to_move.content_id
  473. content_to_move_parent = content_to_move.parent
  474. eq_(
  475. content_to_move_parent.label,
  476. 'w1f1',
  477. msg='field parent should be w1f1',
  478. )
  479. content_to_move_res.moveRecursive('/w2/w2f1/w1f1d1.txt') # move in w2, f1
  480. # Database content is moved
  481. content_to_move = self.session.query(ContentRevisionRO) \
  482. .filter(ContentRevisionRO.content_id == content_to_move_id) \
  483. .order_by(ContentRevisionRO.revision_id.desc()) \
  484. .first()
  485. assert content_to_move.parent, 'Content should have a parent'
  486. assert content_to_move.parent.label == 'w2f1',\
  487. 'file should be moved in w2f1 but is in {0}'.format(
  488. content_to_move.parent.label
  489. )
  490. def test_unit__update_content__ok(self):
  491. provider = self._get_provider(self.app_config)
  492. environ = self._get_environ(
  493. provider,
  494. 'bob@fsf.local',
  495. )
  496. result = provider.getResourceInst(
  497. '/w1/w1f1/new_file.txt',
  498. environ,
  499. )
  500. eq_(None, result, msg='Result should be None instead {0}'.format(
  501. result
  502. ))
  503. result = self._put_new_text_file(
  504. provider,
  505. environ,
  506. '/w1/w1f1/new_file.txt',
  507. b'hello\n',
  508. )
  509. assert result, 'Result should not be None instead {0}'.format(
  510. result
  511. )
  512. eq_(
  513. b'hello\n',
  514. result.content.depot_file.file.read(),
  515. msg='fiel content should be "hello\n" but it is {0}'.format(
  516. result.content.depot_file.file.read()
  517. )
  518. )
  519. # ReInit DummyNotifier counter
  520. DummyNotifier.send_count = 0
  521. # Update file content
  522. write_object = result.beginWrite(
  523. contentType='application/octet-stream',
  524. )
  525. write_object.write(b'An other line')
  526. write_object.close()
  527. result.endWrite(withErrors=False)
  528. eq_(
  529. 1,
  530. DummyNotifier.send_count,
  531. msg='DummyNotifier should send 1 mail, not {}'.format(
  532. DummyNotifier.send_count
  533. ),
  534. )