123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- # coding: utf-8
- import random
- import typing
- from multiprocessing import Process
- from multiprocessing.connection import Connection
- from multiprocessing.connection import Pipe
-
- from synergine2.base import BaseObject
- from synergine2.config import Config
- from synergine2.share import SharedDataManager
-
- STOP = '__STOP__'  # sentinel message: ProcessManager.terminate sends it, Worker.work returns on receiving it
-
- # global shared manager (module-level SharedDataManager instance, created at import time)
- shared_data = SharedDataManager()
-
-
- # TODO: use jobs
class Job:
    """Empty placeholder class; it defines no attributes or behavior yet."""
-
-
class Worker:
    """Run ``real_job`` in a child process, fed and drained through pipes.

    Two one-way pipes are created per worker:

    * ``process_write_pipe`` -> ``process_read_pipe`` carries job arguments
      to the child process;
    * ``local_write_pipe`` -> ``local_read_pipe`` carries each job result
      back.

    The child process is created and started by the constructor.
    """

    def __init__(
        self,
        config: Config,
        real_job: typing.Callable[..., typing.Any],
    ) -> None:
        """Create the pipes, then build and start the child process.

        :param config: configuration object, stored as ``self.config``
        :param real_job: callable invoked in the child as
            ``real_job(*message)`` for every message received
        """
        self.config = config

        # Channel for results: the child writes, the creator reads.
        result_reader, result_writer = Pipe(duplex=False)
        # Channel for job arguments: the creator writes, the child reads.
        job_reader, job_writer = Pipe(duplex=False)

        self.local_read_pipe = result_reader  # type: Connection
        self.local_write_pipe = result_writer  # type: Connection
        self.process_read_pipe = job_reader  # type: Connection
        self.process_write_pipe = job_writer  # type: Connection

        self.real_job = real_job

        # The seed is drawn here, in the creating process, and handed to the
        # child, which applies it with random.seed() before handling jobs.
        child_kwargs = {'seed': random.random()}
        child_args = (self.local_write_pipe, self.process_read_pipe)
        self.process = Process(
            target=self.work,
            args=child_args,
            kwargs=child_kwargs,
            name='Worker',
        )
        self.db = None  # TODO delete
        self.process.start()

    def work(self, *args, **kwargs):
        """Child-process main loop.

        Seeds ``random`` with the ``seed`` keyword argument, then repeatedly
        receives a message from ``self.process_read_pipe``, calls
        ``self.real_job(*message)`` and sends the result through
        ``self.local_write_pipe``. The loop ends when the received message
        equals ``STOP`` or when ``KeyboardInterrupt`` is raised inside it.

        The positional ``args`` (the pipes passed by the constructor) are not
        read; the pipes are reached through ``self`` instead.
        """
        random.seed(kwargs.pop('seed'))
        try:
            while True:
                job_args = self.process_read_pipe.recv()
                if job_args == STOP:
                    break

                self.local_write_pipe.send(self.real_job(*job_args))
        except KeyboardInterrupt:
            # Leave quietly when the process is interrupted.
            pass
-
-
class ProcessManager(BaseObject):
    """Manage a fixed number of ``Worker`` processes that all run the same job.

    Typical sequence: ``start_workers()``, any number of
    ``make_them_work(message)`` calls, then ``terminate()``.

    NOTE(review): ``terminate()`` does not empty ``self.workers``, so the
    assertion in ``start_workers()`` prevents starting workers again on the
    same instance.
    """

    def __init__(
        self,
        config: Config,
        process_count: int,
        job: typing.Callable[..., typing.Any],
    ) -> None:
        """Store the settings; no process is started here.

        :param config: configuration object handed to every ``Worker``
        :param process_count: number of workers ``start_workers`` creates
        :param job: callable each worker runs for every message
        """
        self.config = config
        self._process_count = process_count
        self.workers = []  # type: typing.List[Worker]
        self.job = job

    @property
    def process_count(self) -> int:
        """Number of workers this manager is configured for."""
        return self._process_count

    def start_workers(self) -> None:
        """Create ``process_count`` workers; each one starts its own process.

        Must only be called while ``self.workers`` is empty.
        """
        assert not self.workers
        for _ in range(self._process_count):
            self.workers.append(Worker(self.config, self.job))

    def make_them_work(self, message: typing.Any) -> typing.List[typing.Any]:
        """Send ``message`` to every worker and collect one result from each.

        Each worker receives the tuple
        ``(worker_id, process_count, message)``, where ``worker_id`` is the
        worker's index in ``self.workers``. All messages are sent before any
        result is read.

        :param message: payload forwarded to every worker
        :return: one result per worker, in ``self.workers`` order
        """
        for worker_id, worker in enumerate(self.workers):
            worker.process_write_pipe.send((worker_id, self._process_count, message))

        # recv() blocks until the corresponding worker has sent its result.
        return [worker.local_read_pipe.recv() for worker in self.workers]

    def terminate(self) -> None:
        """Send ``STOP`` to every worker, then wait for each process to exit."""
        for worker in self.workers:
            worker.process_write_pipe.send(STOP)

        for worker in self.workers:
            worker.process.join()
|