test_content_api.py 62KB


  1. # -*- coding: utf-8 -*-
  2. import datetime
  3. from nose.tools import eq_, ok_
  4. from nose.tools import raises
  5. from depot.io.utils import FileIntent
  6. import transaction
  7. from tracim.config import CFG
  8. from tracim.lib.content import compare_content_for_sorting_by_type_and_name
  9. from tracim.lib.content import ContentApi
  10. # TODO - G.M - 28-03-2018 - [GroupApi] Re-enable GroupApi
  11. from tracim.lib.group import GroupApi
  12. from tracim.lib.user import UserApi
  13. from tracim.exceptions import SameValueError
  14. # TODO - G.M - 28-03-2018 - [RoleApi] Re-enable RoleApi
  15. from tracim.lib.workspace import RoleApi
  16. # TODO - G.M - 28-03-2018 - [WorkspaceApi] Re-enable WorkspaceApi
  17. from tracim.lib.workspace import WorkspaceApi
  18. from tracim.models.revision_protection import new_revision
  19. from tracim.models.auth import User
  20. from tracim.models.auth import Group
  21. from tracim.models.data import ActionDescription
  22. from tracim.models.data import ContentRevisionRO
  23. from tracim.models.data import Workspace
  24. from tracim.models.data import Content
  25. from tracim.models.data import ContentType
  26. from tracim.models.data import UserRoleInWorkspace
  27. from tracim.fixtures.users_and_groups import Test as FixtureTest
  28. from tracim.tests import DefaultTest
  29. class TestContentApi(DefaultTest):
  def test_compare_content_for_sorting_by_type(self):
      """Compare a 'file' and a 'folder' that both carry an empty label.

      Expects 1 for (file, folder), -1 for (folder, file) and 0 when an
      object is compared with itself.
      """
      file_content = Content()
      file_content.label = ''
      file_content.type = 'file'

      folder_content = Content()
      folder_content.label = ''
      folder_content.type = 'folder'

      # Second name for the very same object.
      file_alias = file_content

      compare = compare_content_for_sorting_by_type_and_name
      eq_(1, compare(file_content, folder_content))
      eq_(-1, compare(folder_content, file_content))
      eq_(0, compare(file_content, file_alias))
  def test_compare_content_for_sorting_by_label(self):
      """Compare two 'file' contents that differ only by label.

      Expects 1 for ('bbb', 'aaa'), -1 for ('aaa', 'bbb') and 0 when an
      object is compared with itself.
      """
      content_bbb = Content()
      content_bbb.label = 'bbb'
      content_bbb.type = 'file'

      content_aaa = Content()
      content_aaa.label = 'aaa'
      content_aaa.type = 'file'

      # Second name for the very same object.
      bbb_alias = content_bbb

      compare = compare_content_for_sorting_by_type_and_name
      eq_(1, compare(content_bbb, content_aaa))
      eq_(-1, compare(content_aaa, content_bbb))
      eq_(0, compare(content_bbb, bbb_alias))
  def test_sort_by_label_or_filename(self):
      """Sort three files, one of which has an empty label and a file name.

      The item with file_name 'AABC' is expected first, followed by the
      items labelled 'ABCD' and 'BCDE'.
      """
      abcd = Content()
      abcd.label = 'ABCD'
      abcd.type = 'file'

      unlabelled = Content()
      unlabelled.label = ''
      unlabelled.type = 'file'
      unlabelled.file_name = 'AABC'

      bcde = Content()
      bcde.label = 'BCDE'
      bcde.type = 'file'

      ordered = ContentApi.sort_content([abcd, unlabelled, bcde])

      for position, expected in enumerate((unlabelled, abcd, bcde)):
          eq_(ordered[position], expected)
  def test_sort_by_content_type(self):
      """Sort a 'file' labelled 'AAAA' and a 'folder' labelled 'BBBB'.

      The folder is expected first even though its label is greater.
      """
      file_item = Content()
      file_item.label = 'AAAA'
      file_item.type = 'file'

      folder_item = Content()
      folder_item.label = 'BBBB'
      folder_item.type = 'folder'

      ordered = ContentApi.sort_content([file_item, folder_item])

      for position, expected in enumerate((folder_item, file_item)):
          found = ordered[position]
          message = 'value is {} instead of {}'.format(
              found.content_id,
              expected.content_id,
          )
          eq_(found, expected, message)
  def test_delete(self):
      """Check that deleted content is only listed when show_deleted is set.

      Two folders are created in a new workspace. The first item returned
      by get_all() is deleted inside a new revision; get_all() must then
      return one item by default and two items with show_deleted=True.
      """
      uapi = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [group_api.get_one(Group.TIM_USER),
                group_api.get_one(Group.TIM_MANAGER),
                group_api.get_one(Group.TIM_ADMIN)]

      user = uapi.create_user(email='this.is@user',
                              groups=groups, save_now=True)
      workspace = WorkspaceApi(
          current_user=user,
          session=self.session
      ).create_workspace('test workspace', save_now=True)
      api = ContentApi(
          current_user=user,
          session=self.session,
      )
      # The returned objects are not needed: content is re-read after commit.
      api.create(ContentType.Folder, workspace, None,
                 'not_deleted', True)
      api.create(ContentType.Folder, workspace, None,
                 'to_delete', True)

      # Keep plain ids so instances can be reloaded after each commit.
      uid = user.user_id
      wid = workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(current_user=user, session=self.session)
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))

      with new_revision(
          session=self.session,
          tm=transaction.manager,
          content=items[0]
      ):
          api.delete(items[0])
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(1, len(items))
      transaction.commit()

      # Test that the item is still available if "show deleted" is activated
      # Refresh instances after commit (the workspace included, so the
      # final query does not run against an instance from a finished
      # transaction).
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session,
          show_deleted=True
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))
  def test_archive(self):
      """Check that archived content is only listed when show_archived is set.

      Two folders are created in a new workspace. The first item returned
      by get_all() is archived inside a new revision; get_all() must then
      return one item by default and two items with show_archived=True.
      """
      uapi = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [group_api.get_one(Group.TIM_USER),
                group_api.get_one(Group.TIM_MANAGER),
                group_api.get_one(Group.TIM_ADMIN)]

      user = uapi.create_user(email='this.is@user',
                              groups=groups, save_now=True)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True
      )
      api = ContentApi(
          current_user=user,
          session=self.session,
      )
      # The returned objects are not needed: content is re-read after commit.
      api.create(ContentType.Folder, workspace, None,
                 'not_archived', True)
      api.create(ContentType.Folder, workspace, None,
                 'to_archive', True)

      # Keep plain ids so instances can be reloaded after each commit.
      uid = user.user_id
      wid = workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit (the workspace included, as done
      # after every other commit in this test).
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          session=self.session,
          current_user=user,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))

      with new_revision(
          session=self.session,
          tm=transaction.manager,
          content=items[0],
      ):
          api.archive(items[0])
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(1, len(items))
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)

      # Test that the item is still available if "show archived" is activated
      # NOTE(review): this api is built with current_user=None, unlike the
      # equivalent step of test_delete -- confirm this is intended.
      api = ContentApi(
          current_user=None,
          session=self.session,
          show_archived=True
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))
  def test_get_all_with_filter(self):
      """Check that get_all() honours its content-type argument.

      With one folder and one file in the workspace, ContentType.Any must
      yield both, ContentType.File only 'thefile' and ContentType.Folder
      only 'thefolder'.
      """
      app_config = CFG(self.config.get_settings())
      user_api = UserApi(
          session=self.session,
          config=app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_name)
          for group_name in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      owner = user_api.create_user(
          email='this.is@user',
          groups=groups,
          save_now=True
      )

      ws_api = WorkspaceApi(current_user=owner, session=self.session)
      ws = ws_api.create_workspace('test workspace', save_now=True)

      content_api = ContentApi(current_user=owner, session=self.session)
      folder = content_api.create(
          ContentType.Folder, ws, None, 'thefolder', True
      )
      plain_file = content_api.create(
          ContentType.File, ws, None, 'thefile', True
      )

      # Remember the ids: instances are reloaded once the commit is done.
      owner_id = owner.user_id
      ws_id = ws.workspace_id
      transaction.commit()

      # Reload everything after the commit
      owner = user_api.get_one(owner_id)
      ws_api = WorkspaceApi(current_user=owner, session=self.session)
      ws = ws_api.get_one(ws_id)
      content_api = ContentApi(current_user=owner, session=self.session)

      everything = content_api.get_all(None, ContentType.Any, ws)
      eq_(2, len(everything))

      only_files = content_api.get_all(None, ContentType.File, ws)
      eq_(1, len(only_files))
      eq_('thefile', only_files[0].label)

      only_folders = content_api.get_all(None, ContentType.Folder, ws)
      eq_(1, len(only_folders))
      eq_('thefolder', only_folders[0].label)
  def test_get_all_with_parent_id(self):
      """Check that get_all() can be restricted to the children of a parent.

      A folder 'parent' holds 'file1' while 'file2' sits at the workspace
      root; querying files under the folder id must return 'file1' only.
      """
      app_config = CFG(self.config.get_settings())
      user_api = UserApi(
          session=self.session,
          config=app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_name)
          for group_name in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      owner = user_api.create_user(
          email='this.is@user', groups=groups, save_now=True
      )

      ws_api = WorkspaceApi(current_user=owner, session=self.session)
      ws = ws_api.create_workspace('test workspace', save_now=True)

      content_api = ContentApi(current_user=owner, session=self.session)
      parent_folder = content_api.create(
          ContentType.Folder, ws, None, 'parent', do_save=True,
      )
      nested_file = content_api.create(
          ContentType.File, ws, parent_folder, 'file1', do_save=True,
      )
      root_file = content_api.create(
          ContentType.File, ws, None, 'file2', do_save=True,
      )

      # Remember the ids: instances are reloaded once the commit is done.
      parent_id = parent_folder.content_id
      child_id = nested_file.content_id
      owner_id = owner.user_id
      ws_id = ws.workspace_id
      transaction.commit()

      # Reload everything after the commit
      owner = user_api.get_one(owner_id)
      ws_api = WorkspaceApi(current_user=owner, session=self.session)
      ws = ws_api.get_one(ws_id)
      content_api = ContentApi(current_user=owner, session=self.session)

      everything = content_api.get_all(None, ContentType.Any, ws)
      eq_(3, len(everything))

      children = content_api.get_all(parent_id, ContentType.File, ws)
      eq_(1, len(children))
      eq_(child_id, children[0].content_id)
  @raises(ValueError)
  def test_set_status_unknown_status(self):
      """Call set_status() with the value 'unknown-status' on a folder.

      The test passes when a ValueError escapes the method, as requested
      by the @raises decorator.
      """
      app_config = CFG(self.config.get_settings())
      user_api = UserApi(
          session=self.session,
          config=app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_name)
          for group_name in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      owner = user_api.create_user(
          email='this.is@user', groups=groups, save_now=True
      )

      ws_api = WorkspaceApi(current_user=owner, session=self.session)
      ws = ws_api.create_workspace('test workspace', save_now=True)

      content_api = ContentApi(current_user=owner, session=self.session)
      folder = content_api.create(
          ContentType.Folder, ws, None, 'parent', True
      )

      revision_context = new_revision(
          session=self.session,
          tm=transaction.manager,
          content=folder,
      )
      with revision_context:
          content_api.set_status(folder, 'unknown-status')
  def test_set_status_ok(self):
      """Apply each of the four accepted status values to a folder.

      After every set_status() call the content must expose the new status
      and a revision type of ActionDescription.STATUS_UPDATE.
      """
      app_config = CFG(self.config.get_settings())
      user_api = UserApi(
          session=self.session,
          config=app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_name)
          for group_name in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      owner = user_api.create_user(
          email='this.is@user', groups=groups, save_now=True
      )

      ws_api = WorkspaceApi(current_user=owner, session=self.session)
      ws = ws_api.create_workspace('test workspace', save_now=True)

      content_api = ContentApi(current_user=owner, session=self.session)
      folder = content_api.create(
          ContentType.Folder, ws, None, 'parent', True
      )

      statuses = (
          'open',
          'closed-validated',
          'closed-unvalidated',
          'closed-deprecated',
      )
      revision_context = new_revision(
          session=self.session,
          tm=transaction.manager,
          content=folder,
      )
      with revision_context:
          for status in statuses:
              content_api.set_status(folder, status)
              eq_(status, folder.status)
              eq_(ActionDescription.STATUS_UPDATE, folder.revision_type)
  def test_create_comment_ok(self):
      """Create a comment on a page and check the attributes of the result.

      The comment must be a Content whose parent is the page, owned by the
      current user, in the same workspace, typed as a comment, with the
      given text as description and an empty label.
      """
      app_config = CFG(self.config.get_settings())
      user_api = UserApi(
          session=self.session,
          config=app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_name)
          for group_name in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      author = user_api.create_user(
          email='this.is@user', groups=groups, save_now=True
      )

      ws_api = WorkspaceApi(current_user=author, session=self.session)
      ws = ws_api.create_workspace('test workspace', save_now=True)

      content_api = ContentApi(current_user=author, session=self.session)
      page = content_api.create(
          ContentType.Page, ws, None, 'this_is_a_page'
      )
      comment = content_api.create_comment(
          ws, page, 'this is the comment', True
      )

      eq_(Content, comment.__class__)
      eq_(page.content_id, comment.parent_id)
      eq_(author, comment.owner)
      eq_(ws, comment.workspace)
      eq_(ContentType.Comment, comment.type)
      eq_('this is the comment', comment.description)
      eq_('', comment.label)
      eq_(ActionDescription.COMMENT, comment.revision_type)
  def test_unit_copy_file_different_label_different_parent_ok(self):
      """Copy a file into another folder of another workspace, renaming it.

      A second user copies 'test_file' from 'folder a' into 'folder b'
      under the label 'test_file_copy'; the copy is then compared with the
      source attribute by attribute.
      """
      app_config = CFG(self.config.get_settings())
      user_api = UserApi(
          session=self.session,
          config=app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_name)
          for group_name in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      first_user = user_api.create_user(
          email='user1@user', groups=groups, save_now=True
      )
      second_user = user_api.create_user(
          email='user2@user', groups=groups, save_now=True
      )

      source_ws_api = WorkspaceApi(
          current_user=first_user, session=self.session
      )
      source_ws = source_ws_api.create_workspace(
          'test workspace', save_now=True
      )
      role_api = RoleApi(current_user=first_user, session=self.session)
      role_api.create_one(
          second_user,
          source_ws,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False
      )

      first_api = ContentApi(current_user=first_user, session=self.session)
      folder_a = first_api.create(
          ContentType.Folder, source_ws, None, 'folder a', True
      )
      with self.session.no_autoflush:
          source_file = first_api.create(
              content_type=ContentType.File,
              workspace=source_ws,
              parent=folder_a,
              label='test_file',
              do_save=False,
          )
          first_api.update_file_data(
              source_file, 'test_file', 'text/plain', b'test_content'
          )
      first_api.save(source_file, ActionDescription.CREATION)

      second_api = ContentApi(
          current_user=second_user, session=self.session
      )
      target_ws_api = WorkspaceApi(
          current_user=second_user, session=self.session,
      )
      target_ws = target_ws_api.create_workspace(
          'test workspace2', save_now=True
      )
      folder_b = second_api.create(
          ContentType.Folder, target_ws, None, 'folder b', True
      )

      second_api.copy(
          item=source_file,
          new_parent=folder_b,
          new_label='test_file_copy'
      )
      transaction.commit()

      copied = second_api.get_one_by_label_and_parent(
          'test_file_copy',
          folder_b,
      )

      assert source_file != copied
      assert copied.content_id != source_file.content_id
      assert copied.workspace_id == target_ws.workspace_id
      # Same bytes, but stored as a distinct depot file.
      copied_bytes = copied.depot_file.file.read()
      source_bytes = source_file.depot_file.file.read()
      assert copied_bytes == source_bytes
      assert copied.depot_file.path != source_file.depot_file.path
      assert copied.label == 'test_file_copy'
      assert copied.type == source_file.type
      assert copied.parent.content_id == folder_b.content_id
      assert copied.owner.user_id == first_user.user_id
      assert copied.description == source_file.description
      assert copied.file_extension == source_file.file_extension
      assert copied.file_mimetype == source_file.file_mimetype
      assert copied.revision_type == ActionDescription.COPY
      assert len(copied.revisions) == len(source_file.revisions) + 1
  def test_unit_copy_file__same_label_different_parent_ok(self):
      """Copy a file into another folder of another workspace, same label.

      A second user copies 'test_file' from 'folder a' into 'folder b'
      without giving a new label; the copy is then compared with the
      source attribute by attribute.
      """
      app_config = CFG(self.config.get_settings())
      user_api = UserApi(
          session=self.session,
          config=app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_name)
          for group_name in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      first_user = user_api.create_user(
          email='user1@user', groups=groups, save_now=True
      )
      second_user = user_api.create_user(
          email='user2@user', groups=groups, save_now=True
      )

      source_ws_api = WorkspaceApi(
          current_user=first_user, session=self.session
      )
      source_ws = source_ws_api.create_workspace(
          'test workspace', save_now=True
      )
      role_api = RoleApi(current_user=first_user, session=self.session)
      role_api.create_one(
          second_user,
          source_ws,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False
      )

      first_api = ContentApi(current_user=first_user, session=self.session)
      folder_a = first_api.create(
          ContentType.Folder, source_ws, None, 'folder a', True
      )
      with self.session.no_autoflush:
          source_file = first_api.create(
              content_type=ContentType.File,
              workspace=source_ws,
              parent=folder_a,
              label='test_file',
              do_save=False,
          )
          first_api.update_file_data(
              source_file, 'test_file', 'text/plain', b'test_content'
          )
      first_api.save(source_file, ActionDescription.CREATION)

      second_api = ContentApi(
          current_user=second_user, session=self.session,
      )
      target_ws_api = WorkspaceApi(
          current_user=second_user, session=self.session
      )
      target_ws = target_ws_api.create_workspace(
          'test workspace2', save_now=True
      )
      folder_b = second_api.create(
          ContentType.Folder, target_ws, None, 'folder b', True
      )

      second_api.copy(
          item=source_file,
          new_parent=folder_b,
      )
      transaction.commit()

      copied = second_api.get_one_by_label_and_parent(
          'test_file',
          folder_b,
      )

      assert source_file != copied
      assert copied.content_id != source_file.content_id
      assert copied.workspace_id == target_ws.workspace_id
      # Same bytes, but stored as a distinct depot file.
      copied_bytes = copied.depot_file.file.read()
      source_bytes = source_file.depot_file.file.read()
      assert copied_bytes == source_bytes
      assert copied.depot_file.path != source_file.depot_file.path
      assert copied.label == source_file.label
      assert copied.type == source_file.type
      assert copied.parent.content_id == folder_b.content_id
      assert copied.owner.user_id == first_user.user_id
      assert copied.description == source_file.description
      assert copied.file_extension == source_file.file_extension
      assert copied.file_mimetype == source_file.file_mimetype
      assert copied.revision_type == ActionDescription.COPY
      assert len(copied.revisions) == len(source_file.revisions) + 1
  def test_unit_copy_file_different_label_same_parent_ok(self):
      """Copy a file next to itself under a new label.

      A second user copies 'test_file' as 'test_file_copy' without giving
      a new parent; the copy is looked up in 'folder a' and compared with
      the source attribute by attribute.
      """
      app_config = CFG(self.config.get_settings())
      user_api = UserApi(
          session=self.session,
          config=app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_name)
          for group_name in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      first_user = user_api.create_user(
          email='user1@user', groups=groups, save_now=True,
      )
      second_user = user_api.create_user(
          email='user2@user', groups=groups, save_now=True
      )

      ws_api = WorkspaceApi(current_user=first_user, session=self.session)
      ws = ws_api.create_workspace('test workspace', save_now=True)
      role_api = RoleApi(current_user=first_user, session=self.session)
      role_api.create_one(
          second_user,
          ws,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False
      )

      first_api = ContentApi(current_user=first_user, session=self.session)
      folder_a = first_api.create(
          ContentType.Folder, ws, None, 'folder a', True
      )
      with self.session.no_autoflush:
          source_file = first_api.create(
              content_type=ContentType.File,
              workspace=ws,
              parent=folder_a,
              label='test_file',
              do_save=False,
          )
          first_api.update_file_data(
              source_file, 'test_file', 'text/plain', b'test_content'
          )
      first_api.save(source_file, ActionDescription.CREATION)

      second_api = ContentApi(
          current_user=second_user, session=self.session
      )
      second_api.copy(
          item=source_file,
          new_label='test_file_copy'
      )
      transaction.commit()

      copied = second_api.get_one_by_label_and_parent(
          'test_file_copy',
          folder_a,
      )

      assert source_file != copied
      assert copied.content_id != source_file.content_id
      assert copied.workspace_id == ws.workspace_id
      # Same bytes, but stored as a distinct depot file.
      copied_bytes = copied.depot_file.file.read()
      source_bytes = source_file.depot_file.file.read()
      assert copied_bytes == source_bytes
      assert copied.depot_file.path != source_file.depot_file.path
      assert copied.label == 'test_file_copy'
      assert copied.type == source_file.type
      assert copied.parent.content_id == folder_a.content_id
      assert copied.owner.user_id == first_user.user_id
      assert copied.description == source_file.description
      assert copied.file_extension == source_file.file_extension
      assert copied.file_mimetype == source_file.file_mimetype
      assert copied.revision_type == ActionDescription.COPY
      assert len(copied.revisions) == len(source_file.revisions) + 1
  def test_mark_read__workspace(self):
      """Mark two workspaces as read, one after the other, for a second user.

      Four contents are spread over two workspaces. After each
      mark_read__workspace() call, only the revisions of the workspaces
      marked so far must list the second user in read_by.
      """
      app_config = CFG(self.config.get_settings())
      user_api = UserApi(
          session=self.session,
          config=app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_name)
          for group_name in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user_a = user_api.create_user(
          email='this.is@user', groups=groups, save_now=True
      )
      user_b = user_api.create_user(
          email='this.is@another.user', groups=groups, save_now=True
      )

      ws_api = WorkspaceApi(current_user=user_a, session=self.session)
      workspace1 = ws_api.create_workspace(
          'test workspace n°1', save_now=True
      )
      workspace2 = ws_api.create_workspace(
          'test workspace n°2', save_now=True
      )

      roles_as_a = RoleApi(current_user=user_a, session=self.session)
      roles_as_a.create_one(
          user_b, workspace1, UserRoleInWorkspace.READER, False
      )
      roles_as_b = RoleApi(current_user=user_b, session=self.session)
      roles_as_b.create_one(
          user_b, workspace2, UserRoleInWorkspace.READER, False
      )

      api_as_a = ContentApi(current_user=user_a, session=self.session)
      api_as_b = ContentApi(current_user=user_b, session=self.session)

      # Creates page_1 & page_2 in workspace 1
      # and page_3 & page_4 in workspace 2
      page_1 = api_as_a.create(
          ContentType.Page, workspace1, None,
          'this is a page', do_save=True
      )
      page_2 = api_as_a.create(
          ContentType.Page, workspace1, None,
          'this is page1', do_save=True
      )
      page_3 = api_as_a.create(
          ContentType.Thread, workspace2, None,
          'this is page2', do_save=True
      )
      page_4 = api_as_a.create(
          ContentType.File, workspace2, None,
          'this is page3', do_save=True
      )

      in_workspace1 = (page_1, page_2)
      in_workspace2 = (page_3, page_4)

      # Nothing has been read by user_b yet
      for page in in_workspace1 + in_workspace2:
          for rev in page.revisions:
              eq_(user_b not in rev.read_by.keys(), True)

      # Set as read the workspace n°1
      api_as_b.mark_read__workspace(workspace=workspace1)

      for page in in_workspace1:
          for rev in page.revisions:
              eq_(user_b in rev.read_by.keys(), True)
      for page in in_workspace2:
          for rev in page.revisions:
              eq_(user_b not in rev.read_by.keys(), True)

      # Set as read the workspace n°2
      api_as_b.mark_read__workspace(workspace=workspace2)

      for page in in_workspace1 + in_workspace2:
          for rev in page.revisions:
              eq_(user_b in rev.read_by.keys(), True)
  def test_mark_read(self):
      """Mark a single page as read for a second user.

      Before mark_read() no revision of the page may list the second user
      in read_by; afterwards every revision must.
      """
      uapi = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(
          current_user=None,
          session=self.session
      )
      groups = [group_api.get_one(Group.TIM_USER),
                group_api.get_one(Group.TIM_MANAGER),
                group_api.get_one(Group.TIM_ADMIN)]

      user_a = uapi.create_user(
          email='this.is@user',
          groups=groups,
          save_now=True
      )
      user_b = uapi.create_user(
          email='this.is@another.user',
          groups=groups,
          save_now=True
      )

      # A single WorkspaceApi is enough for this test.
      wapi = WorkspaceApi(current_user=user_a, session=self.session)
      workspace = wapi.create_workspace(
          'test workspace',
          save_now=True)

      role_api = RoleApi(
          current_user=user_a,
          session=self.session,
      )
      role_api.create_one(
          user_b,
          workspace,
          UserRoleInWorkspace.READER,
          False
      )
      cont_api_a = ContentApi(
          current_user=user_a,
          session=self.session,
      )
      cont_api_b = ContentApi(
          current_user=user_b,
          session=self.session,
      )

      page_1 = cont_api_a.create(ContentType.Page, workspace, None,
                                 'this is a page', do_save=True)

      for rev in page_1.revisions:
          eq_(user_b not in rev.read_by.keys(), True)

      cont_api_b.mark_read(page_1)

      for rev in page_1.revisions:
          eq_(user_b in rev.read_by.keys(), True)
  def test_mark_read__all(self):
      """Mark everything as read for a second user with mark_read__all().

      Three contents are created in one workspace. No revision may list
      the second user in read_by before the call; every revision must
      afterwards.
      """
      app_config = CFG(self.config.get_settings())
      user_api = UserApi(
          session=self.session,
          config=app_config,
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_name)
          for group_name in (
              Group.TIM_USER,
              Group.TIM_MANAGER,
              Group.TIM_ADMIN,
          )
      ]
      user_a = user_api.create_user(
          email='this.is@user', groups=groups, save_now=True
      )
      user_b = user_api.create_user(
          email='this.is@another.user', groups=groups, save_now=True
      )

      ws_api = WorkspaceApi(current_user=user_a, session=self.session)
      workspace = ws_api.create_workspace('test workspace', save_now=True)

      role_api = RoleApi(current_user=user_a, session=self.session)
      role_api.create_one(
          user_b, workspace, UserRoleInWorkspace.READER, False
      )

      api_as_a = ContentApi(current_user=user_a, session=self.session)
      api_as_b = ContentApi(current_user=user_b, session=self.session)

      page_2 = api_as_a.create(
          ContentType.Page, workspace, None,
          'this is page1', do_save=True
      )
      page_3 = api_as_a.create(
          ContentType.Thread, workspace, None,
          'this is page2', do_save=True
      )
      page_4 = api_as_a.create(
          ContentType.File, workspace, None,
          'this is page3', do_save=True
      )
      pages = (page_2, page_3, page_4)

      # Nothing has been read by user_b yet
      for page in pages:
          for rev in page.revisions:
              eq_(user_b not in rev.read_by.keys(), True)

      # Reload each content from the session before marking as read
      for page in pages:
          self.session.refresh(page)

      api_as_b.mark_read__all()

      for page in pages:
          for rev in page.revisions:
              eq_(user_b in rev.read_by.keys(), True)
  952. def test_update(self):
  953. uapi = UserApi(
  954. session=self.session,
  955. config=CFG(self.config.get_settings()),
  956. current_user=None,
  957. )
  958. group_api = GroupApi(current_user=None, session=self.session)
  959. groups = [group_api.get_one(Group.TIM_USER),
  960. group_api.get_one(Group.TIM_MANAGER),
  961. group_api.get_one(Group.TIM_ADMIN)]
  962. user1 = uapi.create_user(
  963. email='this.is@user',
  964. groups=groups,
  965. save_now=True
  966. )
  967. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  968. workspace = workspace_api.create_workspace(
  969. 'test workspace',
  970. save_now=True
  971. )
  972. wid = workspace.workspace_id
  973. user2 = uapi.create_user()
  974. user2.email = 'this.is@another.user'
  975. uapi.save(user2)
  976. RoleApi(
  977. current_user=user1,
  978. session=self.session
  979. ).create_one(
  980. user2,
  981. workspace,
  982. UserRoleInWorkspace.CONTENT_MANAGER,
  983. with_notif=False,
  984. flush=True
  985. )
  986. # Test starts here
  987. api = ContentApi(
  988. current_user=user1,
  989. session=self.session,
  990. )
  991. p = api.create(ContentType.Page, workspace, None,
  992. 'this_is_a_page', True)
  993. u1id = user1.user_id
  994. u2id = user2.user_id
  995. pcid = p.content_id
  996. poid = p.owner_id
  997. transaction.commit()
  998. # Refresh instances after commit
  999. user1 = uapi.get_one(u1id)
  1000. workspace = WorkspaceApi(
  1001. current_user=user1,
  1002. session=self.session
  1003. ).get_one(wid)
  1004. api = ContentApi(
  1005. current_user=user1,
  1006. session=self.session,
  1007. )
  1008. content = api.get_one(pcid, ContentType.Any, workspace)
  1009. eq_(u1id, content.owner_id)
  1010. eq_(poid, content.owner_id)
  1011. u2 = UserApi(
  1012. session=self.session,
  1013. config=self.config,
  1014. current_user=None,
  1015. ).get_one(u2id)
  1016. api2 = ContentApi(
  1017. current_user=u2,
  1018. session=self.session,
  1019. )
  1020. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1021. with new_revision(
  1022. session=self.session,
  1023. tm=transaction.manager,
  1024. content=content2,
  1025. ):
  1026. api2.update_content(
  1027. content2,
  1028. 'this is an updated page',
  1029. 'new content'
  1030. )
  1031. api2.save(content2)
  1032. transaction.commit()
  1033. # Refresh instances after commit
  1034. user1 = uapi.get_one(u1id)
  1035. workspace = WorkspaceApi(
  1036. current_user=user1,
  1037. session=self.session,
  1038. ).get_one(wid)
  1039. api = ContentApi(
  1040. current_user=user1,
  1041. session=self.session,
  1042. )
  1043. updated = api.get_one(pcid, ContentType.Any, workspace)
  1044. eq_(u2id, updated.owner_id,
  1045. 'the owner id should be {} (found {})'.format(u2id,
  1046. updated.owner_id))
  1047. eq_('this is an updated page', updated.label)
  1048. eq_('new content', updated.description)
  1049. eq_(ActionDescription.EDITION, updated.revision_type)
  1050. @raises(SameValueError)
  1051. def test_update_no_change(self):
  1052. uapi = UserApi(
  1053. session=self.session,
  1054. config=CFG(self.config.get_settings()),
  1055. current_user=None,
  1056. )
  1057. group_api = GroupApi(
  1058. current_user=None,
  1059. session=self.session
  1060. )
  1061. groups = [group_api.get_one(Group.TIM_USER),
  1062. group_api.get_one(Group.TIM_MANAGER),
  1063. group_api.get_one(Group.TIM_ADMIN)]
  1064. user1 = uapi.create_user(
  1065. email='this.is@user',
  1066. groups=groups,
  1067. save_now=True,
  1068. )
  1069. workspace = WorkspaceApi(
  1070. current_user=user1,
  1071. session=self.session,
  1072. ).create_workspace(
  1073. 'test workspace',
  1074. save_now=True
  1075. )
  1076. user2 = uapi.create_user()
  1077. user2.email = 'this.is@another.user'
  1078. uapi.save(user2)
  1079. RoleApi(
  1080. current_user=user1,
  1081. session=self.session
  1082. ).create_one(
  1083. user2,
  1084. workspace,
  1085. UserRoleInWorkspace.CONTENT_MANAGER,
  1086. with_notif=False,
  1087. flush=True
  1088. )
  1089. api = ContentApi(
  1090. current_user=user1,
  1091. session=self.session
  1092. )
  1093. with self.session.no_autoflush:
  1094. page = api.create(
  1095. content_type=ContentType.Page,
  1096. workspace=workspace,
  1097. label="same_content",
  1098. do_save=False
  1099. )
  1100. page.description = "Same_content_here"
  1101. api.save(page, ActionDescription.CREATION, do_notify=True)
  1102. transaction.commit()
  1103. api2 = ContentApi(
  1104. current_user=user2,
  1105. session=self.session,
  1106. )
  1107. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  1108. with new_revision(
  1109. session=self.session,
  1110. tm=transaction.manager,
  1111. content=content2,
  1112. ):
  1113. api2.update_content(
  1114. item=content2,
  1115. new_label='same_content',
  1116. new_content='Same_content_here'
  1117. )
  1118. api2.save(content2)
  1119. transaction.commit()
  1120. def test_update_file_data(self):
  1121. uapi = UserApi(
  1122. session=self.session,
  1123. config=CFG(self.config.get_settings()),
  1124. current_user=None,
  1125. )
  1126. group_api = GroupApi(
  1127. current_user=None,
  1128. session=self.session
  1129. )
  1130. groups = [group_api.get_one(Group.TIM_USER),
  1131. group_api.get_one(Group.TIM_MANAGER),
  1132. group_api.get_one(Group.TIM_ADMIN)]
  1133. user1 = uapi.create_user(
  1134. email='this.is@user',
  1135. groups=groups,
  1136. save_now=True
  1137. )
  1138. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1139. workspace = workspace_api.create_workspace(
  1140. 'test workspace',
  1141. save_now=True
  1142. )
  1143. wid = workspace.workspace_id
  1144. user2 = uapi.create_user()
  1145. user2.email = 'this.is@another.user'
  1146. uapi.save(user2)
  1147. RoleApi(
  1148. current_user=user1,
  1149. session=self.session,
  1150. ).create_one(
  1151. user2,
  1152. workspace,
  1153. UserRoleInWorkspace.CONTENT_MANAGER,
  1154. with_notif=True,
  1155. flush=True
  1156. )
  1157. # Test starts here
  1158. api = ContentApi(
  1159. current_user=user1,
  1160. session=self.session
  1161. )
  1162. p = api.create(ContentType.File, workspace, None,
  1163. 'this_is_a_page', True)
  1164. u1id = user1.user_id
  1165. u2id = user2.user_id
  1166. pcid = p.content_id
  1167. poid = p.owner_id
  1168. api.save(p)
  1169. transaction.commit()
  1170. # Refresh instances after commit
  1171. user1 = uapi.get_one(u1id)
  1172. workspace_api2 = WorkspaceApi(current_user=user1, session=self.session)
  1173. workspace = workspace_api2.get_one(wid)
  1174. api = ContentApi(current_user=user1, session=self.session)
  1175. content = api.get_one(pcid, ContentType.Any, workspace)
  1176. eq_(u1id, content.owner_id)
  1177. eq_(poid, content.owner_id)
  1178. u2 = UserApi(
  1179. current_user=None,
  1180. session=self.session,
  1181. config=self.config,
  1182. ).get_one(u2id)
  1183. api2 = ContentApi(
  1184. current_user=u2,
  1185. session=self.session,
  1186. )
  1187. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1188. with new_revision(
  1189. session=self.session,
  1190. tm=transaction.manager,
  1191. content=content2,
  1192. ):
  1193. api2.update_file_data(content2, 'index.html', 'text/html',
  1194. b'<html>hello world</html>')
  1195. api2.save(content2)
  1196. transaction.commit()
  1197. # Refresh instances after commit
  1198. user1 = uapi.get_one(u1id)
  1199. workspace = WorkspaceApi(
  1200. current_user=user1,
  1201. session=self.session,
  1202. ).get_one(wid)
  1203. updated = api.get_one(pcid, ContentType.Any, workspace)
  1204. eq_(u2id, updated.owner_id,
  1205. 'the owner id should be {} (found {})'.format(u2id,
  1206. updated.owner_id))
  1207. eq_('this_is_a_page.html', updated.file_name)
  1208. eq_('text/html', updated.file_mimetype)
  1209. eq_(b'<html>hello world</html>', updated.depot_file.file.read())
  1210. eq_(ActionDescription.REVISION, updated.revision_type)
  1211. @raises(SameValueError)
  1212. def test_update_no_change(self):
  1213. uapi = UserApi(
  1214. session=self.session,
  1215. config=CFG(self.config.get_settings()),
  1216. current_user=None,
  1217. )
  1218. group_api = GroupApi(
  1219. current_user=None,
  1220. session=self.session,
  1221. )
  1222. groups = [group_api.get_one(Group.TIM_USER),
  1223. group_api.get_one(Group.TIM_MANAGER),
  1224. group_api.get_one(Group.TIM_ADMIN)]
  1225. user1 = uapi.create_user(
  1226. email='this.is@user',
  1227. groups=groups,
  1228. save_now=True,
  1229. )
  1230. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1231. workspace = workspace_api.create_workspace(
  1232. 'test workspace',
  1233. save_now=True
  1234. )
  1235. user2 = uapi.create_user()
  1236. user2.email = 'this.is@another.user'
  1237. uapi.save(user2)
  1238. RoleApi(
  1239. current_user=user1,
  1240. session=self.session,
  1241. ).create_one(
  1242. user2,
  1243. workspace,
  1244. UserRoleInWorkspace.CONTENT_MANAGER,
  1245. with_notif=False,
  1246. flush=True
  1247. )
  1248. api = ContentApi(current_user=user1, session=self.session)
  1249. with self.session.no_autoflush:
  1250. page = api.create(
  1251. content_type=ContentType.Page,
  1252. workspace=workspace,
  1253. label="same_content",
  1254. do_save=False
  1255. )
  1256. api.update_file_data(
  1257. page,
  1258. 'index.html',
  1259. 'text/html',
  1260. b'<html>Same Content Here</html>'
  1261. )
  1262. api.save(page, ActionDescription.CREATION, do_notify=True)
  1263. transaction.commit()
  1264. api2 = ContentApi(current_user=user2, session=self.session)
  1265. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  1266. with new_revision(
  1267. session=self.session,
  1268. tm=transaction.manager,
  1269. content=content2,
  1270. ):
  1271. api2.update_file_data(
  1272. page,
  1273. 'index.html',
  1274. 'text/html',
  1275. b'<html>Same Content Here</html>'
  1276. )
  1277. api2.save(content2)
  1278. transaction.commit()
  1279. def test_archive_unarchive(self):
  1280. uapi = UserApi(
  1281. session=self.session,
  1282. config=CFG(self.config.get_settings()),
  1283. current_user=None,
  1284. )
  1285. group_api = GroupApi(current_user=None, session=self.session)
  1286. groups = [group_api.get_one(Group.TIM_USER),
  1287. group_api.get_one(Group.TIM_MANAGER),
  1288. group_api.get_one(Group.TIM_ADMIN)]
  1289. user1 = uapi.create_user(
  1290. email='this.is@user',
  1291. groups=groups,
  1292. save_now=True
  1293. )
  1294. u1id = user1.user_id
  1295. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1296. workspace = workspace_api.create_workspace(
  1297. 'test workspace',
  1298. save_now=True
  1299. )
  1300. wid = workspace.workspace_id
  1301. user2 = uapi.create_user()
  1302. user2.email = 'this.is@another.user'
  1303. uapi.save(user2)
  1304. RoleApi(
  1305. current_user=user1,
  1306. session=self.session
  1307. ).create_one(
  1308. user2,
  1309. workspace,
  1310. UserRoleInWorkspace.CONTENT_MANAGER,
  1311. with_notif=True,
  1312. flush=True
  1313. )
  1314. # show archived is used at the top end of the test
  1315. api = ContentApi(
  1316. current_user=user1,
  1317. session=self.session,
  1318. show_archived=True,
  1319. )
  1320. p = api.create(ContentType.File, workspace, None,
  1321. 'this_is_a_page', True)
  1322. u1id = user1.user_id
  1323. u2id = user2.user_id
  1324. pcid = p.content_id
  1325. poid = p.owner_id
  1326. transaction.commit()
  1327. ####
  1328. # refresh after commit
  1329. user1 = UserApi(
  1330. current_user=None,
  1331. config=self.config,
  1332. session=self.session
  1333. ).get_one(u1id)
  1334. workspace = WorkspaceApi(
  1335. current_user=user1,
  1336. session=self.session
  1337. ).get_one(wid)
  1338. content = api.get_one(pcid, ContentType.Any, workspace)
  1339. eq_(u1id, content.owner_id)
  1340. eq_(poid, content.owner_id)
  1341. u2api = UserApi(
  1342. session=self.session,
  1343. config=CFG(self.config.get_settings()),
  1344. current_user=None,
  1345. )
  1346. u2 = u2api.get_one(u2id)
  1347. api2 = ContentApi(
  1348. current_user=u2,
  1349. session=self.session,
  1350. show_archived=True
  1351. )
  1352. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1353. with new_revision(
  1354. session=self.session,
  1355. tm=transaction.manager,
  1356. content=content2,
  1357. ):
  1358. api2.archive(content2)
  1359. api2.save(content2)
  1360. transaction.commit()
  1361. # refresh after commit
  1362. user1 = UserApi(
  1363. current_user=None,
  1364. session=self.session,
  1365. config=self.config,
  1366. ).get_one(u1id)
  1367. workspace = WorkspaceApi(
  1368. current_user=user1,
  1369. session=self.session,
  1370. ).get_one(wid)
  1371. u2 = UserApi(
  1372. current_user=None,
  1373. session=self.session,
  1374. config=self.config
  1375. ).get_one(u2id)
  1376. api = ContentApi(
  1377. current_user=user1,
  1378. session=self.session,
  1379. show_archived=True,
  1380. )
  1381. api2 = ContentApi(
  1382. current_user=u2,
  1383. session=self.session,
  1384. show_archived=True,
  1385. )
  1386. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1387. eq_(u2id, updated.owner_id,
  1388. 'the owner id should be {} (found {})'.format(u2id,
  1389. updated.owner_id))
  1390. eq_(True, updated.is_archived)
  1391. eq_(ActionDescription.ARCHIVING, updated.revision_type)
  1392. ####
  1393. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1394. with new_revision(
  1395. session=self.session,
  1396. tm=transaction.manager,
  1397. content=updated,
  1398. ):
  1399. api.unarchive(updated)
  1400. api.save(updated2)
  1401. eq_(False, updated2.is_archived)
  1402. eq_(ActionDescription.UNARCHIVING, updated2.revision_type)
  1403. eq_(u1id, updated2.owner_id)
  1404. def test_delete_undelete(self):
  1405. uapi = UserApi(
  1406. session=self.session,
  1407. config=CFG(self.config.get_settings()),
  1408. current_user=None,
  1409. )
  1410. group_api = GroupApi(
  1411. current_user=None,
  1412. session=self.session
  1413. )
  1414. groups = [group_api.get_one(Group.TIM_USER),
  1415. group_api.get_one(Group.TIM_MANAGER),
  1416. group_api.get_one(Group.TIM_ADMIN)]
  1417. user1 = uapi.create_user(
  1418. email='this.is@user',
  1419. groups=groups,
  1420. save_now=True
  1421. )
  1422. u1id = user1.user_id
  1423. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1424. workspace = workspace_api.create_workspace(
  1425. 'test workspace',
  1426. save_now=True
  1427. )
  1428. wid = workspace.workspace_id
  1429. user2 = uapi.create_user()
  1430. user2.email = 'this.is@another.user'
  1431. uapi.save(user2)
  1432. RoleApi(
  1433. current_user=user1,
  1434. session=self.session
  1435. ).create_one(
  1436. user2,
  1437. workspace,
  1438. UserRoleInWorkspace.CONTENT_MANAGER,
  1439. with_notif=True,
  1440. flush=True
  1441. )
  1442. # show archived is used at the top end of the test
  1443. api = ContentApi(
  1444. current_user=user1,
  1445. session=self.session,
  1446. show_deleted=True,
  1447. )
  1448. p = api.create(ContentType.File, workspace, None,
  1449. 'this_is_a_page', True)
  1450. u1id = user1.user_id
  1451. u2id = user2.user_id
  1452. pcid = p.content_id
  1453. poid = p.owner_id
  1454. transaction.commit()
  1455. ####
  1456. user1 = UserApi(
  1457. current_user=None,
  1458. session=self.session,
  1459. config=self.config,
  1460. ).get_one(u1id)
  1461. workspace = WorkspaceApi(
  1462. current_user=user1,
  1463. session=self.session,
  1464. ).get_one(wid)
  1465. content = api.get_one(pcid, ContentType.Any, workspace)
  1466. eq_(u1id, content.owner_id)
  1467. eq_(poid, content.owner_id)
  1468. u2 = UserApi(
  1469. current_user=None,
  1470. session=self.session,
  1471. config=self.config,
  1472. ).get_one(u2id)
  1473. api2 = ContentApi(
  1474. current_user=u2,
  1475. session=self.session,
  1476. show_deleted=True,
  1477. )
  1478. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1479. with new_revision(
  1480. session=self.session,
  1481. tm=transaction.manager,
  1482. content=content2,
  1483. ):
  1484. api2.delete(content2)
  1485. api2.save(content2)
  1486. transaction.commit()
  1487. ####
  1488. user1 = UserApi(
  1489. current_user=None,
  1490. session=self.session,
  1491. config=self.config,
  1492. ).get_one(u1id)
  1493. workspace = WorkspaceApi(
  1494. current_user=user1,
  1495. session=self.session,
  1496. ).get_one(wid)
  1497. # show archived is used at the top end of the test
  1498. api = ContentApi(
  1499. current_user=user1,
  1500. session=self.session,
  1501. show_deleted=True,
  1502. )
  1503. u2 = UserApi(
  1504. current_user=None,
  1505. session=self.session,
  1506. config=self.config,
  1507. ).get_one(u2id)
  1508. api2 = ContentApi(
  1509. current_user=u2,
  1510. session=self.session,
  1511. show_deleted=True
  1512. )
  1513. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1514. eq_(u2id, updated.owner_id,
  1515. 'the owner id should be {} (found {})'.format(u2id,
  1516. updated.owner_id))
  1517. eq_(True, updated.is_deleted)
  1518. eq_(ActionDescription.DELETION, updated.revision_type)
  1519. ####
  1520. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1521. with new_revision(
  1522. tm=transaction.manager,
  1523. session=self.session,
  1524. content=updated2,
  1525. ):
  1526. api.undelete(updated2)
  1527. api.save(updated2)
  1528. eq_(False, updated2.is_deleted)
  1529. eq_(ActionDescription.UNDELETION, updated2.revision_type)
  1530. eq_(u1id, updated2.owner_id)
  1531. def test_search_in_label(self):
  1532. # HACK - D.A. - 2015-03-09
  1533. # This test is based on a bug which does NOT return results found
  1534. # at root of a workspace (eg a folder)
  1535. uapi = UserApi(
  1536. session=self.session,
  1537. config=CFG(self.config.get_settings()),
  1538. current_user=None,
  1539. )
  1540. group_api = GroupApi(
  1541. current_user=None,
  1542. session=self.session,
  1543. )
  1544. groups = [group_api.get_one(Group.TIM_USER),
  1545. group_api.get_one(Group.TIM_MANAGER),
  1546. group_api.get_one(Group.TIM_ADMIN)]
  1547. user = uapi.create_user(email='this.is@user',
  1548. groups=groups, save_now=True)
  1549. workspace = WorkspaceApi(
  1550. current_user=user,
  1551. session=self.session
  1552. ).create_workspace(
  1553. 'test workspace',
  1554. save_now=True
  1555. )
  1556. api = ContentApi(
  1557. current_user=user,
  1558. session=self.session
  1559. )
  1560. a = api.create(ContentType.Folder, workspace, None,
  1561. 'this is randomized folder', True)
  1562. p = api.create(ContentType.Page, workspace, a,
  1563. 'this is randomized label content', True)
  1564. with new_revision(
  1565. session=self.session,
  1566. tm=transaction.manager,
  1567. content=p,
  1568. ):
  1569. p.description = 'This is some amazing test'
  1570. api.save(p)
  1571. original_id = p.content_id
  1572. res = api.search(['randomized'])
  1573. eq_(1, len(res.all()))
  1574. item = res.all()[0]
  1575. eq_(original_id, item.content_id)
  1576. def test_search_in_description(self):
  1577. # HACK - D.A. - 2015-03-09
  1578. # This test is based on a bug which does NOT return results found
  1579. # at root of a workspace (eg a folder)
  1580. uapi = UserApi(
  1581. session=self.session,
  1582. config=CFG(self.config.get_settings()),
  1583. current_user=None,
  1584. )
  1585. group_api = GroupApi(
  1586. current_user=None,
  1587. session=self.session,
  1588. )
  1589. groups = [group_api.get_one(Group.TIM_USER),
  1590. group_api.get_one(Group.TIM_MANAGER),
  1591. group_api.get_one(Group.TIM_ADMIN)]
  1592. user = uapi.create_user(email='this.is@user',
  1593. groups=groups, save_now=True)
  1594. workspace = WorkspaceApi(
  1595. current_user=user,
  1596. session=self.session
  1597. ).create_workspace(
  1598. 'test workspace',
  1599. save_now=True,
  1600. )
  1601. api = ContentApi(
  1602. current_user=user,
  1603. session=self.session
  1604. )
  1605. a = api.create(ContentType.Folder, workspace, None,
  1606. 'this is randomized folder', True)
  1607. p = api.create(ContentType.Page, workspace, a,
  1608. 'this is dummy label content', True)
  1609. with new_revision(
  1610. tm=transaction.manager,
  1611. session=self.session,
  1612. content=p,
  1613. ):
  1614. p.description = 'This is some amazing test'
  1615. api.save(p)
  1616. original_id = p.content_id
  1617. res = api.search(['dummy'])
  1618. eq_(1, len(res.all()))
  1619. item = res.all()[0]
  1620. eq_(original_id, item.content_id)
  1621. def test_search_in_label_or_description(self):
  1622. # HACK - D.A. - 2015-03-09
  1623. # This test is based on a bug which does NOT return results found
  1624. # at root of a workspace (eg a folder)
  1625. uapi = UserApi(
  1626. session=self.session,
  1627. config=CFG(self.config.get_settings()),
  1628. current_user=None,
  1629. )
  1630. group_api = GroupApi(current_user=None, session=self.session)
  1631. groups = [group_api.get_one(Group.TIM_USER),
  1632. group_api.get_one(Group.TIM_MANAGER),
  1633. group_api.get_one(Group.TIM_ADMIN)]
  1634. user = uapi.create_user(email='this.is@user',
  1635. groups=groups, save_now=True)
  1636. workspace = WorkspaceApi(
  1637. current_user=user,
  1638. session=self.session
  1639. ).create_workspace('test workspace', save_now=True)
  1640. api = ContentApi(
  1641. current_user=user,
  1642. session=self.session
  1643. )
  1644. a = api.create(ContentType.Folder, workspace, None,
  1645. 'this is randomized folder', True)
  1646. p1 = api.create(ContentType.Page, workspace, a,
  1647. 'this is dummy label content', True)
  1648. p2 = api.create(ContentType.Page, workspace, a, 'Hey ! Jon !', True)
  1649. with new_revision(
  1650. session=self.session,
  1651. tm=transaction.manager,
  1652. content=p1,
  1653. ):
  1654. p1.description = 'This is some amazing test'
  1655. with new_revision(
  1656. session=self.session,
  1657. tm=transaction.manager,
  1658. content=p2,
  1659. ):
  1660. p2.description = 'What\'s up ?'
  1661. api.save(p1)
  1662. api.save(p2)
  1663. id1 = p1.content_id
  1664. id2 = p2.content_id
  1665. eq_(1, self.session.query(Workspace).filter(Workspace.label == 'test workspace').count())
  1666. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is randomized folder').count())
  1667. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is dummy label content').count())
  1668. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'This is some amazing test').count())
  1669. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'Hey ! Jon !').count())
  1670. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'What\'s up ?').count())
  1671. res = api.search(['dummy', 'jon'])
  1672. eq_(2, len(res.all()))
  1673. eq_(True, id1 in [o.content_id for o in res.all()])
  1674. eq_(True, id2 in [o.content_id for o in res.all()])
  1675. def test_unit__search_exclude_content_under_deleted_or_archived_parents__ok(self): # nopep8
  1676. admin = self.session.query(User)\
  1677. .filter(User.email == 'admin@admin.admin').one()
  1678. workspace = self._create_workspace_and_test(
  1679. 'workspace_1',
  1680. admin
  1681. )
  1682. folder_1 = self._create_content_and_test(
  1683. 'folder_1',
  1684. workspace=workspace,
  1685. type=ContentType.Folder
  1686. )
  1687. folder_2 = self._create_content_and_test(
  1688. 'folder_2',
  1689. workspace=workspace,
  1690. type=ContentType.Folder
  1691. )
  1692. page_1 = self._create_content_and_test(
  1693. 'foo', workspace=workspace,
  1694. type=ContentType.Page,
  1695. parent=folder_1
  1696. )
  1697. page_2 = self._create_content_and_test(
  1698. 'bar',
  1699. workspace=workspace,
  1700. type=ContentType.Page,
  1701. parent=folder_2
  1702. )
  1703. api = ContentApi(
  1704. current_user=admin,
  1705. session=self.session,
  1706. )
  1707. foo_result = api.search(['foo']).all()
  1708. eq_(1, len(foo_result))
  1709. ok_(page_1 in foo_result)
  1710. bar_result = api.search(['bar']).all()
  1711. eq_(1, len(bar_result))
  1712. ok_(page_2 in bar_result)
  1713. with new_revision(
  1714. session=self.session,
  1715. tm=transaction.manager,
  1716. content=folder_1,
  1717. ):
  1718. api.delete(folder_1)
  1719. with new_revision(
  1720. session=self.session,
  1721. tm=transaction.manager,
  1722. content=folder_2,
  1723. ):
  1724. api.archive(folder_2)
  1725. # Actually ContentApi.search don't filter it
  1726. foo_result = api.search(['foo']).all()
  1727. eq_(1, len(foo_result))
  1728. ok_(page_1 in foo_result)
  1729. bar_result = api.search(['bar']).all()
  1730. eq_(1, len(bar_result))
  1731. ok_(page_2 in bar_result)
  1732. # ContentApi offer exclude_unavailable method to do it
  1733. foo_result = api.search(['foo']).all()
  1734. api.exclude_unavailable(foo_result)
  1735. eq_(0, len(foo_result))
  1736. bar_result = api.search(['bar']).all()
  1737. api.exclude_unavailable(bar_result)
  1738. eq_(0, len(bar_result))
  1739. class TestContentApiSecurity(DefaultTest):
  1740. fixtures = [FixtureTest, ]
  1741. def test_unit__cant_get_non_access_content__ok__nominal_case(self):
  1742. admin = self.session.query(User)\
  1743. .filter(User.email == 'admin@admin.admin').one()
  1744. bob = self.session.query(User)\
  1745. .filter(User.email == 'bob@fsf.local').one()
  1746. bob_workspace = WorkspaceApi(
  1747. current_user=bob,
  1748. session=self.session,
  1749. ).create_workspace(
  1750. 'bob_workspace',
  1751. save_now=True,
  1752. )
  1753. admin_workspace = WorkspaceApi(
  1754. current_user=admin,
  1755. session=self.session,
  1756. ).create_workspace(
  1757. 'admin_workspace',
  1758. save_now=True,
  1759. )
  1760. bob_page = ContentApi(
  1761. current_user=bob,
  1762. session=self.session
  1763. ).create(
  1764. content_type=ContentType.Page,
  1765. workspace=bob_workspace,
  1766. label='bob_page',
  1767. do_save=True,
  1768. )
  1769. admin_page = ContentApi(
  1770. current_user=admin,
  1771. session=self.session
  1772. ).create(
  1773. content_type=ContentType.Page,
  1774. workspace=admin_workspace,
  1775. label='admin_page',
  1776. do_save=True,
  1777. )
  1778. bob_viewable = ContentApi(
  1779. current_user=bob,
  1780. session=self.session
  1781. ).get_all()
  1782. eq_(1, len(bob_viewable), 'Bob should view only one content')
  1783. eq_(
  1784. 'bob_page',
  1785. bob_viewable[0].label,
  1786. 'Bob should not view "{0}" content'.format(
  1787. bob_viewable[0].label,
  1788. )
  1789. )