|
@@ -1,17 +1,23 @@
|
1
|
1
|
# coding: utf-8
|
2
|
2
|
import multiprocessing
|
|
3
|
+import typing
|
3
|
4
|
|
4
|
5
|
from synergine2.base import BaseObject
|
5
|
6
|
from synergine2.config import Config
|
|
7
|
+from synergine2.exceptions import SynergineException
|
6
|
8
|
from synergine2.log import SynergineLogger
|
7
|
9
|
from synergine2.processing import ProcessManager
|
8
|
|
-from synergine2.simulation import SimulationMechanism
|
9
|
|
-from synergine2.simulation import SimulationBehaviour
|
|
10
|
+from synergine2.share import shared
|
|
11
|
+from synergine2.simulation import Subject
|
10
|
12
|
from synergine2.simulation import Simulation
|
11
|
13
|
from synergine2.simulation import SubjectBehaviour
|
12
|
14
|
from synergine2.simulation import SubjectMechanism
|
13
|
15
|
from synergine2.simulation import Event
|
14
|
|
-from synergine2.utils import ChunkManager, time_it
|
|
16
|
+from synergine2.utils import time_it
|
|
17
|
+
|
|
18
|
+
|
|
19
|
+JOB_TYPE_SUBJECTS = 0
|
|
20
|
+JOB_TYPE_SIMULATION = 1
|
15
|
21
|
|
16
|
22
|
|
17
|
23
|
class CycleManager(BaseObject):
|
|
@@ -22,20 +28,107 @@ class CycleManager(BaseObject):
|
22
|
28
|
simulation: Simulation,
|
23
|
29
|
process_manager: ProcessManager=None,
|
24
|
30
|
):
|
|
31
|
+ # TODO: reproduire le mechanisme d'index de behaviour/etc pour simulation
|
|
32
|
+ self.config = config
|
|
33
|
+ self.logger = logger
|
|
34
|
+ self.simulation = simulation
|
|
35
|
+ self.current_cycle = -1
|
|
36
|
+ self.first_cycle = True
|
|
37
|
+
|
|
38
|
+ self.subject_mechanisms_cache = {} # type: typing.Dict[int, typing.Dict[str, SubjectMechanism]]
|
|
39
|
+ self.subject_behaviours_cache = {} # type: typing.Dict[int, typing.Dict[str, SubjectBehaviour]]
|
|
40
|
+
|
|
41
|
+ # TODO NOW: Les processes devront maintenir une liste des subjects qui sont nouveaux.ne connaissent pas
|
|
42
|
+ # Attention a ce qu'in ne soient pas "expose" quand on créer ces subjects au sein du process.
|
|
43
|
+ # Ces subjects ont vocation à adopter l'id du vrau subject tout de suite après leur instanciation
|
25
|
44
|
if process_manager is None:
|
26
|
45
|
process_manager = ProcessManager(
|
27
|
46
|
config=config,
|
28
|
47
|
# TODO: Changer de config de merde (core.use_x_cores)
|
29
|
48
|
process_count=config.get('core', {}).get('use_x_cores', multiprocessing.cpu_count()),
|
30
|
|
- chunk_manager=ChunkManager(multiprocessing.cpu_count()),
|
|
49
|
+ job=self.job,
|
31
|
50
|
)
|
32
|
|
-
|
33
|
|
- self.config = config
|
34
|
|
- self.logger = logger
|
35
|
|
- self.simulation = simulation
|
36
|
51
|
self.process_manager = process_manager
|
37
|
|
- self.current_cycle = -1
|
38
|
|
- self.first_cycle = True
|
|
52
|
+
|
|
53
|
+ def job(self, worker_id: int, process_count: int, job_type: str) -> 'TODO':
|
|
54
|
+ # ICI: (in process) on doit avoir:
|
|
55
|
+ # La tranche x:y de sujets à traiter
|
|
56
|
+ shared.refresh()
|
|
57
|
+ if job_type == JOB_TYPE_SUBJECTS:
|
|
58
|
+ return self._job_subjects(worker_id, process_count)
|
|
59
|
+ if job_type == JOB_TYPE_SIMULATION:
|
|
60
|
+ return self._job_simulation(worker_id, process_count)
|
|
61
|
+ raise SynergineException('Unknown job type "{}"'.format(job_type))
|
|
62
|
+
|
|
63
|
+ def _job_subjects(self, worker_id: int, process_count: int) -> typing.Dict[int, typing.Dict[str, typing.Any]]:
|
|
64
|
+ # Determine list of process subject to work with
|
|
65
|
+ subject_ids = shared.get('subject_ids')
|
|
66
|
+ chunk_length, rest = divmod(len(subject_ids), process_count)
|
|
67
|
+
|
|
68
|
+ from_ = chunk_length * worker_id
|
|
69
|
+ to_ = from_ + chunk_length
|
|
70
|
+
|
|
71
|
+ if worker_id + 1 == process_count:
|
|
72
|
+ to_ += rest
|
|
73
|
+
|
|
74
|
+ subject_ids_to_parse = subject_ids[from_:to_]
|
|
75
|
+
|
|
76
|
+ # Build list of subjects for compute them
|
|
77
|
+ subjects = []
|
|
78
|
+ for subject_id in subject_ids_to_parse:
|
|
79
|
+ subject = self.simulation.get_or_create_subject(subject_id)
|
|
80
|
+ subjects.append(subject)
|
|
81
|
+
|
|
82
|
+ results_by_subjects = self._subjects_computing(subjects)
|
|
83
|
+ return results_by_subjects
|
|
84
|
+
|
|
85
|
+ def _job_simulation(self, worker_id: int, process_count: int) -> typing.Dict[int, typing.Dict[str, typing.Any]]:
|
|
86
|
+ self.logger.info('Simulation computing (worker {})'.format(worker_id))
|
|
87
|
+
|
|
88
|
+ mechanisms = self.simulation.mechanisms.values()
|
|
89
|
+ mechanisms_data = {}
|
|
90
|
+ behaviours_data = {}
|
|
91
|
+
|
|
92
|
+ self.logger.info('{} mechanisms to compute'.format(str(len(mechanisms))))
|
|
93
|
+ if self.logger.is_debug:
|
|
94
|
+ self.logger.debug('Mechanisms are: {}'.format(
|
|
95
|
+ str([m.repr_debug() for m in mechanisms])
|
|
96
|
+ ))
|
|
97
|
+
|
|
98
|
+ for mechanism in mechanisms:
|
|
99
|
+ mechanism_data = mechanism.run(
|
|
100
|
+ process_number=worker_id,
|
|
101
|
+ process_count=process_count,
|
|
102
|
+ )
|
|
103
|
+
|
|
104
|
+ if self.logger.is_debug:
|
|
105
|
+ self.logger.debug('{} mechanism product data: {}'.format(
|
|
106
|
+ type(mechanism).__name__,
|
|
107
|
+ str(mechanism_data),
|
|
108
|
+ ))
|
|
109
|
+
|
|
110
|
+ mechanisms_data[type(mechanism)] = mechanism_data
|
|
111
|
+
|
|
112
|
+ behaviours = self.simulation.behaviours.values()
|
|
113
|
+ self.logger.info('{} behaviours to compute'.format(str(len(behaviours))))
|
|
114
|
+
|
|
115
|
+ if self.logger.is_debug:
|
|
116
|
+ self.logger.debug('Behaviours are: {}'.format(
|
|
117
|
+ str([b.repr_debug() for b in behaviours])
|
|
118
|
+ ))
|
|
119
|
+
|
|
120
|
+ for behaviour in behaviours:
|
|
121
|
+ behaviour_data = behaviour.run(mechanisms_data) # TODO: Behaviours dependencies
|
|
122
|
+ if self.logger.is_debug:
|
|
123
|
+ self.logger.debug('{} behaviour produce data: {}'.format(
|
|
124
|
+ type(behaviour).__name__,
|
|
125
|
+ behaviour_data,
|
|
126
|
+ ))
|
|
127
|
+
|
|
128
|
+ if behaviour_data:
|
|
129
|
+ behaviours_data[type(behaviour)] = behaviour_data
|
|
130
|
+
|
|
131
|
+ return behaviours_data
|
39
|
132
|
|
40
|
133
|
def next(self) -> [Event]:
|
41
|
134
|
if self.first_cycle:
|
|
@@ -47,6 +140,8 @@ class CycleManager(BaseObject):
|
47
|
140
|
self.logger.info('Process cycle {}'.format(self.current_cycle))
|
48
|
141
|
|
49
|
142
|
events = []
|
|
143
|
+ shared.commit()
|
|
144
|
+
|
50
|
145
|
# TODO: gestion des behaviours non parallelisables
|
51
|
146
|
# TODO: Proposer des ordres d'execution
|
52
|
147
|
with time_it() as elapsed_time:
|
|
@@ -69,10 +164,8 @@ class CycleManager(BaseObject):
|
69
|
164
|
|
70
|
165
|
self.logger.info('Process simulation events')
|
71
|
166
|
|
72
|
|
- results_by_processes = self.process_manager.execute_jobs(
|
73
|
|
- data=self.simulation,
|
74
|
|
- job_maker=self.simulation_computing,
|
75
|
|
- )
|
|
167
|
+ # TODO: Think about compute simulation cycle in workers
|
|
168
|
+ results_by_processes = self.process_manager.make_them_work(JOB_TYPE_SIMULATION)
|
76
|
169
|
|
77
|
170
|
for process_result in results_by_processes:
|
78
|
171
|
for behaviour_class, behaviour_result in process_result.items():
|
|
@@ -107,36 +200,35 @@ class CycleManager(BaseObject):
|
107
|
200
|
results = {}
|
108
|
201
|
|
109
|
202
|
self.logger.info('Process subjects events')
|
110
|
|
-
|
111
|
|
- results_by_processes = self.process_manager.chunk_and_execute_jobs(
|
112
|
|
- data=self.simulation.subjects,
|
113
|
|
- job_maker=self.subjects_computing,
|
114
|
|
- )
|
|
203
|
+ results_by_processes = self.process_manager.make_them_work(JOB_TYPE_SUBJECTS)
|
115
|
204
|
|
116
|
205
|
for process_results in results_by_processes:
|
117
|
206
|
results.update(process_results)
|
118
|
207
|
|
119
|
208
|
# Duplicate list to prevent conflicts with behaviours subjects manipulations
|
120
|
209
|
for subject in self.simulation.subjects[:]:
|
121
|
|
- subject_behaviours = results.get(subject.id, {})
|
|
210
|
+ subject_behaviours_results = results.get(subject.id, {})
|
122
|
211
|
if subject.behaviour_selector:
|
123
|
212
|
# TODO: Looging
|
124
|
|
- subject_behaviours = subject.behaviour_selector.reduce_behaviours(dict(subject_behaviours))
|
|
213
|
+ subject_behaviours_results = subject.behaviour_selector.reduce_behaviours(dict(
|
|
214
|
+ subject_behaviours_results,
|
|
215
|
+ ))
|
125
|
216
|
|
126
|
|
- for behaviour_class, behaviour_data in subject_behaviours.items():
|
|
217
|
+ subject_behaviours = self.get_subject_behaviours(subject)
|
|
218
|
+ for behaviour_class_name, behaviour_data in subject_behaviours_results.items():
|
127
|
219
|
# TODO: Ajouter une etape de selection des actions a faire (genre neuronnal)
|
128
|
220
|
# (genre se cacher et fuir son pas compatibles)
|
129
|
|
- behaviour_events = subject.behaviours[behaviour_class].action(behaviour_data)
|
|
221
|
+ behaviour_events = subject_behaviours[behaviour_class_name].action(behaviour_data)
|
130
|
222
|
|
131
|
223
|
self.logger.info('{} behaviour for subject {} generate {} events'.format(
|
132
|
|
- str(behaviour_class),
|
|
224
|
+ str(behaviour_class_name),
|
133
|
225
|
str(subject.id),
|
134
|
226
|
str(len(behaviour_events)),
|
135
|
227
|
))
|
136
|
228
|
|
137
|
229
|
if self.logger.is_debug:
|
138
|
230
|
self.logger.debug('{} behaviour for subject {} generated events: {}'.format(
|
139
|
|
- str(behaviour_class),
|
|
231
|
+ str(behaviour_class_name),
|
140
|
232
|
str(subject.id),
|
141
|
233
|
str([e.repr_debug() for e in behaviour_events]),
|
142
|
234
|
))
|
|
@@ -146,71 +238,17 @@ class CycleManager(BaseObject):
|
146
|
238
|
self.logger.info('Subjects behaviours generate {} events'.format(len(events)))
|
147
|
239
|
return events
|
148
|
240
|
|
149
|
|
- def simulation_computing(
|
150
|
|
- self,
|
151
|
|
- simulation,
|
152
|
|
- process_number,
|
153
|
|
- process_count,
|
154
|
|
- ):
|
155
|
|
- self.logger.info('Simulation computing')
|
156
|
|
-
|
157
|
|
- # TODO: necessaire de passer simulation ?
|
158
|
|
- mechanisms = self.get_mechanisms_to_compute(simulation)
|
159
|
|
- mechanisms_data = {}
|
160
|
|
- behaviours_data = {}
|
161
|
|
-
|
162
|
|
- self.logger.info('{} mechanisms to compute'.format(str(len(mechanisms))))
|
163
|
|
- if self.logger.is_debug:
|
164
|
|
- self.logger.debug('Mechanisms are: {}'.format(
|
165
|
|
- str([m.repr_debug() for m in mechanisms])
|
166
|
|
- ))
|
167
|
|
-
|
168
|
|
- for mechanism in mechanisms:
|
169
|
|
- mechanism_data = mechanism.run(
|
170
|
|
- process_number=process_number,
|
171
|
|
- process_count=process_count,
|
172
|
|
- )
|
173
|
|
-
|
174
|
|
- if self.logger.is_debug:
|
175
|
|
- self.logger.debug('{} mechanism product data: {}'.format(
|
176
|
|
- type(mechanism).__name__,
|
177
|
|
- str(mechanism_data),
|
178
|
|
- ))
|
179
|
|
-
|
180
|
|
- mechanisms_data[type(mechanism)] = mechanism_data
|
181
|
|
-
|
182
|
|
- behaviours = self.get_behaviours_to_compute(simulation)
|
183
|
|
- self.logger.info('{} behaviours to compute'.format(str(len(behaviours))))
|
184
|
|
-
|
185
|
|
- if self.logger.is_debug:
|
186
|
|
- self.logger.debug('Behaviours are: {}'.format(
|
187
|
|
- str([b.repr_debug() for b in behaviours])
|
188
|
|
- ))
|
189
|
|
-
|
190
|
|
- for behaviour in behaviours:
|
191
|
|
- behaviour_data = behaviour.run(mechanisms_data) # TODO: Behaviours dependencies
|
192
|
|
- if self.logger.is_debug:
|
193
|
|
- self.logger.debug('{} behaviour produce data: {}'.format(
|
194
|
|
- type(behaviour).__name__,
|
195
|
|
- behaviour_data,
|
196
|
|
- ))
|
197
|
|
-
|
198
|
|
- if behaviour_data:
|
199
|
|
- behaviours_data[type(behaviour)] = behaviour_data
|
200
|
|
-
|
201
|
|
- return behaviours_data
|
202
|
|
-
|
203
|
|
- def subjects_computing(
|
|
241
|
+ def _subjects_computing(
|
204
|
242
|
self,
|
205
|
243
|
subjects,
|
206
|
244
|
process_number=None,
|
207
|
245
|
process_count=None,
|
208
|
|
- ):
|
|
246
|
+ ) -> typing.Dict[int, typing.Dict[str, typing.Any]]:
|
209
|
247
|
results = {}
|
210
|
248
|
self.logger.info('Subjects computing: {} subjects to compute'.format(str(len(subjects))))
|
211
|
249
|
|
212
|
250
|
for subject in subjects:
|
213
|
|
- mechanisms = self.get_mechanisms_to_compute(subject)
|
|
251
|
+ mechanisms = self.get_subject_mechanisms(subject)
|
214
|
252
|
|
215
|
253
|
if mechanisms:
|
216
|
254
|
self.logger.info('Subject {}: {} mechanisms'.format(
|
|
@@ -221,13 +259,13 @@ class CycleManager(BaseObject):
|
221
|
259
|
if self.logger.is_debug:
|
222
|
260
|
self.logger.info('Subject {}: mechanisms are: {}'.format(
|
223
|
261
|
str(subject.id),
|
224
|
|
- str([m.repr_debug for m in mechanisms])
|
|
262
|
+ str([m.repr_debug for n, m in mechanisms.items()])
|
225
|
263
|
))
|
226
|
264
|
|
227
|
265
|
mechanisms_data = {}
|
228
|
266
|
behaviours_data = {}
|
229
|
267
|
|
230
|
|
- for mechanism in mechanisms:
|
|
268
|
+ for mechanism_class_name, mechanism in mechanisms.items():
|
231
|
269
|
with time_it() as elapsed_time:
|
232
|
270
|
mechanism_data = mechanism.run()
|
233
|
271
|
if self.logger.is_debug:
|
|
@@ -238,7 +276,7 @@ class CycleManager(BaseObject):
|
238
|
276
|
elapsed_time.get_final_time(),
|
239
|
277
|
))
|
240
|
278
|
|
241
|
|
- mechanisms_data[type(mechanism)] = mechanism_data
|
|
279
|
+ mechanisms_data[mechanism_class_name] = mechanism_data
|
242
|
280
|
|
243
|
281
|
if mechanisms:
|
244
|
282
|
if self.logger.is_debug:
|
|
@@ -247,7 +285,7 @@ class CycleManager(BaseObject):
|
247
|
285
|
str(mechanisms_data),
|
248
|
286
|
))
|
249
|
287
|
|
250
|
|
- subject_behaviours = self.get_behaviours_to_compute(subject)
|
|
288
|
+ subject_behaviours = self.get_subject_behaviours(subject)
|
251
|
289
|
if not subject_behaviours:
|
252
|
290
|
break
|
253
|
291
|
|
|
@@ -256,7 +294,7 @@ class CycleManager(BaseObject):
|
256
|
294
|
str(len(subject_behaviours)),
|
257
|
295
|
))
|
258
|
296
|
|
259
|
|
- for behaviour in subject_behaviours:
|
|
297
|
+ for behaviour_class_name, behaviour in subject_behaviours.items():
|
260
|
298
|
self.logger.info('Subject {}: run {} behaviour'.format(
|
261
|
299
|
str(subject.id),
|
262
|
300
|
str(type(behaviour)),
|
|
@@ -275,25 +313,46 @@ class CycleManager(BaseObject):
|
275
|
313
|
))
|
276
|
314
|
|
277
|
315
|
if behaviour_data:
|
278
|
|
- behaviours_data[type(behaviour)] = behaviour_data
|
|
316
|
+ behaviours_data[behaviour_class_name] = behaviour_data
|
279
|
317
|
|
280
|
318
|
results[subject.id] = behaviours_data
|
281
|
319
|
return results
|
282
|
320
|
|
283
|
|
- def get_mechanisms_to_compute(self, mechanisable) -> [SubjectMechanism, SimulationMechanism]:
|
|
321
|
+ def get_subject_mechanisms(self, subject: Subject) -> typing.Dict[str, SubjectMechanism]:
|
284
|
322
|
# TODO: Implementer un systeme qui inhibe des mechanisme (ex. someil inhibe l'ouie)
|
285
|
|
- return mechanisable.mechanisms.values()
|
|
323
|
+ # Attention: c'est utilisé dans le main process aussi, pertinent de la faire là ?
|
|
324
|
+ try:
|
|
325
|
+ return self.subject_mechanisms_cache[subject.id]
|
|
326
|
+ except KeyError:
|
|
327
|
+ mechanisms = {}
|
|
328
|
+ for mechanism_class_id in shared.get('subject_mechanisms_index')[subject.id]:
|
|
329
|
+ mechanism_class = self.simulation.index[mechanism_class_id]
|
|
330
|
+ mechanism = mechanism_class(
|
|
331
|
+ self.config,
|
|
332
|
+ self.simulation,
|
|
333
|
+ subject,
|
|
334
|
+ )
|
|
335
|
+ mechanisms[mechanism_class.__name__] = mechanism
|
|
336
|
+ self.subject_mechanisms_cache[subject.id] = mechanisms
|
|
337
|
+ return mechanisms
|
286
|
338
|
|
287
|
|
- def get_behaviours_to_compute(self, mechanisable) -> [SubjectBehaviour, SimulationBehaviour]:
|
|
339
|
+ def get_subject_behaviours(self, subject: Subject) -> typing.Dict[str, SubjectBehaviour]:
|
288
|
340
|
# TODO: Implementer un systeme qui inhibe des behaviours (ex. someil inhibe avoir faim)
|
289
|
|
- behaviours = list(mechanisable.behaviours.values())
|
290
|
|
-
|
291
|
|
- for behaviour in behaviours[:]:
|
292
|
|
- if behaviour.frequency != 1:
|
293
|
|
- if self.current_cycle % behaviour.frequency:
|
294
|
|
- behaviours.remove(behaviour)
|
295
|
|
-
|
296
|
|
- return behaviours
|
|
341
|
+ # Attention: c'est utilisé dans le main process aussi, pertinent de la faire là ?
|
|
342
|
+ try:
|
|
343
|
+ return self.subject_behaviours_cache[subject.id]
|
|
344
|
+ except KeyError:
|
|
345
|
+ behaviours = {}
|
|
346
|
+ for behaviour_class_id in shared.get('subject_behaviours_index')[subject.id]:
|
|
347
|
+ behaviour_class = self.simulation.index[behaviour_class_id]
|
|
348
|
+ behaviour = behaviour_class(
|
|
349
|
+ self.config,
|
|
350
|
+ self.simulation,
|
|
351
|
+ subject,
|
|
352
|
+ )
|
|
353
|
+ behaviours[behaviour_class.__name__] = behaviour
|
|
354
|
+ self.subject_behaviours_cache[subject.id] = behaviours
|
|
355
|
+ return behaviours
|
297
|
356
|
|
298
|
357
|
def apply_actions(
|
299
|
358
|
self,
|
|
@@ -364,3 +423,6 @@ class CycleManager(BaseObject):
|
364
|
423
|
|
365
|
424
|
self.logger.info('{} events generated'.format(len(events)))
|
366
|
425
|
return events
|
|
426
|
+
|
|
427
|
+ def stop(self) -> None:
|
|
428
|
+ self.process_manager.terminate()
|