Browse Source

Change SharedDataManager to optimized way with synergine

Bastien Sevajol 6 years ago
parent
commit
63093bbebc
4 changed files with 99 additions and 5 deletions
  1. 4 0
      synergine2/exceptions.py
  2. 26 5
      synergine2/share.py
  3. 8 0
      tests/test_processing.py
  4. 61 0
      tests/test_share.py

+ 4 - 0
synergine2/exceptions.py View File

13
 
13
 
14
 class ConfigurationError(SynergineException):
14
 class ConfigurationError(SynergineException):
15
     pass
15
     pass
16
+
17
+
18
+class UnknownSharedData(SynergineException):
19
+    pass

+ 26 - 5
synergine2/share.py View File

5
 import redis
5
 import redis
6
 
6
 
7
 from synergine2.exceptions import SynergineException
7
 from synergine2.exceptions import SynergineException
8
+from synergine2.exceptions import UnknownSharedData
8
 
9
 
9
 
10
 
10
 class SharedDataManager(object):
11
 class SharedDataManager(object):
12
     This object is designed to own shared memory between processes. It must be feed (with set method) before
13
     This object is designed to own shared memory between processes. It must be feed (with set method) before
13
     start of processes. Processes will only be able to access shared memory filled here before start.
14
     start of processes. Processes will only be able to access shared memory filled here before start.
14
     """
15
     """
15
-    def __init__(self):
16
+    def __init__(self, clear: bool=True):
16
         self._r = redis.StrictRedis(host='localhost', port=6379, db=0)  # TODO: configs
17
         self._r = redis.StrictRedis(host='localhost', port=6379, db=0)  # TODO: configs
17
-        # TODO: Il faut écrire dans REDIS que lorsque l'on veut passer à l'étape processes, genre de commit
18
-        # sinon on va ecrire dans redis a chaque fois qu'on modifie une shared data c'est pas optimal.
18
+
19
+        self._data = {}
20
+        self._modified_keys = set()
21
+
22
+        if clear:
23
+            self._r.flushdb()
19
 
24
 
20
     def set(self, key: str, value: typing.Any) -> None:
25
     def set(self, key: str, value: typing.Any) -> None:
21
-        self._r.set(key, pickle.dumps(value))
26
+        self._data[key] = value
27
+        self._modified_keys.add(key)
22
 
28
 
23
     def get(self, key) -> typing.Any:
29
     def get(self, key) -> typing.Any:
24
-        return pickle.loads(self._r.get(key))
30
+        if key not in self._data:
31
+            b_value = self._r.get(key)
32
+            if b_value is None:
33
+                # We not allow None value storage
34
+                raise UnknownSharedData('No shared data for key "{}"'.format(key))
35
+            self._data[key] = pickle.loads(b_value)
36
+
37
+        return self._data[key]
38
+
39
+    def commit(self) -> None:
40
+        for key in self._modified_keys:
41
+            self._r.set(key, pickle.dumps(self.get(key)))
42
+        self._modified_keys = set()
43
+
44
+    def refresh(self) -> None:
45
+        self._data = {}
25
 
46
 
26
     def create(
47
     def create(
27
         self,
48
         self,

+ 8 - 0
tests/test_processing.py View File

92
     def test_shared_memory_with_shared_manager(self):
92
     def test_shared_memory_with_shared_manager(self):
93
         shared = SharedDataManager()
93
         shared = SharedDataManager()
94
         shared.set('counter', 42)
94
         shared.set('counter', 42)
95
+        shared.commit()
95
 
96
 
96
         def job(*args, **kwargs):
97
         def job(*args, **kwargs):
98
+            shared.refresh()
97
             counter = shared.get('counter') or 0
99
             counter = shared.get('counter') or 0
98
             return counter + 1
100
             return counter + 1
99
 
101
 
116
             counter = shared.create('counter', 0)
118
             counter = shared.create('counter', 0)
117
 
119
 
118
         def job(*args, **kwargs):
120
         def job(*args, **kwargs):
121
+            shared.refresh()
119
             counter = shared.get('counter') or 0
122
             counter = shared.get('counter') or 0
120
             return counter + 1
123
             return counter + 1
121
 
124
 
127
 
130
 
128
         foo = Foo()
131
         foo = Foo()
129
         foo.counter = 42
132
         foo.counter = 42
133
+        shared.commit()
130
 
134
 
131
         results = process_manager.make_them_work(None)
135
         results = process_manager.make_them_work(None)
132
         assert results[0] == 43
136
         assert results[0] == 43
133
 
137
 
134
         foo.counter = 45
138
         foo.counter = 45
139
+        shared.commit()
135
 
140
 
136
         results = process_manager.make_them_work(None)
141
         results = process_manager.make_them_work(None)
137
         assert results[0] == 46
142
         assert results[0] == 46
145
         shared.set('foo_1', 0)
150
         shared.set('foo_1', 0)
146
 
151
 
147
         def job(key):
152
         def job(key):
153
+            shared.refresh()
148
             value = shared.get('foo_{}'.format(key)) or 0
154
             value = shared.get('foo_{}'.format(key)) or 0
149
             return value + 1
155
             return value + 1
150
 
156
 
155
         )
161
         )
156
 
162
 
157
         shared.set('foo_1', 42)
163
         shared.set('foo_1', 42)
164
+        shared.commit()
158
 
165
 
159
         results = process_manager.make_them_work('1')
166
         results = process_manager.make_them_work('1')
160
         assert results[0] == 43
167
         assert results[0] == 43
161
 
168
 
162
         shared.set('foo_2', 52)
169
         shared.set('foo_2', 52)
170
+        shared.commit()
163
 
171
 
164
         results = process_manager.make_them_work('2')
172
         results = process_manager.make_them_work('2')
165
         assert results[0] == 53
173
         assert results[0] == 53

+ 61 - 0
tests/test_share.py View File

1
 # coding: utf-8
1
 # coding: utf-8
2
+import pytest
3
+
4
+from synergine2.exceptions import UnknownSharedData
2
 from synergine2.share import SharedDataManager
5
 from synergine2.share import SharedDataManager
3
 from tests import BaseTest
6
 from tests import BaseTest
4
 
7
 
42
 
45
 
43
         assert shared.get('{}_counter'.format(foo.id)) == 48
46
         assert shared.get('{}_counter'.format(foo.id)) == 48
44
 
47
 
48
+    def test_update_dict_with_pointer(self):
49
+        shared = SharedDataManager()
50
+
51
+        class Foo(object):
52
+            data = shared.create('data', {})
53
+
54
+        foo = Foo()
55
+        foo.data = {'foo': 'bar'}
56
+
57
+        assert shared.get('data') == {'foo': 'bar'}
58
+
59
+        foo.data['foo'] = 'buz'
60
+        assert shared.get('data') == {'foo': 'buz'}
61
+
62
+    def test_refresh_without_commit(self):
63
+        shared = SharedDataManager()
64
+
65
+        class Foo(object):
66
+            counter = shared.create('counter', 0)
67
+
68
+        foo = Foo()
69
+        foo.counter = 42
70
+
71
+        assert shared.get('counter') == 42
72
+
73
+        shared.refresh()
74
+        with pytest.raises(UnknownSharedData):
75
+            shared.get('counter')
76
+
77
+    def test_commit(self):
78
+        shared = SharedDataManager()
79
+
80
+        class Foo(object):
81
+            counter = shared.create('counter', 0)
82
+
83
+        foo = Foo()
84
+        foo.counter = 42
85
+
86
+        assert shared.get('counter') == 42
87
+
88
+        shared.commit()
89
+        assert shared.get('counter') == 42
90
+
91
+    def test_commit_then_refresh(self):
92
+        shared = SharedDataManager()
93
+
94
+        class Foo(object):
95
+            counter = shared.create('counter', 0)
96
+
97
+        foo = Foo()
98
+        foo.counter = 42
99
+
100
+        assert shared.get('counter') == 42
101
+
102
+        shared.commit()
103
+        shared.refresh()
104
+        assert shared.get('counter') == 42
105
+
45
     def test_indexes(self):
106
     def test_indexes(self):
46
         pass
107
         pass