processing.py 2.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. # coding: utf-8
  2. import random
  3. import typing
  4. from multiprocessing import Process
  5. from multiprocessing.connection import Connection
  6. from multiprocessing.connection import Pipe
  7. from synergine2.base import BaseObject
  8. from synergine2.config import Config
  9. from synergine2.share import SharedDataManager
  10. STOP = '__STOP__'
  11. # global shared manager
  12. shared_data = SharedDataManager()
  13. # TODO: se jobs
  14. class Job(object):
  15. pass
  16. class Worker(object):
  17. def __init__(
  18. self,
  19. config: Config,
  20. real_job: typing.Callable[..., typing.Any],
  21. ) -> None:
  22. self.config = config
  23. local_read_pipe, local_write_pipe = Pipe(duplex=False)
  24. process_read_pipe, process_write_pipe = Pipe(duplex=False)
  25. self.local_read_pipe = local_read_pipe # type: Connection
  26. self.local_write_pipe = local_write_pipe # type: Connection
  27. self.process_read_pipe = process_read_pipe # type: Connection
  28. self.process_write_pipe = process_write_pipe # type: Connection
  29. self.real_job = real_job
  30. self.process = Process(
  31. target=self.work,
  32. args=(
  33. self.local_write_pipe,
  34. self.process_read_pipe,
  35. ),
  36. kwargs={'seed': random.random()},
  37. name='Worker',
  38. )
  39. self.db = None # TODO delete
  40. self.process.start()
  41. def work(self, *args, **kwargs):
  42. seed_value = kwargs.pop('seed')
  43. random.seed(seed_value)
  44. try:
  45. while True:
  46. args = self.process_read_pipe.recv()
  47. if args == STOP:
  48. return
  49. result = self.real_job(*args)
  50. self.local_write_pipe.send(result)
  51. except KeyboardInterrupt:
  52. return
  53. class ProcessManager(BaseObject):
  54. def __init__(
  55. self,
  56. config: Config,
  57. process_count: int,
  58. job: typing.Callable[..., typing.Any],
  59. ) -> None:
  60. self.config = config
  61. self._process_count = process_count
  62. self.workers = []
  63. self.job = job
  64. @property
  65. def process_count(self) -> int:
  66. return self._process_count
  67. def start_workers(self) -> None:
  68. assert not self.workers
  69. for i in range(self._process_count):
  70. self.workers.append(Worker(self.config, self.job))
  71. def make_them_work(self, message: typing.Any) -> 'TODO':
  72. responses = []
  73. for worker_id, worker in enumerate(self.workers):
  74. worker.process_write_pipe.send((worker_id, self._process_count, message))
  75. for worker in self.workers:
  76. responses.append(worker.local_read_pipe.recv())
  77. return responses
  78. def terminate(self) -> None:
  79. for worker in self.workers:
  80. worker.process_write_pipe.send(STOP)
  81. for worker in self.workers:
  82. worker.process.join()