
share data with memcached

Bastien Sevajol 6 years ago
parent
commit
9660f25c57
6 changed files with 320 additions and 86 deletions
  1. requirements.txt (+1, -0)
  2. synergine2/cycle.py (+1, -0)
  3. synergine2/processing.py (+110, -37)
  4. synergine2/share.py (+57, -0)
  5. tests/test_processing.py (+105, -49)
  6. tests/test_share.py (+46, -0)

+ 1 - 0
requirements.txt

@@ -17,3 +17,4 @@ pytest-xdist==1.16.0
 PyYAML==3.12
 six==1.10.0
 typing==3.6.1
+pylibmc==1.5.2
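
The single addition is the new dependency: pylibmc, a libmemcached binding, which backs the SharedDataManager introduced in synergine2/share.py below. A minimal round-trip sketch (assumes a memcached daemon on the default 127.0.0.1:11211; the client options mirror share.py):

# Minimal pylibmc round-trip; assumes memcached is running locally on
# the default port. Values are pickled transparently in binary mode.
import pylibmc

mc = pylibmc.Client(
    ['127.0.0.1'],
    binary=True,
    behaviors={'tcp_nodelay': True, 'ketama': True},
)
mc.set('counter', 42)
assert mc.get('counter') == 42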

+ 1 - 0
synergine2/cycle.py

@@ -24,6 +24,7 @@ class CycleManager(BaseObject):
     ):
         if process_manager is None:
             process_manager = ProcessManager(
+                config=config,
                 # TODO: Change this crappy config handling (core.use_x_cores)
                 process_count=config.get('core', {}).get('use_x_cores', multiprocessing.cpu_count()),
                 chunk_manager=ChunkManager(multiprocessing.cpu_count()),
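
The only change here is passing config through to the new ProcessManager constructor. For reference, the core.use_x_cores lookup falls back to every available core; a sketch of that resolution (assuming Config is dict-like, as the tests' Config({}) construction suggests):

# Sketch of the core.use_x_cores resolution; Config is assumed dict-like.
import multiprocessing
from synergine2.config import Config

config = Config({'core': {'use_x_cores': 2}})
cores = config.get('core', {}).get('use_x_cores', multiprocessing.cpu_count())
assert cores == 2  # falls back to multiprocessing.cpu_count() when unset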

+ 110 - 37
synergine2/processing.py

@@ -1,47 +1,120 @@
 # coding: utf-8
-import types
-from multiprocessing import Pool
+import typing
+from multiprocessing import Process
+from multiprocessing.connection import Connection
+from multiprocessing.connection import Pipe
 
 from synergine2.base import BaseObject
-from synergine2.utils import ChunkManager
+from synergine2.config import Config
+from synergine2.share import SharedDataManager
+
+STOP = '__STOP__'
+
+# global shared manager
+shared_data = SharedDataManager()
+
+
+# TODO: se jobs
+class Job(object):
+    pass
+
+
+class Worker(object):
+    def __init__(
+        self,
+        config: Config,
+        real_job: typing.Callable[..., typing.Any],
+    ) -> None:
+        self.config = config
+
+        local_read_pipe, local_write_pipe = Pipe(duplex=False)
+        process_read_pipe, process_write_pipe = Pipe(duplex=False)
+
+        self.local_read_pipe = local_read_pipe  # type: Connection
+        self.local_write_pipe = local_write_pipe  # type: Connection
+        self.process_read_pipe = process_read_pipe  # type: Connection
+        self.process_write_pipe = process_write_pipe  # type: Connection
+
+        self.real_job = real_job
+        self.process = Process(
+            target=self.work,
+            args=(
+                self.local_write_pipe,
+                self.process_read_pipe,
+            )
+        )
+        self.db = None  # type: RedisDatabase
+        self.process.start()
+
+    def work(self, *args, **kwargs):
+        while True:
+            message = self.process_read_pipe.recv()
+            if message == STOP:
+                return
+
+            result = self.real_job(message)
+            self.local_write_pipe.send(result)
 
 
 class ProcessManager(BaseObject):
     def __init__(
             self,
+            config: Config,
             process_count: int,
-            chunk_manager: ChunkManager,
-    ):
+            job: typing.Callable[..., typing.Any],
+    ) -> None:
+        self.config = config
         self._process_count = process_count
-        self._chunk_manager = chunk_manager
-        self.pool = Pool(processes=self._process_count)
-
-    def __getstate__(self):
-        self_dict = self.__dict__.copy()
-        self_dict['pool'] = None
-        return self_dict
-
-    def chunk_and_execute_jobs(self, data: list, job_maker: types.FunctionType) -> list:
-        chunks = self._chunk_manager.make_chunks(data)
-
-        if self._process_count > 1:
-            print('USE POOL')
-            results = self.pool.starmap(job_maker, [(chunk, i, self._process_count) for i, chunk in enumerate(chunks)])
-        else:
-            print('USE MONO')
-            results = [job_maker(data, 0, 1)]
-
-        return results
-
-    def execute_jobs(self, data: object, job_maker: types.FunctionType) -> list:
-        # TODO: Is there a reason to make multiprocessing here ? data is not chunked ...
-        if self._process_count > 1:
-            results = self.pool.starmap(job_maker, [(data, i, self._process_count) for i in range(self._process_count)])
-        else:
-            results = [job_maker(data, 0, 1)]
-
-        return results
-
-    def __del__(self):
-        if self.pool:
-            self.pool.terminate()
+        self.workers = []
+        self.start_workers(process_count, job)
+
+    def start_workers(self, worker_count: int, job: typing.Callable[..., typing.Any]) -> None:
+        assert not self.workers
+        for i in range(worker_count):
+            self.workers.append(Worker(self.config, job))
+
+    def make_them_work(self, message: typing.Any) -> 'TODO':
+        responses = []
+
+        for worker in self.workers:
+            worker.process_write_pipe.send(message)
+
+        for worker in self.workers:
+            responses.append(worker.local_read_pipe.recv())
+
+        return responses
+
+    def terminate(self) -> None:
+        for worker in self.workers:
+            worker.process_write_pipe.send(STOP)
+
+        for worker in self.workers:
+            worker.process.join()
+
+    #
+    # def chunk_and_execute_jobs(self, data: list, job_maker: types.FunctionType) -> list:
+    #     chunks = self._chunk_manager.make_chunks(data)
+    #
+    #     if self._process_count > 1:
+    #         print('USE POOL')
+    #         results = self.pool.starmap(job_maker, [(chunk, i, self._process_count) for i, chunk in enumerate(chunks)])
+    #     else:
+    #         print('USE MONO')
+    #         results = [job_maker(data, 0, 1)]
+    #
+    #     return results
+    #
+    # def execute_jobs(self, data: object, job_maker: types.FunctionType) -> list:
+    #     # TODO: Is there a reason to make multiprocessing here ? data is not chunked ...
+    #     if self._process_count > 1:
+    #         results = self.pool.starmap(job_maker, [(data, i, self._process_count) for i in range(self._process_count)])
+    #     else:
+    #         results = [job_maker(data, 0, 1)]
+    #
+    #     return results
+    #
+    # def __del__(self):
+    #     # TODO: DEV
+    #     return
+    #     if self.pool:
+    #         self.pool.terminate()
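
The commit replaces the transient Pool with long-lived workers: each Worker forks at construction and loops on process_read_pipe.recv(), applying real_job to every incoming message until it sees the STOP sentinel, while ProcessManager broadcasts one message to all workers and collects one response per worker. A usage sketch of that API (square_job is an invented example job, not part of the commit):

# Usage sketch for the new pipe-based ProcessManager; square_job is a
# made-up example job, not part of the commit.
from synergine2.config import Config
from synergine2.processing import ProcessManager

def square_job(message):
    # Runs inside each worker process, once per make_them_work() call.
    return message ** 2

manager = ProcessManager(
    config=Config({}),
    process_count=4,
    job=square_job,
)
# Every worker receives the same message; one response comes back per worker.
assert manager.make_them_work(3) == [9, 9, 9, 9]
manager.terminate()  # sends the '__STOP__' sentinel, then joins each process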

+ 57 - 0
synergine2/share.py

@@ -0,0 +1,57 @@
+# coding: utf-8
+import typing
+
+import pylibmc
+
+from synergine2.exceptions import SynergineException
+
+
+class SharedDataManager(object):
+    """
+    This object is designed to own shared memory between processes. It must be fed (with the set method)
+    before the processes start. Processes will only be able to access shared memory filled here before start.
+    """
+    def __init__(self):
+        self._mc = pylibmc.Client(['127.0.0.1'], binary=True, behaviors={"tcp_nodelay": True, "ketama": True})
+
+    def set(self, key: str, value: typing.Any) -> None:
+        self._mc.set(key, value)
+
+    def get(self, key) -> typing.Any:
+        return self._mc.get(key)
+
+    def create(
+        self,
+        key: str,
+        value,
+        indexes=None,
+    ):
+        def get_key(obj):
+            return key
+
+        def get_key_with_id(obj):
+            return key.format(id=obj.id)
+
+        if '{id}' in key:
+            key_formatter = get_key_with_id
+        else:
+            self.set(key, value)
+            key_formatter = get_key
+
+        def fget(self_):
+            return self.get(key)
+
+        def fset(self_, value_):
+            self.set(key_formatter(self_), value_)
+
+        def fdel(self_):
+            raise SynergineException('You cannot delete a shared data')
+
+        shared_property = property(
+            fget=fget,
+            fset=fset,
+            fdel=fdel,
+        )
+
+        return shared_property
+
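
create() returns a property descriptor that writes through to memcached: a static key stores its initial value immediately, while a key containing '{id}' is formatted with the owning instance's id on each write (and its initial value is never stored). A sketch of both behaviors (Agent and its key names are invented for illustration; assumes a reachable memcached). Note that fget, as committed, always reads the unformatted key string, so the dynamic-key value is read back here via shared.get() rather than through the attribute:

# Sketch of shared.create() usage; Agent and these key names are
# illustrative only, and memcached must be reachable on 127.0.0.1.
from synergine2.share import SharedDataManager

shared = SharedDataManager()

class Agent(object):
    speed = shared.create('speed', 0)                   # static key
    position = shared.create('position_{id}', (0, 0))   # one key per instance

    def __init__(self, id_):
        self.id = id_  # consumed by key.format(id=obj.id) on each write

agent = Agent(1)
agent.speed = 10           # writes the 'speed' entry
agent.position = (10, 20)  # writes the 'position_1' entry

assert shared.get('speed') == 10
assert shared.get('position_1') == (10, 20)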

+ 105 - 49
tests/test_processing.py

@@ -1,9 +1,15 @@
 # coding: utf-8
-import os
+import ctypes
+import multiprocessing
 import psutil
+from multiprocessing import Manager
+from multiprocessing import Array
 
+import pytest
+
+from synergine2.config import Config
 from synergine2.processing import ProcessManager
-from synergine2.utils import ChunkManager
+from synergine2.share import SharedDataManager
 from tests import BaseTest
 
 available_cores = len(psutil.Process().cpu_affinity())
@@ -17,85 +23,135 @@ class MyFakeClass(object):
 class TestProcessing(BaseTest):
     def make_job_with_scalar(
             self,
-            data_chunk: list,
-            process_number: int,
-            process_count: int,
-    ) -> tuple:
-        current_pid = os.getpid()
-        result = sum(data_chunk)
-        return current_pid, result
+            data: list,
+    ):
+        result = sum(data)
+        return result
 
     def make_job_with_object(
             self,
-            data_chunk: list,
-            process_number: int,
-            process_count: int,
-    ) -> tuple:
-        current_pid = os.getpid()
-        data = [o.value for o in data_chunk]
+            data: list,
+    ):
+        data = [o.value for o in data]
         result = sum(data)
-        return current_pid, MyFakeClass(result)
+        return MyFakeClass(result)
 
     def test_parallel_jobs_with_scalar(self):
-        chunk_manager = ChunkManager(available_cores)
         process_manager = ProcessManager(
+            config=Config({}),
             process_count=available_cores,
-            chunk_manager=chunk_manager,
+            job=self.make_job_with_scalar,
         )
 
         data = list(range(100))
-        process_id_list = []
-        final_result = 0
 
-        results = process_manager.chunk_and_execute_jobs(
-            data,
-            job_maker=self.make_job_with_scalar,
-        )
-
-        for process_id, result in results:
-            final_result += result
-            process_id_list.append(process_id)
+        results = process_manager.make_them_work(data)
+        process_manager.terminate()
 
-        # Goal is 4950
-        assert final_result == 4950
+        assert sum(results) == 39600
 
     def test_non_parallel_jobs_with_scalar(self):
-        chunk_manager = ChunkManager(1)
+        # TODO: the process manager currently still uses a cpu anyway; change that
         process_manager = ProcessManager(
+            config=Config({}),
             process_count=1,
-            chunk_manager=chunk_manager,
+            job=self.make_job_with_scalar,
         )
 
         data = list(range(100))
-        results = process_manager.chunk_and_execute_jobs(
-            data,
-            job_maker=self.make_job_with_scalar,
-        )
-        process_id, final_result = results[0]
+        results = process_manager.make_them_work(data)
+        process_manager.terminate()
+        final_result = results[0]
 
         assert len(results) == 1
-        assert process_id == os.getpid()
         assert final_result == 4950
 
     def test_parallel_jobs_with_objects(self):
-        chunk_manager = ChunkManager(available_cores)
         process_manager = ProcessManager(
+            config=Config({}),
             process_count=available_cores,
-            chunk_manager=chunk_manager,
+            job=self.make_job_with_object,
         )
 
         data = [MyFakeClass(v) for v in range(100)]
-        process_id_list = []
         final_result = 0
 
-        results = process_manager.chunk_and_execute_jobs(
-            data,
-            job_maker=self.make_job_with_object,
-        )
+        results = process_manager.make_them_work(data)
+        process_manager.terminate()
 
-        for process_id, result_object in results:
+        for result_object in results:
             final_result += result_object.value
-            process_id_list.append(process_id)
 
-        # Goal is 4950
-        assert final_result == 4950
+        assert final_result == 39600
+
+    def test_shared_memory_with_shared_manager(self):
+        shared = SharedDataManager()
+        shared.set('counter', 42)
+
+        def job(*args, **kwargs):
+            return shared.get('counter') + 1
+
+        process_manager = ProcessManager(
+            config=Config({}),
+            process_count=available_cores,
+            job=job,
+        )
+
+        results = process_manager.make_them_work(None)
+        process_manager.terminate()
+
+        assert results[0] == 43
+
+    def test_share_data_with_function(self):
+        shared = SharedDataManager()
+
+        class Foo(object):
+            counter = shared.create('counter', 0)
+
+        def job(*args, **kwargs):
+            return shared.get('counter') + 1
+
+        process_manager = ProcessManager(
+            config=Config({}),
+            process_count=available_cores,
+            job=job,
+        )
+
+        foo = Foo()
+        foo.counter = 42
+
+        results = process_manager.make_them_work(None)
+        assert results[0] == 43
+
+        foo.counter = 45
+
+        results = process_manager.make_them_work(None)
+        assert results[0] == 46
+
+        process_manager.terminate()
+
+    def test_after_created_shared_data(self):
+        shared = SharedDataManager()
+
+        shared.set('foo_1', 0)
+
+        def job(key):
+            return shared.get('foo_{}'.format(key)) + 1
+
+        process_manager = ProcessManager(
+            config=Config({}),
+            process_count=available_cores,
+            job=job,
+        )
+
+        shared.set('foo_1', 42)
+
+        results = process_manager.make_them_work('1')
+        assert results[0] == 43
+
+        shared.set('foo_2', 52)
+
+        results = process_manager.make_them_work('2')
+        assert results[0] == 53
+
+        process_manager.terminate()
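
A note on the rewritten expectations: every worker now receives the full data list instead of a chunk, so each of the available_cores workers returns sum(range(100)) == 4950, and the parallel tests aggregate one identical result per worker. The hardcoded totals therefore encode the test machine's core count:

# Where 39600 comes from: one identical result per worker, so the total
# is 4950 * available_cores. The hardcoded value assumes 8 cores.
per_worker = sum(range(100))      # 4950
assert per_worker * 8 == 39600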

+ 46 - 0
tests/test_share.py

@@ -0,0 +1,46 @@
+# coding: utf-8
+from synergine2.share import SharedDataManager
+from tests import BaseTest
+
+
+class TestShare(BaseTest):
+    def test_simple_share_with_class(self):
+        shared = SharedDataManager()
+
+        class Foo(object):
+            counter = shared.create('counter', 0)
+
+        foo = Foo()
+        foo.counter = 42
+
+        assert shared.get('counter') == 42
+
+        foo.counter = 48
+
+        assert shared.get('counter') == 48
+
+    def test_dynamic_key(self):
+        shared = SharedDataManager()
+
+        class Foo(object):
+            counter = shared.create(
+                '{id}_counter',
+                (0, 0, 0),
+                indexes=[],
+            )
+
+            @property
+            def id(self):
+                return id(self)
+
+        foo = Foo()
+        foo.counter = 42
+
+        assert shared.get('{}_counter'.format(foo.id)) == 42
+
+        foo.counter = 48
+
+        assert shared.get('{}_counter'.format(foo.id)) == 48
+
+    def test_indexes(self):
+        pass