cycle.py 18KB


  1. # coding: utf-8
  2. import multiprocessing
  3. import typing
  4. import time
  5. from synergine2.base import BaseObject
  6. from synergine2.config import Config
  7. from synergine2.exceptions import SynergineException
  8. from synergine2.log import get_logger
  9. from synergine2.processing import ProcessManager
  10. from synergine2.share import shared
  11. from synergine2.simulation import Subject
  12. from synergine2.simulation import Simulation
  13. from synergine2.simulation import SubjectBehaviour
  14. from synergine2.simulation import SimulationBehaviour
  15. from synergine2.simulation import SubjectMechanism
  16. from synergine2.simulation import SimulationMechanism
  17. from synergine2.simulation import Event
  18. from synergine2.utils import time_it
  19. JOB_TYPE_SUBJECTS = 0
  20. JOB_TYPE_SIMULATION = 1
  21. class CycleManager(BaseObject):
  22. def __init__(
  23. self,
  24. config: Config,
  25. simulation: Simulation,
  26. process_manager: ProcessManager=None,
  27. ):
  28. # TODO: reproduire le mechanisme d'index de behaviour/etc pour simulation
  29. self.config = config
  30. self.logger = get_logger('Cycle', config)
  31. self.simulation = simulation
  32. self.current_cycle = -1
  33. self.first_cycle = True
  34. # TODO NOW: Les processes devront maintenir une liste des subjects qui sont nouveaux.ne connaissent pas
  35. # Attention a ce qu'in ne soient pas "expose" quand on créer ces subjects au sein du process.
  36. # Ces subjects ont vocation à adopter l'id du vrau subject tout de suite après leur instanciation
  37. if process_manager is None:
  38. process_manager = ProcessManager(
  39. config=config,
  40. # TODO: Changer de config de merde (core.use_x_cores)
  41. process_count=config.get('core', {}).get('use_x_cores', multiprocessing.cpu_count()),
  42. job=self.job,
  43. )
  44. self.process_manager = process_manager
  45. def job(self, worker_id: int, process_count: int, job_type: str) -> 'TODO':
  46. # ICI: (in process) on doit avoir:
  47. # La tranche x:y de sujets à traiter
  48. shared.refresh()
  49. if job_type == JOB_TYPE_SUBJECTS:
  50. return self._job_subjects(worker_id, process_count)
  51. if job_type == JOB_TYPE_SIMULATION:
  52. return self._job_simulation(worker_id, process_count)
  53. raise SynergineException('Unknown job type "{}"'.format(job_type))
  54. def _job_subjects(self, worker_id: int, process_count: int) -> typing.Dict[int, typing.Dict[str, typing.Any]]:
  55. # Determine list of process subject to work with
  56. subject_ids = shared.get('subject_ids')
  57. chunk_length, rest = divmod(len(subject_ids), process_count)
  58. from_ = chunk_length * worker_id
  59. to_ = from_ + chunk_length
  60. if worker_id + 1 == process_count:
  61. to_ += rest
  62. subject_ids_to_parse = subject_ids[from_:to_]
  63. # Build list of subjects for compute them
  64. subjects = []
  65. for subject_id in subject_ids_to_parse:
  66. subject = self.simulation.get_or_create_subject(subject_id)
  67. subjects.append(subject)
  68. results_by_subjects = self._subjects_computing(subjects)
  69. return results_by_subjects
  70. def _job_simulation(self, worker_id: int, process_count: int) -> typing.Dict[int, typing.Dict[str, typing.Any]]:
  71. self.logger.info('Simulation computing (worker {})'.format(worker_id))
  72. behaviours = self.get_simulation_active_behaviours()
  73. mechanisms = self.get_mechanisms_from_behaviours(behaviours)
  74. mechanisms_data = {}
  75. behaviours_data = {}
  76. self.logger.info('{} mechanisms to compute'.format(str(len(mechanisms))))
  77. if self.logger.is_debug:
  78. self.logger.debug('Mechanisms are: {}'.format(
  79. str([m.repr_debug() for m in mechanisms])
  80. ))
  81. for mechanism in mechanisms:
  82. mechanism_data = mechanism.run(
  83. process_number=worker_id,
  84. process_count=process_count,
  85. )
  86. if self.logger.is_debug:
  87. self.logger.debug('{} mechanism product data: {}'.format(
  88. type(mechanism).__name__,
  89. str(mechanism_data),
  90. ))
  91. mechanisms_data[mechanism.__class__] = mechanism_data
  92. behaviours = self.simulation.behaviours.values()
  93. self.logger.info('{} behaviours to compute'.format(str(len(behaviours))))
  94. if self.logger.is_debug:
  95. self.logger.debug('Behaviours are: {}'.format(
  96. str([b.repr_debug() for b in behaviours])
  97. ))
  98. for behaviour in behaviours:
  99. if behaviour.is_skip(self.current_cycle):
  100. behaviour_data = False
  101. self.logger.debug('Simulation: behaviour {} skip'.format(
  102. str(type(behaviour)),
  103. ))
  104. else:
  105. # TODO: Behaviours dependencies
  106. behaviour_data = behaviour.run(mechanisms_data)
  107. if self.logger.is_debug:
  108. self.logger.debug('{} behaviour produce data: {}'.format(
  109. type(behaviour).__name__,
  110. behaviour_data,
  111. ))
  112. if behaviour_data:
  113. behaviours_data[behaviour.__class__] = behaviour_data
  114. return behaviours_data
  115. def next(self) -> [Event]:
  116. if self.first_cycle:
  117. self.process_manager.start_workers()
  118. # To dispatch subjects add/removes, enable track on them
  119. self.simulation.subjects.track_changes = True
  120. self.first_cycle = False
  121. self.current_cycle += 1
  122. self.logger.info('Process cycle {}'.format(self.current_cycle))
  123. events = []
  124. shared.commit()
  125. # TODO: gestion des behaviours non parallelisables
  126. # TODO: Proposer des ordres d'execution
  127. with time_it() as elapsed_time:
  128. events.extend(self._get_subjects_events())
  129. self.logger.debug('Cycle subjects events duration: {}s'.format(
  130. elapsed_time.get_final_time()),
  131. )
  132. with time_it() as elapsed_time:
  133. events.extend(self._get_simulation_events())
  134. self.logger.debug('Cycle simulation events duration: {}s'.format(
  135. elapsed_time.get_final_time()),
  136. )
  137. self.logger.info('Cycle {} generate {} events'.format(
  138. str(self.current_cycle),
  139. str(len(events)),
  140. ))
  141. return events
  142. def _get_simulation_events(self) -> [Event]:
  143. events = []
  144. results = {}
  145. self.logger.info('Process simulation events')
  146. # TODO: Think about compute simulation cycle in workers
  147. results_by_processes = self.process_manager.make_them_work(JOB_TYPE_SIMULATION)
  148. for process_result in results_by_processes:
  149. for behaviour_class, behaviour_result in process_result.items():
  150. results[behaviour_class] = behaviour_class.merge_data(
  151. behaviour_result,
  152. results.get(behaviour_class),
  153. )
  154. self.logger.info('Simulation generate {} behaviours'.format(len(results)))
  155. # Make events
  156. for behaviour_class, behaviour_data in results.items():
  157. behaviour_events = self.simulation.behaviours[behaviour_class].action(behaviour_data)
  158. self.logger.info('{} behaviour generate {} events'.format(
  159. str(behaviour_class),
  160. str(len(behaviour_events)),
  161. ))
  162. if self.logger.is_debug:
  163. self.logger.debug('{} behaviour generated events: {}'.format(
  164. str(behaviour_class),
  165. str([e.repr_debug() for e in behaviour_events]),
  166. ))
  167. events.extend(behaviour_events)
  168. self.logger.info('Simulation behaviours generate {} events'.format(len(events)))
  169. return events
  170. def _get_subjects_events(self) -> [Event]:
  171. events = []
  172. results = {}
  173. self.logger.info('Process subjects events')
  174. results_by_processes = self.process_manager.make_them_work(JOB_TYPE_SUBJECTS)
  175. for process_results in results_by_processes:
  176. results.update(process_results)
  177. # Duplicate list to prevent conflicts with behaviours subjects manipulations
  178. for subject in self.simulation.subjects[:]:
  179. subject_behaviours_results = results.get(subject.id, {})
  180. if subject.behaviour_selector:
  181. # TODO: Looging
  182. subject_behaviours_results = subject.behaviour_selector.reduce_behaviours(dict(
  183. subject_behaviours_results,
  184. ))
  185. subject_behaviours = subject.behaviours
  186. for behaviour_class, behaviour_data in subject_behaviours_results.items():
  187. # TODO: Ajouter une etape de selection des actions a faire (genre neuronnal)
  188. # (genre se cacher et fuir son pas compatibles)
  189. behaviour_events = subject_behaviours[behaviour_class].action(behaviour_data)
  190. self.logger.info('{} behaviour for subject {} generate {} events'.format(
  191. str(behaviour_class.__name__),
  192. str(subject.id),
  193. str(len(behaviour_events)),
  194. ))
  195. if self.logger.is_debug:
  196. self.logger.debug('{} behaviour for subject {} generated events: {}'.format(
  197. str(behaviour_class.__name__),
  198. str(subject.id),
  199. str([e.repr_debug() for e in behaviour_events]),
  200. ))
  201. events.extend(behaviour_events)
  202. self.logger.info('Subjects behaviours generate {} events'.format(len(events)))
  203. return events
  204. def _subjects_computing(
  205. self,
  206. subjects,
  207. process_number=None,
  208. process_count=None,
  209. ) -> typing.Dict[int, typing.Dict[str, typing.Any]]:
  210. results = {}
  211. self.logger.info('Subjects computing: {} subjects to compute'.format(str(len(subjects))))
  212. for subject in subjects:
  213. subject_behaviours = self.get_active_subject_behaviors(subject)
  214. if not subject_behaviours:
  215. continue
  216. mechanisms = self.get_mechanisms_from_behaviors(subject_behaviours, subject)
  217. if mechanisms:
  218. self.logger.info('Subject {}: {} mechanisms'.format(
  219. str(subject.id),
  220. str(len(mechanisms)),
  221. ))
  222. if self.logger.is_debug:
  223. self.logger.info('Subject {}: mechanisms are: {}'.format(
  224. str(subject.id),
  225. str([m.repr_debug for m in mechanisms])
  226. ))
  227. mechanisms_data = {}
  228. behaviours_data = {}
  229. for mechanism in mechanisms:
  230. with time_it() as elapsed_time:
  231. mechanism_data = mechanism.run()
  232. if self.logger.is_debug:
  233. self.logger.debug('Subject {}: {} mechanisms produce data: {} in {}s'.format(
  234. str(subject.id),
  235. type(mechanism).__name__,
  236. str(mechanism_data),
  237. elapsed_time.get_final_time(),
  238. ))
  239. mechanisms_data[mechanism.__class__] = mechanism_data
  240. if mechanisms:
  241. if self.logger.is_debug:
  242. self.logger.info('Subject {}: mechanisms data are: {}'.format(
  243. str(subject.id),
  244. str(mechanisms_data),
  245. ))
  246. self.logger.info('Subject {}: have {} behaviours'.format(
  247. str(subject.id),
  248. str(len(subject_behaviours)),
  249. ))
  250. for behaviour in subject_behaviours:
  251. behaviour_key = type(behaviour)
  252. self.logger.info('Subject {}: run {} behaviour'.format(
  253. str(subject.id),
  254. str(type(behaviour)),
  255. ))
  256. if behaviour.is_terminated():
  257. del subject.behaviours[behaviour_key]
  258. # We identify behaviour data with it's class to be able to intersect
  259. # it after subprocess data collect
  260. if behaviour.is_skip(self.current_cycle):
  261. behaviour_data = False
  262. self.logger.debug('Subject {}: behaviour {} skip'.format(
  263. str(subject.id),
  264. str(type(behaviour)),
  265. ))
  266. else:
  267. with time_it() as elapsed_time:
  268. behaviour.last_execution_time = time.time()
  269. # TODO: Behaviours dependencies
  270. behaviour_data = behaviour.run(mechanisms_data)
  271. if self.logger.is_debug:
  272. self.logger.debug(
  273. 'Subject {}: behaviour {} produce '
  274. 'data: {} in {}s'.format(
  275. str(type(behaviour)),
  276. str(subject.id),
  277. str(behaviour_data),
  278. elapsed_time.get_time(),
  279. )
  280. )
  281. if behaviour_data:
  282. behaviours_data[behaviour.__class__] = behaviour_data
  283. results[subject.id] = behaviours_data
  284. return results
  285. def apply_actions(
  286. self,
  287. simulation_actions: [tuple]=None,
  288. subject_actions: [tuple]=None,
  289. ) -> [Event]:
  290. """
  291. TODO: bien specifier la forme des parametres
  292. simulation_actions = [(class, {'data': 'foo'})]
  293. subject_actions = [(subject_id, [(class, {'data': 'foo'}])]
  294. """
  295. simulation_actions = simulation_actions or []
  296. subject_actions = subject_actions or []
  297. events = []
  298. self.logger.info('Apply {} simulation_actions and {} subject_actions'.format(
  299. len(simulation_actions),
  300. len(subject_actions),
  301. ))
  302. for subject_id, behaviours_and_data in subject_actions:
  303. subject = self.simulation.subjects.index.get(subject_id)
  304. for behaviour_class, behaviour_data in behaviours_and_data:
  305. behaviour = behaviour_class(
  306. simulation=self.simulation,
  307. subject=subject,
  308. )
  309. self.logger.info('Apply {} behaviour on subject {}'.format(
  310. str(behaviour_class),
  311. str(subject_id),
  312. ))
  313. if self.logger.is_debug:
  314. self.logger.debug('{} behaviour data is {}'.format(
  315. str(behaviour_class),
  316. str(behaviour_data),
  317. ))
  318. behaviour_events = behaviour.action(behaviour_data)
  319. self.logger.info('{} events from behaviour {} from subject {}'.format(
  320. len(behaviour_events),
  321. str(behaviour_class),
  322. str(subject_id),
  323. ))
  324. if self.logger.is_debug:
  325. self.logger.debug('Events from behaviour {} from subject {} are: {}'.format(
  326. str(behaviour_class),
  327. str(subject_id),
  328. str([e.repr_debug() for e in behaviour_events])
  329. ))
  330. events.extend(behaviour_events)
  331. for behaviour_class, behaviour_data in simulation_actions:
  332. behaviour = behaviour_class(
  333. config=self.config,
  334. simulation=self.simulation,
  335. )
  336. self.logger.info('Apply {} simulation behaviour'.format(
  337. str(behaviour_class),
  338. ))
  339. behaviour_events = behaviour.action(behaviour_data)
  340. if self.logger.is_debug:
  341. self.logger.debug('Events from behaviour {} are: {}'.format(
  342. str(behaviour_class),
  343. str([e.repr_debug() for e in behaviour_events])
  344. ))
  345. events.extend(behaviour_events)
  346. self.logger.info('{} events generated'.format(len(events)))
  347. return events
  348. def stop(self) -> None:
  349. self.process_manager.terminate()
  350. def get_active_subject_behaviors(
  351. self,
  352. subject: Subject,
  353. ) -> typing.List[SubjectBehaviour]:
  354. behaviours = []
  355. for behaviour in subject.behaviours.values():
  356. if behaviour.is_skip(self.current_cycle):
  357. self.logger.debug('Subject {}: behaviour {} skip'.format(
  358. str(subject.id),
  359. str(type(behaviour)),
  360. ))
  361. else:
  362. behaviours.append(behaviour)
  363. return behaviours
  364. def get_mechanisms_from_behaviors(
  365. self,
  366. subject_behaviours: typing.List[SubjectBehaviour],
  367. subject: Subject,
  368. ) -> typing.List[SubjectMechanism]:
  369. # TODO BS 20180109: Not very optimized ... could be enhanced
  370. mechanisms = set()
  371. for subject_mechanism in subject.mechanisms.values():
  372. for subject_behaviour in subject_behaviours:
  373. for behaviour_mechanism_class in subject_behaviour.use:
  374. if isinstance(subject_mechanism, behaviour_mechanism_class):
  375. mechanisms.add(subject_mechanism)
  376. return list(mechanisms)
  377. def get_simulation_active_behaviours(self) -> typing.List[SimulationBehaviour]:
  378. behaviours = []
  379. for behaviour in self.simulation.behaviours.values():
  380. if behaviour.is_skip(self.current_cycle):
  381. self.logger.debug('Simulation: behaviour {} skip'.format(
  382. str(type(behaviour)),
  383. ))
  384. else:
  385. behaviours.append(behaviour)
  386. return behaviours
  387. def get_mechanisms_from_behaviours(
  388. self,
  389. behaviours: typing.List[SimulationBehaviour],
  390. ) -> typing.List[SimulationMechanism]:
  391. # TODO BS 20180109: Not very optimized ... could be enhanced
  392. mechanisms = set()
  393. for simulation_mechanism in self.simulation.mechanisms.values():
  394. for simulation_behaviour in behaviours:
  395. for simulation_mechanism_class in simulation_behaviour.use:
  396. if isinstance(simulation_mechanism, simulation_mechanism_class):
  397. mechanisms.add(simulation_mechanism)
  398. return list(mechanisms)