test_content.py 9.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. # -*- coding: utf-8 -*-
  2. import time
  3. from nose.tools import ok_
  4. from nose.tools import raises
  5. from sqlalchemy.sql.elements import and_
  6. from sqlalchemy.testing import eq_
  7. from depot.fields.upload import UploadedFile
  8. from tracim.lib.content import ContentApi
  9. from tracim.lib.exception import ContentRevisionUpdateError
  10. from tracim.model import Content
  11. from tracim.model import DBSession
  12. from tracim.model import new_revision
  13. from tracim.model import User
  14. from tracim.model.data import ActionDescription
  15. from tracim.model.data import ContentRevisionRO
  16. from tracim.model.data import ContentType
  17. from tracim.model.data import Workspace
  18. from tracim.tests import TestStandard
  19. class TestContent(TestStandard):
  20. @raises(ContentRevisionUpdateError)
  21. def test_update_without_prepare(self):
  22. content1 = self.test_create()
  23. content1.description = 'FOO' # Raise ContentRevisionUpdateError because revision can't be updated
  24. def test_query(self):
  25. content1 = self.test_create()
  26. with new_revision(content1):
  27. content1.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
  28. DBSession.flush()
  29. content2 = self.test_create(key='2')
  30. with new_revision(content2):
  31. content2.description = 'TEST_CONTENT_DESCRIPTION_2_UPDATED'
  32. DBSession.flush()
  33. workspace1 = DBSession.query(Workspace).filter(Workspace.label == 'TEST_WORKSPACE_1').one()
  34. workspace2 = DBSession.query(Workspace).filter(Workspace.label == 'TEST_WORKSPACE_2').one()
  35. # To get Content in database we have to join Content and ContentRevisionRO with particular condition:
  36. # Join have to be on most recent revision
  37. join_sub_query = DBSession.query(ContentRevisionRO.revision_id)\
  38. .filter(ContentRevisionRO.content_id == Content.id)\
  39. .order_by(ContentRevisionRO.revision_id.desc())\
  40. .limit(1)\
  41. .correlate(Content)
  42. base_query = DBSession.query(Content)\
  43. .join(ContentRevisionRO, and_(Content.id == ContentRevisionRO.content_id,
  44. ContentRevisionRO.revision_id == join_sub_query))
  45. eq_(2, base_query.count())
  46. eq_(1, base_query.filter(Content.workspace == workspace1).count())
  47. eq_(1, base_query.filter(Content.workspace == workspace2).count())
  48. content1_from_query = base_query.filter(Content.workspace == workspace1).one()
  49. eq_(content1.id, content1_from_query.id)
  50. eq_('TEST_CONTENT_DESCRIPTION_1_UPDATED', content1_from_query.description)
  51. user_admin = DBSession.query(User).filter(User.email == 'admin@admin.admin').one()
  52. api = ContentApi(None)
  53. content1_from_api = api.get_one(content1.id, ContentType.Page, workspace1)
  54. def test_update(self):
  55. created_content = self.test_create()
  56. content = DBSession.query(Content).filter(Content.id == created_content.id).one()
  57. eq_(1, DBSession.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_1').count())
  58. with new_revision(content):
  59. time.sleep(0.00001)
  60. content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
  61. DBSession.flush()
  62. eq_(2, DBSession.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_1').count())
  63. eq_(1, DBSession.query(Content).filter(Content.id == created_content.id).count())
  64. with new_revision(content):
  65. time.sleep(0.00001)
  66. content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED_2'
  67. content.label = 'TEST_CONTENT_1_UPDATED_2'
  68. DBSession.flush()
  69. eq_(1, DBSession.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_1_UPDATED_2').count())
  70. eq_(1, DBSession.query(Content).filter(Content.id == created_content.id).count())
  71. revision_1 = DBSession.query(ContentRevisionRO)\
  72. .filter(ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1').one()
  73. revision_2 = DBSession.query(ContentRevisionRO)\
  74. .filter(ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1_UPDATED').one()
  75. revision_3 = DBSession.query(ContentRevisionRO)\
  76. .filter(ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1_UPDATED_2').one()
  77. # Updated dates must be different
  78. ok_(revision_1.updated < revision_2.updated < revision_3.updated)
  79. # Created dates must be equal
  80. ok_(revision_1.created == revision_2.created == revision_3.created)
  81. def test_creates(self):
  82. eq_(0, DBSession.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_1').count())
  83. eq_(0, DBSession.query(Workspace).filter(Workspace.label == 'TEST_WORKSPACE_1').count())
  84. user_admin = DBSession.query(User).filter(User.email == 'admin@admin.admin').one()
  85. workspace = Workspace(label="TEST_WORKSPACE_1")
  86. DBSession.add(workspace)
  87. DBSession.flush()
  88. eq_(1, DBSession.query(Workspace).filter(Workspace.label == 'TEST_WORKSPACE_1').count())
  89. first_content = self._create_content(
  90. owner=user_admin,
  91. workspace=workspace,
  92. type=ContentType.Page,
  93. label='TEST_CONTENT_1',
  94. description='TEST_CONTENT_DESCRIPTION_1',
  95. revision_type=ActionDescription.CREATION,
  96. is_deleted=False, # TODO: pk ?
  97. is_archived=False, # TODO: pk ?
  98. # file_content=None, # TODO: pk ? (J'ai du mettre nullable=True)
  99. )
  100. eq_(1, DBSession.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_1').count())
  101. content = DBSession.query(Content).filter(Content.id == first_content.id).one()
  102. eq_('TEST_CONTENT_1', content.label)
  103. eq_('TEST_CONTENT_DESCRIPTION_1', content.description)
  104. # Create a second content
  105. second_content = self._create_content(
  106. owner=user_admin,
  107. workspace=workspace,
  108. type=ContentType.Page,
  109. label='TEST_CONTENT_2',
  110. description='TEST_CONTENT_DESCRIPTION_2',
  111. revision_type=ActionDescription.CREATION
  112. )
  113. eq_(1, DBSession.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_2').count())
  114. content = DBSession.query(Content).filter(Content.id == second_content.id).one()
  115. eq_('TEST_CONTENT_2', content.label)
  116. eq_('TEST_CONTENT_DESCRIPTION_2', content.description)
  117. def test_create(self, key='1'):
  118. eq_(0, DBSession.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_%s' % key).count())
  119. eq_(0, DBSession.query(Workspace).filter(Workspace.label == 'TEST_WORKSPACE_%s' % key).count())
  120. user_admin = DBSession.query(User).filter(User.email == 'admin@admin.admin').one()
  121. workspace = Workspace(label="TEST_WORKSPACE_%s" % key)
  122. DBSession.add(workspace)
  123. DBSession.flush()
  124. eq_(1, DBSession.query(Workspace).filter(Workspace.label == 'TEST_WORKSPACE_%s' % key).count())
  125. created_content = self._create_content(
  126. owner=user_admin,
  127. workspace=workspace,
  128. type=ContentType.Page,
  129. label='TEST_CONTENT_%s' % key,
  130. description='TEST_CONTENT_DESCRIPTION_%s' % key,
  131. revision_type=ActionDescription.CREATION
  132. )
  133. eq_(1, DBSession.query(ContentRevisionRO).filter(ContentRevisionRO.label == 'TEST_CONTENT_%s' % key).count())
  134. content = DBSession.query(Content).filter(Content.id == created_content.id).one()
  135. eq_('TEST_CONTENT_%s' % key, content.label)
  136. eq_('TEST_CONTENT_DESCRIPTION_%s' % key, content.description)
  137. return created_content
  138. def _get_user(self):
  139. email = 'admin@admin.admin'
  140. user_query = DBSession.query(User)
  141. user_filter = user_query.filter(User.email == email)
  142. user = user_filter.one()
  143. return user
  144. def _create_content(self, *args, **kwargs):
  145. content = Content(*args, **kwargs)
  146. DBSession.add(content)
  147. DBSession.flush()
  148. return content
  149. def _create_content_from_nothing(self):
  150. user_admin = self._get_user()
  151. workspace = Workspace(label="TEST_WORKSPACE_1")
  152. content = self._create_content(
  153. owner=user_admin,
  154. workspace=workspace,
  155. type=ContentType.File,
  156. label='TEST_CONTENT_1',
  157. description='TEST_CONTENT_DESCRIPTION_1',
  158. revision_type=ActionDescription.CREATION,
  159. )
  160. return content
  161. def test_unit__content_depot_file_uid(self):
  162. """ Depot file access thought content property methods. """
  163. content = self._create_content_from_nothing()
  164. # tests uninitialized depot file
  165. eq_(content.depot_file_uid, None)
  166. # initializes depot file
  167. content.depot_file_uid = b'test'
  168. # tests initialized depot file
  169. ok_(content.depot_file_uid)
  170. # tests type of initialized depot file
  171. eq_(type(content.depot_file_uid), UploadedFile)
  172. # tests content of initialized depot file
  173. eq_(content.depot_file_uid.file.read(), b'test')