test_content_api.py 62KB


  1. # -*- coding: utf-8 -*-
  2. from nose.tools import eq_, ok_
  3. from nose.tools import raises
  4. import transaction
  5. from tracim.config import CFG
  6. from tracim.lib.core.content import compare_content_for_sorting_by_type_and_name
  7. from tracim.lib.core.content import ContentApi
  8. # TODO - G.M - 28-03-2018 - [GroupApi] Re-enable GroupApi
  9. from tracim.lib.core.group import GroupApi
  10. from tracim.lib.core.user import UserApi
  11. from tracim.exceptions import SameValueError
  12. # TODO - G.M - 28-03-2018 - [RoleApi] Re-enable RoleApi
  13. from tracim.lib.core.workspace import RoleApi
  14. # TODO - G.M - 28-03-2018 - [WorkspaceApi] Re-enable WorkspaceApi
  15. from tracim.lib.core.workspace import WorkspaceApi
  16. from tracim.models.revision_protection import new_revision
  17. from tracim.models.auth import User
  18. from tracim.models.auth import Group
  19. from tracim.models.data import ActionDescription
  20. from tracim.models.data import ContentRevisionRO
  21. from tracim.models.data import Workspace
  22. from tracim.models.data import Content
  23. from tracim.models.data import ContentType
  24. from tracim.models.data import UserRoleInWorkspace
  25. from tracim.fixtures.users_and_groups import Test as FixtureTest
  26. from tracim.tests import DefaultTest
  27. class TestContentApi(DefaultTest):
  def test_compare_content_for_sorting_by_type(self):
      """Check the comparator result when only the content type differs.

      Both contents carry an empty label, so the expected results
      (1, -1 and 0) come from comparing a 'file' with a 'folder' and a
      content with itself.
      """
      compare = compare_content_for_sorting_by_type_and_name

      file_content = Content()
      file_content.label = ''
      file_content.type = 'file'

      folder_content = Content()
      folder_content.label = ''
      folder_content.type = 'folder'

      # Second reference to the very same object, used for the "equal" case.
      same_file_content = file_content

      expectations = (
          (1, file_content, folder_content),
          (-1, folder_content, file_content),
          (0, file_content, same_file_content),
      )
      for expected, left, right in expectations:
          eq_(expected, compare(left, right))
  def test_compare_content_for_sorting_by_label(self):
      """Check the comparator result when only the label differs.

      Both contents are of type 'file'; the one labelled 'bbb' is
      expected to compare greater than the one labelled 'aaa'.
      """
      compare = compare_content_for_sorting_by_type_and_name

      later_content = Content()
      later_content.label = 'bbb'
      later_content.type = 'file'

      earlier_content = Content()
      earlier_content.label = 'aaa'
      earlier_content.type = 'file'

      # Second reference to the very same object, used for the "equal" case.
      same_later_content = later_content

      expectations = (
          (1, later_content, earlier_content),
          (-1, earlier_content, later_content),
          (0, later_content, same_later_content),
      )
      for expected, left, right in expectations:
          eq_(expected, compare(left, right))
  def test_sort_by_label_or_filename(self):
      """Check ``ContentApi.sort_content`` on contents of the same type.

      The second content has an empty label and a file name of 'AABC';
      it is expected first, ahead of the contents labelled 'ABCD' and
      'BCDE'.
      """
      labelled_abcd = Content()
      labelled_abcd.label = 'ABCD'
      labelled_abcd.type = 'file'

      named_aabc = Content()
      named_aabc.label = ''
      named_aabc.type = 'file'
      named_aabc.file_name = 'AABC'

      labelled_bcde = Content()
      labelled_bcde.label = 'BCDE'
      labelled_bcde.type = 'file'

      ordered = ContentApi.sort_content(
          [labelled_abcd, named_aabc, labelled_bcde]
      )

      expected_order = (named_aabc, labelled_abcd, labelled_bcde)
      for position, expected in enumerate(expected_order):
          eq_(ordered[position], expected)
  def test_sort_by_content_type(self):
      """Check that ``ContentApi.sort_content`` puts the folder first.

      The file is labelled 'AAAA' and the folder 'BBBB', so the expected
      order (folder, then file) cannot come from the labels alone.
      """
      file_content = Content()
      file_content.label = 'AAAA'
      file_content.type = 'file'

      folder_content = Content()
      folder_content.label = 'BBBB'
      folder_content.type = 'folder'

      ordered = ContentApi.sort_content([file_content, folder_content])

      for position, expected in enumerate((folder_content, file_content)):
          found = ordered[position]
          failure_message = 'value is {} instead of {}'.format(
              found.content_id,
              expected.content_id,
          )
          eq_(found, expected, failure_message)
  def test_delete(self):
      """Check that deleted content is hidden unless ``show_deleted`` is set.

      Two folders are created in a new workspace, then the first item
      returned by ``get_all`` is deleted. A default ``ContentApi`` must
      list one remaining item, while one built with ``show_deleted=True``
      must still list both.
      """
      uapi = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [group_api.get_one(Group.TIM_USER),
                group_api.get_one(Group.TIM_MANAGER),
                group_api.get_one(Group.TIM_ADMIN)]

      user = uapi.create_user(email='this.is@user',
                              groups=groups, save_now=True)
      workspace = WorkspaceApi(
          current_user=user,
          session=self.session
      ).create_workspace('test workspace', save_now=True)
      api = ContentApi(
          current_user=user,
          session=self.session,
      )
      # The return values are not needed: the folders are looked up again
      # through get_all() below.
      api.create(ContentType.Folder, workspace, None, 'not_deleted', True)
      api.create(ContentType.Folder, workspace, None, 'to_delete', True)

      # Keep plain ids so the instances can be re-fetched after each commit.
      uid = user.user_id
      wid = workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(current_user=user, session=self.session)
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))

      with new_revision(
          session=self.session,
          tm=transaction.manager,
          content=items[0]
      ):
          api.delete(items[0])
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(1, len(items))
      transaction.commit()

      # Test that the item is still available if "show deleted" is activated
      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session,
          show_deleted=True
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))
  def test_archive(self):
      """Check that archived content is hidden unless ``show_archived`` is set.

      Two folders are created in a new workspace, then the first item
      returned by ``get_all`` is archived. A default ``ContentApi`` must
      list one remaining item, while one built with ``show_archived=True``
      must still list both.
      """
      uapi = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [group_api.get_one(Group.TIM_USER),
                group_api.get_one(Group.TIM_MANAGER),
                group_api.get_one(Group.TIM_ADMIN)]

      user = uapi.create_user(email='this.is@user',
                              groups=groups, save_now=True)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True
      )
      api = ContentApi(
          current_user=user,
          session=self.session,
      )
      # The return values are not needed: the folders are looked up again
      # through get_all() below.
      api.create(ContentType.Folder, workspace, None, 'not_archived', True)
      api.create(ContentType.Folder, workspace, None, 'to_archive', True)

      # Keep plain ids so the instances can be re-fetched after each commit.
      uid = user.user_id
      wid = workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          session=self.session,
          current_user=user,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))

      with new_revision(
          session=self.session,
          tm=transaction.manager,
          content=items[0],
      ):
          api.archive(items[0])
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(1, len(items))
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)

      # Test that the item is still available if "show archived" is
      # activated.
      # NOTE(review): this API is built without a current user, unlike the
      # equivalent step in test_delete -- confirm this is intentional.
      api = ContentApi(
          current_user=None,
          session=self.session,
          show_archived=True
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))
  def test_get_all_with_filter(self):
      """Check that ``get_all`` filters its result by content type.

      A folder and a file are created at the workspace root. Querying
      with ``ContentType.Any`` must return both, while querying with
      ``ContentType.File`` or ``ContentType.Folder`` must return only the
      matching item.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_key)
          for group_key in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      owner = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )

      creating_workspace_api = WorkspaceApi(
          current_user=owner,
          session=self.session,
      )
      workspace = creating_workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )

      content_api = ContentApi(current_user=owner, session=self.session)
      content_api.create(
          ContentType.Folder, workspace, None, 'thefolder', True
      )
      content_api.create(
          ContentType.File, workspace, None, 'thefile', True
      )

      # Remember the ids: the instances are fetched again after the commit.
      owner_id = owner.user_id
      workspace_id = workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit
      owner = user_api.get_one(owner_id)
      workspace = WorkspaceApi(
          current_user=owner,
          session=self.session,
      ).get_one(workspace_id)
      content_api = ContentApi(current_user=owner, session=self.session)

      everything = content_api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(everything))

      only_files = content_api.get_all(None, ContentType.File, workspace)
      eq_(1, len(only_files))
      eq_('thefile', only_files[0].label)

      only_folders = content_api.get_all(
          None, ContentType.Folder, workspace
      )
      eq_(1, len(only_folders))
      eq_('thefolder', only_folders[0].label)
  def test_get_all_with_parent_id(self):
      """Check that ``get_all`` restricts its result to a given parent.

      A folder, a file inside that folder and a file at the workspace
      root are created. Without a parent id all three are expected;
      with the folder id and ``ContentType.File`` only the nested file is
      expected.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_key)
          for group_key in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      owner = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )

      creating_workspace_api = WorkspaceApi(
          current_user=owner,
          session=self.session,
      )
      workspace = creating_workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )

      content_api = ContentApi(current_user=owner, session=self.session)
      folder = content_api.create(
          ContentType.Folder, workspace, None, 'parent', do_save=True
      )
      nested_file = content_api.create(
          ContentType.File, workspace, folder, 'file1', do_save=True
      )
      # Root-level file: it must not show up when filtering on the folder.
      content_api.create(
          ContentType.File, workspace, None, 'file2', do_save=True
      )

      # Remember the ids: the instances are fetched again after the commit.
      parent_id = folder.content_id
      child_id = nested_file.content_id
      owner_id = owner.user_id
      workspace_id = workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit
      owner = user_api.get_one(owner_id)
      workspace = WorkspaceApi(
          current_user=owner,
          session=self.session,
      ).get_one(workspace_id)
      content_api = ContentApi(current_user=owner, session=self.session)

      everything = content_api.get_all(None, ContentType.Any, workspace)
      eq_(3, len(everything))

      children = content_api.get_all(
          parent_id, ContentType.File, workspace
      )
      eq_(1, len(children))
      eq_(child_id, children[0].content_id)
  @raises(ValueError)
  def test_set_status_unknown_status(self):
      """Check that setting an unknown status raises ``ValueError``.

      The expected exception is declared through the ``@raises``
      decorator; the test body only has to trigger it.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_key)
          for group_key in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      owner = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )

      workspace_api = WorkspaceApi(current_user=owner, session=self.session)
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )

      content_api = ContentApi(current_user=owner, session=self.session)
      folder = content_api.create(
          ContentType.Folder, workspace, None, 'parent', True
      )

      revision_context = new_revision(
          session=self.session,
          tm=transaction.manager,
          content=folder,
      )
      with revision_context:
          content_api.set_status(folder, 'unknown-status')
  def test_set_status_ok(self):
      """Check that every known status can be set on a content.

      After each ``set_status`` call the content must expose the new
      status and a revision type of ``ActionDescription.STATUS_UPDATE``.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_key)
          for group_key in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      owner = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )

      workspace_api = WorkspaceApi(current_user=owner, session=self.session)
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )

      content_api = ContentApi(current_user=owner, session=self.session)
      folder = content_api.create(
          ContentType.Folder, workspace, None, 'parent', True
      )

      known_statuses = (
          'open',
          'closed-validated',
          'closed-unvalidated',
          'closed-deprecated',
      )
      revision_context = new_revision(
          session=self.session,
          tm=transaction.manager,
          content=folder,
      )
      with revision_context:
          for status in known_statuses:
              content_api.set_status(folder, status)
              eq_(status, folder.status)
              eq_(ActionDescription.STATUS_UPDATE, folder.revision_type)
  def test_create_comment_ok(self):
      """Check the attributes of a comment created on a page.

      The comment must be a ``Content`` whose parent is the page, owned
      by the current user, located in the same workspace, typed as a
      comment, carrying the given text as description, an empty label
      and a revision type of ``ActionDescription.COMMENT``.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_key)
          for group_key in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      author = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )

      workspace_api = WorkspaceApi(current_user=author, session=self.session)
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )

      content_api = ContentApi(current_user=author, session=self.session)
      page = content_api.create(
          ContentType.Page, workspace, None, 'this_is_a_page'
      )
      comment = content_api.create_comment(
          workspace, page, 'this is the comment', True
      )

      eq_(Content, comment.__class__)
      eq_(page.content_id, comment.parent_id)
      eq_(author, comment.owner)
      eq_(workspace, comment.workspace)
      eq_(ContentType.Comment, comment.type)
      eq_('this is the comment', comment.description)
      eq_('', comment.label)
      eq_(ActionDescription.COMMENT, comment.revision_type)
  def test_unit_copy_file_different_label_different_parent_ok(self):
      """Check copying a file to another folder under a new label.

      The first user creates a file in 'folder a'. The second user, who
      is workspace manager on the first workspace, copies it into
      'folder b' of a second workspace as 'test_file_copy'. The copy
      must be a distinct content with the same data and metadata, stored
      at a different depot path, still owned by the first user, and with
      one more revision than the original.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_key)
          for group_key in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user = user_api.create_user(
          email='user1@user',
          groups=all_groups,
          save_now=True,
      )
      user2 = user_api.create_user(
          email='user2@user',
          groups=all_groups,
          save_now=True,
      )

      source_workspace_api = WorkspaceApi(
          current_user=user,
          session=self.session,
      )
      workspace = source_workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      role_api = RoleApi(current_user=user, session=self.session)
      role_api.create_one(
          user2,
          workspace,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False,
      )

      api = ContentApi(current_user=user, session=self.session)
      foldera = api.create(
          ContentType.Folder, workspace, None, 'folder a', True
      )
      with self.session.no_autoflush:
          original = api.create(
              content_type=ContentType.File,
              workspace=workspace,
              parent=foldera,
              label='test_file',
              do_save=False,
          )
          api.update_file_data(
              original, 'test_file', 'text/plain', b'test_content'
          )
      api.save(original, ActionDescription.CREATION)

      api2 = ContentApi(current_user=user2, session=self.session)
      target_workspace_api = WorkspaceApi(
          current_user=user2,
          session=self.session,
      )
      workspace2 = target_workspace_api.create_workspace(
          'test workspace2',
          save_now=True,
      )
      folderb = api2.create(
          ContentType.Folder, workspace2, None, 'folder b', True
      )

      api2.copy(
          item=original,
          new_parent=folderb,
          new_label='test_file_copy',
      )
      transaction.commit()

      duplicate = api2.get_one_by_label_and_parent(
          'test_file_copy',
          folderb,
      )

      # Identity and location of the copy.
      assert original != duplicate
      assert duplicate.content_id != original.content_id
      assert duplicate.workspace_id == workspace2.workspace_id
      # Same bytes, but stored as a separate depot file.
      copied_bytes = duplicate.depot_file.file.read()
      assert copied_bytes == original.depot_file.file.read()
      assert duplicate.depot_file.path != original.depot_file.path
      # Metadata carried over (or overridden) by the copy.
      assert duplicate.label == 'test_file_copy'
      assert duplicate.type == original.type
      assert duplicate.parent.content_id == folderb.content_id
      assert duplicate.owner.user_id == user.user_id
      assert duplicate.description == original.description
      assert duplicate.file_extension == original.file_extension
      assert duplicate.file_mimetype == original.file_mimetype
      assert duplicate.revision_type == ActionDescription.COPY
      assert len(duplicate.revisions) == len(original.revisions) + 1
  def test_unit_copy_file__same_label_different_parent_ok(self):
      """Check copying a file to another folder while keeping its label.

      The first user creates a file in 'folder a'. The second user, who
      is workspace manager on the first workspace, copies it into
      'folder b' of a second workspace without giving a new label. The
      copy must be found under the original label, be a distinct content
      with the same data and metadata, stored at a different depot path,
      still owned by the first user, and with one more revision than the
      original.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_key)
          for group_key in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user = user_api.create_user(
          email='user1@user',
          groups=all_groups,
          save_now=True,
      )
      user2 = user_api.create_user(
          email='user2@user',
          groups=all_groups,
          save_now=True,
      )

      source_workspace_api = WorkspaceApi(
          current_user=user,
          session=self.session,
      )
      workspace = source_workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      role_api = RoleApi(current_user=user, session=self.session)
      role_api.create_one(
          user2,
          workspace,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False,
      )

      api = ContentApi(current_user=user, session=self.session)
      foldera = api.create(
          ContentType.Folder, workspace, None, 'folder a', True
      )
      with self.session.no_autoflush:
          original = api.create(
              content_type=ContentType.File,
              workspace=workspace,
              parent=foldera,
              label='test_file',
              do_save=False,
          )
          api.update_file_data(
              original, 'test_file', 'text/plain', b'test_content'
          )
      api.save(original, ActionDescription.CREATION)

      api2 = ContentApi(current_user=user2, session=self.session)
      target_workspace_api = WorkspaceApi(
          current_user=user2,
          session=self.session,
      )
      workspace2 = target_workspace_api.create_workspace(
          'test workspace2',
          save_now=True,
      )
      folderb = api2.create(
          ContentType.Folder, workspace2, None, 'folder b', True
      )

      # No new_label: the copy is looked up under the original label below.
      api2.copy(
          item=original,
          new_parent=folderb,
      )
      transaction.commit()

      duplicate = api2.get_one_by_label_and_parent(
          'test_file',
          folderb,
      )

      # Identity and location of the copy.
      assert original != duplicate
      assert duplicate.content_id != original.content_id
      assert duplicate.workspace_id == workspace2.workspace_id
      # Same bytes, but stored as a separate depot file.
      copied_bytes = duplicate.depot_file.file.read()
      assert copied_bytes == original.depot_file.file.read()
      assert duplicate.depot_file.path != original.depot_file.path
      # Metadata carried over by the copy.
      assert duplicate.label == original.label
      assert duplicate.type == original.type
      assert duplicate.parent.content_id == folderb.content_id
      assert duplicate.owner.user_id == user.user_id
      assert duplicate.description == original.description
      assert duplicate.file_extension == original.file_extension
      assert duplicate.file_mimetype == original.file_mimetype
      assert duplicate.revision_type == ActionDescription.COPY
      assert len(duplicate.revisions) == len(original.revisions) + 1
  def test_unit_copy_file_different_label_same_parent_ok(self):
      """Check copying a file inside its own folder under a new label.

      The first user creates a file in 'folder a'. The second user, who
      is workspace manager on that workspace, copies it without giving a
      new parent, as 'test_file_copy'. The copy must be found in
      'folder a', be a distinct content with the same data and metadata,
      stored at a different depot path, still owned by the first user,
      and with one more revision than the original.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_key)
          for group_key in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user = user_api.create_user(
          email='user1@user',
          groups=all_groups,
          save_now=True,
      )
      user2 = user_api.create_user(
          email='user2@user',
          groups=all_groups,
          save_now=True,
      )

      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      role_api = RoleApi(current_user=user, session=self.session)
      role_api.create_one(
          user2,
          workspace,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False,
      )

      api = ContentApi(current_user=user, session=self.session)
      foldera = api.create(
          ContentType.Folder, workspace, None, 'folder a', True
      )
      with self.session.no_autoflush:
          original = api.create(
              content_type=ContentType.File,
              workspace=workspace,
              parent=foldera,
              label='test_file',
              do_save=False,
          )
          api.update_file_data(
              original, 'test_file', 'text/plain', b'test_content'
          )
      api.save(original, ActionDescription.CREATION)

      api2 = ContentApi(current_user=user2, session=self.session)
      # No new_parent: the copy is looked up in the original folder below.
      api2.copy(
          item=original,
          new_label='test_file_copy',
      )
      transaction.commit()

      duplicate = api2.get_one_by_label_and_parent(
          'test_file_copy',
          foldera,
      )

      # Identity and location of the copy.
      assert original != duplicate
      assert duplicate.content_id != original.content_id
      assert duplicate.workspace_id == workspace.workspace_id
      # Same bytes, but stored as a separate depot file.
      copied_bytes = duplicate.depot_file.file.read()
      assert copied_bytes == original.depot_file.file.read()
      assert duplicate.depot_file.path != original.depot_file.path
      # Metadata carried over (or overridden) by the copy.
      assert duplicate.label == 'test_file_copy'
      assert duplicate.type == original.type
      assert duplicate.parent.content_id == foldera.content_id
      assert duplicate.owner.user_id == user.user_id
      assert duplicate.description == original.description
      assert duplicate.file_extension == original.file_extension
      assert duplicate.file_mimetype == original.file_mimetype
      assert duplicate.revision_type == ActionDescription.COPY
      assert len(duplicate.revisions) == len(original.revisions) + 1
  def test_mark_read__workspace(self):
      """Check that ``mark_read__workspace`` only affects one workspace.

      Two contents are created in each of two workspaces by a first
      user. A second user marks workspace 1 as read, then workspace 2;
      after each step the revisions' ``read_by`` mapping is checked for
      all four contents.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_key)
          for group_key in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user_a = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )
      user_b = user_api.create_user(
          email='this.is@another.user',
          groups=all_groups,
          save_now=True,
      )

      workspace_api = WorkspaceApi(
          current_user=user_a,
          session=self.session,
      )
      workspace1 = workspace_api.create_workspace(
          'test workspace n°1',
          save_now=True,
      )
      workspace2 = workspace_api.create_workspace(
          'test workspace n°2',
          save_now=True,
      )

      # user_b gets a reader role on both workspaces; the second role is
      # created through an API acting as user_b.
      RoleApi(current_user=user_a, session=self.session).create_one(
          user_b,
          workspace1,
          UserRoleInWorkspace.READER,
          False,
      )
      RoleApi(current_user=user_b, session=self.session).create_one(
          user_b,
          workspace2,
          UserRoleInWorkspace.READER,
          False,
      )

      cont_api_a = ContentApi(current_user=user_a, session=self.session)
      cont_api_b = ContentApi(current_user=user_b, session=self.session)

      # Creates page_1 & page_2 in workspace 1
      # and page_3 & page_4 in workspace 2
      page_1 = cont_api_a.create(
          ContentType.Page, workspace1, None,
          'this is a page', do_save=True,
      )
      page_2 = cont_api_a.create(
          ContentType.Page, workspace1, None,
          'this is page1', do_save=True,
      )
      page_3 = cont_api_a.create(
          ContentType.Thread, workspace2, None,
          'this is page2', do_save=True,
      )
      page_4 = cont_api_a.create(
          ContentType.File, workspace2, None,
          'this is page3', do_save=True,
      )

      def check_read_by_user_b(contents, expected_read):
          # Every revision of every given content must (or must not) list
          # user_b among its readers.
          for content in contents:
              for rev in content.revisions:
                  is_read = user_b in rev.read_by.keys()
                  eq_(is_read == expected_read, True)

      in_workspace1 = (page_1, page_2)
      in_workspace2 = (page_3, page_4)

      check_read_by_user_b(in_workspace1, False)
      check_read_by_user_b(in_workspace2, False)

      # Set as read the workspace n°1
      cont_api_b.mark_read__workspace(workspace=workspace1)
      check_read_by_user_b(in_workspace1, True)
      check_read_by_user_b(in_workspace2, False)

      # Set as read the workspace n°2
      cont_api_b.mark_read__workspace(workspace=workspace2)
      check_read_by_user_b(in_workspace1, True)
      check_read_by_user_b(in_workspace2, True)
  def test_mark_read(self):
      """Check that ``mark_read`` records the reader on each revision.

      A page is created by a first user; a second user with a reader
      role must be absent from every revision's ``read_by`` mapping
      before calling ``mark_read`` and present afterwards.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_key)
          for group_key in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user_a = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )
      user_b = user_api.create_user(
          email='this.is@another.user',
          groups=all_groups,
          save_now=True,
      )

      wapi = WorkspaceApi(current_user=user_a, session=self.session)
      # A second, otherwise unused WorkspaceApi is also instantiated here,
      # as in the original test.
      workspace_api = WorkspaceApi(
          current_user=user_a,
          session=self.session,
      )
      workspace = wapi.create_workspace('test workspace', save_now=True)

      RoleApi(current_user=user_a, session=self.session).create_one(
          user_b,
          workspace,
          UserRoleInWorkspace.READER,
          False,
      )

      cont_api_a = ContentApi(current_user=user_a, session=self.session)
      cont_api_b = ContentApi(current_user=user_b, session=self.session)

      page_1 = cont_api_a.create(
          ContentType.Page, workspace, None,
          'this is a page', do_save=True,
      )

      for rev in page_1.revisions:
          not_read_yet = user_b not in rev.read_by.keys()
          eq_(not_read_yet, True)

      cont_api_b.mark_read(page_1)

      for rev in page_1.revisions:
          now_read = user_b in rev.read_by.keys()
          eq_(now_read, True)
  def test_mark_read__all(self):
      """Check that ``mark_read__all`` marks every content as read.

      A page, a thread and a file are created by a first user. A second
      user with a reader role must be absent from every revision's
      ``read_by`` mapping before calling ``mark_read__all`` and present
      on all of them afterwards.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      all_groups = [
          group_api.get_one(group_key)
          for group_key in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user_a = user_api.create_user(
          email='this.is@user',
          groups=all_groups,
          save_now=True,
      )
      user_b = user_api.create_user(
          email='this.is@another.user',
          groups=all_groups,
          save_now=True,
      )

      workspace_api = WorkspaceApi(
          current_user=user_a,
          session=self.session,
      )
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )

      RoleApi(current_user=user_a, session=self.session).create_one(
          user_b,
          workspace,
          UserRoleInWorkspace.READER,
          False,
      )

      cont_api_a = ContentApi(current_user=user_a, session=self.session)
      cont_api_b = ContentApi(current_user=user_b, session=self.session)

      page_2 = cont_api_a.create(
          ContentType.Page, workspace, None,
          'this is page1', do_save=True,
      )
      page_3 = cont_api_a.create(
          ContentType.Thread, workspace, None,
          'this is page2', do_save=True,
      )
      page_4 = cont_api_a.create(
          ContentType.File, workspace, None,
          'this is page3', do_save=True,
      )
      contents = (page_2, page_3, page_4)

      for content in contents:
          for rev in content.revisions:
              not_read_yet = user_b not in rev.read_by.keys()
              eq_(not_read_yet, True)

      # Reload the three contents from the session before marking as read.
      for content in contents:
          self.session.refresh(content)

      cont_api_b.mark_read__all()

      for content in contents:
          for rev in content.revisions:
              now_read = user_b in rev.read_by.keys()
              eq_(now_read, True)
  950. def test_update(self):
  951. uapi = UserApi(
  952. session=self.session,
  953. config=CFG(self.config.get_settings()),
  954. current_user=None,
  955. )
  956. group_api = GroupApi(current_user=None, session=self.session)
  957. groups = [group_api.get_one(Group.TIM_USER),
  958. group_api.get_one(Group.TIM_MANAGER),
  959. group_api.get_one(Group.TIM_ADMIN)]
  960. user1 = uapi.create_user(
  961. email='this.is@user',
  962. groups=groups,
  963. save_now=True
  964. )
  965. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  966. workspace = workspace_api.create_workspace(
  967. 'test workspace',
  968. save_now=True
  969. )
  970. wid = workspace.workspace_id
  971. user2 = uapi.create_user()
  972. user2.email = 'this.is@another.user'
  973. uapi.save(user2)
  974. RoleApi(
  975. current_user=user1,
  976. session=self.session
  977. ).create_one(
  978. user2,
  979. workspace,
  980. UserRoleInWorkspace.CONTENT_MANAGER,
  981. with_notif=False,
  982. flush=True
  983. )
  984. # Test starts here
  985. api = ContentApi(
  986. current_user=user1,
  987. session=self.session,
  988. )
  989. p = api.create(ContentType.Page, workspace, None,
  990. 'this_is_a_page', True)
  991. u1id = user1.user_id
  992. u2id = user2.user_id
  993. pcid = p.content_id
  994. poid = p.owner_id
  995. transaction.commit()
  996. # Refresh instances after commit
  997. user1 = uapi.get_one(u1id)
  998. workspace = WorkspaceApi(
  999. current_user=user1,
  1000. session=self.session
  1001. ).get_one(wid)
  1002. api = ContentApi(
  1003. current_user=user1,
  1004. session=self.session,
  1005. )
  1006. content = api.get_one(pcid, ContentType.Any, workspace)
  1007. eq_(u1id, content.owner_id)
  1008. eq_(poid, content.owner_id)
  1009. u2 = UserApi(
  1010. session=self.session,
  1011. config=self.config,
  1012. current_user=None,
  1013. ).get_one(u2id)
  1014. api2 = ContentApi(
  1015. current_user=u2,
  1016. session=self.session,
  1017. )
  1018. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1019. with new_revision(
  1020. session=self.session,
  1021. tm=transaction.manager,
  1022. content=content2,
  1023. ):
  1024. api2.update_content(
  1025. content2,
  1026. 'this is an updated page',
  1027. 'new content'
  1028. )
  1029. api2.save(content2)
  1030. transaction.commit()
  1031. # Refresh instances after commit
  1032. user1 = uapi.get_one(u1id)
  1033. workspace = WorkspaceApi(
  1034. current_user=user1,
  1035. session=self.session,
  1036. ).get_one(wid)
  1037. api = ContentApi(
  1038. current_user=user1,
  1039. session=self.session,
  1040. )
  1041. updated = api.get_one(pcid, ContentType.Any, workspace)
  1042. eq_(u2id, updated.owner_id,
  1043. 'the owner id should be {} (found {})'.format(u2id,
  1044. updated.owner_id))
  1045. eq_('this is an updated page', updated.label)
  1046. eq_('new content', updated.description)
  1047. eq_(ActionDescription.EDITION, updated.revision_type)
  1048. @raises(SameValueError)
  1049. def test_update_no_change(self):
  1050. uapi = UserApi(
  1051. session=self.session,
  1052. config=CFG(self.config.get_settings()),
  1053. current_user=None,
  1054. )
  1055. group_api = GroupApi(
  1056. current_user=None,
  1057. session=self.session
  1058. )
  1059. groups = [group_api.get_one(Group.TIM_USER),
  1060. group_api.get_one(Group.TIM_MANAGER),
  1061. group_api.get_one(Group.TIM_ADMIN)]
  1062. user1 = uapi.create_user(
  1063. email='this.is@user',
  1064. groups=groups,
  1065. save_now=True,
  1066. )
  1067. workspace = WorkspaceApi(
  1068. current_user=user1,
  1069. session=self.session,
  1070. ).create_workspace(
  1071. 'test workspace',
  1072. save_now=True
  1073. )
  1074. user2 = uapi.create_user()
  1075. user2.email = 'this.is@another.user'
  1076. uapi.save(user2)
  1077. RoleApi(
  1078. current_user=user1,
  1079. session=self.session
  1080. ).create_one(
  1081. user2,
  1082. workspace,
  1083. UserRoleInWorkspace.CONTENT_MANAGER,
  1084. with_notif=False,
  1085. flush=True
  1086. )
  1087. api = ContentApi(
  1088. current_user=user1,
  1089. session=self.session
  1090. )
  1091. with self.session.no_autoflush:
  1092. page = api.create(
  1093. content_type=ContentType.Page,
  1094. workspace=workspace,
  1095. label="same_content",
  1096. do_save=False
  1097. )
  1098. page.description = "Same_content_here"
  1099. api.save(page, ActionDescription.CREATION, do_notify=True)
  1100. transaction.commit()
  1101. api2 = ContentApi(
  1102. current_user=user2,
  1103. session=self.session,
  1104. )
  1105. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  1106. with new_revision(
  1107. session=self.session,
  1108. tm=transaction.manager,
  1109. content=content2,
  1110. ):
  1111. api2.update_content(
  1112. item=content2,
  1113. new_label='same_content',
  1114. new_content='Same_content_here'
  1115. )
  1116. api2.save(content2)
  1117. transaction.commit()
  1118. def test_update_file_data(self):
  1119. uapi = UserApi(
  1120. session=self.session,
  1121. config=CFG(self.config.get_settings()),
  1122. current_user=None,
  1123. )
  1124. group_api = GroupApi(
  1125. current_user=None,
  1126. session=self.session
  1127. )
  1128. groups = [group_api.get_one(Group.TIM_USER),
  1129. group_api.get_one(Group.TIM_MANAGER),
  1130. group_api.get_one(Group.TIM_ADMIN)]
  1131. user1 = uapi.create_user(
  1132. email='this.is@user',
  1133. groups=groups,
  1134. save_now=True
  1135. )
  1136. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1137. workspace = workspace_api.create_workspace(
  1138. 'test workspace',
  1139. save_now=True
  1140. )
  1141. wid = workspace.workspace_id
  1142. user2 = uapi.create_user()
  1143. user2.email = 'this.is@another.user'
  1144. uapi.save(user2)
  1145. RoleApi(
  1146. current_user=user1,
  1147. session=self.session,
  1148. ).create_one(
  1149. user2,
  1150. workspace,
  1151. UserRoleInWorkspace.CONTENT_MANAGER,
  1152. with_notif=True,
  1153. flush=True
  1154. )
  1155. # Test starts here
  1156. api = ContentApi(
  1157. current_user=user1,
  1158. session=self.session
  1159. )
  1160. p = api.create(ContentType.File, workspace, None,
  1161. 'this_is_a_page', True)
  1162. u1id = user1.user_id
  1163. u2id = user2.user_id
  1164. pcid = p.content_id
  1165. poid = p.owner_id
  1166. api.save(p)
  1167. transaction.commit()
  1168. # Refresh instances after commit
  1169. user1 = uapi.get_one(u1id)
  1170. workspace_api2 = WorkspaceApi(current_user=user1, session=self.session)
  1171. workspace = workspace_api2.get_one(wid)
  1172. api = ContentApi(current_user=user1, session=self.session)
  1173. content = api.get_one(pcid, ContentType.Any, workspace)
  1174. eq_(u1id, content.owner_id)
  1175. eq_(poid, content.owner_id)
  1176. u2 = UserApi(
  1177. current_user=None,
  1178. session=self.session,
  1179. config=self.config,
  1180. ).get_one(u2id)
  1181. api2 = ContentApi(
  1182. current_user=u2,
  1183. session=self.session,
  1184. )
  1185. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1186. with new_revision(
  1187. session=self.session,
  1188. tm=transaction.manager,
  1189. content=content2,
  1190. ):
  1191. api2.update_file_data(content2, 'index.html', 'text/html',
  1192. b'<html>hello world</html>')
  1193. api2.save(content2)
  1194. transaction.commit()
  1195. # Refresh instances after commit
  1196. user1 = uapi.get_one(u1id)
  1197. workspace = WorkspaceApi(
  1198. current_user=user1,
  1199. session=self.session,
  1200. ).get_one(wid)
  1201. updated = api.get_one(pcid, ContentType.Any, workspace)
  1202. eq_(u2id, updated.owner_id,
  1203. 'the owner id should be {} (found {})'.format(u2id,
  1204. updated.owner_id))
  1205. eq_('this_is_a_page.html', updated.file_name)
  1206. eq_('text/html', updated.file_mimetype)
  1207. eq_(b'<html>hello world</html>', updated.depot_file.file.read())
  1208. eq_(ActionDescription.REVISION, updated.revision_type)
  1209. @raises(SameValueError)
  1210. def test_update_no_change(self):
  1211. uapi = UserApi(
  1212. session=self.session,
  1213. config=CFG(self.config.get_settings()),
  1214. current_user=None,
  1215. )
  1216. group_api = GroupApi(
  1217. current_user=None,
  1218. session=self.session,
  1219. )
  1220. groups = [group_api.get_one(Group.TIM_USER),
  1221. group_api.get_one(Group.TIM_MANAGER),
  1222. group_api.get_one(Group.TIM_ADMIN)]
  1223. user1 = uapi.create_user(
  1224. email='this.is@user',
  1225. groups=groups,
  1226. save_now=True,
  1227. )
  1228. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1229. workspace = workspace_api.create_workspace(
  1230. 'test workspace',
  1231. save_now=True
  1232. )
  1233. user2 = uapi.create_user()
  1234. user2.email = 'this.is@another.user'
  1235. uapi.save(user2)
  1236. RoleApi(
  1237. current_user=user1,
  1238. session=self.session,
  1239. ).create_one(
  1240. user2,
  1241. workspace,
  1242. UserRoleInWorkspace.CONTENT_MANAGER,
  1243. with_notif=False,
  1244. flush=True
  1245. )
  1246. api = ContentApi(current_user=user1, session=self.session)
  1247. with self.session.no_autoflush:
  1248. page = api.create(
  1249. content_type=ContentType.Page,
  1250. workspace=workspace,
  1251. label="same_content",
  1252. do_save=False
  1253. )
  1254. api.update_file_data(
  1255. page,
  1256. 'index.html',
  1257. 'text/html',
  1258. b'<html>Same Content Here</html>'
  1259. )
  1260. api.save(page, ActionDescription.CREATION, do_notify=True)
  1261. transaction.commit()
  1262. api2 = ContentApi(current_user=user2, session=self.session)
  1263. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  1264. with new_revision(
  1265. session=self.session,
  1266. tm=transaction.manager,
  1267. content=content2,
  1268. ):
  1269. api2.update_file_data(
  1270. page,
  1271. 'index.html',
  1272. 'text/html',
  1273. b'<html>Same Content Here</html>'
  1274. )
  1275. api2.save(content2)
  1276. transaction.commit()
  1277. def test_archive_unarchive(self):
  1278. uapi = UserApi(
  1279. session=self.session,
  1280. config=CFG(self.config.get_settings()),
  1281. current_user=None,
  1282. )
  1283. group_api = GroupApi(current_user=None, session=self.session)
  1284. groups = [group_api.get_one(Group.TIM_USER),
  1285. group_api.get_one(Group.TIM_MANAGER),
  1286. group_api.get_one(Group.TIM_ADMIN)]
  1287. user1 = uapi.create_user(
  1288. email='this.is@user',
  1289. groups=groups,
  1290. save_now=True
  1291. )
  1292. u1id = user1.user_id
  1293. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1294. workspace = workspace_api.create_workspace(
  1295. 'test workspace',
  1296. save_now=True
  1297. )
  1298. wid = workspace.workspace_id
  1299. user2 = uapi.create_user()
  1300. user2.email = 'this.is@another.user'
  1301. uapi.save(user2)
  1302. RoleApi(
  1303. current_user=user1,
  1304. session=self.session
  1305. ).create_one(
  1306. user2,
  1307. workspace,
  1308. UserRoleInWorkspace.CONTENT_MANAGER,
  1309. with_notif=True,
  1310. flush=True
  1311. )
  1312. # show archived is used at the top end of the test
  1313. api = ContentApi(
  1314. current_user=user1,
  1315. session=self.session,
  1316. show_archived=True,
  1317. )
  1318. p = api.create(ContentType.File, workspace, None,
  1319. 'this_is_a_page', True)
  1320. u1id = user1.user_id
  1321. u2id = user2.user_id
  1322. pcid = p.content_id
  1323. poid = p.owner_id
  1324. transaction.commit()
  1325. ####
  1326. # refresh after commit
  1327. user1 = UserApi(
  1328. current_user=None,
  1329. config=self.config,
  1330. session=self.session
  1331. ).get_one(u1id)
  1332. workspace = WorkspaceApi(
  1333. current_user=user1,
  1334. session=self.session
  1335. ).get_one(wid)
  1336. content = api.get_one(pcid, ContentType.Any, workspace)
  1337. eq_(u1id, content.owner_id)
  1338. eq_(poid, content.owner_id)
  1339. u2api = UserApi(
  1340. session=self.session,
  1341. config=CFG(self.config.get_settings()),
  1342. current_user=None,
  1343. )
  1344. u2 = u2api.get_one(u2id)
  1345. api2 = ContentApi(
  1346. current_user=u2,
  1347. session=self.session,
  1348. show_archived=True
  1349. )
  1350. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1351. with new_revision(
  1352. session=self.session,
  1353. tm=transaction.manager,
  1354. content=content2,
  1355. ):
  1356. api2.archive(content2)
  1357. api2.save(content2)
  1358. transaction.commit()
  1359. # refresh after commit
  1360. user1 = UserApi(
  1361. current_user=None,
  1362. session=self.session,
  1363. config=self.config,
  1364. ).get_one(u1id)
  1365. workspace = WorkspaceApi(
  1366. current_user=user1,
  1367. session=self.session,
  1368. ).get_one(wid)
  1369. u2 = UserApi(
  1370. current_user=None,
  1371. session=self.session,
  1372. config=self.config
  1373. ).get_one(u2id)
  1374. api = ContentApi(
  1375. current_user=user1,
  1376. session=self.session,
  1377. show_archived=True,
  1378. )
  1379. api2 = ContentApi(
  1380. current_user=u2,
  1381. session=self.session,
  1382. show_archived=True,
  1383. )
  1384. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1385. eq_(u2id, updated.owner_id,
  1386. 'the owner id should be {} (found {})'.format(u2id,
  1387. updated.owner_id))
  1388. eq_(True, updated.is_archived)
  1389. eq_(ActionDescription.ARCHIVING, updated.revision_type)
  1390. ####
  1391. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1392. with new_revision(
  1393. session=self.session,
  1394. tm=transaction.manager,
  1395. content=updated,
  1396. ):
  1397. api.unarchive(updated)
  1398. api.save(updated2)
  1399. eq_(False, updated2.is_archived)
  1400. eq_(ActionDescription.UNARCHIVING, updated2.revision_type)
  1401. eq_(u1id, updated2.owner_id)
  1402. def test_delete_undelete(self):
  1403. uapi = UserApi(
  1404. session=self.session,
  1405. config=CFG(self.config.get_settings()),
  1406. current_user=None,
  1407. )
  1408. group_api = GroupApi(
  1409. current_user=None,
  1410. session=self.session
  1411. )
  1412. groups = [group_api.get_one(Group.TIM_USER),
  1413. group_api.get_one(Group.TIM_MANAGER),
  1414. group_api.get_one(Group.TIM_ADMIN)]
  1415. user1 = uapi.create_user(
  1416. email='this.is@user',
  1417. groups=groups,
  1418. save_now=True
  1419. )
  1420. u1id = user1.user_id
  1421. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1422. workspace = workspace_api.create_workspace(
  1423. 'test workspace',
  1424. save_now=True
  1425. )
  1426. wid = workspace.workspace_id
  1427. user2 = uapi.create_user()
  1428. user2.email = 'this.is@another.user'
  1429. uapi.save(user2)
  1430. RoleApi(
  1431. current_user=user1,
  1432. session=self.session
  1433. ).create_one(
  1434. user2,
  1435. workspace,
  1436. UserRoleInWorkspace.CONTENT_MANAGER,
  1437. with_notif=True,
  1438. flush=True
  1439. )
  1440. # show archived is used at the top end of the test
  1441. api = ContentApi(
  1442. current_user=user1,
  1443. session=self.session,
  1444. show_deleted=True,
  1445. )
  1446. p = api.create(ContentType.File, workspace, None,
  1447. 'this_is_a_page', True)
  1448. u1id = user1.user_id
  1449. u2id = user2.user_id
  1450. pcid = p.content_id
  1451. poid = p.owner_id
  1452. transaction.commit()
  1453. ####
  1454. user1 = UserApi(
  1455. current_user=None,
  1456. session=self.session,
  1457. config=self.config,
  1458. ).get_one(u1id)
  1459. workspace = WorkspaceApi(
  1460. current_user=user1,
  1461. session=self.session,
  1462. ).get_one(wid)
  1463. content = api.get_one(pcid, ContentType.Any, workspace)
  1464. eq_(u1id, content.owner_id)
  1465. eq_(poid, content.owner_id)
  1466. u2 = UserApi(
  1467. current_user=None,
  1468. session=self.session,
  1469. config=self.config,
  1470. ).get_one(u2id)
  1471. api2 = ContentApi(
  1472. current_user=u2,
  1473. session=self.session,
  1474. show_deleted=True,
  1475. )
  1476. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1477. with new_revision(
  1478. session=self.session,
  1479. tm=transaction.manager,
  1480. content=content2,
  1481. ):
  1482. api2.delete(content2)
  1483. api2.save(content2)
  1484. transaction.commit()
  1485. ####
  1486. user1 = UserApi(
  1487. current_user=None,
  1488. session=self.session,
  1489. config=self.config,
  1490. ).get_one(u1id)
  1491. workspace = WorkspaceApi(
  1492. current_user=user1,
  1493. session=self.session,
  1494. ).get_one(wid)
  1495. # show archived is used at the top end of the test
  1496. api = ContentApi(
  1497. current_user=user1,
  1498. session=self.session,
  1499. show_deleted=True,
  1500. )
  1501. u2 = UserApi(
  1502. current_user=None,
  1503. session=self.session,
  1504. config=self.config,
  1505. ).get_one(u2id)
  1506. api2 = ContentApi(
  1507. current_user=u2,
  1508. session=self.session,
  1509. show_deleted=True
  1510. )
  1511. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1512. eq_(u2id, updated.owner_id,
  1513. 'the owner id should be {} (found {})'.format(u2id,
  1514. updated.owner_id))
  1515. eq_(True, updated.is_deleted)
  1516. eq_(ActionDescription.DELETION, updated.revision_type)
  1517. ####
  1518. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1519. with new_revision(
  1520. tm=transaction.manager,
  1521. session=self.session,
  1522. content=updated2,
  1523. ):
  1524. api.undelete(updated2)
  1525. api.save(updated2)
  1526. eq_(False, updated2.is_deleted)
  1527. eq_(ActionDescription.UNDELETION, updated2.revision_type)
  1528. eq_(u1id, updated2.owner_id)
  1529. def test_search_in_label(self):
  1530. # HACK - D.A. - 2015-03-09
  1531. # This test is based on a bug which does NOT return results found
  1532. # at root of a workspace (eg a folder)
  1533. uapi = UserApi(
  1534. session=self.session,
  1535. config=CFG(self.config.get_settings()),
  1536. current_user=None,
  1537. )
  1538. group_api = GroupApi(
  1539. current_user=None,
  1540. session=self.session,
  1541. )
  1542. groups = [group_api.get_one(Group.TIM_USER),
  1543. group_api.get_one(Group.TIM_MANAGER),
  1544. group_api.get_one(Group.TIM_ADMIN)]
  1545. user = uapi.create_user(email='this.is@user',
  1546. groups=groups, save_now=True)
  1547. workspace = WorkspaceApi(
  1548. current_user=user,
  1549. session=self.session
  1550. ).create_workspace(
  1551. 'test workspace',
  1552. save_now=True
  1553. )
  1554. api = ContentApi(
  1555. current_user=user,
  1556. session=self.session
  1557. )
  1558. a = api.create(ContentType.Folder, workspace, None,
  1559. 'this is randomized folder', True)
  1560. p = api.create(ContentType.Page, workspace, a,
  1561. 'this is randomized label content', True)
  1562. with new_revision(
  1563. session=self.session,
  1564. tm=transaction.manager,
  1565. content=p,
  1566. ):
  1567. p.description = 'This is some amazing test'
  1568. api.save(p)
  1569. original_id = p.content_id
  1570. res = api.search(['randomized'])
  1571. eq_(1, len(res.all()))
  1572. item = res.all()[0]
  1573. eq_(original_id, item.content_id)
  1574. def test_search_in_description(self):
  1575. # HACK - D.A. - 2015-03-09
  1576. # This test is based on a bug which does NOT return results found
  1577. # at root of a workspace (eg a folder)
  1578. uapi = UserApi(
  1579. session=self.session,
  1580. config=CFG(self.config.get_settings()),
  1581. current_user=None,
  1582. )
  1583. group_api = GroupApi(
  1584. current_user=None,
  1585. session=self.session,
  1586. )
  1587. groups = [group_api.get_one(Group.TIM_USER),
  1588. group_api.get_one(Group.TIM_MANAGER),
  1589. group_api.get_one(Group.TIM_ADMIN)]
  1590. user = uapi.create_user(email='this.is@user',
  1591. groups=groups, save_now=True)
  1592. workspace = WorkspaceApi(
  1593. current_user=user,
  1594. session=self.session
  1595. ).create_workspace(
  1596. 'test workspace',
  1597. save_now=True,
  1598. )
  1599. api = ContentApi(
  1600. current_user=user,
  1601. session=self.session
  1602. )
  1603. a = api.create(ContentType.Folder, workspace, None,
  1604. 'this is randomized folder', True)
  1605. p = api.create(ContentType.Page, workspace, a,
  1606. 'this is dummy label content', True)
  1607. with new_revision(
  1608. tm=transaction.manager,
  1609. session=self.session,
  1610. content=p,
  1611. ):
  1612. p.description = 'This is some amazing test'
  1613. api.save(p)
  1614. original_id = p.content_id
  1615. res = api.search(['dummy'])
  1616. eq_(1, len(res.all()))
  1617. item = res.all()[0]
  1618. eq_(original_id, item.content_id)
  1619. def test_search_in_label_or_description(self):
  1620. # HACK - D.A. - 2015-03-09
  1621. # This test is based on a bug which does NOT return results found
  1622. # at root of a workspace (eg a folder)
  1623. uapi = UserApi(
  1624. session=self.session,
  1625. config=CFG(self.config.get_settings()),
  1626. current_user=None,
  1627. )
  1628. group_api = GroupApi(current_user=None, session=self.session)
  1629. groups = [group_api.get_one(Group.TIM_USER),
  1630. group_api.get_one(Group.TIM_MANAGER),
  1631. group_api.get_one(Group.TIM_ADMIN)]
  1632. user = uapi.create_user(email='this.is@user',
  1633. groups=groups, save_now=True)
  1634. workspace = WorkspaceApi(
  1635. current_user=user,
  1636. session=self.session
  1637. ).create_workspace('test workspace', save_now=True)
  1638. api = ContentApi(
  1639. current_user=user,
  1640. session=self.session
  1641. )
  1642. a = api.create(ContentType.Folder, workspace, None,
  1643. 'this is randomized folder', True)
  1644. p1 = api.create(ContentType.Page, workspace, a,
  1645. 'this is dummy label content', True)
  1646. p2 = api.create(ContentType.Page, workspace, a, 'Hey ! Jon !', True)
  1647. with new_revision(
  1648. session=self.session,
  1649. tm=transaction.manager,
  1650. content=p1,
  1651. ):
  1652. p1.description = 'This is some amazing test'
  1653. with new_revision(
  1654. session=self.session,
  1655. tm=transaction.manager,
  1656. content=p2,
  1657. ):
  1658. p2.description = 'What\'s up ?'
  1659. api.save(p1)
  1660. api.save(p2)
  1661. id1 = p1.content_id
  1662. id2 = p2.content_id
  1663. eq_(1, self.session.query(Workspace).filter(Workspace.label == 'test workspace').count())
  1664. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is randomized folder').count())
  1665. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is dummy label content').count())
  1666. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'This is some amazing test').count())
  1667. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'Hey ! Jon !').count())
  1668. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'What\'s up ?').count())
  1669. res = api.search(['dummy', 'jon'])
  1670. eq_(2, len(res.all()))
  1671. eq_(True, id1 in [o.content_id for o in res.all()])
  1672. eq_(True, id2 in [o.content_id for o in res.all()])
  1673. def test_unit__search_exclude_content_under_deleted_or_archived_parents__ok(self): # nopep8
  1674. admin = self.session.query(User)\
  1675. .filter(User.email == 'admin@admin.admin').one()
  1676. workspace = self._create_workspace_and_test(
  1677. 'workspace_1',
  1678. admin
  1679. )
  1680. folder_1 = self._create_content_and_test(
  1681. 'folder_1',
  1682. workspace=workspace,
  1683. type=ContentType.Folder
  1684. )
  1685. folder_2 = self._create_content_and_test(
  1686. 'folder_2',
  1687. workspace=workspace,
  1688. type=ContentType.Folder
  1689. )
  1690. page_1 = self._create_content_and_test(
  1691. 'foo', workspace=workspace,
  1692. type=ContentType.Page,
  1693. parent=folder_1
  1694. )
  1695. page_2 = self._create_content_and_test(
  1696. 'bar',
  1697. workspace=workspace,
  1698. type=ContentType.Page,
  1699. parent=folder_2
  1700. )
  1701. api = ContentApi(
  1702. current_user=admin,
  1703. session=self.session,
  1704. )
  1705. foo_result = api.search(['foo']).all()
  1706. eq_(1, len(foo_result))
  1707. ok_(page_1 in foo_result)
  1708. bar_result = api.search(['bar']).all()
  1709. eq_(1, len(bar_result))
  1710. ok_(page_2 in bar_result)
  1711. with new_revision(
  1712. session=self.session,
  1713. tm=transaction.manager,
  1714. content=folder_1,
  1715. ):
  1716. api.delete(folder_1)
  1717. with new_revision(
  1718. session=self.session,
  1719. tm=transaction.manager,
  1720. content=folder_2,
  1721. ):
  1722. api.archive(folder_2)
  1723. # Actually ContentApi.search don't filter it
  1724. foo_result = api.search(['foo']).all()
  1725. eq_(1, len(foo_result))
  1726. ok_(page_1 in foo_result)
  1727. bar_result = api.search(['bar']).all()
  1728. eq_(1, len(bar_result))
  1729. ok_(page_2 in bar_result)
  1730. # ContentApi offer exclude_unavailable method to do it
  1731. foo_result = api.search(['foo']).all()
  1732. api.exclude_unavailable(foo_result)
  1733. eq_(0, len(foo_result))
  1734. bar_result = api.search(['bar']).all()
  1735. api.exclude_unavailable(bar_result)
  1736. eq_(0, len(bar_result))
  1737. class TestContentApiSecurity(DefaultTest):
  1738. fixtures = [FixtureTest, ]
  1739. def test_unit__cant_get_non_access_content__ok__nominal_case(self):
  1740. admin = self.session.query(User)\
  1741. .filter(User.email == 'admin@admin.admin').one()
  1742. bob = self.session.query(User)\
  1743. .filter(User.email == 'bob@fsf.local').one()
  1744. bob_workspace = WorkspaceApi(
  1745. current_user=bob,
  1746. session=self.session,
  1747. ).create_workspace(
  1748. 'bob_workspace',
  1749. save_now=True,
  1750. )
  1751. admin_workspace = WorkspaceApi(
  1752. current_user=admin,
  1753. session=self.session,
  1754. ).create_workspace(
  1755. 'admin_workspace',
  1756. save_now=True,
  1757. )
  1758. bob_page = ContentApi(
  1759. current_user=bob,
  1760. session=self.session
  1761. ).create(
  1762. content_type=ContentType.Page,
  1763. workspace=bob_workspace,
  1764. label='bob_page',
  1765. do_save=True,
  1766. )
  1767. admin_page = ContentApi(
  1768. current_user=admin,
  1769. session=self.session
  1770. ).create(
  1771. content_type=ContentType.Page,
  1772. workspace=admin_workspace,
  1773. label='admin_page',
  1774. do_save=True,
  1775. )
  1776. bob_viewable = ContentApi(
  1777. current_user=bob,
  1778. session=self.session
  1779. ).get_all()
  1780. eq_(1, len(bob_viewable), 'Bob should view only one content')
  1781. eq_(
  1782. 'bob_page',
  1783. bob_viewable[0].label,
  1784. 'Bob should not view "{0}" content'.format(
  1785. bob_viewable[0].label,
  1786. )
  1787. )