terminals.py 10KB


  1. # coding: utf-8
  2. import collections
  3. import typing
  4. from copy import copy
  5. from multiprocessing import Queue
  6. from multiprocessing import Process
  7. from queue import Empty
  8. import time
  9. from synergine2.base import BaseObject
  10. from synergine2.exceptions import SynergineException
  11. from synergine2.share import shared
  12. from synergine2.config import Config
  13. from synergine2.log import get_logger
  14. from synergine2.simulation import Subject
  15. from synergine2.simulation import Event
  16. if typing.TYPE_CHECKING:
  17. from synergine2.core import Core
# Sentinel put on a terminal's input queue (see TerminalManager.stop) to ask
# the terminal to stop reading packages (see Terminal.read).
STOP_SIGNAL = '__STOP_SIGNAL__'
  19. class TerminalPackage(BaseObject):
  20. """
  21. TODO: Update this class considering shared data across processes
  22. """
  23. def __init__(
  24. self,
  25. subjects: [Subject]=None,
  26. add_subjects: [Subject]=None,
  27. remove_subjects: [Subject]=None,
  28. events: [Event]=None,
  29. simulation_actions: [tuple]=None,
  30. subject_actions: [tuple]=None,
  31. is_cycle: bool=False,
  32. *args,
  33. **kwargs
  34. ):
  35. self.subjects = subjects
  36. self.add_subjects = add_subjects or []
  37. self.remove_subjects = remove_subjects or []
  38. self.events = events or []
  39. self.simulation_actions = simulation_actions or []
  40. self.subject_actions = subject_actions or []
  41. self.is_cycle = is_cycle
  42. def repr_debug(self) -> str:
  43. subjects = self.subjects or []
  44. return str(dict(
  45. subjects=subjects,
  46. add_subjects=[s.id for s in self.add_subjects],
  47. remove_subjects=[s.id for s in self.remove_subjects],
  48. events=[e.repr_debug() for e in self.events],
  49. simulation_actions=['{}: {}'.format(a.__class__.__name__, p) for a, p in self.simulation_actions],
  50. subject_actions=['{}: {}'.format(a.__class__.__name__, p) for a, p in self.subject_actions],
  51. is_cycle=self.is_cycle,
  52. ))
  53. class Terminal(BaseObject):
  54. # Default behaviour is to do nothing.
  55. # DEFAULT_SLEEP is sleep time between each queue read
  56. default_sleep = 1
  57. # List of subscribed Event classes. Terminal will not receive events
  58. # who are not instance of listed classes
  59. subscribed_events = [Event]
  60. # Permit to execute terminal in main process, only one terminal can use this
  61. main_process = False
  62. def __init__(
  63. self,
  64. config: Config,
  65. asynchronous: bool=True,
  66. ):
  67. self.config = config
  68. self.logger = get_logger(self.__class__.__name__, config)
  69. self._input_queue = None
  70. self._output_queue = None
  71. self._stop_required = False
  72. self.subjects = {}
  73. self.cycle_events = []
  74. self.event_handlers = collections.defaultdict(list)
  75. self.asynchronous = asynchronous
  76. self.core_process = None # type: Process
  77. def accept_event(self, event: Event) -> bool:
  78. for event_class in self.subscribed_events:
  79. if isinstance(event, event_class):
  80. return True
  81. return False
  82. def start(self, input_queue: Queue, output_queue: Queue) -> None:
  83. self._input_queue = input_queue
  84. self._output_queue = output_queue
  85. self.run()
  86. def execute_as_main_process(self, core: 'Core') -> None:
  87. """
  88. This method is called when the terminal have to be the main process. It will
  89. create a process with the run of core and make it's job here.
  90. """
  91. output_queue = Queue()
  92. input_queue = Queue()
  93. self.logger.info('Start core in a process')
  94. self.core_process = Process(target=core.run, kwargs=dict(
  95. from_terminal=self,
  96. from_terminal_input_queue=output_queue,
  97. from_terminal_output_queue=input_queue,
  98. ), name='Core')
  99. self.core_process.start()
  100. # Core is started, continue this terminal job
  101. self.logger.info('Core started, continue terminal job')
  102. self.start(input_queue=input_queue, output_queue=output_queue)
  103. def run(self):
  104. """
  105. Override this method to create your daemon terminal
  106. """
  107. try:
  108. while self.read():
  109. time.sleep(self.default_sleep)
  110. except KeyboardInterrupt:
  111. pass
  112. def read(self):
  113. self.logger.debug('Read package from core')
  114. while True:
  115. try:
  116. package = self._input_queue.get(block=False, timeout=None)
  117. if package == STOP_SIGNAL:
  118. self.logger.debug('Stop required')
  119. self._stop_required = True
  120. return False
  121. self.logger.debug('Package received')
  122. self.receive(package)
  123. except Empty:
  124. self.logger.debug('No package')
  125. return True # Finished to read Queue
  126. def receive(self, package: TerminalPackage):
  127. shared.purge_data()
  128. self.update_with_package(package)
  129. # End of cycle management signal
  130. self.send(TerminalPackage(is_cycle=True))
  131. def send(self, package: TerminalPackage):
  132. self.logger.debug('Send package to core')
  133. self._output_queue.put(package)
  134. def register_event_handler(self, event_class, func):
  135. self.event_handlers[event_class].append(func)
  136. def update_with_package(self, package: TerminalPackage):
  137. if package.subjects:
  138. self.subjects = {s.id: s for s in package.subjects}
  139. for new_subject in package.add_subjects:
  140. self.subjects[new_subject.id] = new_subject
  141. for deleted_subject in package.remove_subjects:
  142. del self.subjects[deleted_subject.id]
  143. self.cycle_events = package.events
  144. self.execute_event_handlers(package.events)
  145. def execute_event_handlers(self, events: [Event]):
  146. for event in events:
  147. for event_class, handlers in self.event_handlers.items():
  148. if isinstance(event, event_class):
  149. for handler in handlers:
  150. handler(event)
  151. class TerminalManager(BaseObject):
  152. def __init__(
  153. self,
  154. config: Config,
  155. terminals: [Terminal]
  156. ):
  157. self.config = config
  158. self.logger = get_logger('TerminalManager', config)
  159. self.terminals = terminals
  160. self.outputs_queues = {}
  161. self.inputs_queues = {}
  162. def get_main_process_terminal(self) -> typing.Optional[Terminal]:
  163. main_process_terminals = [t for t in self.terminals if t.main_process]
  164. if main_process_terminals:
  165. if len(main_process_terminals) > 1:
  166. raise SynergineException('There is more one main process terminal !')
  167. return main_process_terminals[0]
  168. return None
  169. def start(self) -> None:
  170. self.logger.info('Start terminals')
  171. # We exclude here terminal who is run from main process
  172. terminals = [t for t in self.terminals if not t.main_process]
  173. for terminal in terminals:
  174. output_queue = Queue()
  175. self.outputs_queues[terminal] = output_queue
  176. input_queue = Queue()
  177. self.inputs_queues[terminal] = input_queue
  178. process = Process(target=terminal.start, kwargs=dict(
  179. input_queue=output_queue,
  180. output_queue=input_queue,
  181. ), name=terminal.__class__.__name__)
  182. process.start()
  183. def stop(self):
  184. for output_queue in self.outputs_queues.values():
  185. output_queue.put(STOP_SIGNAL)
  186. def send(self, package: TerminalPackage):
  187. self.logger.info('Send package to terminals')
  188. if self.logger.is_debug:
  189. self.logger.debug('Send package to terminals: {}'.format(
  190. str(package.repr_debug()),
  191. ))
  192. for terminal, output_queue in self.outputs_queues.items():
  193. self.logger.info('Send package to terminal {}'.format(terminal.__class__.__name__))
  194. # Terminal maybe don't want all events, so reduce list of event
  195. # Thirst make a copy to personalize this package
  196. terminal_adapted_package = copy(package)
  197. # Duplicate events list to personalize it
  198. terminal_adapted_package.events = terminal_adapted_package.events[:]
  199. for package_event in terminal_adapted_package.events[:]:
  200. if not terminal.accept_event(package_event):
  201. terminal_adapted_package.events.remove(package_event)
  202. if self.logger.is_debug:
  203. self.logger.debug('Send package to terminal {}: {}'.format(
  204. terminal.__class__.__name__,
  205. terminal_adapted_package.repr_debug(),
  206. ))
  207. output_queue.put(terminal_adapted_package)
  208. def receive(self) -> [TerminalPackage]:
  209. self.logger.info('Receive terminals packages')
  210. packages = []
  211. for terminal, input_queue in self.inputs_queues.items():
  212. self.logger.info('Receive terminal {} packages ({})'.format(
  213. terminal.__class__.__name__,
  214. 'sync' if not terminal.asynchronous else 'async'
  215. ))
  216. # When terminal is synchronous, wait it's cycle package
  217. if not terminal.asynchronous:
  218. continue_ = True
  219. while continue_:
  220. package = input_queue.get()
  221. # In case where terminal send package before end of cycle
  222. # management
  223. continue_ = not package.is_cycle
  224. if self.logger.is_debug:
  225. self.logger.debug('Receive package from {}: {}'.format(
  226. terminal.__class__.__name__,
  227. str(package.repr_debug()),
  228. ))
  229. packages.append(package)
  230. else:
  231. try:
  232. while True:
  233. package = input_queue.get(block=False, timeout=None)
  234. if self.logger.is_debug:
  235. self.logger.debug('Receive package from {}: {}'.format(
  236. str(terminal),
  237. str(package.repr_debug()),
  238. ))
  239. packages.append(package)
  240. except Empty:
  241. pass # Queue is empty
  242. self.logger.info('{} package(s) received'.format(len(packages)))
  243. return packages