  1. # coding: utf-8
  2. import ctypes
  3. import multiprocessing
  4. import psutil
  5. from multiprocessing import Manager
  6. from multiprocessing import Array
  7. import pytest
  8. from synergine2.config import Config
  9. from synergine2.processing import ProcessManager
  10. from synergine2.share import SharedDataManager
  11. from tests import BaseTest
  12. available_cores = len(psutil.Process().cpu_affinity())
  13. class MyFakeClass(object):
  14. def __init__(self, value):
  15. self.value = value
  16. class TestProcessing(BaseTest):
  17. @pytest.mark.timeout(10)
  18. def make_job_with_scalar(
  19. self,
  20. worker_id: int,
  21. process_count: int,
  22. data: list,
  23. ):
  24. result = sum(data)
  25. return result
  26. @pytest.mark.timeout(10)
  27. def make_job_with_object(
  28. self,
  29. worker_id: int,
  30. process_count: int,
  31. data: list,
  32. ):
  33. data = [o.value for o in data]
  34. result = sum(data)
  35. return MyFakeClass(result)
  36. def test_parallel_jobs_with_scalar(self):
  37. process_manager = ProcessManager(
  38. config=Config({}),
  39. process_count=available_cores,
  40. job=self.make_job_with_scalar,
  41. )
  42. data = list(range(100))
  43. results = process_manager.make_them_work(data)
  44. process_manager.terminate()
  45. assert sum(results) == 4950 * available_cores
  46. @pytest.mark.timeout(10)
  47. def test_non_parallel_jobs_with_scalar(self):
  48. # TODO: process manager utilise actuellement un cpu quand même, changer ca
  49. process_manager = ProcessManager(
  50. config=Config({}),
  51. process_count=1,
  52. job=self.make_job_with_scalar,
  53. )
  54. data = list(range(100))
  55. results = process_manager.make_them_work(data)
  56. process_manager.terminate()
  57. final_result = results[0]
  58. assert len(results) == 1
  59. assert final_result == 4950
  60. @pytest.mark.timeout(10)
  61. def test_parallel_jobs_with_objects(self):
  62. process_manager = ProcessManager(
  63. config=Config({}),
  64. process_count=available_cores,
  65. job=self.make_job_with_object,
  66. )
  67. data = [MyFakeClass(v) for v in range(100)]
  68. final_result = 0
  69. results = process_manager.make_them_work(data)
  70. process_manager.terminate()
  71. for result_object in results:
  72. final_result += result_object.value
  73. assert final_result == 4950 * available_cores
  74. @pytest.mark.timeout(10)
  75. def test_shared_memory_with_shared_manager(self):
  76. shared = SharedDataManager()
  77. shared.set('counter', 42)
  78. shared.commit()
  79. def job(*args, **kwargs):
  80. shared.refresh()
  81. counter = shared.get('counter') or 0
  82. return counter + 1
  83. process_manager = ProcessManager(
  84. config=Config({}),
  85. process_count=available_cores,
  86. job=job,
  87. )
  88. results = process_manager.make_them_work(None)
  89. process_manager.terminate()
  90. assert results[0] == 43
  91. @pytest.mark.timeout(10)
  92. def test_share_data_with_function(self):
  93. shared = SharedDataManager()
  94. class Foo(object):
  95. counter = shared.create('counter', 0)
  96. def job(*args, **kwargs):
  97. shared.refresh()
  98. counter = shared.get('counter') or 0
  99. return counter + 1
  100. process_manager = ProcessManager(
  101. config=Config({}),
  102. process_count=available_cores,
  103. job=job,
  104. )
  105. foo = Foo()
  106. foo.counter = 42
  107. shared.commit()
  108. results = process_manager.make_them_work(None)
  109. assert results[0] == 43
  110. foo.counter = 45
  111. shared.commit()
  112. results = process_manager.make_them_work(None)
  113. assert results[0] == 46
  114. process_manager.terminate()
  115. @pytest.mark.timeout(10)
  116. def test_after_created_shared_data(self):
  117. shared = SharedDataManager()
  118. shared.set('foo_1', 0)
  119. def job(worker_id, processes_count, key):
  120. shared.refresh()
  121. value = shared.get('foo_{}'.format(key)) or 0
  122. return value + 1
  123. process_manager = ProcessManager(
  124. config=Config({}),
  125. process_count=available_cores,
  126. job=job,
  127. )
  128. shared.set('foo_1', 42)
  129. shared.commit()
  130. results = process_manager.make_them_work('1')
  131. assert results[0] == 43
  132. shared.set('foo_2', 52)
  133. shared.commit()
  134. results = process_manager.make_them_work('2')
  135. assert results[0] == 53
  136. process_manager.terminate()