terminals.py 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. # coding: utf-8
  2. import collections
  3. import typing
  4. from copy import copy
  5. from multiprocessing import Queue
  6. from multiprocessing import Process
  7. from queue import Empty
  8. import time
  9. from synergine2.base import BaseObject
  10. from synergine2.exceptions import SynergineException
  11. from synergine2.share import shared
  12. from synergine2.config import Config
  13. from synergine2.log import get_logger
  14. from synergine2.simulation import Subject
  15. from synergine2.simulation import Event
  16. if typing.TYPE_CHECKING:
  17. from synergine2.core import Core
# Sentinel put on a terminal's input queue to ask it to stop reading
STOP_SIGNAL = '__STOP_SIGNAL__'
  19. class TerminalPackage(BaseObject):
  20. """
  21. TODO: Update this class considering shared data across processes
  22. """
  23. def __init__(
  24. self,
  25. subjects: [Subject]=None,
  26. add_subjects: [Subject]=None,
  27. remove_subjects: [Subject]=None,
  28. events: [Event]=None,
  29. simulation_actions: [tuple]=None,
  30. subject_actions: [tuple]=None,
  31. is_cycle: bool=False,
  32. sigterm: bool=False,
  33. *args,
  34. **kwargs
  35. ):
  36. self.subjects = subjects
  37. self.add_subjects = add_subjects or []
  38. self.remove_subjects = remove_subjects or []
  39. self.events = events or []
  40. self.simulation_actions = simulation_actions or []
  41. self.subject_actions = subject_actions or []
  42. self.is_cycle = is_cycle
  43. self.sigterm = sigterm
  44. def repr_debug(self) -> str:
  45. subjects = self.subjects or []
  46. return str(dict(
  47. subjects=subjects,
  48. add_subjects=[s.id for s in self.add_subjects],
  49. remove_subjects=[s.id for s in self.remove_subjects],
  50. events=[e.repr_debug() for e in self.events],
  51. simulation_actions=['{}: {}'.format(a.__class__.__name__, p) for a, p in self.simulation_actions],
  52. subject_actions=['{}: {}'.format(a.__class__.__name__, p) for a, p in self.subject_actions],
  53. is_cycle=self.is_cycle,
  54. sigterm=self.sigterm,
  55. ))
  56. class Terminal(BaseObject):
  57. # Default behaviour is to do nothing.
  58. # DEFAULT_SLEEP is sleep time between each queue read
  59. default_sleep = 1
  60. # List of subscribed Event classes. Terminal will not receive events
  61. # who are not instance of listed classes
  62. subscribed_events = [Event]
  63. # Permit to execute terminal in main process, only one terminal can use this
  64. main_process = False
  65. def __init__(
  66. self,
  67. config: Config,
  68. asynchronous: bool=True,
  69. ):
  70. self.config = config
  71. self.logger = get_logger(self.__class__.__name__, config)
  72. self._input_queue = None
  73. self._output_queue = None
  74. self._stop_required = False
  75. self.subjects = {}
  76. self.cycle_events = []
  77. self.event_handlers = collections.defaultdict(list)
  78. self.asynchronous = asynchronous
  79. self.core_process = None # type: Process
  80. def accept_event(self, event: Event) -> bool:
  81. for event_class in self.subscribed_events:
  82. if isinstance(event, event_class):
  83. return True
  84. return False
  85. def start(self, input_queue: Queue, output_queue: Queue) -> None:
  86. self._input_queue = input_queue
  87. self._output_queue = output_queue
  88. self.run()
  89. def execute_as_main_process(self, core: 'Core') -> None:
  90. """
  91. This method is called when the terminal have to be the main process. It will
  92. create a process with the run of core and make it's job here.
  93. """
  94. output_queue = Queue()
  95. input_queue = Queue()
  96. self.logger.info('Start core in a process')
  97. self.core_process = Process(target=core.run, kwargs=dict(
  98. from_terminal=self,
  99. from_terminal_input_queue=output_queue,
  100. from_terminal_output_queue=input_queue,
  101. ), name='Core')
  102. self.core_process.start()
  103. # Core is started, continue this terminal job
  104. self.logger.info('Core started, continue terminal job')
  105. self.start(input_queue=input_queue, output_queue=output_queue)
  106. def run(self):
  107. """
  108. Override this method to create your daemon terminal
  109. """
  110. try:
  111. while self.read():
  112. time.sleep(self.default_sleep)
  113. except KeyboardInterrupt:
  114. pass
  115. def read(self):
  116. self.logger.debug('Read package from core')
  117. while True:
  118. try:
  119. package = self._input_queue.get(block=False, timeout=None)
  120. if package == STOP_SIGNAL:
  121. self.logger.debug('Stop required')
  122. self._stop_required = True
  123. return False
  124. self.logger.debug('Package received')
  125. self.receive(package)
  126. except Empty:
  127. self.logger.debug('No package')
  128. return True # Finished to read Queue
  129. def receive(self, package: TerminalPackage):
  130. shared.purge_data()
  131. self.update_with_package(package)
  132. # End of cycle management signal
  133. self.send(TerminalPackage(is_cycle=True))
  134. def send(self, package: TerminalPackage):
  135. self.logger.debug('Send package to core')
  136. self._output_queue.put(package)
  137. def register_event_handler(self, event_class, func):
  138. self.event_handlers[event_class].append(func)
  139. def update_with_package(self, package: TerminalPackage):
  140. if package.subjects:
  141. self.subjects = {s.id: s for s in package.subjects}
  142. for new_subject in package.add_subjects:
  143. self.subjects[new_subject.id] = new_subject
  144. for deleted_subject in package.remove_subjects:
  145. del self.subjects[deleted_subject.id]
  146. self.cycle_events = package.events
  147. self.execute_event_handlers(package.events)
  148. def execute_event_handlers(self, events: [Event]):
  149. for event in events:
  150. self.logger.debug(
  151. 'Event "{}" received with data: {}'.format(
  152. event.__class__.__name__,
  153. event.repr_debug()
  154. ),
  155. )
  156. for event_class, handlers in self.event_handlers.items():
  157. if isinstance(event, event_class):
  158. for handler in handlers:
  159. handler(event)
  160. class TerminalManager(BaseObject):
  161. def __init__(
  162. self,
  163. config: Config,
  164. terminals: [Terminal]
  165. ):
  166. self.config = config
  167. self.logger = get_logger('TerminalManager', config)
  168. self.terminals = terminals
  169. self.outputs_queues = {}
  170. self.inputs_queues = {}
  171. def get_main_process_terminal(self) -> typing.Optional[Terminal]:
  172. main_process_terminals = [t for t in self.terminals if t.main_process]
  173. if main_process_terminals:
  174. if len(main_process_terminals) > 1:
  175. raise SynergineException('There is more one main process terminal !')
  176. return main_process_terminals[0]
  177. return None
  178. def start(self) -> None:
  179. self.logger.info('Start terminals')
  180. # We exclude here terminal who is run from main process
  181. terminals = [t for t in self.terminals if not t.main_process]
  182. for terminal in terminals:
  183. output_queue = Queue()
  184. self.outputs_queues[terminal] = output_queue
  185. input_queue = Queue()
  186. self.inputs_queues[terminal] = input_queue
  187. process = Process(target=terminal.start, kwargs=dict(
  188. input_queue=output_queue,
  189. output_queue=input_queue,
  190. ), name=terminal.__class__.__name__)
  191. process.start()
  192. def stop(self):
  193. for output_queue in self.outputs_queues.values():
  194. output_queue.put(STOP_SIGNAL)
  195. def send(self, package: TerminalPackage):
  196. self.logger.info('Send package to terminals')
  197. if self.logger.is_debug:
  198. self.logger.debug('Send package to terminals: {}'.format(
  199. str(package.repr_debug()),
  200. ))
  201. for terminal, output_queue in self.outputs_queues.items():
  202. self.logger.info('Send package to terminal {}'.format(terminal.__class__.__name__))
  203. # Terminal maybe don't want all events, so reduce list of event
  204. # Thirst make a copy to personalize this package
  205. terminal_adapted_package = copy(package)
  206. # Duplicate events list to personalize it
  207. terminal_adapted_package.events = terminal_adapted_package.events[:]
  208. for package_event in terminal_adapted_package.events[:]:
  209. if not terminal.accept_event(package_event):
  210. terminal_adapted_package.events.remove(package_event)
  211. if self.logger.is_debug:
  212. self.logger.debug('Send package to terminal {}: {}'.format(
  213. terminal.__class__.__name__,
  214. terminal_adapted_package.repr_debug(),
  215. ))
  216. output_queue.put(terminal_adapted_package)
  217. def receive(self) -> [TerminalPackage]:
  218. self.logger.info('Receive terminals packages')
  219. packages = []
  220. for terminal, input_queue in self.inputs_queues.items():
  221. self.logger.info('Receive terminal {} packages ({})'.format(
  222. terminal.__class__.__name__,
  223. 'sync' if not terminal.asynchronous else 'async'
  224. ))
  225. # When terminal is synchronous, wait it's cycle package
  226. if not terminal.asynchronous:
  227. continue_ = True
  228. while continue_:
  229. package = input_queue.get()
  230. # In case where terminal send package before end of cycle
  231. # management
  232. continue_ = not package.is_cycle
  233. if self.logger.is_debug:
  234. self.logger.debug('Receive package from {}: {}'.format(
  235. terminal.__class__.__name__,
  236. str(package.repr_debug()),
  237. ))
  238. packages.append(package)
  239. if package.sigterm:
  240. return packages
  241. else:
  242. try:
  243. while True:
  244. package = input_queue.get(block=False, timeout=None)
  245. if self.logger.is_debug:
  246. self.logger.debug('Receive package from {}: {}'.format(
  247. str(terminal),
  248. str(package.repr_debug()),
  249. ))
  250. packages.append(package)
  251. except Empty:
  252. pass # Queue is empty
  253. self.logger.info('{} package(s) received'.format(len(packages)))
  254. return packages