terminals.py 8.3KB


  1. # coding: utf-8
  2. import collections
  3. from copy import copy
  4. from multiprocessing import Queue
  5. from multiprocessing import Process
  6. from queue import Empty
  7. import time
  8. from synergine2.base import BaseObject
  9. from synergine2.config import Config
  10. from synergine2.log import SynergineLogger
  11. from synergine2.simulation import Subject
  12. from synergine2.simulation import Event
# Sentinel value placed on a terminal's queue by TerminalManager.stop();
# Terminal.read() stops reading and returns False when it dequeues it.
STOP_SIGNAL = '__STOP_SIGNAL__'
  14. class TerminalPackage(BaseObject):
  15. def __init__(
  16. self,
  17. subjects: [Subject]=None,
  18. add_subjects: [Subject]=None,
  19. remove_subjects: [Subject]=None,
  20. events: [Event]=None,
  21. simulation_actions: [tuple]=None,
  22. subject_actions: [tuple]=None,
  23. is_cycle: bool=False,
  24. *args,
  25. **kwargs
  26. ):
  27. self.subjects = subjects
  28. self.add_subjects = add_subjects or []
  29. self.remove_subjects = remove_subjects or []
  30. self.events = events or []
  31. self.simulation_actions = simulation_actions or []
  32. self.subject_actions = subject_actions or []
  33. self.is_cycle = is_cycle
  34. def repr_debug(self) -> str:
  35. subjects = self.subjects or []
  36. return str(dict(
  37. subjects=subjects,
  38. add_subjects=[s.id for s in self.add_subjects],
  39. remove_subjects=[s.id for s in self.remove_subjects],
  40. events=[e.repr_debug() for e in self.events],
  41. simulation_actions=['{}: {}'.format(a.__class__.__name__, p) for a, p in self.simulation_actions],
  42. subject_actions=['{}: {}'.format(a.__class__.__name__, p) for a, p in self.subject_actions],
  43. is_cycle=self.is_cycle,
  44. ))
  45. class Terminal(BaseObject):
  46. # Default behaviour is to do nothing.
  47. # DEFAULT_SLEEP is sleep time between each queue read
  48. default_sleep = 1
  49. # List of subscribed Event classes. Terminal will not receive events
  50. # who are not instance of listed classes
  51. subscribed_events = [Event]
  52. def __init__(
  53. self,
  54. config: Config,
  55. logger: SynergineLogger,
  56. asynchronous: bool=True,
  57. ):
  58. self.config = config
  59. self.logger = logger
  60. self._input_queue = None
  61. self._output_queue = None
  62. self._stop_required = False
  63. self.subjects = {}
  64. self.cycle_events = []
  65. self.event_handlers = collections.defaultdict(list)
  66. self.asynchronous = asynchronous
  67. def accept_event(self, event: Event) -> bool:
  68. for event_class in self.subscribed_events:
  69. if isinstance(event, event_class):
  70. return True
  71. return False
  72. def start(self, input_queue: Queue, output_queue: Queue) -> None:
  73. self._input_queue = input_queue
  74. self._output_queue = output_queue
  75. self.run()
  76. def run(self):
  77. """
  78. Override this method to create your daemon terminal
  79. """
  80. try:
  81. while self.read():
  82. time.sleep(self.default_sleep)
  83. except KeyboardInterrupt:
  84. pass
  85. def read(self):
  86. while True:
  87. try:
  88. package = self._input_queue.get(block=False, timeout=None)
  89. if package == STOP_SIGNAL:
  90. self._stop_required = True
  91. return False
  92. self.receive(package)
  93. except Empty:
  94. return True # Finished to read Queue
  95. def receive(self, package: TerminalPackage):
  96. self.update_with_package(package)
  97. # End of cycle management signal
  98. self.send(TerminalPackage(is_cycle=True))
  99. def send(self, package: TerminalPackage):
  100. self._output_queue.put(package)
  101. def register_event_handler(self, event_class, func):
  102. self.event_handlers[event_class].append(func)
  103. def update_with_package(self, package: TerminalPackage):
  104. if package.subjects:
  105. self.subjects = {s.id: s for s in package.subjects}
  106. for new_subject in package.add_subjects:
  107. self.subjects[new_subject.id] = new_subject
  108. for deleted_subject in package.remove_subjects:
  109. del self.subjects[deleted_subject.id]
  110. self.cycle_events = package.events
  111. self.execute_event_handlers(package.events)
  112. def execute_event_handlers(self, events: [Event]):
  113. for event in events:
  114. for event_class, handlers in self.event_handlers.items():
  115. if isinstance(event, event_class):
  116. for handler in handlers:
  117. handler(event)
  118. class TerminalManager(BaseObject):
  119. def __init__(
  120. self,
  121. config: Config,
  122. logger: SynergineLogger,
  123. terminals: [Terminal]
  124. ):
  125. self.config = config
  126. self.logger = logger
  127. self.terminals = terminals
  128. self.outputs_queues = {}
  129. self.inputs_queues = {}
  130. def start(self) -> None:
  131. self.logger.info('Start terminals')
  132. for terminal in self.terminals:
  133. # TODO: logs
  134. output_queue = Queue()
  135. self.outputs_queues[terminal] = output_queue
  136. input_queue = Queue()
  137. self.inputs_queues[terminal] = input_queue
  138. process = Process(target=terminal.start, kwargs=dict(
  139. input_queue=output_queue,
  140. output_queue=input_queue,
  141. ))
  142. process.start()
  143. def stop(self):
  144. for output_queue in self.outputs_queues.values():
  145. output_queue.put(STOP_SIGNAL)
  146. def send(self, package: TerminalPackage):
  147. self.logger.info('Send package to terminals')
  148. if self.logger.is_debug:
  149. self.logger.debug('Send package to terminals: {}'.format(
  150. str(package.repr_debug()),
  151. ))
  152. for terminal, output_queue in self.outputs_queues.items():
  153. self.logger.info('Send package to terminal {}'.format(terminal.__class__.__name__))
  154. # Terminal maybe don't want all events, so reduce list of event
  155. # Thirst make a copy to personalize this package
  156. terminal_adapted_package = copy(package)
  157. # Duplicate events list to personalize it
  158. terminal_adapted_package.events = terminal_adapted_package.events[:]
  159. for package_event in terminal_adapted_package.events[:]:
  160. if not terminal.accept_event(package_event):
  161. terminal_adapted_package.events.remove(package_event)
  162. if self.logger.is_debug:
  163. self.logger.debug('Send package to terminal {}: {}'.format(
  164. terminal.__class__.__name__,
  165. terminal_adapted_package.repr_debug(),
  166. ))
  167. output_queue.put(terminal_adapted_package)
  168. def receive(self) -> [TerminalPackage]:
  169. self.logger.info('Receive terminals packages')
  170. packages = []
  171. for terminal, input_queue in self.inputs_queues.items():
  172. self.logger.info('Receive terminal {} packages ({})'.format(
  173. terminal.__class__.__name__,
  174. 'sync' if not terminal.asynchronous else 'async'
  175. ))
  176. # When terminal is synchronous, wait it's cycle package
  177. if not terminal.asynchronous:
  178. continue_ = True
  179. while continue_:
  180. package = input_queue.get()
  181. # In case where terminal send package before end of cycle
  182. # management
  183. continue_ = not package.is_cycle
  184. if self.logger.is_debug:
  185. self.logger.debug('Receive package from {}: {}'.format(
  186. terminal.__class__.__name__,
  187. str(package.repr_debug()),
  188. ))
  189. packages.append(package)
  190. else:
  191. try:
  192. while True:
  193. package = input_queue.get(block=False, timeout=None)
  194. if self.logger.is_debug:
  195. self.logger.debug('Receive package from {}: {}'.format(
  196. str(terminal),
  197. str(package.repr_debug()),
  198. ))
  199. packages.append(package)
  200. except Empty:
  201. pass # Queue is empty
  202. self.logger.info('{} package(s) received'.format(len(packages)))
  203. return packages