test_content_api.py 69KB


  1. # -*- coding: utf-8 -*-
  2. import transaction
  3. import pytest
  4. from tracim.config import CFG
  5. from tracim.lib.core.content import compare_content_for_sorting_by_type_and_name
  6. from tracim.lib.core.content import ContentApi
  7. # TODO - G.M - 28-03-2018 - [GroupApi] Re-enable GroupApi
  8. from tracim.lib.core.group import GroupApi
  9. from tracim.lib.core.user import UserApi
  10. from tracim.exceptions import SameValueError
  11. # TODO - G.M - 28-03-2018 - [RoleApi] Re-enable RoleApi
  12. from tracim.lib.core.workspace import RoleApi
  13. # TODO - G.M - 28-03-2018 - [WorkspaceApi] Re-enable WorkspaceApi
  14. from tracim.lib.core.workspace import WorkspaceApi
  15. from tracim.models.revision_protection import new_revision
  16. from tracim.models.auth import User
  17. from tracim.models.auth import Group
  18. from tracim.models.data import ActionDescription
  19. from tracim.models.data import ContentRevisionRO
  20. from tracim.models.data import Workspace
  21. from tracim.models.data import Content
  22. from tracim.models.data import ContentType
  23. from tracim.models.data import UserRoleInWorkspace
  24. from tracim.fixtures.users_and_groups import Test as FixtureTest
  25. from tracim.tests import DefaultTest
  26. from tracim.tests import eq_
  27. class TestContentApi(DefaultTest):
  28. def test_compare_content_for_sorting_by_type(self):
  29. c1 = Content()
  30. c1.label = ''
  31. c1.type = 'file'
  32. c2 = Content()
  33. c2.label = ''
  34. c2.type = 'folder'
  35. c11 = c1
  36. eq_(1, compare_content_for_sorting_by_type_and_name(c1, c2))
  37. eq_(-1, compare_content_for_sorting_by_type_and_name(c2, c1))
  38. eq_(0, compare_content_for_sorting_by_type_and_name(c1, c11))
  39. def test_compare_content_for_sorting_by_label(self):
  40. c1 = Content()
  41. c1.label = 'bbb'
  42. c1.type = 'file'
  43. c2 = Content()
  44. c2.label = 'aaa'
  45. c2.type = 'file'
  46. c11 = c1
  47. eq_(1, compare_content_for_sorting_by_type_and_name(c1, c2))
  48. eq_(-1, compare_content_for_sorting_by_type_and_name(c2, c1))
  49. eq_(0, compare_content_for_sorting_by_type_and_name(c1, c11))
  50. def test_sort_by_label_or_filename(self):
  51. c1 = Content()
  52. c1.label = 'ABCD'
  53. c1.type = 'file'
  54. c2 = Content()
  55. c2.label = ''
  56. c2.type = 'file'
  57. c2.file_name = 'AABC'
  58. c3 = Content()
  59. c3.label = 'BCDE'
  60. c3.type = 'file'
  61. items = [c1, c2, c3]
  62. sorteds = ContentApi.sort_content(items)
  63. eq_(sorteds[0], c2)
  64. eq_(sorteds[1], c1)
  65. eq_(sorteds[2], c3)
  66. def test_sort_by_content_type(self):
  67. c1 = Content()
  68. c1.label = 'AAAA'
  69. c1.type = 'file'
  70. c2 = Content()
  71. c2.label = 'BBBB'
  72. c2.type = 'folder'
  73. items = [c1, c2]
  74. sorteds = ContentApi.sort_content(items)
  75. eq_(sorteds[0], c2,
  76. 'value is {} instead of {}'.format(sorteds[0].content_id,
  77. c2.content_id))
  78. eq_(sorteds[1], c1,
  79. 'value is {} instead of {}'.format(sorteds[1].content_id,
  80. c1.content_id))
  81. def test_delete(self):
  82. uapi = UserApi(
  83. session=self.session,
  84. config=self.app_config,
  85. current_user=None,
  86. )
  87. group_api = GroupApi(
  88. current_user=None,
  89. session=self.session,
  90. config=self.app_config,
  91. )
  92. groups = [group_api.get_one(Group.TIM_USER),
  93. group_api.get_one(Group.TIM_MANAGER),
  94. group_api.get_one(Group.TIM_ADMIN)]
  95. user = uapi.create_minimal_user(email='this.is@user',
  96. groups=groups, save_now=True)
  97. workspace = WorkspaceApi(
  98. current_user=user,
  99. session=self.session,
  100. config=self.app_config,
  101. ).create_workspace('test workspace', save_now=True)
  102. api = ContentApi(
  103. current_user=user,
  104. session=self.session,
  105. config=self.app_config,
  106. )
  107. item = api.create(
  108. content_type=ContentType.Folder,
  109. workspace=workspace,
  110. parent=None,
  111. label='not_deleted',
  112. do_save=True
  113. )
  114. item2 = api.create(
  115. content_type=ContentType.Folder,
  116. workspace=workspace,
  117. parent=None,
  118. label='to_delete',
  119. do_save=True
  120. )
  121. uid = user.user_id
  122. wid = workspace.workspace_id
  123. transaction.commit()
  124. # Refresh instances after commit
  125. user = uapi.get_one(uid)
  126. workspace_api = WorkspaceApi(
  127. current_user=user,
  128. session=self.session,
  129. config=self.app_config
  130. )
  131. workspace = workspace_api.get_one(wid)
  132. api = ContentApi(
  133. current_user=user,
  134. session=self.session,
  135. config=self.app_config,
  136. )
  137. items = api.get_all(None, ContentType.Any, workspace)
  138. eq_(2, len(items))
  139. items = api.get_all(None, ContentType.Any, workspace)
  140. with new_revision(
  141. session=self.session,
  142. tm=transaction.manager,
  143. content=items[0]
  144. ):
  145. api.delete(items[0])
  146. transaction.commit()
  147. # Refresh instances after commit
  148. user = uapi.get_one(uid)
  149. workspace_api = WorkspaceApi(
  150. current_user=user,
  151. session=self.session,
  152. config=self.app_config
  153. )
  154. workspace = workspace_api.get_one(wid)
  155. api = ContentApi(
  156. current_user=user,
  157. session=self.session,
  158. config=self.app_config,
  159. )
  160. items = api.get_all(None, ContentType.Any, workspace)
  161. eq_(1, len(items))
  162. transaction.commit()
  163. # Test that the item is still available if "show deleted" is activated
  164. # Refresh instances after commit
  165. user = uapi.get_one(uid)
  166. workspace_api = WorkspaceApi(
  167. current_user=user,
  168. session=self.session,
  169. config=self.app_config,
  170. )
  171. api = ContentApi(
  172. current_user=user,
  173. session=self.session,
  174. config=self.app_config,
  175. show_deleted=True,
  176. )
  177. items = api.get_all(None, ContentType.Any, workspace)
  178. eq_(2, len(items))
  179. def test_archive(self):
  180. uapi = UserApi(
  181. session=self.session,
  182. config=self.app_config,
  183. current_user=None,
  184. )
  185. group_api = GroupApi(
  186. current_user=None,
  187. session=self.session,
  188. config=self.app_config,
  189. )
  190. groups = [group_api.get_one(Group.TIM_USER),
  191. group_api.get_one(Group.TIM_MANAGER),
  192. group_api.get_one(Group.TIM_ADMIN)]
  193. user = uapi.create_minimal_user(
  194. email='this.is@user',
  195. groups=groups,
  196. save_now=True,
  197. )
  198. workspace_api = WorkspaceApi(
  199. current_user=user,
  200. session=self.session,
  201. config=self.app_config,
  202. )
  203. workspace = workspace_api.create_workspace(
  204. 'test workspace',
  205. save_now=True
  206. )
  207. api = ContentApi(
  208. current_user=user,
  209. session=self.session,
  210. config=self.app_config,
  211. )
  212. item = api.create(
  213. content_type=ContentType.Folder,
  214. workspace=workspace,
  215. parent=None,
  216. label='not_archived',
  217. do_save=True
  218. )
  219. item2 = api.create(
  220. content_type=ContentType.Folder,
  221. workspace=workspace,
  222. parent=None,
  223. label='to_archive',
  224. do_save=True
  225. )
  226. uid = user.user_id
  227. wid = workspace.workspace_id
  228. transaction.commit()
  229. # Refresh instances after commit
  230. user = uapi.get_one(uid)
  231. workspace_api = WorkspaceApi(
  232. current_user=user,
  233. session=self.session,
  234. config=self.app_config,
  235. )
  236. api = ContentApi(
  237. session=self.session,
  238. current_user=user,
  239. config=self.app_config,
  240. )
  241. items = api.get_all(None, ContentType.Any, workspace)
  242. eq_(2, len(items))
  243. items = api.get_all(None, ContentType.Any, workspace)
  244. with new_revision(
  245. session=self.session,
  246. tm=transaction.manager,
  247. content=items[0],
  248. ):
  249. api.archive(items[0])
  250. transaction.commit()
  251. # Refresh instances after commit
  252. user = uapi.get_one(uid)
  253. workspace_api = WorkspaceApi(
  254. current_user=user,
  255. session=self.session,
  256. config=self.app_config,
  257. )
  258. workspace = workspace_api.get_one(wid)
  259. api = ContentApi(
  260. current_user=user,
  261. session=self.session,
  262. config=self.app_config,
  263. )
  264. items = api.get_all(None, ContentType.Any, workspace)
  265. eq_(1, len(items))
  266. transaction.commit()
  267. # Refresh instances after commit
  268. user = uapi.get_one(uid)
  269. workspace_api = WorkspaceApi(
  270. current_user=user,
  271. session=self.session,
  272. config=self.app_config,
  273. )
  274. workspace = workspace_api.get_one(wid)
  275. api = ContentApi(
  276. current_user=user,
  277. session=self.session,
  278. config=self.app_config,
  279. )
  280. # Test that the item is still available if "show deleted" is activated
  281. api = ContentApi(
  282. current_user=None,
  283. session=self.session,
  284. config=self.app_config,
  285. show_archived=True,
  286. )
  287. items = api.get_all(None, ContentType.Any, workspace)
  288. eq_(2, len(items))
  289. def test_get_all_with_filter(self):
  290. uapi = UserApi(
  291. session=self.session,
  292. config=self.app_config,
  293. current_user=None,
  294. )
  295. group_api = GroupApi(
  296. current_user=None,
  297. session=self.session,
  298. config=self.app_config,
  299. )
  300. groups = [group_api.get_one(Group.TIM_USER),
  301. group_api.get_one(Group.TIM_MANAGER),
  302. group_api.get_one(Group.TIM_ADMIN)]
  303. user = uapi.create_minimal_user(
  304. email='this.is@user',
  305. groups=groups,
  306. save_now=True
  307. )
  308. workspace = WorkspaceApi(
  309. current_user=user,
  310. session=self.session,
  311. config=self.app_config,
  312. ).create_workspace(
  313. 'test workspace',
  314. save_now=True
  315. )
  316. api = ContentApi(
  317. current_user=user,
  318. session=self.session,
  319. config=self.app_config,
  320. )
  321. item = api.create(
  322. content_type=ContentType.Folder,
  323. workspace=workspace,
  324. parent=None,
  325. label='thefolder',
  326. do_save=True
  327. )
  328. item2 = api.create(
  329. content_type=ContentType.File,
  330. workspace=workspace,
  331. parent=None,
  332. label='thefile',
  333. do_save=True
  334. )
  335. uid = user.user_id
  336. wid = workspace.workspace_id
  337. transaction.commit()
  338. # Refresh instances after commit
  339. user = uapi.get_one(uid)
  340. workspace_api = WorkspaceApi(
  341. current_user=user,
  342. session=self.session,
  343. config=self.app_config,
  344. )
  345. workspace = workspace_api.get_one(wid)
  346. api = ContentApi(
  347. current_user=user,
  348. session=self.session,
  349. config=self.app_config,
  350. )
  351. items = api.get_all(None, ContentType.Any, workspace)
  352. eq_(2, len(items))
  353. items2 = api.get_all(None, ContentType.File, workspace)
  354. eq_(1, len(items2))
  355. eq_('thefile', items2[0].label)
  356. items3 = api.get_all(None, ContentType.Folder, workspace)
  357. eq_(1, len(items3))
  358. eq_('thefolder', items3[0].label)
  359. def test_get_all_with_parent_id(self):
  360. uapi = UserApi(
  361. session=self.session,
  362. config=self.app_config,
  363. current_user=None,
  364. )
  365. group_api = GroupApi(
  366. current_user=None,
  367. session=self.session,
  368. config=self.app_config
  369. )
  370. groups = [group_api.get_one(Group.TIM_USER),
  371. group_api.get_one(Group.TIM_MANAGER),
  372. group_api.get_one(Group.TIM_ADMIN)]
  373. user = uapi.create_minimal_user(email='this.is@user',
  374. groups=groups, save_now=True)
  375. workspace = WorkspaceApi(
  376. current_user=user,
  377. session=self.session,
  378. config=self.app_config
  379. ).create_workspace('test workspace', save_now=True)
  380. api = ContentApi(
  381. current_user=user,
  382. session=self.session,
  383. config=self.app_config,
  384. )
  385. item = api.create(
  386. ContentType.Folder,
  387. workspace,
  388. None,
  389. 'parent',
  390. do_save=True,
  391. )
  392. item2 = api.create(
  393. ContentType.File,
  394. workspace,
  395. item,
  396. 'file1',
  397. do_save=True,
  398. )
  399. item3 = api.create(
  400. ContentType.File,
  401. workspace,
  402. None,
  403. 'file2',
  404. do_save=True,
  405. )
  406. parent_id = item.content_id
  407. child_id = item2.content_id
  408. uid = user.user_id
  409. wid = workspace.workspace_id
  410. transaction.commit()
  411. # Refresh instances after commit
  412. user = uapi.get_one(uid)
  413. workspace_api = WorkspaceApi(
  414. current_user=user,
  415. session=self.session,
  416. config=self.app_config,
  417. )
  418. workspace = workspace_api.get_one(wid)
  419. api = ContentApi(
  420. current_user=user,
  421. session=self.session,
  422. config=self.app_config,
  423. )
  424. items = api.get_all(None, ContentType.Any, workspace)
  425. eq_(3, len(items))
  426. items2 = api.get_all(parent_id, ContentType.File, workspace)
  427. eq_(1, len(items2))
  428. eq_(child_id, items2[0].content_id)
  429. def test_set_status_unknown_status(self):
  430. uapi = UserApi(
  431. session=self.session,
  432. config=self.app_config,
  433. current_user=None,
  434. )
  435. group_api = GroupApi(
  436. current_user=None,
  437. session=self.session,
  438. config=self.app_config,
  439. )
  440. groups = [group_api.get_one(Group.TIM_USER),
  441. group_api.get_one(Group.TIM_MANAGER),
  442. group_api.get_one(Group.TIM_ADMIN)]
  443. user = uapi.create_minimal_user(email='this.is@user',
  444. groups=groups, save_now=True)
  445. workspace = WorkspaceApi(
  446. current_user=user,
  447. session=self.session,
  448. config=self.app_config,
  449. ).create_workspace(
  450. 'test workspace',
  451. save_now=True
  452. )
  453. api = ContentApi(
  454. current_user=user,
  455. session=self.session,
  456. config=self.app_config,
  457. )
  458. c = api.create(ContentType.Folder, workspace, None, 'parent', True)
  459. with new_revision(
  460. session=self.session,
  461. tm=transaction.manager,
  462. content=c,
  463. ):
  464. with pytest.raises(ValueError):
  465. api.set_status(c, 'unknown-status')
  466. def test_set_status_ok(self):
  467. uapi = UserApi(
  468. session=self.session,
  469. config=self.app_config,
  470. current_user=None,
  471. )
  472. group_api = GroupApi(
  473. current_user=None,
  474. session=self.session,
  475. config=self.app_config,
  476. )
  477. groups = [group_api.get_one(Group.TIM_USER),
  478. group_api.get_one(Group.TIM_MANAGER),
  479. group_api.get_one(Group.TIM_ADMIN)]
  480. user = uapi.create_minimal_user(email='this.is@user',
  481. groups=groups, save_now=True)
  482. workspace = WorkspaceApi(
  483. current_user=user,
  484. session=self.session,
  485. config=self.app_config,
  486. ).create_workspace(
  487. 'test workspace',
  488. save_now=True
  489. )
  490. api = ContentApi(
  491. current_user=user,
  492. session=self.session,
  493. config=self.app_config,
  494. )
  495. c = api.create(ContentType.Folder, workspace, None, 'parent', True)
  496. with new_revision(
  497. session=self.session,
  498. tm=transaction.manager,
  499. content=c,
  500. ):
  501. for new_status in ['open', 'closed-validated', 'closed-unvalidated',
  502. 'closed-deprecated']:
  503. api.set_status(c, new_status)
  504. eq_(new_status, c.status)
  505. eq_(ActionDescription.STATUS_UPDATE, c.revision_type)
  506. def test_create_comment_ok(self):
  507. uapi = UserApi(
  508. session=self.session,
  509. config=self.app_config,
  510. current_user=None,
  511. )
  512. group_api = GroupApi(
  513. current_user=None,
  514. session=self.session,
  515. config=self.config,
  516. )
  517. groups = [group_api.get_one(Group.TIM_USER),
  518. group_api.get_one(Group.TIM_MANAGER),
  519. group_api.get_one(Group.TIM_ADMIN)]
  520. user = uapi.create_minimal_user(email='this.is@user',
  521. groups=groups, save_now=True)
  522. workspace = WorkspaceApi(
  523. current_user=user,
  524. session=self.session,
  525. config=self.app_config,
  526. ).create_workspace(
  527. 'test workspace',
  528. save_now=True
  529. )
  530. api = ContentApi(
  531. current_user=user,
  532. session=self.session,
  533. config=self.app_config,
  534. )
  535. p = api.create(ContentType.Page, workspace, None, 'this_is_a_page')
  536. c = api.create_comment(workspace, p, 'this is the comment', True)
  537. eq_(Content, c.__class__)
  538. eq_(p.content_id, c.parent_id)
  539. eq_(user, c.owner)
  540. eq_(workspace, c.workspace)
  541. eq_(ContentType.Comment, c.type)
  542. eq_('this is the comment', c.description)
  543. eq_('', c.label)
  544. eq_(ActionDescription.COMMENT, c.revision_type)
  545. def test_unit_copy_file_different_label_different_parent_ok(self):
  546. uapi = UserApi(
  547. session=self.session,
  548. config=self.app_config,
  549. current_user=None,
  550. )
  551. group_api = GroupApi(
  552. current_user=None,
  553. session=self.session,
  554. config=self.app_config
  555. )
  556. groups = [group_api.get_one(Group.TIM_USER),
  557. group_api.get_one(Group.TIM_MANAGER),
  558. group_api.get_one(Group.TIM_ADMIN)]
  559. user = uapi.create_minimal_user(
  560. email='user1@user',
  561. groups=groups,
  562. save_now=True
  563. )
  564. user2 = uapi.create_minimal_user(
  565. email='user2@user',
  566. groups=groups,
  567. save_now=True
  568. )
  569. workspace = WorkspaceApi(
  570. current_user=user,
  571. session=self.session,
  572. config=self.app_config,
  573. ).create_workspace(
  574. 'test workspace',
  575. save_now=True
  576. )
  577. RoleApi(
  578. current_user=user,
  579. session=self.session,
  580. config=self.app_config,
  581. ).create_one(
  582. user2,
  583. workspace,
  584. UserRoleInWorkspace.WORKSPACE_MANAGER,
  585. with_notif=False
  586. )
  587. api = ContentApi(
  588. current_user=user,
  589. session=self.session,
  590. config=self.app_config,
  591. )
  592. foldera = api.create(
  593. ContentType.Folder,
  594. workspace,
  595. None,
  596. 'folder a',
  597. True
  598. )
  599. with self.session.no_autoflush:
  600. text_file = api.create(
  601. content_type=ContentType.File,
  602. workspace=workspace,
  603. parent=foldera,
  604. label='test_file',
  605. do_save=False,
  606. )
  607. api.update_file_data(
  608. text_file,
  609. 'test_file',
  610. 'text/plain',
  611. b'test_content'
  612. )
  613. api.save(text_file, ActionDescription.CREATION)
  614. api2 = ContentApi(
  615. current_user=user2,
  616. session=self.session,
  617. config=self.app_config,
  618. )
  619. workspace2 = WorkspaceApi(
  620. current_user=user2,
  621. session=self.session,
  622. config=self.app_config,
  623. ).create_workspace(
  624. 'test workspace2',
  625. save_now=True
  626. )
  627. folderb = api2.create(
  628. ContentType.Folder,
  629. workspace2,
  630. None,
  631. 'folder b',
  632. True
  633. )
  634. api2.copy(
  635. item=text_file,
  636. new_parent=folderb,
  637. new_label='test_file_copy'
  638. )
  639. transaction.commit()
  640. text_file_copy = api2.get_one_by_label_and_parent(
  641. 'test_file_copy',
  642. folderb,
  643. )
  644. assert text_file != text_file_copy
  645. assert text_file_copy.content_id != text_file.content_id
  646. assert text_file_copy.workspace_id == workspace2.workspace_id
  647. assert text_file_copy.depot_file.file.read() == text_file.depot_file.file.read() # nopep8
  648. assert text_file_copy.depot_file.path != text_file.depot_file.path
  649. assert text_file_copy.label == 'test_file_copy'
  650. assert text_file_copy.type == text_file.type
  651. assert text_file_copy.parent.content_id == folderb.content_id
  652. assert text_file_copy.owner.user_id == user.user_id
  653. assert text_file_copy.description == text_file.description
  654. assert text_file_copy.file_extension == text_file.file_extension
  655. assert text_file_copy.file_mimetype == text_file.file_mimetype
  656. assert text_file_copy.revision_type == ActionDescription.COPY
  657. assert len(text_file_copy.revisions) == len(text_file.revisions) + 1
  658. def test_unit_copy_file__same_label_different_parent_ok(self):
  659. uapi = UserApi(
  660. session=self.session,
  661. config=self.app_config,
  662. current_user=None,
  663. )
  664. group_api = GroupApi(
  665. current_user=None,
  666. session=self.session,
  667. config=self.app_config,
  668. )
  669. groups = [group_api.get_one(Group.TIM_USER),
  670. group_api.get_one(Group.TIM_MANAGER),
  671. group_api.get_one(Group.TIM_ADMIN)]
  672. user = uapi.create_minimal_user(
  673. email='user1@user',
  674. groups=groups,
  675. save_now=True
  676. )
  677. user2 = uapi.create_minimal_user(
  678. email='user2@user',
  679. groups=groups,
  680. save_now=True
  681. )
  682. workspace = WorkspaceApi(
  683. current_user=user,
  684. session=self.session,
  685. config=self.app_config,
  686. ).create_workspace(
  687. 'test workspace',
  688. save_now=True
  689. )
  690. RoleApi(
  691. current_user=user,
  692. session=self.session,
  693. config=self.app_config,
  694. ).create_one(
  695. user2,
  696. workspace,
  697. UserRoleInWorkspace.WORKSPACE_MANAGER,
  698. with_notif=False
  699. )
  700. api = ContentApi(
  701. current_user=user,
  702. session=self.session,
  703. config=self.app_config,
  704. )
  705. foldera = api.create(
  706. ContentType.Folder,
  707. workspace,
  708. None,
  709. 'folder a',
  710. True
  711. )
  712. with self.session.no_autoflush:
  713. text_file = api.create(
  714. content_type=ContentType.File,
  715. workspace=workspace,
  716. parent=foldera,
  717. label='test_file',
  718. do_save=False,
  719. )
  720. api.update_file_data(
  721. text_file,
  722. 'test_file',
  723. 'text/plain',
  724. b'test_content'
  725. )
  726. api.save(text_file, ActionDescription.CREATION)
  727. api2 = ContentApi(
  728. current_user=user2,
  729. session=self.session,
  730. config=self.app_config,
  731. )
  732. workspace2 = WorkspaceApi(
  733. current_user=user2,
  734. session=self.session,
  735. config=self.app_config,
  736. ).create_workspace(
  737. 'test workspace2',
  738. save_now=True
  739. )
  740. folderb = api2.create(
  741. ContentType.Folder,
  742. workspace2,
  743. None,
  744. 'folder b',
  745. True
  746. )
  747. api2.copy(
  748. item=text_file,
  749. new_parent=folderb,
  750. )
  751. transaction.commit()
  752. text_file_copy = api2.get_one_by_label_and_parent(
  753. 'test_file',
  754. folderb,
  755. )
  756. assert text_file != text_file_copy
  757. assert text_file_copy.content_id != text_file.content_id
  758. assert text_file_copy.workspace_id == workspace2.workspace_id
  759. assert text_file_copy.depot_file.file.read() == text_file.depot_file.file.read() # nopep8
  760. assert text_file_copy.depot_file.path != text_file.depot_file.path
  761. assert text_file_copy.label == text_file.label
  762. assert text_file_copy.type == text_file.type
  763. assert text_file_copy.parent.content_id == folderb.content_id
  764. assert text_file_copy.owner.user_id == user.user_id
  765. assert text_file_copy.description == text_file.description
  766. assert text_file_copy.file_extension == text_file.file_extension
  767. assert text_file_copy.file_mimetype == text_file.file_mimetype
  768. assert text_file_copy.revision_type == ActionDescription.COPY
  769. assert len(text_file_copy.revisions) == len(text_file.revisions) + 1
  770. def test_unit_copy_file_different_label_same_parent_ok(self):
  771. uapi = UserApi(
  772. session=self.session,
  773. config=self.app_config,
  774. current_user=None,
  775. )
  776. group_api = GroupApi(
  777. current_user=None,
  778. session=self.session,
  779. config=self.app_config,
  780. )
  781. groups = [group_api.get_one(Group.TIM_USER),
  782. group_api.get_one(Group.TIM_MANAGER),
  783. group_api.get_one(Group.TIM_ADMIN)]
  784. user = uapi.create_minimal_user(
  785. email='user1@user',
  786. groups=groups,
  787. save_now=True,
  788. )
  789. user2 = uapi.create_minimal_user(
  790. email='user2@user',
  791. groups=groups,
  792. save_now=True
  793. )
  794. workspace = WorkspaceApi(
  795. current_user=user,
  796. session=self.session,
  797. config=self.app_config,
  798. ).create_workspace(
  799. 'test workspace',
  800. save_now=True
  801. )
  802. RoleApi(
  803. current_user=user,
  804. session=self.session,
  805. config=self.app_config,
  806. ).create_one(
  807. user2, workspace,
  808. UserRoleInWorkspace.WORKSPACE_MANAGER,
  809. with_notif=False
  810. )
  811. api = ContentApi(
  812. current_user=user,
  813. session=self.session,
  814. config=self.app_config,
  815. )
  816. foldera = api.create(
  817. ContentType.Folder,
  818. workspace,
  819. None,
  820. 'folder a',
  821. True
  822. )
  823. with self.session.no_autoflush:
  824. text_file = api.create(
  825. content_type=ContentType.File,
  826. workspace=workspace,
  827. parent=foldera,
  828. label='test_file',
  829. do_save=False,
  830. )
  831. api.update_file_data(
  832. text_file,
  833. 'test_file',
  834. 'text/plain',
  835. b'test_content'
  836. )
  837. api.save(
  838. text_file,
  839. ActionDescription.CREATION
  840. )
  841. api2 = ContentApi(
  842. current_user=user2,
  843. session=self.session,
  844. config=self.app_config,
  845. )
  846. api2.copy(
  847. item=text_file,
  848. new_label='test_file_copy'
  849. )
  850. transaction.commit()
  851. text_file_copy = api2.get_one_by_label_and_parent(
  852. 'test_file_copy',
  853. foldera,
  854. )
  855. assert text_file != text_file_copy
  856. assert text_file_copy.content_id != text_file.content_id
  857. assert text_file_copy.workspace_id == workspace.workspace_id
  858. assert text_file_copy.depot_file.file.read() == text_file.depot_file.file.read() # nopep8
  859. assert text_file_copy.depot_file.path != text_file.depot_file.path
  860. assert text_file_copy.label == 'test_file_copy'
  861. assert text_file_copy.type == text_file.type
  862. assert text_file_copy.parent.content_id == foldera.content_id
  863. assert text_file_copy.owner.user_id == user.user_id
  864. assert text_file_copy.description == text_file.description
  865. assert text_file_copy.file_extension == text_file.file_extension
  866. assert text_file_copy.file_mimetype == text_file.file_mimetype
  867. assert text_file_copy.revision_type == ActionDescription.COPY
  868. assert len(text_file_copy.revisions) == len(text_file.revisions) + 1
  869. def test_mark_read__workspace(self):
  870. uapi = UserApi(
  871. session=self.session,
  872. config=self.app_config,
  873. current_user=None,
  874. )
  875. group_api = GroupApi(
  876. current_user=None,
  877. session=self.session,
  878. config=self.app_config,
  879. )
  880. groups = [group_api.get_one(Group.TIM_USER),
  881. group_api.get_one(Group.TIM_MANAGER),
  882. group_api.get_one(Group.TIM_ADMIN)]
  883. user_a = uapi.create_minimal_user(email='this.is@user',
  884. groups=groups, save_now=True)
  885. user_b = uapi.create_minimal_user(email='this.is@another.user',
  886. groups=groups, save_now=True)
  887. wapi = WorkspaceApi(
  888. current_user=user_a,
  889. session=self.session,
  890. config=self.app_config,
  891. )
  892. workspace1 = wapi.create_workspace(
  893. 'test workspace n°1',
  894. save_now=True)
  895. workspace2 = wapi.create_workspace(
  896. 'test workspace n°2',
  897. save_now=True)
  898. role_api1 = RoleApi(
  899. current_user=user_a,
  900. session=self.session,
  901. config=self.app_config,
  902. )
  903. role_api1.create_one(
  904. user_b,
  905. workspace1,
  906. UserRoleInWorkspace.READER,
  907. False
  908. )
  909. role_api2 = RoleApi(
  910. current_user=user_b,
  911. session=self.session,
  912. config=self.app_config,
  913. )
  914. role_api2.create_one(user_b, workspace2, UserRoleInWorkspace.READER,
  915. False)
  916. cont_api_a = ContentApi(
  917. current_user=user_a,
  918. session=self.session,
  919. config=self.app_config,
  920. )
  921. cont_api_b = ContentApi(
  922. current_user=user_b,
  923. session=self.session,
  924. config=self.app_config,
  925. )
  926. # Creates page_1 & page_2 in workspace 1
  927. # and page_3 & page_4 in workspace 2
  928. page_1 = cont_api_a.create(ContentType.Page, workspace1, None,
  929. 'this is a page', do_save=True)
  930. page_2 = cont_api_a.create(ContentType.Page, workspace1, None,
  931. 'this is page1', do_save=True)
  932. page_3 = cont_api_a.create(ContentType.Thread, workspace2, None,
  933. 'this is page2', do_save=True)
  934. page_4 = cont_api_a.create(ContentType.File, workspace2, None,
  935. 'this is page3', do_save=True)
  936. for rev in page_1.revisions:
  937. eq_(user_b not in rev.read_by.keys(), True)
  938. for rev in page_2.revisions:
  939. eq_(user_b not in rev.read_by.keys(), True)
  940. for rev in page_3.revisions:
  941. eq_(user_b not in rev.read_by.keys(), True)
  942. for rev in page_4.revisions:
  943. eq_(user_b not in rev.read_by.keys(), True)
  944. # Set as read the workspace n°1
  945. cont_api_b.mark_read__workspace(workspace=workspace1)
  946. for rev in page_1.revisions:
  947. eq_(user_b in rev.read_by.keys(), True)
  948. for rev in page_2.revisions:
  949. eq_(user_b in rev.read_by.keys(), True)
  950. for rev in page_3.revisions:
  951. eq_(user_b not in rev.read_by.keys(), True)
  952. for rev in page_4.revisions:
  953. eq_(user_b not in rev.read_by.keys(), True)
  954. # Set as read the workspace n°2
  955. cont_api_b.mark_read__workspace(workspace=workspace2)
  956. for rev in page_1.revisions:
  957. eq_(user_b in rev.read_by.keys(), True)
  958. for rev in page_2.revisions:
  959. eq_(user_b in rev.read_by.keys(), True)
  960. for rev in page_3.revisions:
  961. eq_(user_b in rev.read_by.keys(), True)
  962. for rev in page_4.revisions:
  963. eq_(user_b in rev.read_by.keys(), True)
  964. def test_mark_read(self):
  965. uapi = UserApi(
  966. session=self.session,
  967. config=self.app_config,
  968. current_user=None,
  969. )
  970. group_api = GroupApi(
  971. current_user=None,
  972. session=self.session,
  973. config = self.app_config,
  974. )
  975. groups = [group_api.get_one(Group.TIM_USER),
  976. group_api.get_one(Group.TIM_MANAGER),
  977. group_api.get_one(Group.TIM_ADMIN)]
  978. user_a = uapi.create_minimal_user(
  979. email='this.is@user',
  980. groups=groups,
  981. save_now=True
  982. )
  983. user_b = uapi.create_minimal_user(
  984. email='this.is@another.user',
  985. groups=groups,
  986. save_now=True
  987. )
  988. wapi = WorkspaceApi(
  989. current_user=user_a,
  990. session=self.session,
  991. config=self.app_config,
  992. )
  993. workspace_api = WorkspaceApi(
  994. current_user=user_a,
  995. session=self.session,
  996. config=self.app_config,
  997. )
  998. workspace = wapi.create_workspace(
  999. 'test workspace',
  1000. save_now=True)
  1001. role_api = RoleApi(
  1002. current_user=user_a,
  1003. session=self.session,
  1004. config=self.app_config,
  1005. )
  1006. role_api.create_one(
  1007. user_b,
  1008. workspace,
  1009. UserRoleInWorkspace.READER,
  1010. False
  1011. )
  1012. cont_api_a = ContentApi(
  1013. current_user=user_a,
  1014. session=self.session,
  1015. config=self.app_config,
  1016. )
  1017. cont_api_b = ContentApi(
  1018. current_user=user_b,
  1019. session=self.session,
  1020. config=self.app_config,
  1021. )
  1022. page_1 = cont_api_a.create(ContentType.Page, workspace, None,
  1023. 'this is a page', do_save=True)
  1024. for rev in page_1.revisions:
  1025. eq_(user_b not in rev.read_by.keys(), True)
  1026. cont_api_b.mark_read(page_1)
  1027. for rev in page_1.revisions:
  1028. eq_(user_b in rev.read_by.keys(), True)
  1029. def test_mark_read__all(self):
  1030. uapi = UserApi(
  1031. session=self.session,
  1032. config=self.app_config,
  1033. current_user=None,
  1034. )
  1035. group_api = GroupApi(
  1036. current_user=None,
  1037. session=self.session,
  1038. config=self.app_config,
  1039. )
  1040. groups = [group_api.get_one(Group.TIM_USER),
  1041. group_api.get_one(Group.TIM_MANAGER),
  1042. group_api.get_one(Group.TIM_ADMIN)]
  1043. user_a = uapi.create_minimal_user(
  1044. email='this.is@user',
  1045. groups=groups,
  1046. save_now=True
  1047. )
  1048. user_b = uapi.create_minimal_user(
  1049. email='this.is@another.user',
  1050. groups=groups,
  1051. save_now=True
  1052. )
  1053. wapi = WorkspaceApi(
  1054. current_user=user_a,
  1055. session=self.session,
  1056. config=self.app_config,
  1057. )
  1058. workspace = wapi.create_workspace(
  1059. 'test workspace',
  1060. save_now=True)
  1061. role_api = RoleApi(
  1062. current_user=user_a,
  1063. session=self.session,
  1064. config=self.app_config,
  1065. )
  1066. role_api.create_one(
  1067. user_b,
  1068. workspace,
  1069. UserRoleInWorkspace.READER,
  1070. False
  1071. )
  1072. cont_api_a = ContentApi(
  1073. current_user=user_a,
  1074. session=self.session,
  1075. config=self.app_config,
  1076. )
  1077. cont_api_b = ContentApi(
  1078. current_user=user_b,
  1079. session=self.session,
  1080. config=self.app_config,
  1081. )
  1082. page_2 = cont_api_a.create(
  1083. ContentType.Page,
  1084. workspace,
  1085. None,
  1086. 'this is page1',
  1087. do_save=True
  1088. )
  1089. page_3 = cont_api_a.create(
  1090. ContentType.Thread,
  1091. workspace,
  1092. None,
  1093. 'this is page2',
  1094. do_save=True
  1095. )
  1096. page_4 = cont_api_a.create(
  1097. ContentType.File,
  1098. workspace,
  1099. None,
  1100. 'this is page3',
  1101. do_save=True
  1102. )
  1103. for rev in page_2.revisions:
  1104. eq_(user_b not in rev.read_by.keys(), True)
  1105. for rev in page_3.revisions:
  1106. eq_(user_b not in rev.read_by.keys(), True)
  1107. for rev in page_4.revisions:
  1108. eq_(user_b not in rev.read_by.keys(), True)
  1109. self.session.refresh(page_2)
  1110. self.session.refresh(page_3)
  1111. self.session.refresh(page_4)
  1112. cont_api_b.mark_read__all()
  1113. for rev in page_2.revisions:
  1114. eq_(user_b in rev.read_by.keys(), True)
  1115. for rev in page_3.revisions:
  1116. eq_(user_b in rev.read_by.keys(), True)
  1117. for rev in page_4.revisions:
  1118. eq_(user_b in rev.read_by.keys(), True)
  1119. def test_update(self):
  1120. uapi = UserApi(
  1121. session=self.session,
  1122. config=self.app_config,
  1123. current_user=None,
  1124. )
  1125. group_api = GroupApi(
  1126. current_user=None,
  1127. session=self.session,
  1128. config=self.app_config,
  1129. )
  1130. groups = [group_api.get_one(Group.TIM_USER),
  1131. group_api.get_one(Group.TIM_MANAGER),
  1132. group_api.get_one(Group.TIM_ADMIN)]
  1133. user1 = uapi.create_minimal_user(
  1134. email='this.is@user',
  1135. groups=groups,
  1136. save_now=True
  1137. )
  1138. workspace_api = WorkspaceApi(
  1139. current_user=user1,
  1140. session=self.session,
  1141. config=self.app_config,
  1142. )
  1143. workspace = workspace_api.create_workspace(
  1144. 'test workspace',
  1145. save_now=True
  1146. )
  1147. wid = workspace.workspace_id
  1148. user2 = uapi.create_minimal_user('this.is@another.user')
  1149. uapi.save(user2)
  1150. RoleApi(
  1151. current_user=user1,
  1152. session=self.session,
  1153. config=self.app_config,
  1154. ).create_one(
  1155. user2,
  1156. workspace,
  1157. UserRoleInWorkspace.CONTENT_MANAGER,
  1158. with_notif=False,
  1159. flush=True
  1160. )
  1161. # Test starts here
  1162. api = ContentApi(
  1163. current_user=user1,
  1164. session=self.session,
  1165. config=self.app_config,
  1166. )
  1167. p = api.create(
  1168. content_type=ContentType.Page,
  1169. workspace=workspace,
  1170. parent=None,
  1171. label='this_is_a_page',
  1172. do_save=True
  1173. )
  1174. u1id = user1.user_id
  1175. u2id = user2.user_id
  1176. pcid = p.content_id
  1177. poid = p.owner_id
  1178. transaction.commit()
  1179. # Refresh instances after commit
  1180. user1 = uapi.get_one(u1id)
  1181. workspace = WorkspaceApi(
  1182. current_user=user1,
  1183. session=self.session,
  1184. config=self.app_config,
  1185. ).get_one(wid)
  1186. api = ContentApi(
  1187. current_user=user1,
  1188. session=self.session,
  1189. config=self.app_config,
  1190. )
  1191. content = api.get_one(pcid, ContentType.Any, workspace)
  1192. eq_(u1id, content.owner_id)
  1193. eq_(poid, content.owner_id)
  1194. u2 = UserApi(
  1195. session=self.session,
  1196. config=self.app_config,
  1197. current_user=None,
  1198. ).get_one(u2id)
  1199. api2 = ContentApi(
  1200. current_user=u2,
  1201. session=self.session,
  1202. config=self.app_config,
  1203. )
  1204. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1205. with new_revision(
  1206. session=self.session,
  1207. tm=transaction.manager,
  1208. content=content2,
  1209. ):
  1210. api2.update_content(
  1211. content2,
  1212. 'this is an updated page',
  1213. 'new content'
  1214. )
  1215. api2.save(content2)
  1216. transaction.commit()
  1217. # Refresh instances after commit
  1218. user1 = uapi.get_one(u1id)
  1219. workspace = WorkspaceApi(
  1220. current_user=user1,
  1221. session=self.session,
  1222. config=self.app_config,
  1223. ).get_one(wid)
  1224. api = ContentApi(
  1225. current_user=user1,
  1226. session=self.session,
  1227. config=self.app_config,
  1228. )
  1229. updated = api.get_one(pcid, ContentType.Any, workspace)
  1230. eq_(u2id, updated.owner_id,
  1231. 'the owner id should be {} (found {})'.format(u2id,
  1232. updated.owner_id))
  1233. eq_('this is an updated page', updated.label)
  1234. eq_('new content', updated.description)
  1235. eq_(ActionDescription.EDITION, updated.revision_type)
  1236. def test_update_no_change(self):
  1237. uapi = UserApi(
  1238. session=self.session,
  1239. config=self.app_config,
  1240. current_user=None,
  1241. )
  1242. group_api = GroupApi(
  1243. current_user=None,
  1244. session=self.session,
  1245. config = self.app_config,
  1246. )
  1247. groups = [group_api.get_one(Group.TIM_USER),
  1248. group_api.get_one(Group.TIM_MANAGER),
  1249. group_api.get_one(Group.TIM_ADMIN)]
  1250. user1 = uapi.create_minimal_user(
  1251. email='this.is@user',
  1252. groups=groups,
  1253. save_now=True,
  1254. )
  1255. workspace = WorkspaceApi(
  1256. current_user=user1,
  1257. session=self.session,
  1258. config=self.app_config,
  1259. ).create_workspace(
  1260. 'test workspace',
  1261. save_now=True
  1262. )
  1263. user2 = uapi.create_minimal_user('this.is@another.user')
  1264. uapi.save(user2)
  1265. RoleApi(
  1266. current_user=user1,
  1267. session=self.session,
  1268. config=self.app_config,
  1269. ).create_one(
  1270. user2,
  1271. workspace,
  1272. UserRoleInWorkspace.CONTENT_MANAGER,
  1273. with_notif=False,
  1274. flush=True
  1275. )
  1276. api = ContentApi(
  1277. current_user=user1,
  1278. session=self.session,
  1279. config=self.app_config,
  1280. )
  1281. with self.session.no_autoflush:
  1282. page = api.create(
  1283. content_type=ContentType.Page,
  1284. workspace=workspace,
  1285. label="same_content",
  1286. do_save=False
  1287. )
  1288. page.description = "Same_content_here"
  1289. api.save(page, ActionDescription.CREATION, do_notify=True)
  1290. transaction.commit()
  1291. api2 = ContentApi(
  1292. current_user=user2,
  1293. session=self.session,
  1294. config=self.app_config,
  1295. )
  1296. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  1297. with new_revision(
  1298. session=self.session,
  1299. tm=transaction.manager,
  1300. content=content2,
  1301. ):
  1302. with pytest.raises(SameValueError):
  1303. api2.update_content(
  1304. item=content2,
  1305. new_label='same_content',
  1306. new_content='Same_content_here'
  1307. )
  1308. api2.save(content2)
  1309. transaction.commit()
  1310. def test_update_file_data(self):
  1311. uapi = UserApi(
  1312. session=self.session,
  1313. config=self.app_config,
  1314. current_user=None,
  1315. )
  1316. group_api = GroupApi(
  1317. current_user=None,
  1318. session=self.session,
  1319. config=self.app_config,
  1320. )
  1321. groups = [group_api.get_one(Group.TIM_USER),
  1322. group_api.get_one(Group.TIM_MANAGER),
  1323. group_api.get_one(Group.TIM_ADMIN)]
  1324. user1 = uapi.create_minimal_user(
  1325. email='this.is@user',
  1326. groups=groups,
  1327. save_now=True
  1328. )
  1329. workspace_api = WorkspaceApi(
  1330. current_user=user1,
  1331. session=self.session,
  1332. config=self.app_config,
  1333. )
  1334. workspace = workspace_api.create_workspace(
  1335. 'test workspace',
  1336. save_now=True
  1337. )
  1338. wid = workspace.workspace_id
  1339. user2 = uapi.create_minimal_user('this.is@another.user')
  1340. uapi.save(user2)
  1341. RoleApi(
  1342. current_user=user1,
  1343. session=self.session,
  1344. config=self.app_config,
  1345. ).create_one(
  1346. user2,
  1347. workspace,
  1348. UserRoleInWorkspace.CONTENT_MANAGER,
  1349. with_notif=True,
  1350. flush=True
  1351. )
  1352. # Test starts here
  1353. api = ContentApi(
  1354. current_user=user1,
  1355. session=self.session,
  1356. config=self.app_config,
  1357. )
  1358. p = api.create(
  1359. content_type=ContentType.File,
  1360. workspace=workspace,
  1361. parent=None,
  1362. label='this_is_a_page',
  1363. do_save=True
  1364. )
  1365. u1id = user1.user_id
  1366. u2id = user2.user_id
  1367. pcid = p.content_id
  1368. poid = p.owner_id
  1369. api.save(p)
  1370. transaction.commit()
  1371. # Refresh instances after commit
  1372. user1 = uapi.get_one(u1id)
  1373. workspace_api2 = WorkspaceApi(
  1374. current_user=user1,
  1375. session=self.session,
  1376. config=self.app_config,
  1377. )
  1378. workspace = workspace_api2.get_one(wid)
  1379. api = ContentApi(
  1380. current_user=user1,
  1381. session=self.session,
  1382. config=self.app_config,
  1383. )
  1384. content = api.get_one(pcid, ContentType.Any, workspace)
  1385. eq_(u1id, content.owner_id)
  1386. eq_(poid, content.owner_id)
  1387. u2 = UserApi(
  1388. current_user=None,
  1389. session=self.session,
  1390. config=self.app_config,
  1391. ).get_one(u2id)
  1392. api2 = ContentApi(
  1393. current_user=u2,
  1394. session=self.session,
  1395. config=self.app_config,
  1396. )
  1397. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1398. with new_revision(
  1399. session=self.session,
  1400. tm=transaction.manager,
  1401. content=content2,
  1402. ):
  1403. api2.update_file_data(
  1404. content2,
  1405. 'index.html',
  1406. 'text/html',
  1407. b'<html>hello world</html>'
  1408. )
  1409. api2.save(content2)
  1410. transaction.commit()
  1411. # Refresh instances after commit
  1412. user1 = uapi.get_one(u1id)
  1413. workspace = WorkspaceApi(
  1414. current_user=user1,
  1415. session=self.session,
  1416. config=self.app_config,
  1417. ).get_one(wid)
  1418. updated = api.get_one(pcid, ContentType.Any, workspace)
  1419. eq_(u2id, updated.owner_id,
  1420. 'the owner id should be {} (found {})'.format(u2id,
  1421. updated.owner_id))
  1422. eq_('this_is_a_page.html', updated.file_name)
  1423. eq_('text/html', updated.file_mimetype)
  1424. eq_(b'<html>hello world</html>', updated.depot_file.file.read())
  1425. eq_(ActionDescription.REVISION, updated.revision_type)
  1426. def test_update_no_change(self):
  1427. uapi = UserApi(
  1428. session=self.session,
  1429. config=self.app_config,
  1430. current_user=None,
  1431. )
  1432. group_api = GroupApi(
  1433. current_user=None,
  1434. session=self.session,
  1435. config=self.app_config,
  1436. )
  1437. groups = [group_api.get_one(Group.TIM_USER),
  1438. group_api.get_one(Group.TIM_MANAGER),
  1439. group_api.get_one(Group.TIM_ADMIN)]
  1440. user1 = uapi.create_minimal_user(
  1441. email='this.is@user',
  1442. groups=groups,
  1443. save_now=True,
  1444. )
  1445. workspace_api = WorkspaceApi(
  1446. current_user=user1,
  1447. session=self.session,
  1448. config=self.app_config,
  1449. )
  1450. workspace = workspace_api.create_workspace(
  1451. 'test workspace',
  1452. save_now=True
  1453. )
  1454. user2 = uapi.create_minimal_user('this.is@another.user')
  1455. uapi.save(user2)
  1456. RoleApi(
  1457. current_user=user1,
  1458. session=self.session,
  1459. config=self.app_config,
  1460. ).create_one(
  1461. user2,
  1462. workspace,
  1463. UserRoleInWorkspace.CONTENT_MANAGER,
  1464. with_notif=False,
  1465. flush=True
  1466. )
  1467. api = ContentApi(
  1468. current_user=user1,
  1469. session=self.session,
  1470. config=self.app_config,
  1471. )
  1472. with self.session.no_autoflush:
  1473. page = api.create(
  1474. content_type=ContentType.Page,
  1475. workspace=workspace,
  1476. label="same_content",
  1477. do_save=False
  1478. )
  1479. api.update_file_data(
  1480. page,
  1481. 'index.html',
  1482. 'text/html',
  1483. b'<html>Same Content Here</html>'
  1484. )
  1485. api.save(page, ActionDescription.CREATION, do_notify=True)
  1486. transaction.commit()
  1487. api2 = ContentApi(
  1488. current_user=user2,
  1489. session=self.session,
  1490. config=self.app_config,
  1491. )
  1492. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  1493. with new_revision(
  1494. session=self.session,
  1495. tm=transaction.manager,
  1496. content=content2,
  1497. ):
  1498. with pytest.raises(SameValueError):
  1499. api2.update_file_data(
  1500. page,
  1501. 'index.html',
  1502. 'text/html',
  1503. b'<html>Same Content Here</html>'
  1504. )
  1505. api2.save(content2)
  1506. transaction.commit()
  1507. def test_archive_unarchive(self):
  1508. uapi = UserApi(
  1509. session=self.session,
  1510. config=self.app_config,
  1511. current_user=None,
  1512. )
  1513. group_api = GroupApi(
  1514. current_user=None,
  1515. session=self.session,
  1516. config=self.app_config,
  1517. )
  1518. groups = [group_api.get_one(Group.TIM_USER),
  1519. group_api.get_one(Group.TIM_MANAGER),
  1520. group_api.get_one(Group.TIM_ADMIN)]
  1521. user1 = uapi.create_minimal_user(
  1522. email='this.is@user',
  1523. groups=groups,
  1524. save_now=True
  1525. )
  1526. u1id = user1.user_id
  1527. workspace_api = WorkspaceApi(
  1528. current_user=user1,
  1529. session=self.session,
  1530. config=self.app_config,
  1531. )
  1532. workspace = workspace_api.create_workspace(
  1533. 'test workspace',
  1534. save_now=True
  1535. )
  1536. wid = workspace.workspace_id
  1537. user2 = uapi.create_minimal_user('this.is@another.user')
  1538. uapi.save(user2)
  1539. RoleApi(
  1540. current_user=user1,
  1541. session=self.session,
  1542. config=self.app_config,
  1543. ).create_one(
  1544. user2,
  1545. workspace,
  1546. UserRoleInWorkspace.CONTENT_MANAGER,
  1547. with_notif=True,
  1548. flush=True
  1549. )
  1550. # show archived is used at the top end of the test
  1551. api = ContentApi(
  1552. current_user=user1,
  1553. session=self.session,
  1554. show_archived=True,
  1555. config=self.app_config,
  1556. )
  1557. p = api.create(
  1558. content_type=ContentType.File,
  1559. workspace=workspace,
  1560. parent=None,
  1561. label='this_is_a_page',
  1562. do_save=True
  1563. )
  1564. u1id = user1.user_id
  1565. u2id = user2.user_id
  1566. pcid = p.content_id
  1567. poid = p.owner_id
  1568. transaction.commit()
  1569. ####
  1570. # refresh after commit
  1571. user1 = UserApi(
  1572. current_user=None,
  1573. config=self.app_config,
  1574. session=self.session
  1575. ).get_one(u1id)
  1576. workspace = WorkspaceApi(
  1577. current_user=user1,
  1578. session=self.session,
  1579. config=self.app_config,
  1580. ).get_one(wid)
  1581. content = api.get_one(pcid, ContentType.Any, workspace)
  1582. eq_(u1id, content.owner_id)
  1583. eq_(poid, content.owner_id)
  1584. u2api = UserApi(
  1585. session=self.session,
  1586. config=self.app_config,
  1587. current_user=None,
  1588. )
  1589. u2 = u2api.get_one(u2id)
  1590. api2 = ContentApi(
  1591. current_user=u2,
  1592. session=self.session,
  1593. config=self.app_config,
  1594. show_archived=True,
  1595. )
  1596. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1597. with new_revision(
  1598. session=self.session,
  1599. tm=transaction.manager,
  1600. content=content2,
  1601. ):
  1602. api2.archive(content2)
  1603. api2.save(content2)
  1604. transaction.commit()
  1605. # refresh after commit
  1606. user1 = UserApi(
  1607. current_user=None,
  1608. session=self.session,
  1609. config=self.app_config,
  1610. ).get_one(u1id)
  1611. workspace = WorkspaceApi(
  1612. current_user=user1,
  1613. session=self.session,
  1614. config=self.app_config,
  1615. ).get_one(wid)
  1616. u2 = UserApi(
  1617. current_user=None,
  1618. session=self.session,
  1619. config=self.app_config,
  1620. ).get_one(u2id)
  1621. api = ContentApi(
  1622. current_user=user1,
  1623. session=self.session,
  1624. config=self.app_config,
  1625. show_archived=True,
  1626. )
  1627. api2 = ContentApi(
  1628. current_user=u2,
  1629. session=self.session,
  1630. config=self.app_config,
  1631. show_archived=True,
  1632. )
  1633. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1634. eq_(u2id, updated.owner_id,
  1635. 'the owner id should be {} (found {})'.format(u2id,
  1636. updated.owner_id))
  1637. eq_(True, updated.is_archived)
  1638. eq_(ActionDescription.ARCHIVING, updated.revision_type)
  1639. ####
  1640. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1641. with new_revision(
  1642. session=self.session,
  1643. tm=transaction.manager,
  1644. content=updated,
  1645. ):
  1646. api.unarchive(updated)
  1647. api.save(updated2)
  1648. eq_(False, updated2.is_archived)
  1649. eq_(ActionDescription.UNARCHIVING, updated2.revision_type)
  1650. eq_(u1id, updated2.owner_id)
  1651. def test_delete_undelete(self):
  1652. uapi = UserApi(
  1653. session=self.session,
  1654. config=self.app_config,
  1655. current_user=None,
  1656. )
  1657. group_api = GroupApi(
  1658. current_user=None,
  1659. session=self.session,
  1660. config=self.app_config,
  1661. )
  1662. groups = [group_api.get_one(Group.TIM_USER),
  1663. group_api.get_one(Group.TIM_MANAGER),
  1664. group_api.get_one(Group.TIM_ADMIN)]
  1665. user1 = uapi.create_minimal_user(
  1666. email='this.is@user',
  1667. groups=groups,
  1668. save_now=True
  1669. )
  1670. u1id = user1.user_id
  1671. workspace_api = WorkspaceApi(
  1672. current_user=user1,
  1673. session=self.session,
  1674. config=self.app_config,
  1675. )
  1676. workspace = workspace_api.create_workspace(
  1677. 'test workspace',
  1678. save_now=True
  1679. )
  1680. wid = workspace.workspace_id
  1681. user2 = uapi.create_minimal_user('this.is@another.user')
  1682. uapi.save(user2)
  1683. RoleApi(
  1684. current_user=user1,
  1685. session=self.session,
  1686. config=self.app_config,
  1687. ).create_one(
  1688. user2,
  1689. workspace,
  1690. UserRoleInWorkspace.CONTENT_MANAGER,
  1691. with_notif=True,
  1692. flush=True
  1693. )
  1694. # show archived is used at the top end of the test
  1695. api = ContentApi(
  1696. current_user=user1,
  1697. session=self.session,
  1698. config=self.app_config,
  1699. show_deleted=True,
  1700. )
  1701. p = api.create(
  1702. content_type=ContentType.File,
  1703. workspace=workspace,
  1704. parent=None,
  1705. label='this_is_a_page',
  1706. do_save=True
  1707. )
  1708. u1id = user1.user_id
  1709. u2id = user2.user_id
  1710. pcid = p.content_id
  1711. poid = p.owner_id
  1712. transaction.commit()
  1713. ####
  1714. user1 = UserApi(
  1715. current_user=None,
  1716. session=self.session,
  1717. config=self.app_config,
  1718. ).get_one(u1id)
  1719. workspace = WorkspaceApi(
  1720. current_user=user1,
  1721. session=self.session,
  1722. config=self.app_config,
  1723. ).get_one(wid)
  1724. content = api.get_one(pcid, ContentType.Any, workspace)
  1725. eq_(u1id, content.owner_id)
  1726. eq_(poid, content.owner_id)
  1727. u2 = UserApi(
  1728. current_user=None,
  1729. session=self.session,
  1730. config=self.app_config,
  1731. ).get_one(u2id)
  1732. api2 = ContentApi(
  1733. current_user=u2,
  1734. session=self.session,
  1735. config=self.app_config,
  1736. show_deleted=True,
  1737. )
  1738. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1739. with new_revision(
  1740. session=self.session,
  1741. tm=transaction.manager,
  1742. content=content2,
  1743. ):
  1744. api2.delete(content2)
  1745. api2.save(content2)
  1746. transaction.commit()
  1747. ####
  1748. user1 = UserApi(
  1749. current_user=None,
  1750. session=self.session,
  1751. config=self.app_config,
  1752. ).get_one(u1id)
  1753. workspace = WorkspaceApi(
  1754. current_user=user1,
  1755. session=self.session,
  1756. config=self.app_config,
  1757. ).get_one(wid)
  1758. # show archived is used at the top end of the test
  1759. api = ContentApi(
  1760. current_user=user1,
  1761. session=self.session,
  1762. config=self.app_config,
  1763. show_deleted=True,
  1764. )
  1765. u2 = UserApi(
  1766. current_user=None,
  1767. session=self.session,
  1768. config=self.app_config,
  1769. ).get_one(u2id)
  1770. api2 = ContentApi(
  1771. current_user=u2,
  1772. session=self.session,
  1773. config=self.app_config,
  1774. show_deleted=True
  1775. )
  1776. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1777. eq_(u2id, updated.owner_id,
  1778. 'the owner id should be {} (found {})'.format(u2id,
  1779. updated.owner_id))
  1780. eq_(True, updated.is_deleted)
  1781. eq_(ActionDescription.DELETION, updated.revision_type)
  1782. ####
  1783. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1784. with new_revision(
  1785. tm=transaction.manager,
  1786. session=self.session,
  1787. content=updated2,
  1788. ):
  1789. api.undelete(updated2)
  1790. api.save(updated2)
  1791. eq_(False, updated2.is_deleted)
  1792. eq_(ActionDescription.UNDELETION, updated2.revision_type)
  1793. eq_(u1id, updated2.owner_id)
  1794. def test_search_in_label(self):
  1795. # HACK - D.A. - 2015-03-09
  1796. # This test is based on a bug which does NOT return results found
  1797. # at root of a workspace (eg a folder)
  1798. uapi = UserApi(
  1799. session=self.session,
  1800. config=self.app_config,
  1801. current_user=None,
  1802. )
  1803. group_api = GroupApi(
  1804. current_user=None,
  1805. session=self.session,
  1806. config=self.app_config,
  1807. )
  1808. groups = [group_api.get_one(Group.TIM_USER),
  1809. group_api.get_one(Group.TIM_MANAGER),
  1810. group_api.get_one(Group.TIM_ADMIN)]
  1811. user = uapi.create_minimal_user(email='this.is@user',
  1812. groups=groups, save_now=True)
  1813. workspace = WorkspaceApi(
  1814. current_user=user,
  1815. session=self.session,
  1816. config=self.app_config,
  1817. ).create_workspace(
  1818. 'test workspace',
  1819. save_now=True
  1820. )
  1821. api = ContentApi(
  1822. current_user=user,
  1823. session=self.session,
  1824. config=self.app_config,
  1825. )
  1826. a = api.create(ContentType.Folder, workspace, None,
  1827. 'this is randomized folder', True)
  1828. p = api.create(ContentType.Page, workspace, a,
  1829. 'this is randomized label content', True)
  1830. with new_revision(
  1831. session=self.session,
  1832. tm=transaction.manager,
  1833. content=p,
  1834. ):
  1835. p.description = 'This is some amazing test'
  1836. api.save(p)
  1837. original_id = p.content_id
  1838. res = api.search(['randomized'])
  1839. eq_(1, len(res.all()))
  1840. item = res.all()[0]
  1841. eq_(original_id, item.content_id)
  1842. def test_search_in_description(self):
  1843. # HACK - D.A. - 2015-03-09
  1844. # This test is based on a bug which does NOT return results found
  1845. # at root of a workspace (eg a folder)
  1846. uapi = UserApi(
  1847. session=self.session,
  1848. config=self.app_config,
  1849. current_user=None,
  1850. )
  1851. group_api = GroupApi(
  1852. current_user=None,
  1853. session=self.session,
  1854. config=self.app_config,
  1855. )
  1856. groups = [group_api.get_one(Group.TIM_USER),
  1857. group_api.get_one(Group.TIM_MANAGER),
  1858. group_api.get_one(Group.TIM_ADMIN)]
  1859. user = uapi.create_minimal_user(email='this.is@user',
  1860. groups=groups, save_now=True)
  1861. workspace = WorkspaceApi(
  1862. current_user=user,
  1863. session=self.session,
  1864. config=self.app_config,
  1865. ).create_workspace(
  1866. 'test workspace',
  1867. save_now=True,
  1868. )
  1869. api = ContentApi(
  1870. current_user=user,
  1871. session=self.session,
  1872. config=self.app_config,
  1873. )
  1874. a = api.create(ContentType.Folder, workspace, None,
  1875. 'this is randomized folder', True)
  1876. p = api.create(ContentType.Page, workspace, a,
  1877. 'this is dummy label content', True)
  1878. with new_revision(
  1879. tm=transaction.manager,
  1880. session=self.session,
  1881. content=p,
  1882. ):
  1883. p.description = 'This is some amazing test'
  1884. api.save(p)
  1885. original_id = p.content_id
  1886. res = api.search(['dummy'])
  1887. eq_(1, len(res.all()))
  1888. item = res.all()[0]
  1889. eq_(original_id, item.content_id)
  1890. def test_search_in_label_or_description(self):
  1891. # HACK - D.A. - 2015-03-09
  1892. # This test is based on a bug which does NOT return results found
  1893. # at root of a workspace (eg a folder)
  1894. uapi = UserApi(
  1895. session=self.session,
  1896. config=self.app_config,
  1897. current_user=None,
  1898. )
  1899. group_api = GroupApi(
  1900. current_user=None,
  1901. session=self.session,
  1902. config=self.app_config,
  1903. )
  1904. groups = [group_api.get_one(Group.TIM_USER),
  1905. group_api.get_one(Group.TIM_MANAGER),
  1906. group_api.get_one(Group.TIM_ADMIN)]
  1907. user = uapi.create_minimal_user(email='this.is@user',
  1908. groups=groups, save_now=True)
  1909. workspace = WorkspaceApi(
  1910. current_user=user,
  1911. session=self.session,
  1912. config=self.app_config,
  1913. ).create_workspace('test workspace', save_now=True)
  1914. api = ContentApi(
  1915. current_user=user,
  1916. session=self.session,
  1917. config=self.app_config,
  1918. )
  1919. a = api.create(
  1920. content_type=ContentType.Folder,
  1921. workspace=workspace,
  1922. parent=None,
  1923. label='this is randomized folder',
  1924. do_save=True
  1925. )
  1926. p1 = api.create(
  1927. content_type=ContentType.Page,
  1928. workspace=workspace,
  1929. parent=a,
  1930. label='this is dummy label content',
  1931. do_save=True
  1932. )
  1933. p2 = api.create(
  1934. content_type=ContentType.Page,
  1935. workspace=workspace,
  1936. parent=a,
  1937. label='Hey ! Jon !',
  1938. do_save=True
  1939. )
  1940. with new_revision(
  1941. session=self.session,
  1942. tm=transaction.manager,
  1943. content=p1,
  1944. ):
  1945. p1.description = 'This is some amazing test'
  1946. with new_revision(
  1947. session=self.session,
  1948. tm=transaction.manager,
  1949. content=p2,
  1950. ):
  1951. p2.description = 'What\'s up?'
  1952. api.save(p1)
  1953. api.save(p2)
  1954. id1 = p1.content_id
  1955. id2 = p2.content_id
  1956. eq_(1, self.session.query(Workspace).filter(Workspace.label == 'test workspace').count())
  1957. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is randomized folder').count())
  1958. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is dummy label content').count())
  1959. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'This is some amazing test').count())
  1960. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'Hey ! Jon !').count())
  1961. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'What\'s up?').count())
  1962. res = api.search(['dummy', 'jon'])
  1963. eq_(2, len(res.all()))
  1964. eq_(True, id1 in [o.content_id for o in res.all()])
  1965. eq_(True, id2 in [o.content_id for o in res.all()])
  1966. def test_unit__search_exclude_content_under_deleted_or_archived_parents__ok(self): # nopep8
  1967. admin = self.session.query(User)\
  1968. .filter(User.email == 'admin@admin.admin').one()
  1969. workspace = self._create_workspace_and_test(
  1970. 'workspace_1',
  1971. admin
  1972. )
  1973. folder_1 = self._create_content_and_test(
  1974. 'folder_1',
  1975. workspace=workspace,
  1976. type=ContentType.Folder
  1977. )
  1978. folder_2 = self._create_content_and_test(
  1979. 'folder_2',
  1980. workspace=workspace,
  1981. type=ContentType.Folder
  1982. )
  1983. page_1 = self._create_content_and_test(
  1984. 'foo', workspace=workspace,
  1985. type=ContentType.Page,
  1986. parent=folder_1
  1987. )
  1988. page_2 = self._create_content_and_test(
  1989. 'bar',
  1990. workspace=workspace,
  1991. type=ContentType.Page,
  1992. parent=folder_2
  1993. )
  1994. api = ContentApi(
  1995. current_user=admin,
  1996. session=self.session,
  1997. config=self.app_config,
  1998. )
  1999. foo_result = api.search(['foo']).all()
  2000. eq_(1, len(foo_result))
  2001. assert page_1 in foo_result
  2002. bar_result = api.search(['bar']).all()
  2003. eq_(1, len(bar_result))
  2004. assert page_2 in bar_result
  2005. with new_revision(
  2006. session=self.session,
  2007. tm=transaction.manager,
  2008. content=folder_1,
  2009. ):
  2010. api.delete(folder_1)
  2011. with new_revision(
  2012. session=self.session,
  2013. tm=transaction.manager,
  2014. content=folder_2,
  2015. ):
  2016. api.archive(folder_2)
  2017. # Actually ContentApi.search don't filter it
  2018. foo_result = api.search(['foo']).all()
  2019. eq_(1, len(foo_result))
  2020. assert page_1 in foo_result
  2021. bar_result = api.search(['bar']).all()
  2022. eq_(1, len(bar_result))
  2023. assert page_2 in bar_result
  2024. # ContentApi offer exclude_unavailable method to do it
  2025. foo_result = api.search(['foo']).all()
  2026. api.exclude_unavailable(foo_result)
  2027. eq_(0, len(foo_result))
  2028. bar_result = api.search(['bar']).all()
  2029. api.exclude_unavailable(bar_result)
  2030. eq_(0, len(bar_result))
  2031. class TestContentApiSecurity(DefaultTest):
  2032. fixtures = [FixtureTest, ]
  2033. def test_unit__cant_get_non_access_content__ok__nominal_case(self):
  2034. admin = self.session.query(User)\
  2035. .filter(User.email == 'admin@admin.admin').one()
  2036. bob = self.session.query(User)\
  2037. .filter(User.email == 'bob@fsf.local').one()
  2038. bob_workspace = WorkspaceApi(
  2039. current_user=bob,
  2040. session=self.session,
  2041. config=self.app_config,
  2042. ).create_workspace(
  2043. 'bob_workspace',
  2044. save_now=True,
  2045. )
  2046. admin_workspace = WorkspaceApi(
  2047. current_user=admin,
  2048. session=self.session,
  2049. config=self.app_config,
  2050. ).create_workspace(
  2051. 'admin_workspace',
  2052. save_now=True,
  2053. )
  2054. bob_page = ContentApi(
  2055. current_user=bob,
  2056. session=self.session,
  2057. config=self.app_config,
  2058. ).create(
  2059. content_type=ContentType.Page,
  2060. workspace=bob_workspace,
  2061. label='bob_page',
  2062. do_save=True,
  2063. )
  2064. admin_page = ContentApi(
  2065. current_user=admin,
  2066. session=self.session,
  2067. config=self.app_config,
  2068. ).create(
  2069. content_type=ContentType.Page,
  2070. workspace=admin_workspace,
  2071. label='admin_page',
  2072. do_save=True,
  2073. )
  2074. bob_viewable = ContentApi(
  2075. current_user=bob,
  2076. session=self.session,
  2077. config=self.app_config,
  2078. ).get_all()
  2079. eq_(1, len(bob_viewable), 'Bob should view only one content')
  2080. eq_(
  2081. 'bob_page',
  2082. bob_viewable[0].label,
  2083. 'Bob should not view "{0}" content'.format(
  2084. bob_viewable[0].label,
  2085. )
  2086. )