# core.py (5.0KB)

  1. # coding: utf-8
  2. import time
  3. from multiprocessing import Queue
  4. from synergine2.base import BaseObject
  5. from synergine2.config import Config
  6. from synergine2.cycle import CycleManager
  7. from synergine2.log import get_logger
  8. from synergine2.simulation import Simulation
  9. from synergine2.terminals import TerminalManager
  10. from synergine2.terminals import Terminal
  11. from synergine2.terminals import TerminalPackage
  12. from synergine2.utils import time_it
  13. class Core(BaseObject):
  14. def __init__(
  15. self,
  16. config: Config,
  17. simulation: Simulation,
  18. cycle_manager: CycleManager,
  19. terminal_manager: TerminalManager=None,
  20. cycles_per_seconds: float=1.0,
  21. ):
  22. self.config = config
  23. self.logger = get_logger('Core', config)
  24. self.simulation = simulation
  25. self.cycle_manager = cycle_manager
  26. self.terminal_manager = terminal_manager or TerminalManager(config, [])
  27. self._loop_delta = 1./cycles_per_seconds
  28. self._current_cycle_start_time = None
  29. self._continue = True
  30. self.main_process_terminal = None # type: Terminal
  31. def run(
  32. self,
  33. from_terminal: Terminal=None,
  34. from_terminal_input_queue: Queue=None,
  35. from_terminal_output_queue: Queue=None,
  36. ):
  37. self.logger.info('Run core')
  38. try:
  39. # Execute terminal in main process if needed
  40. if not from_terminal:
  41. self.main_process_terminal \
  42. = self.terminal_manager.get_main_process_terminal()
  43. if self.main_process_terminal:
  44. self.logger.info(
  45. 'The "{}" terminal have to be the main process'
  46. ', start it now'.format(
  47. self.main_process_terminal.__class__.__name__,
  48. ),
  49. )
  50. self.main_process_terminal.execute_as_main_process(self)
  51. return
  52. else:
  53. # A terminal is main process, so we have to add it's queues to terminal
  54. # manager
  55. self.terminal_manager.inputs_queues[from_terminal] \
  56. = from_terminal_input_queue
  57. self.terminal_manager.outputs_queues[from_terminal] \
  58. = from_terminal_output_queue
  59. self.terminal_manager.start()
  60. start_package = TerminalPackage(
  61. subjects=self.simulation.subjects,
  62. )
  63. self.logger.info('Send start package to terminals')
  64. self.terminal_manager.send(start_package)
  65. while self._continue:
  66. self._start_cycle()
  67. events = []
  68. packages = self.terminal_manager.receive()
  69. for package in packages:
  70. if package.sigterm:
  71. self.logger.info('SIGTERM received from terminal package')
  72. self._continue = False
  73. events.extend(self.cycle_manager.apply_actions(
  74. simulation_actions=package.simulation_actions,
  75. subject_actions=package.subject_actions,
  76. ))
  77. with time_it() as elapsed_time:
  78. events.extend(self.cycle_manager.next())
  79. self.logger.info('Cycle duration: {}s'.format(
  80. elapsed_time.get_final_time(),
  81. ))
  82. cycle_package = TerminalPackage(
  83. events=events,
  84. add_subjects=self.simulation.subjects.adds,
  85. remove_subjects=self.simulation.subjects.removes,
  86. is_cycle=True,
  87. )
  88. self.terminal_manager.send(cycle_package)
  89. # Reinitialize these list for next cycle
  90. self.simulation.subjects.adds = []
  91. self.simulation.subjects.removes = []
  92. self._end_cycle()
  93. except KeyboardInterrupt:
  94. self.logger.info('KeyboardInterrupt: stop the loop')
  95. pass # Just stop while
  96. except Exception as exc:
  97. self.logger.exception('Fatal error during simulation')
  98. self.logger.info('Getting out of loop. Terminating.')
  99. self.terminal_manager.stop()
  100. self.cycle_manager.stop()
  101. self.logger.info('Terminated')
  102. def _start_cycle(self):
  103. time_ = time.time()
  104. self.logger.info('Start cycle at time {}'.format(time_))
  105. self._current_cycle_start_time = time_
  106. def _end_cycle(self) -> None:
  107. """
  108. Make a sleep if cycle duration take less time of wanted (see
  109. cycles_per_seconds constructor parameter)
  110. """
  111. time_ = time.time()
  112. self.logger.info('End of cycle at time {}'.format(time_))
  113. cycle_duration = time_ - self._current_cycle_start_time
  114. sleep_time = self._loop_delta - cycle_duration
  115. self.logger.info('Sleep time is {}'.format(sleep_time))
  116. if sleep_time > 0:
  117. time.sleep(sleep_time)