test_content.py 11KB


  1. # -*- coding: utf-8 -*-
  2. import time
  3. from depot.fields.upload import UploadedFile
  4. from nose.tools import ok_
  5. from nose.tools import raises
  6. from sqlalchemy.sql.elements import and_
  7. from sqlalchemy.testing import eq_
  8. import transaction
  9. # from tracim.lib.content import ContentApi
  10. from tracim.exceptions import ContentRevisionUpdateError
  11. from tracim.lib.core.content import ContentApi
  12. from tracim.models import Content
  13. from tracim.models.revision_protection import new_revision
  14. from tracim.models import User
  15. from tracim.models.data import ActionDescription
  16. from tracim.models.data import ContentRevisionRO
  17. from tracim.models.data import ContentType
  18. from tracim.models.data import Workspace
  19. from tracim.tests import StandardTest
  class TestContent(StandardTest):
      """Tests for ``Content`` objects and their ``ContentRevisionRO`` rows.

      The tests rely on ``self.session`` and ``self.app_config``, which are
      expected to be provided by ``StandardTest``.
      """

      @raises(ContentRevisionUpdateError)
      def test_update_without_prepare(self):
          """Updating a content without ``new_revision`` must raise."""
          content1 = self.test_create()
          # Expected to raise ContentRevisionUpdateError: the assignment is
          # not wrapped in a ``new_revision`` context, so the revision can't
          # be updated.
          content1.description = 'FOO'

      def test_query(self):
          """Contents are retrievable through their most recent revision."""
          content1 = self.test_create()
          with new_revision(
              session=self.session,
              tm=transaction.manager,
              content=content1,
          ):
              content1.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
          self.session.flush()

          content2 = self.test_create(key='2')
          with new_revision(
              session=self.session,
              tm=transaction.manager,
              content=content2,
          ):
              content2.description = 'TEST_CONTENT_DESCRIPTION_2_UPDATED'
          self.session.flush()

          workspace1 = self.session.query(Workspace)\
              .filter(Workspace.label == 'TEST_WORKSPACE_1').one()
          workspace2 = self.session.query(Workspace)\
              .filter(Workspace.label == 'TEST_WORKSPACE_2').one()

          # To fetch a Content from the database, Content has to be joined
          # with ContentRevisionRO under one particular condition: the join
          # must target the most recent revision of each content.
          join_sub_query = self.session.query(ContentRevisionRO.revision_id)\
              .filter(ContentRevisionRO.content_id == Content.id)\
              .order_by(ContentRevisionRO.revision_id.desc())\
              .limit(1)\
              .correlate(Content)

          base_query = self.session.query(Content).join(
              ContentRevisionRO,
              and_(
                  Content.id == ContentRevisionRO.content_id,
                  ContentRevisionRO.revision_id == join_sub_query
              )
          )

          pattern = 'TEST_CONTENT_DESCRIPTION_%_UPDATED'
          eq_(2, base_query.filter(Content.description.like(pattern)).count())

          eq_(1, base_query.filter(Content.workspace == workspace1).count())
          eq_(1, base_query.filter(Content.workspace == workspace2).count())

          content1_from_query = base_query\
              .filter(Content.workspace == workspace1).one()
          eq_(content1.id, content1_from_query.id)
          eq_(
              'TEST_CONTENT_DESCRIPTION_1_UPDATED',
              content1_from_query.description
          )

          api = ContentApi(
              current_user=None,
              session=self.session,
              config=self.app_config,
          )
          content1_from_api = api.get_one(
              content1.id,
              ContentType.Page,
              workspace1
          )
          # The API lookup must resolve to the same content as the raw query.
          eq_(content1.id, content1_from_api.id)

      def test_update(self):
          """Each ``new_revision`` block adds a revision to the same content."""
          created_content = self.test_create()
          content = self.session.query(Content)\
              .filter(Content.id == created_content.id).one()
          eq_(
              1,
              self.session.query(ContentRevisionRO).filter(
                  ContentRevisionRO.label == 'TEST_CONTENT_1'
              ).count()
          )

          with new_revision(
              session=self.session,
              tm=transaction.manager,
              content=content
          ):
              # Tiny pause, presumably so that the ``updated`` timestamps
              # compared at the end of this test are strictly increasing.
              time.sleep(0.00001)
              content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
          self.session.flush()

          # A second revision exists, still attached to a single content.
          eq_(
              2,
              self.session.query(ContentRevisionRO).filter(
                  ContentRevisionRO.label == 'TEST_CONTENT_1'
              ).count()
          )
          eq_(
              1,
              self.session.query(Content).filter(
                  Content.id == created_content.id
              ).count()
          )

          with new_revision(
              session=self.session,
              tm=transaction.manager,
              content=content
          ):
              time.sleep(0.00001)
              content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED_2'
              content.label = 'TEST_CONTENT_1_UPDATED_2'
          self.session.flush()

          # Only the newest revision carries the updated label.
          eq_(
              1,
              self.session.query(ContentRevisionRO).filter(
                  ContentRevisionRO.label == 'TEST_CONTENT_1_UPDATED_2'
              ).count()
          )
          eq_(
              1,
              self.session.query(Content).filter(
                  Content.id == created_content.id
              ).count()
          )

          revision_1 = self.session.query(ContentRevisionRO).filter(
              ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1'
          ).one()
          revision_2 = self.session.query(ContentRevisionRO).filter(
              ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1_UPDATED'  # nopep8
          ).one()
          revision_3 = self.session.query(ContentRevisionRO).filter(
              ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1_UPDATED_2'  # nopep8
          ).one()

          # Updated dates must be different
          ok_(revision_1.updated < revision_2.updated < revision_3.updated)
          # Created dates must be equal
          ok_(revision_1.created == revision_2.created == revision_3.created)

      def test_creates(self):
          """Two contents created in one workspace each get one revision."""
          eq_(
              0,
              self.session.query(ContentRevisionRO).filter(
                  ContentRevisionRO.label == 'TEST_CONTENT_1'
              ).count()
          )
          eq_(
              0,
              self.session.query(Workspace).filter(
                  Workspace.label == 'TEST_WORKSPACE_1'
              ).count()
          )

          user_admin = self._get_user()
          workspace = Workspace(label="TEST_WORKSPACE_1")
          self.session.add(workspace)
          self.session.flush()
          eq_(
              1,
              self.session.query(Workspace).filter(
                  Workspace.label == 'TEST_WORKSPACE_1'
              ).count()
          )

          first_content = self._create_content(
              owner=user_admin,
              workspace=workspace,
              type=ContentType.Page,
              label='TEST_CONTENT_1',
              description='TEST_CONTENT_DESCRIPTION_1',
              revision_type=ActionDescription.CREATION,
              is_deleted=False,
              is_archived=False,
          )
          eq_(
              1,
              self.session.query(ContentRevisionRO).filter(
                  ContentRevisionRO.label == 'TEST_CONTENT_1'
              ).count()
          )

          content = self.session.query(Content).filter(
              Content.id == first_content.id
          ).one()
          eq_('TEST_CONTENT_1', content.label)
          eq_('TEST_CONTENT_DESCRIPTION_1', content.description)

          # Create a second content
          second_content = self._create_content(
              owner=user_admin,
              workspace=workspace,
              type=ContentType.Page,
              label='TEST_CONTENT_2',
              description='TEST_CONTENT_DESCRIPTION_2',
              revision_type=ActionDescription.CREATION
          )
          eq_(
              1,
              self.session.query(ContentRevisionRO).filter(
                  ContentRevisionRO.label == 'TEST_CONTENT_2'
              ).count()
          )

          content = self.session.query(Content).filter(
              Content.id == second_content.id
          ).one()
          eq_('TEST_CONTENT_2', content.label)
          eq_('TEST_CONTENT_DESCRIPTION_2', content.description)

      def test_create(self, key='1'):
          """Create one workspace and one page content suffixed with ``key``.

          Besides being a test on its own, this method is called by the other
          tests of this class to obtain a freshly created content.

          :param key: suffix used in the workspace label, content label and
              content description
          :return: the created content
          """
          eq_(
              0,
              self.session.query(ContentRevisionRO).filter(
                  ContentRevisionRO.label == 'TEST_CONTENT_%s' % key
              ).count()
          )
          eq_(
              0,
              self.session.query(Workspace).filter(
                  Workspace.label == 'TEST_WORKSPACE_%s' % key
              ).count()
          )

          user_admin = self._get_user()
          workspace = Workspace(label="TEST_WORKSPACE_%s" % key)
          self.session.add(workspace)
          self.session.flush()
          eq_(
              1,
              self.session.query(Workspace).filter(
                  Workspace.label == 'TEST_WORKSPACE_%s' % key
              ).count()
          )

          created_content = self._create_content(
              owner=user_admin,
              workspace=workspace,
              type=ContentType.Page,
              label='TEST_CONTENT_%s' % key,
              description='TEST_CONTENT_DESCRIPTION_%s' % key,
              revision_type=ActionDescription.CREATION
          )
          eq_(
              1,
              self.session.query(ContentRevisionRO).filter(
                  ContentRevisionRO.label == 'TEST_CONTENT_%s' % key
              ).count()
          )

          content = self.session.query(Content).filter(
              Content.id == created_content.id
          ).one()
          eq_('TEST_CONTENT_%s' % key, content.label)
          eq_('TEST_CONTENT_DESCRIPTION_%s' % key, content.description)

          return created_content

      def _get_user(self):
          """Return the user whose email is ``admin@admin.admin``."""
          email = 'admin@admin.admin'
          user_query = self.session.query(User)
          user_filter = user_query.filter(User.email == email)
          user = user_filter.one()
          return user

      def _create_content(self, *args, **kwargs):
          """Build a ``Content`` from the given arguments and flush it.

          :param args: positional arguments forwarded to ``Content``
          :param kwargs: keyword arguments forwarded to ``Content``
          :return: the content, after being added to the session and flushed
          """
          content = Content(*args, **kwargs)
          self.session.add(content)
          self.session.flush()
          return content

      def _create_content_from_nothing(self):
          """Create a file content owned by the admin in a new workspace.

          The workspace is not added to the session explicitly here; it is
          only handed to the content.

          :return: the created content
          """
          user_admin = self._get_user()
          workspace = Workspace(label="TEST_WORKSPACE_1")
          content = self._create_content(
              owner=user_admin,
              workspace=workspace,
              type=ContentType.File,
              label='TEST_CONTENT_1',
              description='TEST_CONTENT_DESCRIPTION_1',
              revision_type=ActionDescription.CREATION,
          )
          return content

      def test_unit__content_depot_file(self):
          """ Depot file access through content property methods. """
          content = self._create_content_from_nothing()
          # tests uninitialized depot file
          eq_(content.depot_file, None)
          # initializes depot file
          # which is able to behave like a python file object
          content.depot_file = b'test'
          # tests initialized depot file
          ok_(content.depot_file)
          # tests type of initialized depot file
          eq_(type(content.depot_file), UploadedFile)
          # tests content of initialized depot file
          # using depot_file.file of type StoredFile to fetch content back
          eq_(content.depot_file.file.read(), b'test')