test_content.py 11KB


  1. # -*- coding: utf-8 -*-
  2. import time
  3. from depot.fields.upload import UploadedFile
  4. from sqlalchemy.sql.elements import and_
  5. from sqlalchemy.testing import eq_
  6. import transaction
  7. import pytest
  8. # from tracim.lib.content import ContentApi
  9. from tracim_backend.exceptions import ContentRevisionUpdateError
  10. from tracim_backend.lib.core.content import ContentApi
  11. from tracim_backend.models import Content
  12. from tracim_backend.models.revision_protection import new_revision
  13. from tracim_backend.models import User
  14. from tracim_backend.models.data import ActionDescription
  15. from tracim_backend.models.data import ContentRevisionRO
  16. from tracim_backend.app_models.contents import CONTENT_TYPES
  17. from tracim_backend.models.data import Workspace
  18. from tracim_backend.tests import StandardTest
  19. class TestContent(StandardTest):
  20. def test_update_without_prepare(self):
  21. content1 = self.test_create()
  22. with pytest.raises(ContentRevisionUpdateError):
  23. content1.description = 'FOO'
  24. # Raise ContentRevisionUpdateError because revision can't be updated
  25. def test_query(self):
  26. content1 = self.test_create()
  27. with new_revision(
  28. session=self.session,
  29. tm=transaction.manager,
  30. content=content1,
  31. ):
  32. content1.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
  33. self.session.flush()
  34. content2 = self.test_create(key='2')
  35. with new_revision(
  36. session=self.session,
  37. tm=transaction.manager,
  38. content=content2,
  39. ):
  40. content2.description = 'TEST_CONTENT_DESCRIPTION_2_UPDATED'
  41. self.session.flush()
  42. workspace1 = self.session.query(Workspace)\
  43. .filter(Workspace.label == 'TEST_WORKSPACE_1').one()
  44. workspace2 = self.session.query(Workspace)\
  45. .filter(Workspace.label == 'TEST_WORKSPACE_2').one()
  46. # To get Content in database
  47. # we have to join Content and ContentRevisionRO
  48. # with particular condition:
  49. # Join have to be on most recent revision
  50. join_sub_query = self.session.query(ContentRevisionRO.revision_id)\
  51. .filter(ContentRevisionRO.content_id == Content.id)\
  52. .order_by(ContentRevisionRO.revision_id.desc())\
  53. .limit(1)\
  54. .correlate(Content)
  55. base_query = self.session.query(Content).join(
  56. ContentRevisionRO,
  57. and_(
  58. Content.id == ContentRevisionRO.content_id,
  59. ContentRevisionRO.revision_id == join_sub_query
  60. )
  61. )
  62. pattern = 'TEST_CONTENT_DESCRIPTION_%_UPDATED'
  63. eq_(2, base_query.filter(Content.description.like(pattern)).count())
  64. eq_(1, base_query.filter(Content.workspace == workspace1).count())
  65. eq_(1, base_query.filter(Content.workspace == workspace2).count())
  66. content1_from_query = base_query\
  67. .filter(Content.workspace == workspace1).one()
  68. eq_(content1.id, content1_from_query.id)
  69. eq_(
  70. 'TEST_CONTENT_DESCRIPTION_1_UPDATED',
  71. content1_from_query.description
  72. )
  73. user_admin = self.session.query(User)\
  74. .filter(User.email == 'admin@admin.admin').one()
  75. api = ContentApi(
  76. current_user=None,
  77. session=self.session,
  78. config=self.app_config,
  79. )
  80. content1_from_api = api.get_one(
  81. content1.id,
  82. CONTENT_TYPES.Page.slug,
  83. workspace1
  84. )
  85. def test_update(self):
  86. created_content = self.test_create()
  87. content = self.session.query(Content)\
  88. .filter(Content.id == created_content.id).one()
  89. eq_(1, self.session.query(ContentRevisionRO)
  90. .filter(ContentRevisionRO.label == 'TEST_CONTENT_1').count())
  91. with new_revision(
  92. session=self.session,
  93. tm=transaction.manager,
  94. content=content
  95. ):
  96. time.sleep(0.00001)
  97. content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
  98. self.session.flush()
  99. eq_(
  100. 2,
  101. self.session.query(ContentRevisionRO).filter(
  102. ContentRevisionRO.label == 'TEST_CONTENT_1'
  103. ).count()
  104. )
  105. eq_(
  106. 1,
  107. self.session.query(Content).filter(
  108. Content.id == created_content.id
  109. ).count()
  110. )
  111. with new_revision(
  112. session=self.session,
  113. tm=transaction.manager,
  114. content=content
  115. ):
  116. time.sleep(0.00001)
  117. content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED_2'
  118. content.label = 'TEST_CONTENT_1_UPDATED_2'
  119. self.session.flush()
  120. eq_(
  121. 1,
  122. self.session.query(ContentRevisionRO).filter(
  123. ContentRevisionRO.label == 'TEST_CONTENT_1_UPDATED_2'
  124. ).count()
  125. )
  126. eq_(
  127. 1,
  128. self.session.query(Content).filter(
  129. Content.id == created_content.id
  130. ).count()
  131. )
  132. revision_1 = self.session.query(ContentRevisionRO).filter(
  133. ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1'
  134. ).one()
  135. revision_2 = self.session.query(ContentRevisionRO).filter(
  136. ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1_UPDATED' # nopep8
  137. ).one()
  138. revision_3 = self.session.query(ContentRevisionRO).filter(
  139. ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1_UPDATED_2' # nopep8
  140. ).one()
  141. # Updated dates must be different
  142. assert revision_1.updated < revision_2.updated < revision_3.updated
  143. # Created dates must be equal
  144. assert revision_1.created == revision_2.created == revision_3.created
  145. def test_creates(self):
  146. eq_(
  147. 0,
  148. self.session.query(ContentRevisionRO).filter(
  149. ContentRevisionRO.label == 'TEST_CONTENT_1'
  150. ).count()
  151. )
  152. eq_(
  153. 0,
  154. self.session.query(Workspace).filter(
  155. Workspace.label == 'TEST_WORKSPACE_1'
  156. ).count()
  157. )
  158. user_admin = self.session.query(User).filter(
  159. User.email == 'admin@admin.admin'
  160. ).one()
  161. workspace = Workspace(label="TEST_WORKSPACE_1")
  162. self.session.add(workspace)
  163. self.session.flush()
  164. eq_(
  165. 1,
  166. self.session.query(Workspace).filter(
  167. Workspace.label == 'TEST_WORKSPACE_1'
  168. ).count()
  169. )
  170. first_content = self._create_content(
  171. owner=user_admin,
  172. workspace=workspace,
  173. type=CONTENT_TYPES.Page.slug,
  174. label='TEST_CONTENT_1',
  175. description='TEST_CONTENT_DESCRIPTION_1',
  176. revision_type=ActionDescription.CREATION,
  177. is_deleted=False,
  178. is_archived=False,
  179. )
  180. eq_(
  181. 1,
  182. self.session.query(ContentRevisionRO).filter(
  183. ContentRevisionRO.label == 'TEST_CONTENT_1'
  184. ).count()
  185. )
  186. content = self.session.query(Content).filter(
  187. Content.id == first_content.id
  188. ).one()
  189. eq_('TEST_CONTENT_1', content.label)
  190. eq_('TEST_CONTENT_DESCRIPTION_1', content.description)
  191. # Create a second content
  192. second_content = self._create_content(
  193. owner=user_admin,
  194. workspace=workspace,
  195. type=CONTENT_TYPES.Page.slug,
  196. label='TEST_CONTENT_2',
  197. description='TEST_CONTENT_DESCRIPTION_2',
  198. revision_type=ActionDescription.CREATION
  199. )
  200. eq_(
  201. 1,
  202. self.session.query(ContentRevisionRO).filter(
  203. ContentRevisionRO.label == 'TEST_CONTENT_2'
  204. ).count()
  205. )
  206. content = self.session.query(Content).filter(
  207. Content.id == second_content.id
  208. ).one()
  209. eq_('TEST_CONTENT_2', content.label)
  210. eq_('TEST_CONTENT_DESCRIPTION_2', content.description)
  211. def test_create(self, key='1'):
  212. eq_(
  213. 0,
  214. self.session.query(ContentRevisionRO).filter(
  215. ContentRevisionRO.label == 'TEST_CONTENT_%s' % key).count()
  216. )
  217. eq_(
  218. 0,
  219. self.session.query(Workspace).filter(
  220. Workspace.label == 'TEST_WORKSPACE_%s' % key).count()
  221. )
  222. user_admin = self.session.query(User).filter(
  223. User.email == 'admin@admin.admin'
  224. ).one()
  225. workspace = Workspace(label="TEST_WORKSPACE_%s" % key)
  226. self.session.add(workspace)
  227. self.session.flush()
  228. eq_(
  229. 1,
  230. self.session.query(Workspace).filter(
  231. Workspace.label == 'TEST_WORKSPACE_%s' % key
  232. ).count()
  233. )
  234. created_content = self._create_content(
  235. owner=user_admin,
  236. workspace=workspace,
  237. type=CONTENT_TYPES.Page.slug,
  238. label='TEST_CONTENT_%s' % key,
  239. description='TEST_CONTENT_DESCRIPTION_%s' % key,
  240. revision_type=ActionDescription.CREATION
  241. )
  242. eq_(
  243. 1,
  244. self.session.query(ContentRevisionRO).filter(
  245. ContentRevisionRO.label == 'TEST_CONTENT_%s' % key
  246. ).count()
  247. )
  248. content = self.session.query(Content).filter(
  249. Content.id == created_content.id
  250. ).one()
  251. eq_('TEST_CONTENT_%s' % key, content.label)
  252. eq_('TEST_CONTENT_DESCRIPTION_%s' % key, content.description)
  253. return created_content
  254. def _get_user(self):
  255. email = 'admin@admin.admin'
  256. user_query = self.session.query(User)
  257. user_filter = user_query.filter(User.email == email)
  258. user = user_filter.one()
  259. return user
  260. def _create_content(self, *args, **kwargs):
  261. content = Content(*args, **kwargs)
  262. self.session.add(content)
  263. self.session.flush()
  264. return content
  265. def _create_content_from_nothing(self):
  266. user_admin = self._get_user()
  267. workspace = Workspace(label="TEST_WORKSPACE_1")
  268. content = self._create_content(
  269. owner=user_admin,
  270. workspace=workspace,
  271. type=CONTENT_TYPES.File.slug,
  272. label='TEST_CONTENT_1',
  273. description='TEST_CONTENT_DESCRIPTION_1',
  274. revision_type=ActionDescription.CREATION,
  275. )
  276. return content
  277. def test_unit__content_depot_file(self):
  278. """ Depot file access thought content property methods. """
  279. content = self._create_content_from_nothing()
  280. # tests uninitialized depot file
  281. eq_(content.depot_file, None)
  282. # initializes depot file
  283. # which is able to behave like a python file object
  284. content.depot_file = b'test'
  285. # tests initialized depot file
  286. assert content.depot_file
  287. # tests type of initialized depot file
  288. eq_(type(content.depot_file), UploadedFile)
  289. # tests content of initialized depot file
  290. # using depot_file.file of type StoredFile to fetch content back
  291. eq_(content.depot_file.file.read(), b'test')