terminals.py 8.2KB


  1. # coding: utf-8
  2. import collections
  3. from copy import copy
  4. from multiprocessing import Queue
  5. from multiprocessing import Process
  6. from queue import Empty
  7. import time
  8. from synergine2.base import BaseObject
  9. from synergine2.config import Config
  10. from synergine2.log import SynergineLogger
  11. from synergine2.simulation import Subject
  12. from synergine2.simulation import Event
  13. STOP_SIGNAL = '__STOP_SIGNAL__'
  14. class TerminalPackage(BaseObject):
  15. def __init__(
  16. self,
  17. subjects: [Subject]=None,
  18. add_subjects: [Subject]=None,
  19. remove_subjects: [Subject]=None,
  20. events: [Event]=None,
  21. simulation_actions: [tuple]=None,
  22. subject_actions: [tuple]=None,
  23. is_cycle: bool=False,
  24. *args,
  25. **kwargs
  26. ):
  27. self.subjects = subjects
  28. self.add_subjects = add_subjects or []
  29. self.remove_subjects = remove_subjects or []
  30. self.events = events or []
  31. self.simulation_actions = simulation_actions or []
  32. self.subject_actions = subject_actions or []
  33. self.is_cycle = is_cycle
  34. def repr_debug(self) -> str:
  35. subjects = self.subjects or []
  36. return str(dict(
  37. subjects=subjects,
  38. add_subjects=[s.id for s in self.add_subjects],
  39. remove_subjects=[s.id for s in self.remove_subjects],
  40. events=[e.repr_debug() for e in self.events],
  41. simulation_actions=['{}: {}'.format(a.__class__.__name__, p) for a, p in self.simulation_actions],
  42. subject_actions=['{}: {}'.format(a.__class__.__name__, p) for a, p in self.subject_actions],
  43. is_cycle=self.is_cycle,
  44. ))
  45. class Terminal(object):
  46. # Default behaviour is to do nothing.
  47. # DEFAULT_SLEEP is sleep time between each queue read
  48. default_sleep = 1
  49. # List of subscribed Event classes. Terminal will not receive events
  50. # who are not instance of listed classes
  51. subscribed_events = [Event]
  52. def __init__(self, asynchronous: bool=True):
  53. self._input_queue = None
  54. self._output_queue = None
  55. self._stop_required = False
  56. self.subjects = {}
  57. self.cycle_events = []
  58. self.event_handlers = collections.defaultdict(list)
  59. self.asynchronous = asynchronous
  60. def accept_event(self, event: Event) -> bool:
  61. for event_class in self.subscribed_events:
  62. if isinstance(event, event_class):
  63. return True
  64. return False
  65. def start(self, input_queue: Queue, output_queue: Queue) -> None:
  66. self._input_queue = input_queue
  67. self._output_queue = output_queue
  68. self.run()
  69. def run(self):
  70. """
  71. Override this method to create your daemon terminal
  72. """
  73. try:
  74. while self.read():
  75. time.sleep(self.default_sleep)
  76. except KeyboardInterrupt:
  77. pass
  78. def read(self):
  79. while True:
  80. try:
  81. package = self._input_queue.get(block=False, timeout=None)
  82. if package == STOP_SIGNAL:
  83. self._stop_required = True
  84. return False
  85. self.receive(package)
  86. except Empty:
  87. return True # Finished to read Queue
  88. def receive(self, package: TerminalPackage):
  89. self.update_with_package(package)
  90. # End of cycle management signal
  91. self.send(TerminalPackage(is_cycle=True))
  92. def send(self, package: TerminalPackage):
  93. self._output_queue.put(package)
  94. def register_event_handler(self, event_class, func):
  95. self.event_handlers[event_class].append(func)
  96. def update_with_package(self, package: TerminalPackage):
  97. if package.subjects:
  98. self.subjects = {s.id: s for s in package.subjects}
  99. for new_subject in package.add_subjects:
  100. self.subjects[new_subject.id] = new_subject
  101. for deleted_subject in package.remove_subjects:
  102. del self.subjects[deleted_subject.id]
  103. self.cycle_events = package.events
  104. self.execute_event_handlers(package.events)
  105. def execute_event_handlers(self, events: [Event]):
  106. for event in events:
  107. for event_class, handlers in self.event_handlers.items():
  108. if isinstance(event, event_class):
  109. for handler in handlers:
  110. handler(event)
  111. class TerminalManager(object):
  112. def __init__(
  113. self,
  114. config: Config,
  115. logger: SynergineLogger,
  116. terminals: [Terminal]
  117. ):
  118. self.config = config
  119. self.logger = logger
  120. self.terminals = terminals
  121. self.outputs_queues = {}
  122. self.inputs_queues = {}
  123. def start(self) -> None:
  124. self.logger.info('Start terminals')
  125. for terminal in self.terminals:
  126. # TODO: logs
  127. output_queue = Queue()
  128. self.outputs_queues[terminal] = output_queue
  129. input_queue = Queue()
  130. self.inputs_queues[terminal] = input_queue
  131. process = Process(target=terminal.start, kwargs=dict(
  132. input_queue=output_queue,
  133. output_queue=input_queue,
  134. ))
  135. process.start()
  136. def stop(self):
  137. for output_queue in self.outputs_queues.values():
  138. output_queue.put(STOP_SIGNAL)
  139. def send(self, package: TerminalPackage):
  140. self.logger.info('Send package to terminals')
  141. if self.logger.is_debug:
  142. self.logger.debug('Send package to terminals: {}'.format(
  143. str(package.repr_debug()),
  144. ))
  145. for terminal, output_queue in self.outputs_queues.items():
  146. self.logger.info('Send package to terminal {}'.format(terminal.__class__.__name__))
  147. # Terminal maybe don't want all events, so reduce list of event
  148. # Thirst make a copy to personalize this package
  149. terminal_adapted_package = copy(package)
  150. # Duplicate events list to personalize it
  151. terminal_adapted_package.events = terminal_adapted_package.events[:]
  152. for package_event in terminal_adapted_package.events[:]:
  153. if not terminal.accept_event(package_event):
  154. terminal_adapted_package.events.remove(package_event)
  155. if self.logger.is_debug:
  156. self.logger.debug('Send package to terminal {}: {}'.format(
  157. terminal.__class__.__name__,
  158. terminal_adapted_package.repr_debug(),
  159. ))
  160. output_queue.put(terminal_adapted_package)
  161. def receive(self) -> [TerminalPackage]:
  162. self.logger.info('Receive terminals packages')
  163. packages = []
  164. for terminal, input_queue in self.inputs_queues.items():
  165. self.logger.info('Receive terminal {} packages ({})'.format(
  166. terminal.__class__.__name__,
  167. 'sync' if not terminal.asynchronous else 'async'
  168. ))
  169. # When terminal is synchronous, wait it's cycle package
  170. if not terminal.asynchronous:
  171. continue_ = True
  172. while continue_:
  173. package = input_queue.get()
  174. # In case where terminal send package before end of cycle
  175. # management
  176. continue_ = not package.is_cycle
  177. if self.logger.is_debug:
  178. self.logger.debug('Receive package from {}: {}'.format(
  179. terminal.__class__.__name__,
  180. str(package.repr_debug()),
  181. ))
  182. packages.append(package)
  183. else:
  184. try:
  185. while True:
  186. package = input_queue.get(block=False, timeout=None)
  187. if self.logger.is_debug:
  188. self.logger.debug('Receive package from {}: {}'.format(
  189. str(terminal),
  190. str(package.repr_debug()),
  191. ))
  192. packages.append(package)
  193. except Empty:
  194. pass # Queue is empty
  195. self.logger.info('{} package(s) received'.format(len(packages)))
  196. return packages