test_processing.py 2.1KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import os
  2. from synergine2.processing import ProcessManager
  3. from synergine2.utils import ChunkManager
  4. from tests import BaseTest
  5. class MyFakeClass(object):
  6. def __init__(self, value):
  7. self.value = value
  8. class TestProcessing(BaseTest):
  9. @staticmethod
  10. def _make_job_with_scalar(data_chunk: list) -> tuple:
  11. current_pid = os.getpid()
  12. result = sum(data_chunk)
  13. return current_pid, result
  14. @staticmethod
  15. def _make_job_with_object(data_chunk: list) -> tuple:
  16. current_pid = os.getpid()
  17. data = [o.value for o in data_chunk]
  18. result = sum(data)
  19. return current_pid, MyFakeClass(result)
  20. def test_parallel_jobs_with_scalar(self):
  21. chunk_manager = ChunkManager(4)
  22. process_manager = ProcessManager(
  23. process_count=4,
  24. chunk_manager=chunk_manager,
  25. job_maker=self._make_job_with_scalar,
  26. )
  27. data = list(range(100))
  28. process_id_list = []
  29. final_result = 0
  30. results = process_manager.execute_jobs(data)
  31. for process_id, result in results:
  32. final_result += result
  33. process_id_list.append(process_id)
  34. # Test each process ids are differents
  35. assert sorted(process_id_list) == \
  36. sorted(list(set(process_id_list)))
  37. # Goal is 4950
  38. assert final_result == 4950
  39. def test_parallel_jobs_with_objects(self):
  40. chunk_manager = ChunkManager(4)
  41. process_manager = ProcessManager(
  42. process_count=4,
  43. chunk_manager=chunk_manager,
  44. job_maker=self._make_job_with_object,
  45. )
  46. data = [MyFakeClass(v) for v in range(100)]
  47. process_id_list = []
  48. final_result = 0
  49. results = process_manager.execute_jobs(data)
  50. for process_id, result_object in results:
  51. final_result += result_object.value
  52. process_id_list.append(process_id)
  53. # Test each process ids are differents
  54. assert sorted(process_id_list) == \
  55. sorted(list(set(process_id_list)))
  56. # Goal is 4950
  57. assert final_result == 4950