123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- # -*- coding: utf-8 -*-
- import time
-
- from depot.fields.upload import UploadedFile
- from sqlalchemy.sql.elements import and_
- from sqlalchemy.testing import eq_
- import transaction
- import pytest
-
- # from tracim.lib.content import ContentApi
- from tracim_backend.exceptions import ContentRevisionUpdateError
- from tracim_backend.lib.core.content import ContentApi
- from tracim_backend.models import Content
- from tracim_backend.models.revision_protection import new_revision
- from tracim_backend.models import User
- from tracim_backend.models.data import ActionDescription
- from tracim_backend.models.data import ContentRevisionRO
- from tracim_backend.models.data import ContentType
- from tracim_backend.models.data import Workspace
- from tracim_backend.tests import StandardTest
-
-
class TestContent(StandardTest):
    """Tests for ``Content`` objects and their ``ContentRevisionRO`` rows.

    The tests create workspaces and contents through the test session,
    update contents inside ``new_revision`` blocks, and check the resulting
    revision rows and query results.
    """

    def test_update_without_prepare(self):
        """Modifying a content outside ``new_revision`` must be refused."""
        content1 = self.test_create()
        with pytest.raises(ContentRevisionUpdateError):
            # Raise ContentRevisionUpdateError because revision can't be
            # updated
            content1.description = 'FOO'

    def test_query(self):
        """Updated contents are reachable by raw query and through the API.

        Two contents are created in two distinct workspaces and updated
        once each; the test then checks description and workspace filters
        on a query joined with the most recent revision, and finally
        fetches the first content through ``ContentApi.get_one``.
        """
        content1 = self.test_create()
        with new_revision(
            session=self.session,
            tm=transaction.manager,
            content=content1,
        ):
            content1.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
        self.session.flush()

        content2 = self.test_create(key='2')
        with new_revision(
            session=self.session,
            tm=transaction.manager,
            content=content2,
        ):
            content2.description = 'TEST_CONTENT_DESCRIPTION_2_UPDATED'
        self.session.flush()

        workspace1 = self.session.query(Workspace)\
            .filter(Workspace.label == 'TEST_WORKSPACE_1').one()
        workspace2 = self.session.query(Workspace)\
            .filter(Workspace.label == 'TEST_WORKSPACE_2').one()

        # To get Content in database
        # we have to join Content and ContentRevisionRO
        # with particular condition:
        # Join have to be on most recent revision
        join_sub_query = self.session.query(ContentRevisionRO.revision_id)\
            .filter(ContentRevisionRO.content_id == Content.id)\
            .order_by(ContentRevisionRO.revision_id.desc())\
            .limit(1)\
            .correlate(Content)

        base_query = self.session.query(Content).join(
            ContentRevisionRO,
            and_(
                Content.id == ContentRevisionRO.content_id,
                ContentRevisionRO.revision_id == join_sub_query
            )
        )

        # '%' is the SQL LIKE wildcard: matches both updated descriptions.
        pattern = 'TEST_CONTENT_DESCRIPTION_%_UPDATED'
        eq_(2, base_query.filter(Content.description.like(pattern)).count())

        eq_(1, base_query.filter(Content.workspace == workspace1).count())
        eq_(1, base_query.filter(Content.workspace == workspace2).count())

        content1_from_query = base_query\
            .filter(Content.workspace == workspace1).one()
        eq_(content1.id, content1_from_query.id)
        eq_(
            'TEST_CONTENT_DESCRIPTION_1_UPDATED',
            content1_from_query.description
        )

        api = ContentApi(
            current_user=None,
            session=self.session,
            config=self.app_config,
        )

        content1_from_api = api.get_one(
            content1.id,
            ContentType.Page,
            workspace1
        )
        # The API lookup must resolve to the very same content.
        eq_(content1.id, content1_from_api.id)

    def test_update(self):
        """Each ``new_revision`` block adds one revision to the same content.

        Checks the revision count after every update, that the content row
        itself stays unique, and that revisions share their ``created``
        date while ``updated`` strictly increases.
        """
        created_content = self.test_create()
        content = self.session.query(Content)\
            .filter(Content.id == created_content.id).one()
        eq_(1, self.session.query(ContentRevisionRO)
            .filter(ContentRevisionRO.label == 'TEST_CONTENT_1').count())

        with new_revision(
            session=self.session,
            tm=transaction.manager,
            content=content
        ):
            # Presumably here so that consecutive revisions get distinct
            # ``updated`` timestamps (asserted at the end of this test).
            time.sleep(0.00001)
            content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
        self.session.flush()

        eq_(
            2,
            self.session.query(ContentRevisionRO).filter(
                ContentRevisionRO.label == 'TEST_CONTENT_1'
            ).count()
        )
        eq_(
            1,
            self.session.query(Content).filter(
                Content.id == created_content.id
            ).count()
        )

        with new_revision(
            session=self.session,
            tm=transaction.manager,
            content=content
        ):
            time.sleep(0.00001)
            content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED_2'
            content.label = 'TEST_CONTENT_1_UPDATED_2'
        self.session.flush()

        eq_(
            1,
            self.session.query(ContentRevisionRO).filter(
                ContentRevisionRO.label == 'TEST_CONTENT_1_UPDATED_2'
            ).count()
        )
        eq_(
            1,
            self.session.query(Content).filter(
                Content.id == created_content.id
            ).count()
        )

        # Fetch the three revisions through their (distinct) descriptions.
        revision_query = self.session.query(ContentRevisionRO)
        revision_1 = revision_query.filter(
            ContentRevisionRO.description == 'TEST_CONTENT_DESCRIPTION_1'
        ).one()
        revision_2 = revision_query.filter(
            ContentRevisionRO.description
            == 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
        ).one()
        revision_3 = revision_query.filter(
            ContentRevisionRO.description
            == 'TEST_CONTENT_DESCRIPTION_1_UPDATED_2'
        ).one()

        # Updated dates must be different
        assert revision_1.updated < revision_2.updated < revision_3.updated
        # Created dates must be equal
        assert revision_1.created == revision_2.created == revision_3.created

    def test_creates(self):
        """Two contents created in one workspace get one revision each."""
        eq_(
            0,
            self.session.query(ContentRevisionRO).filter(
                ContentRevisionRO.label == 'TEST_CONTENT_1'
            ).count()
        )
        eq_(
            0,
            self.session.query(Workspace).filter(
                Workspace.label == 'TEST_WORKSPACE_1'
            ).count()
        )

        user_admin = self._get_user()
        workspace = Workspace(label="TEST_WORKSPACE_1")
        self.session.add(workspace)
        self.session.flush()
        eq_(
            1,
            self.session.query(Workspace).filter(
                Workspace.label == 'TEST_WORKSPACE_1'
            ).count()
        )

        first_content = self._create_content(
            owner=user_admin,
            workspace=workspace,
            type=ContentType.Page,
            label='TEST_CONTENT_1',
            description='TEST_CONTENT_DESCRIPTION_1',
            revision_type=ActionDescription.CREATION,
            is_deleted=False,
            is_archived=False,
        )

        eq_(
            1,
            self.session.query(ContentRevisionRO).filter(
                ContentRevisionRO.label == 'TEST_CONTENT_1'
            ).count()
        )

        content = self.session.query(Content).filter(
            Content.id == first_content.id
        ).one()
        eq_('TEST_CONTENT_1', content.label)
        eq_('TEST_CONTENT_DESCRIPTION_1', content.description)

        # Create a second content
        second_content = self._create_content(
            owner=user_admin,
            workspace=workspace,
            type=ContentType.Page,
            label='TEST_CONTENT_2',
            description='TEST_CONTENT_DESCRIPTION_2',
            revision_type=ActionDescription.CREATION
        )

        eq_(
            1,
            self.session.query(ContentRevisionRO).filter(
                ContentRevisionRO.label == 'TEST_CONTENT_2'
            ).count()
        )

        content = self.session.query(Content).filter(
            Content.id == second_content.id
        ).one()
        eq_('TEST_CONTENT_2', content.label)
        eq_('TEST_CONTENT_DESCRIPTION_2', content.description)

    def test_create(self, key='1'):
        """Create workspace ``TEST_WORKSPACE_<key>`` holding one page.

        NOTE(review): this method is both a test and a fixture; the other
        tests of this class call it directly and use its return value, so
        its signature and return value are kept as they are.

        :param key: suffix appended to the workspace label, the content
            label and the content description.
        :return: the content created in the new workspace.
        """
        content_label = 'TEST_CONTENT_%s' % key
        content_description = 'TEST_CONTENT_DESCRIPTION_%s' % key

        # Nothing carrying these labels may exist beforehand.
        eq_(
            0,
            self.session.query(ContentRevisionRO).filter(
                ContentRevisionRO.label == content_label).count()
        )
        eq_(
            0,
            self.session.query(Workspace).filter(
                Workspace.label == 'TEST_WORKSPACE_%s' % key).count()
        )

        user_admin = self._get_user()
        workspace = Workspace(label="TEST_WORKSPACE_%s" % key)
        self.session.add(workspace)
        self.session.flush()
        eq_(
            1,
            self.session.query(Workspace).filter(
                Workspace.label == 'TEST_WORKSPACE_%s' % key
            ).count()
        )

        created_content = self._create_content(
            owner=user_admin,
            workspace=workspace,
            type=ContentType.Page,
            label=content_label,
            description=content_description,
            revision_type=ActionDescription.CREATION
        )

        eq_(
            1,
            self.session.query(ContentRevisionRO).filter(
                ContentRevisionRO.label == content_label
            ).count()
        )

        content = self.session.query(Content).filter(
            Content.id == created_content.id
        ).one()
        eq_(content_label, content.label)
        eq_(content_description, content.description)

        return created_content

    def _get_user(self):
        """Return the user whose email is ``admin@admin.admin``."""
        email = 'admin@admin.admin'
        user_query = self.session.query(User)
        user_filter = user_query.filter(User.email == email)
        user = user_filter.one()
        return user

    def _create_content(self, *args, **kwargs):
        """Build a ``Content`` from the given arguments and flush it.

        All arguments are forwarded unchanged to the ``Content``
        constructor.

        :return: the content, added to the session and flushed.
        """
        content = Content(*args, **kwargs)
        self.session.add(content)
        self.session.flush()
        return content

    def _create_content_from_nothing(self):
        """Return a new ``File`` content placed in a brand new workspace."""
        user_admin = self._get_user()
        workspace = Workspace(label="TEST_WORKSPACE_1")
        content = self._create_content(
            owner=user_admin,
            workspace=workspace,
            type=ContentType.File,
            label='TEST_CONTENT_1',
            description='TEST_CONTENT_DESCRIPTION_1',
            revision_type=ActionDescription.CREATION,
        )
        return content

    def test_unit__content_depot_file(self):
        """ Depot file access through content property methods. """
        content = self._create_content_from_nothing()
        # tests uninitialized depot file
        eq_(content.depot_file, None)
        # initializes depot file
        # which is able to behave like a python file object
        content.depot_file = b'test'
        # tests initialized depot file
        assert content.depot_file
        # tests type of initialized depot file
        eq_(type(content.depot_file), UploadedFile)
        # tests content of initialized depot file
        # using depot_file.file of type StoredFile to fetch content back
        eq_(content.depot_file.file.read(), b'test')
|