test_processing.py 2.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. # coding: utf-8
  2. import os
  3. import psutil
  4. from synergine2.processing import ProcessManager
  5. from synergine2.utils import ChunkManager
  6. from tests import BaseTest
  7. available_cores = len(psutil.Process().cpu_affinity())
  8. class MyFakeClass(object):
  9. def __init__(self, value):
  10. self.value = value
  11. class TestProcessing(BaseTest):
  12. def make_job_with_scalar(
  13. self,
  14. data_chunk: list,
  15. process_number: int,
  16. process_count: int,
  17. ) -> tuple:
  18. current_pid = os.getpid()
  19. result = sum(data_chunk)
  20. return current_pid, result
  21. def make_job_with_object(
  22. self,
  23. data_chunk: list,
  24. process_number: int,
  25. process_count: int,
  26. ) -> tuple:
  27. current_pid = os.getpid()
  28. data = [o.value for o in data_chunk]
  29. result = sum(data)
  30. return current_pid, MyFakeClass(result)
  31. def test_parallel_jobs_with_scalar(self):
  32. chunk_manager = ChunkManager(available_cores)
  33. process_manager = ProcessManager(
  34. process_count=available_cores,
  35. chunk_manager=chunk_manager,
  36. )
  37. data = list(range(100))
  38. process_id_list = []
  39. final_result = 0
  40. results = process_manager.chunk_and_execute_jobs(
  41. data,
  42. job_maker=self.make_job_with_scalar,
  43. )
  44. for process_id, result in results:
  45. final_result += result
  46. process_id_list.append(process_id)
  47. # Goal is 4950
  48. assert final_result == 4950
  49. def test_non_parallel_jobs_with_scalar(self):
  50. chunk_manager = ChunkManager(1)
  51. process_manager = ProcessManager(
  52. process_count=1,
  53. chunk_manager=chunk_manager,
  54. )
  55. data = list(range(100))
  56. results = process_manager.chunk_and_execute_jobs(
  57. data,
  58. job_maker=self.make_job_with_scalar,
  59. )
  60. process_id, final_result = results[0]
  61. assert len(results) == 1
  62. assert process_id == os.getpid()
  63. assert final_result == 4950
  64. def test_parallel_jobs_with_objects(self):
  65. chunk_manager = ChunkManager(available_cores)
  66. process_manager = ProcessManager(
  67. process_count=available_cores,
  68. chunk_manager=chunk_manager,
  69. )
  70. data = [MyFakeClass(v) for v in range(100)]
  71. process_id_list = []
  72. final_result = 0
  73. results = process_manager.chunk_and_execute_jobs(
  74. data,
  75. job_maker=self.make_job_with_object,
  76. )
  77. for process_id, result_object in results:
  78. final_result += result_object.value
  79. process_id_list.append(process_id)
  80. # Goal is 4950
  81. assert final_result == 4950