test_content_api.py 61KB
  1. # -*- coding: utf-8 -*-
  2. import datetime
  3. from nose.tools import eq_, ok_
  4. from nose.tools import raises
  5. from depot.io.utils import FileIntent
  6. import transaction
  7. from tracim.config import CFG
  8. from tracim.lib.content import compare_content_for_sorting_by_type_and_name
  9. from tracim.lib.content import ContentApi
  10. # TODO - G.M - 28-03-2018 - [GroupApi] Re-enable GroupApi
  11. from tracim.lib.group import GroupApi
  12. from tracim.lib.user import UserApi
  13. from tracim.exceptions import SameValueError
  14. # TODO - G.M - 28-03-2018 - [RoleApi] Re-enable RoleApi
  15. from tracim.lib.workspace import RoleApi
  16. # TODO - G.M - 28-03-2018 - [WorkspaceApi] Re-enable WorkspaceApi
  17. from tracim.lib.workspace import WorkspaceApi
  18. from tracim.models.revision_protection import new_revision
  19. from tracim.models.auth import User
  20. from tracim.models.auth import Group
  21. from tracim.models.data import ActionDescription
  22. from tracim.models.data import ContentRevisionRO
  23. from tracim.models.data import Workspace
  24. from tracim.models.data import Content
  25. from tracim.models.data import ContentType
  26. from tracim.models.data import UserRoleInWorkspace
  27. from tracim.fixtures.users_and_groups import Test as TestFixture
  28. from tracim.tests import DefaultTest
  29. class TestContentApi(DefaultTest):
  def test_compare_content_for_sorting_by_type(self):
      """Compare two contents that differ only by their type.

      Both contents have an empty label. The comparator is expected to
      return 1 for (file, folder), -1 for (folder, file) and 0 when a
      content is compared with itself.
      """
      file_content = Content()
      file_content.label = ''
      file_content.type = 'file'

      folder_content = Content()
      folder_content.label = ''
      folder_content.type = 'folder'

      # Second name bound to the very same object, for the "equal" case.
      file_alias = file_content

      expectations = (
          (1, file_content, folder_content),
          (-1, folder_content, file_content),
          (0, file_content, file_alias),
      )
      for expected, left, right in expectations:
          eq_(
              expected,
              compare_content_for_sorting_by_type_and_name(left, right),
          )
  def test_compare_content_for_sorting_by_label(self):
      """Compare two 'file' contents that differ only by their label.

      'bbb' against 'aaa' is expected to give 1, the reverse -1, and a
      content compared with itself 0.
      """
      labelled_bbb = Content()
      labelled_bbb.label = 'bbb'
      labelled_bbb.type = 'file'

      labelled_aaa = Content()
      labelled_aaa.label = 'aaa'
      labelled_aaa.type = 'file'

      # Same object under another name, used for the equality case.
      bbb_alias = labelled_bbb

      compare = compare_content_for_sorting_by_type_and_name
      eq_(1, compare(labelled_bbb, labelled_aaa))
      eq_(-1, compare(labelled_aaa, labelled_bbb))
      eq_(0, compare(labelled_bbb, bbb_alias))
  def test_sort_by_label_or_filename(self):
      """Sort three 'file' contents, one of which only has a file name.

      The content with an empty label and file name 'AABC' is expected
      first, followed by the ones labelled 'ABCD' and 'BCDE'.
      """
      labelled_abcd = Content()
      labelled_abcd.label = 'ABCD'
      labelled_abcd.type = 'file'

      named_aabc = Content()
      named_aabc.label = ''
      named_aabc.type = 'file'
      named_aabc.file_name = 'AABC'

      labelled_bcde = Content()
      labelled_bcde.label = 'BCDE'
      labelled_bcde.type = 'file'

      ordered = ContentApi.sort_content(
          [labelled_abcd, named_aabc, labelled_bcde]
      )

      expected_order = (named_aabc, labelled_abcd, labelled_bcde)
      for position, expected in enumerate(expected_order):
          eq_(ordered[position], expected)
  def test_sort_by_content_type(self):
      """Sort a file and a folder and expect the folder to come first.

      The file is labelled 'AAAA' and the folder 'BBBB', so the expected
      order is the reverse of the alphabetical order of the labels.
      """
      file_content = Content()
      file_content.label = 'AAAA'
      file_content.type = 'file'

      folder_content = Content()
      folder_content.label = 'BBBB'
      folder_content.type = 'folder'

      ordered = ContentApi.sort_content([file_content, folder_content])

      failure_template = 'value is {} instead of {}'
      eq_(
          ordered[0],
          folder_content,
          failure_template.format(
              ordered[0].content_id,
              folder_content.content_id,
          ),
      )
      eq_(
          ordered[1],
          file_content,
          failure_template.format(
              ordered[1].content_id,
              file_content.content_id,
          ),
      )
  def test_delete(self):
      """Check that a deleted content is hidden unless show_deleted is set.

      Two folders are created in a fresh workspace, then the first item
      returned by get_all is deleted. The listing is expected to shrink
      from 2 to 1 items with a default ContentApi, and to be 2 again
      with a ContentApi built with show_deleted=True.
      """
      uapi = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [group_api.get_one(Group.TIM_USER),
                group_api.get_one(Group.TIM_MANAGER),
                group_api.get_one(Group.TIM_ADMIN)]

      user = uapi.create_user(email='this.is@user',
                              groups=groups, save_now=True)
      workspace = WorkspaceApi(
          current_user=user,
          session=self.session,
      ).create_workspace('test workspace', save_now=True)
      api = ContentApi(
          current_user=user,
          session=self.session,
      )
      # The created folders are only reached through get_all() below, so
      # the returned instances are not kept.
      api.create(ContentType.Folder, workspace, None, 'not_deleted', True)
      api.create(ContentType.Folder, workspace, None, 'to_delete', True)

      # Keep the plain ids: they are used to reload the instances after
      # each commit.
      uid = user.user_id
      wid = workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(current_user=user, session=self.session)
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))

      with new_revision(
          session=self.session,
          tm=transaction.manager,
          content=items[0]
      ):
          api.delete(items[0])
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(1, len(items))
      transaction.commit()

      # Test that the item is still available if "show deleted" is activated
      # Refresh instances after commit; the workspace is reloaded as well
      # so the query below does not rely on a pre-commit instance.
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session,
          show_deleted=True
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))
  def test_archive(self):
      """Check that an archived content is hidden unless show_archived is set.

      Two folders are created in a fresh workspace, then the first item
      returned by get_all is archived. The listing is expected to shrink
      from 2 to 1 items with a default ContentApi, and to be 2 again
      with a ContentApi built with show_archived=True.
      """
      uapi = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [group_api.get_one(Group.TIM_USER),
                group_api.get_one(Group.TIM_MANAGER),
                group_api.get_one(Group.TIM_ADMIN)]

      user = uapi.create_user(email='this.is@user',
                              groups=groups, save_now=True)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True
      )
      api = ContentApi(
          current_user=user,
          session=self.session,
      )
      # The created folders are only reached through get_all() below, so
      # the returned instances are not kept.
      api.create(ContentType.Folder, workspace, None, 'not_archived', True)
      api.create(ContentType.Folder, workspace, None, 'to_archive', True)

      # Keep the plain ids: they are used to reload the instances after
      # each commit.
      uid = user.user_id
      wid = workspace.workspace_id
      transaction.commit()

      # Refresh instances after commit; the workspace is reloaded as well
      # so the queries below do not rely on a pre-commit instance.
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          session=self.session,
          current_user=user,
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))

      with new_revision(
          session=self.session,
          tm=transaction.manager,
          content=items[0],
      ):
          api.archive(items[0])
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)
      api = ContentApi(
          current_user=user,
          session=self.session
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(1, len(items))
      transaction.commit()

      # Refresh instances after commit
      user = uapi.get_one(uid)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(wid)

      # Test that the item is still available if "show archived" is
      # activated.
      # NOTE(review): this API is built without a current user, unlike the
      # earlier sections of this test -- confirm this is intentional.
      api = ContentApi(
          current_user=None,
          session=self.session,
          show_archived=True
      )
      items = api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(items))
  def test_get_all_with_filter(self):
      """Check that get_all only returns contents of the requested type.

      A folder and a file are created at the root of a workspace. The
      listing is then requested for any type (2 items expected), for
      files only ('thefile') and for folders only ('thefolder').
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_id)
          for group_id in (Group.TIM_USER,
                           Group.TIM_MANAGER,
                           Group.TIM_ADMIN)
      ]

      user = user_api.create_user(
          email='this.is@user',
          groups=groups,
          save_now=True,
      )
      creation_workspace_api = WorkspaceApi(
          current_user=user,
          session=self.session,
      )
      workspace = creation_workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      content_api = ContentApi(current_user=user, session=self.session)
      # Neither instance is used again; both are found through get_all().
      folder = content_api.create(
          ContentType.Folder, workspace, None, 'thefolder', True)
      file_content = content_api.create(
          ContentType.File, workspace, None, 'thefile', True)

      user_id = user.user_id
      workspace_id = workspace.workspace_id
      transaction.commit()

      # Reload the instances now that the transaction is committed.
      user = user_api.get_one(user_id)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(workspace_id)
      content_api = ContentApi(current_user=user, session=self.session)

      all_contents = content_api.get_all(None, ContentType.Any, workspace)
      eq_(2, len(all_contents))

      only_files = content_api.get_all(None, ContentType.File, workspace)
      eq_(1, len(only_files))
      eq_('thefile', only_files[0].label)

      only_folders = content_api.get_all(
          None, ContentType.Folder, workspace)
      eq_(1, len(only_folders))
      eq_('thefolder', only_folders[0].label)
  def test_get_all_with_parent_id(self):
      """Check that get_all restricts its results to the given parent id.

      A folder holding one file, plus a second file at the workspace
      root, are created. Listing everything is expected to give 3 items;
      listing files under the folder id is expected to give only the
      file created inside that folder.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_id)
          for group_id in (Group.TIM_USER,
                           Group.TIM_MANAGER,
                           Group.TIM_ADMIN)
      ]

      user = user_api.create_user(
          email='this.is@user',
          groups=groups,
          save_now=True,
      )
      creation_workspace_api = WorkspaceApi(
          current_user=user,
          session=self.session,
      )
      workspace = creation_workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      content_api = ContentApi(current_user=user, session=self.session)

      parent_folder = content_api.create(
          ContentType.Folder, workspace, None, 'parent', do_save=True)
      child_file = content_api.create(
          ContentType.File, workspace, parent_folder, 'file1', do_save=True)
      # Not referenced again: it only adds a third item, at the root.
      root_file = content_api.create(
          ContentType.File, workspace, None, 'file2', do_save=True)

      # Plain ids survive the commit and are used to reload / compare.
      parent_id = parent_folder.content_id
      child_id = child_file.content_id
      user_id = user.user_id
      workspace_id = workspace.workspace_id
      transaction.commit()

      # Reload the instances now that the transaction is committed.
      user = user_api.get_one(user_id)
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.get_one(workspace_id)
      content_api = ContentApi(current_user=user, session=self.session)

      all_contents = content_api.get_all(None, ContentType.Any, workspace)
      eq_(3, len(all_contents))

      files_in_parent = content_api.get_all(
          parent_id, ContentType.File, workspace)
      eq_(1, len(files_in_parent))
      eq_(child_id, files_in_parent[0].content_id)
  @raises(ValueError)
  def test_set_status_unknown_status(self):
      """Expect ValueError when set_status is given 'unknown-status'.

      The failure is checked by the @raises decorator, so the test body
      contains no explicit assertion.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_id)
          for group_id in (Group.TIM_USER,
                           Group.TIM_MANAGER,
                           Group.TIM_ADMIN)
      ]

      user = user_api.create_user(
          email='this.is@user',
          groups=groups,
          save_now=True,
      )
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      content_api = ContentApi(current_user=user, session=self.session)
      folder = content_api.create(
          ContentType.Folder, workspace, None, 'parent', True)

      revision_context = new_revision(
          session=self.session,
          tm=transaction.manager,
          content=folder,
      )
      with revision_context:
          content_api.set_status(folder, 'unknown-status')
  def test_set_status_ok(self):
      """Apply each known status to a folder and check the result.

      For every status set, the content is expected to expose that
      status and a revision type of ActionDescription.STATUS_UPDATE.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_id)
          for group_id in (Group.TIM_USER,
                           Group.TIM_MANAGER,
                           Group.TIM_ADMIN)
      ]

      user = user_api.create_user(
          email='this.is@user',
          groups=groups,
          save_now=True,
      )
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      content_api = ContentApi(current_user=user, session=self.session)
      folder = content_api.create(
          ContentType.Folder, workspace, None, 'parent', True)

      statuses = [
          'open',
          'closed-validated',
          'closed-unvalidated',
          'closed-deprecated',
      ]
      revision_context = new_revision(
          session=self.session,
          tm=transaction.manager,
          content=folder,
      )
      # All status changes happen inside one single new_revision block.
      with revision_context:
          for status in statuses:
              content_api.set_status(folder, status)
              eq_(status, folder.status)
              eq_(ActionDescription.STATUS_UPDATE, folder.revision_type)
  def test_create_comment_ok(self):
      """Create a comment on a page and check the resulting content.

      The comment is expected to be a Content of type Comment, attached
      to the page, owned by the creating user, located in the same
      workspace, with the given text as description and an empty label.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_id)
          for group_id in (Group.TIM_USER,
                           Group.TIM_MANAGER,
                           Group.TIM_ADMIN)
      ]

      user = user_api.create_user(
          email='this.is@user',
          groups=groups,
          save_now=True,
      )
      workspace_api = WorkspaceApi(current_user=user, session=self.session)
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      content_api = ContentApi(current_user=user, session=self.session)

      page = content_api.create(
          ContentType.Page, workspace, None, 'this_is_a_page')
      comment = content_api.create_comment(
          workspace, page, 'this is the comment', True)

      # Nature of the created object and where it is attached.
      eq_(Content, comment.__class__)
      eq_(page.content_id, comment.parent_id)
      eq_(user, comment.owner)
      eq_(workspace, comment.workspace)
      eq_(ContentType.Comment, comment.type)
      # Text payload and revision metadata.
      eq_('this is the comment', comment.description)
      eq_('', comment.label)
      eq_(ActionDescription.COMMENT, comment.revision_type)
  def test_unit_copy_file_different_label_different_parent_ok(self):
      """Copy a file into another folder under a new label.

      user1 creates 'test_file' in 'folder a' of a first workspace;
      user2 copies it as 'test_file_copy' into 'folder b' of a second
      workspace. The copy is then compared with the original.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_id)
          for group_id in (Group.TIM_USER,
                           Group.TIM_MANAGER,
                           Group.TIM_ADMIN)
      ]

      first_user = user_api.create_user(
          email='user1@user', groups=groups, save_now=True)
      second_user = user_api.create_user(
          email='user2@user', groups=groups, save_now=True)

      source_workspace_api = WorkspaceApi(
          current_user=first_user,
          session=self.session,
      )
      source_workspace = source_workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      role_api = RoleApi(current_user=first_user, session=self.session)
      role_api.create_one(
          second_user,
          source_workspace,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False,
      )

      first_api = ContentApi(current_user=first_user, session=self.session)
      source_folder = first_api.create(
          ContentType.Folder, source_workspace, None, 'folder a', True)

      # The file is created unsaved and given its data before being saved.
      with self.session.no_autoflush:
          original = first_api.create(
              content_type=ContentType.File,
              workspace=source_workspace,
              parent=source_folder,
              label='test_file',
              do_save=False,
          )
          first_api.update_file_data(
              original, 'test_file', 'text/plain', b'test_content')
      first_api.save(original, ActionDescription.CREATION)

      second_api = ContentApi(
          current_user=second_user, session=self.session)
      target_workspace_api = WorkspaceApi(
          current_user=second_user,
          session=self.session,
      )
      target_workspace = target_workspace_api.create_workspace(
          'test workspace2',
          save_now=True,
      )
      target_folder = second_api.create(
          ContentType.Folder, target_workspace, None, 'folder b', True)

      second_api.copy(
          item=original,
          new_parent=target_folder,
          new_label='test_file_copy',
      )
      transaction.commit()

      duplicate = second_api.get_one_by_label_and_parent(
          'test_file_copy', target_folder)

      # The copy is a distinct content stored in the target workspace.
      assert original != duplicate
      assert duplicate.content_id != original.content_id
      assert duplicate.workspace_id == target_workspace.workspace_id

      # Same bytes, but not the same depot path.
      duplicate_bytes = duplicate.depot_file.file.read()
      original_bytes = original.depot_file.file.read()
      assert duplicate_bytes == original_bytes
      assert duplicate.depot_file.path != original.depot_file.path

      assert duplicate.label == 'test_file_copy'
      assert duplicate.type == original.type
      assert duplicate.parent.content_id == target_folder.content_id
      assert duplicate.owner.user_id == first_user.user_id
      assert duplicate.description == original.description
      assert duplicate.file_extension == original.file_extension
      assert duplicate.file_mimetype == original.file_mimetype
      assert duplicate.revision_type == ActionDescription.COPY
      assert len(duplicate.revisions) == len(original.revisions) + 1
  def test_unit_copy_file__same_label_different_parent_ok(self):
      """Copy a file into another folder without giving a new label.

      user1 creates 'test_file' in 'folder a' of a first workspace;
      user2 copies it into 'folder b' of a second workspace. The copy is
      looked up under the original label and compared with the original.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_id)
          for group_id in (Group.TIM_USER,
                           Group.TIM_MANAGER,
                           Group.TIM_ADMIN)
      ]

      first_user = user_api.create_user(
          email='user1@user', groups=groups, save_now=True)
      second_user = user_api.create_user(
          email='user2@user', groups=groups, save_now=True)

      source_workspace_api = WorkspaceApi(
          current_user=first_user,
          session=self.session,
      )
      source_workspace = source_workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      role_api = RoleApi(current_user=first_user, session=self.session)
      role_api.create_one(
          second_user,
          source_workspace,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False,
      )

      first_api = ContentApi(current_user=first_user, session=self.session)
      source_folder = first_api.create(
          ContentType.Folder, source_workspace, None, 'folder a', True)

      # The file is created unsaved and given its data before being saved.
      with self.session.no_autoflush:
          original = first_api.create(
              content_type=ContentType.File,
              workspace=source_workspace,
              parent=source_folder,
              label='test_file',
              do_save=False,
          )
          first_api.update_file_data(
              original, 'test_file', 'text/plain', b'test_content')
      first_api.save(original, ActionDescription.CREATION)

      second_api = ContentApi(
          current_user=second_user, session=self.session)
      target_workspace_api = WorkspaceApi(
          current_user=second_user,
          session=self.session,
      )
      target_workspace = target_workspace_api.create_workspace(
          'test workspace2',
          save_now=True,
      )
      target_folder = second_api.create(
          ContentType.Folder, target_workspace, None, 'folder b', True)

      # No new_label here: the copy is expected to keep 'test_file'.
      second_api.copy(
          item=original,
          new_parent=target_folder,
      )
      transaction.commit()

      duplicate = second_api.get_one_by_label_and_parent(
          'test_file', target_folder)

      # The copy is a distinct content stored in the target workspace.
      assert original != duplicate
      assert duplicate.content_id != original.content_id
      assert duplicate.workspace_id == target_workspace.workspace_id

      # Same bytes, but not the same depot path.
      duplicate_bytes = duplicate.depot_file.file.read()
      original_bytes = original.depot_file.file.read()
      assert duplicate_bytes == original_bytes
      assert duplicate.depot_file.path != original.depot_file.path

      assert duplicate.label == original.label
      assert duplicate.type == original.type
      assert duplicate.parent.content_id == target_folder.content_id
      assert duplicate.owner.user_id == first_user.user_id
      assert duplicate.description == original.description
      assert duplicate.file_extension == original.file_extension
      assert duplicate.file_mimetype == original.file_mimetype
      assert duplicate.revision_type == ActionDescription.COPY
      assert len(duplicate.revisions) == len(original.revisions) + 1
  def test_unit_copy_file_different_label_same_parent_ok(self):
      """Copy a file under a new label without giving a new parent.

      user1 creates 'test_file' in 'folder a'; user2 copies it as
      'test_file_copy'. The copy is looked up in 'folder a' and compared
      with the original.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_id)
          for group_id in (Group.TIM_USER,
                           Group.TIM_MANAGER,
                           Group.TIM_ADMIN)
      ]

      first_user = user_api.create_user(
          email='user1@user', groups=groups, save_now=True)
      second_user = user_api.create_user(
          email='user2@user', groups=groups, save_now=True)

      workspace_api = WorkspaceApi(
          current_user=first_user,
          session=self.session,
      )
      workspace = workspace_api.create_workspace(
          'test workspace',
          save_now=True,
      )
      role_api = RoleApi(current_user=first_user, session=self.session)
      role_api.create_one(
          second_user,
          workspace,
          UserRoleInWorkspace.WORKSPACE_MANAGER,
          with_notif=False,
      )

      first_api = ContentApi(current_user=first_user, session=self.session)
      shared_folder = first_api.create(
          ContentType.Folder, workspace, None, 'folder a', True)

      # The file is created unsaved and given its data before being saved.
      with self.session.no_autoflush:
          original = first_api.create(
              content_type=ContentType.File,
              workspace=workspace,
              parent=shared_folder,
              label='test_file',
              do_save=False,
          )
          first_api.update_file_data(
              original, 'test_file', 'text/plain', b'test_content')
      first_api.save(original, ActionDescription.CREATION)

      second_api = ContentApi(
          current_user=second_user, session=self.session)
      # No new_parent here: the copy is expected next to the original.
      second_api.copy(
          item=original,
          new_label='test_file_copy',
      )
      transaction.commit()

      duplicate = second_api.get_one_by_label_and_parent(
          'test_file_copy', shared_folder)

      # The copy is a distinct content stored in the same workspace.
      assert original != duplicate
      assert duplicate.content_id != original.content_id
      assert duplicate.workspace_id == workspace.workspace_id

      # Same bytes, but not the same depot path.
      duplicate_bytes = duplicate.depot_file.file.read()
      original_bytes = original.depot_file.file.read()
      assert duplicate_bytes == original_bytes
      assert duplicate.depot_file.path != original.depot_file.path

      assert duplicate.label == 'test_file_copy'
      assert duplicate.type == original.type
      assert duplicate.parent.content_id == shared_folder.content_id
      assert duplicate.owner.user_id == first_user.user_id
      assert duplicate.description == original.description
      assert duplicate.file_extension == original.file_extension
      assert duplicate.file_mimetype == original.file_mimetype
      assert duplicate.revision_type == ActionDescription.COPY
      assert len(duplicate.revisions) == len(original.revisions) + 1
  def test_mark_read__workspace(self):
      """Mark two workspaces as read, one after the other, for user_b.

      user_a creates two contents in each of two workspaces. Before any
      marking, user_b must be absent from read_by on every revision.
      After marking workspace 1, only its two contents must list user_b;
      after marking workspace 2, all four must.
      """
      user_api = UserApi(
          session=self.session,
          config=CFG(self.config.get_settings()),
          current_user=None,
      )
      group_api = GroupApi(current_user=None, session=self.session)
      groups = [
          group_api.get_one(group_id)
          for group_id in (Group.TIM_USER,
                           Group.TIM_MANAGER,
                           Group.TIM_ADMIN)
      ]

      user_a = user_api.create_user(
          email='this.is@user', groups=groups, save_now=True)
      user_b = user_api.create_user(
          email='this.is@another.user', groups=groups, save_now=True)

      workspace_api = WorkspaceApi(
          current_user=user_a,
          session=self.session,
      )
      workspace1 = workspace_api.create_workspace(
          'test workspace n°1', save_now=True)
      workspace2 = workspace_api.create_workspace(
          'test workspace n°2', save_now=True)

      # user_b becomes reader of both workspaces; the first role is
      # created as user_a, the second one as user_b.
      role_api_as_a = RoleApi(current_user=user_a, session=self.session)
      role_api_as_a.create_one(
          user_b, workspace1, UserRoleInWorkspace.READER, False)
      role_api_as_b = RoleApi(current_user=user_b, session=self.session)
      role_api_as_b.create_one(
          user_b, workspace2, UserRoleInWorkspace.READER, False)

      content_api_a = ContentApi(current_user=user_a, session=self.session)
      content_api_b = ContentApi(current_user=user_b, session=self.session)

      # Two contents in workspace 1, two in workspace 2, all by user_a.
      page_1 = content_api_a.create(
          ContentType.Page, workspace1, None,
          'this is a page', do_save=True)
      page_2 = content_api_a.create(
          ContentType.Page, workspace1, None,
          'this is page1', do_save=True)
      page_3 = content_api_a.create(
          ContentType.Thread, workspace2, None,
          'this is page2', do_save=True)
      page_4 = content_api_a.create(
          ContentType.File, workspace2, None,
          'this is page3', do_save=True)
      pages = (page_1, page_2, page_3, page_4)

      def check_read_by_user_b(page, expected_read):
          # Every revision of the page must agree with the expected state.
          for revision in page.revisions:
              if expected_read:
                  eq_(user_b in revision.read_by.keys(), True)
              else:
                  eq_(user_b not in revision.read_by.keys(), True)

      # Nothing has been read yet.
      for page in pages:
          check_read_by_user_b(page, False)

      # Mark workspace 1 as read: only its two contents change.
      content_api_b.mark_read__workspace(workspace=workspace1)
      for page, expected_read in zip(pages, (True, True, False, False)):
          check_read_by_user_b(page, expected_read)

      # Mark workspace 2 as read: everything is now read.
      content_api_b.mark_read__workspace(workspace=workspace2)
      for page in pages:
          check_read_by_user_b(page, True)
  753. def test_mark_read(self):
  754. uapi = UserApi(
  755. session=self.session,
  756. config=CFG(self.config.get_settings()),
  757. current_user=None,
  758. )
  759. group_api = GroupApi(current_user=None,session=self.session)
  760. groups = [group_api.get_one(Group.TIM_USER),
  761. group_api.get_one(Group.TIM_MANAGER),
  762. group_api.get_one(Group.TIM_ADMIN)]
  763. user_a = uapi.create_user(email='this.is@user',
  764. groups=groups, save_now=True)
  765. user_b = uapi.create_user(email='this.is@another.user',
  766. groups=groups, save_now=True)
  767. wapi = WorkspaceApi(current_user=user_a, session=self.session)
  768. workspace_api = WorkspaceApi(current_user=user_a, session=self.session)
  769. workspace = wapi.create_workspace(
  770. 'test workspace',
  771. save_now=True)
  772. role_api = RoleApi(
  773. current_user=user_a,
  774. session=self.session,
  775. )
  776. role_api.create_one(user_b, workspace, UserRoleInWorkspace.READER, False)
  777. cont_api_a = ContentApi(
  778. current_user=user_a,
  779. session=self.session,
  780. )
  781. cont_api_b = ContentApi(
  782. current_user=user_b,
  783. session=self.session,
  784. )
  785. page_1 = cont_api_a.create(ContentType.Page, workspace, None,
  786. 'this is a page', do_save=True)
  787. for rev in page_1.revisions:
  788. eq_(user_b not in rev.read_by.keys(), True)
  789. cont_api_b.mark_read(page_1)
  790. for rev in page_1.revisions:
  791. eq_(user_b in rev.read_by.keys(), True)
  792. def test_mark_read__all(self):
  793. uapi = UserApi(
  794. session=self.session,
  795. config=CFG(self.config.get_settings()),
  796. current_user=None,
  797. )
  798. group_api = GroupApi(current_user=None,session=self.session)
  799. groups = [group_api.get_one(Group.TIM_USER),
  800. group_api.get_one(Group.TIM_MANAGER),
  801. group_api.get_one(Group.TIM_ADMIN)]
  802. user_a = uapi.create_user(email='this.is@user',
  803. groups=groups, save_now=True)
  804. user_b = uapi.create_user(email='this.is@another.user',
  805. groups=groups, save_now=True)
  806. wapi = WorkspaceApi(
  807. current_user=user_a,
  808. session=self.session,
  809. )
  810. workspace = wapi.create_workspace(
  811. 'test workspace',
  812. save_now=True)
  813. role_api = RoleApi(
  814. current_user=user_a,
  815. session=self.session,
  816. )
  817. role_api.create_one(user_b, workspace, UserRoleInWorkspace.READER, False)
  818. cont_api_a = ContentApi(
  819. current_user=user_a,
  820. session=self.session,
  821. )
  822. cont_api_b = ContentApi(
  823. current_user=user_b,
  824. session=self.session,
  825. )
  826. page_2 = cont_api_a.create(ContentType.Page, workspace, None, 'this is page1', do_save=True)
  827. page_3 = cont_api_a.create(ContentType.Thread, workspace, None, 'this is page2', do_save=True)
  828. page_4 = cont_api_a.create(ContentType.File, workspace, None, 'this is page3', do_save=True)
  829. for rev in page_2.revisions:
  830. eq_(user_b not in rev.read_by.keys(), True)
  831. for rev in page_3.revisions:
  832. eq_(user_b not in rev.read_by.keys(), True)
  833. for rev in page_4.revisions:
  834. eq_(user_b not in rev.read_by.keys(), True)
  835. self.session.refresh(page_2)
  836. self.session.refresh(page_3)
  837. self.session.refresh(page_4)
  838. cont_api_b.mark_read__all()
  839. for rev in page_2.revisions:
  840. eq_(user_b in rev.read_by.keys(), True)
  841. for rev in page_3.revisions:
  842. eq_(user_b in rev.read_by.keys(), True)
  843. for rev in page_4.revisions:
  844. eq_(user_b in rev.read_by.keys(), True)
  845. def test_update(self):
  846. uapi = UserApi(
  847. session=self.session,
  848. config=CFG(self.config.get_settings()),
  849. current_user=None,
  850. )
  851. group_api = GroupApi(current_user=None,session=self.session)
  852. groups = [group_api.get_one(Group.TIM_USER),
  853. group_api.get_one(Group.TIM_MANAGER),
  854. group_api.get_one(Group.TIM_ADMIN)]
  855. user1 = uapi.create_user(email='this.is@user',
  856. groups=groups, save_now=True)
  857. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  858. workspace = workspace_api.create_workspace(
  859. 'test workspace',
  860. save_now=True
  861. )
  862. wid = workspace.workspace_id
  863. user2 = uapi.create_user()
  864. user2.email = 'this.is@another.user'
  865. uapi.save(user2)
  866. RoleApi(current_user=user1,session=self.session).create_one(user2, workspace,
  867. UserRoleInWorkspace.CONTENT_MANAGER,
  868. with_notif=False,
  869. flush=True)
  870. # Test starts here
  871. api = ContentApi(
  872. current_user=user1,
  873. session=self.session,
  874. )
  875. p = api.create(ContentType.Page, workspace, None,
  876. 'this_is_a_page', True)
  877. u1id = user1.user_id
  878. u2id = user2.user_id
  879. pcid = p.content_id
  880. poid = p.owner_id
  881. transaction.commit()
  882. # Refresh instances after commit
  883. user1 = uapi.get_one(u1id)
  884. workspace = WorkspaceApi(
  885. current_user=user1,
  886. session=self.session
  887. ).get_one(wid)
  888. api = ContentApi(
  889. current_user=user1,
  890. session=self.session,
  891. )
  892. content = api.get_one(pcid, ContentType.Any, workspace)
  893. eq_(u1id, content.owner_id)
  894. eq_(poid, content.owner_id)
  895. u2 = UserApi(
  896. session=self.session,
  897. config=self.config,
  898. current_user=None,
  899. ).get_one(u2id)
  900. api2 = ContentApi(
  901. current_user=u2,
  902. session=self.session,
  903. )
  904. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  905. with new_revision(
  906. session=self.session,
  907. tm=transaction.manager,
  908. content=content2,
  909. ):
  910. api2.update_content(content2, 'this is an updated page', 'new content')
  911. api2.save(content2)
  912. transaction.commit()
  913. # Refresh instances after commit
  914. user1 = uapi.get_one(u1id)
  915. workspace = WorkspaceApi(
  916. current_user=user1,
  917. session=self.session,
  918. ).get_one(wid)
  919. api = ContentApi(
  920. current_user=user1,
  921. session=self.session,
  922. )
  923. updated = api.get_one(pcid, ContentType.Any, workspace)
  924. eq_(u2id, updated.owner_id,
  925. 'the owner id should be {} (found {})'.format(u2id,
  926. updated.owner_id))
  927. eq_('this is an updated page', updated.label)
  928. eq_('new content', updated.description)
  929. eq_(ActionDescription.EDITION, updated.revision_type)
  930. @raises(SameValueError)
  931. def test_update_no_change(self):
  932. uapi = UserApi(
  933. session=self.session,
  934. config=CFG(self.config.get_settings()),
  935. current_user=None,
  936. )
  937. group_api = GroupApi(current_user=None,session=self.session)
  938. groups = [group_api.get_one(Group.TIM_USER),
  939. group_api.get_one(Group.TIM_MANAGER),
  940. group_api.get_one(Group.TIM_ADMIN)]
  941. user1 = uapi.create_user(
  942. email='this.is@user',
  943. groups=groups,
  944. save_now=True,
  945. )
  946. workspace = WorkspaceApi(user1).create_workspace(
  947. 'test workspace',
  948. save_now=True
  949. )
  950. user2 = uapi.create_user()
  951. user2.email = 'this.is@another.user'
  952. uapi.save(user2)
  953. RoleApi(current_user=user1,session=self.session).create_one(
  954. user2,
  955. workspace,
  956. UserRoleInWorkspace.CONTENT_MANAGER,
  957. with_notif=False,
  958. flush=True
  959. )
  960. api = ContentApi(
  961. current_user=user1,
  962. session=self.session
  963. )
  964. with self.session.no_autoflush:
  965. page = api.create(
  966. content_type=ContentType.Page,
  967. workspace=workspace,
  968. label="same_content",
  969. do_save=False
  970. )
  971. page.description = "Same_content_here"
  972. api.save(page, ActionDescription.CREATION, do_notify=True)
  973. transaction.commit()
  974. api2 = ContentApi(user2)
  975. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  976. with new_revision(
  977. session=self.session,
  978. tm=transaction.manager,
  979. content=content2,
  980. ):
  981. api2.update_content(
  982. item=content2,
  983. new_label='same_content',
  984. new_content='Same_content_here'
  985. )
  986. api2.save(content2)
  987. transaction.commit()
  988. def test_update_file_data(self):
  989. uapi = UserApi(
  990. session=self.session,
  991. config=CFG(self.config.get_settings()),
  992. current_user=None,
  993. )
  994. group_api = GroupApi(current_user=None,session=self.session)
  995. groups = [group_api.get_one(Group.TIM_USER),
  996. group_api.get_one(Group.TIM_MANAGER),
  997. group_api.get_one(Group.TIM_ADMIN)]
  998. user1 = uapi.create_user(email='this.is@user',
  999. groups=groups, save_now=True)
  1000. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1001. workspace = workspace_api.create_workspace(
  1002. 'test workspace',
  1003. save_now=True
  1004. )
  1005. wid = workspace.workspace_id
  1006. user2 = uapi.create_user()
  1007. user2.email = 'this.is@another.user'
  1008. uapi.save(user2)
  1009. RoleApi(current_user=user1,session=self.session).create_one(user2, workspace,
  1010. UserRoleInWorkspace.CONTENT_MANAGER,
  1011. with_notif=True,
  1012. flush=True)
  1013. # Test starts here
  1014. api = ContentApi(
  1015. current_user=user1,
  1016. session=self.session
  1017. )
  1018. p = api.create(ContentType.File, workspace, None,
  1019. 'this_is_a_page', True)
  1020. u1id = user1.user_id
  1021. u2id = user2.user_id
  1022. pcid = p.content_id
  1023. poid = p.owner_id
  1024. api.save(p)
  1025. transaction.commit()
  1026. # Refresh instances after commit
  1027. user1 = uapi.get_one(u1id)
  1028. workspace_api2 = WorkspaceApi(current_user=user1, session=self.session)
  1029. workspace = workspace_api2.get_one(wid)
  1030. api = ContentApi(current_user=user1, session=self.session)
  1031. content = api.get_one(pcid, ContentType.Any, workspace)
  1032. eq_(u1id, content.owner_id)
  1033. eq_(poid, content.owner_id)
  1034. u2 = UserApi(
  1035. current_user=None,
  1036. session=self.session,
  1037. config=self.config,
  1038. ).get_one(u2id)
  1039. api2 = ContentApi(
  1040. current_user=u2,
  1041. session=self.session,
  1042. )
  1043. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1044. with new_revision(
  1045. session=self.session,
  1046. tm=transaction.manager,
  1047. content=content2,
  1048. ):
  1049. api2.update_file_data(content2, 'index.html', 'text/html',
  1050. b'<html>hello world</html>')
  1051. api2.save(content2)
  1052. transaction.commit()
  1053. # Refresh instances after commit
  1054. user1 = uapi.get_one(u1id)
  1055. workspace = WorkspaceApi(
  1056. current_user=user1,
  1057. session=self.session,
  1058. ).get_one(wid)
  1059. updated = api.get_one(pcid, ContentType.Any, workspace)
  1060. eq_(u2id, updated.owner_id,
  1061. 'the owner id should be {} (found {})'.format(u2id,
  1062. updated.owner_id))
  1063. eq_('this_is_a_page.html', updated.file_name)
  1064. eq_('text/html', updated.file_mimetype)
  1065. eq_(b'<html>hello world</html>', updated.depot_file.file.read())
  1066. eq_(ActionDescription.REVISION, updated.revision_type)
  1067. @raises(SameValueError)
  1068. def test_update_no_change(self):
  1069. uapi = UserApi(
  1070. session=self.session,
  1071. config=CFG(self.config.get_settings()),
  1072. current_user=None,
  1073. )
  1074. group_api = GroupApi(current_user=None,session=self.session)
  1075. groups = [group_api.get_one(Group.TIM_USER),
  1076. group_api.get_one(Group.TIM_MANAGER),
  1077. group_api.get_one(Group.TIM_ADMIN)]
  1078. user1 = uapi.create_user(
  1079. email='this.is@user',
  1080. groups=groups,
  1081. save_now=True,
  1082. )
  1083. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1084. workspace = workspace_api.create_workspace(
  1085. 'test workspace',
  1086. save_now=True
  1087. )
  1088. user2 = uapi.create_user()
  1089. user2.email = 'this.is@another.user'
  1090. uapi.save(user2)
  1091. RoleApi(current_user=user1,session=self.session).create_one(
  1092. user2,
  1093. workspace,
  1094. UserRoleInWorkspace.CONTENT_MANAGER,
  1095. with_notif=False,
  1096. flush=True
  1097. )
  1098. api = ContentApi(current_user=user1, session=self.session)
  1099. with self.session.no_autoflush:
  1100. page = api.create(
  1101. content_type=ContentType.Page,
  1102. workspace=workspace,
  1103. label="same_content",
  1104. do_save=False
  1105. )
  1106. api.update_file_data(
  1107. page,
  1108. 'index.html',
  1109. 'text/html',
  1110. b'<html>Same Content Here</html>'
  1111. )
  1112. api.save(page, ActionDescription.CREATION, do_notify=True)
  1113. transaction.commit()
  1114. api2 = ContentApi(current_user=user2, session=self.session)
  1115. content2 = api2.get_one(page.content_id, ContentType.Any, workspace)
  1116. with new_revision(
  1117. session=self.session,
  1118. tm=transaction.manager,
  1119. content=content2,
  1120. ):
  1121. api2.update_file_data(
  1122. page,
  1123. 'index.html',
  1124. 'text/html',
  1125. b'<html>Same Content Here</html>'
  1126. )
  1127. api2.save(content2)
  1128. transaction.commit()
  1129. def test_archive_unarchive(self):
  1130. uapi = UserApi(
  1131. session=self.session,
  1132. config=CFG(self.config.get_settings()),
  1133. current_user=None,
  1134. )
  1135. group_api = GroupApi(current_user=None,session=self.session)
  1136. groups = [group_api.get_one(Group.TIM_USER),
  1137. group_api.get_one(Group.TIM_MANAGER),
  1138. group_api.get_one(Group.TIM_ADMIN)]
  1139. user1 = uapi.create_user(email='this.is@user',
  1140. groups=groups, save_now=True)
  1141. u1id = user1.user_id
  1142. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1143. workspace = workspace_api.create_workspace(
  1144. 'test workspace',
  1145. save_now=True
  1146. )
  1147. wid = workspace.workspace_id
  1148. user2 = uapi.create_user()
  1149. user2.email = 'this.is@another.user'
  1150. uapi.save(user2)
  1151. RoleApi(current_user=user1,session=self.session).create_one(user2, workspace,
  1152. UserRoleInWorkspace.CONTENT_MANAGER,
  1153. with_notif=True,
  1154. flush=True)
  1155. # show archived is used at the top end of the test
  1156. api = ContentApi(
  1157. current_user=user1,
  1158. session=self.session,
  1159. show_archived=True,
  1160. )
  1161. p = api.create(ContentType.File, workspace, None,
  1162. 'this_is_a_page', True)
  1163. u1id = user1.user_id
  1164. u2id = user2.user_id
  1165. pcid = p.content_id
  1166. poid = p.owner_id
  1167. transaction.commit()
  1168. ####
  1169. # refresh after commit
  1170. user1 = UserApi(
  1171. current_user=None,
  1172. config=self.config,
  1173. session=self.session
  1174. ).get_one(u1id)
  1175. workspace = WorkspaceApi(
  1176. current_user=user1,
  1177. session=self.session
  1178. ).get_one(wid)
  1179. content = api.get_one(pcid, ContentType.Any, workspace)
  1180. eq_(u1id, content.owner_id)
  1181. eq_(poid, content.owner_id)
  1182. u2api = UserApi(
  1183. session=self.session,
  1184. config=CFG(self.config.get_settings()),
  1185. current_user=None,
  1186. )
  1187. u2 = u2api.get_one(u2id)
  1188. api2 = ContentApi(
  1189. current_user=u2,
  1190. session=self.session,
  1191. show_archived=True
  1192. )
  1193. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1194. with new_revision(
  1195. session=self.session,
  1196. tm=transaction.manager,
  1197. content=content2,
  1198. ):
  1199. api2.archive(content2)
  1200. api2.save(content2)
  1201. transaction.commit()
  1202. # refresh after commit
  1203. user1 = UserApi(
  1204. current_user=None,
  1205. session=self.session,
  1206. config= self.config
  1207. ).get_one(u1id)
  1208. workspace = WorkspaceApi(
  1209. current_user=user1,
  1210. session=self.session,
  1211. ).get_one(wid)
  1212. u2 = UserApi(
  1213. current_user=None,
  1214. session=self.session,
  1215. config=self.config
  1216. ).get_one(u2id)
  1217. api = ContentApi(
  1218. current_user=user1,
  1219. session=self.session,
  1220. show_archived=True,
  1221. )
  1222. api2 = ContentApi(
  1223. current_user=u2,
  1224. session=self.session,
  1225. show_archived=True,
  1226. )
  1227. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1228. eq_(u2id, updated.owner_id,
  1229. 'the owner id should be {} (found {})'.format(u2id,
  1230. updated.owner_id))
  1231. eq_(True, updated.is_archived)
  1232. eq_(ActionDescription.ARCHIVING, updated.revision_type)
  1233. ####
  1234. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1235. with new_revision(
  1236. session=self.session,
  1237. tm=transaction.manager,
  1238. content=updated,
  1239. ):
  1240. api.unarchive(updated)
  1241. api.save(updated2)
  1242. eq_(False, updated2.is_archived)
  1243. eq_(ActionDescription.UNARCHIVING, updated2.revision_type)
  1244. eq_(u1id, updated2.owner_id)
  1245. def test_delete_undelete(self):
  1246. uapi = UserApi(
  1247. session=self.session,
  1248. config=CFG(self.config.get_settings()),
  1249. current_user=None,
  1250. )
  1251. group_api = GroupApi(
  1252. current_user=None,
  1253. session=self.session
  1254. )
  1255. groups = [group_api.get_one(Group.TIM_USER),
  1256. group_api.get_one(Group.TIM_MANAGER),
  1257. group_api.get_one(Group.TIM_ADMIN)]
  1258. user1 = uapi.create_user(email='this.is@user',
  1259. groups=groups, save_now=True)
  1260. u1id = user1.user_id
  1261. workspace_api = WorkspaceApi(current_user=user1, session=self.session)
  1262. workspace = workspace_api.create_workspace(
  1263. 'test workspace',
  1264. save_now=True
  1265. )
  1266. wid = workspace.workspace_id
  1267. user2 = uapi.create_user()
  1268. user2.email = 'this.is@another.user'
  1269. uapi.save(user2)
  1270. RoleApi(
  1271. current_user=user1,
  1272. session=self.session
  1273. ).create_one(
  1274. user2,
  1275. workspace,
  1276. UserRoleInWorkspace.CONTENT_MANAGER,
  1277. with_notif=True,
  1278. flush=True
  1279. )
  1280. # show archived is used at the top end of the test
  1281. api = ContentApi(
  1282. current_user=user1,
  1283. session=self.session,
  1284. show_deleted=True,
  1285. )
  1286. p = api.create(ContentType.File, workspace, None,
  1287. 'this_is_a_page', True)
  1288. u1id = user1.user_id
  1289. u2id = user2.user_id
  1290. pcid = p.content_id
  1291. poid = p.owner_id
  1292. transaction.commit()
  1293. ####
  1294. user1 = UserApi(
  1295. current_user=None,
  1296. session=self.session,
  1297. config=self.config,
  1298. ).get_one(u1id)
  1299. workspace = WorkspaceApi(
  1300. current_user=user1,
  1301. session=self.session,
  1302. ).get_one(wid)
  1303. content = api.get_one(pcid, ContentType.Any, workspace)
  1304. eq_(u1id, content.owner_id)
  1305. eq_(poid, content.owner_id)
  1306. u2 = UserApi(
  1307. current_user=None,
  1308. session=self.session,
  1309. config=self.config,
  1310. ).get_one(u2id)
  1311. api2 = ContentApi(
  1312. current_user=u2,
  1313. session=self.session,
  1314. show_deleted=True,
  1315. )
  1316. content2 = api2.get_one(pcid, ContentType.Any, workspace)
  1317. with new_revision(
  1318. session=self.session,
  1319. tm=transaction.manager,
  1320. content=content2,
  1321. ):
  1322. api2.delete(content2)
  1323. api2.save(content2)
  1324. transaction.commit()
  1325. ####
  1326. user1 = UserApi(
  1327. current_user=None,
  1328. session=self.session,
  1329. config=self.config,
  1330. ).get_one(u1id)
  1331. workspace = WorkspaceApi(
  1332. current_user=user1,
  1333. session=self.session,
  1334. ).get_one(wid)
  1335. # show archived is used at the top end of the test
  1336. api = ContentApi(
  1337. current_user=user1,
  1338. session=self.session,
  1339. show_deleted=True,
  1340. )
  1341. u2 = UserApi(
  1342. current_user=None,
  1343. session=self.session,
  1344. config=self.config,
  1345. ).get_one(u2id)
  1346. api2 = ContentApi(
  1347. current_user=u2,
  1348. session=self.session,
  1349. show_deleted=True
  1350. )
  1351. updated = api2.get_one(pcid, ContentType.Any, workspace)
  1352. eq_(u2id, updated.owner_id,
  1353. 'the owner id should be {} (found {})'.format(u2id,
  1354. updated.owner_id))
  1355. eq_(True, updated.is_deleted)
  1356. eq_(ActionDescription.DELETION, updated.revision_type)
  1357. ####
  1358. updated2 = api.get_one(pcid, ContentType.Any, workspace)
  1359. with new_revision(
  1360. tm=transaction.manager,
  1361. session=self.session,
  1362. content=updated2,
  1363. ):
  1364. api.undelete(updated2)
  1365. api.save(updated2)
  1366. eq_(False, updated2.is_deleted)
  1367. eq_(ActionDescription.UNDELETION, updated2.revision_type)
  1368. eq_(u1id, updated2.owner_id)
  1369. def test_search_in_label(self):
  1370. # HACK - D.A. - 2015-03-09
  1371. # This test is based on a bug which does NOT return results found
  1372. # at root of a workspace (eg a folder)
  1373. uapi = UserApi(
  1374. session=self.session,
  1375. config=CFG(self.config.get_settings()),
  1376. current_user=None,
  1377. )
  1378. group_api = GroupApi(current_user=None,session=self.session)
  1379. groups = [group_api.get_one(Group.TIM_USER),
  1380. group_api.get_one(Group.TIM_MANAGER),
  1381. group_api.get_one(Group.TIM_ADMIN)]
  1382. user = uapi.create_user(email='this.is@user',
  1383. groups=groups, save_now=True)
  1384. workspace = WorkspaceApi(current_user=user, session=self.session).create_workspace('test workspace',
  1385. save_now=True)
  1386. api = ContentApi(
  1387. current_user=user,
  1388. session=self.session
  1389. )
  1390. a = api.create(ContentType.Folder, workspace, None,
  1391. 'this is randomized folder', True)
  1392. p = api.create(ContentType.Page, workspace, a,
  1393. 'this is randomized label content', True)
  1394. with new_revision(
  1395. session=self.session,
  1396. tm=transaction.manager,
  1397. content=p,
  1398. ):
  1399. p.description = 'This is some amazing test'
  1400. api.save(p)
  1401. original_id = p.content_id
  1402. res = api.search(['randomized'])
  1403. eq_(1, len(res.all()))
  1404. item = res.all()[0]
  1405. eq_(original_id, item.content_id)
  1406. def test_search_in_description(self):
  1407. # HACK - D.A. - 2015-03-09
  1408. # This test is based on a bug which does NOT return results found
  1409. # at root of a workspace (eg a folder)
  1410. uapi = UserApi(
  1411. session=self.session,
  1412. config=CFG(self.config.get_settings()),
  1413. current_user=None,
  1414. )
  1415. group_api = GroupApi(current_user=None,session=self.session)
  1416. groups = [group_api.get_one(Group.TIM_USER),
  1417. group_api.get_one(Group.TIM_MANAGER),
  1418. group_api.get_one(Group.TIM_ADMIN)]
  1419. user = uapi.create_user(email='this.is@user',
  1420. groups=groups, save_now=True)
  1421. workspace = WorkspaceApi(current_user=user, session=self.session).create_workspace('test workspace',
  1422. save_now=True)
  1423. api = ContentApi(
  1424. current_user=user,
  1425. session=self.session
  1426. )
  1427. a = api.create(ContentType.Folder, workspace, None,
  1428. 'this is randomized folder', True)
  1429. p = api.create(ContentType.Page, workspace, a,
  1430. 'this is dummy label content', True)
  1431. with new_revision(
  1432. tm=transaction.manager,
  1433. session=self.session,
  1434. content=p,
  1435. ):
  1436. p.description = 'This is some amazing test'
  1437. api.save(p)
  1438. original_id = p.content_id
  1439. res = api.search(['dummy'])
  1440. eq_(1, len(res.all()))
  1441. item = res.all()[0]
  1442. eq_(original_id, item.content_id)
  1443. def test_search_in_label_or_description(self):
  1444. # HACK - D.A. - 2015-03-09
  1445. # This test is based on a bug which does NOT return results found
  1446. # at root of a workspace (eg a folder)
  1447. uapi = UserApi(
  1448. session=self.session,
  1449. config=CFG(self.config.get_settings()),
  1450. current_user=None,
  1451. )
  1452. group_api = GroupApi(current_user=None,session=self.session)
  1453. groups = [group_api.get_one(Group.TIM_USER),
  1454. group_api.get_one(Group.TIM_MANAGER),
  1455. group_api.get_one(Group.TIM_ADMIN)]
  1456. user = uapi.create_user(email='this.is@user',
  1457. groups=groups, save_now=True)
  1458. workspace = WorkspaceApi(current_user=user, session=self.session).create_workspace('test workspace',
  1459. save_now=True)
  1460. api = ContentApi(
  1461. current_user=user,
  1462. session=self.session
  1463. )
  1464. api = ContentApi(
  1465. current_user=user,
  1466. session=self.session
  1467. )
  1468. a = api.create(ContentType.Folder, workspace, None,
  1469. 'this is randomized folder', True)
  1470. p1 = api.create(ContentType.Page, workspace, a,
  1471. 'this is dummy label content', True)
  1472. p2 = api.create(ContentType.Page, workspace, a, 'Hey ! Jon !', True)
  1473. with new_revision(
  1474. session=self.session,
  1475. tm=transaction.manager,
  1476. content=p1,
  1477. ):
  1478. p1.description = 'This is some amazing test'
  1479. with new_revision(
  1480. session=self.session,
  1481. tm=transaction.manager,
  1482. content=p2,
  1483. ):
  1484. p2.description = 'What\'s up ?'
  1485. api.save(p1)
  1486. api.save(p2)
  1487. id1 = p1.content_id
  1488. id2 = p2.content_id
  1489. eq_(1, self.session.query(Workspace).filter(Workspace.label == 'test workspace').count())
  1490. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is randomized folder').count())
  1491. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'this is dummy label content').count())
  1492. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'This is some amazing test').count())
  1493. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'Hey ! Jon !').count())
  1494. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.description == 'What\'s up ?').count())
  1495. res = api.search(['dummy', 'jon'])
  1496. eq_(2, len(res.all()))
  1497. eq_(True, id1 in [o.content_id for o in res.all()])
  1498. eq_(True, id2 in [o.content_id for o in res.all()])
  1499. def test_unit__search_exclude_content_under_deleted_or_archived_parents__ok(self):
  1500. admin = self.session.query(User).filter(User.email == 'admin@admin.admin').one()
  1501. workspace = self._create_workspace_and_test('workspace_1', admin)
  1502. folder_1 = self._create_content_and_test('folder_1', workspace=workspace, type=ContentType.Folder)
  1503. folder_2 = self._create_content_and_test('folder_2', workspace=workspace, type=ContentType.Folder)
  1504. page_1 = self._create_content_and_test('foo', workspace=workspace, type=ContentType.Page, parent=folder_1)
  1505. page_2 = self._create_content_and_test('bar', workspace=workspace, type=ContentType.Page, parent=folder_2)
  1506. api = ContentApi(
  1507. current_user=admin,
  1508. session=self.session,
  1509. )
  1510. foo_result = api.search(['foo']).all()
  1511. eq_(1, len(foo_result))
  1512. ok_(page_1 in foo_result)
  1513. bar_result = api.search(['bar']).all()
  1514. eq_(1, len(bar_result))
  1515. ok_(page_2 in bar_result)
  1516. with new_revision(
  1517. session=self.session,
  1518. tm=transaction.manager,
  1519. content=folder_1,
  1520. ):
  1521. api.delete(folder_1)
  1522. with new_revision(
  1523. session=self.session,
  1524. tm=transaction.manager,
  1525. content=folder_2,
  1526. ):
  1527. api.archive(folder_2)
  1528. # Actually ContentApi.search don't filter it
  1529. foo_result = api.search(['foo']).all()
  1530. eq_(1, len(foo_result))
  1531. ok_(page_1 in foo_result)
  1532. bar_result = api.search(['bar']).all()
  1533. eq_(1, len(bar_result))
  1534. ok_(page_2 in bar_result)
  1535. # ContentApi offer exclude_unavailable method to do it
  1536. foo_result = api.search(['foo']).all()
  1537. api.exclude_unavailable(foo_result)
  1538. eq_(0, len(foo_result))
  1539. bar_result = api.search(['bar']).all()
  1540. api.exclude_unavailable(bar_result)
  1541. eq_(0, len(bar_result))
  1542. class TestContentApiSecurity(DefaultTest):
  1543. fixtures = [TestFixture, ]
  1544. def test_unit__cant_get_non_access_content__ok__nominal_case(self):
  1545. admin = self.session.query(User)\
  1546. .filter(User.email == 'admin@admin.admin').one()
  1547. bob = self.session.query(User)\
  1548. .filter(User.email == 'bob@fsf.local').one()
  1549. bob_workspace = WorkspaceApi(
  1550. current_user=bob,
  1551. session=self.session,
  1552. ).create_workspace(
  1553. 'bob_workspace',
  1554. save_now=True,
  1555. )
  1556. admin_workspace = WorkspaceApi(
  1557. current_user=admin,
  1558. session=self.session,
  1559. ).create_workspace(
  1560. 'admin_workspace',
  1561. save_now=True,
  1562. )
  1563. bob_page = ContentApi(current_user=bob,session=self.session).create(
  1564. content_type=ContentType.Page,
  1565. workspace=bob_workspace,
  1566. label='bob_page',
  1567. do_save=True,
  1568. )
  1569. admin_page = ContentApi(current_user=admin,session=self.session).create(
  1570. content_type=ContentType.Page,
  1571. workspace=admin_workspace,
  1572. label='admin_page',
  1573. do_save=True,
  1574. )
  1575. bob_viewable = ContentApi(current_user=bob,session=self.session).get_all()
  1576. eq_(1, len(bob_viewable), 'Bob should view only one content')
  1577. eq_(
  1578. 'bob_page',
  1579. bob_viewable[0].label,
  1580. 'Bob should not view "{0}" content'.format(
  1581. bob_viewable[0].label,
  1582. )
  1583. )