123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- # coding: utf-8
- import time
-
- from multiprocessing import Queue
-
- from synergine2.base import BaseObject
- from synergine2.config import Config
- from synergine2.cycle import CycleManager
- from synergine2.log import get_logger
- from synergine2.simulation import Simulation
- from synergine2.terminals import TerminalManager
- from synergine2.terminals import Terminal
- from synergine2.terminals import TerminalPackage
- from synergine2.utils import time_it
-
-
- class Core(BaseObject):
- def __init__(
- self,
- config: Config,
- simulation: Simulation,
- cycle_manager: CycleManager,
- terminal_manager: TerminalManager=None,
- cycles_per_seconds: float=1.0,
- ):
- self.config = config
- self.logger = get_logger('Core', config)
- self.simulation = simulation
- self.cycle_manager = cycle_manager
- self.terminal_manager = terminal_manager or TerminalManager(config, [])
- self._loop_delta = 1./cycles_per_seconds
- self._current_cycle_start_time = None
- self._continue = True
- self.main_process_terminal = None # type: Terminal
-
- def run(
- self,
- from_terminal: Terminal=None,
- from_terminal_input_queue: Queue=None,
- from_terminal_output_queue: Queue=None,
- ):
- self.logger.info('Run core')
- try:
- # Execute terminal in main process if needed
- if not from_terminal:
- self.main_process_terminal \
- = self.terminal_manager.get_main_process_terminal()
- if self.main_process_terminal:
- self.logger.info(
- 'The "{}" terminal have to be the main process'
- ', start it now'.format(
- self.main_process_terminal.__class__.__name__,
- ),
- )
- self.main_process_terminal.execute_as_main_process(self)
- return
- else:
- # A terminal is main process, so we have to add it's queues to terminal
- # manager
- self.terminal_manager.inputs_queues[from_terminal] \
- = from_terminal_input_queue
- self.terminal_manager.outputs_queues[from_terminal] \
- = from_terminal_output_queue
-
- self.terminal_manager.start()
-
- start_package = TerminalPackage(
- subjects=self.simulation.subjects,
- )
- self.logger.info('Send start package to terminals')
- self.terminal_manager.send(start_package)
-
- while self._continue:
- self._start_cycle()
-
- events = []
- packages = self.terminal_manager.receive()
- for package in packages:
- if package.sigterm:
- self.logger.info('SIGTERM received from terminal package')
- self._continue = False
-
- events.extend(self.cycle_manager.apply_actions(
- simulation_actions=package.simulation_actions,
- subject_actions=package.subject_actions,
- ))
-
- with time_it() as elapsed_time:
- events.extend(self.cycle_manager.next())
-
- self.logger.info('Cycle duration: {}s'.format(
- elapsed_time.get_final_time(),
- ))
-
- cycle_package = TerminalPackage(
- events=events,
- add_subjects=self.simulation.subjects.adds,
- remove_subjects=self.simulation.subjects.removes,
- is_cycle=True,
- )
- self.terminal_manager.send(cycle_package)
-
- # Reinitialize these list for next cycle
- self.simulation.subjects.adds = []
- self.simulation.subjects.removes = []
-
- self._end_cycle()
- except KeyboardInterrupt:
- self.logger.info('KeyboardInterrupt: stop the loop')
- pass # Just stop while
- except Exception as exc:
- self.logger.exception('Fatal error during simulation')
-
- self.logger.info('Getting out of loop. Terminating.')
- self.terminal_manager.stop()
- self.cycle_manager.stop()
- self.logger.info('Terminated')
-
- def _start_cycle(self):
- time_ = time.time()
- self.logger.info('Start cycle at time {}'.format(time_))
- self._current_cycle_start_time = time_
-
- def _end_cycle(self) -> None:
- """
- Make a sleep if cycle duration take less time of wanted (see
- cycles_per_seconds constructor parameter)
- """
- time_ = time.time()
- self.logger.info('End of cycle at time {}'.format(time_))
- cycle_duration = time_ - self._current_cycle_start_time
- sleep_time = self._loop_delta - cycle_duration
- self.logger.info('Sleep time is {}'.format(sleep_time))
- if sleep_time > 0:
- time.sleep(sleep_time)
|