test_content_api.py 64KB


  1. # -*- coding: utf-8 -*-
  2. import transaction
  3. import pytest
  4. from tracim.config import CFG
  5. from tracim.lib.core.content import compare_content_for_sorting_by_type_and_name
  6. from tracim.lib.core.content import ContentApi
  7. # TODO - G.M - 28-03-2018 - [GroupApi] Re-enable GroupApi
  8. from tracim.lib.core.group import GroupApi
  9. from tracim.lib.core.user import UserApi
  10. from tracim.exceptions import SameValueError
  11. # TODO - G.M - 28-03-2018 - [RoleApi] Re-enable RoleApi
  12. from tracim.lib.core.workspace import RoleApi
  13. # TODO - G.M - 28-03-2018 - [WorkspaceApi] Re-enable WorkspaceApi
  14. from tracim.lib.core.workspace import WorkspaceApi
  15. from tracim.models.revision_protection import new_revision
  16. from tracim.models.auth import User
  17. from tracim.models.auth import Group
  18. from tracim.models.data import ActionDescription
  19. from tracim.models.data import ContentRevisionRO
  20. from tracim.models.data import Workspace
  21. from tracim.models.data import Content
  22. from tracim.models.data import ContentType
  23. from tracim.models.data import UserRoleInWorkspace
  24. from tracim.fixtures.users_and_groups import Test as FixtureTest
  25. from tracim.tests import DefaultTest
  26. from tracim.tests import eq_
  27. class TestContentApi(DefaultTest):
  def test_compare_content_for_sorting_by_type(self):
      """Check the comparator's result for a file versus a folder.

      Both contents have an empty label. The test expects 1 when the
      'file' content is compared with the 'folder' one, -1 the other way
      round, and 0 when a content is compared with itself.
      """
      file_content = Content()
      file_content.label = ''
      file_content.type = 'file'

      folder_content = Content()
      folder_content.label = ''
      folder_content.type = 'folder'

      # Second name bound to the very same object, for the equality case.
      same_file_content = file_content

      compare = compare_content_for_sorting_by_type_and_name
      eq_(1, compare(file_content, folder_content))
      eq_(-1, compare(folder_content, file_content))
      eq_(0, compare(file_content, same_file_content))
  def test_compare_content_for_sorting_by_label(self):
      """Check the comparator's result for two files with distinct labels.

      Both contents have the type 'file'. The test expects 1 when 'bbb' is
      compared with 'aaa', -1 the other way round, and 0 when a content is
      compared with itself.
      """
      content_bbb = Content()
      content_bbb.label = 'bbb'
      content_bbb.type = 'file'

      content_aaa = Content()
      content_aaa.label = 'aaa'
      content_aaa.type = 'file'

      # Second name bound to the very same object, for the equality case.
      same_content_bbb = content_bbb

      compare = compare_content_for_sorting_by_type_and_name
      eq_(1, compare(content_bbb, content_aaa))
      eq_(-1, compare(content_aaa, content_bbb))
      eq_(0, compare(content_bbb, same_content_bbb))
  def test_sort_by_label_or_filename(self):
      """Check the order given by sort_content() for three file contents.

      One content has an empty label and the file_name 'AABC'; it is
      expected first, followed by the contents labelled 'ABCD' and 'BCDE'.
      """
      labelled_abcd = Content()
      labelled_abcd.label = 'ABCD'
      labelled_abcd.type = 'file'

      named_aabc = Content()
      named_aabc.label = ''
      named_aabc.type = 'file'
      named_aabc.file_name = 'AABC'

      labelled_bcde = Content()
      labelled_bcde.label = 'BCDE'
      labelled_bcde.type = 'file'

      sorteds = ContentApi.sort_content(
          [labelled_abcd, named_aabc, labelled_bcde]
      )

      expected_order = (named_aabc, labelled_abcd, labelled_bcde)
      for position, expected in enumerate(expected_order):
          eq_(sorteds[position], expected)
  def test_sort_by_content_type(self):
      """Check that sort_content() returns the folder before the file.

      The file is labelled 'AAAA' and the folder 'BBBB'; the folder is
      nevertheless expected at the first position.
      """
      file_content = Content()
      file_content.label = 'AAAA'
      file_content.type = 'file'

      folder_content = Content()
      folder_content.label = 'BBBB'
      folder_content.type = 'folder'

      sorteds = ContentApi.sort_content([file_content, folder_content])

      failure_template = 'value is {} instead of {}'
      for position, expected in enumerate((folder_content, file_content)):
          eq_(
              sorteds[position],
              expected,
              failure_template.format(
                  sorteds[position].content_id,
                  expected.content_id,
              ),
          )
  def test_delete(self):
      """Check that deleted content is hidden unless show_deleted is set.

      Two folders are created in a workspace, then the first item returned
      by get_all() is deleted. A default ContentApi must then list a single
      item, while a ContentApi built with show_deleted=True must still
      list both.
      """
      uapi = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(Group.TIM_USER),
          group_api.get_one(Group.TIM_MANAGER),
          group_api.get_one(Group.TIM_ADMIN),
      ]
      user = uapi.create_user(
          email='this.is@user',
          groups=groups,
          save_now=True,
      )
      workspace = WorkspaceApi(
          current_user=user,
          session=self.session,
      ).create_workspace('test workspace', save_now=True)
      api = ContentApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      # The created contents are listed again through get_all() below, so
      # the returned instances are not kept.
      api.create(ContentType.Folder, workspace, None, 'not_deleted', True)
      api.create(ContentType.Folder, workspace, None, 'to_delete', True)

      # Plain ids are kept to reload the instances after each commit.
      uid = user.user_id
      wid = workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))

      with new_revision(
          session=self.session,
          tm=transaction.manager,
          content=items[0],
      ):
          api.delete(items[0])
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(1, len(items))
      transaction.commit()

      # Test that the item is still available if "show deleted" is activated
      # Refresh instances after commit (workspace included, as in the
      # previous steps, so that no pre-commit instance is reused).
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
          show_deleted=True,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))
  def test_archive(self):
      """Check that archived content is hidden unless show_archived is set.

      Two folders are created in a workspace, then the first item returned
      by get_all() is archived. A default ContentApi must then list a
      single item, while a ContentApi built with show_archived=True must
      still list both.
      """
      uapi = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(Group.TIM_USER),
          group_api.get_one(Group.TIM_MANAGER),
          group_api.get_one(Group.TIM_ADMIN),
      ]
      user = uapi.create_user(
          email='this.is@user',
          groups=groups,
          save_now=True,
      )
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      api = ContentApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      # The created contents are listed again through get_all() below, so
      # the returned instances are not kept.
      api.create(ContentType.Folder, workspace, None, 'not_archived', True)
      api.create(ContentType.Folder, workspace, None, 'to_archive', True)

      # Plain ids are kept to reload the instances after each commit.
      uid = user.user_id
      wid = workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit (workspace included, as in the
      # later steps, so that no pre-commit instance is reused).
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          session=self.session,
          current_user=user,
          config=self.app_config,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))

      with new_revision(
          session=self.session,
          tm=transaction.manager,
          content=items[0],
      ):
          api.archive(items[0])
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session,
          config=self.app_config,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(1, len(items))
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)

      # Test that the item is still available if "show archived" is
      # activated.
      # NOTE(review): this listing is done with current_user=None, as in
      # the original test - confirm that it is intentional.
      api = ContentApi(
          current_user=None,
          session=self.session,
          config=self.app_config,
          show_archived=True,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))
  def test_get_all_with_filter(self):
      """Check get_all() filtering on the content type.

      A folder 'thefolder' and a file 'thefile' are created in a workspace.
      ContentType.Any must list both, ContentType.File only the file and
      ContentType.Folder only the folder.
      """
      user_api = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      owner = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )
      creation_workspace_api = WorkspaceApi(
          current_user=owner,
          session=self.session,
      )
      target_workspace = creation_workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      content_api = ContentApi(
          current_user=owner,
          session=self.session,
          config=self.app_config,
      )
      created_folder = content_api.create(
          ContentType.Folder, target_workspace, None, 'thefolder', True
      )
      created_file = content_api.create(
          ContentType.File, target_workspace, None, 'thefile', True
      )

      owner_id = owner.user_id
      workspace_id = target_workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit
      owner = user_api.get_one(owner_id)
      target_workspace = WorkspaceApi(
          current_user=owner,
          session=self.session,
      ).get_one(workspace_id)
      content_api = ContentApi(
          current_user=owner,
          session=self.session,
          config=self.app_config,
      )

      every_content = content_api.get_all(
          None, ContentType.Any, target_workspace
      )
      eq_(2, len(every_content))

      only_files = content_api.get_all(
          None, ContentType.File, target_workspace
      )
      eq_(1, len(only_files))
      eq_('thefile', only_files[0].label)

      only_folders = content_api.get_all(
          None, ContentType.Folder, target_workspace
      )
      eq_(1, len(only_folders))
      eq_('thefolder', only_folders[0].label)
  def test_get_all_with_parent_id(self):
      """Check get_all() filtering on the parent content id.

      A folder 'parent' holds 'file1' while 'file2' is created without
      parent. Listing without parent id must return the three contents;
      listing the files with the folder id must return only 'file1'.
      """
      user_api = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      owner = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )
      creation_workspace_api = WorkspaceApi(
          current_user=owner,
          session=self.session,
      )
      target_workspace = creation_workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      content_api = ContentApi(
          current_user=owner,
          session=self.session,
          config=self.app_config,
      )
      parent_folder = content_api.create(
          ContentType.Folder, target_workspace, None, 'parent', do_save=True
      )
      child_file = content_api.create(
          ContentType.File,
          target_workspace,
          parent_folder,
          'file1',
          do_save=True,
      )
      root_file = content_api.create(
          ContentType.File, target_workspace, None, 'file2', do_save=True
      )

      # Plain ids are kept: the instances are reloaded after the commit.
      parent_id = parent_folder.content_id
      child_id = child_file.content_id
      owner_id = owner.user_id
      workspace_id = target_workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit
      owner = user_api.get_one(owner_id)
      target_workspace = WorkspaceApi(
          current_user=owner,
          session=self.session,
      ).get_one(workspace_id)
      content_api = ContentApi(
          current_user=owner,
          session=self.session,
          config=self.app_config,
      )

      every_content = content_api.get_all(
          None, ContentType.Any, target_workspace
      )
      eq_(3, len(every_content))

      files_in_parent = content_api.get_all(
          parent_id, ContentType.File, target_workspace
      )
      eq_(1, len(files_in_parent))
      eq_(child_id, files_in_parent[0].content_id)
  def test_set_status_unknown_status(self):
      """Check that set_status() raises ValueError for an unknown status.

      The status 'unknown-status' is applied to a folder inside a
      new_revision() context.
      """
      user_api = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      owner = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )
      creation_workspace_api = WorkspaceApi(
          current_user=owner,
          session=self.session,
      )
      target_workspace = creation_workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      content_api = ContentApi(
          current_user=owner,
          session=self.session,
          config=self.app_config,
      )
      folder = content_api.create(
          ContentType.Folder, target_workspace, None, 'parent', True
      )

      revision_context = new_revision(
          session=self.session,
          tm=transaction.manager,
          content=folder,
      )
      # pytest.raises is the inner context: the error is caught before it
      # reaches the revision context.
      with revision_context, pytest.raises(ValueError):
          content_api.set_status(folder, 'unknown-status')
  def test_set_status_ok(self):
      """Check set_status() with each of the four accepted statuses.

      After every call, the content status must be the requested one and
      its revision type must be ActionDescription.STATUS_UPDATE.
      """
      user_api = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      owner = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )
      creation_workspace_api = WorkspaceApi(
          current_user=owner,
          session=self.session,
      )
      target_workspace = creation_workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      content_api = ContentApi(
          current_user=owner,
          session=self.session,
          config=self.app_config,
      )
      folder = content_api.create(
          ContentType.Folder, target_workspace, None, 'parent', True
      )

      statuses = (
          'open',
          'closed-validated',
          'closed-unvalidated',
          'closed-deprecated',
      )
      # All the status changes happen inside a single revision context.
      with new_revision(
          session=self.session,
          tm=transaction.manager,
          content=folder,
      ):
          for wanted_status in statuses:
              content_api.set_status(folder, wanted_status)
              eq_(wanted_status, folder.status)
              eq_(ActionDescription.STATUS_UPDATE, folder.revision_type)
  def test_create_comment_ok(self):
      """Check the attributes of a comment created on a page.

      The comment must be a Content whose parent is the page, owned by the
      current user, located in the page's workspace, typed as a comment,
      with the given text as description and an empty label.
      """
      user_api = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      author = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )
      creation_workspace_api = WorkspaceApi(
          current_user=author,
          session=self.session,
      )
      target_workspace = creation_workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      content_api = ContentApi(
          current_user=author,
          session=self.session,
          config=self.app_config,
      )
      page = content_api.create(
          ContentType.Page, target_workspace, None, 'this_is_a_page'
      )
      comment = content_api.create_comment(
          target_workspace, page, 'this is the comment', True
      )

      eq_(Content, comment.__class__)
      eq_(page.content_id, comment.parent_id)
      eq_(author, comment.owner)
      eq_(target_workspace, comment.workspace)
      eq_(ContentType.Comment, comment.type)
      eq_('this is the comment', comment.description)
      eq_('', comment.label)
      eq_(ActionDescription.COMMENT, comment.revision_type)
  def test_unit_copy_file_different_label_different_parent_ok(self):
      """Copy a file into another folder with a new label.

      user1 creates 'test_file' in 'folder a' of 'test workspace'. user2,
      given a workspace manager role there, copies it into 'folder b' of
      'test workspace2' under the label 'test_file_copy'. The copy is then
      compared with the source file.
      """
      user_api = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      first_user = user_api.create_user(
          email='user1@user',
          groups=all_groups,
          save_now=True,
      )
      second_user = user_api.create_user(
          email='user2@user',
          groups=all_groups,
          save_now=True,
      )
      source_workspace = WorkspaceApi(
          current_user=first_user,
          session=self.session,
      ).create_workspace('test workspace', save_now=True)
      RoleApi(current_user=first_user, session=self.session).create_one(
          second_user,
          source_workspace,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False,
      )
      first_api = ContentApi(
          current_user=first_user,
          session=self.session,
          config=self.app_config,
      )
      source_folder = first_api.create(
          ContentType.Folder, source_workspace, None, 'folder a', True
      )
      # The file is created and filled with autoflush disabled, and only
      # saved afterwards.
      with self.session.no_autoflush:
          source_file = first_api.create(
              content_type=ContentType.File,
              workspace=source_workspace,
              parent=source_folder,
              label='test_file',
              do_save=False,
          )
          first_api.update_file_data(
              source_file, 'test_file', 'text/plain', b'test_content'
          )
      first_api.save(source_file, ActionDescription.CREATION)

      second_api = ContentApi(
          current_user=second_user,
          session=self.session,
          config=self.app_config,
      )
      target_workspace = WorkspaceApi(
          current_user=second_user,
          session=self.session,
      ).create_workspace('test workspace2', save_now=True)
      target_folder = second_api.create(
          ContentType.Folder, target_workspace, None, 'folder b', True
      )

      second_api.copy(
          item=source_file,
          new_parent=target_folder,
          new_label='test_file_copy',
      )
      transaction.commit()
      copied_file = second_api.get_one_by_label_and_parent(
          'test_file_copy',
          target_folder,
      )

      assert source_file != copied_file
      assert copied_file.content_id != source_file.content_id
      assert copied_file.workspace_id == target_workspace.workspace_id
      copied_bytes = copied_file.depot_file.file.read()
      source_bytes = source_file.depot_file.file.read()
      assert copied_bytes == source_bytes
      assert copied_file.depot_file.path != source_file.depot_file.path
      assert copied_file.label == 'test_file_copy'
      assert copied_file.type == source_file.type
      assert copied_file.parent.content_id == target_folder.content_id
      assert copied_file.owner.user_id == first_user.user_id
      assert copied_file.description == source_file.description
      assert copied_file.file_extension == source_file.file_extension
      assert copied_file.file_mimetype == source_file.file_mimetype
      assert copied_file.revision_type == ActionDescription.COPY
      # The test expects the copy to have one more revision than the source.
      assert len(copied_file.revisions) == len(source_file.revisions) + 1
  def test_unit_copy_file__same_label_different_parent_ok(self):
      """Copy a file into another folder without giving a new label.

      user1 creates 'test_file' in 'folder a' of 'test workspace'. user2,
      given a workspace manager role there, copies it into 'folder b' of
      'test workspace2'. The copy is looked up under the label 'test_file'
      and compared with the source file.
      """
      user_api = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      first_user = user_api.create_user(
          email='user1@user',
          groups=all_groups,
          save_now=True,
      )
      second_user = user_api.create_user(
          email='user2@user',
          groups=all_groups,
          save_now=True,
      )
      source_workspace = WorkspaceApi(
          current_user=first_user,
          session=self.session,
      ).create_workspace('test workspace', save_now=True)
      RoleApi(current_user=first_user, session=self.session).create_one(
          second_user,
          source_workspace,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False,
      )
      first_api = ContentApi(
          current_user=first_user,
          session=self.session,
          config=self.app_config,
      )
      source_folder = first_api.create(
          ContentType.Folder, source_workspace, None, 'folder a', True
      )
      # The file is created and filled with autoflush disabled, and only
      # saved afterwards.
      with self.session.no_autoflush:
          source_file = first_api.create(
              content_type=ContentType.File,
              workspace=source_workspace,
              parent=source_folder,
              label='test_file',
              do_save=False,
          )
          first_api.update_file_data(
              source_file, 'test_file', 'text/plain', b'test_content'
          )
      first_api.save(source_file, ActionDescription.CREATION)

      second_api = ContentApi(
          current_user=second_user,
          session=self.session,
          config=self.app_config,
      )
      target_workspace = WorkspaceApi(
          current_user=second_user,
          session=self.session,
      ).create_workspace('test workspace2', save_now=True)
      target_folder = second_api.create(
          ContentType.Folder, target_workspace, None, 'folder b', True
      )

      second_api.copy(
          item=source_file,
          new_parent=target_folder,
      )
      transaction.commit()
      copied_file = second_api.get_one_by_label_and_parent(
          'test_file',
          target_folder,
      )

      assert source_file != copied_file
      assert copied_file.content_id != source_file.content_id
      assert copied_file.workspace_id == target_workspace.workspace_id
      copied_bytes = copied_file.depot_file.file.read()
      source_bytes = source_file.depot_file.file.read()
      assert copied_bytes == source_bytes
      assert copied_file.depot_file.path != source_file.depot_file.path
      assert copied_file.label == source_file.label
      assert copied_file.type == source_file.type
      assert copied_file.parent.content_id == target_folder.content_id
      assert copied_file.owner.user_id == first_user.user_id
      assert copied_file.description == source_file.description
      assert copied_file.file_extension == source_file.file_extension
      assert copied_file.file_mimetype == source_file.file_mimetype
      assert copied_file.revision_type == ActionDescription.COPY
      # The test expects the copy to have one more revision than the source.
      assert len(copied_file.revisions) == len(source_file.revisions) + 1
  def test_unit_copy_file_different_label_same_parent_ok(self):
      """Copy a file with a new label, without giving a new parent.

      user1 creates 'test_file' in 'folder a' of 'test workspace'. user2,
      given a workspace manager role there, copies it under the label
      'test_file_copy'. The copy is looked up in 'folder a' and compared
      with the source file.
      """
      user_api = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      first_user = user_api.create_user(
          email='user1@user',
          groups=all_groups,
          save_now=True,
      )
      second_user = user_api.create_user(
          email='user2@user',
          groups=all_groups,
          save_now=True,
      )
      source_workspace = WorkspaceApi(
          current_user=first_user,
          session=self.session,
      ).create_workspace('test workspace', save_now=True)
      RoleApi(current_user=first_user, session=self.session).create_one(
          second_user,
          source_workspace,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False,
      )
      first_api = ContentApi(
          current_user=first_user,
          session=self.session,
          config=self.app_config,
      )
      source_folder = first_api.create(
          ContentType.Folder, source_workspace, None, 'folder a', True
      )
      # The file is created and filled with autoflush disabled, and only
      # saved afterwards.
      with self.session.no_autoflush:
          source_file = first_api.create(
              content_type=ContentType.File,
              workspace=source_workspace,
              parent=source_folder,
              label='test_file',
              do_save=False,
          )
          first_api.update_file_data(
              source_file, 'test_file', 'text/plain', b'test_content'
          )
      first_api.save(source_file, ActionDescription.CREATION)

      second_api = ContentApi(
          current_user=second_user,
          session=self.session,
          config=self.app_config,
      )
      second_api.copy(
          item=source_file,
          new_label='test_file_copy',
      )
      transaction.commit()
      copied_file = second_api.get_one_by_label_and_parent(
          'test_file_copy',
          source_folder,
      )

      assert source_file != copied_file
      assert copied_file.content_id != source_file.content_id
      assert copied_file.workspace_id == source_workspace.workspace_id
      copied_bytes = copied_file.depot_file.file.read()
      source_bytes = source_file.depot_file.file.read()
      assert copied_bytes == source_bytes
      assert copied_file.depot_file.path != source_file.depot_file.path
      assert copied_file.label == 'test_file_copy'
      assert copied_file.type == source_file.type
      assert copied_file.parent.content_id == source_folder.content_id
      assert copied_file.owner.user_id == first_user.user_id
      assert copied_file.description == source_file.description
      assert copied_file.file_extension == source_file.file_extension
      assert copied_file.file_mimetype == source_file.file_mimetype
      assert copied_file.revision_type == ActionDescription.COPY
      # The test expects the copy to have one more revision than the source.
      assert len(copied_file.revisions) == len(source_file.revisions) + 1
  def test_mark_read__workspace(self):
      """Check that mark_read__workspace() only touches the given workspace.

      user_a creates two contents in each of two workspaces in which
      user_b gets a reader role. user_b marks workspace n°1 as read, then
      workspace n°2; after each step, the presence of user_b in read_by is
      checked on every revision of the four contents.
      """
      user_api = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user_a = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )
      user_b = user_api.create_user(
          email='this.is@another.user',
          groups=all_groups,
          save_now=True,
      )

      workspace_api = WorkspaceApi(
          current_user=user_a,
          session=self.session,
      )
      workspace1 = workspace_api.create_workspace(
          'test workspace n°1', save_now=True
      )
      workspace2 = workspace_api.create_workspace(
          'test workspace n°2', save_now=True
      )

      # The role in workspace n°1 is created by user_a, the one in
      # workspace n°2 by user_b.
      RoleApi(
          current_user=user_a,
          session=self.session,
      ).create_one(user_b, workspace1, UserRoleInWorkspace.READER, False)
      RoleApi(
          current_user=user_b,
          session=self.session,
      ).create_one(user_b, workspace2, UserRoleInWorkspace.READER, False)

      cont_api_a = ContentApi(
          current_user=user_a,
          session=self.session,
          config=self.app_config,
      )
      cont_api_b = ContentApi(
          current_user=user_b,
          session=self.session,
          config=self.app_config,
      )

      # Creates page_1 & page_2 in workspace 1
      # and page_3 & page_4 in workspace 2
      page_1, page_2, page_3, page_4 = [
          cont_api_a.create(kind, target, None, label, do_save=True)
          for kind, target, label in (
              (ContentType.Page, workspace1, 'this is a page'),
              (ContentType.Page, workspace1, 'this is page1'),
              (ContentType.Thread, workspace2, 'this is page2'),
              (ContentType.File, workspace2, 'this is page3'),
          )
      ]
      workspace1_contents = (page_1, page_2)
      workspace2_contents = (page_3, page_4)

      def check_unread_by_user_b(contents):
          # user_b must be absent from read_by on every revision.
          for content in contents:
              for rev in content.revisions:
                  eq_(user_b not in rev.read_by.keys(), True)

      def check_read_by_user_b(contents):
          # user_b must be present in read_by on every revision.
          for content in contents:
              for rev in content.revisions:
                  eq_(user_b in rev.read_by.keys(), True)

      check_unread_by_user_b(workspace1_contents)
      check_unread_by_user_b(workspace2_contents)

      # Set as read the workspace n°1
      cont_api_b.mark_read__workspace(workspace=workspace1)
      check_read_by_user_b(workspace1_contents)
      check_unread_by_user_b(workspace2_contents)

      # Set as read the workspace n°2
      cont_api_b.mark_read__workspace(workspace=workspace2)
      check_read_by_user_b(workspace1_contents)
      check_read_by_user_b(workspace2_contents)
  def test_mark_read(self):
      """Check that mark_read() registers the user on a content's revisions.

      user_a creates a page in a workspace in which user_b has a reader
      role. user_b must be absent from read_by on every revision of the
      page before calling mark_read(), and present afterwards.
      """
      uapi = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(
          current_user=None,
          session=self.session,
      )
      groups = [
          group_api.get_one(Group.TIM_USER),
          group_api.get_one(Group.TIM_MANAGER),
          group_api.get_one(Group.TIM_ADMIN),
      ]
      user_a = uapi.create_user(
          email='this.is@user',
          groups=groups,
          save_now=True,
      )
      user_b = uapi.create_user(
          email='this.is@another.user',
          groups=groups,
          save_now=True,
      )

      # A single WorkspaceApi is enough: the second, identical instance
      # that used to be built here was never used.
      wapi = WorkspaceApi(current_user=user_a, session=self.session)
      workspace = wapi.create_workspace(
          'test workspace',
          save_now=True,
      )

      role_api = RoleApi(
          current_user=user_a,
          session=self.session,
      )
      role_api.create_one(
          user_b,
          workspace,
          UserRoleInWorkspace.READER,
          False,
      )
      cont_api_a = ContentApi(
          current_user=user_a,
          session=self.session,
          config=self.app_config,
      )
      cont_api_b = ContentApi(
          current_user=user_b,
          session=self.session,
          config=self.app_config,
      )

      page_1 = cont_api_a.create(
          ContentType.Page,
          workspace,
          None,
          'this is a page',
          do_save=True,
      )

      for rev in page_1.revisions:
          eq_(user_b not in rev.read_by.keys(), True)

      cont_api_b.mark_read(page_1)

      for rev in page_1.revisions:
          eq_(user_b in rev.read_by.keys(), True)
  def test_mark_read__all(self):
      """Check that mark_read__all() registers the user on every content.

      user_a creates a page, a thread and a file in a workspace in which
      user_b has a reader role. user_b must be absent from read_by on
      every revision before calling mark_read__all(), and present on all of
      them afterwards.
      """
      user_api = UserApi(
          session=self.session,
          config=self.app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_id)
          for group_id in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user_a = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )
      user_b = user_api.create_user(
          email='this.is@another.user',
          groups=all_groups,
          save_now=True,
      )

      workspace = WorkspaceApi(
          current_user=user_a,
          session=self.session,
      ).create_workspace('test workspace', save_now=True)
      RoleApi(
          current_user=user_a,
          session=self.session,
      ).create_one(user_b, workspace, UserRoleInWorkspace.READER, False)

      cont_api_a = ContentApi(
          current_user=user_a,
          session=self.session,
          config=self.app_config,
      )
      cont_api_b = ContentApi(
          current_user=user_b,
          session=self.session,
          config=self.app_config,
      )

      page_2, page_3, page_4 = [
          cont_api_a.create(kind, workspace, None, label, do_save=True)
          for kind, label in (
              (ContentType.Page, 'this is page1'),
              (ContentType.Thread, 'this is page2'),
              (ContentType.File, 'this is page3'),
          )
      ]
      contents = (page_2, page_3, page_4)

      for content in contents:
          for rev in content.revisions:
              eq_(user_b not in rev.read_by.keys(), True)

      # The three contents are refreshed from the session before marking
      # everything as read.
      for content in contents:
          self.session.refresh(content)

      cont_api_b.mark_read__all()

      for content in contents:
          for rev in content.revisions:
              eq_(user_b in rev.read_by.keys(), True)
  987. def test_update(self):
  988. uapi = UserApi(
  989. session=self.session,
  990. config=self.app_config,
  991. current_user=None,
  992. )
  993. group_api = GroupApi(current_user=None, session=self.session)
  994. groups = [group_api.get_one(Group.TIM_USER),
  995. group_api.get_one(Group.TIM_MANAGER),
  996. group_api.get_one(Group.TIM_ADMIN)]
  997. user1 = uapi.create_user(
  998. email='this.is@user',
  999. groups=groups,
  1000. save_now=True
  1001. )
  1002. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1003. workspace = workspace_api.create_workspace(
  1004. 'test workspace',
  1005. save_now=True
  1006. )
  1007. wid = workspace.workspace_id
  1008. user2 = uapi.create_user()
  1009. user2.email = 'this.is@another.user'
  1010. uapi.save(user2)
  1011. RoleApi(
  1012. current_user=user1,
  1013. session=self.session
  1014. ).create_one(
  1015. user2,
  1016. workspace,
  1017. UserRoleInWorkspace.CONTENT_MANAGER,
  1018. with_notif=False,
  1019. flush=True
  1020. )
  1021. # Test starts here
  1022. api = ContentApi(
  1023. current_user=user1,
  1024. session=self.session,
  1025. config=self.app_config,
  1026. )
  1027. p = api.create(ContentType.Page, workspace, None,
  1028. 'this_is_a_page', True)
  1029. u1id = user1.user_id
  1030. u2id = user2.user_id
  1031. pcid = p.content_id
  1032. poid = p.owner_id
  1033. transaction.commit()
  1034. # Refresh instances after commit
  1035. user1 = uapi.get_one(u1id)
  1036. workspace = WorkspaceApi(
  1037. current_user=user1,
  1038. session=self.session
  1039. ).get_one(wid)
  1040. api = ContentApi(
  1041. current_user=user1,
  1042. session=self.session,
  1043. config=self.app_config,
  1044. )
  1045. content = api.get_one(pcid, ContentType.Any, workspace)
  1046. eq_(u1id, content.owner_id)
  1047. eq_(poid, content.owner_id)
  1048. u2 = UserApi(
  1049. session=self.session,
  1050. config=self.app_config,
  1051. current_user=None,
  1052. ).get_one(u2id)
  1053. api2 = ContentApi(
  1054. current_user=u2,
  1055. session=self.session,
  1056. config=self.app_config,
  1057. )
  1058. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1059. with new_revision(
  1060. session=self.session,
  1061. tm=transaction.manager,
  1062. content=content2,
  1063. ):
  1064. api2.update_content(
  1065. content2,
  1066. 'this is an updated page',
  1067. 'new content'
  1068. )
  1069. api2.save(content2)
  1070. transaction.commit()
  1071. # Refresh instances after commit
  1072. user1 = uapi.get_one(u1id)
  1073. workspace = WorkspaceApi(
  1074. current_user=user1,
  1075. session=self.session,
  1076. ).get_one(wid)
  1077. api = ContentApi(
  1078. current_user=user1,
  1079. session=self.session,
  1080. config=self.app_config,
  1081. )
  1082. updated = api.get_one(pcid, ContentType.Any, workspace)
  1083. eq_(u2id, updated.owner_id,
  1084. 'the owner id should be {} (found {})'.format(u2id,
  1085. updated.owner_id))
  1086. eq_('this is an updated page', updated.label)
  1087. eq_('new content', updated.description)
  1088. eq_(ActionDescription.EDITION, updated.revision_type)
  1089. def test_update_no_change(self):
  1090. uapi = UserApi(
  1091. session=self.session,
  1092. config=self.app_config,
  1093. current_user=None,
  1094. )
  1095. group_api = GroupApi(
  1096. current_user=None,
  1097. session=self.session
  1098. )
  1099. groups = [group_api.get_one(Group.TIM_USER),
  1100. group_api.get_one(Group.TIM_MANAGER),
  1101. group_api.get_one(Group.TIM_ADMIN)]
  1102. user1 = uapi.create_user(
  1103. email='this.is@user',
  1104. groups=groups,
  1105. save_now=True,
  1106. )
  1107. workspace = WorkspaceApi(
  1108. current_user=user1,
  1109. session=self.session,
  1110. ).create_workspace(
  1111. 'test workspace',
  1112. save_now=True
  1113. )
  1114. user2 = uapi.create_user()
  1115. user2.email = 'this.is@another.user'
  1116. uapi.save(user2)
  1117. RoleApi(
  1118. current_user=user1,
  1119. session=self.session
  1120. ).create_one(
  1121. user2,
  1122. workspace,
  1123. UserRoleInWorkspace.CONTENT_MANAGER,
  1124. with_notif=False,
  1125. flush=True
  1126. )
  1127. api = ContentApi(
  1128. current_user=user1,
  1129. session=self.session,
  1130. config=self.app_config,
  1131. )
  1132. with self.session.no_autoflush:
  1133. page = api.create(
  1134. content_type=ContentType.Page,
  1135. workspace=workspace,
  1136. label="same_content",
  1137. do_save=False
  1138. )
  1139. page.description = "Same_content_here"
  1140. api.save(page, ActionDescription.CREATION, do_notify=True)
  1141. transaction.commit()
  1142. api2 = ContentApi(
  1143. current_user=user2,
  1144. session=self.session,
  1145. config=self.app_config,
  1146. )
  1147. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  1148. with new_revision(
  1149. session=self.session,
  1150. tm=transaction.manager,
  1151. content=content2,
  1152. ):
  1153. with pytest.raises(SameValueError):
  1154. api2.update_content(
  1155. item=content2,
  1156. new_label='same_content',
  1157. new_content='Same_content_here'
  1158. )
  1159. api2.save(content2)
  1160. transaction.commit()
  1161. def test_update_file_data(self):
  1162. uapi = UserApi(
  1163. session=self.session,
  1164. config=self.app_config,
  1165. current_user=None,
  1166. )
  1167. group_api = GroupApi(
  1168. current_user=None,
  1169. session=self.session
  1170. )
  1171. groups = [group_api.get_one(Group.TIM_USER),
  1172. group_api.get_one(Group.TIM_MANAGER),
  1173. group_api.get_one(Group.TIM_ADMIN)]
  1174. user1 = uapi.create_user(
  1175. email='this.is@user',
  1176. groups=groups,
  1177. save_now=True
  1178. )
  1179. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1180. workspace = workspace_api.create_workspace(
  1181. 'test workspace',
  1182. save_now=True
  1183. )
  1184. wid = workspace.workspace_id
  1185. user2 = uapi.create_user()
  1186. user2.email = 'this.is@another.user'
  1187. uapi.save(user2)
  1188. RoleApi(
  1189. current_user=user1,
  1190. session=self.session,
  1191. ).create_one(
  1192. user2,
  1193. workspace,
  1194. UserRoleInWorkspace.CONTENT_MANAGER,
  1195. with_notif=True,
  1196. flush=True
  1197. )
  1198. # Test starts here
  1199. api = ContentApi(
  1200. current_user=user1,
  1201. session=self.session,
  1202. config=self.app_config,
  1203. )
  1204. p = api.create(ContentType.File, workspace, None,
  1205. 'this_is_a_page', True)
  1206. u1id = user1.user_id
  1207. u2id = user2.user_id
  1208. pcid = p.content_id
  1209. poid = p.owner_id
  1210. api.save(p)
  1211. transaction.commit()
  1212. # Refresh instances after commit
  1213. user1 = uapi.get_one(u1id)
  1214. workspace_api2 = WorkspaceApi(current_user=user1, session=self.session)
  1215. workspace = workspace_api2.get_one(wid)
  1216. api = ContentApi(
  1217. current_user=user1,
  1218. session=self.session,
  1219. config=self.app_config,
  1220. )
  1221. content = api.get_one(pcid, ContentType.Any, workspace)
  1222. eq_(u1id, content.owner_id)
  1223. eq_(poid, content.owner_id)
  1224. u2 = UserApi(
  1225. current_user=None,
  1226. session=self.session,
  1227. config=self.app_config,
  1228. ).get_one(u2id)
  1229. api2 = ContentApi(
  1230. current_user=u2,
  1231. session=self.session,
  1232. config=self.app_config,
  1233. )
  1234. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1235. with new_revision(
  1236. session=self.session,
  1237. tm=transaction.manager,
  1238. content=content2,
  1239. ):
  1240. api2.update_file_data(
  1241. content2,
  1242. 'index.html',
  1243. 'text/html',
  1244. b'<html>hello world</html>'
  1245. )
  1246. api2.save(content2)
  1247. transaction.commit()
  1248. # Refresh instances after commit
  1249. user1 = uapi.get_one(u1id)
  1250. workspace = WorkspaceApi(
  1251. current_user=user1,
  1252. session=self.session,
  1253. ).get_one(wid)
  1254. updated = api.get_one(pcid, ContentType.Any, workspace)
  1255. eq_(u2id, updated.owner_id,
  1256. 'the owner id should be {} (found {})'.format(u2id,
  1257. updated.owner_id))
  1258. eq_('this_is_a_page.html', updated.file_name)
  1259. eq_('text/html', updated.file_mimetype)
  1260. eq_(b'<html>hello world</html>', updated.depot_file.file.read())
  1261. eq_(ActionDescription.REVISION, updated.revision_type)
  1262. def test_update_no_change(self):
  1263. uapi = UserApi(
  1264. session=self.session,
  1265. config=self.app_config,
  1266. current_user=None,
  1267. )
  1268. group_api = GroupApi(
  1269. current_user=None,
  1270. session=self.session,
  1271. )
  1272. groups = [group_api.get_one(Group.TIM_USER),
  1273. group_api.get_one(Group.TIM_MANAGER),
  1274. group_api.get_one(Group.TIM_ADMIN)]
  1275. user1 = uapi.create_user(
  1276. email='this.is@user',
  1277. groups=groups,
  1278. save_now=True,
  1279. )
  1280. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1281. workspace = workspace_api.create_workspace(
  1282. 'test workspace',
  1283. save_now=True
  1284. )
  1285. user2 = uapi.create_user()
  1286. user2.email = 'this.is@another.user'
  1287. uapi.save(user2)
  1288. RoleApi(
  1289. current_user=user1,
  1290. session=self.session,
  1291. ).create_one(
  1292. user2,
  1293. workspace,
  1294. UserRoleInWorkspace.CONTENT_MANAGER,
  1295. with_notif=False,
  1296. flush=True
  1297. )
  1298. api = ContentApi(
  1299. current_user=user1,
  1300. session=self.session,
  1301. config=self.app_config,
  1302. )
  1303. with self.session.no_autoflush:
  1304. page = api.create(
  1305. content_type=ContentType.Page,
  1306. workspace=workspace,
  1307. label="same_content",
  1308. do_save=False
  1309. )
  1310. api.update_file_data(
  1311. page,
  1312. 'index.html',
  1313. 'text/html',
  1314. b'<html>Same Content Here</html>'
  1315. )
  1316. api.save(page, ActionDescription.CREATION, do_notify=True)
  1317. transaction.commit()
  1318. api2 = ContentApi(
  1319. current_user=user2,
  1320. session=self.session,
  1321. config=self.app_config,
  1322. )
  1323. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  1324. with new_revision(
  1325. session=self.session,
  1326. tm=transaction.manager,
  1327. content=content2,
  1328. ):
  1329. with pytest.raises(SameValueError):
  1330. api2.update_file_data(
  1331. page,
  1332. 'index.html',
  1333. 'text/html',
  1334. b'<html>Same Content Here</html>'
  1335. )
  1336. api2.save(content2)
  1337. transaction.commit()
  1338. def test_archive_unarchive(self):
  1339. uapi = UserApi(
  1340. session=self.session,
  1341. config=self.app_config,
  1342. current_user=None,
  1343. )
  1344. group_api = GroupApi(current_user=None, session=self.session)
  1345. groups = [group_api.get_one(Group.TIM_USER),
  1346. group_api.get_one(Group.TIM_MANAGER),
  1347. group_api.get_one(Group.TIM_ADMIN)]
  1348. user1 = uapi.create_user(
  1349. email='this.is@user',
  1350. groups=groups,
  1351. save_now=True
  1352. )
  1353. u1id = user1.user_id
  1354. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1355. workspace = workspace_api.create_workspace(
  1356. 'test workspace',
  1357. save_now=True
  1358. )
  1359. wid = workspace.workspace_id
  1360. user2 = uapi.create_user()
  1361. user2.email = 'this.is@another.user'
  1362. uapi.save(user2)
  1363. RoleApi(
  1364. current_user=user1,
  1365. session=self.session
  1366. ).create_one(
  1367. user2,
  1368. workspace,
  1369. UserRoleInWorkspace.CONTENT_MANAGER,
  1370. with_notif=True,
  1371. flush=True
  1372. )
  1373. # show archived is used at the top end of the test
  1374. api = ContentApi(
  1375. current_user=user1,
  1376. session=self.session,
  1377. show_archived=True,
  1378. config=self.app_config,
  1379. )
  1380. p = api.create(ContentType.File, workspace, None,
  1381. 'this_is_a_page', True)
  1382. u1id = user1.user_id
  1383. u2id = user2.user_id
  1384. pcid = p.content_id
  1385. poid = p.owner_id
  1386. transaction.commit()
  1387. ####
  1388. # refresh after commit
  1389. user1 = UserApi(
  1390. current_user=None,
  1391. config=self.app_config,
  1392. session=self.session
  1393. ).get_one(u1id)
  1394. workspace = WorkspaceApi(
  1395. current_user=user1,
  1396. session=self.session
  1397. ).get_one(wid)
  1398. content = api.get_one(pcid, ContentType.Any, workspace)
  1399. eq_(u1id, content.owner_id)
  1400. eq_(poid, content.owner_id)
  1401. u2api = UserApi(
  1402. session=self.session,
  1403. config=self.app_config,
  1404. current_user=None,
  1405. )
  1406. u2 = u2api.get_one(u2id)
  1407. api2 = ContentApi(
  1408. current_user=u2,
  1409. session=self.session,
  1410. config=self.app_config,
  1411. show_archived=True,
  1412. )
  1413. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1414. with new_revision(
  1415. session=self.session,
  1416. tm=transaction.manager,
  1417. content=content2,
  1418. ):
  1419. api2.archive(content2)
  1420. api2.save(content2)
  1421. transaction.commit()
  1422. # refresh after commit
  1423. user1 = UserApi(
  1424. current_user=None,
  1425. session=self.session,
  1426. config=self.app_config,
  1427. ).get_one(u1id)
  1428. workspace = WorkspaceApi(
  1429. current_user=user1,
  1430. session=self.session,
  1431. ).get_one(wid)
  1432. u2 = UserApi(
  1433. current_user=None,
  1434. session=self.session,
  1435. config=self.app_config,
  1436. ).get_one(u2id)
  1437. api = ContentApi(
  1438. current_user=user1,
  1439. session=self.session,
  1440. config=self.app_config,
  1441. show_archived=True,
  1442. )
  1443. api2 = ContentApi(
  1444. current_user=u2,
  1445. session=self.session,
  1446. config=self.app_config,
  1447. show_archived=True,
  1448. )
  1449. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1450. eq_(u2id, updated.owner_id,
  1451. 'the owner id should be {} (found {})'.format(u2id,
  1452. updated.owner_id))
  1453. eq_(True, updated.is_archived)
  1454. eq_(ActionDescription.ARCHIVING, updated.revision_type)
  1455. ####
  1456. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1457. with new_revision(
  1458. session=self.session,
  1459. tm=transaction.manager,
  1460. content=updated,
  1461. ):
  1462. api.unarchive(updated)
  1463. api.save(updated2)
  1464. eq_(False, updated2.is_archived)
  1465. eq_(ActionDescription.UNARCHIVING, updated2.revision_type)
  1466. eq_(u1id, updated2.owner_id)
  1467. def test_delete_undelete(self):
  1468. uapi = UserApi(
  1469. session=self.session,
  1470. config=self.app_config,
  1471. current_user=None,
  1472. )
  1473. group_api = GroupApi(
  1474. current_user=None,
  1475. session=self.session
  1476. )
  1477. groups = [group_api.get_one(Group.TIM_USER),
  1478. group_api.get_one(Group.TIM_MANAGER),
  1479. group_api.get_one(Group.TIM_ADMIN)]
  1480. user1 = uapi.create_user(
  1481. email='this.is@user',
  1482. groups=groups,
  1483. save_now=True
  1484. )
  1485. u1id = user1.user_id
  1486. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1487. workspace = workspace_api.create_workspace(
  1488. 'test workspace',
  1489. save_now=True
  1490. )
  1491. wid = workspace.workspace_id
  1492. user2 = uapi.create_user()
  1493. user2.email = 'this.is@another.user'
  1494. uapi.save(user2)
  1495. RoleApi(
  1496. current_user=user1,
  1497. session=self.session
  1498. ).create_one(
  1499. user2,
  1500. workspace,
  1501. UserRoleInWorkspace.CONTENT_MANAGER,
  1502. with_notif=True,
  1503. flush=True
  1504. )
  1505. # show archived is used at the top end of the test
  1506. api = ContentApi(
  1507. current_user=user1,
  1508. session=self.session,
  1509. config=self.app_config,
  1510. show_deleted=True,
  1511. )
  1512. p = api.create(ContentType.File, workspace, None,
  1513. 'this_is_a_page', True)
  1514. u1id = user1.user_id
  1515. u2id = user2.user_id
  1516. pcid = p.content_id
  1517. poid = p.owner_id
  1518. transaction.commit()
  1519. ####
  1520. user1 = UserApi(
  1521. current_user=None,
  1522. session=self.session,
  1523. config=self.app_config,
  1524. ).get_one(u1id)
  1525. workspace = WorkspaceApi(
  1526. current_user=user1,
  1527. session=self.session,
  1528. ).get_one(wid)
  1529. content = api.get_one(pcid, ContentType.Any, workspace)
  1530. eq_(u1id, content.owner_id)
  1531. eq_(poid, content.owner_id)
  1532. u2 = UserApi(
  1533. current_user=None,
  1534. session=self.session,
  1535. config=self.app_config,
  1536. ).get_one(u2id)
  1537. api2 = ContentApi(
  1538. current_user=u2,
  1539. session=self.session,
  1540. config=self.app_config,
  1541. show_deleted=True,
  1542. )
  1543. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1544. with new_revision(
  1545. session=self.session,
  1546. tm=transaction.manager,
  1547. content=content2,
  1548. ):
  1549. api2.delete(content2)
  1550. api2.save(content2)
  1551. transaction.commit()
  1552. ####
  1553. user1 = UserApi(
  1554. current_user=None,
  1555. session=self.session,
  1556. config=self.app_config,
  1557. ).get_one(u1id)
  1558. workspace = WorkspaceApi(
  1559. current_user=user1,
  1560. session=self.session,
  1561. ).get_one(wid)
  1562. # show archived is used at the top end of the test
  1563. api = ContentApi(
  1564. current_user=user1,
  1565. session=self.session,
  1566. config=self.app_config,
  1567. show_deleted=True,
  1568. )
  1569. u2 = UserApi(
  1570. current_user=None,
  1571. session=self.session,
  1572. config=self.app_config,
  1573. ).get_one(u2id)
  1574. api2 = ContentApi(
  1575. current_user=u2,
  1576. session=self.session,
  1577. config=self.app_config,
  1578. show_deleted=True
  1579. )
  1580. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1581. eq_(u2id, updated.owner_id,
  1582. 'the owner id should be {} (found {})'.format(u2id,
  1583. updated.owner_id))
  1584. eq_(True, updated.is_deleted)
  1585. eq_(ActionDescription.DELETION, updated.revision_type)
  1586. ####
  1587. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1588. with new_revision(
  1589. tm=transaction.manager,
  1590. session=self.session,
  1591. content=updated2,
  1592. ):
  1593. api.undelete(updated2)
  1594. api.save(updated2)
  1595. eq_(False, updated2.is_deleted)
  1596. eq_(ActionDescription.UNDELETION, updated2.revision_type)
  1597. eq_(u1id, updated2.owner_id)
  1598. def test_search_in_label(self):
  1599. # HACK - D.A. - 2015-03-09
  1600. # This test is based on a bug which does NOT return results found
  1601. # at root of a workspace (eg a folder)
  1602. uapi = UserApi(
  1603. session=self.session,
  1604. config=self.app_config,
  1605. current_user=None,
  1606. )
  1607. group_api = GroupApi(
  1608. current_user=None,
  1609. session=self.session,
  1610. )
  1611. groups = [group_api.get_one(Group.TIM_USER),
  1612. group_api.get_one(Group.TIM_MANAGER),
  1613. group_api.get_one(Group.TIM_ADMIN)]
  1614. user = uapi.create_user(email='this.is@user',
  1615. groups=groups, save_now=True)
  1616. workspace = WorkspaceApi(
  1617. current_user=user,
  1618. session=self.session
  1619. ).create_workspace(
  1620. 'test workspace',
  1621. save_now=True
  1622. )
  1623. api = ContentApi(
  1624. current_user=user,
  1625. session=self.session,
  1626. config=self.app_config,
  1627. )
  1628. a = api.create(ContentType.Folder, workspace, None,
  1629. 'this is randomized folder', True)
  1630. p = api.create(ContentType.Page, workspace, a,
  1631. 'this is randomized label content', True)
  1632. with new_revision(
  1633. session=self.session,
  1634. tm=transaction.manager,
  1635. content=p,
  1636. ):
  1637. p.description = 'This is some amazing test'
  1638. api.save(p)
  1639. original_id = p.content_id
  1640. res = api.search(['randomized'])
  1641. eq_(1, len(res.all()))
  1642. item = res.all()[0]
  1643. eq_(original_id, item.content_id)
  1644. def test_search_in_description(self):
  1645. # HACK - D.A. - 2015-03-09
  1646. # This test is based on a bug which does NOT return results found
  1647. # at root of a workspace (eg a folder)
  1648. uapi = UserApi(
  1649. session=self.session,
  1650. config=self.app_config,
  1651. current_user=None,
  1652. )
  1653. group_api = GroupApi(
  1654. current_user=None,
  1655. session=self.session,
  1656. )
  1657. groups = [group_api.get_one(Group.TIM_USER),
  1658. group_api.get_one(Group.TIM_MANAGER),
  1659. group_api.get_one(Group.TIM_ADMIN)]
  1660. user = uapi.create_user(email='this.is@user',
  1661. groups=groups, save_now=True)
  1662. workspace = WorkspaceApi(
  1663. current_user=user,
  1664. session=self.session
  1665. ).create_workspace(
  1666. 'test workspace',
  1667. save_now=True,
  1668. )
  1669. api = ContentApi(
  1670. current_user=user,
  1671. session=self.session,
  1672. config=self.app_config,
  1673. )
  1674. a = api.create(ContentType.Folder, workspace, None,
  1675. 'this is randomized folder', True)
  1676. p = api.create(ContentType.Page, workspace, a,
  1677. 'this is dummy label content', True)
  1678. with new_revision(
  1679. tm=transaction.manager,
  1680. session=self.session,
  1681. content=p,
  1682. ):
  1683. p.description = 'This is some amazing test'
  1684. api.save(p)
  1685. original_id = p.content_id
  1686. res = api.search(['dummy'])
  1687. eq_(1, len(res.all()))
  1688. item = res.all()[0]
  1689. eq_(original_id, item.content_id)
  1690. def test_search_in_label_or_description(self):
  1691. # HACK - D.A. - 2015-03-09
  1692. # This test is based on a bug which does NOT return results found
  1693. # at root of a workspace (eg a folder)
  1694. uapi = UserApi(
  1695. session=self.session,
  1696. config=self.app_config,
  1697. current_user=None,
  1698. )
  1699. group_api = GroupApi(current_user=None, session=self.session)
  1700. groups = [group_api.get_one(Group.TIM_USER),
  1701. group_api.get_one(Group.TIM_MANAGER),
  1702. group_api.get_one(Group.TIM_ADMIN)]
  1703. user = uapi.create_user(email='this.is@user',
  1704. groups=groups, save_now=True)
  1705. workspace = WorkspaceApi(
  1706. current_user=user,
  1707. session=self.session
  1708. ).create_workspace('test workspace', save_now=True)
  1709. api = ContentApi(
  1710. current_user=user,
  1711. session=self.session,
  1712. config=self.app_config,
  1713. )
  1714. a = api.create(ContentType.Folder, workspace, None,
  1715. 'this is randomized folder', True)
  1716. p1 = api.create(ContentType.Page, workspace, a,
  1717. 'this is dummy label content', True)
  1718. p2 = api.create(ContentType.Page, workspace, a, 'Hey ! Jon !', True)
  1719. with new_revision(
  1720. session=self.session,
  1721. tm=transaction.manager,
  1722. content=p1,
  1723. ):
  1724. p1.description = 'This is some amazing test'
  1725. with new_revision(
  1726. session=self.session,
  1727. tm=transaction.manager,
  1728. content=p2,
  1729. ):
  1730. p2.description = 'What\'s up ?'
  1731. api.save(p1)
  1732. api.save(p2)
  1733. id1 = p1.content_id
  1734. id2 = p2.content_id
  1735. eq_(1, self.session.query(Workspace).filter(Workspace.label == 'test workspace').count())
  1736. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is randomized folder').count())
  1737. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is dummy label content').count())
  1738. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'This is some amazing test').count())
  1739. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'Hey ! Jon !').count())
  1740. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'What\'s up ?').count())
  1741. res = api.search(['dummy', 'jon'])
  1742. eq_(2, len(res.all()))
  1743. eq_(True, id1 in [o.content_id for o in res.all()])
  1744. eq_(True, id2 in [o.content_id for o in res.all()])
  1745. def test_unit__search_exclude_content_under_deleted_or_archived_parents__ok(self): # nopep8
  1746. admin = self.session.query(User)\
  1747. .filter(User.email == 'admin@admin.admin').one()
  1748. workspace = self._create_workspace_and_test(
  1749. 'workspace_1',
  1750. admin
  1751. )
  1752. folder_1 = self._create_content_and_test(
  1753. 'folder_1',
  1754. workspace=workspace,
  1755. type=ContentType.Folder
  1756. )
  1757. folder_2 = self._create_content_and_test(
  1758. 'folder_2',
  1759. workspace=workspace,
  1760. type=ContentType.Folder
  1761. )
  1762. page_1 = self._create_content_and_test(
  1763. 'foo', workspace=workspace,
  1764. type=ContentType.Page,
  1765. parent=folder_1
  1766. )
  1767. page_2 = self._create_content_and_test(
  1768. 'bar',
  1769. workspace=workspace,
  1770. type=ContentType.Page,
  1771. parent=folder_2
  1772. )
  1773. api = ContentApi(
  1774. current_user=admin,
  1775. session=self.session,
  1776. config=self.app_config,
  1777. )
  1778. foo_result = api.search(['foo']).all()
  1779. eq_(1, len(foo_result))
  1780. assert page_1 in foo_result
  1781. bar_result = api.search(['bar']).all()
  1782. eq_(1, len(bar_result))
  1783. assert page_2 in bar_result
  1784. with new_revision(
  1785. session=self.session,
  1786. tm=transaction.manager,
  1787. content=folder_1,
  1788. ):
  1789. api.delete(folder_1)
  1790. with new_revision(
  1791. session=self.session,
  1792. tm=transaction.manager,
  1793. content=folder_2,
  1794. ):
  1795. api.archive(folder_2)
  1796. # Actually ContentApi.search don't filter it
  1797. foo_result = api.search(['foo']).all()
  1798. eq_(1, len(foo_result))
  1799. assert page_1 in foo_result
  1800. bar_result = api.search(['bar']).all()
  1801. eq_(1, len(bar_result))
  1802. assert page_2 in bar_result
  1803. # ContentApi offer exclude_unavailable method to do it
  1804. foo_result = api.search(['foo']).all()
  1805. api.exclude_unavailable(foo_result)
  1806. eq_(0, len(foo_result))
  1807. bar_result = api.search(['bar']).all()
  1808. api.exclude_unavailable(bar_result)
  1809. eq_(0, len(bar_result))
  1810. class TestContentApiSecurity(DefaultTest):
  1811. fixtures = [FixtureTest, ]
  1812. def test_unit__cant_get_non_access_content__ok__nominal_case(self):
  1813. admin = self.session.query(User)\
  1814. .filter(User.email == 'admin@admin.admin').one()
  1815. bob = self.session.query(User)\
  1816. .filter(User.email == 'bob@fsf.local').one()
  1817. bob_workspace = WorkspaceApi(
  1818. current_user=bob,
  1819. session=self.session,
  1820. ).create_workspace(
  1821. 'bob_workspace',
  1822. save_now=True,
  1823. )
  1824. admin_workspace = WorkspaceApi(
  1825. current_user=admin,
  1826. session=self.session,
  1827. ).create_workspace(
  1828. 'admin_workspace',
  1829. save_now=True,
  1830. )
  1831. bob_page = ContentApi(
  1832. current_user=bob,
  1833. session=self.session,
  1834. config=self.app_config,
  1835. ).create(
  1836. content_type=ContentType.Page,
  1837. workspace=bob_workspace,
  1838. label='bob_page',
  1839. do_save=True,
  1840. )
  1841. admin_page = ContentApi(
  1842. current_user=admin,
  1843. session=self.session,
  1844. config=self.app_config,
  1845. ).create(
  1846. content_type=ContentType.Page,
  1847. workspace=admin_workspace,
  1848. label='admin_page',
  1849. do_save=True,
  1850. )
  1851. bob_viewable = ContentApi(
  1852. current_user=bob,
  1853. session=self.session,
  1854. config=self.app_config,
  1855. ).get_all()
  1856. eq_(1, len(bob_viewable), 'Bob should view only one content')
  1857. eq_(
  1858. 'bob_page',
  1859. bob_viewable[0].label,
  1860. 'Bob should not view "{0}" content'.format(
  1861. bob_viewable[0].label,
  1862. )
  1863. )