# test_processing.py
  1. # coding: utf-8
  2. import os
  3. import multiprocessing
  4. import pytest
  5. from synergine2.processing import ProcessManager
  6. from synergine2.utils import ChunkManager
  7. from tests import BaseTest
  8. class MyFakeClass(object):
  9. def __init__(self, value):
  10. self.value = value
  11. class TestProcessing(BaseTest):
  12. def make_job_with_scalar(
  13. self,
  14. data_chunk: list,
  15. process_number: int,
  16. process_count: int,
  17. ) -> tuple:
  18. current_pid = os.getpid()
  19. result = sum(data_chunk)
  20. return current_pid, result
  21. def make_job_with_object(
  22. self,
  23. data_chunk: list,
  24. process_number: int,
  25. process_count: int,
  26. ) -> tuple:
  27. current_pid = os.getpid()
  28. data = [o.value for o in data_chunk]
  29. result = sum(data)
  30. return current_pid, MyFakeClass(result)
  31. @pytest.mark.skipif(multiprocessing.cpu_count() < 2, reason="requires 2 or more cpus")
  32. def test_parallel_jobs_with_scalar(self):
  33. chunk_manager = ChunkManager(2)
  34. process_manager = ProcessManager(
  35. process_count=2,
  36. chunk_manager=chunk_manager,
  37. )
  38. data = list(range(100))
  39. process_id_list = []
  40. final_result = 0
  41. results = process_manager.chunk_and_execute_jobs(
  42. data,
  43. job_maker=self.make_job_with_scalar,
  44. )
  45. for process_id, result in results:
  46. final_result += result
  47. process_id_list.append(process_id)
  48. # Test each process ids are differents
  49. assert sorted(process_id_list) == \
  50. sorted(list(set(process_id_list)))
  51. # Goal is 4950
  52. assert final_result == 4950
  53. def test_non_parallel_jobs_with_scalar(self):
  54. chunk_manager = ChunkManager(1)
  55. process_manager = ProcessManager(
  56. process_count=1,
  57. chunk_manager=chunk_manager,
  58. )
  59. data = list(range(100))
  60. results = process_manager.chunk_and_execute_jobs(
  61. data,
  62. job_maker=self.make_job_with_scalar,
  63. )
  64. process_id, final_result = results[0]
  65. assert len(results) == 1
  66. assert process_id == os.getpid()
  67. assert final_result == 4950
  68. def test_parallel_jobs_with_objects(self):
  69. chunk_manager = ChunkManager(4)
  70. process_manager = ProcessManager(
  71. process_count=4,
  72. chunk_manager=chunk_manager,
  73. )
  74. data = [MyFakeClass(v) for v in range(100)]
  75. process_id_list = []
  76. final_result = 0
  77. results = process_manager.chunk_and_execute_jobs(
  78. data,
  79. job_maker=self.make_job_with_object,
  80. )
  81. for process_id, result_object in results:
  82. final_result += result_object.value
  83. process_id_list.append(process_id)
  84. # Test each process ids are differents
  85. assert sorted(process_id_list) == \
  86. sorted(list(set(process_id_list)))
  87. # Goal is 4950
  88. assert final_result == 4950