terminals.py 10KB


  1. # coding: utf-8
  2. import collections
  3. import typing
  4. from copy import copy
  5. from multiprocessing import Queue
  6. from multiprocessing import Process
  7. from queue import Empty
  8. import time
  9. from synergine2.base import BaseObject
  10. from synergine2.exceptions import SynergineException
  11. from synergine2.share import shared
  12. from synergine2.config import Config
  13. from synergine2.log import SynergineLogger
  14. from synergine2.simulation import Subject
  15. from synergine2.simulation import Event
  16. if typing.TYPE_CHECKING:
  17. from synergine2.core import Core
# Sentinel value exchanged through the terminal queues instead of a package:
# TerminalManager.stop() puts it on every terminal queue and Terminal.read()
# stops reading (and returns False) when it gets it.
STOP_SIGNAL = '__STOP_SIGNAL__'
  19. class TerminalPackage(BaseObject):
  20. """
  21. TODO: Update this class considering shared data across processes
  22. """
  23. def __init__(
  24. self,
  25. subjects: [Subject]=None,
  26. add_subjects: [Subject]=None,
  27. remove_subjects: [Subject]=None,
  28. events: [Event]=None,
  29. simulation_actions: [tuple]=None,
  30. subject_actions: [tuple]=None,
  31. is_cycle: bool=False,
  32. *args,
  33. **kwargs
  34. ):
  35. self.subjects = subjects
  36. self.add_subjects = add_subjects or []
  37. self.remove_subjects = remove_subjects or []
  38. self.events = events or []
  39. self.simulation_actions = simulation_actions or []
  40. self.subject_actions = subject_actions or []
  41. self.is_cycle = is_cycle
  42. def repr_debug(self) -> str:
  43. subjects = self.subjects or []
  44. return str(dict(
  45. subjects=subjects,
  46. add_subjects=[s.id for s in self.add_subjects],
  47. remove_subjects=[s.id for s in self.remove_subjects],
  48. events=[e.repr_debug() for e in self.events],
  49. simulation_actions=['{}: {}'.format(a.__class__.__name__, p) for a, p in self.simulation_actions],
  50. subject_actions=['{}: {}'.format(a.__class__.__name__, p) for a, p in self.subject_actions],
  51. is_cycle=self.is_cycle,
  52. ))
  53. class Terminal(BaseObject):
  54. # Default behaviour is to do nothing.
  55. # DEFAULT_SLEEP is sleep time between each queue read
  56. default_sleep = 1
  57. # List of subscribed Event classes. Terminal will not receive events
  58. # who are not instance of listed classes
  59. subscribed_events = [Event]
  60. # Permit to execute terminal in main process, only one terminal can use this
  61. main_process = False
  62. def __init__(
  63. self,
  64. config: Config,
  65. logger: SynergineLogger,
  66. asynchronous: bool=True,
  67. ):
  68. self.config = config
  69. self.logger = logger
  70. self._input_queue = None
  71. self._output_queue = None
  72. self._stop_required = False
  73. self.subjects = {}
  74. self.cycle_events = []
  75. self.event_handlers = collections.defaultdict(list)
  76. self.asynchronous = asynchronous
  77. self.core_process = None # type: Process
  78. def accept_event(self, event: Event) -> bool:
  79. for event_class in self.subscribed_events:
  80. if isinstance(event, event_class):
  81. return True
  82. return False
  83. def start(self, input_queue: Queue, output_queue: Queue) -> None:
  84. self._input_queue = input_queue
  85. self._output_queue = output_queue
  86. self.run()
  87. def execute_as_main_process(self, core: 'Core') -> None:
  88. """
  89. This method is called when the terminal have to be the main process. It will
  90. create a process with the run of core and make it's job here.
  91. """
  92. output_queue = Queue()
  93. input_queue = Queue()
  94. self.logger.info('Start core in a process')
  95. self.core_process = Process(target=core.run, kwargs=dict(
  96. from_terminal=self,
  97. from_terminal_input_queue=output_queue,
  98. from_terminal_output_queue=input_queue,
  99. ), name='Core')
  100. self.core_process.start()
  101. # Core is started, continue this terminal job
  102. self.logger.info('Core started, continue terminal job')
  103. self.start(input_queue=input_queue, output_queue=output_queue)
  104. def run(self):
  105. """
  106. Override this method to create your daemon terminal
  107. """
  108. try:
  109. while self.read():
  110. time.sleep(self.default_sleep)
  111. except KeyboardInterrupt:
  112. pass
  113. def read(self):
  114. self.logger.debug('Read package from core')
  115. while True:
  116. try:
  117. package = self._input_queue.get(block=False, timeout=None)
  118. if package == STOP_SIGNAL:
  119. self.logger.debug('Stop required')
  120. self._stop_required = True
  121. return False
  122. self.logger.debug('Package received')
  123. self.receive(package)
  124. except Empty:
  125. self.logger.debug('No package')
  126. return True # Finished to read Queue
  127. def receive(self, package: TerminalPackage):
  128. shared.purge_data()
  129. self.update_with_package(package)
  130. # End of cycle management signal
  131. self.send(TerminalPackage(is_cycle=True))
  132. def send(self, package: TerminalPackage):
  133. self.logger.debug('Send package to core')
  134. self._output_queue.put(package)
  135. def register_event_handler(self, event_class, func):
  136. self.event_handlers[event_class].append(func)
  137. def update_with_package(self, package: TerminalPackage):
  138. if package.subjects:
  139. self.subjects = {s.id: s for s in package.subjects}
  140. for new_subject in package.add_subjects:
  141. self.subjects[new_subject.id] = new_subject
  142. for deleted_subject in package.remove_subjects:
  143. del self.subjects[deleted_subject.id]
  144. self.cycle_events = package.events
  145. self.execute_event_handlers(package.events)
  146. def execute_event_handlers(self, events: [Event]):
  147. for event in events:
  148. for event_class, handlers in self.event_handlers.items():
  149. if isinstance(event, event_class):
  150. for handler in handlers:
  151. handler(event)
  152. class TerminalManager(BaseObject):
  153. def __init__(
  154. self,
  155. config: Config,
  156. logger: SynergineLogger,
  157. terminals: [Terminal]
  158. ):
  159. self.config = config
  160. self.logger = logger
  161. self.terminals = terminals
  162. self.outputs_queues = {}
  163. self.inputs_queues = {}
  164. def get_main_process_terminal(self) -> typing.Optional[Terminal]:
  165. main_process_terminals = [t for t in self.terminals if t.main_process]
  166. if main_process_terminals:
  167. if len(main_process_terminals) > 1:
  168. raise SynergineException('There is more one main process terminal !')
  169. return main_process_terminals[0]
  170. return None
  171. def start(self) -> None:
  172. self.logger.info('Start terminals')
  173. # We exclude here terminal who is run from main process
  174. terminals = [t for t in self.terminals if not t.main_process]
  175. for terminal in terminals:
  176. output_queue = Queue()
  177. self.outputs_queues[terminal] = output_queue
  178. input_queue = Queue()
  179. self.inputs_queues[terminal] = input_queue
  180. process = Process(target=terminal.start, kwargs=dict(
  181. input_queue=output_queue,
  182. output_queue=input_queue,
  183. ), name=terminal.__class__.__name__)
  184. process.start()
  185. def stop(self):
  186. for output_queue in self.outputs_queues.values():
  187. output_queue.put(STOP_SIGNAL)
  188. def send(self, package: TerminalPackage):
  189. self.logger.info('Send package to terminals')
  190. if self.logger.is_debug:
  191. self.logger.debug('Send package to terminals: {}'.format(
  192. str(package.repr_debug()),
  193. ))
  194. for terminal, output_queue in self.outputs_queues.items():
  195. self.logger.info('Send package to terminal {}'.format(terminal.__class__.__name__))
  196. # Terminal maybe don't want all events, so reduce list of event
  197. # Thirst make a copy to personalize this package
  198. terminal_adapted_package = copy(package)
  199. # Duplicate events list to personalize it
  200. terminal_adapted_package.events = terminal_adapted_package.events[:]
  201. for package_event in terminal_adapted_package.events[:]:
  202. if not terminal.accept_event(package_event):
  203. terminal_adapted_package.events.remove(package_event)
  204. if self.logger.is_debug:
  205. self.logger.debug('Send package to terminal {}: {}'.format(
  206. terminal.__class__.__name__,
  207. terminal_adapted_package.repr_debug(),
  208. ))
  209. output_queue.put(terminal_adapted_package)
  210. def receive(self) -> [TerminalPackage]:
  211. self.logger.info('Receive terminals packages')
  212. packages = []
  213. for terminal, input_queue in self.inputs_queues.items():
  214. self.logger.info('Receive terminal {} packages ({})'.format(
  215. terminal.__class__.__name__,
  216. 'sync' if not terminal.asynchronous else 'async'
  217. ))
  218. # When terminal is synchronous, wait it's cycle package
  219. if not terminal.asynchronous:
  220. continue_ = True
  221. while continue_:
  222. package = input_queue.get()
  223. # In case where terminal send package before end of cycle
  224. # management
  225. continue_ = not package.is_cycle
  226. if self.logger.is_debug:
  227. self.logger.debug('Receive package from {}: {}'.format(
  228. terminal.__class__.__name__,
  229. str(package.repr_debug()),
  230. ))
  231. packages.append(package)
  232. else:
  233. try:
  234. while True:
  235. package = input_queue.get(block=False, timeout=None)
  236. if self.logger.is_debug:
  237. self.logger.debug('Receive package from {}: {}'.format(
  238. str(terminal),
  239. str(package.repr_debug()),
  240. ))
  241. packages.append(package)
  242. except Empty:
  243. pass # Queue is empty
  244. self.logger.info('{} package(s) received'.format(len(packages)))
  245. return packages