Browse Source

Shared manager with redis

Bastien Sevajol 6 years ago
parent
commit
dd7ad18ffd
3 changed files with 23 additions and 8 deletions
  1. 3 1
      requirements.txt
  2. 7 4
      synergine2/share.py
  3. 13 3
      tests/test_processing.py

+ 3 - 1
requirements.txt View File

@@ -13,8 +13,10 @@ pymlconf==0.7.0
13 13
 pyparsing==2.1.10
14 14
 pytest==3.0.4
15 15
 pytest-cov==2.5.1
16
+pytest-timeout==1.2.0
16 17
 pytest-xdist==1.16.0
17 18
 PyYAML==3.12
19
+redis==2.10.6
18 20
 six==1.10.0
21
+tmx==1.9.1
19 22
 typing==3.6.1
20
-pylibmc==1.5.2

+ 7 - 4
synergine2/share.py View File

@@ -1,7 +1,8 @@
1 1
 # coding: utf-8
2
+import pickle
2 3
 import typing
3 4
 
4
-import pylibmc
5
+import redis
5 6
 
6 7
 from synergine2.exceptions import SynergineException
7 8
 
@@ -12,13 +13,15 @@ class SharedDataManager(object):
12 13
     start of processes. Processes will only be able to access shared memory filled here before start.
13 14
     """
14 15
     def __init__(self):
15
-        self._mc = pylibmc.Client(['127.0.0.1'], binary=True, behaviors={"tcp_nodelay": True, "ketama": True})
16
+        self._r = redis.StrictRedis(host='localhost', port=6379, db=0)  # TODO: configs
17
+        # TODO: Il faut écrire dans REDIS que lorsque l'on veut passer à l'étape processes, genre de commit
18
+        # sinon on va ecrire dans redis a chaque fois qu'on modifie une shared data c'est pas optimal.
16 19
 
17 20
     def set(self, key: str, value: typing.Any) -> None:
18
-        self._mc.set(key, value)
21
+        self._r.set(key, pickle.dumps(value))
19 22
 
20 23
     def get(self, key) -> typing.Any:
21
-        return self._mc.get(key)
24
+        return pickle.loads(self._r.get(key))
22 25
 
23 26
     def create(
24 27
         self,

+ 13 - 3
tests/test_processing.py View File

@@ -21,6 +21,7 @@ class MyFakeClass(object):
21 21
 
22 22
 
23 23
 class TestProcessing(BaseTest):
24
+    @pytest.mark.timeout(10)
24 25
     def make_job_with_scalar(
25 26
             self,
26 27
             data: list,
@@ -28,6 +29,7 @@ class TestProcessing(BaseTest):
28 29
         result = sum(data)
29 30
         return result
30 31
 
32
+    @pytest.mark.timeout(10)
31 33
     def make_job_with_object(
32 34
             self,
33 35
             data: list,
@@ -50,6 +52,7 @@ class TestProcessing(BaseTest):
50 52
 
51 53
         assert sum(results) == 39600
52 54
 
55
+    @pytest.mark.timeout(10)
53 56
     def test_non_parallel_jobs_with_scalar(self):
54 57
         # TODO: process manager utilise actuellement un cpu quand même, changer ca
55 58
         process_manager = ProcessManager(
@@ -66,6 +69,7 @@ class TestProcessing(BaseTest):
66 69
         assert len(results) == 1
67 70
         assert final_result == 4950
68 71
 
72
+    @pytest.mark.timeout(10)
69 73
     def test_parallel_jobs_with_objects(self):
70 74
         process_manager = ProcessManager(
71 75
             config=Config({}),
@@ -84,12 +88,14 @@ class TestProcessing(BaseTest):
84 88
 
85 89
         assert final_result == 39600
86 90
 
91
+    @pytest.mark.timeout(10)
87 92
     def test_shared_memory_with_shared_manager(self):
88 93
         shared = SharedDataManager()
89 94
         shared.set('counter', 42)
90 95
 
91 96
         def job(*args, **kwargs):
92
-            return shared.get('counter') + 1
97
+            counter = shared.get('counter') or 0
98
+            return counter + 1
93 99
 
94 100
         process_manager = ProcessManager(
95 101
             config=Config({}),
@@ -102,6 +108,7 @@ class TestProcessing(BaseTest):
102 108
 
103 109
         assert results[0] == 43
104 110
 
111
+    @pytest.mark.timeout(10)
105 112
     def test_share_data_with_function(self):
106 113
         shared = SharedDataManager()
107 114
 
@@ -109,7 +116,8 @@ class TestProcessing(BaseTest):
109 116
             counter = shared.create('counter', 0)
110 117
 
111 118
         def job(*args, **kwargs):
112
-            return shared.get('counter') + 1
119
+            counter = shared.get('counter') or 0
120
+            return counter + 1
113 121
 
114 122
         process_manager = ProcessManager(
115 123
             config=Config({}),
@@ -130,13 +138,15 @@ class TestProcessing(BaseTest):
130 138
 
131 139
         process_manager.terminate()
132 140
 
141
+    @pytest.mark.timeout(10)
133 142
     def test_after_created_shared_data(self):
134 143
         shared = SharedDataManager()
135 144
 
136 145
         shared.set('foo_1', 0)
137 146
 
138 147
         def job(key):
139
-            return shared.get('foo_{}'.format(key)) + 1
148
+            value = shared.get('foo_{}'.format(key)) or 0
149
+            return value + 1
140 150
 
141 151
         process_manager = ProcessManager(
142 152
             config=Config({}),