Browse Source

change stopping tracim strategy: update RQ worker to be stopped and kill tracim process after stop daemons. Closes #166

Bastien Sevajol 8 years ago
parent
commit
3985bba7de

+ 2 - 0
tracim/tracim/config/app_cfg.py View File

22
 
22
 
23
 from tgext.pluggable import plug
23
 from tgext.pluggable import plug
24
 from tgext.pluggable import replace_template
24
 from tgext.pluggable import replace_template
25
+from tracim.lib.system import InterruptManager
25
 
26
 
26
 from tracim.lib.utils import lazy_ugettext as l_
27
 from tracim.lib.utils import lazy_ugettext as l_
27
 
28
 
109
         manager.run('mail_sender', MailSenderDaemon)
110
         manager.run('mail_sender', MailSenderDaemon)
110
 
111
 
111
 environment_loaded.register(lambda: start_daemons(daemons))
112
 environment_loaded.register(lambda: start_daemons(daemons))
113
+interrupt_manager = InterruptManager(daemons_manager=daemons)
112
 
114
 
113
 # Note: here are fake translatable strings that allow to translate messages for reset password email content
115
 # Note: here are fake translatable strings that allow to translate messages for reset password email content
114
 duplicated_email_subject = l_('Password reset request')
116
 duplicated_email_subject = l_('Password reset request')

+ 27 - 11
tracim/tracim/lib/daemons.py View File

1
 import threading
1
 import threading
2
+import collections
2
 from configparser import DuplicateSectionError
3
 from configparser import DuplicateSectionError
3
-from wsgiref.simple_server import make_server
4
-import signal
5
 
4
 
6
-import collections
5
+from wsgiref.simple_server import make_server
7
 
6
 
8
 from radicale import Application as RadicaleApplication
7
 from radicale import Application as RadicaleApplication
9
 from radicale import HTTPServer as BaseRadicaleHTTPServer
8
 from radicale import HTTPServer as BaseRadicaleHTTPServer
13
 from rq import Connection as RQConnection
12
 from rq import Connection as RQConnection
14
 from rq import Worker as BaseRQWorker
13
 from rq import Worker as BaseRQWorker
15
 from redis import Redis
14
 from redis import Redis
15
+from rq.dummy import do_nothing
16
+from rq.worker import StopRequested
16
 
17
 
17
 from tracim.lib.base import logger
18
 from tracim.lib.base import logger
18
 from tracim.lib.exceptions import AlreadyRunningDaemon
19
 from tracim.lib.exceptions import AlreadyRunningDaemon
19
-from tracim.lib.utils import add_signal_handler
20
+
21
+from tracim.tracim.lib.utils import get_rq_queue
20
 
22
 
21
 
23
 
22
 class DaemonsManager(object):
24
 class DaemonsManager(object):
23
     def __init__(self):
25
     def __init__(self):
24
         self._running_daemons = {}
26
         self._running_daemons = {}
25
-        add_signal_handler(signal.SIGTERM, self.stop_all)
26
 
27
 
27
     def run(self, name: str, daemon_class: object, **kwargs) -> None:
28
     def run(self, name: str, daemon_class: object, **kwargs) -> None:
28
         """
29
         """
61
             logger.info(self, 'Stopping daemon with name "{0}": OK'
62
             logger.info(self, 'Stopping daemon with name "{0}": OK'
62
                               .format(name))
63
                               .format(name))
63
 
64
 
64
-    def stop_all(self, *args, **kwargs) -> None:
65
+    def stop_all(self) -> None:
65
         """
66
         """
66
         Stop all started daemons and wait for them.
67
         Stop all started daemons and wait for them.
67
         """
68
         """
71
             daemon.stop()
72
             daemon.stop()
72
 
73
 
73
         for name, daemon in self._running_daemons.items():
74
         for name, daemon in self._running_daemons.items():
75
+            logger.info(
76
+                self,
77
+                'Stopping daemon "{0}" waiting confirmation'.format(name),
78
+            )
74
             daemon.join()
79
             daemon.join()
75
             logger.info(self, 'Stopping daemon "{0}" OK'.format(name))
80
             logger.info(self, 'Stopping daemon "{0}" OK'.format(name))
76
 
81
 
158
         pass
163
         pass
159
 
164
 
160
     def stop(self) -> None:
165
     def stop(self) -> None:
161
-        self.worker.request_stop('TRACIM STOP', None)
166
+        # When _stop_requested at False, tracim.tracim.lib.daemons.RQWorker
167
+        # will raise StopRequested exception in worker thread after receive a
168
+        # job.
169
+        self.worker._stop_requested = True
170
+        queue = get_rq_queue('mail_sender')
171
+        queue.enqueue(do_nothing)
162
 
172
 
163
     def run(self) -> None:
173
     def run(self) -> None:
164
         from tracim.config.app_cfg import CFG
174
         from tracim.config.app_cfg import CFG
175
 
185
 
176
 class RQWorker(BaseRQWorker):
186
 class RQWorker(BaseRQWorker):
177
     def _install_signal_handlers(self):
187
     def _install_signal_handlers(self):
178
-        # TODO BS 20170126: RQ WWorker is designed to work in main thread
188
+        # RQ Worker is designed to work in main thread
179
         # So we have to disable these signals (we implement server stop in
189
         # So we have to disable these signals (we implement server stop in
180
-        # MailSenderDaemon.stop method). When bug
181
-        # https://github.com/tracim/tracim/issues/166 will be fixed, ensure
182
-        # This worker terminate correctly.
190
+        # MailSenderDaemon.stop method).
183
         pass
191
         pass
184
 
192
 
193
+    def dequeue_job_and_maintain_ttl(self, timeout):
194
+        # RQ Worker is designed to work in main thread, so we add behaviour
195
+        # here: if _stop_requested has been set to True, raise the standard way
196
+        # StopRequested exception to stop worker.
197
+        if self._stop_requested:
198
+            raise StopRequested()
199
+        return super().dequeue_job_and_maintain_ttl(timeout)
200
+
185
 
201
 
186
 class RadicaleHTTPSServer(TracimSocketServerMixin, BaseRadicaleHTTPSServer):
202
 class RadicaleHTTPSServer(TracimSocketServerMixin, BaseRadicaleHTTPSServer):
187
     pass
203
     pass

+ 3 - 7
tracim/tracim/lib/email.py View File

6
 
6
 
7
 import typing
7
 import typing
8
 from mako.template import Template
8
 from mako.template import Template
9
-from redis import Redis
10
-from rq import Queue
11
 from tg.i18n import ugettext as _
9
 from tg.i18n import ugettext as _
12
 
10
 
13
 from tracim.lib.base import logger
11
 from tracim.lib.base import logger
14
 from tracim.model import User
12
 from tracim.model import User
15
 
13
 
14
+from tracim.tracim.lib.utils import get_rq_queue
15
+
16
 
16
 
17
 def send_email_through(
17
 def send_email_through(
18
         send_callable: typing.Callable[[Message], None],
18
         send_callable: typing.Callable[[Message], None],
31
     if cfg.EMAIL_PROCESSING_MODE == CFG.CST.SYNC:
31
     if cfg.EMAIL_PROCESSING_MODE == CFG.CST.SYNC:
32
         send_callable(message)
32
         send_callable(message)
33
     elif cfg.EMAIL_PROCESSING_MODE == CFG.CST.ASYNC:
33
     elif cfg.EMAIL_PROCESSING_MODE == CFG.CST.ASYNC:
34
-        queue = Queue('mail_sender', connection=Redis(
35
-            host=cfg.EMAIL_SENDER_REDIS_HOST,
36
-            port=cfg.EMAIL_SENDER_REDIS_PORT,
37
-            db=cfg.EMAIL_SENDER_REDIS_DB,
38
-        ))
34
+        queue = get_rq_queue('mail_sender')
39
         queue.enqueue(send_callable, message)
35
         queue.enqueue(send_callable, message)
40
     else:
36
     else:
41
         raise NotImplementedError(
37
         raise NotImplementedError(

+ 38 - 0
tracim/tracim/lib/system.py View File

1
+# -*- coding: utf-8 -*-
2
+import os
3
+import signal
4
+
5
+from tracim.lib.daemons import DaemonsManager
6
+
7
+
8
+class InterruptManager(object):
9
+    def __init__(self, daemons_manager: DaemonsManager):
10
+        self.daemons_manager = daemons_manager
11
+        self.process_pid = os.getpid()
12
+        self._install_sgnal_handlers()
13
+
14
+    def _install_sgnal_handlers(self) -> None:
15
+        """
16
+        Install signal handler to intercept SIGINT and SIGTERM signals
17
+        """
18
+        signal.signal(signal.SIGTERM, self.stop)
19
+        signal.signal(signal.SIGINT, self.stop)
20
+
21
+    def _remove_signal_handlers(self) -> None:
22
+        """
23
+        Remove installed signals to permit stop of main thread.
24
+        """
25
+        signal.signal(signal.SIGTERM, signal.SIG_DFL)
26
+        signal.signal(signal.SIGINT, signal.SIG_DFL)
27
+
28
+    def stop(self, signum, frame) -> None:
29
+        """
30
+        Run stopping process needed when tracim have to stop.
31
+        :param signum: signal interruption value
32
+        :param frame: frame of signal origin
33
+        """
34
+        self._remove_signal_handlers()
35
+        self.daemons_manager.stop_all()
36
+        # Web server is managed by end of stack like uwsgi, apache2.
37
+        # So to ask it's termination, we have to use standard kills signals
38
+        os.kill(self.process_pid, signum)

+ 17 - 14
tracim/tracim/lib/utils.py View File

13
 from tg.support.registry import StackedObjectProxy
13
 from tg.support.registry import StackedObjectProxy
14
 from tg.util import LazyString as BaseLazyString
14
 from tg.util import LazyString as BaseLazyString
15
 from tg.util import lazify
15
 from tg.util import lazify
16
+from redis import Redis
17
+from rq import Queue
16
 
18
 
17
 from tracim.lib.base import logger
19
 from tracim.lib.base import logger
18
 from webob import Response
20
 from webob import Response
63
     raise NotImplementedError()
65
     raise NotImplementedError()
64
 
66
 
65
 
67
 
66
-def add_signal_handler(signal_id, handler) -> None:
67
-    """
68
-    Add a callback attached to python signal.
69
-    :param signal_id: signal identifier (eg. signal.SIGTERM)
70
-    :param handler: callback to execute when signal trig
71
-    """
72
-    def _handler(*args, **kwargs):
73
-        handler()
74
-        signal.signal(signal_id, signal.SIG_DFL)
75
-        os.kill(os.getpid(), signal_id)  # Rethrow signal
76
-
77
-    signal.signal(signal_id, _handler)
78
-
79
-
80
 class APIWSGIHTTPException(WSGIHTTPException):
68
 class APIWSGIHTTPException(WSGIHTTPException):
81
     def json_formatter(self, body, status, title, environ):
69
     def json_formatter(self, body, status, title, environ):
82
         if self.comment:
70
         if self.comment:
171
         return text
159
         return text
172
 
160
 
173
 lazy_ugettext = lazify(_lazy_ugettext)
161
 lazy_ugettext = lazify(_lazy_ugettext)
162
+
163
+
164
+def get_rq_queue(queue_name: str= 'default') -> Queue:
165
+    """
166
+    :param queue_name: name of queue
167
+    :return: wanted queue
168
+    """
169
+    from tracim.config.app_cfg import CFG
170
+    cfg = CFG.get_instance()
171
+
172
+    return Queue(queue_name, connection=Redis(
173
+        host=cfg.EMAIL_SENDER_REDIS_HOST,
174
+        port=cfg.EMAIL_SENDER_REDIS_PORT,
175
+        db=cfg.EMAIL_SENDER_REDIS_DB,
176
+    ))