test_content_api.py 68KB


  1. # -*- coding: utf-8 -*-
  2. import transaction
  3. import pytest
  4. from tracim.config import CFG
  5. from tracim.lib.core.content import compare_content_for_sorting_by_type_and_name
  6. from tracim.lib.core.content import ContentApi
  7. # TODO - G.M - 28-03-2018 - [GroupApi] Re-enable GroupApi
  8. from tracim.lib.core.group import GroupApi
  9. from tracim.lib.core.user import UserApi
  10. from tracim.exceptions import SameValueError
  11. # TODO - G.M - 28-03-2018 - [RoleApi] Re-enable RoleApi
  12. from tracim.lib.core.workspace import RoleApi
  13. # TODO - G.M - 28-03-2018 - [WorkspaceApi] Re-enable WorkspaceApi
  14. from tracim.lib.core.workspace import WorkspaceApi
  15. from tracim.models.revision_protection import new_revision
  16. from tracim.models.auth import User
  17. from tracim.models.auth import Group
  18. from tracim.models.data import ActionDescription
  19. from tracim.models.data import ContentRevisionRO
  20. from tracim.models.data import Workspace
  21. from tracim.models.data import Content
  22. from tracim.models.data import ContentType
  23. from tracim.models.data import UserRoleInWorkspace
  24. from tracim.fixtures.users_and_groups import Test as FixtureTest
  25. from tracim.tests import DefaultTest
  26. from tracim.tests import eq_
  27. class TestContentApi(DefaultTest):
  def test_compare_content_for_sorting_by_type(self):
      """
      Check the comparison result when two contents differ only by type.

      With empty labels, a 'file' content is expected to compare as 1
      against a 'folder' content, -1 the other way round, and 0 against
      itself.
      """
      file_content = Content()
      file_content.label = ''
      file_content.type = 'file'

      folder_content = Content()
      folder_content.label = ''
      folder_content.type = 'folder'

      # Second reference to the very same object, not a copy.
      file_alias = file_content

      compare = compare_content_for_sorting_by_type_and_name
      eq_(1, compare(file_content, folder_content))
      eq_(-1, compare(folder_content, file_content))
      eq_(0, compare(file_content, file_alias))
  def test_compare_content_for_sorting_by_label(self):
      """
      Check the comparison result when two files differ only by label.

      The file labelled 'bbb' is expected to compare as 1 against the file
      labelled 'aaa', -1 the other way round, and 0 against itself.
      """
      file_bbb = Content()
      file_bbb.label = 'bbb'
      file_bbb.type = 'file'

      file_aaa = Content()
      file_aaa.label = 'aaa'
      file_aaa.type = 'file'

      # Alias of the first content (same object).
      file_bbb_alias = file_bbb

      expectations = (
          (1, file_bbb, file_aaa),
          (-1, file_aaa, file_bbb),
          (0, file_bbb, file_bbb_alias),
      )
      for expected, left, right in expectations:
          eq_(
              expected,
              compare_content_for_sorting_by_type_and_name(left, right),
          )
  def test_sort_by_label_or_filename(self):
      """
      Check ``ContentApi.sort_content`` on three contents of type 'file'.

      The content with an empty label and file name 'AABC' is expected
      first, followed by label 'ABCD', then label 'BCDE'.
      """
      labelled_abcd = Content()
      labelled_abcd.label = 'ABCD'
      labelled_abcd.type = 'file'

      named_aabc = Content()
      named_aabc.label = ''
      named_aabc.type = 'file'
      named_aabc.file_name = 'AABC'

      labelled_bcde = Content()
      labelled_bcde.label = 'BCDE'
      labelled_bcde.type = 'file'

      unsorted_items = [labelled_abcd, named_aabc, labelled_bcde]
      sorteds = ContentApi.sort_content(unsorted_items)

      expected_order = (named_aabc, labelled_abcd, labelled_bcde)
      for position, expected in enumerate(expected_order):
          eq_(sorteds[position], expected)
  def test_sort_by_content_type(self):
      """
      Check ``ContentApi.sort_content`` on contents of different types.

      The 'folder' content (label 'BBBB') is expected before the 'file'
      content (label 'AAAA').
      """
      file_content = Content()
      file_content.label = 'AAAA'
      file_content.type = 'file'

      folder_content = Content()
      folder_content.label = 'BBBB'
      folder_content.type = 'folder'

      sorteds = ContentApi.sort_content([file_content, folder_content])

      expected_order = (folder_content, file_content)
      for position, expected in enumerate(expected_order):
          failure_message = 'value is {} instead of {}'.format(
              sorteds[position].content_id,
              expected.content_id,
          )
          eq_(sorteds[position], expected, failure_message)
  def test_delete(self):
      """
      Delete one of two folders and check what ``get_all`` returns.

      Two folders are created in a new workspace. After one of them is
      deleted, a default ``ContentApi`` is expected to list a single item,
      while a ``ContentApi`` built with ``show_deleted=True`` is expected
      to list both.
      """
      uapi = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(
          current_user=None,
          session=self.session,
          config=self.app_config,
      )
      groups = [group_api.get_one(Group.TIM_USER),
                group_api.get_one(Group.TIM_MANAGER),
                group_api.get_one(Group.TIM_ADMIN)]

      user = uapi.create_minimal_user(email='this.is@user',
                                      groups=groups, save_now=True)
      workspace = WorkspaceApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      ).create_workspace('test workspace', save_now=True)
      api = ContentApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      # Return values are not kept: the contents are read back through
      # get_all() below.
      api.create(ContentType.Folder, workspace, None, 'not_deleted', True)
      api.create(ContentType.Folder, workspace, None, 'to_delete', True)

      # Keep plain ids so that instances can be re-read after each commit.
      uid = user.user_id
      wid = workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(
          current_user=user,
          session=self.session,
          config=self.app_config
      )
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))

      with new_revision(
          session=self.session,
          tm=transaction.manager,
          content=items[0]
      ):
          api.delete(items[0])
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(
          current_user=user,
          session=self.session,
          config=self.app_config
      )
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(1, len(items))
      transaction.commit()

      # Test that the item is still available if "show deleted" is activated
      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
          show_deleted=True,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))
  def test_archive(self):
      """
      Archive one of two folders and check what ``get_all`` returns.

      Two folders are created in a new workspace. After one of them is
      archived, a default ``ContentApi`` is expected to list a single item,
      while a ``ContentApi`` built with ``show_archived=True`` is expected
      to list both.
      """
      uapi = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(
          current_user=None,
          session=self.session,
          config=self.app_config,
      )
      groups = [group_api.get_one(Group.TIM_USER),
                group_api.get_one(Group.TIM_MANAGER),
                group_api.get_one(Group.TIM_ADMIN)]

      user = uapi.create_minimal_user(
          email='this.is@user',
          groups=groups,
          save_now=True,
      )
      workspace_api = WorkspaceApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True
      )
      api = ContentApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      # Return values are not kept: the contents are read back through
      # get_all() below.
      api.create(ContentType.Folder, workspace, None, 'not_archived', True)
      api.create(ContentType.Folder, workspace, None, 'to_archive', True)

      # Keep plain ids so that instances can be re-read after each commit.
      uid = user.user_id
      wid = workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          session=self.session,
          current_user=user,
          config=self.app_config,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))

      with new_revision(
          session=self.session,
          tm=transaction.manager,
          content=items[0],
      ):
          api.archive(items[0])
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(1, len(items))
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      workspace = workspace_api.get_one(wid)

      # Test that the item is still available if "show archived" is
      # activated.
      # NOTE(review): this ContentApi is built with current_user=None,
      # whereas the equivalent step of test_delete passes the user; kept
      # as is, confirm whether this is intended.
      api = ContentApi(
          current_user=None,
          session=self.session,
          config=self.app_config,
          show_archived=True,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))
  def test_get_all_with_filter(self):
      """
      Check that ``get_all`` can be restricted to one content type.

      A folder and a file are created at the workspace root; the test
      expects two items for ``ContentType.Any``, only 'thefile' for
      ``ContentType.File`` and only 'thefolder' for ``ContentType.Folder``.
      """
      api_kwargs = {'session': self.session, 'config': self.app_config}

      uapi = UserApi(current_user=None, **api_kwargs)
      group_api = GroupApi(current_user=None, **api_kwargs)
      groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user = uapi.create_minimal_user(
          email='this.is@user', groups=groups, save_now=True
      )

      initial_workspace_api = WorkspaceApi(current_user=user, **api_kwargs)
      workspace = initial_workspace_api.create_workspace(
          'test workspace', save_now=True
      )

      api = ContentApi(current_user=user, **api_kwargs)
      folder = api.create(
          ContentType.Folder, workspace, None, 'thefolder', True
      )
      file_content = api.create(
          ContentType.File, workspace, None, 'thefile', True
      )

      # Remember the ids so the instances can be re-read after the commit.
      uid, wid = user.user_id, workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, **api_kwargs)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(current_user=user, **api_kwargs)

      every_item = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(every_item))

      only_files = api.get_all(None, ContentType.File, workspace)
      eq_(1, len(only_files))
      eq_('thefile', only_files[0].label)

      only_folders = api.get_all(None, ContentType.Folder, workspace)
      eq_(1, len(only_folders))
      eq_('thefolder', only_folders[0].label)
  def test_get_all_with_parent_id(self):
      """
      Check that ``get_all`` can be restricted to the children of a parent.

      A folder 'parent' holding 'file1' and a root-level 'file2' are
      created; the test expects three items without parent filter and only
      'file1' when filtering files by the folder's content id.
      """
      api_kwargs = {'session': self.session, 'config': self.app_config}

      uapi = UserApi(current_user=None, **api_kwargs)
      group_api = GroupApi(current_user=None, **api_kwargs)
      groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user = uapi.create_minimal_user(
          email='this.is@user', groups=groups, save_now=True
      )

      initial_workspace_api = WorkspaceApi(current_user=user, **api_kwargs)
      workspace = initial_workspace_api.create_workspace(
          'test workspace', save_now=True
      )

      api = ContentApi(current_user=user, **api_kwargs)
      parent_folder = api.create(
          ContentType.Folder, workspace, None, 'parent', do_save=True
      )
      child_file = api.create(
          ContentType.File, workspace, parent_folder, 'file1', do_save=True
      )
      root_file = api.create(
          ContentType.File, workspace, None, 'file2', do_save=True
      )

      # Remember the ids so the instances can be re-read after the commit.
      parent_id = parent_folder.content_id
      child_id = child_file.content_id
      uid, wid = user.user_id, workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, **api_kwargs)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(current_user=user, **api_kwargs)

      every_item = api.get_all(None, ContentType.Any, workspace)
      eq_(3, len(every_item))

      children = api.get_all(parent_id, ContentType.File, workspace)
      eq_(1, len(children))
      eq_(child_id, children[0].content_id)
  def test_set_status_unknown_status(self):
      """
      Check that ``set_status`` raises ``ValueError`` on an unknown status.

      The call is made inside a ``new_revision`` context on a freshly
      created folder, with the status string 'unknown-status'.
      """
      api_kwargs = {'session': self.session, 'config': self.app_config}

      uapi = UserApi(current_user=None, **api_kwargs)
      group_api = GroupApi(current_user=None, **api_kwargs)
      groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user = uapi.create_minimal_user(
          email='this.is@user', groups=groups, save_now=True
      )

      workspace_api = WorkspaceApi(current_user=user, **api_kwargs)
      workspace = workspace_api.create_workspace(
          'test workspace', save_now=True
      )

      api = ContentApi(current_user=user, **api_kwargs)
      folder = api.create(
          ContentType.Folder, workspace, None, 'parent', True
      )

      revision_context = new_revision(
          session=self.session,
          tm=transaction.manager,
          content=folder,
      )
      with revision_context:
          with pytest.raises(ValueError):
              api.set_status(folder, 'unknown-status')
  def test_set_status_ok(self):
      """
      Check ``set_status`` with each of the four accepted status strings.

      After every call, the content is expected to expose the new status
      and a revision type equal to ``ActionDescription.STATUS_UPDATE``.
      """
      api_kwargs = {'session': self.session, 'config': self.app_config}

      uapi = UserApi(current_user=None, **api_kwargs)
      group_api = GroupApi(current_user=None, **api_kwargs)
      groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user = uapi.create_minimal_user(
          email='this.is@user', groups=groups, save_now=True
      )

      workspace_api = WorkspaceApi(current_user=user, **api_kwargs)
      workspace = workspace_api.create_workspace(
          'test workspace', save_now=True
      )

      api = ContentApi(current_user=user, **api_kwargs)
      folder = api.create(
          ContentType.Folder, workspace, None, 'parent', True
      )

      statuses = (
          'open',
          'closed-validated',
          'closed-unvalidated',
          'closed-deprecated',
      )
      revision_context = new_revision(
          session=self.session,
          tm=transaction.manager,
          content=folder,
      )
      with revision_context:
          for status in statuses:
              api.set_status(folder, status)
              eq_(status, folder.status)
              eq_(ActionDescription.STATUS_UPDATE, folder.revision_type)
  def test_create_comment_ok(self):
      """
      Check the attributes of a comment created with ``create_comment``.

      A page is created in a new workspace and commented. The comment is
      expected to be a ``Content`` of type ``ContentType.Comment`` whose
      parent is the page, owned by the current user, stored in the same
      workspace, with the comment text as description, an empty label and
      ``ActionDescription.COMMENT`` as revision type.
      """
      uapi = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      # Use self.app_config here as well, like every other API object
      # built in this test class (this call used to receive self.config).
      group_api = GroupApi(
          current_user=None,
          session=self.session,
          config=self.app_config,
      )
      groups = [group_api.get_one(Group.TIM_USER),
                group_api.get_one(Group.TIM_MANAGER),
                group_api.get_one(Group.TIM_ADMIN)]

      user = uapi.create_minimal_user(email='this.is@user',
                                      groups=groups, save_now=True)
      workspace = WorkspaceApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      ).create_workspace(
          'test workspace',
          save_now=True
      )
      api = ContentApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      p = api.create(ContentType.Page, workspace, None, 'this_is_a_page')
      c = api.create_comment(workspace, p, 'this is the comment', True)

      eq_(Content, c.__class__)
      eq_(p.content_id, c.parent_id)
      eq_(user, c.owner)
      eq_(workspace, c.workspace)
      eq_(ContentType.Comment, c.type)
      eq_('this is the comment', c.description)
      eq_('', c.label)
      eq_(ActionDescription.COMMENT, c.revision_type)
  def test_unit_copy_file_different_label_different_parent_ok(self):
      """
      Copy a file into a folder of another workspace, with a new label.

      The original file lives in 'folder a' of the first user's workspace;
      the second user copies it to 'folder b' of its own workspace as
      'test_file_copy'. The assertions compare the copy with the original
      (ids, workspace, depot file, label, type, parent, owner, metadata,
      revision type and revision count).
      """
      api_kwargs = {'session': self.session, 'config': self.app_config}

      uapi = UserApi(current_user=None, **api_kwargs)
      group_api = GroupApi(current_user=None, **api_kwargs)
      groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user = uapi.create_minimal_user(
          email='user1@user', groups=groups, save_now=True
      )
      user2 = uapi.create_minimal_user(
          email='user2@user', groups=groups, save_now=True
      )

      workspace_api = WorkspaceApi(current_user=user, **api_kwargs)
      workspace = workspace_api.create_workspace(
          'test workspace', save_now=True
      )
      role_api = RoleApi(current_user=user, **api_kwargs)
      role_api.create_one(
          user2,
          workspace,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False
      )

      api = ContentApi(current_user=user, **api_kwargs)
      foldera = api.create(
          ContentType.Folder, workspace, None, 'folder a', True
      )
      with self.session.no_autoflush:
          text_file = api.create(
              content_type=ContentType.File,
              workspace=workspace,
              parent=foldera,
              label='test_file',
              do_save=False,
          )
          api.update_file_data(
              text_file, 'test_file', 'text/plain', b'test_content'
          )
      api.save(text_file, ActionDescription.CREATION)

      api2 = ContentApi(current_user=user2, **api_kwargs)
      workspace2_api = WorkspaceApi(current_user=user2, **api_kwargs)
      workspace2 = workspace2_api.create_workspace(
          'test workspace2', save_now=True
      )
      folderb = api2.create(
          ContentType.Folder, workspace2, None, 'folder b', True
      )

      api2.copy(
          item=text_file,
          new_parent=folderb,
          new_label='test_file_copy'
      )
      transaction.commit()

      text_file_copy = api2.get_one_by_label_and_parent(
          'test_file_copy', folderb
      )

      assert text_file != text_file_copy
      assert text_file_copy.content_id != text_file.content_id
      assert text_file_copy.workspace_id == workspace2.workspace_id
      copied_bytes = text_file_copy.depot_file.file.read()
      assert copied_bytes == text_file.depot_file.file.read()
      assert text_file_copy.depot_file.path != text_file.depot_file.path
      assert text_file_copy.label == 'test_file_copy'
      assert text_file_copy.type == text_file.type
      assert text_file_copy.parent.content_id == folderb.content_id
      assert text_file_copy.owner.user_id == user.user_id
      for attribute in ('description', 'file_extension', 'file_mimetype'):
          copied_value = getattr(text_file_copy, attribute)
          assert copied_value == getattr(text_file, attribute)
      assert text_file_copy.revision_type == ActionDescription.COPY
      expected_revision_count = len(text_file.revisions) + 1
      assert len(text_file_copy.revisions) == expected_revision_count
  def test_unit_copy_file__same_label_different_parent_ok(self):
      """
      Copy a file into a folder of another workspace, keeping its label.

      The original file lives in 'folder a' of the first user's workspace;
      the second user copies it to 'folder b' of its own workspace without
      giving a new label. The copy is then looked up as 'test_file' in
      'folder b' and compared with the original.
      """
      api_kwargs = {'session': self.session, 'config': self.app_config}

      uapi = UserApi(current_user=None, **api_kwargs)
      group_api = GroupApi(current_user=None, **api_kwargs)
      groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user = uapi.create_minimal_user(
          email='user1@user', groups=groups, save_now=True
      )
      user2 = uapi.create_minimal_user(
          email='user2@user', groups=groups, save_now=True
      )

      workspace_api = WorkspaceApi(current_user=user, **api_kwargs)
      workspace = workspace_api.create_workspace(
          'test workspace', save_now=True
      )
      role_api = RoleApi(current_user=user, **api_kwargs)
      role_api.create_one(
          user2,
          workspace,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False
      )

      api = ContentApi(current_user=user, **api_kwargs)
      foldera = api.create(
          ContentType.Folder, workspace, None, 'folder a', True
      )
      with self.session.no_autoflush:
          text_file = api.create(
              content_type=ContentType.File,
              workspace=workspace,
              parent=foldera,
              label='test_file',
              do_save=False,
          )
          api.update_file_data(
              text_file, 'test_file', 'text/plain', b'test_content'
          )
      api.save(text_file, ActionDescription.CREATION)

      api2 = ContentApi(current_user=user2, **api_kwargs)
      workspace2_api = WorkspaceApi(current_user=user2, **api_kwargs)
      workspace2 = workspace2_api.create_workspace(
          'test workspace2', save_now=True
      )
      folderb = api2.create(
          ContentType.Folder, workspace2, None, 'folder b', True
      )

      api2.copy(
          item=text_file,
          new_parent=folderb,
      )
      transaction.commit()

      text_file_copy = api2.get_one_by_label_and_parent(
          'test_file', folderb
      )

      assert text_file != text_file_copy
      assert text_file_copy.content_id != text_file.content_id
      assert text_file_copy.workspace_id == workspace2.workspace_id
      copied_bytes = text_file_copy.depot_file.file.read()
      assert copied_bytes == text_file.depot_file.file.read()
      assert text_file_copy.depot_file.path != text_file.depot_file.path
      for attribute in ('label', 'type'):
          copied_value = getattr(text_file_copy, attribute)
          assert copied_value == getattr(text_file, attribute)
      assert text_file_copy.parent.content_id == folderb.content_id
      assert text_file_copy.owner.user_id == user.user_id
      for attribute in ('description', 'file_extension', 'file_mimetype'):
          copied_value = getattr(text_file_copy, attribute)
          assert copied_value == getattr(text_file, attribute)
      assert text_file_copy.revision_type == ActionDescription.COPY
      expected_revision_count = len(text_file.revisions) + 1
      assert len(text_file_copy.revisions) == expected_revision_count
  def test_unit_copy_file_different_label_same_parent_ok(self):
      """
      Copy a file with a new label and without giving a new parent.

      The second user copies 'test_file' as 'test_file_copy'. The copy is
      then looked up in 'folder a' (the folder of the original) and
      compared with the original.
      """
      api_kwargs = {'session': self.session, 'config': self.app_config}

      uapi = UserApi(current_user=None, **api_kwargs)
      group_api = GroupApi(current_user=None, **api_kwargs)
      groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user = uapi.create_minimal_user(
          email='user1@user', groups=groups, save_now=True
      )
      user2 = uapi.create_minimal_user(
          email='user2@user', groups=groups, save_now=True
      )

      workspace_api = WorkspaceApi(current_user=user, **api_kwargs)
      workspace = workspace_api.create_workspace(
          'test workspace', save_now=True
      )
      role_api = RoleApi(current_user=user, **api_kwargs)
      role_api.create_one(
          user2,
          workspace,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False
      )

      api = ContentApi(current_user=user, **api_kwargs)
      foldera = api.create(
          ContentType.Folder, workspace, None, 'folder a', True
      )
      with self.session.no_autoflush:
          text_file = api.create(
              content_type=ContentType.File,
              workspace=workspace,
              parent=foldera,
              label='test_file',
              do_save=False,
          )
          api.update_file_data(
              text_file, 'test_file', 'text/plain', b'test_content'
          )
      api.save(text_file, ActionDescription.CREATION)

      api2 = ContentApi(current_user=user2, **api_kwargs)
      api2.copy(
          item=text_file,
          new_label='test_file_copy'
      )
      transaction.commit()

      text_file_copy = api2.get_one_by_label_and_parent(
          'test_file_copy', foldera
      )

      assert text_file != text_file_copy
      assert text_file_copy.content_id != text_file.content_id
      assert text_file_copy.workspace_id == workspace.workspace_id
      copied_bytes = text_file_copy.depot_file.file.read()
      assert copied_bytes == text_file.depot_file.file.read()
      assert text_file_copy.depot_file.path != text_file.depot_file.path
      assert text_file_copy.label == 'test_file_copy'
      assert text_file_copy.type == text_file.type
      assert text_file_copy.parent.content_id == foldera.content_id
      assert text_file_copy.owner.user_id == user.user_id
      for attribute in ('description', 'file_extension', 'file_mimetype'):
          copied_value = getattr(text_file_copy, attribute)
          assert copied_value == getattr(text_file, attribute)
      assert text_file_copy.revision_type == ActionDescription.COPY
      expected_revision_count = len(text_file.revisions) + 1
      assert len(text_file_copy.revisions) == expected_revision_count
  def test_mark_read__workspace(self):
      """
      Check ``mark_read__workspace`` workspace by workspace.

      Four contents are created by user A, two per workspace. User B is
      expected to appear in no revision's ``read_by`` at first, then only
      in those of the first workspace once it is marked as read, and in
      all of them once the second workspace is marked as read too.
      """
      api_kwargs = {'session': self.session, 'config': self.app_config}

      uapi = UserApi(current_user=None, **api_kwargs)
      group_api = GroupApi(current_user=None, **api_kwargs)
      groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user_a = uapi.create_minimal_user(
          email='this.is@user', groups=groups, save_now=True
      )
      user_b = uapi.create_minimal_user(
          email='this.is@another.user', groups=groups, save_now=True
      )

      wapi = WorkspaceApi(current_user=user_a, **api_kwargs)
      workspace1 = wapi.create_workspace(
          'test workspace n°1', save_now=True
      )
      workspace2 = wapi.create_workspace(
          'test workspace n°2', save_now=True
      )

      # User B becomes reader of both workspaces; the first role is
      # created by user A's RoleApi, the second one by user B's RoleApi.
      role_api1 = RoleApi(current_user=user_a, **api_kwargs)
      role_api1.create_one(
          user_b, workspace1, UserRoleInWorkspace.READER, False
      )
      role_api2 = RoleApi(current_user=user_b, **api_kwargs)
      role_api2.create_one(
          user_b, workspace2, UserRoleInWorkspace.READER, False
      )

      cont_api_a = ContentApi(current_user=user_a, **api_kwargs)
      cont_api_b = ContentApi(current_user=user_b, **api_kwargs)

      # Creates page_1 & page_2 in workspace 1
      # and page_3 & page_4 in workspace 2
      page_1 = cont_api_a.create(
          ContentType.Page, workspace1, None,
          'this is a page', do_save=True
      )
      page_2 = cont_api_a.create(
          ContentType.Page, workspace1, None,
          'this is page1', do_save=True
      )
      page_3 = cont_api_a.create(
          ContentType.Thread, workspace2, None,
          'this is page2', do_save=True
      )
      page_4 = cont_api_a.create(
          ContentType.File, workspace2, None,
          'this is page3', do_save=True
      )
      first_workspace_pages = (page_1, page_2)
      second_workspace_pages = (page_3, page_4)

      for page in first_workspace_pages + second_workspace_pages:
          for rev in page.revisions:
              eq_(user_b not in rev.read_by.keys(), True)

      # Set as read the workspace n°1
      cont_api_b.mark_read__workspace(workspace=workspace1)

      for page in first_workspace_pages:
          for rev in page.revisions:
              eq_(user_b in rev.read_by.keys(), True)
      for page in second_workspace_pages:
          for rev in page.revisions:
              eq_(user_b not in rev.read_by.keys(), True)

      # Set as read the workspace n°2
      cont_api_b.mark_read__workspace(workspace=workspace2)

      for page in first_workspace_pages + second_workspace_pages:
          for rev in page.revisions:
              eq_(user_b in rev.read_by.keys(), True)
  def test_mark_read(self):
      """
      Check that ``mark_read`` adds the current user to ``read_by``.

      User A creates a page in a workspace where user B is reader. User B
      is expected to be absent from the ``read_by`` keys of every revision
      of the page before calling ``mark_read`` through its own
      ``ContentApi``, and present afterwards.
      """
      uapi = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(
          current_user=None,
          session=self.session,
          config=self.app_config,
      )
      groups = [group_api.get_one(Group.TIM_USER),
                group_api.get_one(Group.TIM_MANAGER),
                group_api.get_one(Group.TIM_ADMIN)]

      user_a = uapi.create_minimal_user(
          email='this.is@user',
          groups=groups,
          save_now=True
      )
      user_b = uapi.create_minimal_user(
          email='this.is@another.user',
          groups=groups,
          save_now=True
      )

      wapi = WorkspaceApi(
          current_user=user_a,
          session=self.session,
          config=self.app_config,
      )
      workspace = wapi.create_workspace(
          'test workspace',
          save_now=True)

      role_api = RoleApi(
          current_user=user_a,
          session=self.session,
          config=self.app_config,
      )
      role_api.create_one(
          user_b,
          workspace,
          UserRoleInWorkspace.READER,
          False
      )
      cont_api_a = ContentApi(
          current_user=user_a,
          session=self.session,
          config=self.app_config,
      )
      cont_api_b = ContentApi(
          current_user=user_b,
          session=self.session,
          config=self.app_config,
      )

      page_1 = cont_api_a.create(ContentType.Page, workspace, None,
                                 'this is a page', do_save=True)

      for rev in page_1.revisions:
          eq_(user_b not in rev.read_by.keys(), True)

      cont_api_b.mark_read(page_1)

      for rev in page_1.revisions:
          eq_(user_b in rev.read_by.keys(), True)
  def test_mark_read__all(self):
      """
      Check that ``mark_read__all`` marks every content as read.

      User A creates a page, a thread and a file in a workspace where
      user B is reader. User B is expected to be absent from the
      ``read_by`` keys of all their revisions before calling
      ``mark_read__all`` through its own ``ContentApi``, and present
      afterwards.
      """
      api_kwargs = {'session': self.session, 'config': self.app_config}

      uapi = UserApi(current_user=None, **api_kwargs)
      group_api = GroupApi(current_user=None, **api_kwargs)
      groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user_a = uapi.create_minimal_user(
          email='this.is@user', groups=groups, save_now=True
      )
      user_b = uapi.create_minimal_user(
          email='this.is@another.user', groups=groups, save_now=True
      )

      wapi = WorkspaceApi(current_user=user_a, **api_kwargs)
      workspace = wapi.create_workspace('test workspace', save_now=True)

      role_api = RoleApi(current_user=user_a, **api_kwargs)
      role_api.create_one(
          user_b, workspace, UserRoleInWorkspace.READER, False
      )

      cont_api_a = ContentApi(current_user=user_a, **api_kwargs)
      cont_api_b = ContentApi(current_user=user_b, **api_kwargs)

      page_2 = cont_api_a.create(
          ContentType.Page, workspace, None,
          'this is page1', do_save=True
      )
      page_3 = cont_api_a.create(
          ContentType.Thread, workspace, None,
          'this is page2', do_save=True
      )
      page_4 = cont_api_a.create(
          ContentType.File, workspace, None,
          'this is page3', do_save=True
      )
      pages = (page_2, page_3, page_4)

      for page in pages:
          for rev in page.revisions:
              eq_(user_b not in rev.read_by.keys(), True)

      # Reload the three contents from the session before marking them.
      for page in pages:
          self.session.refresh(page)

      cont_api_b.mark_read__all()

      for page in pages:
          for rev in page.revisions:
              eq_(user_b in rev.read_by.keys(), True)
  1088. def test_update(self):
  1089. uapi = UserApi(
  1090. session=self.session,
  1091. config=self.app_config,
  1092. current_user=None,
  1093. )
  1094. group_api = GroupApi(
  1095. current_user=None,
  1096. session=self.session,
  1097. config=self.app_config,
  1098. )
  1099. groups = [group_api.get_one(Group.TIM_USER),
  1100. group_api.get_one(Group.TIM_MANAGER),
  1101. group_api.get_one(Group.TIM_ADMIN)]
  1102. user1 = uapi.create_minimal_user(
  1103. email='this.is@user',
  1104. groups=groups,
  1105. save_now=True
  1106. )
  1107. workspace_api = WorkspaceApi(
  1108. current_user=user1,
  1109. session=self.session,
  1110. config=self.app_config,
  1111. )
  1112. workspace = workspace_api.create_workspace(
  1113. 'test workspace',
  1114. save_now=True
  1115. )
  1116. wid = workspace.workspace_id
  1117. user2 = uapi.create_minimal_user('this.is@another.user')
  1118. uapi.save(user2)
  1119. RoleApi(
  1120. current_user=user1,
  1121. session=self.session,
  1122. config=self.app_config,
  1123. ).create_one(
  1124. user2,
  1125. workspace,
  1126. UserRoleInWorkspace.CONTENT_MANAGER,
  1127. with_notif=False,
  1128. flush=True
  1129. )
  1130. # Test starts here
  1131. api = ContentApi(
  1132. current_user=user1,
  1133. session=self.session,
  1134. config=self.app_config,
  1135. )
  1136. p = api.create(ContentType.Page, workspace, None,
  1137. 'this_is_a_page', True)
  1138. u1id = user1.user_id
  1139. u2id = user2.user_id
  1140. pcid = p.content_id
  1141. poid = p.owner_id
  1142. transaction.commit()
  1143. # Refresh instances after commit
  1144. user1 = uapi.get_one(u1id)
  1145. workspace = WorkspaceApi(
  1146. current_user=user1,
  1147. session=self.session,
  1148. config=self.app_config,
  1149. ).get_one(wid)
  1150. api = ContentApi(
  1151. current_user=user1,
  1152. session=self.session,
  1153. config=self.app_config,
  1154. )
  1155. content = api.get_one(pcid, ContentType.Any, workspace)
  1156. eq_(u1id, content.owner_id)
  1157. eq_(poid, content.owner_id)
  1158. u2 = UserApi(
  1159. session=self.session,
  1160. config=self.app_config,
  1161. current_user=None,
  1162. ).get_one(u2id)
  1163. api2 = ContentApi(
  1164. current_user=u2,
  1165. session=self.session,
  1166. config=self.app_config,
  1167. )
  1168. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1169. with new_revision(
  1170. session=self.session,
  1171. tm=transaction.manager,
  1172. content=content2,
  1173. ):
  1174. api2.update_content(
  1175. content2,
  1176. 'this is an updated page',
  1177. 'new content'
  1178. )
  1179. api2.save(content2)
  1180. transaction.commit()
  1181. # Refresh instances after commit
  1182. user1 = uapi.get_one(u1id)
  1183. workspace = WorkspaceApi(
  1184. current_user=user1,
  1185. session=self.session,
  1186. config=self.app_config,
  1187. ).get_one(wid)
  1188. api = ContentApi(
  1189. current_user=user1,
  1190. session=self.session,
  1191. config=self.app_config,
  1192. )
  1193. updated = api.get_one(pcid, ContentType.Any, workspace)
  1194. eq_(u2id, updated.owner_id,
  1195. 'the owner id should be {} (found {})'.format(u2id,
  1196. updated.owner_id))
  1197. eq_('this is an updated page', updated.label)
  1198. eq_('new content', updated.description)
  1199. eq_(ActionDescription.EDITION, updated.revision_type)
  1200. def test_update_no_change(self):
  1201. uapi = UserApi(
  1202. session=self.session,
  1203. config=self.app_config,
  1204. current_user=None,
  1205. )
  1206. group_api = GroupApi(
  1207. current_user=None,
  1208. session=self.session,
  1209. config = self.app_config,
  1210. )
  1211. groups = [group_api.get_one(Group.TIM_USER),
  1212. group_api.get_one(Group.TIM_MANAGER),
  1213. group_api.get_one(Group.TIM_ADMIN)]
  1214. user1 = uapi.create_minimal_user(
  1215. email='this.is@user',
  1216. groups=groups,
  1217. save_now=True,
  1218. )
  1219. workspace = WorkspaceApi(
  1220. current_user=user1,
  1221. session=self.session,
  1222. config=self.app_config,
  1223. ).create_workspace(
  1224. 'test workspace',
  1225. save_now=True
  1226. )
  1227. user2 = uapi.create_minimal_user('this.is@another.user')
  1228. uapi.save(user2)
  1229. RoleApi(
  1230. current_user=user1,
  1231. session=self.session,
  1232. config=self.app_config,
  1233. ).create_one(
  1234. user2,
  1235. workspace,
  1236. UserRoleInWorkspace.CONTENT_MANAGER,
  1237. with_notif=False,
  1238. flush=True
  1239. )
  1240. api = ContentApi(
  1241. current_user=user1,
  1242. session=self.session,
  1243. config=self.app_config,
  1244. )
  1245. with self.session.no_autoflush:
  1246. page = api.create(
  1247. content_type=ContentType.Page,
  1248. workspace=workspace,
  1249. label="same_content",
  1250. do_save=False
  1251. )
  1252. page.description = "Same_content_here"
  1253. api.save(page, ActionDescription.CREATION, do_notify=True)
  1254. transaction.commit()
  1255. api2 = ContentApi(
  1256. current_user=user2,
  1257. session=self.session,
  1258. config=self.app_config,
  1259. )
  1260. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  1261. with new_revision(
  1262. session=self.session,
  1263. tm=transaction.manager,
  1264. content=content2,
  1265. ):
  1266. with pytest.raises(SameValueError):
  1267. api2.update_content(
  1268. item=content2,
  1269. new_label='same_content',
  1270. new_content='Same_content_here'
  1271. )
  1272. api2.save(content2)
  1273. transaction.commit()
  1274. def test_update_file_data(self):
  1275. uapi = UserApi(
  1276. session=self.session,
  1277. config=self.app_config,
  1278. current_user=None,
  1279. )
  1280. group_api = GroupApi(
  1281. current_user=None,
  1282. session=self.session,
  1283. config=self.app_config,
  1284. )
  1285. groups = [group_api.get_one(Group.TIM_USER),
  1286. group_api.get_one(Group.TIM_MANAGER),
  1287. group_api.get_one(Group.TIM_ADMIN)]
  1288. user1 = uapi.create_minimal_user(
  1289. email='this.is@user',
  1290. groups=groups,
  1291. save_now=True
  1292. )
  1293. workspace_api = WorkspaceApi(
  1294. current_user=user1,
  1295. session=self.session,
  1296. config=self.app_config,
  1297. )
  1298. workspace = workspace_api.create_workspace(
  1299. 'test workspace',
  1300. save_now=True
  1301. )
  1302. wid = workspace.workspace_id
  1303. user2 = uapi.create_minimal_user('this.is@another.user')
  1304. uapi.save(user2)
  1305. RoleApi(
  1306. current_user=user1,
  1307. session=self.session,
  1308. config=self.app_config,
  1309. ).create_one(
  1310. user2,
  1311. workspace,
  1312. UserRoleInWorkspace.CONTENT_MANAGER,
  1313. with_notif=True,
  1314. flush=True
  1315. )
  1316. # Test starts here
  1317. api = ContentApi(
  1318. current_user=user1,
  1319. session=self.session,
  1320. config=self.app_config,
  1321. )
  1322. p = api.create(ContentType.File, workspace, None,
  1323. 'this_is_a_page', True)
  1324. u1id = user1.user_id
  1325. u2id = user2.user_id
  1326. pcid = p.content_id
  1327. poid = p.owner_id
  1328. api.save(p)
  1329. transaction.commit()
  1330. # Refresh instances after commit
  1331. user1 = uapi.get_one(u1id)
  1332. workspace_api2 = WorkspaceApi(
  1333. current_user=user1,
  1334. session=self.session,
  1335. config=self.app_config,
  1336. )
  1337. workspace = workspace_api2.get_one(wid)
  1338. api = ContentApi(
  1339. current_user=user1,
  1340. session=self.session,
  1341. config=self.app_config,
  1342. )
  1343. content = api.get_one(pcid, ContentType.Any, workspace)
  1344. eq_(u1id, content.owner_id)
  1345. eq_(poid, content.owner_id)
  1346. u2 = UserApi(
  1347. current_user=None,
  1348. session=self.session,
  1349. config=self.app_config,
  1350. ).get_one(u2id)
  1351. api2 = ContentApi(
  1352. current_user=u2,
  1353. session=self.session,
  1354. config=self.app_config,
  1355. )
  1356. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1357. with new_revision(
  1358. session=self.session,
  1359. tm=transaction.manager,
  1360. content=content2,
  1361. ):
  1362. api2.update_file_data(
  1363. content2,
  1364. 'index.html',
  1365. 'text/html',
  1366. b'<html>hello world</html>'
  1367. )
  1368. api2.save(content2)
  1369. transaction.commit()
  1370. # Refresh instances after commit
  1371. user1 = uapi.get_one(u1id)
  1372. workspace = WorkspaceApi(
  1373. current_user=user1,
  1374. session=self.session,
  1375. config=self.app_config,
  1376. ).get_one(wid)
  1377. updated = api.get_one(pcid, ContentType.Any, workspace)
  1378. eq_(u2id, updated.owner_id,
  1379. 'the owner id should be {} (found {})'.format(u2id,
  1380. updated.owner_id))
  1381. eq_('this_is_a_page.html', updated.file_name)
  1382. eq_('text/html', updated.file_mimetype)
  1383. eq_(b'<html>hello world</html>', updated.depot_file.file.read())
  1384. eq_(ActionDescription.REVISION, updated.revision_type)
  1385. def test_update_no_change(self):
  1386. uapi = UserApi(
  1387. session=self.session,
  1388. config=self.app_config,
  1389. current_user=None,
  1390. )
  1391. group_api = GroupApi(
  1392. current_user=None,
  1393. session=self.session,
  1394. config=self.app_config,
  1395. )
  1396. groups = [group_api.get_one(Group.TIM_USER),
  1397. group_api.get_one(Group.TIM_MANAGER),
  1398. group_api.get_one(Group.TIM_ADMIN)]
  1399. user1 = uapi.create_minimal_user(
  1400. email='this.is@user',
  1401. groups=groups,
  1402. save_now=True,
  1403. )
  1404. workspace_api = WorkspaceApi(
  1405. current_user=user1,
  1406. session=self.session,
  1407. config=self.app_config,
  1408. )
  1409. workspace = workspace_api.create_workspace(
  1410. 'test workspace',
  1411. save_now=True
  1412. )
  1413. user2 = uapi.create_minimal_user('this.is@another.user')
  1414. uapi.save(user2)
  1415. RoleApi(
  1416. current_user=user1,
  1417. session=self.session,
  1418. config=self.app_config,
  1419. ).create_one(
  1420. user2,
  1421. workspace,
  1422. UserRoleInWorkspace.CONTENT_MANAGER,
  1423. with_notif=False,
  1424. flush=True
  1425. )
  1426. api = ContentApi(
  1427. current_user=user1,
  1428. session=self.session,
  1429. config=self.app_config,
  1430. )
  1431. with self.session.no_autoflush:
  1432. page = api.create(
  1433. content_type=ContentType.Page,
  1434. workspace=workspace,
  1435. label="same_content",
  1436. do_save=False
  1437. )
  1438. api.update_file_data(
  1439. page,
  1440. 'index.html',
  1441. 'text/html',
  1442. b'<html>Same Content Here</html>'
  1443. )
  1444. api.save(page, ActionDescription.CREATION, do_notify=True)
  1445. transaction.commit()
  1446. api2 = ContentApi(
  1447. current_user=user2,
  1448. session=self.session,
  1449. config=self.app_config,
  1450. )
  1451. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  1452. with new_revision(
  1453. session=self.session,
  1454. tm=transaction.manager,
  1455. content=content2,
  1456. ):
  1457. with pytest.raises(SameValueError):
  1458. api2.update_file_data(
  1459. page,
  1460. 'index.html',
  1461. 'text/html',
  1462. b'<html>Same Content Here</html>'
  1463. )
  1464. api2.save(content2)
  1465. transaction.commit()
  1466. def test_archive_unarchive(self):
  1467. uapi = UserApi(
  1468. session=self.session,
  1469. config=self.app_config,
  1470. current_user=None,
  1471. )
  1472. group_api = GroupApi(
  1473. current_user=None,
  1474. session=self.session,
  1475. config=self.app_config,
  1476. )
  1477. groups = [group_api.get_one(Group.TIM_USER),
  1478. group_api.get_one(Group.TIM_MANAGER),
  1479. group_api.get_one(Group.TIM_ADMIN)]
  1480. user1 = uapi.create_minimal_user(
  1481. email='this.is@user',
  1482. groups=groups,
  1483. save_now=True
  1484. )
  1485. u1id = user1.user_id
  1486. workspace_api = WorkspaceApi(
  1487. current_user=user1,
  1488. session=self.session,
  1489. config=self.app_config,
  1490. )
  1491. workspace = workspace_api.create_workspace(
  1492. 'test workspace',
  1493. save_now=True
  1494. )
  1495. wid = workspace.workspace_id
  1496. user2 = uapi.create_minimal_user('this.is@another.user')
  1497. uapi.save(user2)
  1498. RoleApi(
  1499. current_user=user1,
  1500. session=self.session,
  1501. config=self.app_config,
  1502. ).create_one(
  1503. user2,
  1504. workspace,
  1505. UserRoleInWorkspace.CONTENT_MANAGER,
  1506. with_notif=True,
  1507. flush=True
  1508. )
  1509. # show archived is used at the top end of the test
  1510. api = ContentApi(
  1511. current_user=user1,
  1512. session=self.session,
  1513. show_archived=True,
  1514. config=self.app_config,
  1515. )
  1516. p = api.create(ContentType.File, workspace, None,
  1517. 'this_is_a_page', True)
  1518. u1id = user1.user_id
  1519. u2id = user2.user_id
  1520. pcid = p.content_id
  1521. poid = p.owner_id
  1522. transaction.commit()
  1523. ####
  1524. # refresh after commit
  1525. user1 = UserApi(
  1526. current_user=None,
  1527. config=self.app_config,
  1528. session=self.session
  1529. ).get_one(u1id)
  1530. workspace = WorkspaceApi(
  1531. current_user=user1,
  1532. session=self.session,
  1533. config=self.app_config,
  1534. ).get_one(wid)
  1535. content = api.get_one(pcid, ContentType.Any, workspace)
  1536. eq_(u1id, content.owner_id)
  1537. eq_(poid, content.owner_id)
  1538. u2api = UserApi(
  1539. session=self.session,
  1540. config=self.app_config,
  1541. current_user=None,
  1542. )
  1543. u2 = u2api.get_one(u2id)
  1544. api2 = ContentApi(
  1545. current_user=u2,
  1546. session=self.session,
  1547. config=self.app_config,
  1548. show_archived=True,
  1549. )
  1550. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1551. with new_revision(
  1552. session=self.session,
  1553. tm=transaction.manager,
  1554. content=content2,
  1555. ):
  1556. api2.archive(content2)
  1557. api2.save(content2)
  1558. transaction.commit()
  1559. # refresh after commit
  1560. user1 = UserApi(
  1561. current_user=None,
  1562. session=self.session,
  1563. config=self.app_config,
  1564. ).get_one(u1id)
  1565. workspace = WorkspaceApi(
  1566. current_user=user1,
  1567. session=self.session,
  1568. config=self.app_config,
  1569. ).get_one(wid)
  1570. u2 = UserApi(
  1571. current_user=None,
  1572. session=self.session,
  1573. config=self.app_config,
  1574. ).get_one(u2id)
  1575. api = ContentApi(
  1576. current_user=user1,
  1577. session=self.session,
  1578. config=self.app_config,
  1579. show_archived=True,
  1580. )
  1581. api2 = ContentApi(
  1582. current_user=u2,
  1583. session=self.session,
  1584. config=self.app_config,
  1585. show_archived=True,
  1586. )
  1587. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1588. eq_(u2id, updated.owner_id,
  1589. 'the owner id should be {} (found {})'.format(u2id,
  1590. updated.owner_id))
  1591. eq_(True, updated.is_archived)
  1592. eq_(ActionDescription.ARCHIVING, updated.revision_type)
  1593. ####
  1594. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1595. with new_revision(
  1596. session=self.session,
  1597. tm=transaction.manager,
  1598. content=updated,
  1599. ):
  1600. api.unarchive(updated)
  1601. api.save(updated2)
  1602. eq_(False, updated2.is_archived)
  1603. eq_(ActionDescription.UNARCHIVING, updated2.revision_type)
  1604. eq_(u1id, updated2.owner_id)
  1605. def test_delete_undelete(self):
  1606. uapi = UserApi(
  1607. session=self.session,
  1608. config=self.app_config,
  1609. current_user=None,
  1610. )
  1611. group_api = GroupApi(
  1612. current_user=None,
  1613. session=self.session,
  1614. config=self.app_config,
  1615. )
  1616. groups = [group_api.get_one(Group.TIM_USER),
  1617. group_api.get_one(Group.TIM_MANAGER),
  1618. group_api.get_one(Group.TIM_ADMIN)]
  1619. user1 = uapi.create_minimal_user(
  1620. email='this.is@user',
  1621. groups=groups,
  1622. save_now=True
  1623. )
  1624. u1id = user1.user_id
  1625. workspace_api = WorkspaceApi(
  1626. current_user=user1,
  1627. session=self.session,
  1628. config=self.app_config,
  1629. )
  1630. workspace = workspace_api.create_workspace(
  1631. 'test workspace',
  1632. save_now=True
  1633. )
  1634. wid = workspace.workspace_id
  1635. user2 = uapi.create_minimal_user('this.is@another.user')
  1636. uapi.save(user2)
  1637. RoleApi(
  1638. current_user=user1,
  1639. session=self.session,
  1640. config=self.app_config,
  1641. ).create_one(
  1642. user2,
  1643. workspace,
  1644. UserRoleInWorkspace.CONTENT_MANAGER,
  1645. with_notif=True,
  1646. flush=True
  1647. )
  1648. # show archived is used at the top end of the test
  1649. api = ContentApi(
  1650. current_user=user1,
  1651. session=self.session,
  1652. config=self.app_config,
  1653. show_deleted=True,
  1654. )
  1655. p = api.create(ContentType.File, workspace, None,
  1656. 'this_is_a_page', True)
  1657. u1id = user1.user_id
  1658. u2id = user2.user_id
  1659. pcid = p.content_id
  1660. poid = p.owner_id
  1661. transaction.commit()
  1662. ####
  1663. user1 = UserApi(
  1664. current_user=None,
  1665. session=self.session,
  1666. config=self.app_config,
  1667. ).get_one(u1id)
  1668. workspace = WorkspaceApi(
  1669. current_user=user1,
  1670. session=self.session,
  1671. config=self.app_config,
  1672. ).get_one(wid)
  1673. content = api.get_one(pcid, ContentType.Any, workspace)
  1674. eq_(u1id, content.owner_id)
  1675. eq_(poid, content.owner_id)
  1676. u2 = UserApi(
  1677. current_user=None,
  1678. session=self.session,
  1679. config=self.app_config,
  1680. ).get_one(u2id)
  1681. api2 = ContentApi(
  1682. current_user=u2,
  1683. session=self.session,
  1684. config=self.app_config,
  1685. show_deleted=True,
  1686. )
  1687. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1688. with new_revision(
  1689. session=self.session,
  1690. tm=transaction.manager,
  1691. content=content2,
  1692. ):
  1693. api2.delete(content2)
  1694. api2.save(content2)
  1695. transaction.commit()
  1696. ####
  1697. user1 = UserApi(
  1698. current_user=None,
  1699. session=self.session,
  1700. config=self.app_config,
  1701. ).get_one(u1id)
  1702. workspace = WorkspaceApi(
  1703. current_user=user1,
  1704. session=self.session,
  1705. config=self.app_config,
  1706. ).get_one(wid)
  1707. # show archived is used at the top end of the test
  1708. api = ContentApi(
  1709. current_user=user1,
  1710. session=self.session,
  1711. config=self.app_config,
  1712. show_deleted=True,
  1713. )
  1714. u2 = UserApi(
  1715. current_user=None,
  1716. session=self.session,
  1717. config=self.app_config,
  1718. ).get_one(u2id)
  1719. api2 = ContentApi(
  1720. current_user=u2,
  1721. session=self.session,
  1722. config=self.app_config,
  1723. show_deleted=True
  1724. )
  1725. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1726. eq_(u2id, updated.owner_id,
  1727. 'the owner id should be {} (found {})'.format(u2id,
  1728. updated.owner_id))
  1729. eq_(True, updated.is_deleted)
  1730. eq_(ActionDescription.DELETION, updated.revision_type)
  1731. ####
  1732. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1733. with new_revision(
  1734. tm=transaction.manager,
  1735. session=self.session,
  1736. content=updated2,
  1737. ):
  1738. api.undelete(updated2)
  1739. api.save(updated2)
  1740. eq_(False, updated2.is_deleted)
  1741. eq_(ActionDescription.UNDELETION, updated2.revision_type)
  1742. eq_(u1id, updated2.owner_id)
  1743. def test_search_in_label(self):
  1744. # HACK - D.A. - 2015-03-09
  1745. # This test is based on a bug which does NOT return results found
  1746. # at root of a workspace (eg a folder)
  1747. uapi = UserApi(
  1748. session=self.session,
  1749. config=self.app_config,
  1750. current_user=None,
  1751. )
  1752. group_api = GroupApi(
  1753. current_user=None,
  1754. session=self.session,
  1755. config=self.app_config,
  1756. )
  1757. groups = [group_api.get_one(Group.TIM_USER),
  1758. group_api.get_one(Group.TIM_MANAGER),
  1759. group_api.get_one(Group.TIM_ADMIN)]
  1760. user = uapi.create_minimal_user(email='this.is@user',
  1761. groups=groups, save_now=True)
  1762. workspace = WorkspaceApi(
  1763. current_user=user,
  1764. session=self.session,
  1765. config=self.app_config,
  1766. ).create_workspace(
  1767. 'test workspace',
  1768. save_now=True
  1769. )
  1770. api = ContentApi(
  1771. current_user=user,
  1772. session=self.session,
  1773. config=self.app_config,
  1774. )
  1775. a = api.create(ContentType.Folder, workspace, None,
  1776. 'this is randomized folder', True)
  1777. p = api.create(ContentType.Page, workspace, a,
  1778. 'this is randomized label content', True)
  1779. with new_revision(
  1780. session=self.session,
  1781. tm=transaction.manager,
  1782. content=p,
  1783. ):
  1784. p.description = 'This is some amazing test'
  1785. api.save(p)
  1786. original_id = p.content_id
  1787. res = api.search(['randomized'])
  1788. eq_(1, len(res.all()))
  1789. item = res.all()[0]
  1790. eq_(original_id, item.content_id)
  1791. def test_search_in_description(self):
  1792. # HACK - D.A. - 2015-03-09
  1793. # This test is based on a bug which does NOT return results found
  1794. # at root of a workspace (eg a folder)
  1795. uapi = UserApi(
  1796. session=self.session,
  1797. config=self.app_config,
  1798. current_user=None,
  1799. )
  1800. group_api = GroupApi(
  1801. current_user=None,
  1802. session=self.session,
  1803. config=self.app_config,
  1804. )
  1805. groups = [group_api.get_one(Group.TIM_USER),
  1806. group_api.get_one(Group.TIM_MANAGER),
  1807. group_api.get_one(Group.TIM_ADMIN)]
  1808. user = uapi.create_minimal_user(email='this.is@user',
  1809. groups=groups, save_now=True)
  1810. workspace = WorkspaceApi(
  1811. current_user=user,
  1812. session=self.session,
  1813. config=self.app_config,
  1814. ).create_workspace(
  1815. 'test workspace',
  1816. save_now=True,
  1817. )
  1818. api = ContentApi(
  1819. current_user=user,
  1820. session=self.session,
  1821. config=self.app_config,
  1822. )
  1823. a = api.create(ContentType.Folder, workspace, None,
  1824. 'this is randomized folder', True)
  1825. p = api.create(ContentType.Page, workspace, a,
  1826. 'this is dummy label content', True)
  1827. with new_revision(
  1828. tm=transaction.manager,
  1829. session=self.session,
  1830. content=p,
  1831. ):
  1832. p.description = 'This is some amazing test'
  1833. api.save(p)
  1834. original_id = p.content_id
  1835. res = api.search(['dummy'])
  1836. eq_(1, len(res.all()))
  1837. item = res.all()[0]
  1838. eq_(original_id, item.content_id)
  1839. def test_search_in_label_or_description(self):
  1840. # HACK - D.A. - 2015-03-09
  1841. # This test is based on a bug which does NOT return results found
  1842. # at root of a workspace (eg a folder)
  1843. uapi = UserApi(
  1844. session=self.session,
  1845. config=self.app_config,
  1846. current_user=None,
  1847. )
  1848. group_api = GroupApi(
  1849. current_user=None,
  1850. session=self.session,
  1851. config=self.app_config,
  1852. )
  1853. groups = [group_api.get_one(Group.TIM_USER),
  1854. group_api.get_one(Group.TIM_MANAGER),
  1855. group_api.get_one(Group.TIM_ADMIN)]
  1856. user = uapi.create_minimal_user(email='this.is@user',
  1857. groups=groups, save_now=True)
  1858. workspace = WorkspaceApi(
  1859. current_user=user,
  1860. session=self.session,
  1861. config=self.app_config,
  1862. ).create_workspace('test workspace', save_now=True)
  1863. api = ContentApi(
  1864. current_user=user,
  1865. session=self.session,
  1866. config=self.app_config,
  1867. )
  1868. a = api.create(ContentType.Folder, workspace, None,
  1869. 'this is randomized folder', True)
  1870. p1 = api.create(ContentType.Page, workspace, a,
  1871. 'this is dummy label content', True)
  1872. p2 = api.create(ContentType.Page, workspace, a, 'Hey ! Jon !', True)
  1873. with new_revision(
  1874. session=self.session,
  1875. tm=transaction.manager,
  1876. content=p1,
  1877. ):
  1878. p1.description = 'This is some amazing test'
  1879. with new_revision(
  1880. session=self.session,
  1881. tm=transaction.manager,
  1882. content=p2,
  1883. ):
  1884. p2.description = 'What\'s up?'
  1885. api.save(p1)
  1886. api.save(p2)
  1887. id1 = p1.content_id
  1888. id2 = p2.content_id
  1889. eq_(1, self.session.query(Workspace).filter(Workspace.label == 'test workspace').count())
  1890. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is randomized folder').count())
  1891. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is dummy label content').count())
  1892. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'This is some amazing test').count())
  1893. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'Hey ! Jon !').count())
  1894. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'What\'s up?').count())
  1895. res = api.search(['dummy', 'jon'])
  1896. eq_(2, len(res.all()))
  1897. eq_(True, id1 in [o.content_id for o in res.all()])
  1898. eq_(True, id2 in [o.content_id for o in res.all()])
  1899. def test_unit__search_exclude_content_under_deleted_or_archived_parents__ok(self): # nopep8
  1900. admin = self.session.query(User)\
  1901. .filter(User.email == 'admin@admin.admin').one()
  1902. workspace = self._create_workspace_and_test(
  1903. 'workspace_1',
  1904. admin
  1905. )
  1906. folder_1 = self._create_content_and_test(
  1907. 'folder_1',
  1908. workspace=workspace,
  1909. type=ContentType.Folder
  1910. )
  1911. folder_2 = self._create_content_and_test(
  1912. 'folder_2',
  1913. workspace=workspace,
  1914. type=ContentType.Folder
  1915. )
  1916. page_1 = self._create_content_and_test(
  1917. 'foo', workspace=workspace,
  1918. type=ContentType.Page,
  1919. parent=folder_1
  1920. )
  1921. page_2 = self._create_content_and_test(
  1922. 'bar',
  1923. workspace=workspace,
  1924. type=ContentType.Page,
  1925. parent=folder_2
  1926. )
  1927. api = ContentApi(
  1928. current_user=admin,
  1929. session=self.session,
  1930. config=self.app_config,
  1931. )
  1932. foo_result = api.search(['foo']).all()
  1933. eq_(1, len(foo_result))
  1934. assert page_1 in foo_result
  1935. bar_result = api.search(['bar']).all()
  1936. eq_(1, len(bar_result))
  1937. assert page_2 in bar_result
  1938. with new_revision(
  1939. session=self.session,
  1940. tm=transaction.manager,
  1941. content=folder_1,
  1942. ):
  1943. api.delete(folder_1)
  1944. with new_revision(
  1945. session=self.session,
  1946. tm=transaction.manager,
  1947. content=folder_2,
  1948. ):
  1949. api.archive(folder_2)
  1950. # Actually ContentApi.search don't filter it
  1951. foo_result = api.search(['foo']).all()
  1952. eq_(1, len(foo_result))
  1953. assert page_1 in foo_result
  1954. bar_result = api.search(['bar']).all()
  1955. eq_(1, len(bar_result))
  1956. assert page_2 in bar_result
  1957. # ContentApi offer exclude_unavailable method to do it
  1958. foo_result = api.search(['foo']).all()
  1959. api.exclude_unavailable(foo_result)
  1960. eq_(0, len(foo_result))
  1961. bar_result = api.search(['bar']).all()
  1962. api.exclude_unavailable(bar_result)
  1963. eq_(0, len(bar_result))
  1964. class TestContentApiSecurity(DefaultTest):
  1965. fixtures = [FixtureTest, ]
  1966. def test_unit__cant_get_non_access_content__ok__nominal_case(self):
  1967. admin = self.session.query(User)\
  1968. .filter(User.email == 'admin@admin.admin').one()
  1969. bob = self.session.query(User)\
  1970. .filter(User.email == 'bob@fsf.local').one()
  1971. bob_workspace = WorkspaceApi(
  1972. current_user=bob,
  1973. session=self.session,
  1974. config=self.app_config,
  1975. ).create_workspace(
  1976. 'bob_workspace',
  1977. save_now=True,
  1978. )
  1979. admin_workspace = WorkspaceApi(
  1980. current_user=admin,
  1981. session=self.session,
  1982. config=self.app_config,
  1983. ).create_workspace(
  1984. 'admin_workspace',
  1985. save_now=True,
  1986. )
  1987. bob_page = ContentApi(
  1988. current_user=bob,
  1989. session=self.session,
  1990. config=self.app_config,
  1991. ).create(
  1992. content_type=ContentType.Page,
  1993. workspace=bob_workspace,
  1994. label='bob_page',
  1995. do_save=True,
  1996. )
  1997. admin_page = ContentApi(
  1998. current_user=admin,
  1999. session=self.session,
  2000. config=self.app_config,
  2001. ).create(
  2002. content_type=ContentType.Page,
  2003. workspace=admin_workspace,
  2004. label='admin_page',
  2005. do_save=True,
  2006. )
  2007. bob_viewable = ContentApi(
  2008. current_user=bob,
  2009. session=self.session,
  2010. config=self.app_config,
  2011. ).get_all()
  2012. eq_(1, len(bob_viewable), 'Bob should view only one content')
  2013. eq_(
  2014. 'bob_page',
  2015. bob_viewable[0].label,
  2016. 'Bob should not view "{0}" content'.format(
  2017. bob_viewable[0].label,
  2018. )
  2019. )