test_content.py 9.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. # -*- coding: utf-8 -*-
  2. import time
  3. from depot.fields.upload import UploadedFile
  4. from nose.tools import ok_
  5. from nose.tools import raises
  6. from sqlalchemy.sql.elements import and_
  7. from sqlalchemy.testing import eq_
  8. import transaction
  9. # from tracim.lib.content import ContentApi
  10. from tracim.exceptions import ContentRevisionUpdateError
  11. from tracim.models import Content
  12. from tracim.models.revision_protection import new_revision
  13. from tracim.models import User
  14. from tracim.models.data import ActionDescription
  15. from tracim.models.data import ContentRevisionRO
  16. from tracim.models.data import ContentType
  17. from tracim.models.data import Workspace
  18. from tracim.tests import StandardTest
  19. class TestContent(StandardTest):
  20. @raises(ContentRevisionUpdateError)
  21. def test_update_without_prepare(self):
  22. content1 = self.test_create()
  23. content1.description = 'FOO' # Raise ContentRevisionUpdateError because revision can't be updated
  24. # TODO - G.M - 28-03-2018 - Reenable this test when libContent is available
  25. # def test_query(self):
  26. # content1 = self.test_create()
  27. # with new_revision(content1):
  28. # content1.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
  29. # DBSession.flush()
  30. #
  31. # content2 = self.test_create(key='2')
  32. # with new_revision(content2):
  33. # content2.description = 'TEST_CONTENT_DESCRIPTION_2_UPDATED'
  34. # DBSession.flush()
  35. #
  36. # workspace1 = DBSession.query(Workspace).filter(Workspace.label == 'TEST_WORKSPACE_1').one()
  37. # workspace2 = DBSession.query(Workspace).filter(Workspace.label == 'TEST_WORKSPACE_2').one()
  38. #
  39. # # To get Content in database we have to join Content and ContentRevisionRO with particular condition:
  40. # # Join have to be on most recent revision
  41. # join_sub_query = DBSession.query(ContentRevisionRO.revision_id)\
  42. # .filter(ContentRevisionRO.content_id == Content.id)\
  43. # .order_by(ContentRevisionRO.revision_id.desc())\
  44. # .limit(1)\
  45. # .correlate(Content)
  46. #
  47. # base_query = DBSession.query(Content)\
  48. # .join(ContentRevisionRO, and_(Content.id == ContentRevisionRO.content_id,
  49. # ContentRevisionRO.revision_id == join_sub_query))
  50. #
  51. # pattern = 'TEST_CONTENT_DESCRIPTION_%_UPDATED'
  52. # eq_(2, base_query.filter(Content.description.like(pattern)).count())
  53. #
  54. # eq_(1, base_query.filter(Content.workspace == workspace1).count())
  55. # eq_(1, base_query.filter(Content.workspace == workspace2).count())
  56. #
  57. # content1_from_query = base_query.filter(Content.workspace == workspace1).one()
  58. # eq_(content1.id, content1_from_query.id)
  59. # eq_('TEST_CONTENT_DESCRIPTION_1_UPDATED', content1_from_query.description)
  60. #
  61. # user_admin = DBSession.query(User).filter(User.email == 'admin@admin.admin').one()
  62. #
  63. # api = ContentApi(None)
  64. #
  65. # content1_from_api = api.get_one(content1.id, ContentType.Page, workspace1)
  66. def test_update(self):
  67. created_content = self.test_create()
  68. content = self.session.query(Content).filter(Content.id == created_content.id).one()
  69. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_1').count())
  70. with new_revision(
  71. dbsession=self.session,
  72. tm=transaction.manager,
  73. content=content
  74. ):
  75. time.sleep(0.00001)
  76. content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
  77. self.session.flush()
  78. eq_(2, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_1').count())
  79. eq_(1, self.session.query(Content).filter(Content.id == created_content.id).count())
  80. with new_revision(
  81. dbsession=self.session,
  82. tm=transaction.manager,
  83. content=content
  84. ):
  85. time.sleep(0.00001)
  86. content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED_2'
  87. content.label = 'TEST_CONTENT_1_UPDATED_2'
  88. self.session.flush()
  89. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_1_UPDATED_2').count())
  90. eq_(1, self.session.query(Content).filter(Content.id == created_content.id).count())
  91. revision_1 = self.session.query(ContentRevisionRO)\
  92. .filter(ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1').one()
  93. revision_2 = self.session.query(ContentRevisionRO)\
  94. .filter(ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1_UPDATED').one()
  95. revision_3 = self.session.query(ContentRevisionRO)\
  96. .filter(ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1_UPDATED_2').one()
  97. # Updated dates must be different
  98. ok_(revision_1.updated < revision_2.updated < revision_3.updated)
  99. # Created dates must be equal
  100. ok_(revision_1.created == revision_2.created == revision_3.created)
  101. def test_creates(self):
  102. eq_(0, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_1').count())
  103. eq_(0, self.session.query(Workspace).filter(Workspace.label == 'TEST_WORKSPACE_1').count())
  104. user_admin = self.session.query(User).filter(User.email == 'admin@admin.admin').one()
  105. workspace = Workspace(label="TEST_WORKSPACE_1")
  106. self.session.add(workspace)
  107. self.session.flush()
  108. eq_(1, self.session.query(Workspace).filter(Workspace.label == 'TEST_WORKSPACE_1').count())
  109. first_content = self._create_content(
  110. owner=user_admin,
  111. workspace=workspace,
  112. type=ContentType.Page,
  113. label='TEST_CONTENT_1',
  114. description='TEST_CONTENT_DESCRIPTION_1',
  115. revision_type=ActionDescription.CREATION,
  116. is_deleted=False,
  117. is_archived=False,
  118. )
  119. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_1').count())
  120. content = self.session.query(Content).filter(Content.id == first_content.id).one()
  121. eq_('TEST_CONTENT_1', content.label)
  122. eq_('TEST_CONTENT_DESCRIPTION_1', content.description)
  123. # Create a second content
  124. second_content = self._create_content(
  125. owner=user_admin,
  126. workspace=workspace,
  127. type=ContentType.Page,
  128. label='TEST_CONTENT_2',
  129. description='TEST_CONTENT_DESCRIPTION_2',
  130. revision_type=ActionDescription.CREATION
  131. )
  132. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_2').count())
  133. content = self.session.query(Content).filter(Content.id == second_content.id).one()
  134. eq_('TEST_CONTENT_2', content.label)
  135. eq_('TEST_CONTENT_DESCRIPTION_2', content.description)
  136. def test_create(self, key='1'):
  137. eq_(0, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_%s' % key).count())
  138. eq_(0, self.session.query(Workspace).filter(Workspace.label == 'TEST_WORKSPACE_%s' % key).count())
  139. user_admin = self.session.query(User).filter(User.email == 'admin@admin.admin').one()
  140. workspace = Workspace(label="TEST_WORKSPACE_%s" % key)
  141. self.session.add(workspace)
  142. self.session.flush()
  143. eq_(1, self.session.query(Workspace).filter(Workspace.label == 'TEST_WORKSPACE_%s' % key).count())
  144. created_content = self._create_content(
  145. owner=user_admin,
  146. workspace=workspace,
  147. type=ContentType.Page,
  148. label='TEST_CONTENT_%s' % key,
  149. description='TEST_CONTENT_DESCRIPTION_%s' % key,
  150. revision_type=ActionDescription.CREATION
  151. )
  152. eq_(1, self.session.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_%s' % key).count())
  153. content = self.session.query(Content).filter(Content.id == created_content.id).one()
  154. eq_('TEST_CONTENT_%s' % key, content.label)
  155. eq_('TEST_CONTENT_DESCRIPTION_%s' % key, content.description)
  156. return created_content
  157. def _get_user(self):
  158. email = 'admin@admin.admin'
  159. user_query = self.session.query(User)
  160. user_filter = user_query.filter(User.email == email)
  161. user = user_filter.one()
  162. return user
  163. def _create_content(self, *args, **kwargs):
  164. content = Content(*args, **kwargs)
  165. self.session.add(content)
  166. self.session.flush()
  167. return content
  168. def _create_content_from_nothing(self):
  169. user_admin = self._get_user()
  170. workspace = Workspace(label="TEST_WORKSPACE_1")
  171. content = self._create_content(
  172. owner=user_admin,
  173. workspace=workspace,
  174. type=ContentType.File,
  175. label='TEST_CONTENT_1',
  176. description='TEST_CONTENT_DESCRIPTION_1',
  177. revision_type=ActionDescription.CREATION,
  178. )
  179. return content
  180. def test_unit__content_depot_file(self):
  181. """ Depot file access thought content property methods. """
  182. content = self._create_content_from_nothing()
  183. # tests uninitialized depot file
  184. eq_(content.depot_file, None)
  185. # initializes depot file
  186. # which is able to behave like a python file object
  187. content.depot_file = b'test'
  188. # tests initialized depot file
  189. ok_(content.depot_file)
  190. # tests type of initialized depot file
  191. eq_(type(content.depot_file), UploadedFile)
  192. # tests content of initialized depot file
  193. # using depot_file.file of type StoredFile to fetch content back
  194. eq_(content.depot_file.file.read(), b'test')