test_processing.py 4.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
# coding: utf-8
import psutil
import pytest
from synergine2.config import Config
from synergine2.processing import ProcessManager
from synergine2.share import SharedDataManager
from tests import BaseTest

# Number of CPU cores this process is allowed to run on; used below as the
# worker count for the parallel ProcessManager tests.
available_cores = len(psutil.Process().cpu_affinity())
  9. class MyFakeClass(object):
  10. def __init__(self, value):
  11. self.value = value
  12. class TestProcessing(BaseTest):
  13. @pytest.mark.timeout(10)
  14. def make_job_with_scalar(
  15. self,
  16. worker_id: int,
  17. process_count: int,
  18. data: list,
  19. ):
  20. result = sum(data)
  21. return result
  22. @pytest.mark.timeout(10)
  23. def make_job_with_object(
  24. self,
  25. worker_id: int,
  26. process_count: int,
  27. data: list,
  28. ):
  29. data = [o.value for o in data]
  30. result = sum(data)
  31. return MyFakeClass(result)
  32. def test_parallel_jobs_with_scalar(self):
  33. process_manager = ProcessManager(
  34. config=Config({}),
  35. process_count=available_cores,
  36. job=self.make_job_with_scalar,
  37. )
  38. process_manager.start_workers()
  39. data = list(range(100))
  40. results = process_manager.make_them_work(data)
  41. process_manager.terminate()
  42. assert sum(results) == 4950 * available_cores
  43. @pytest.mark.timeout(10)
  44. def test_non_parallel_jobs_with_scalar(self):
  45. # TODO: process manager utilise actuellement un cpu quand même, changer ca
  46. process_manager = ProcessManager(
  47. config=Config({}),
  48. process_count=1,
  49. job=self.make_job_with_scalar,
  50. )
  51. process_manager.start_workers()
  52. data = list(range(100))
  53. results = process_manager.make_them_work(data)
  54. process_manager.terminate()
  55. final_result = results[0]
  56. assert len(results) == 1
  57. assert final_result == 4950
  58. @pytest.mark.timeout(10)
  59. def test_parallel_jobs_with_objects(self):
  60. process_manager = ProcessManager(
  61. config=Config({}),
  62. process_count=available_cores,
  63. job=self.make_job_with_object,
  64. )
  65. process_manager.start_workers()
  66. data = [MyFakeClass(v) for v in range(100)]
  67. final_result = 0
  68. results = process_manager.make_them_work(data)
  69. process_manager.terminate()
  70. for result_object in results:
  71. final_result += result_object.value
  72. assert final_result == 4950 * available_cores
  73. @pytest.mark.timeout(10)
  74. def test_shared_memory_with_shared_manager(self):
  75. shared = SharedDataManager()
  76. shared.set('counter', 42)
  77. shared.commit()
  78. def job(*args, **kwargs):
  79. shared.refresh()
  80. counter = shared.get('counter') or 0
  81. return counter + 1
  82. process_manager = ProcessManager(
  83. config=Config({}),
  84. process_count=available_cores,
  85. job=job,
  86. )
  87. process_manager.start_workers()
  88. results = process_manager.make_them_work(None)
  89. process_manager.terminate()
  90. assert results[0] == 43
  91. @pytest.mark.timeout(10)
  92. def test_share_data_with_function(self):
  93. shared = SharedDataManager()
  94. class Foo(object):
  95. counter = shared.create('counter', 0)
  96. def job(*args, **kwargs):
  97. shared.refresh()
  98. counter = shared.get('counter') or 0
  99. return counter + 1
  100. process_manager = ProcessManager(
  101. config=Config({}),
  102. process_count=available_cores,
  103. job=job,
  104. )
  105. process_manager.start_workers()
  106. foo = Foo()
  107. foo.counter = 42
  108. shared.commit()
  109. results = process_manager.make_them_work(None)
  110. assert results[0] == 43
  111. foo.counter = 45
  112. shared.commit()
  113. results = process_manager.make_them_work(None)
  114. assert results[0] == 46
  115. process_manager.terminate()
  116. @pytest.mark.timeout(10)
  117. def test_after_created_shared_data(self):
  118. shared = SharedDataManager()
  119. shared.set('foo_1', 0)
  120. def job(worker_id, processes_count, key):
  121. shared.refresh()
  122. value = shared.get('foo_{}'.format(key)) or 0
  123. return value + 1
  124. process_manager = ProcessManager(
  125. config=Config({}),
  126. process_count=available_cores,
  127. job=job,
  128. )
  129. process_manager.start_workers()
  130. shared.set('foo_1', 42)
  131. shared.commit()
  132. results = process_manager.make_them_work('1')
  133. assert results[0] == 43
  134. shared.set('foo_2', 52)
  135. shared.commit()
  136. results = process_manager.make_them_work('2')
  137. assert results[0] == 53
  138. process_manager.terminate()