test_processing.py 4.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. # coding: utf-8
  2. import psutil
  3. import pytest
  4. from synergine2.config import Config
  5. from synergine2.processing import ProcessManager
  6. from synergine2.share import SharedDataManager
  7. from tests import BaseTest
  8. available_cores = len(psutil.Process().cpu_affinity())
  9. class MyFakeClass(object):
  10. def __init__(self, value):
  11. self.value = value
  12. class TestProcessing(BaseTest):
  13. @pytest.mark.timeout(10)
  14. def make_job_with_scalar(
  15. self,
  16. worker_id: int,
  17. process_count: int,
  18. data: list,
  19. ):
  20. result = sum(data)
  21. return result
  22. @pytest.mark.timeout(10)
  23. def make_job_with_object(
  24. self,
  25. worker_id: int,
  26. process_count: int,
  27. data: list,
  28. ):
  29. data = [o.value for o in data]
  30. result = sum(data)
  31. return MyFakeClass(result)
  32. def test_parallel_jobs_with_scalar(self):
  33. process_manager = ProcessManager(
  34. config=Config({}),
  35. process_count=available_cores,
  36. job=self.make_job_with_scalar,
  37. )
  38. data = list(range(100))
  39. results = process_manager.make_them_work(data)
  40. process_manager.terminate()
  41. assert sum(results) == 4950 * available_cores
  42. @pytest.mark.timeout(10)
  43. def test_non_parallel_jobs_with_scalar(self):
  44. # TODO: process manager utilise actuellement un cpu quand même, changer ca
  45. process_manager = ProcessManager(
  46. config=Config({}),
  47. process_count=1,
  48. job=self.make_job_with_scalar,
  49. )
  50. data = list(range(100))
  51. results = process_manager.make_them_work(data)
  52. process_manager.terminate()
  53. final_result = results[0]
  54. assert len(results) == 1
  55. assert final_result == 4950
  56. @pytest.mark.timeout(10)
  57. def test_parallel_jobs_with_objects(self):
  58. process_manager = ProcessManager(
  59. config=Config({}),
  60. process_count=available_cores,
  61. job=self.make_job_with_object,
  62. )
  63. data = [MyFakeClass(v) for v in range(100)]
  64. final_result = 0
  65. results = process_manager.make_them_work(data)
  66. process_manager.terminate()
  67. for result_object in results:
  68. final_result += result_object.value
  69. assert final_result == 4950 * available_cores
  70. @pytest.mark.timeout(10)
  71. def test_shared_memory_with_shared_manager(self):
  72. shared = SharedDataManager()
  73. shared.set('counter', 42)
  74. shared.commit()
  75. def job(*args, **kwargs):
  76. shared.refresh()
  77. counter = shared.get('counter') or 0
  78. return counter + 1
  79. process_manager = ProcessManager(
  80. config=Config({}),
  81. process_count=available_cores,
  82. job=job,
  83. )
  84. results = process_manager.make_them_work(None)
  85. process_manager.terminate()
  86. assert results[0] == 43
  87. @pytest.mark.timeout(10)
  88. def test_share_data_with_function(self):
  89. shared = SharedDataManager()
  90. class Foo(object):
  91. counter = shared.create('counter', 0)
  92. def job(*args, **kwargs):
  93. shared.refresh()
  94. counter = shared.get('counter') or 0
  95. return counter + 1
  96. process_manager = ProcessManager(
  97. config=Config({}),
  98. process_count=available_cores,
  99. job=job,
  100. )
  101. foo = Foo()
  102. foo.counter = 42
  103. shared.commit()
  104. results = process_manager.make_them_work(None)
  105. assert results[0] == 43
  106. foo.counter = 45
  107. shared.commit()
  108. results = process_manager.make_them_work(None)
  109. assert results[0] == 46
  110. process_manager.terminate()
  111. @pytest.mark.timeout(10)
  112. def test_after_created_shared_data(self):
  113. shared = SharedDataManager()
  114. shared.set('foo_1', 0)
  115. def job(worker_id, processes_count, key):
  116. shared.refresh()
  117. value = shared.get('foo_{}'.format(key)) or 0
  118. return value + 1
  119. process_manager = ProcessManager(
  120. config=Config({}),
  121. process_count=available_cores,
  122. job=job,
  123. )
  124. shared.set('foo_1', 42)
  125. shared.commit()
  126. results = process_manager.make_them_work('1')
  127. assert results[0] == 43
  128. shared.set('foo_2', 52)
  129. shared.commit()
  130. results = process_manager.make_them_work('2')
  131. assert results[0] == 53
  132. process_manager.terminate()