Bastien Sevajol il y a 7 ans
Parent
révision
b2299866bd
3 fichiers modifiés avec 158 ajouts et 0 suppressions
  1. 26 0
      sandbox/print_terminal.py
  2. 98 0
      synergine2/terminals.py
  3. 34 0
      tests/test_terminals.py

+ 26 - 0
sandbox/print_terminal.py Voir le fichier

@@ -0,0 +1,26 @@
1
+import time
2
+import sys
3
+
4
+sys.path.append('../')
5
+
6
+from synergine2.terminals import Terminal, TerminalPackage, TerminalManager
7
+
8
+
9
+class PrintTerminal(Terminal):
10
+    def receive(self, package: TerminalPackage):
11
+        print(package.value)
12
+        sys.stdout.flush()
13
+
14
+    def daemon(self):
15
+        while self.read():
16
+            print('Hello world')
17
+            sys.stdout.flush()
18
+            time.sleep(1)
19
+
20
+
21
+terminals_manager = TerminalManager(terminals=[PrintTerminal()])
22
+for i in range(3):
23
+    time.sleep(2)
24
+    terminals_manager.send(TerminalPackage('Just print me'))
25
+
26
+terminals_manager.stop()

+ 98 - 0
synergine2/terminals.py Voir le fichier

@@ -0,0 +1,98 @@
1
+from multiprocessing import Queue
2
+
3
+from multiprocessing import Process
4
+from queue import Empty
5
+
6
+import time
7
+
8
+STOP_SIGNAL = '__STOP_SIGNAL__'
9
+
10
+
11
+class TerminalPackage(object):
12
+    def __init__(self, value):
13
+        self._value = value
14
+
15
+    @property
16
+    def value(self):
17
+        return self._value
18
+
19
+
20
+class Terminal(object):
21
+    """Default behaviour is to do nothing.
22
+    DEFAULT_SLEEP is sleep time between each queue read"""
23
+    DEFAULT_SLEEP = 1
24
+
25
+    def __init__(self):
26
+        self._input_queue = None
27
+        self._output_queue = None
28
+        self._stop_required = False
29
+
30
+    def start(self, input_queue: Queue, output_queue: Queue) -> None:
31
+        self._input_queue = input_queue
32
+        self._output_queue = output_queue
33
+        self.run()
34
+
35
+    def run(self):
36
+        """
37
+        Override this method to create your daemon terminal
38
+        """
39
+        while self.read():
40
+            time.sleep(self.DEFAULT_SLEEP)
41
+
42
+    def read(self):
43
+        while True:
44
+            try:
45
+                package = self._input_queue.get(block=False, timeout=None)
46
+                if package == STOP_SIGNAL:
47
+                    self._stop_required = True
48
+                    return False
49
+
50
+                self.receive(package)
51
+            except Empty:
52
+                return True  # Finished to read Queue
53
+
54
+    def receive(self, package: TerminalPackage):
55
+        raise NotImplementedError()
56
+
57
+    def send(self, package: TerminalPackage):
58
+        self._output_queue.put(package)
59
+
60
+
61
+class TerminalManager(object):
62
+    def __init__(self, terminals: [Terminal]):
63
+        self._terminals = terminals
64
+        self._outputs_queues = []
65
+        self._inputs_queues = []
66
+
67
+    def start(self) -> None:
68
+        for terminal in self._terminals:
69
+            output_queue = Queue()
70
+            self._outputs_queues.append(output_queue)
71
+
72
+            input_queue = Queue()
73
+            self._inputs_queues.append(input_queue)
74
+
75
+            process = Process(target=terminal.start, kwargs=dict(
76
+                input_queue=output_queue,
77
+                output_queue=input_queue,
78
+            ))
79
+            process.start()
80
+
81
+    def stop(self):
82
+        for output_queue in self._outputs_queues:
83
+            output_queue.put(STOP_SIGNAL)
84
+
85
+    def send(self, package: TerminalPackage):
86
+        for output_queue in self._outputs_queues:
87
+            output_queue.put(package)
88
+
89
+    def receive(self) -> []:
90
+        packages = []
91
+        for input_queue in self._inputs_queues:
92
+            try:
93
+                while True:
94
+                    packages.append(input_queue.get(block=False, timeout=None))
95
+            except Empty:
96
+                pass  # Queue is empty
97
+
98
+        return packages

+ 34 - 0
tests/test_terminals.py Voir le fichier

@@ -0,0 +1,34 @@
1
+import time
2
+
3
+from synergine2.terminals import Terminal
4
+from synergine2.terminals import TerminalPackage
5
+from synergine2.terminals import TerminalManager
6
+from tests import BaseTest
7
+
8
+
9
+class FakeTerminal(Terminal):
10
+    def receive(self, package: TerminalPackage):
11
+        self.send(TerminalPackage(package.value * 2))
12
+        self.send(TerminalPackage(package.value * 4))
13
+
14
+
15
+class TestTerminals(BaseTest):
16
+    def test_terminals_communications(self):
17
+        terminals_manager = TerminalManager(
18
+            terminals=[
19
+                FakeTerminal(),
20
+            ]
21
+        )
22
+        terminals_manager.start()
23
+        terminals_manager.send(TerminalPackage(42))
24
+
25
+        time.sleep(2)  # TODO: Replace by lock
26
+        packages = terminals_manager.receive()
27
+
28
+        assert 2 == len(packages)
29
+        values = [p.value for p in packages]
30
+        assert 84 in values
31
+        assert 168 in values
32
+
33
+        terminals_manager.stop()  # pytest must execute this if have fail
34
+        # TODO: Tester avec plusieurs terminaux