
Introduce optimisation on multiprocessing

Bastien Sevajol, 7 years ago
parent commit b449543fb1

perf.py (+149 -0)

@@ -0,0 +1,149 @@
+
+
+"""
+Produce performance measurement
+"""
+import argparse
+import multiprocessing
+import os
+
+import time
+from collections import OrderedDict
+
+from sandbox.perf.simulation import ComputeSubject
+from synergine2.config import Config
+from synergine2.cycle import CycleManager
+from synergine2.log import SynergineLogger
+from synergine2.simulation import Simulation, Subjects
+
+
+def simulate(complexity, subject_count, cycle_count, cores):
+    config = Config(dict(complexity=complexity, use_x_cores=cores))
+    simulation = Simulation(config)
+
+    subjects = Subjects(simulation=simulation)
+    for i in range(subject_count):
+        subjects.append(ComputeSubject(
+            config=config,
+            simulation=simulation,
+        ))
+
+    simulation.subjects = subjects
+
+    cycle_manager = CycleManager(
+        config,
+        SynergineLogger('perf'),
+        simulation=simulation,
+    )
+
+    for j in range(cycle_count):
+        cycle_manager.next()
+
+
+def main():
+    parser = argparse.ArgumentParser(description='Perf measures')
+    parser.add_argument(
+        '--max_cores',
+        type=int,
+        default=0,
+        help='number of used cores',
+    )
+
+    args = parser.parse_args()
+
+    host_cores = multiprocessing.cpu_count()
+    retry = 3
+    cycles = 200
+    subject_counts = [1, 10, 100, 1000, 5000]
+    complexities = [100, 2000]
+    max_cores = args.max_cores or host_cores
+
+    results = []
+    datas = OrderedDict()
+
+    for core_i in range(max_cores):
+        # if core_i == 0:
+        #     continue
+        core_count = core_i + 1
+        for subject_count in subject_counts:
+            for complexity in complexities:
+                print('COMPLEXITY: {}, SUBJECTS: {}, CORES: {}'.format(
+                    complexity,
+                    subject_count,
+                    core_count,
+                ), end='')
+
+                durations = []
+                for try_i in range(retry):
+                    start_time = time.time()
+                    simulate(complexity, subject_count, cycles, core_count)
+                    durations.append(time.time() - start_time)
+                duration = min(durations)
+
+                result = {
+                    'cores': core_count,
+                    'complexity': complexity,
+                    'subject_count': subject_count,
+                    'cycles': cycles,
+                    'duration': duration,
+                    'duration_cycle': duration / cycles,
+                    'duration_subj_complex': (duration / cycles) / (subject_count * complexity),
+                }
+                results.append(result)
+
+                print(': {}s, {}s/c, {}s/C'.format(
+                    result['duration'],
+                    result['duration_cycle'],
+                    result['duration_subj_complex'],
+                ))
+                datas.setdefault(complexity, {}).setdefault(subject_count, {})[core_count] = result['duration_cycle']
+
+    for d_complexity, c_values in sorted(datas.items(), key=lambda e: int(e[0])):
+        data_file_name = 'DATA_{}'.format(str(d_complexity))
+        try:
+            os.unlink(data_file_name)
+        except FileNotFoundError:
+            pass
+        with open(data_file_name, 'w+') as file:
+            file.writelines(['# (COMPLEXITY {}) SUBJECTS CORES_{}\n'.format(
+                str(d_complexity),
+                ' CORES_'.join(map(str, range(1, max_cores+1))),
+            )])
+            for d_subject_count, d_cores in c_values.items():
+                line = '{} {}\n'.format(
+                    str(d_subject_count),
+                    ' '.join(map(str, d_cores.values())),
+                )
+                file.writelines([line])
+
+        """
+        subj_core = []
+            for subj, core_v in c_values.items():
+                for core_nb in core_v.keys():
+                    subj_core.append((subj, core_nb))
+            file.writelines(['# (COMPLEXITY {}) SUBJECTS CORES_{}\n'.format(
+                str(d_complexity),
+                ' '.join([
+                    'SUBJ_{}_COR_{}'.format(
+                        subj, core_nb,
+                    ) for subj, core_nb in subj_core
+                ])
+            )])
+        """
+
+    for d_complexity, c_values in datas.items():
+        print('')
+        print('gnuplot -p -e "set title \\"COMPLEXITY_{}\\"; plot {}"'.format(
+            str(d_complexity),
+            ','.join([
+                ' \'DATA_{}\' using 1:{} title \'CORE_{}\' with lines'.format(
+                    d_complexity,
+                    d_core+1,
+                    d_core,
+                ) for d_core in range(1, max_cores+1)
+            ])
+        ))
+
+
+if __name__ == '__main__':
+    main()
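
perf.py writes one gnuplot-ready file per complexity level: a comment header, then one row per subject count whose columns are the measured seconds-per-cycle for 1, 2, ... cores. A minimal sketch (not part of the commit) of reading such a file back, assuming a finished run produced DATA_100:

    # Parse a DATA_<complexity> file written by perf.py above.
    # Header: "# (COMPLEXITY 100) SUBJECTS CORES_1 CORES_2 ..."
    # Row:    "<subject_count> <sec_per_cycle_core1> <sec_per_cycle_core2> ..."
    def read_perf_data(path: str) -> dict:
        rows = {}
        with open(path) as data_file:
            for line in data_file:
                if line.startswith('#'):
                    continue  # skip the gnuplot comment header
                subject_count, *durations = line.split()
                rows[int(subject_count)] = [float(d) for d in durations]
        return rows

    durations_by_subjects = read_perf_data('DATA_100')
    # durations_by_subjects[1000][0] -> seconds per cycle for 1000 subjects on 1 core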

sandbox/engulf/behaviour.py (+1 -1)

@@ -57,7 +57,7 @@ class GrassSpotablePositionsMechanism(SimulationMechanism):
 
         for position in positions_chunks[process_number]:
             arounds = get_around_positions_of_positions(position)
-            from_subject = self.simulation.subjects.grass_xyz[position]
+            from_subject = self.simulation.subjects.grass_xyz[position][0]
             around_data = {
                 'from_subject': from_subject,
                 'around': [],

sandbox/engulf/simulation.py (+2 -0)

@@ -20,6 +20,8 @@ class EngulfSubjects(XYZSubjects):
         if isinstance(value, Cell):
             try:
                 self.cell_xyz.get(value.position, []).remove(value)
+                if not self.cell_xyz[value.position]:
+                    del self.cell_xyz[value.position]
             except ValueError:
                 pass
 

sandbox/life_game/run.py (+2 -3)

@@ -4,14 +4,13 @@ import sys
 
 import logging
 
-from synergine2.config import Config
-from synergine2.log import get_default_logger
-
 synergine2_ath = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '../../'))
 sys.path.append(synergine2_ath)
 
 import collections
 
+from synergine2.config import Config
+from synergine2.log import get_default_logger
 from sandbox.life_game.simulation import Cell, LotOfCellsSignalBehaviour, LifeGame, \
     EmptyPositionWithLotOfCellAroundEvent
 from sandbox.life_game.simulation import Empty

sandbox/perf/__init__.py (+0 -0, new empty file)


sandbox/perf/run.py (+53 -0)

@@ -0,0 +1,53 @@
+from random import seed
+import logging
+import os
+import sys
+
+synergine2_ath = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '../../'))
+sys.path.append(synergine2_ath)
+
+
+from sandbox.perf.simulation import ComputeSubject
+from synergine2.config import Config
+from synergine2.core import Core
+from synergine2.cycle import CycleManager
+from synergine2.log import get_default_logger
+from synergine2.simulation import Simulation, Subjects
+from synergine2.terminals import TerminalManager
+
+
+def main():
+    seed(42)
+
+    config = Config(dict(complexity=10000))
+    logger = get_default_logger(level=logging.ERROR)
+
+    simulation = Simulation(config)
+    subjects = Subjects(simulation=simulation)
+    subjects.extend([ComputeSubject(
+        config=config,
+        simulation=simulation,
+    ) for i in range(500)])
+    simulation.subjects = subjects
+
+    core = Core(
+        config=config,
+        logger=logger,
+        simulation=simulation,
+        cycle_manager=CycleManager(
+            config=config,
+            logger=logger,
+            simulation=simulation,
+        ),
+        terminal_manager=TerminalManager(
+            config=config,
+            logger=logger,
+            terminals=[]
+        ),
+        cycles_per_seconds=1000000,
+    )
+    core.run()
+
+
+if __name__ == '__main__':
+    main()

sandbox/perf/simulation.py (+55 -0)

@@ -0,0 +1,55 @@
+import random
+
+from synergine2.simulation import SubjectMechanism, SubjectBehaviour, Event, Subject
+
+
+def compute(complexity: int):
+    random_floats = []
+    result = 0.0
+
+    for i in range(complexity):
+        random_floats.append(random.uniform(0, 100))
+
+    for j, random_float in enumerate(random_floats):
+        if not j % 2:
+            result += random_float
+        else:
+            result -= random_float
+
+    return result
+
+
+class ComputeMechanism(SubjectMechanism):
+    def run(self):
+        complexity = self.config.get('complexity')
+        value = compute(complexity)
+
+        return {
+            'mechanism_value': value,
+            'complexity': complexity,
+        }
+
+
+class ComplexityEvent(Event):
+    def __init__(self, complexity, *args, **kwargs):
+        self.complexity = complexity
+
+
+class ComputeBehaviour(SubjectBehaviour):
+    use = [ComputeMechanism]
+
+    def run(self, data):
+        return not data.get(ComputeMechanism).get('mechanism_value') % 2
+
+    def action(self, data):
+        mechanism_value = data.get(ComputeMechanism).get('mechanism_value')
+        complexity = data.get(ComputeMechanism).get('complexity')
+
+        if not int(str(mechanism_value)[-1]) % 2:
+            compute(complexity)
+
+        return [ComplexityEvent(complexity)]
+
+
+class ComputeSubject(Subject):
+    behaviours_classes = [ComputeBehaviour]

synergine2/core.py (+2 -1)

@@ -1,6 +1,7 @@
 # coding: utf-8
 import time
 
+from synergine2.base import BaseObject
 from synergine2.config import Config
 from synergine2.cycle import CycleManager
 from synergine2.log import SynergineLogger
@@ -9,7 +10,7 @@ from synergine2.terminals import TerminalManager
 from synergine2.terminals import TerminalPackage
 
 
-class Core(object):
+class Core(BaseObject):
     def __init__(
         self,
         config: Config,

synergine2/cycle.py (+3 -2)

@@ -1,6 +1,7 @@
 # coding: utf-8
 import multiprocessing
 
+from synergine2.base import BaseObject
 from synergine2.config import Config
 from synergine2.log import SynergineLogger
 from synergine2.processing import ProcessManager
@@ -13,7 +14,7 @@ from synergine2.simulation import Event
 from synergine2.utils import ChunkManager
 
 
-class CycleManager(object):
+class CycleManager(BaseObject):
     def __init__(
             self,
             config: Config,
@@ -23,7 +24,7 @@ class CycleManager(object):
     ):
         if process_manager is None:
             process_manager = ProcessManager(
-                process_count=multiprocessing.cpu_count(),
+                process_count=config.get('use_x_cores', multiprocessing.cpu_count()),
                 chunk_manager=ChunkManager(multiprocessing.cpu_count()),
             )
 
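The new use_x_cores key lets callers cap the worker count from configuration, which is what perf.py above relies on when it sweeps over core counts. A small sketch of the intended lookup, assuming Config.get follows dict.get semantics as this call site suggests:

    import multiprocessing

    from synergine2.config import Config

    # Explicit cap: two worker processes regardless of the host.
    config = Config(dict(use_x_cores=2))
    assert config.get('use_x_cores', multiprocessing.cpu_count()) == 2

    # No cap configured: fall back to one process per available core.
    config = Config(dict())
    process_count = config.get('use_x_cores', multiprocessing.cpu_count())

Note that ChunkManager still receives multiprocessing.cpu_count(), so the number of chunks and the number of pool workers can differ when use_x_cores is set below the host's core count.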

synergine2/log.py (+15 -1)

@@ -1,15 +1,29 @@
 # coding: utf-8
 import logging
-
 import sys
 import typing
 
+STREAM_HANDLER_TAG = '__STREAM_HANDLER__'
+
 
 class SynergineLogger(logging.getLoggerClass()):
     @property
     def is_debug(self) -> bool:
         return self.isEnabledFor(logging.DEBUG)
 
+    def __getstate__(self):
+        # Multiprocessing fails if a stream handler is present. Remove it if it exists.
+        # TODO: Bug when trying to store STREAM_HANDLER_TAG instead of the stream handler:
+        # the str is still in handlers after __setstate__ ...
+        self.handlers = []
+
+        return self.__dict__.copy()
+
+    def __setstate__(self, state):
+        self.__dict__ = state
+        # TODO: This handler is hardcoded; it should depend on the real context
+        self.handlers.append(logging.StreamHandler(sys.stdout))
+
 
 def get_default_logger(name: str='synergine', level: int=logging.ERROR) -> SynergineLogger:
     """

synergine2/processing.py (+24 -53)

@@ -1,12 +1,12 @@
 # coding: utf-8
 import types
-from multiprocessing import Process
-from multiprocessing import Manager
+from multiprocessing import Pool
 
+from synergine2.base import BaseObject
 from synergine2.utils import ChunkManager
 
 
-class ProcessManager(object):
+class ProcessManager(BaseObject):
     def __init__(
             self,
             process_count: int,
@@ -14,61 +14,32 @@ class ProcessManager(object):
     ):
         self._process_count = process_count
         self._chunk_manager = chunk_manager
+        self.pool = Pool(processes=self._process_count)
 
-    def chunk_and_execute_jobs(self, data: list, job_maker: types.FunctionType) -> list:
-        with Manager() as manager:
-            processes = list()
-            chunks = self._chunk_manager.make_chunks(data)
-            results = manager.dict()
-
-            for process_number in range(self._process_count):
-                processes.append(Process(
-                    target=self._job_maker_wrapper,
-                    args=(
-                        process_number,
-                        chunks[process_number],
-                        results,
-                        job_maker,
-                    )
-                ))
+    def __getstate__(self):
+        self_dict = self.__dict__.copy()
+        self_dict['pool'] = None
+        return self_dict
 
-            for process in processes:
-                process.start()
+    def chunk_and_execute_jobs(self, data: list, job_maker: types.FunctionType) -> list:
+        chunks = self._chunk_manager.make_chunks(data)
 
-            for process in processes:
-                process.join()
+        if self._process_count > 1:
+            results = self.pool.starmap(job_maker, [(chunk, i, self._process_count) for i, chunk in enumerate(chunks)])
+        else:
+            results = [job_maker(data, 0, 1)]
 
-            return results.values()
+        return results
 
     def execute_jobs(self, data: object, job_maker: types.FunctionType) -> list:
-        with Manager() as manager:
-            processes = list()
-            results = manager.dict()
-
-            for process_number in range(self._process_count):
-                processes.append(Process(
-                    target=self._job_maker_wrapper,
-                    args=(
-                        process_number,
-                        data,
-                        results,
-                        job_maker,
-                    )
-                ))
+        # TODO: Is there a reason to use multiprocessing here? data is not chunked ...
+        if self._process_count > 1:
+            results = self.pool.starmap(job_maker, [(data, i, self._process_count) for i in range(self._process_count)])
+        else:
+            results = [job_maker(data, 0, 1)]
 
-            for process in processes:
-                process.start()
+        return results
 
-            for process in processes:
-                process.join()
-
-            return results.values()
-
-    def _job_maker_wrapper(
-            self,
-            process_number: int,
-            data: list,
-            results: dict,
-            job_maker: types.FunctionType,
-    ):
-        results[process_number] = job_maker(data, process_number, self._process_count)
+    def __del__(self):
+        if self.pool:
+            self.pool.terminate()
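
This rewrite is the optimisation named in the commit message: the previous implementation forked fresh Process objects and a Manager dict on every call, paying process start-up cost each simulation cycle, while the new one keeps a single Pool alive and only ships job arguments to it. A minimal standalone sketch (independent of synergine2) contrasting the two patterns:

    import time
    from multiprocessing import Manager, Pool, Process

    def job(chunk, process_number, process_count):
        return sum(x * x for x in chunk)

    def _wrapper(process_number, chunk, results, process_count):
        results[process_number] = job(chunk, process_number, process_count)

    def per_call_processes(chunks):
        # Old pattern: fresh Process objects and a Manager dict for every call.
        with Manager() as manager:
            results = manager.dict()
            processes = [
                Process(target=_wrapper, args=(i, chunk, results, len(chunks)))
                for i, chunk in enumerate(chunks)
            ]
            for process in processes:
                process.start()
            for process in processes:
                process.join()
            return list(results.values())

    if __name__ == '__main__':
        chunks = [list(range(10000))] * 4

        start = time.time()
        for _ in range(20):
            per_call_processes(chunks)
        print('per-call processes:', time.time() - start)

        # New pattern: one persistent pool reused across calls.
        with Pool(processes=4) as pool:
            start = time.time()
            for _ in range(20):
                pool.starmap(job, [(chunk, i, 4) for i, chunk in enumerate(chunks)])
            print('persistent pool:', time.time() - start)

The __getstate__ hook above exists for the same reason as the one in log.py: the manager itself may be pickled into workers, and a Pool cannot be pickled, so it is nulled out in transit.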

synergine2/simulation.py (+2 -2)

@@ -7,7 +7,7 @@ from synergine2.config import Config
 from synergine2.utils import get_mechanisms_classes
 
 
-class Subject(object):
+class Subject(BaseObject):
     collections = []
     behaviours_classes = []
     behaviour_selector_class = None  # type: typing.Type[SubjectBehaviourSelector]
@@ -93,7 +93,7 @@ class Subjects(list):
             self.adds.append(p_object)
 
 
-class Simulation(object):
+class Simulation(BaseObject):
     accepted_subject_class = Subjects
     behaviours_classes = []
 

synergine2/terminals.py (+2 -2)

@@ -51,7 +51,7 @@ class TerminalPackage(BaseObject):
         ))
 
 
-class Terminal(object):
+class Terminal(BaseObject):
     # Default behaviour is to do nothing.
     # DEFAULT_SLEEP is sleep time between each queue read
     default_sleep = 1
@@ -140,7 +140,7 @@ class Terminal(object):
                         handler(event)
 
 
-class TerminalManager(object):
+class TerminalManager(BaseObject):
     def __init__(
         self,
         config: Config,

synergine2/utils.py (+5 -1)

@@ -1,10 +1,14 @@
+from synergine2.base import BaseObject
 
 
-class ChunkManager(object):
+class ChunkManager(BaseObject):
     def __init__(self, chunks_numbers: int):
         self._chunks_numbers = chunks_numbers
 
     def make_chunks(self, data: list) -> list:
+        if self._chunks_numbers == 1:
+            return [data]
+
         i, j, x = len(data), 0, []
         for k in range(self._chunks_numbers):
             a, j = j, j + (i + k) // self._chunks_numbers
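
make_chunks splits a list into chunks_numbers slices whose sizes differ by at most one; the new early return keeps the single-core path from paying for the slicing arithmetic. For example:

    from synergine2.utils import ChunkManager

    chunk_manager = ChunkManager(3)
    print(chunk_manager.make_chunks(list(range(10))))
    # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]

    chunk_manager = ChunkManager(1)
    print(chunk_manager.make_chunks(list(range(10))))
    # [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]  (the new early-return path)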

synergine2/xyz.py (+2 -0)

@@ -219,6 +219,8 @@ class XYZSubjects(Subjects):
 
         try:
             self.xyz.get(value.position, []).remove(value)
+            if not self.xyz[value.position]:
+                del self.xyz[value.position]
         except ValueError:
             pass
 

tests/test_processing.py (+18 -0)

@@ -60,6 +60,24 @@ class TestProcessing(BaseTest):
         # Goal is 4950
         assert final_result == 4950
 
+    def test_non_parallel_jobs_with_scalar(self):
+        chunk_manager = ChunkManager(1)
+        process_manager = ProcessManager(
+            process_count=1,
+            chunk_manager=chunk_manager,
+        )
+
+        data = list(range(100))
+        results = process_manager.chunk_and_execute_jobs(
+            data,
+            job_maker=self._make_job_with_scalar,
+        )
+        process_id, final_result = results[0]
+
+        assert len(results) == 1
+        assert process_id == os.getpid()
+        assert final_result == 4950
+
     def test_parallel_jobs_with_objects(self):
         chunk_manager = ChunkManager(4)
         process_manager = ProcessManager(
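
The new test pins down the non-parallel fast path: with process_count=1, chunk_and_execute_jobs calls job_maker directly in the parent process, so the returned pid must equal os.getpid(). The _make_job_with_scalar helper is defined earlier in the test class and not shown in this diff; judging from the assertions it plausibly looks like the following (hypothetical reconstruction, not the actual source):

    import os

    def _make_job_with_scalar(self, data, process_number, process_count):
        # Hypothetical: return the executing pid and a scalar; sum(range(100)) == 4950.
        return os.getpid(), sum(data)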