test_processing.py 2.3KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import os
  2. from synergine2.processing import ProcessManager
  3. from synergine2.utils import ChunkManager
  4. from tests import BaseTest
  5. class MyFakeClass(object):
  6. def __init__(self, value):
  7. self.value = value
  8. class TestProcessing(BaseTest):
  9. @staticmethod
  10. def _make_job_with_scalar(
  11. data_chunk: list,
  12. process_number: int,
  13. process_count: int,
  14. ) -> tuple:
  15. current_pid = os.getpid()
  16. result = sum(data_chunk)
  17. return current_pid, result
  18. @staticmethod
  19. def _make_job_with_object(
  20. data_chunk: list,
  21. process_number: int,
  22. process_count: int,
  23. ) -> tuple:
  24. current_pid = os.getpid()
  25. data = [o.value for o in data_chunk]
  26. result = sum(data)
  27. return current_pid, MyFakeClass(result)
  28. def test_parallel_jobs_with_scalar(self):
  29. chunk_manager = ChunkManager(4)
  30. process_manager = ProcessManager(
  31. process_count=4,
  32. chunk_manager=chunk_manager,
  33. )
  34. data = list(range(100))
  35. process_id_list = []
  36. final_result = 0
  37. results = process_manager.chunk_and_execute_jobs(
  38. data,
  39. job_maker=self._make_job_with_scalar,
  40. )
  41. for process_id, result in results:
  42. final_result += result
  43. process_id_list.append(process_id)
  44. # Test each process ids are differents
  45. assert sorted(process_id_list) == \
  46. sorted(list(set(process_id_list)))
  47. # Goal is 4950
  48. assert final_result == 4950
  49. def test_parallel_jobs_with_objects(self):
  50. chunk_manager = ChunkManager(4)
  51. process_manager = ProcessManager(
  52. process_count=4,
  53. chunk_manager=chunk_manager,
  54. )
  55. data = [MyFakeClass(v) for v in range(100)]
  56. process_id_list = []
  57. final_result = 0
  58. results = process_manager.chunk_and_execute_jobs(
  59. data,
  60. job_maker=self._make_job_with_object,
  61. )
  62. for process_id, result_object in results:
  63. final_result += result_object.value
  64. process_id_list.append(process_id)
  65. # Test each process ids are differents
  66. assert sorted(process_id_list) == \
  67. sorted(list(set(process_id_list)))
  68. # Goal is 4950
  69. assert final_result == 4950