processing.py

# coding: utf-8
import types
from multiprocessing import Pool

from synergine2.base import BaseObject
from synergine2.utils import ChunkManager


class ProcessManager(BaseObject):
    def __init__(
        self,
        process_count: int,
        chunk_manager: ChunkManager,
    ):
        self._process_count = process_count
        self._chunk_manager = chunk_manager
        self.pool = Pool(processes=self._process_count)

    def __getstate__(self):
        # Pool objects cannot be pickled: drop the pool reference so that
        # instances of this class can themselves be sent to worker processes.
        self_dict = self.__dict__.copy()
        self_dict['pool'] = None
        return self_dict

    def chunk_and_execute_jobs(self, data: list, job_maker: types.FunctionType) -> list:
        chunks = self._chunk_manager.make_chunks(data)
        if self._process_count > 1:
            print('USE POOL')
            # Each worker receives one chunk plus its index and the worker count.
            results = self.pool.starmap(
                job_maker,
                [(chunk, i, self._process_count) for i, chunk in enumerate(chunks)],
            )
        else:
            print('USE MONO')
            # Single-process mode: run the job once over the complete data set.
            results = [job_maker(data, 0, 1)]
        return results

    def execute_jobs(self, data: object, job_maker: types.FunctionType) -> list:
        # TODO: Is there a reason to use multiprocessing here? data is not chunked ...
        if self._process_count > 1:
            # The same unchunked data goes to every worker, along with the
            # worker index, so each job can pick out its own share.
            results = self.pool.starmap(
                job_maker,
                [(data, i, self._process_count) for i in range(self._process_count)],
            )
        else:
            results = [job_maker(data, 0, 1)]
        return results

    def __del__(self):
        # Stop the worker processes when the manager is garbage-collected.
        if self.pool:
            self.pool.terminate()
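

# A minimal usage sketch, not part of the original file: `double_chunk` is a
# hypothetical job whose (chunk, worker_index, process_count) signature is
# inferred from the starmap calls above, and ChunkManager is assumed to be
# constructed with the desired number of chunks.
def double_chunk(chunk: list, worker_index: int, process_count: int) -> list:
    # Each worker doubles the items of the chunk it was handed.
    return [item * 2 for item in chunk]


if __name__ == '__main__':
    manager = ProcessManager(process_count=4, chunk_manager=ChunkManager(4))
    results = manager.chunk_and_execute_jobs(list(range(100)), double_chunk)
    print(results)  # one list of doubled values per worker chunk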