Browse Source

Process Manager and Chun Manager + tests

Bastien Sevajol 7 years ago
parent
commit
0da2f54c39
6 changed files with 132 additions and 0 deletions
  1. 0 0
      synergine2/__init__.py
  2. 51 0
      synergine2/processing.py
  3. 12 0
      synergine2/utils.py
  4. 4 0
      tests/__init__.py
  5. 38 0
      tests/test_processing.py
  6. 27 0
      tests/test_utils.py

+ 0 - 0
synergine2/__init__.py View File


+ 51 - 0
synergine2/processing.py View File

@@ -0,0 +1,51 @@
1
+import types
2
+from multiprocessing import Process
3
+from multiprocessing import Manager
4
+
5
+from synergine2.utils import ChunkManager
6
+
7
+
8
+class ProcessManager(object):
9
+    def __init__(
10
+            self,
11
+            process_count: int,
12
+            chunk_manager: ChunkManager,
13
+            job_maker: types.FunctionType,
14
+    ):
15
+        self._process_count = process_count
16
+        self._chunk_manager = chunk_manager
17
+        self._job_maker = job_maker
18
+
19
+    def execute_jobs(self, data: list) -> tuple:
20
+        with Manager() as manager:
21
+            processes = list()
22
+            chunks = self._chunk_manager.make_chunks(data)
23
+            results = manager.dict()
24
+
25
+            # TODO: retrouver tests pour savoir si
26
+            # les keeped alive sont mieux
27
+            for process_number in range(self._process_count):
28
+                processes.append(Process(
29
+                    target=self._job_maker_wrapper,
30
+                    args=(
31
+                        process_number,
32
+                        chunks[process_number],
33
+                        results,
34
+                    )
35
+                ))
36
+
37
+            for process in processes:
38
+                process.start()
39
+
40
+            for process in processes:
41
+                process.join()
42
+
43
+            return results.values()
44
+
45
+    def _job_maker_wrapper(
46
+            self,
47
+            process_number: int,
48
+            chunk: list,
49
+            results: dict,
50
+    ):
51
+        results[process_number] = self._job_maker(chunk)

+ 12 - 0
synergine2/utils.py View File

@@ -0,0 +1,12 @@
1
+
2
+
3
+class ChunkManager(object):
4
+    def __init__(self, chunks_numbers: int):
5
+        self._chunks_numbers = chunks_numbers
6
+
7
+    def make_chunks(self, data: list) -> list:
8
+        i, j, x = len(data), 0, []
9
+        for k in range(self._chunks_numbers):
10
+            a, j = j, j + (i + k) // self._chunks_numbers
11
+            x.append(data[a:j])
12
+        return x

+ 4 - 0
tests/__init__.py View File

@@ -0,0 +1,4 @@
1
+
2
+
3
+class BaseTest(object):
4
+    pass

+ 38 - 0
tests/test_processing.py View File

@@ -0,0 +1,38 @@
1
+import os
2
+
3
+from synergine2.processing import ProcessManager
4
+from synergine2.utils import ChunkManager
5
+from tests import BaseTest
6
+
7
+
8
+class TestProcessing(BaseTest):
9
+    @staticmethod
10
+    def _make_job(data_chunk: list) -> tuple:
11
+        current_pid = os.getpid()
12
+        result = sum(data_chunk)
13
+        return current_pid, result
14
+
15
+    def test_parallel_jobs(self):
16
+        chunk_manager = ChunkManager(4)
17
+        process_manager = ProcessManager(
18
+            process_count=4,
19
+            chunk_manager=chunk_manager,
20
+            job_maker=self._make_job,
21
+        )
22
+
23
+        data = list(range(100))
24
+        process_id_list = []
25
+        final_result = 0
26
+
27
+        results = process_manager.execute_jobs(data)
28
+
29
+        for process_id, result in results:
30
+            final_result += result
31
+            process_id_list.append(process_id)
32
+
33
+        # Test each process ids are differents
34
+        assert sorted(process_id_list) == \
35
+            sorted(list(set(process_id_list)))
36
+
37
+        # Goal is 4950
38
+        assert final_result == 4950

+ 27 - 0
tests/test_utils.py View File

@@ -0,0 +1,27 @@
1
+from synergine2.utils import ChunkManager
2
+from tests import BaseTest
3
+
4
+
5
+class TestUtils(BaseTest):
6
+    def test_chunk_manager_round(self):
7
+        chunk_manager = ChunkManager(4)
8
+        data = list(range(100))
9
+
10
+        chunks = chunk_manager.make_chunks(data)
11
+
12
+        assert len(chunks) == 4
13
+        for chunk in chunks:
14
+            assert len(chunk) == 25
15
+
16
+    def test_chunk_manager_not_round(self):
17
+        chunk_manager = ChunkManager(4)
18
+        data = list(range(101))
19
+
20
+        chunks = chunk_manager.make_chunks(data)
21
+
22
+        assert len(chunks) == 4
23
+        for chunk_number, chunk in enumerate(chunks):
24
+            if chunk_number == 3:
25
+                assert len(chunk) == 26
26
+            else:
27
+                assert len(chunk) == 25