test_content.py 11KB


# -*- coding: utf-8 -*-
import time

from depot.fields.upload import UploadedFile
import pytest
from sqlalchemy.sql.elements import and_
from sqlalchemy.testing import eq_
import transaction

# from tracim.lib.content import ContentApi
from tracim.exceptions import ContentRevisionUpdateError
from tracim.lib.core.content import ContentApi
from tracim.models import Content
from tracim.models import User
from tracim.models.data import ActionDescription
from tracim.models.data import ContentRevisionRO
from tracim.models.data import ContentType
from tracim.models.data import Workspace
from tracim.models.revision_protection import new_revision
from tracim.tests import StandardTest


  19. class TestContent(StandardTest):
  20. def test_update_without_prepare(self):
  21. content1 = self.test_create()
  22. with pytest.raises(ContentRevisionUpdateError):
  23. content1.description = 'FOO'
  24. # Raise ContentRevisionUpdateError because revision can't be updated
  25. def test_query(self):
  26. content1 = self.test_create()
  27. with new_revision(
  28. session=self.session,
  29. tm=transaction.manager,
  30. content=content1,
  31. ):
  32. content1.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
  33. self.session.flush()
  34. content2 = self.test_create(key='2')
  35. with new_revision(
  36. session=self.session,
  37. tm=transaction.manager,
  38. content=content2,
  39. ):
  40. content2.description = 'TEST_CONTENT_DESCRIPTION_2_UPDATED'
  41. self.session.flush()
  42. workspace1 = self.session.query(Workspace)\
  43. .filter(Workspace.label == 'TEST_WORKSPACE_1').one()
  44. workspace2 = self.session.query(Workspace)\
  45. .filter(Workspace.label == 'TEST_WORKSPACE_2').one()
  46. # To get Content in database
  47. # we have to join Content and ContentRevisionRO
  48. # with particular condition:
  49. # Join have to be on most recent revision
  50. join_sub_query = self.session.query(ContentRevisionRO.revision_id)\
  51. .filter(ContentRevisionRO.content_id == Content.id)\
  52. .order_by(ContentRevisionRO.revision_id.desc())\
  53. .limit(1)\
  54. .correlate(Content)
  55. base_query = self.session.query(Content).join(
  56. ContentRevisionRO,
  57. and_(
  58. Content.id == ContentRevisionRO.content_id,
  59. ContentRevisionRO.revision_id == join_sub_query
  60. )
  61. )
  62. pattern = 'TEST_CONTENT_DESCRIPTION_%_UPDATED'
  63. eq_(2, base_query.filter(Content.description.like(pattern)).count())
  64. eq_(1, base_query.filter(Content.workspace == workspace1).count())
  65. eq_(1, base_query.filter(Content.workspace == workspace2).count())
  66. content1_from_query = base_query\
  67. .filter(Content.workspace == workspace1).one()
  68. eq_(content1.id, content1_from_query.id)
  69. eq_(
  70. 'TEST_CONTENT_DESCRIPTION_1_UPDATED',
  71. content1_from_query.description
  72. )
  73. user_admin = self.session.query(User)\
  74. .filter(User.email == 'admin@admin.admin').one()
  75. api = ContentApi(
  76. current_user=None,
  77. session=self.session,
  78. config=self.app_config,
  79. )
  80. content1_from_api = api.get_one(
  81. content1.id,
  82. ContentType.Page,
  83. workspace1
  84. )
  85. def test_update(self):
  86. created_content = self.test_create()
  87. content = self.session.query(Content)\
  88. .filter(Content.id == created_content.id).one()
  89. eq_(1, self.session.query(ContentRevisionRO)
  90. .filter(ContentRevisionRO.label == 'TEST_CONTENT_1').count())
  91. with new_revision(
  92. session=self.session,
  93. tm=transaction.manager,
  94. content=content
  95. ):
  96. time.sleep(0.00001)
  97. content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
  98. self.session.flush()
  99. eq_(
  100. 2,
  101. self.session.query(ContentRevisionRO).filter(
  102. ContentRevisionRO.label == 'TEST_CONTENT_1'
  103. ).count()
  104. )
  105. eq_(
  106. 1,
  107. self.session.query(Content).filter(
  108. Content.id == created_content.id
  109. ).count()
  110. )
  111. with new_revision(
  112. session=self.session,
  113. tm=transaction.manager,
  114. content=content
  115. ):
  116. time.sleep(0.00001)
  117. content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED_2'
  118. content.label = 'TEST_CONTENT_1_UPDATED_2'
  119. self.session.flush()
  120. eq_(
  121. 1,
  122. self.session.query(ContentRevisionRO).filter(
  123. ContentRevisionRO.label == 'TEST_CONTENT_1_UPDATED_2'
  124. ).count()
  125. )
  126. eq_(
  127. 1,
  128. self.session.query(Content).filter(
  129. Content.id == created_content.id
  130. ).count()
  131. )
  132. revision_1 = self.session.query(ContentRevisionRO).filter(
  133. ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1'
  134. ).one()
  135. revision_2 = self.session.query(ContentRevisionRO).filter(
  136. ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1_UPDATED' # nopep8
  137. ).one()
  138. revision_3 = self.session.query(ContentRevisionRO).filter(
  139. ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1_UPDATED_2' # nopep8
  140. ).one()
  141. # Updated dates must be different
  142. assert revision_1.updated < revision_2.updated < revision_3.updated
  143. # Created dates must be equal
  144. assert revision_1.created == revision_2.created == revision_3.created
  145. def test_creates(self):
  146. eq_(
  147. 0,
  148. self.session.query(ContentRevisionRO).filter(
  149. ContentRevisionRO.label == 'TEST_CONTENT_1'
  150. ).count()
  151. )
  152. eq_(
  153. 0,
  154. self.session.query(Workspace).filter(
  155. Workspace.label == 'TEST_WORKSPACE_1'
  156. ).count()
  157. )
  158. user_admin = self.session.query(User).filter(
  159. User.email == 'admin@admin.admin'
  160. ).one()
  161. workspace = Workspace(label="TEST_WORKSPACE_1")
  162. self.session.add(workspace)
  163. self.session.flush()
  164. eq_(
  165. 1,
  166. self.session.query(Workspace).filter(
  167. Workspace.label == 'TEST_WORKSPACE_1'
  168. ).count()
  169. )
  170. first_content = self._create_content(
  171. owner=user_admin,
  172. workspace=workspace,
  173. type=ContentType.Page,
  174. label='TEST_CONTENT_1',
  175. description='TEST_CONTENT_DESCRIPTION_1',
  176. revision_type=ActionDescription.CREATION,
  177. is_deleted=False,
  178. is_archived=False,
  179. )
  180. eq_(
  181. 1,
  182. self.session.query(ContentRevisionRO).filter(
  183. ContentRevisionRO.label == 'TEST_CONTENT_1'
  184. ).count()
  185. )
  186. content = self.session.query(Content).filter(
  187. Content.id == first_content.id
  188. ).one()
  189. eq_('TEST_CONTENT_1', content.label)
  190. eq_('TEST_CONTENT_DESCRIPTION_1', content.description)
  191. # Create a second content
  192. second_content = self._create_content(
  193. owner=user_admin,
  194. workspace=workspace,
  195. type=ContentType.Page,
  196. label='TEST_CONTENT_2',
  197. description='TEST_CONTENT_DESCRIPTION_2',
  198. revision_type=ActionDescription.CREATION
  199. )
  200. eq_(
  201. 1,
  202. self.session.query(ContentRevisionRO).filter(
  203. ContentRevisionRO.label == 'TEST_CONTENT_2'
  204. ).count()
  205. )
  206. content = self.session.query(Content).filter(
  207. Content.id == second_content.id
  208. ).one()
  209. eq_('TEST_CONTENT_2', content.label)
  210. eq_('TEST_CONTENT_DESCRIPTION_2', content.description)
  211. def test_create(self, key='1'):
  212. eq_(
  213. 0,
  214. self.session.query(ContentRevisionRO).filter(
  215. ContentRevisionRO.label == 'TEST_CONTENT_%s' % key).count()
  216. )
  217. eq_(
  218. 0,
  219. self.session.query(Workspace).filter(
  220. Workspace.label == 'TEST_WORKSPACE_%s' % key).count()
  221. )
  222. user_admin = self.session.query(User).filter(
  223. User.email == 'admin@admin.admin'
  224. ).one()
  225. workspace = Workspace(label="TEST_WORKSPACE_%s" % key)
  226. self.session.add(workspace)
  227. self.session.flush()
  228. eq_(
  229. 1,
  230. self.session.query(Workspace).filter(
  231. Workspace.label == 'TEST_WORKSPACE_%s' % key
  232. ).count()
  233. )
  234. created_content = self._create_content(
  235. owner=user_admin,
  236. workspace=workspace,
  237. type=ContentType.Page,
  238. label='TEST_CONTENT_%s' % key,
  239. description='TEST_CONTENT_DESCRIPTION_%s' % key,
  240. revision_type=ActionDescription.CREATION
  241. )
  242. eq_(
  243. 1,
  244. self.session.query(ContentRevisionRO).filter(
  245. ContentRevisionRO.label == 'TEST_CONTENT_%s' % key
  246. ).count()
  247. )
  248. content = self.session.query(Content).filter(
  249. Content.id == created_content.id
  250. ).one()
  251. eq_('TEST_CONTENT_%s' % key, content.label)
  252. eq_('TEST_CONTENT_DESCRIPTION_%s' % key, content.description)
  253. return created_content
  254. def _get_user(self):
  255. email = 'admin@admin.admin'
  256. user_query = self.session.query(User)
  257. user_filter = user_query.filter(User.email == email)
  258. user = user_filter.one()
  259. return user
  260. def _create_content(self, *args, **kwargs):
  261. content = Content(*args, **kwargs)
  262. self.session.add(content)
  263. self.session.flush()
  264. return content
  265. def _create_content_from_nothing(self):
  266. user_admin = self._get_user()
  267. workspace = Workspace(label="TEST_WORKSPACE_1")
  268. content = self._create_content(
  269. owner=user_admin,
  270. workspace=workspace,
  271. type=ContentType.File,
  272. label='TEST_CONTENT_1',
  273. description='TEST_CONTENT_DESCRIPTION_1',
  274. revision_type=ActionDescription.CREATION,
  275. )
  276. return content
  277. def test_unit__content_depot_file(self):
  278. """ Depot file access thought content property methods. """
  279. content = self._create_content_from_nothing()
  280. # tests uninitialized depot file
  281. eq_(content.depot_file, None)
  282. # initializes depot file
  283. # which is able to behave like a python file object
  284. content.depot_file = b'test'
  285. # tests initialized depot file
  286. assert content.depot_file
  287. # tests type of initialized depot file
  288. eq_(type(content.depot_file), UploadedFile)
  289. # tests content of initialized depot file
  290. # using depot_file.file of type StoredFile to fetch content back
  291. eq_(content.depot_file.file.read(), b'test')