test_content_api.py 68KB


  1. # -*- coding: utf-8 -*-
  2. import transaction
  3. import pytest
  4. from tracim.config import CFG
  5. from tracim.lib.core.content import compare_content_for_sorting_by_type_and_name
  6. from tracim.lib.core.content import ContentApi
  7. # TODO - G.M - 28-03-2018 - [GroupApi] Re-enable GroupApi
  8. from tracim.lib.core.group import GroupApi
  9. from tracim.lib.core.user import UserApi
  10. from tracim.exceptions import SameValueError
  11. # TODO - G.M - 28-03-2018 - [RoleApi] Re-enable RoleApi
  12. from tracim.lib.core.workspace import RoleApi
  13. # TODO - G.M - 28-03-2018 - [WorkspaceApi] Re-enable WorkspaceApi
  14. from tracim.lib.core.workspace import WorkspaceApi
  15. from tracim.models.revision_protection import new_revision
  16. from tracim.models.auth import User
  17. from tracim.models.auth import Group
  18. from tracim.models.data import ActionDescription
  19. from tracim.models.data import ContentRevisionRO
  20. from tracim.models.data import Workspace
  21. from tracim.models.data import Content
  22. from tracim.models.data import ContentType
  23. from tracim.models.data import UserRoleInWorkspace
  24. from tracim.fixtures.users_and_groups import Test as FixtureTest
  25. from tracim.tests import DefaultTest
  26. from tracim.tests import eq_
  27. class TestContentApi(DefaultTest):
  28. def test_compare_content_for_sorting_by_type(self):
  29. c1 = Content()
  30. c1.label = ''
  31. c1.type = 'file'
  32. c2 = Content()
  33. c2.label = ''
  34. c2.type = 'folder'
  35. c11 = c1
  36. eq_(1, compare_content_for_sorting_by_type_and_name(c1, c2))
  37. eq_(-1, compare_content_for_sorting_by_type_and_name(c2, c1))
  38. eq_(0, compare_content_for_sorting_by_type_and_name(c1, c11))
  39. def test_compare_content_for_sorting_by_label(self):
  40. c1 = Content()
  41. c1.label = 'bbb'
  42. c1.type = 'file'
  43. c2 = Content()
  44. c2.label = 'aaa'
  45. c2.type = 'file'
  46. c11 = c1
  47. eq_(1, compare_content_for_sorting_by_type_and_name(c1, c2))
  48. eq_(-1, compare_content_for_sorting_by_type_and_name(c2, c1))
  49. eq_(0, compare_content_for_sorting_by_type_and_name(c1, c11))
  50. def test_sort_by_label_or_filename(self):
  51. c1 = Content()
  52. c1.label = 'ABCD'
  53. c1.type = 'file'
  54. c2 = Content()
  55. c2.label = ''
  56. c2.type = 'file'
  57. c2.file_name = 'AABC'
  58. c3 = Content()
  59. c3.label = 'BCDE'
  60. c3.type = 'file'
  61. items = [c1, c2, c3]
  62. sorteds = ContentApi.sort_content(items)
  63. eq_(sorteds[0], c2)
  64. eq_(sorteds[1], c1)
  65. eq_(sorteds[2], c3)
  66. def test_sort_by_content_type(self):
  67. c1 = Content()
  68. c1.label = 'AAAA'
  69. c1.type = 'file'
  70. c2 = Content()
  71. c2.label = 'BBBB'
  72. c2.type = 'folder'
  73. items = [c1, c2]
  74. sorteds = ContentApi.sort_content(items)
  75. eq_(sorteds[0], c2,
  76. 'value is {} instead of {}'.format(sorteds[0].content_id,
  77. c2.content_id))
  78. eq_(sorteds[1], c1,
  79. 'value is {} instead of {}'.format(sorteds[1].content_id,
  80. c1.content_id))
  81. def test_delete(self):
  82. uapi = UserApi(
  83. session=self.session,
  84. config=self.app_config,
  85. current_user=None,
  86. )
  87. group_api = GroupApi(
  88. current_user=None,
  89. session=self.session,
  90. config=self.app_config,
  91. )
  92. groups = [group_api.get_one(Group.TIM_USER),
  93. group_api.get_one(Group.TIM_MANAGER),
  94. group_api.get_one(Group.TIM_ADMIN)]
  95. user = uapi.create_user(email='this.is@user',
  96. groups=groups, save_now=True)
  97. workspace = WorkspaceApi(
  98. current_user=user,
  99. session=self.session,
  100. config=self.app_config,
  101. ).create_workspace('test workspace', save_now=True)
  102. api = ContentApi(
  103. current_user=user,
  104. session=self.session,
  105. config=self.app_config,
  106. )
  107. item = api.create(ContentType.Folder, workspace, None,
  108. 'not_deleted', True)
  109. item2 = api.create(ContentType.Folder, workspace, None,
  110. 'to_delete', True)
  111. uid = user.user_id
  112. wid = workspace.workspace_id
  113. transaction.commit()
  114. # Refresh instances after commit
  115. user = uapi.get_one(uid)
  116. workspace_api = WorkspaceApi(
  117. current_user=user,
  118. session=self.session,
  119. config=self.app_config
  120. )
  121. workspace = workspace_api.get_one(wid)
  122. api = ContentApi(
  123. current_user=user,
  124. session=self.session,
  125. config=self.app_config,
  126. )
  127. items = api.get_all(None, ContentType.Any, workspace)
  128. eq_(2, len(items))
  129. items = api.get_all(None, ContentType.Any, workspace)
  130. with new_revision(
  131. session=self.session,
  132. tm=transaction.manager,
  133. content=items[0]
  134. ):
  135. api.delete(items[0])
  136. transaction.commit()
  137. # Refresh instances after commit
  138. user = uapi.get_one(uid)
  139. workspace_api = WorkspaceApi(
  140. current_user=user,
  141. session=self.session,
  142. config=self.app_config
  143. )
  144. workspace = workspace_api.get_one(wid)
  145. api = ContentApi(
  146. current_user=user,
  147. session=self.session,
  148. config=self.app_config,
  149. )
  150. items = api.get_all(None, ContentType.Any, workspace)
  151. eq_(1, len(items))
  152. transaction.commit()
  153. # Test that the item is still available if "show deleted" is activated
  154. # Refresh instances after commit
  155. user = uapi.get_one(uid)
  156. workspace_api = WorkspaceApi(
  157. current_user=user,
  158. session=self.session,
  159. config=self.app_config,
  160. )
  161. api = ContentApi(
  162. current_user=user,
  163. session=self.session,
  164. config=self.app_config,
  165. show_deleted=True,
  166. )
  167. items = api.get_all(None, ContentType.Any, workspace)
  168. eq_(2, len(items))
  169. def test_archive(self):
  170. uapi = UserApi(
  171. session=self.session,
  172. config=self.app_config,
  173. current_user=None,
  174. )
  175. group_api = GroupApi(
  176. current_user=None,
  177. session=self.session,
  178. config=self.app_config,
  179. )
  180. groups = [group_api.get_one(Group.TIM_USER),
  181. group_api.get_one(Group.TIM_MANAGER),
  182. group_api.get_one(Group.TIM_ADMIN)]
  183. user = uapi.create_user(email='this.is@user',
  184. groups=groups, save_now=True)
  185. workspace_api = WorkspaceApi(
  186. current_user=user,
  187. session=self.session,
  188. config=self.app_config,
  189. )
  190. workspace = workspace_api.create_workspace(
  191. 'test workspace',
  192. save_now=True
  193. )
  194. api = ContentApi(
  195. current_user=user,
  196. session=self.session,
  197. config=self.app_config,
  198. )
  199. item = api.create(ContentType.Folder, workspace, None,
  200. 'not_archived', True)
  201. item2 = api.create(ContentType.Folder, workspace, None,
  202. 'to_archive', True)
  203. uid = user.user_id
  204. wid = workspace.workspace_id
  205. transaction.commit()
  206. # Refresh instances after commit
  207. user = uapi.get_one(uid)
  208. workspace_api = WorkspaceApi(
  209. current_user=user,
  210. session=self.session,
  211. config=self.app_config,
  212. )
  213. api = ContentApi(
  214. session=self.session,
  215. current_user=user,
  216. config=self.app_config,
  217. )
  218. items = api.get_all(None, ContentType.Any, workspace)
  219. eq_(2, len(items))
  220. items = api.get_all(None, ContentType.Any, workspace)
  221. with new_revision(
  222. session=self.session,
  223. tm=transaction.manager,
  224. content=items[0],
  225. ):
  226. api.archive(items[0])
  227. transaction.commit()
  228. # Refresh instances after commit
  229. user = uapi.get_one(uid)
  230. workspace_api = WorkspaceApi(
  231. current_user=user,
  232. session=self.session,
  233. config=self.app_config,
  234. )
  235. workspace = workspace_api.get_one(wid)
  236. api = ContentApi(
  237. current_user=user,
  238. session=self.session,
  239. config=self.app_config,
  240. )
  241. items = api.get_all(None, ContentType.Any, workspace)
  242. eq_(1, len(items))
  243. transaction.commit()
  244. # Refresh instances after commit
  245. user = uapi.get_one(uid)
  246. workspace_api = WorkspaceApi(
  247. current_user=user,
  248. session=self.session,
  249. config=self.app_config,
  250. )
  251. workspace = workspace_api.get_one(wid)
  252. api = ContentApi(
  253. current_user=user,
  254. session=self.session,
  255. config=self.app_config,
  256. )
  257. # Test that the item is still available if "show deleted" is activated
  258. api = ContentApi(
  259. current_user=None,
  260. session=self.session,
  261. config=self.app_config,
  262. show_archived=True,
  263. )
  264. items = api.get_all(None, ContentType.Any, workspace)
  265. eq_(2, len(items))
  266. def test_get_all_with_filter(self):
  267. uapi = UserApi(
  268. session=self.session,
  269. config=self.app_config,
  270. current_user=None,
  271. )
  272. group_api = GroupApi(
  273. current_user=None,
  274. session=self.session,
  275. config=self.app_config,
  276. )
  277. groups = [group_api.get_one(Group.TIM_USER),
  278. group_api.get_one(Group.TIM_MANAGER),
  279. group_api.get_one(Group.TIM_ADMIN)]
  280. user = uapi.create_user(
  281. email='this.is@user',
  282. groups=groups,
  283. save_now=True
  284. )
  285. workspace = WorkspaceApi(
  286. current_user=user,
  287. session=self.session,
  288. config=self.app_config,
  289. ).create_workspace(
  290. 'test workspace',
  291. save_now=True
  292. )
  293. api = ContentApi(
  294. current_user=user,
  295. session=self.session,
  296. config=self.app_config,
  297. )
  298. item = api.create(ContentType.Folder, workspace, None,
  299. 'thefolder', True)
  300. item2 = api.create(ContentType.File, workspace, None, 'thefile', True)
  301. uid = user.user_id
  302. wid = workspace.workspace_id
  303. transaction.commit()
  304. # Refresh instances after commit
  305. user = uapi.get_one(uid)
  306. workspace_api = WorkspaceApi(
  307. current_user=user,
  308. session=self.session,
  309. config=self.app_config,
  310. )
  311. workspace = workspace_api.get_one(wid)
  312. api = ContentApi(
  313. current_user=user,
  314. session=self.session,
  315. config=self.app_config,
  316. )
  317. items = api.get_all(None, ContentType.Any, workspace)
  318. eq_(2, len(items))
  319. items2 = api.get_all(None, ContentType.File, workspace)
  320. eq_(1, len(items2))
  321. eq_('thefile', items2[0].label)
  322. items3 = api.get_all(None, ContentType.Folder, workspace)
  323. eq_(1, len(items3))
  324. eq_('thefolder', items3[0].label)
  325. def test_get_all_with_parent_id(self):
  326. uapi = UserApi(
  327. session=self.session,
  328. config=self.app_config,
  329. current_user=None,
  330. )
  331. group_api = GroupApi(
  332. current_user=None,
  333. session=self.session,
  334. config=self.app_config
  335. )
  336. groups = [group_api.get_one(Group.TIM_USER),
  337. group_api.get_one(Group.TIM_MANAGER),
  338. group_api.get_one(Group.TIM_ADMIN)]
  339. user = uapi.create_user(email='this.is@user',
  340. groups=groups, save_now=True)
  341. workspace = WorkspaceApi(
  342. current_user=user,
  343. session=self.session,
  344. config=self.app_config
  345. ).create_workspace('test workspace', save_now=True)
  346. api = ContentApi(
  347. current_user=user,
  348. session=self.session,
  349. config=self.app_config,
  350. )
  351. item = api.create(
  352. ContentType.Folder,
  353. workspace,
  354. None,
  355. 'parent',
  356. do_save=True,
  357. )
  358. item2 = api.create(
  359. ContentType.File,
  360. workspace,
  361. item,
  362. 'file1',
  363. do_save=True,
  364. )
  365. item3 = api.create(
  366. ContentType.File,
  367. workspace,
  368. None,
  369. 'file2',
  370. do_save=True,
  371. )
  372. parent_id = item.content_id
  373. child_id = item2.content_id
  374. uid = user.user_id
  375. wid = workspace.workspace_id
  376. transaction.commit()
  377. # Refresh instances after commit
  378. user = uapi.get_one(uid)
  379. workspace_api = WorkspaceApi(
  380. current_user=user,
  381. session=self.session,
  382. config=self.app_config,
  383. )
  384. workspace = workspace_api.get_one(wid)
  385. api = ContentApi(
  386. current_user=user,
  387. session=self.session,
  388. config=self.app_config,
  389. )
  390. items = api.get_all(None, ContentType.Any, workspace)
  391. eq_(3, len(items))
  392. items2 = api.get_all(parent_id, ContentType.File, workspace)
  393. eq_(1, len(items2))
  394. eq_(child_id, items2[0].content_id)
  395. def test_set_status_unknown_status(self):
  396. uapi = UserApi(
  397. session=self.session,
  398. config=self.app_config,
  399. current_user=None,
  400. )
  401. group_api = GroupApi(
  402. current_user=None,
  403. session=self.session,
  404. config=self.app_config,
  405. )
  406. groups = [group_api.get_one(Group.TIM_USER),
  407. group_api.get_one(Group.TIM_MANAGER),
  408. group_api.get_one(Group.TIM_ADMIN)]
  409. user = uapi.create_user(email='this.is@user',
  410. groups=groups, save_now=True)
  411. workspace = WorkspaceApi(
  412. current_user=user,
  413. session=self.session,
  414. config=self.app_config,
  415. ).create_workspace(
  416. 'test workspace',
  417. save_now=True
  418. )
  419. api = ContentApi(
  420. current_user=user,
  421. session=self.session,
  422. config=self.app_config,
  423. )
  424. c = api.create(ContentType.Folder, workspace, None, 'parent', True)
  425. with new_revision(
  426. session=self.session,
  427. tm=transaction.manager,
  428. content=c,
  429. ):
  430. with pytest.raises(ValueError):
  431. api.set_status(c, 'unknown-status')
  432. def test_set_status_ok(self):
  433. uapi = UserApi(
  434. session=self.session,
  435. config=self.app_config,
  436. current_user=None,
  437. )
  438. group_api = GroupApi(
  439. current_user=None,
  440. session=self.session,
  441. config=self.app_config,
  442. )
  443. groups = [group_api.get_one(Group.TIM_USER),
  444. group_api.get_one(Group.TIM_MANAGER),
  445. group_api.get_one(Group.TIM_ADMIN)]
  446. user = uapi.create_user(email='this.is@user',
  447. groups=groups, save_now=True)
  448. workspace = WorkspaceApi(
  449. current_user=user,
  450. session=self.session,
  451. config=self.app_config,
  452. ).create_workspace(
  453. 'test workspace',
  454. save_now=True
  455. )
  456. api = ContentApi(
  457. current_user=user,
  458. session=self.session,
  459. config=self.app_config,
  460. )
  461. c = api.create(ContentType.Folder, workspace, None, 'parent', True)
  462. with new_revision(
  463. session=self.session,
  464. tm=transaction.manager,
  465. content=c,
  466. ):
  467. for new_status in ['open', 'closed-validated', 'closed-unvalidated',
  468. 'closed-deprecated']:
  469. api.set_status(c, new_status)
  470. eq_(new_status, c.status)
  471. eq_(ActionDescription.STATUS_UPDATE, c.revision_type)
  472. def test_create_comment_ok(self):
  473. uapi = UserApi(
  474. session=self.session,
  475. config=self.app_config,
  476. current_user=None,
  477. )
  478. group_api = GroupApi(
  479. current_user=None,
  480. session=self.session,
  481. config=self.config,
  482. )
  483. groups = [group_api.get_one(Group.TIM_USER),
  484. group_api.get_one(Group.TIM_MANAGER),
  485. group_api.get_one(Group.TIM_ADMIN)]
  486. user = uapi.create_user(email='this.is@user',
  487. groups=groups, save_now=True)
  488. workspace = WorkspaceApi(
  489. current_user=user,
  490. session=self.session,
  491. config=self.app_config,
  492. ).create_workspace(
  493. 'test workspace',
  494. save_now=True
  495. )
  496. api = ContentApi(
  497. current_user=user,
  498. session=self.session,
  499. config=self.app_config,
  500. )
  501. p = api.create(ContentType.Page, workspace, None, 'this_is_a_page')
  502. c = api.create_comment(workspace, p, 'this is the comment', True)
  503. eq_(Content, c.__class__)
  504. eq_(p.content_id, c.parent_id)
  505. eq_(user, c.owner)
  506. eq_(workspace, c.workspace)
  507. eq_(ContentType.Comment, c.type)
  508. eq_('this is the comment', c.description)
  509. eq_('', c.label)
  510. eq_(ActionDescription.COMMENT, c.revision_type)
  511. def test_unit_copy_file_different_label_different_parent_ok(self):
  512. uapi = UserApi(
  513. session=self.session,
  514. config=self.app_config,
  515. current_user=None,
  516. )
  517. group_api = GroupApi(
  518. current_user=None,
  519. session=self.session,
  520. config=self.app_config
  521. )
  522. groups = [group_api.get_one(Group.TIM_USER),
  523. group_api.get_one(Group.TIM_MANAGER),
  524. group_api.get_one(Group.TIM_ADMIN)]
  525. user = uapi.create_user(
  526. email='user1@user',
  527. groups=groups,
  528. save_now=True
  529. )
  530. user2 = uapi.create_user(
  531. email='user2@user',
  532. groups=groups,
  533. save_now=True
  534. )
  535. workspace = WorkspaceApi(
  536. current_user=user,
  537. session=self.session,
  538. config=self.app_config,
  539. ).create_workspace(
  540. 'test workspace',
  541. save_now=True
  542. )
  543. RoleApi(
  544. current_user=user,
  545. session=self.session,
  546. config=self.app_config,
  547. ).create_one(
  548. user2,
  549. workspace,
  550. UserRoleInWorkspace.WORKSPACE_MANAGER,
  551. with_notif=False
  552. )
  553. api = ContentApi(
  554. current_user=user,
  555. session=self.session,
  556. config=self.app_config,
  557. )
  558. foldera = api.create(
  559. ContentType.Folder,
  560. workspace,
  561. None,
  562. 'folder a',
  563. True
  564. )
  565. with self.session.no_autoflush:
  566. text_file = api.create(
  567. content_type=ContentType.File,
  568. workspace=workspace,
  569. parent=foldera,
  570. label='test_file',
  571. do_save=False,
  572. )
  573. api.update_file_data(
  574. text_file,
  575. 'test_file',
  576. 'text/plain',
  577. b'test_content'
  578. )
  579. api.save(text_file, ActionDescription.CREATION)
  580. api2 = ContentApi(
  581. current_user=user2,
  582. session=self.session,
  583. config=self.app_config,
  584. )
  585. workspace2 = WorkspaceApi(
  586. current_user=user2,
  587. session=self.session,
  588. config=self.app_config,
  589. ).create_workspace(
  590. 'test workspace2',
  591. save_now=True
  592. )
  593. folderb = api2.create(
  594. ContentType.Folder,
  595. workspace2,
  596. None,
  597. 'folder b',
  598. True
  599. )
  600. api2.copy(
  601. item=text_file,
  602. new_parent=folderb,
  603. new_label='test_file_copy'
  604. )
  605. transaction.commit()
  606. text_file_copy = api2.get_one_by_label_and_parent(
  607. 'test_file_copy',
  608. folderb,
  609. )
  610. assert text_file != text_file_copy
  611. assert text_file_copy.content_id != text_file.content_id
  612. assert text_file_copy.workspace_id == workspace2.workspace_id
  613. assert text_file_copy.depot_file.file.read() == text_file.depot_file.file.read() # nopep8
  614. assert text_file_copy.depot_file.path != text_file.depot_file.path
  615. assert text_file_copy.label == 'test_file_copy'
  616. assert text_file_copy.type == text_file.type
  617. assert text_file_copy.parent.content_id == folderb.content_id
  618. assert text_file_copy.owner.user_id == user.user_id
  619. assert text_file_copy.description == text_file.description
  620. assert text_file_copy.file_extension == text_file.file_extension
  621. assert text_file_copy.file_mimetype == text_file.file_mimetype
  622. assert text_file_copy.revision_type == ActionDescription.COPY
  623. assert len(text_file_copy.revisions) == len(text_file.revisions) + 1
  624. def test_unit_copy_file__same_label_different_parent_ok(self):
  625. uapi = UserApi(
  626. session=self.session,
  627. config=self.app_config,
  628. current_user=None,
  629. )
  630. group_api = GroupApi(
  631. current_user=None,
  632. session=self.session,
  633. config=self.app_config,
  634. )
  635. groups = [group_api.get_one(Group.TIM_USER),
  636. group_api.get_one(Group.TIM_MANAGER),
  637. group_api.get_one(Group.TIM_ADMIN)]
  638. user = uapi.create_user(
  639. email='user1@user',
  640. groups=groups,
  641. save_now=True
  642. )
  643. user2 = uapi.create_user(
  644. email='user2@user',
  645. groups=groups,
  646. save_now=True
  647. )
  648. workspace = WorkspaceApi(
  649. current_user=user,
  650. session=self.session,
  651. config=self.app_config,
  652. ).create_workspace(
  653. 'test workspace',
  654. save_now=True
  655. )
  656. RoleApi(
  657. current_user=user,
  658. session=self.session,
  659. config=self.app_config,
  660. ).create_one(
  661. user2,
  662. workspace,
  663. UserRoleInWorkspace.WORKSPACE_MANAGER,
  664. with_notif=False
  665. )
  666. api = ContentApi(
  667. current_user=user,
  668. session=self.session,
  669. config=self.app_config,
  670. )
  671. foldera = api.create(
  672. ContentType.Folder,
  673. workspace,
  674. None,
  675. 'folder a',
  676. True
  677. )
  678. with self.session.no_autoflush:
  679. text_file = api.create(
  680. content_type=ContentType.File,
  681. workspace=workspace,
  682. parent=foldera,
  683. label='test_file',
  684. do_save=False,
  685. )
  686. api.update_file_data(
  687. text_file,
  688. 'test_file',
  689. 'text/plain',
  690. b'test_content'
  691. )
  692. api.save(text_file, ActionDescription.CREATION)
  693. api2 = ContentApi(
  694. current_user=user2,
  695. session=self.session,
  696. config=self.app_config,
  697. )
  698. workspace2 = WorkspaceApi(
  699. current_user=user2,
  700. session=self.session,
  701. config=self.app_config,
  702. ).create_workspace(
  703. 'test workspace2',
  704. save_now=True
  705. )
  706. folderb = api2.create(
  707. ContentType.Folder,
  708. workspace2,
  709. None,
  710. 'folder b',
  711. True
  712. )
  713. api2.copy(
  714. item=text_file,
  715. new_parent=folderb,
  716. )
  717. transaction.commit()
  718. text_file_copy = api2.get_one_by_label_and_parent(
  719. 'test_file',
  720. folderb,
  721. )
  722. assert text_file != text_file_copy
  723. assert text_file_copy.content_id != text_file.content_id
  724. assert text_file_copy.workspace_id == workspace2.workspace_id
  725. assert text_file_copy.depot_file.file.read() == text_file.depot_file.file.read() # nopep8
  726. assert text_file_copy.depot_file.path != text_file.depot_file.path
  727. assert text_file_copy.label == text_file.label
  728. assert text_file_copy.type == text_file.type
  729. assert text_file_copy.parent.content_id == folderb.content_id
  730. assert text_file_copy.owner.user_id == user.user_id
  731. assert text_file_copy.description == text_file.description
  732. assert text_file_copy.file_extension == text_file.file_extension
  733. assert text_file_copy.file_mimetype == text_file.file_mimetype
  734. assert text_file_copy.revision_type == ActionDescription.COPY
  735. assert len(text_file_copy.revisions) == len(text_file.revisions) + 1
  736. def test_unit_copy_file_different_label_same_parent_ok(self):
  737. uapi = UserApi(
  738. session=self.session,
  739. config=self.app_config,
  740. current_user=None,
  741. )
  742. group_api = GroupApi(
  743. current_user=None,
  744. session=self.session,
  745. config=self.app_config,
  746. )
  747. groups = [group_api.get_one(Group.TIM_USER),
  748. group_api.get_one(Group.TIM_MANAGER),
  749. group_api.get_one(Group.TIM_ADMIN)]
  750. user = uapi.create_user(
  751. email='user1@user',
  752. groups=groups,
  753. save_now=True,
  754. )
  755. user2 = uapi.create_user(
  756. email='user2@user',
  757. groups=groups,
  758. save_now=True
  759. )
  760. workspace = WorkspaceApi(
  761. current_user=user,
  762. session=self.session,
  763. config=self.app_config,
  764. ).create_workspace(
  765. 'test workspace',
  766. save_now=True
  767. )
  768. RoleApi(
  769. current_user=user,
  770. session=self.session,
  771. config=self.app_config,
  772. ).create_one(
  773. user2, workspace,
  774. UserRoleInWorkspace.WORKSPACE_MANAGER,
  775. with_notif=False
  776. )
  777. api = ContentApi(
  778. current_user=user,
  779. session=self.session,
  780. config=self.app_config,
  781. )
  782. foldera = api.create(
  783. ContentType.Folder,
  784. workspace,
  785. None,
  786. 'folder a',
  787. True
  788. )
  789. with self.session.no_autoflush:
  790. text_file = api.create(
  791. content_type=ContentType.File,
  792. workspace=workspace,
  793. parent=foldera,
  794. label='test_file',
  795. do_save=False,
  796. )
  797. api.update_file_data(
  798. text_file,
  799. 'test_file',
  800. 'text/plain',
  801. b'test_content'
  802. )
  803. api.save(
  804. text_file,
  805. ActionDescription.CREATION
  806. )
  807. api2 = ContentApi(
  808. current_user=user2,
  809. session=self.session,
  810. config=self.app_config,
  811. )
  812. api2.copy(
  813. item=text_file,
  814. new_label='test_file_copy'
  815. )
  816. transaction.commit()
  817. text_file_copy = api2.get_one_by_label_and_parent(
  818. 'test_file_copy',
  819. foldera,
  820. )
  821. assert text_file != text_file_copy
  822. assert text_file_copy.content_id != text_file.content_id
  823. assert text_file_copy.workspace_id == workspace.workspace_id
  824. assert text_file_copy.depot_file.file.read() == text_file.depot_file.file.read() # nopep8
  825. assert text_file_copy.depot_file.path != text_file.depot_file.path
  826. assert text_file_copy.label == 'test_file_copy'
  827. assert text_file_copy.type == text_file.type
  828. assert text_file_copy.parent.content_id == foldera.content_id
  829. assert text_file_copy.owner.user_id == user.user_id
  830. assert text_file_copy.description == text_file.description
  831. assert text_file_copy.file_extension == text_file.file_extension
  832. assert text_file_copy.file_mimetype == text_file.file_mimetype
  833. assert text_file_copy.revision_type == ActionDescription.COPY
  834. assert len(text_file_copy.revisions) == len(text_file.revisions) + 1
  835. def test_mark_read__workspace(self):
  836. uapi = UserApi(
  837. session=self.session,
  838. config=self.app_config,
  839. current_user=None,
  840. )
  841. group_api = GroupApi(
  842. current_user=None,
  843. session=self.session,
  844. config=self.app_config,
  845. )
  846. groups = [group_api.get_one(Group.TIM_USER),
  847. group_api.get_one(Group.TIM_MANAGER),
  848. group_api.get_one(Group.TIM_ADMIN)]
  849. user_a = uapi.create_user(email='this.is@user',
  850. groups=groups, save_now=True)
  851. user_b = uapi.create_user(email='this.is@another.user',
  852. groups=groups, save_now=True)
  853. wapi = WorkspaceApi(
  854. current_user=user_a,
  855. session=self.session,
  856. config=self.app_config,
  857. )
  858. workspace1 = wapi.create_workspace(
  859. 'test workspace n°1',
  860. save_now=True)
  861. workspace2 = wapi.create_workspace(
  862. 'test workspace n°2',
  863. save_now=True)
  864. role_api1 = RoleApi(
  865. current_user=user_a,
  866. session=self.session,
  867. config=self.app_config,
  868. )
  869. role_api1.create_one(
  870. user_b,
  871. workspace1,
  872. UserRoleInWorkspace.READER,
  873. False
  874. )
  875. role_api2 = RoleApi(
  876. current_user=user_b,
  877. session=self.session,
  878. config=self.app_config,
  879. )
  880. role_api2.create_one(user_b, workspace2, UserRoleInWorkspace.READER,
  881. False)
  882. cont_api_a = ContentApi(
  883. current_user=user_a,
  884. session=self.session,
  885. config=self.app_config,
  886. )
  887. cont_api_b = ContentApi(
  888. current_user=user_b,
  889. session=self.session,
  890. config=self.app_config,
  891. )
  892. # Creates page_1 & page_2 in workspace 1
  893. # and page_3 & page_4 in workspace 2
  894. page_1 = cont_api_a.create(ContentType.Page, workspace1, None,
  895. 'this is a page', do_save=True)
  896. page_2 = cont_api_a.create(ContentType.Page, workspace1, None,
  897. 'this is page1', do_save=True)
  898. page_3 = cont_api_a.create(ContentType.Thread, workspace2, None,
  899. 'this is page2', do_save=True)
  900. page_4 = cont_api_a.create(ContentType.File, workspace2, None,
  901. 'this is page3', do_save=True)
  902. for rev in page_1.revisions:
  903. eq_(user_b not in rev.read_by.keys(), True)
  904. for rev in page_2.revisions:
  905. eq_(user_b not in rev.read_by.keys(), True)
  906. for rev in page_3.revisions:
  907. eq_(user_b not in rev.read_by.keys(), True)
  908. for rev in page_4.revisions:
  909. eq_(user_b not in rev.read_by.keys(), True)
  910. # Set as read the workspace n°1
  911. cont_api_b.mark_read__workspace(workspace=workspace1)
  912. for rev in page_1.revisions:
  913. eq_(user_b in rev.read_by.keys(), True)
  914. for rev in page_2.revisions:
  915. eq_(user_b in rev.read_by.keys(), True)
  916. for rev in page_3.revisions:
  917. eq_(user_b not in rev.read_by.keys(), True)
  918. for rev in page_4.revisions:
  919. eq_(user_b not in rev.read_by.keys(), True)
  920. # Set as read the workspace n°2
  921. cont_api_b.mark_read__workspace(workspace=workspace2)
  922. for rev in page_1.revisions:
  923. eq_(user_b in rev.read_by.keys(), True)
  924. for rev in page_2.revisions:
  925. eq_(user_b in rev.read_by.keys(), True)
  926. for rev in page_3.revisions:
  927. eq_(user_b in rev.read_by.keys(), True)
  928. for rev in page_4.revisions:
  929. eq_(user_b in rev.read_by.keys(), True)
  930. def test_mark_read(self):
  931. uapi = UserApi(
  932. session=self.session,
  933. config=self.app_config,
  934. current_user=None,
  935. )
  936. group_api = GroupApi(
  937. current_user=None,
  938. session=self.session,
  939. config = self.app_config,
  940. )
  941. groups = [group_api.get_one(Group.TIM_USER),
  942. group_api.get_one(Group.TIM_MANAGER),
  943. group_api.get_one(Group.TIM_ADMIN)]
  944. user_a = uapi.create_user(
  945. email='this.is@user',
  946. groups=groups,
  947. save_now=True
  948. )
  949. user_b = uapi.create_user(
  950. email='this.is@another.user',
  951. groups=groups,
  952. save_now=True
  953. )
  954. wapi = WorkspaceApi(
  955. current_user=user_a,
  956. session=self.session,
  957. config=self.app_config,
  958. )
  959. workspace_api = WorkspaceApi(
  960. current_user=user_a,
  961. session=self.session,
  962. config=self.app_config,
  963. )
  964. workspace = wapi.create_workspace(
  965. 'test workspace',
  966. save_now=True)
  967. role_api = RoleApi(
  968. current_user=user_a,
  969. session=self.session,
  970. config=self.app_config,
  971. )
  972. role_api.create_one(
  973. user_b,
  974. workspace,
  975. UserRoleInWorkspace.READER,
  976. False
  977. )
  978. cont_api_a = ContentApi(
  979. current_user=user_a,
  980. session=self.session,
  981. config=self.app_config,
  982. )
  983. cont_api_b = ContentApi(
  984. current_user=user_b,
  985. session=self.session,
  986. config=self.app_config,
  987. )
  988. page_1 = cont_api_a.create(ContentType.Page, workspace, None,
  989. 'this is a page', do_save=True)
  990. for rev in page_1.revisions:
  991. eq_(user_b not in rev.read_by.keys(), True)
  992. cont_api_b.mark_read(page_1)
  993. for rev in page_1.revisions:
  994. eq_(user_b in rev.read_by.keys(), True)
  995. def test_mark_read__all(self):
  996. uapi = UserApi(
  997. session=self.session,
  998. config=self.app_config,
  999. current_user=None,
  1000. )
  1001. group_api = GroupApi(
  1002. current_user=None,
  1003. session=self.session,
  1004. config=self.app_config,
  1005. )
  1006. groups = [group_api.get_one(Group.TIM_USER),
  1007. group_api.get_one(Group.TIM_MANAGER),
  1008. group_api.get_one(Group.TIM_ADMIN)]
  1009. user_a = uapi.create_user(
  1010. email='this.is@user',
  1011. groups=groups,
  1012. save_now=True
  1013. )
  1014. user_b = uapi.create_user(
  1015. email='this.is@another.user',
  1016. groups=groups,
  1017. save_now=True
  1018. )
  1019. wapi = WorkspaceApi(
  1020. current_user=user_a,
  1021. session=self.session,
  1022. config=self.app_config,
  1023. )
  1024. workspace = wapi.create_workspace(
  1025. 'test workspace',
  1026. save_now=True)
  1027. role_api = RoleApi(
  1028. current_user=user_a,
  1029. session=self.session,
  1030. config=self.app_config,
  1031. )
  1032. role_api.create_one(
  1033. user_b,
  1034. workspace,
  1035. UserRoleInWorkspace.READER,
  1036. False
  1037. )
  1038. cont_api_a = ContentApi(
  1039. current_user=user_a,
  1040. session=self.session,
  1041. config=self.app_config,
  1042. )
  1043. cont_api_b = ContentApi(
  1044. current_user=user_b,
  1045. session=self.session,
  1046. config=self.app_config,
  1047. )
  1048. page_2 = cont_api_a.create(
  1049. ContentType.Page,
  1050. workspace,
  1051. None,
  1052. 'this is page1',
  1053. do_save=True
  1054. )
  1055. page_3 = cont_api_a.create(
  1056. ContentType.Thread,
  1057. workspace,
  1058. None,
  1059. 'this is page2',
  1060. do_save=True
  1061. )
  1062. page_4 = cont_api_a.create(
  1063. ContentType.File,
  1064. workspace,
  1065. None,
  1066. 'this is page3',
  1067. do_save=True
  1068. )
  1069. for rev in page_2.revisions:
  1070. eq_(user_b not in rev.read_by.keys(), True)
  1071. for rev in page_3.revisions:
  1072. eq_(user_b not in rev.read_by.keys(), True)
  1073. for rev in page_4.revisions:
  1074. eq_(user_b not in rev.read_by.keys(), True)
  1075. self.session.refresh(page_2)
  1076. self.session.refresh(page_3)
  1077. self.session.refresh(page_4)
  1078. cont_api_b.mark_read__all()
  1079. for rev in page_2.revisions:
  1080. eq_(user_b in rev.read_by.keys(), True)
  1081. for rev in page_3.revisions:
  1082. eq_(user_b in rev.read_by.keys(), True)
  1083. for rev in page_4.revisions:
  1084. eq_(user_b in rev.read_by.keys(), True)
  1085. def test_update(self):
  1086. uapi = UserApi(
  1087. session=self.session,
  1088. config=self.app_config,
  1089. current_user=None,
  1090. )
  1091. group_api = GroupApi(
  1092. current_user=None,
  1093. session=self.session,
  1094. config=self.app_config,
  1095. )
  1096. groups = [group_api.get_one(Group.TIM_USER),
  1097. group_api.get_one(Group.TIM_MANAGER),
  1098. group_api.get_one(Group.TIM_ADMIN)]
  1099. user1 = uapi.create_user(
  1100. email='this.is@user',
  1101. groups=groups,
  1102. save_now=True
  1103. )
  1104. workspace_api = WorkspaceApi(
  1105. current_user=user1,
  1106. session=self.session,
  1107. config=self.app_config,
  1108. )
  1109. workspace = workspace_api.create_workspace(
  1110. 'test workspace',
  1111. save_now=True
  1112. )
  1113. wid = workspace.workspace_id
  1114. user2 = uapi.create_user()
  1115. user2.email = 'this.is@another.user'
  1116. uapi.save(user2)
  1117. RoleApi(
  1118. current_user=user1,
  1119. session=self.session,
  1120. config=self.app_config,
  1121. ).create_one(
  1122. user2,
  1123. workspace,
  1124. UserRoleInWorkspace.CONTENT_MANAGER,
  1125. with_notif=False,
  1126. flush=True
  1127. )
  1128. # Test starts here
  1129. api = ContentApi(
  1130. current_user=user1,
  1131. session=self.session,
  1132. config=self.app_config,
  1133. )
  1134. p = api.create(ContentType.Page, workspace, None,
  1135. 'this_is_a_page', True)
  1136. u1id = user1.user_id
  1137. u2id = user2.user_id
  1138. pcid = p.content_id
  1139. poid = p.owner_id
  1140. transaction.commit()
  1141. # Refresh instances after commit
  1142. user1 = uapi.get_one(u1id)
  1143. workspace = WorkspaceApi(
  1144. current_user=user1,
  1145. session=self.session,
  1146. config=self.app_config,
  1147. ).get_one(wid)
  1148. api = ContentApi(
  1149. current_user=user1,
  1150. session=self.session,
  1151. config=self.app_config,
  1152. )
  1153. content = api.get_one(pcid, ContentType.Any, workspace)
  1154. eq_(u1id, content.owner_id)
  1155. eq_(poid, content.owner_id)
  1156. u2 = UserApi(
  1157. session=self.session,
  1158. config=self.app_config,
  1159. current_user=None,
  1160. ).get_one(u2id)
  1161. api2 = ContentApi(
  1162. current_user=u2,
  1163. session=self.session,
  1164. config=self.app_config,
  1165. )
  1166. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1167. with new_revision(
  1168. session=self.session,
  1169. tm=transaction.manager,
  1170. content=content2,
  1171. ):
  1172. api2.update_content(
  1173. content2,
  1174. 'this is an updated page',
  1175. 'new content'
  1176. )
  1177. api2.save(content2)
  1178. transaction.commit()
  1179. # Refresh instances after commit
  1180. user1 = uapi.get_one(u1id)
  1181. workspace = WorkspaceApi(
  1182. current_user=user1,
  1183. session=self.session,
  1184. config=self.app_config,
  1185. ).get_one(wid)
  1186. api = ContentApi(
  1187. current_user=user1,
  1188. session=self.session,
  1189. config=self.app_config,
  1190. )
  1191. updated = api.get_one(pcid, ContentType.Any, workspace)
  1192. eq_(u2id, updated.owner_id,
  1193. 'the owner id should be {} (found {})'.format(u2id,
  1194. updated.owner_id))
  1195. eq_('this is an updated page', updated.label)
  1196. eq_('new content', updated.description)
  1197. eq_(ActionDescription.EDITION, updated.revision_type)
  1198. def test_update_no_change(self):
  1199. uapi = UserApi(
  1200. session=self.session,
  1201. config=self.app_config,
  1202. current_user=None,
  1203. )
  1204. group_api = GroupApi(
  1205. current_user=None,
  1206. session=self.session,
  1207. config = self.app_config,
  1208. )
  1209. groups = [group_api.get_one(Group.TIM_USER),
  1210. group_api.get_one(Group.TIM_MANAGER),
  1211. group_api.get_one(Group.TIM_ADMIN)]
  1212. user1 = uapi.create_user(
  1213. email='this.is@user',
  1214. groups=groups,
  1215. save_now=True,
  1216. )
  1217. workspace = WorkspaceApi(
  1218. current_user=user1,
  1219. session=self.session,
  1220. config=self.app_config,
  1221. ).create_workspace(
  1222. 'test workspace',
  1223. save_now=True
  1224. )
  1225. user2 = uapi.create_user()
  1226. user2.email = 'this.is@another.user'
  1227. uapi.save(user2)
  1228. RoleApi(
  1229. current_user=user1,
  1230. session=self.session,
  1231. config=self.app_config,
  1232. ).create_one(
  1233. user2,
  1234. workspace,
  1235. UserRoleInWorkspace.CONTENT_MANAGER,
  1236. with_notif=False,
  1237. flush=True
  1238. )
  1239. api = ContentApi(
  1240. current_user=user1,
  1241. session=self.session,
  1242. config=self.app_config,
  1243. )
  1244. with self.session.no_autoflush:
  1245. page = api.create(
  1246. content_type=ContentType.Page,
  1247. workspace=workspace,
  1248. label="same_content",
  1249. do_save=False
  1250. )
  1251. page.description = "Same_content_here"
  1252. api.save(page, ActionDescription.CREATION, do_notify=True)
  1253. transaction.commit()
  1254. api2 = ContentApi(
  1255. current_user=user2,
  1256. session=self.session,
  1257. config=self.app_config,
  1258. )
  1259. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  1260. with new_revision(
  1261. session=self.session,
  1262. tm=transaction.manager,
  1263. content=content2,
  1264. ):
  1265. with pytest.raises(SameValueError):
  1266. api2.update_content(
  1267. item=content2,
  1268. new_label='same_content',
  1269. new_content='Same_content_here'
  1270. )
  1271. api2.save(content2)
  1272. transaction.commit()
  1273. def test_update_file_data(self):
  1274. uapi = UserApi(
  1275. session=self.session,
  1276. config=self.app_config,
  1277. current_user=None,
  1278. )
  1279. group_api = GroupApi(
  1280. current_user=None,
  1281. session=self.session,
  1282. config=self.app_config,
  1283. )
  1284. groups = [group_api.get_one(Group.TIM_USER),
  1285. group_api.get_one(Group.TIM_MANAGER),
  1286. group_api.get_one(Group.TIM_ADMIN)]
  1287. user1 = uapi.create_user(
  1288. email='this.is@user',
  1289. groups=groups,
  1290. save_now=True
  1291. )
  1292. workspace_api = WorkspaceApi(
  1293. current_user=user1,
  1294. session=self.session,
  1295. config=self.app_config,
  1296. )
  1297. workspace = workspace_api.create_workspace(
  1298. 'test workspace',
  1299. save_now=True
  1300. )
  1301. wid = workspace.workspace_id
  1302. user2 = uapi.create_user()
  1303. user2.email = 'this.is@another.user'
  1304. uapi.save(user2)
  1305. RoleApi(
  1306. current_user=user1,
  1307. session=self.session,
  1308. config=self.app_config,
  1309. ).create_one(
  1310. user2,
  1311. workspace,
  1312. UserRoleInWorkspace.CONTENT_MANAGER,
  1313. with_notif=True,
  1314. flush=True
  1315. )
  1316. # Test starts here
  1317. api = ContentApi(
  1318. current_user=user1,
  1319. session=self.session,
  1320. config=self.app_config,
  1321. )
  1322. p = api.create(ContentType.File, workspace, None,
  1323. 'this_is_a_page', True)
  1324. u1id = user1.user_id
  1325. u2id = user2.user_id
  1326. pcid = p.content_id
  1327. poid = p.owner_id
  1328. api.save(p)
  1329. transaction.commit()
  1330. # Refresh instances after commit
  1331. user1 = uapi.get_one(u1id)
  1332. workspace_api2 = WorkspaceApi(
  1333. current_user=user1,
  1334. session=self.session,
  1335. config=self.app_config,
  1336. )
  1337. workspace = workspace_api2.get_one(wid)
  1338. api = ContentApi(
  1339. current_user=user1,
  1340. session=self.session,
  1341. config=self.app_config,
  1342. )
  1343. content = api.get_one(pcid, ContentType.Any, workspace)
  1344. eq_(u1id, content.owner_id)
  1345. eq_(poid, content.owner_id)
  1346. u2 = UserApi(
  1347. current_user=None,
  1348. session=self.session,
  1349. config=self.app_config,
  1350. ).get_one(u2id)
  1351. api2 = ContentApi(
  1352. current_user=u2,
  1353. session=self.session,
  1354. config=self.app_config,
  1355. )
  1356. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1357. with new_revision(
  1358. session=self.session,
  1359. tm=transaction.manager,
  1360. content=content2,
  1361. ):
  1362. api2.update_file_data(
  1363. content2,
  1364. 'index.html',
  1365. 'text/html',
  1366. b'<html>hello world</html>'
  1367. )
  1368. api2.save(content2)
  1369. transaction.commit()
  1370. # Refresh instances after commit
  1371. user1 = uapi.get_one(u1id)
  1372. workspace = WorkspaceApi(
  1373. current_user=user1,
  1374. session=self.session,
  1375. config=self.app_config,
  1376. ).get_one(wid)
  1377. updated = api.get_one(pcid, ContentType.Any, workspace)
  1378. eq_(u2id, updated.owner_id,
  1379. 'the owner id should be {} (found {})'.format(u2id,
  1380. updated.owner_id))
  1381. eq_('this_is_a_page.html', updated.file_name)
  1382. eq_('text/html', updated.file_mimetype)
  1383. eq_(b'<html>hello world</html>', updated.depot_file.file.read())
  1384. eq_(ActionDescription.REVISION, updated.revision_type)
  1385. def test_update_no_change(self):
  1386. uapi = UserApi(
  1387. session=self.session,
  1388. config=self.app_config,
  1389. current_user=None,
  1390. )
  1391. group_api = GroupApi(
  1392. current_user=None,
  1393. session=self.session,
  1394. config=self.app_config,
  1395. )
  1396. groups = [group_api.get_one(Group.TIM_USER),
  1397. group_api.get_one(Group.TIM_MANAGER),
  1398. group_api.get_one(Group.TIM_ADMIN)]
  1399. user1 = uapi.create_user(
  1400. email='this.is@user',
  1401. groups=groups,
  1402. save_now=True,
  1403. )
  1404. workspace_api = WorkspaceApi(
  1405. current_user=user1,
  1406. session=self.session,
  1407. config=self.app_config,
  1408. )
  1409. workspace = workspace_api.create_workspace(
  1410. 'test workspace',
  1411. save_now=True
  1412. )
  1413. user2 = uapi.create_user()
  1414. user2.email = 'this.is@another.user'
  1415. uapi.save(user2)
  1416. RoleApi(
  1417. current_user=user1,
  1418. session=self.session,
  1419. config=self.app_config,
  1420. ).create_one(
  1421. user2,
  1422. workspace,
  1423. UserRoleInWorkspace.CONTENT_MANAGER,
  1424. with_notif=False,
  1425. flush=True
  1426. )
  1427. api = ContentApi(
  1428. current_user=user1,
  1429. session=self.session,
  1430. config=self.app_config,
  1431. )
  1432. with self.session.no_autoflush:
  1433. page = api.create(
  1434. content_type=ContentType.Page,
  1435. workspace=workspace,
  1436. label="same_content",
  1437. do_save=False
  1438. )
  1439. api.update_file_data(
  1440. page,
  1441. 'index.html',
  1442. 'text/html',
  1443. b'<html>Same Content Here</html>'
  1444. )
  1445. api.save(page, ActionDescription.CREATION, do_notify=True)
  1446. transaction.commit()
  1447. api2 = ContentApi(
  1448. current_user=user2,
  1449. session=self.session,
  1450. config=self.app_config,
  1451. )
  1452. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  1453. with new_revision(
  1454. session=self.session,
  1455. tm=transaction.manager,
  1456. content=content2,
  1457. ):
  1458. with pytest.raises(SameValueError):
  1459. api2.update_file_data(
  1460. page,
  1461. 'index.html',
  1462. 'text/html',
  1463. b'<html>Same Content Here</html>'
  1464. )
  1465. api2.save(content2)
  1466. transaction.commit()
  1467. def test_archive_unarchive(self):
  1468. uapi = UserApi(
  1469. session=self.session,
  1470. config=self.app_config,
  1471. current_user=None,
  1472. )
  1473. group_api = GroupApi(
  1474. current_user=None,
  1475. session=self.session,
  1476. config=self.app_config,
  1477. )
  1478. groups = [group_api.get_one(Group.TIM_USER),
  1479. group_api.get_one(Group.TIM_MANAGER),
  1480. group_api.get_one(Group.TIM_ADMIN)]
  1481. user1 = uapi.create_user(
  1482. email='this.is@user',
  1483. groups=groups,
  1484. save_now=True
  1485. )
  1486. u1id = user1.user_id
  1487. workspace_api = WorkspaceApi(
  1488. current_user=user1,
  1489. session=self.session,
  1490. config=self.app_config,
  1491. )
  1492. workspace = workspace_api.create_workspace(
  1493. 'test workspace',
  1494. save_now=True
  1495. )
  1496. wid = workspace.workspace_id
  1497. user2 = uapi.create_user()
  1498. user2.email = 'this.is@another.user'
  1499. uapi.save(user2)
  1500. RoleApi(
  1501. current_user=user1,
  1502. session=self.session,
  1503. config=self.app_config,
  1504. ).create_one(
  1505. user2,
  1506. workspace,
  1507. UserRoleInWorkspace.CONTENT_MANAGER,
  1508. with_notif=True,
  1509. flush=True
  1510. )
  1511. # show archived is used at the top end of the test
  1512. api = ContentApi(
  1513. current_user=user1,
  1514. session=self.session,
  1515. show_archived=True,
  1516. config=self.app_config,
  1517. )
  1518. p = api.create(ContentType.File, workspace, None,
  1519. 'this_is_a_page', True)
  1520. u1id = user1.user_id
  1521. u2id = user2.user_id
  1522. pcid = p.content_id
  1523. poid = p.owner_id
  1524. transaction.commit()
  1525. ####
  1526. # refresh after commit
  1527. user1 = UserApi(
  1528. current_user=None,
  1529. config=self.app_config,
  1530. session=self.session
  1531. ).get_one(u1id)
  1532. workspace = WorkspaceApi(
  1533. current_user=user1,
  1534. session=self.session,
  1535. config=self.app_config,
  1536. ).get_one(wid)
  1537. content = api.get_one(pcid, ContentType.Any, workspace)
  1538. eq_(u1id, content.owner_id)
  1539. eq_(poid, content.owner_id)
  1540. u2api = UserApi(
  1541. session=self.session,
  1542. config=self.app_config,
  1543. current_user=None,
  1544. )
  1545. u2 = u2api.get_one(u2id)
  1546. api2 = ContentApi(
  1547. current_user=u2,
  1548. session=self.session,
  1549. config=self.app_config,
  1550. show_archived=True,
  1551. )
  1552. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1553. with new_revision(
  1554. session=self.session,
  1555. tm=transaction.manager,
  1556. content=content2,
  1557. ):
  1558. api2.archive(content2)
  1559. api2.save(content2)
  1560. transaction.commit()
  1561. # refresh after commit
  1562. user1 = UserApi(
  1563. current_user=None,
  1564. session=self.session,
  1565. config=self.app_config,
  1566. ).get_one(u1id)
  1567. workspace = WorkspaceApi(
  1568. current_user=user1,
  1569. session=self.session,
  1570. config=self.app_config,
  1571. ).get_one(wid)
  1572. u2 = UserApi(
  1573. current_user=None,
  1574. session=self.session,
  1575. config=self.app_config,
  1576. ).get_one(u2id)
  1577. api = ContentApi(
  1578. current_user=user1,
  1579. session=self.session,
  1580. config=self.app_config,
  1581. show_archived=True,
  1582. )
  1583. api2 = ContentApi(
  1584. current_user=u2,
  1585. session=self.session,
  1586. config=self.app_config,
  1587. show_archived=True,
  1588. )
  1589. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1590. eq_(u2id, updated.owner_id,
  1591. 'the owner id should be {} (found {})'.format(u2id,
  1592. updated.owner_id))
  1593. eq_(True, updated.is_archived)
  1594. eq_(ActionDescription.ARCHIVING, updated.revision_type)
  1595. ####
  1596. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1597. with new_revision(
  1598. session=self.session,
  1599. tm=transaction.manager,
  1600. content=updated,
  1601. ):
  1602. api.unarchive(updated)
  1603. api.save(updated2)
  1604. eq_(False, updated2.is_archived)
  1605. eq_(ActionDescription.UNARCHIVING, updated2.revision_type)
  1606. eq_(u1id, updated2.owner_id)
  1607. def test_delete_undelete(self):
  1608. uapi = UserApi(
  1609. session=self.session,
  1610. config=self.app_config,
  1611. current_user=None,
  1612. )
  1613. group_api = GroupApi(
  1614. current_user=None,
  1615. session=self.session,
  1616. config=self.app_config,
  1617. )
  1618. groups = [group_api.get_one(Group.TIM_USER),
  1619. group_api.get_one(Group.TIM_MANAGER),
  1620. group_api.get_one(Group.TIM_ADMIN)]
  1621. user1 = uapi.create_user(
  1622. email='this.is@user',
  1623. groups=groups,
  1624. save_now=True
  1625. )
  1626. u1id = user1.user_id
  1627. workspace_api = WorkspaceApi(
  1628. current_user=user1,
  1629. session=self.session,
  1630. config=self.app_config,
  1631. )
  1632. workspace = workspace_api.create_workspace(
  1633. 'test workspace',
  1634. save_now=True
  1635. )
  1636. wid = workspace.workspace_id
  1637. user2 = uapi.create_user()
  1638. user2.email = 'this.is@another.user'
  1639. uapi.save(user2)
  1640. RoleApi(
  1641. current_user=user1,
  1642. session=self.session,
  1643. config=self.app_config,
  1644. ).create_one(
  1645. user2,
  1646. workspace,
  1647. UserRoleInWorkspace.CONTENT_MANAGER,
  1648. with_notif=True,
  1649. flush=True
  1650. )
  1651. # show archived is used at the top end of the test
  1652. api = ContentApi(
  1653. current_user=user1,
  1654. session=self.session,
  1655. config=self.app_config,
  1656. show_deleted=True,
  1657. )
  1658. p = api.create(ContentType.File, workspace, None,
  1659. 'this_is_a_page', True)
  1660. u1id = user1.user_id
  1661. u2id = user2.user_id
  1662. pcid = p.content_id
  1663. poid = p.owner_id
  1664. transaction.commit()
  1665. ####
  1666. user1 = UserApi(
  1667. current_user=None,
  1668. session=self.session,
  1669. config=self.app_config,
  1670. ).get_one(u1id)
  1671. workspace = WorkspaceApi(
  1672. current_user=user1,
  1673. session=self.session,
  1674. config=self.app_config,
  1675. ).get_one(wid)
  1676. content = api.get_one(pcid, ContentType.Any, workspace)
  1677. eq_(u1id, content.owner_id)
  1678. eq_(poid, content.owner_id)
  1679. u2 = UserApi(
  1680. current_user=None,
  1681. session=self.session,
  1682. config=self.app_config,
  1683. ).get_one(u2id)
  1684. api2 = ContentApi(
  1685. current_user=u2,
  1686. session=self.session,
  1687. config=self.app_config,
  1688. show_deleted=True,
  1689. )
  1690. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1691. with new_revision(
  1692. session=self.session,
  1693. tm=transaction.manager,
  1694. content=content2,
  1695. ):
  1696. api2.delete(content2)
  1697. api2.save(content2)
  1698. transaction.commit()
  1699. ####
  1700. user1 = UserApi(
  1701. current_user=None,
  1702. session=self.session,
  1703. config=self.app_config,
  1704. ).get_one(u1id)
  1705. workspace = WorkspaceApi(
  1706. current_user=user1,
  1707. session=self.session,
  1708. config=self.app_config,
  1709. ).get_one(wid)
  1710. # show archived is used at the top end of the test
  1711. api = ContentApi(
  1712. current_user=user1,
  1713. session=self.session,
  1714. config=self.app_config,
  1715. show_deleted=True,
  1716. )
  1717. u2 = UserApi(
  1718. current_user=None,
  1719. session=self.session,
  1720. config=self.app_config,
  1721. ).get_one(u2id)
  1722. api2 = ContentApi(
  1723. current_user=u2,
  1724. session=self.session,
  1725. config=self.app_config,
  1726. show_deleted=True
  1727. )
  1728. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1729. eq_(u2id, updated.owner_id,
  1730. 'the owner id should be {} (found {})'.format(u2id,
  1731. updated.owner_id))
  1732. eq_(True, updated.is_deleted)
  1733. eq_(ActionDescription.DELETION, updated.revision_type)
  1734. ####
  1735. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1736. with new_revision(
  1737. tm=transaction.manager,
  1738. session=self.session,
  1739. content=updated2,
  1740. ):
  1741. api.undelete(updated2)
  1742. api.save(updated2)
  1743. eq_(False, updated2.is_deleted)
  1744. eq_(ActionDescription.UNDELETION, updated2.revision_type)
  1745. eq_(u1id, updated2.owner_id)
  1746. def test_search_in_label(self):
  1747. # HACK - D.A. - 2015-03-09
  1748. # This test is based on a bug which does NOT return results found
  1749. # at root of a workspace (eg a folder)
  1750. uapi = UserApi(
  1751. session=self.session,
  1752. config=self.app_config,
  1753. current_user=None,
  1754. )
  1755. group_api = GroupApi(
  1756. current_user=None,
  1757. session=self.session,
  1758. config=self.app_config,
  1759. )
  1760. groups = [group_api.get_one(Group.TIM_USER),
  1761. group_api.get_one(Group.TIM_MANAGER),
  1762. group_api.get_one(Group.TIM_ADMIN)]
  1763. user = uapi.create_user(email='this.is@user',
  1764. groups=groups, save_now=True)
  1765. workspace = WorkspaceApi(
  1766. current_user=user,
  1767. session=self.session,
  1768. config=self.app_config,
  1769. ).create_workspace(
  1770. 'test workspace',
  1771. save_now=True
  1772. )
  1773. api = ContentApi(
  1774. current_user=user,
  1775. session=self.session,
  1776. config=self.app_config,
  1777. )
  1778. a = api.create(ContentType.Folder, workspace, None,
  1779. 'this is randomized folder', True)
  1780. p = api.create(ContentType.Page, workspace, a,
  1781. 'this is randomized label content', True)
  1782. with new_revision(
  1783. session=self.session,
  1784. tm=transaction.manager,
  1785. content=p,
  1786. ):
  1787. p.description = 'This is some amazing test'
  1788. api.save(p)
  1789. original_id = p.content_id
  1790. res = api.search(['randomized'])
  1791. eq_(1, len(res.all()))
  1792. item = res.all()[0]
  1793. eq_(original_id, item.content_id)
  1794. def test_search_in_description(self):
  1795. # HACK - D.A. - 2015-03-09
  1796. # This test is based on a bug which does NOT return results found
  1797. # at root of a workspace (eg a folder)
  1798. uapi = UserApi(
  1799. session=self.session,
  1800. config=self.app_config,
  1801. current_user=None,
  1802. )
  1803. group_api = GroupApi(
  1804. current_user=None,
  1805. session=self.session,
  1806. config=self.app_config,
  1807. )
  1808. groups = [group_api.get_one(Group.TIM_USER),
  1809. group_api.get_one(Group.TIM_MANAGER),
  1810. group_api.get_one(Group.TIM_ADMIN)]
  1811. user = uapi.create_user(email='this.is@user',
  1812. groups=groups, save_now=True)
  1813. workspace = WorkspaceApi(
  1814. current_user=user,
  1815. session=self.session,
  1816. config=self.app_config,
  1817. ).create_workspace(
  1818. 'test workspace',
  1819. save_now=True,
  1820. )
  1821. api = ContentApi(
  1822. current_user=user,
  1823. session=self.session,
  1824. config=self.app_config,
  1825. )
  1826. a = api.create(ContentType.Folder, workspace, None,
  1827. 'this is randomized folder', True)
  1828. p = api.create(ContentType.Page, workspace, a,
  1829. 'this is dummy label content', True)
  1830. with new_revision(
  1831. tm=transaction.manager,
  1832. session=self.session,
  1833. content=p,
  1834. ):
  1835. p.description = 'This is some amazing test'
  1836. api.save(p)
  1837. original_id = p.content_id
  1838. res = api.search(['dummy'])
  1839. eq_(1, len(res.all()))
  1840. item = res.all()[0]
  1841. eq_(original_id, item.content_id)
  1842. def test_search_in_label_or_description(self):
  1843. # HACK - D.A. - 2015-03-09
  1844. # This test is based on a bug which does NOT return results found
  1845. # at root of a workspace (eg a folder)
  1846. uapi = UserApi(
  1847. session=self.session,
  1848. config=self.app_config,
  1849. current_user=None,
  1850. )
  1851. group_api = GroupApi(
  1852. current_user=None,
  1853. session=self.session,
  1854. config=self.app_config,
  1855. )
  1856. groups = [group_api.get_one(Group.TIM_USER),
  1857. group_api.get_one(Group.TIM_MANAGER),
  1858. group_api.get_one(Group.TIM_ADMIN)]
  1859. user = uapi.create_user(email='this.is@user',
  1860. groups=groups, save_now=True)
  1861. workspace = WorkspaceApi(
  1862. current_user=user,
  1863. session=self.session,
  1864. config=self.app_config,
  1865. ).create_workspace('test workspace', save_now=True)
  1866. api = ContentApi(
  1867. current_user=user,
  1868. session=self.session,
  1869. config=self.app_config,
  1870. )
  1871. a = api.create(ContentType.Folder, workspace, None,
  1872. 'this is randomized folder', True)
  1873. p1 = api.create(ContentType.Page, workspace, a,
  1874. 'this is dummy label content', True)
  1875. p2 = api.create(ContentType.Page, workspace, a, 'Hey ! Jon !', True)
  1876. with new_revision(
  1877. session=self.session,
  1878. tm=transaction.manager,
  1879. content=p1,
  1880. ):
  1881. p1.description = 'This is some amazing test'
  1882. with new_revision(
  1883. session=self.session,
  1884. tm=transaction.manager,
  1885. content=p2,
  1886. ):
  1887. p2.description = 'What\'s up ?'
  1888. api.save(p1)
  1889. api.save(p2)
  1890. id1 = p1.content_id
  1891. id2 = p2.content_id
  1892. eq_(1, self.session.query(Workspace).filter(Workspace.label == 'test workspace').count())
  1893. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is randomized folder').count())
  1894. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is dummy label content').count())
  1895. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'This is some amazing test').count())
  1896. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'Hey ! Jon !').count())
  1897. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'What\'s up ?').count())
  1898. res = api.search(['dummy', 'jon'])
  1899. eq_(2, len(res.all()))
  1900. eq_(True, id1 in [o.content_id for o in res.all()])
  1901. eq_(True, id2 in [o.content_id for o in res.all()])
  1902. def test_unit__search_exclude_content_under_deleted_or_archived_parents__ok(self): # nopep8
  1903. admin = self.session.query(User)\
  1904. .filter(User.email == 'admin@admin.admin').one()
  1905. workspace = self._create_workspace_and_test(
  1906. 'workspace_1',
  1907. admin
  1908. )
  1909. folder_1 = self._create_content_and_test(
  1910. 'folder_1',
  1911. workspace=workspace,
  1912. type=ContentType.Folder
  1913. )
  1914. folder_2 = self._create_content_and_test(
  1915. 'folder_2',
  1916. workspace=workspace,
  1917. type=ContentType.Folder
  1918. )
  1919. page_1 = self._create_content_and_test(
  1920. 'foo', workspace=workspace,
  1921. type=ContentType.Page,
  1922. parent=folder_1
  1923. )
  1924. page_2 = self._create_content_and_test(
  1925. 'bar',
  1926. workspace=workspace,
  1927. type=ContentType.Page,
  1928. parent=folder_2
  1929. )
  1930. api = ContentApi(
  1931. current_user=admin,
  1932. session=self.session,
  1933. config=self.app_config,
  1934. )
  1935. foo_result = api.search(['foo']).all()
  1936. eq_(1, len(foo_result))
  1937. assert page_1 in foo_result
  1938. bar_result = api.search(['bar']).all()
  1939. eq_(1, len(bar_result))
  1940. assert page_2 in bar_result
  1941. with new_revision(
  1942. session=self.session,
  1943. tm=transaction.manager,
  1944. content=folder_1,
  1945. ):
  1946. api.delete(folder_1)
  1947. with new_revision(
  1948. session=self.session,
  1949. tm=transaction.manager,
  1950. content=folder_2,
  1951. ):
  1952. api.archive(folder_2)
  1953. # Actually ContentApi.search don't filter it
  1954. foo_result = api.search(['foo']).all()
  1955. eq_(1, len(foo_result))
  1956. assert page_1 in foo_result
  1957. bar_result = api.search(['bar']).all()
  1958. eq_(1, len(bar_result))
  1959. assert page_2 in bar_result
  1960. # ContentApi offer exclude_unavailable method to do it
  1961. foo_result = api.search(['foo']).all()
  1962. api.exclude_unavailable(foo_result)
  1963. eq_(0, len(foo_result))
  1964. bar_result = api.search(['bar']).all()
  1965. api.exclude_unavailable(bar_result)
  1966. eq_(0, len(bar_result))
  1967. class TestContentApiSecurity(DefaultTest):
  1968. fixtures = [FixtureTest, ]
  1969. def test_unit__cant_get_non_access_content__ok__nominal_case(self):
  1970. admin = self.session.query(User)\
  1971. .filter(User.email == 'admin@admin.admin').one()
  1972. bob = self.session.query(User)\
  1973. .filter(User.email == 'bob@fsf.local').one()
  1974. bob_workspace = WorkspaceApi(
  1975. current_user=bob,
  1976. session=self.session,
  1977. config=self.app_config,
  1978. ).create_workspace(
  1979. 'bob_workspace',
  1980. save_now=True,
  1981. )
  1982. admin_workspace = WorkspaceApi(
  1983. current_user=admin,
  1984. session=self.session,
  1985. config=self.app_config,
  1986. ).create_workspace(
  1987. 'admin_workspace',
  1988. save_now=True,
  1989. )
  1990. bob_page = ContentApi(
  1991. current_user=bob,
  1992. session=self.session,
  1993. config=self.app_config,
  1994. ).create(
  1995. content_type=ContentType.Page,
  1996. workspace=bob_workspace,
  1997. label='bob_page',
  1998. do_save=True,
  1999. )
  2000. admin_page = ContentApi(
  2001. current_user=admin,
  2002. session=self.session,
  2003. config=self.app_config,
  2004. ).create(
  2005. content_type=ContentType.Page,
  2006. workspace=admin_workspace,
  2007. label='admin_page',
  2008. do_save=True,
  2009. )
  2010. bob_viewable = ContentApi(
  2011. current_user=bob,
  2012. session=self.session,
  2013. config=self.app_config,
  2014. ).get_all()
  2015. eq_(1, len(bob_viewable), 'Bob should view only one content')
  2016. eq_(
  2017. 'bob_page',
  2018. bob_viewable[0].label,
  2019. 'Bob should not view "{0}" content'.format(
  2020. bob_viewable[0].label,
  2021. )
  2022. )