cycle.py 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. # coding: utf-8
  2. import multiprocessing
  3. import typing
  4. import time
  5. from synergine2.base import BaseObject
  6. from synergine2.config import Config
  7. from synergine2.exceptions import SynergineException
  8. from synergine2.log import SynergineLogger
  9. from synergine2.processing import ProcessManager
  10. from synergine2.share import shared
  11. from synergine2.simulation import Subject
  12. from synergine2.simulation import Simulation
  13. from synergine2.simulation import SubjectBehaviour
  14. from synergine2.simulation import SubjectMechanism
  15. from synergine2.simulation import Event
  16. from synergine2.utils import time_it
  17. JOB_TYPE_SUBJECTS = 0
  18. JOB_TYPE_SIMULATION = 1
  19. class CycleManager(BaseObject):
  20. def __init__(
  21. self,
  22. config: Config,
  23. logger: SynergineLogger,
  24. simulation: Simulation,
  25. process_manager: ProcessManager=None,
  26. ):
  27. # TODO: reproduire le mechanisme d'index de behaviour/etc pour simulation
  28. self.config = config
  29. self.logger = logger
  30. self.simulation = simulation
  31. self.current_cycle = -1
  32. self.first_cycle = True
  33. # TODO NOW: Les processes devront maintenir une liste des subjects qui sont nouveaux.ne connaissent pas
  34. # Attention a ce qu'in ne soient pas "expose" quand on créer ces subjects au sein du process.
  35. # Ces subjects ont vocation à adopter l'id du vrau subject tout de suite après leur instanciation
  36. if process_manager is None:
  37. process_manager = ProcessManager(
  38. config=config,
  39. # TODO: Changer de config de merde (core.use_x_cores)
  40. process_count=config.get('core', {}).get('use_x_cores', multiprocessing.cpu_count()),
  41. job=self.job,
  42. )
  43. self.process_manager = process_manager
  44. def job(self, worker_id: int, process_count: int, job_type: str) -> 'TODO':
  45. # ICI: (in process) on doit avoir:
  46. # La tranche x:y de sujets à traiter
  47. shared.refresh()
  48. if job_type == JOB_TYPE_SUBJECTS:
  49. return self._job_subjects(worker_id, process_count)
  50. if job_type == JOB_TYPE_SIMULATION:
  51. return self._job_simulation(worker_id, process_count)
  52. raise SynergineException('Unknown job type "{}"'.format(job_type))
  53. def _job_subjects(self, worker_id: int, process_count: int) -> typing.Dict[int, typing.Dict[str, typing.Any]]:
  54. # Determine list of process subject to work with
  55. subject_ids = shared.get('subject_ids')
  56. chunk_length, rest = divmod(len(subject_ids), process_count)
  57. from_ = chunk_length * worker_id
  58. to_ = from_ + chunk_length
  59. if worker_id + 1 == process_count:
  60. to_ += rest
  61. subject_ids_to_parse = subject_ids[from_:to_]
  62. # Build list of subjects for compute them
  63. subjects = []
  64. for subject_id in subject_ids_to_parse:
  65. subject = self.simulation.get_or_create_subject(subject_id)
  66. subjects.append(subject)
  67. results_by_subjects = self._subjects_computing(subjects)
  68. return results_by_subjects
  69. def _job_simulation(self, worker_id: int, process_count: int) -> typing.Dict[int, typing.Dict[str, typing.Any]]:
  70. self.logger.info('Simulation computing (worker {})'.format(worker_id))
  71. mechanisms = self.simulation.mechanisms.values()
  72. mechanisms_data = {}
  73. behaviours_data = {}
  74. self.logger.info('{} mechanisms to compute'.format(str(len(mechanisms))))
  75. if self.logger.is_debug:
  76. self.logger.debug('Mechanisms are: {}'.format(
  77. str([m.repr_debug() for m in mechanisms])
  78. ))
  79. for mechanism in mechanisms:
  80. mechanism_data = mechanism.run(
  81. process_number=worker_id,
  82. process_count=process_count,
  83. )
  84. if self.logger.is_debug:
  85. self.logger.debug('{} mechanism product data: {}'.format(
  86. type(mechanism).__name__,
  87. str(mechanism_data),
  88. ))
  89. mechanisms_data[mechanism.__class__] = mechanism_data
  90. behaviours = self.simulation.behaviours.values()
  91. self.logger.info('{} behaviours to compute'.format(str(len(behaviours))))
  92. if self.logger.is_debug:
  93. self.logger.debug('Behaviours are: {}'.format(
  94. str([b.repr_debug() for b in behaviours])
  95. ))
  96. for behaviour in behaviours:
  97. behaviour_data = behaviour.run(mechanisms_data) # TODO: Behaviours dependencies
  98. if self.logger.is_debug:
  99. self.logger.debug('{} behaviour produce data: {}'.format(
  100. type(behaviour).__name__,
  101. behaviour_data,
  102. ))
  103. if behaviour_data:
  104. behaviours_data[behaviour.__class__] = behaviour_data
  105. return behaviours_data
  106. def next(self) -> [Event]:
  107. if self.first_cycle:
  108. # To dispatch subjects add/removes, enable track on them
  109. self.simulation.subjects.track_changes = True
  110. self.first_cycle = False
  111. self.current_cycle += 1
  112. self.logger.info('Process cycle {}'.format(self.current_cycle))
  113. events = []
  114. shared.commit()
  115. # TODO: gestion des behaviours non parallelisables
  116. # TODO: Proposer des ordres d'execution
  117. with time_it() as elapsed_time:
  118. events.extend(self._get_subjects_events())
  119. print('Cycle subjects events duration: {}s'.format(elapsed_time.get_final_time()))
  120. with time_it() as elapsed_time:
  121. events.extend(self._get_simulation_events())
  122. print('Cycle simulation events duration: {}s'.format(elapsed_time.get_final_time()))
  123. self.logger.info('Cycle {} generate {} events'.format(
  124. str(self.current_cycle),
  125. str(len(events)),
  126. ))
  127. return events
  128. def _get_simulation_events(self) -> [Event]:
  129. events = []
  130. results = {}
  131. self.logger.info('Process simulation events')
  132. # TODO: Think about compute simulation cycle in workers
  133. results_by_processes = self.process_manager.make_them_work(JOB_TYPE_SIMULATION)
  134. for process_result in results_by_processes:
  135. for behaviour_class, behaviour_result in process_result.items():
  136. results[behaviour_class] = behaviour_class.merge_data(
  137. behaviour_result,
  138. results.get(behaviour_class),
  139. )
  140. self.logger.info('Simulation generate {} behaviours'.format(len(results)))
  141. # Make events
  142. for behaviour_class, behaviour_data in results.items():
  143. behaviour_events = self.simulation.behaviours[behaviour_class].action(behaviour_data)
  144. self.logger.info('{} behaviour generate {} events'.format(
  145. str(behaviour_class),
  146. str(len(behaviour_events)),
  147. ))
  148. if self.logger.is_debug:
  149. self.logger.debug('{} behaviour generated events: {}'.format(
  150. str(behaviour_class),
  151. str([e.repr_debug() for e in behaviour_events]),
  152. ))
  153. events.extend(behaviour_events)
  154. self.logger.info('Simulation behaviours generate {} events'.format(len(events)))
  155. return events
  156. def _get_subjects_events(self) -> [Event]:
  157. events = []
  158. results = {}
  159. self.logger.info('Process subjects events')
  160. results_by_processes = self.process_manager.make_them_work(JOB_TYPE_SUBJECTS)
  161. for process_results in results_by_processes:
  162. results.update(process_results)
  163. # Duplicate list to prevent conflicts with behaviours subjects manipulations
  164. for subject in self.simulation.subjects[:]:
  165. subject_behaviours_results = results.get(subject.id, {})
  166. if subject.behaviour_selector:
  167. # TODO: Looging
  168. subject_behaviours_results = subject.behaviour_selector.reduce_behaviours(dict(
  169. subject_behaviours_results,
  170. ))
  171. subject_behaviours = subject.behaviours
  172. for behaviour_class, behaviour_data in subject_behaviours_results.items():
  173. # TODO: Ajouter une etape de selection des actions a faire (genre neuronnal)
  174. # (genre se cacher et fuir son pas compatibles)
  175. behaviour_events = subject_behaviours[behaviour_class].action(behaviour_data)
  176. self.logger.info('{} behaviour for subject {} generate {} events'.format(
  177. str(behaviour_class.__name__),
  178. str(subject.id),
  179. str(len(behaviour_events)),
  180. ))
  181. if self.logger.is_debug:
  182. self.logger.debug('{} behaviour for subject {} generated events: {}'.format(
  183. str(behaviour_class.__name__),
  184. str(subject.id),
  185. str([e.repr_debug() for e in behaviour_events]),
  186. ))
  187. events.extend(behaviour_events)
  188. self.logger.info('Subjects behaviours generate {} events'.format(len(events)))
  189. return events
  190. def _subjects_computing(
  191. self,
  192. subjects,
  193. process_number=None,
  194. process_count=None,
  195. ) -> typing.Dict[int, typing.Dict[str, typing.Any]]:
  196. results = {}
  197. self.logger.info('Subjects computing: {} subjects to compute'.format(str(len(subjects))))
  198. for subject in subjects:
  199. mechanisms = subject.mechanisms.values()
  200. if mechanisms:
  201. self.logger.info('Subject {}: {} mechanisms'.format(
  202. str(subject.id),
  203. str(len(mechanisms)),
  204. ))
  205. if self.logger.is_debug:
  206. self.logger.info('Subject {}: mechanisms are: {}'.format(
  207. str(subject.id),
  208. str([m.repr_debug for m in mechanisms])
  209. ))
  210. mechanisms_data = {}
  211. behaviours_data = {}
  212. for mechanism in mechanisms:
  213. with time_it() as elapsed_time:
  214. mechanism_data = mechanism.run()
  215. if self.logger.is_debug:
  216. self.logger.debug('Subject {}: {} mechanisms produce data: {} in {}s'.format(
  217. str(subject.id),
  218. type(mechanism).__name__,
  219. str(mechanism_data),
  220. elapsed_time.get_final_time(),
  221. ))
  222. mechanisms_data[mechanism.__class__] = mechanism_data
  223. if mechanisms:
  224. if self.logger.is_debug:
  225. self.logger.info('Subject {}: mechanisms data are: {}'.format(
  226. str(subject.id),
  227. str(mechanisms_data),
  228. ))
  229. subject_behaviours = subject.behaviours
  230. if not subject_behaviours:
  231. break
  232. self.logger.info('Subject {}: have {} behaviours'.format(
  233. str(subject.id),
  234. str(len(subject_behaviours)),
  235. ))
  236. for behaviour_key, behaviour in list(subject_behaviours.items()):
  237. self.logger.info('Subject {}: run {} behaviour'.format(
  238. str(subject.id),
  239. str(type(behaviour)),
  240. ))
  241. if behaviour.is_terminated():
  242. del subject.behaviours[behaviour_key]
  243. # We identify behaviour data with it's class to be able to intersect it after subprocess data collect
  244. if behaviour.is_skip(self.current_cycle):
  245. behaviour_data = False
  246. self.logger.debug('Subject {}: behaviour {} skip'.format(
  247. str(subject.id),
  248. str(type(behaviour)),
  249. ))
  250. else:
  251. with time_it() as elapsed_time:
  252. behaviour.last_execution_time = time.time()
  253. behaviour_data = behaviour.run(mechanisms_data) # TODO: Behaviours dependencies
  254. if self.logger.is_debug:
  255. self.logger.debug('Subject {}: behaviour {} produce data: {} in {}s'.format(
  256. str(type(behaviour)),
  257. str(subject.id),
  258. str(behaviour_data),
  259. elapsed_time.get_final_time(),
  260. ))
  261. if behaviour_data:
  262. behaviours_data[behaviour.__class__] = behaviour_data
  263. results[subject.id] = behaviours_data
  264. return results
  265. def apply_actions(
  266. self,
  267. simulation_actions: [tuple]=None,
  268. subject_actions: [tuple]=None,
  269. ) -> [Event]:
  270. """
  271. TODO: bien specifier la forme des parametres
  272. simulation_actions = [(class, {'data': 'foo'})]
  273. subject_actions = [(subject_id, [(class, {'data': 'foo'}])]
  274. """
  275. simulation_actions = simulation_actions or []
  276. subject_actions = subject_actions or []
  277. events = []
  278. self.logger.info('Apply {} simulation_actions and {} subject_actions'.format(
  279. len(simulation_actions),
  280. len(subject_actions),
  281. ))
  282. for subject_id, behaviours_and_data in subject_actions:
  283. subject = self.simulation.subjects.index.get(subject_id)
  284. for behaviour_class, behaviour_data in behaviours_and_data:
  285. behaviour = behaviour_class(
  286. simulation=self.simulation,
  287. subject=subject,
  288. )
  289. self.logger.info('Apply {} behaviour on subject {}'.format(
  290. str(behaviour_class),
  291. str(subject_id),
  292. ))
  293. if self.logger.is_debug:
  294. self.logger.debug('{} behaviour data is {}'.format(
  295. str(behaviour_class),
  296. str(behaviour_data),
  297. ))
  298. behaviour_events = behaviour.action(behaviour_data)
  299. self.logger.info('{} events from behaviour {} from subject {}'.format(
  300. len(behaviour_events),
  301. str(behaviour_class),
  302. str(subject_id),
  303. ))
  304. if self.logger.is_debug:
  305. self.logger.debug('Events from behaviour {} from subject {} are: {}'.format(
  306. str(behaviour_class),
  307. str(subject_id),
  308. str([e.repr_debug() for e in behaviour_events])
  309. ))
  310. events.extend(behaviour_events)
  311. for behaviour_class, behaviour_data in simulation_actions:
  312. behaviour = behaviour_class(
  313. config=self.config,
  314. simulation=self.simulation,
  315. )
  316. self.logger.info('Apply {} simulation behaviour'.format(
  317. str(behaviour_class),
  318. ))
  319. behaviour_events = behaviour.action(behaviour_data)
  320. if self.logger.is_debug:
  321. self.logger.debug('Events from behaviour {} are: {}'.format(
  322. str(behaviour_class),
  323. str([e.repr_debug() for e in behaviour_events])
  324. ))
  325. events.extend(behaviour_events)
  326. self.logger.info('{} events generated'.format(len(events)))
  327. return events
  328. def stop(self) -> None:
  329. self.process_manager.terminate()