Przeglądaj źródła

Closes #167: Allow to use RedisQueue for email sending

Bastien Sevajol (Algoo) 7 lat temu
rodzic
commit
54c8da3898

+ 2 - 1
install/requirements.txt Wyświetl plik

@@ -49,7 +49,6 @@ stevedore==1.1.0
49 49
 tg.devtools==2.3.7
50 50
 git+https://github.com/algoo/tgapp-resetpassword.git
51 51
 tgext.admin==0.6.4
52
-tgext.asyncjob==0.3.1
53 52
 tgext.crud==0.7.3
54 53
 tgext.pluggable==0.6.2
55 54
 transaction==1.4.4
@@ -65,3 +64,5 @@ zope.sqlalchemy==0.7.6
65 64
 PyYAML
66 65
 redis==2.10.5
67 66
 typing==3.5.3.0
67
+rq==0.7.1
68
+click==6.7

+ 7 - 2
tracim/development.ini.base Wyświetl plik

@@ -192,8 +192,13 @@ email.notification.smtp.port = 25
192 192
 email.notification.smtp.user = your_smtp_user
193 193
 email.notification.smtp.password = your_smtp_password
194 194
 
195
-# Asyncjob tracker, can be memory or redis default is memory
196
-asyncjob.tracker = memory
195
+## Email sending configuration
196
+# processing_mode may be sync or async,
197
+# with async, please configure redis below
198
+email.processing_mode = sync
199
+# email.async.redis.host = localhost
200
+# email.async.redis.port = 6379
201
+# email.async.redis.db = 0
197 202
 
198 203
 ## Radical (CalDav server) configuration
199 204
 # radicale.server.host = 0.0.0.0

+ 2 - 0
tracim/setup.py Wyświetl plik

@@ -43,6 +43,7 @@ install_requires=[
43 43
     "python-ldap-test==0.2.1",
44 44
     "unicode-slugify==0.1.3",
45 45
     "pytz==2014.7",
46
+    'rq==0.7.1',
46 47
     ]
47 48
 
48 49
 setup(
@@ -76,6 +77,7 @@ setup(
76 77
             'ldap_server = tracim.command.ldap_test_server:LDAPTestServerCommand',
77 78
             'user_create = tracim.command.user:CreateUserCommand',
78 79
             'user_update = tracim.command.user:UpdateUserCommand',
80
+            'mail sender = tracim.command.mail:MailSenderCommend',
79 81
         ]
80 82
     },
81 83
     dependency_links=[

+ 16 - 0
tracim/tracim/command/mail.py Wyświetl plik

@@ -0,0 +1,16 @@
1
+# -*- coding: utf-8 -*-
2
+from rq import Connection, Worker
3
+
4
+from tracim.command import AppContextCommand
5
+
6
+
7
+class MailSenderCommend(AppContextCommand):
8
+    def get_description(self):
9
+        return '''Run rq worker for mail sending'''
10
+
11
+    def take_action(self, parsed_args):
12
+        super().take_action(parsed_args)
13
+
14
+        with Connection():
15
+            w = Worker(['mail_sender'])
16
+            w.work()

+ 27 - 52
tracim/tracim/config/app_cfg.py Wyświetl plik

@@ -19,8 +19,6 @@ from urllib.parse import urlparse
19 19
 import tg
20 20
 from paste.deploy.converters import asbool
21 21
 from tg.configuration.milestones import environment_loaded
22
-from tgext.asyncjob.trackers.memory import MemoryProgressTracker
23
-from tgext.asyncjob.trackers.redisdb import RedisProgressTracker
24 22
 
25 23
 from tgext.pluggable import plug
26 24
 from tgext.pluggable import replace_template
@@ -30,10 +28,9 @@ from tracim.lib.utils import lazy_ugettext as l_
30 28
 import tracim
31 29
 from tracim import model
32 30
 from tracim.config import TracimAppConfig
33
-from tracim.lib import app_globals, helpers
34
-from tracim.lib.auth.wrapper import AuthConfigWrapper
35 31
 from tracim.lib.base import logger
36 32
 from tracim.lib.daemons import DaemonsManager
33
+from tracim.lib.daemons import MailSenderDaemon
37 34
 from tracim.lib.daemons import RadicaleDaemon
38 35
 from tracim.lib.daemons import WsgiDavDaemon
39 36
 from tracim.model.data import ActionDescription
@@ -106,6 +103,7 @@ def start_daemons(manager: DaemonsManager):
106 103
 
107 104
     manager.run('radicale', RadicaleDaemon)
108 105
     manager.run('webdav', WsgiDavDaemon)
106
+    manager.run('mail_sender', MailSenderDaemon)
109 107
 
110 108
 environment_loaded.register(lambda: start_daemons(daemons))
111 109
 
@@ -341,32 +339,36 @@ class CFG(object):
341 339
         if not self.WSGIDAV_CLIENT_BASE_URL.endswith('/'):
342 340
             self.WSGIDAV_CLIENT_BASE_URL += '/'
343 341
 
344
-        self.ASYNC_JOB_TRACKER = tg.config.get(
345
-            'asyncjob.tracker',
346
-            'memory',
347
-        )
342
+        self.EMAIL_PROCESSING_MODE = tg.config.get(
343
+            'email.processing_mode',
344
+            'sync',
345
+        ).upper()
348 346
 
349
-        if self.ASYNC_JOB_TRACKER not in ('memory', 'redis'):
347
+        if self.EMAIL_PROCESSING_MODE not in (
348
+                self.CST.ASYNC,
349
+                self.CST.SYNC,
350
+        ):
350 351
             raise Exception(
351
-                'asyncjob.tracker configuration '
352
-                'can ''be "memory" or "redis", not "{0}"'.format(
353
-                    self.ASYNC_JOB_TRACKER,
352
+                'email.processing_mode '
353
+                'can ''be "{}" or "{}", not "{}"'.format(
354
+                    self.CST.ASYNC,
355
+                    self.CST.SYNC,
356
+                    self.EMAIL_PROCESSING_MODE,
354 357
                 )
355 358
             )
356 359
 
357
-        if self.ASYNC_JOB_TRACKER == 'redis':
358
-            self.ASYNC_JOB_TRACKER_REDIS_HOST = tg.config.get(
359
-                'asyncjob.tracker.redis.host',
360
-                'localhost',
361
-            )
362
-            self.ASYNC_JOB_TRACKER_REDIS_PORT = int(tg.config.get(
363
-                'asyncjob.tracker.redis.port',
364
-                6379,
365
-            ))
366
-            self.ASYNC_JOB_TRACKER_REDIS_DB = int(tg.config.get(
367
-                'asyncjob.tracker.redis.db',
368
-                15,
369
-            ))
360
+        self.EMAIL_SENDER_REDIS_HOST = tg.config.get(
361
+            'email.async.redis.host',
362
+            'localhost',
363
+        )
364
+        self.EMAIL_SENDER_REDIS_PORT = int(tg.config.get(
365
+            'email.async.redis.port',
366
+            6379,
367
+        ))
368
+        self.EMAIL_SENDER_REDIS_DB = int(tg.config.get(
369
+            'email.async.redis.db',
370
+            0,
371
+        ))
370 372
 
371 373
     def get_tracker_js_content(self, js_tracker_file_path = None):
372 374
         js_tracker_file_path = tg.config.get('js_tracker_path', None)
@@ -408,30 +410,3 @@ class CFG(object):
408 410
 base_config.variable_provider = lambda: {
409 411
     'CFG': CFG.get_instance()
410 412
 }
411
-
412
-
413
-def plug_asyncjob():
414
-    cfg = CFG.get_instance()
415
-
416
-    # # Manual creation of async job tracker to be able to log it
417
-    async_job_tracker = cfg.ASYNC_JOB_TRACKER
418
-    if async_job_tracker == 'redis':
419
-        async_job_progress_tracker = RedisProgressTracker(
420
-            host=cfg.ASYNC_JOB_TRACKER_REDIS_HOST,
421
-            port=cfg.ASYNC_JOB_TRACKER_REDIS_PORT,
422
-            db=cfg.ASYNC_JOB_TRACKER_REDIS_DB,
423
-        )
424
-    else:
425
-        async_job_progress_tracker = MemoryProgressTracker()
426
-
427
-    logger.info(
428
-        cfg,
429
-        'Async job track using {0}'.format(str(async_job_progress_tracker)),
430
-    )
431
-    plug(
432
-        base_config,
433
-        'tgext.asyncjob',
434
-        progress_tracker=async_job_progress_tracker,
435
-    )
436
-
437
-environment_loaded.register(lambda: plug_asyncjob())

+ 41 - 5
tracim/tracim/lib/daemons.py Wyświetl plik

@@ -1,20 +1,19 @@
1
+import logging
1 2
 import threading
2 3
 from configparser import DuplicateSectionError
3
-from datetime import datetime
4 4
 from wsgiref.simple_server import make_server
5 5
 import signal
6 6
 
7 7
 import collections
8
-import time
9
-
10
-import io
11
-import yaml
12 8
 
13 9
 from radicale import Application as RadicaleApplication
14 10
 from radicale import HTTPServer as BaseRadicaleHTTPServer
15 11
 from radicale import HTTPSServer as BaseRadicaleHTTPSServer
16 12
 from radicale import RequestHandler as RadicaleRequestHandler
17 13
 from radicale import config as radicale_config
14
+from rq import Connection as RQConnection
15
+from rq import Worker as BaseRQWorker
16
+from redis import Redis
18 17
 
19 18
 from tracim.lib.base import logger
20 19
 from tracim.lib.exceptions import AlreadyRunningDaemon
@@ -148,6 +147,43 @@ class Daemon(threading.Thread):
148 147
         raise NotImplementedError()
149 148
 
150 149
 
150
+class MailSenderDaemon(Daemon):
151
+    # NOTE: use *args and **kwargs because parent __init__ use strange
152
+    # * parameter
153
+    def __init__(self, *args, **kwargs):
154
+        super().__init__(*args, **kwargs)
155
+        self.worker = None  # type: RQWorker
156
+
157
+    def append_thread_callback(self, callback: collections.Callable) -> None:
158
+        logger.warning('MailSenderDaemon not implement append_thread_callback')
159
+        pass
160
+
161
+    def stop(self) -> None:
162
+        self.worker.request_stop('TRACIM STOP', None)
163
+
164
+    def run(self) -> None:
165
+        from tracim.config.app_cfg import CFG
166
+        cfg = CFG.get_instance()
167
+
168
+        with RQConnection(Redis(
169
+            host=cfg.EMAIL_SENDER_REDIS_HOST,
170
+            port=cfg.EMAIL_SENDER_REDIS_PORT,
171
+            db=cfg.EMAIL_SENDER_REDIS_DB,
172
+        )):
173
+            self.worker = RQWorker(['mail_sender'])
174
+            self.worker.work()
175
+
176
+
177
+class RQWorker(BaseRQWorker):
178
+    def _install_signal_handlers(self):
179
+        # TODO BS 20170126: RQ WWorker is designed to work in main thread
180
+        # So we have to disable these signals (we implement server stop in
181
+        # MailSenderDaemon.stop method). When bug
182
+        # https://github.com/tracim/tracim/issues/166 will be fixed, ensure
183
+        # This worker terminate correctly.
184
+        pass
185
+
186
+
151 187
 class RadicaleHTTPSServer(TracimSocketServerMixin, BaseRadicaleHTTPSServer):
152 188
     pass
153 189
 

+ 37 - 8
tracim/tracim/lib/email.py Wyświetl plik

@@ -1,17 +1,50 @@
1 1
 # -*- coding: utf-8 -*-
2
-
3
-from email.mime.multipart import MIMEMultipart
4 2
 import smtplib
3
+from email.message import Message
4
+from email.mime.multipart import MIMEMultipart
5 5
 from email.mime.text import MIMEText
6 6
 
7
+import typing
7 8
 from mako.template import Template
8
-from tgext.asyncjob import asyncjob_perform
9
+from redis import Redis
10
+from rq import Queue
9 11
 from tg.i18n import ugettext as _
10 12
 
11 13
 from tracim.lib.base import logger
12 14
 from tracim.model import User
13 15
 
14 16
 
17
+def send_email_through(
18
+        send_callable: typing.Callable[[Message], None],
19
+        message: Message,
20
+) -> None:
21
+    """
22
+    Send mail encapsulation to send it in async or sync mode.
23
+    TODO BS 20170126: A global mail/sender management should be a good
24
+                      thing. Actually, this method is an fast solution.
25
+    :param send_callable: A callable who get message on first parameter
26
+    :param message: The message who have to be sent
27
+    """
28
+    from tracim.config.app_cfg import CFG
29
+    cfg = CFG.get_instance()
30
+
31
+    if cfg.EMAIL_PROCESSING_MODE == CFG.CST.SYNC:
32
+        send_callable(message)
33
+    elif cfg.EMAIL_PROCESSING_MODE == CFG.CST.ASYNC:
34
+        queue = Queue('mail_sender', connection=Redis(
35
+            host=cfg.EMAIL_SENDER_REDIS_HOST,
36
+            port=cfg.EMAIL_SENDER_REDIS_PORT,
37
+            db=cfg.EMAIL_SENDER_REDIS_DB,
38
+        ))
39
+        queue.enqueue(send_callable, message)
40
+    else:
41
+        raise NotImplementedError(
42
+            'Mail sender processing mode {} is not implemented'.format(
43
+                cfg.EMAIL_PROCESSING_MODE,
44
+            )
45
+        )
46
+
47
+
15 48
 class SmtpConfiguration(object):
16 49
     """
17 50
     Container class for SMTP configuration used in Tracim
@@ -139,11 +172,7 @@ class EmailManager(object):
139 172
         message.attach(part1)
140 173
         message.attach(part2)
141 174
 
142
-        asyncjob_perform(async_email_sender.send_mail, message)
143
-
144
-        # Note: The following action allow to close the SMTP connection.
145
-        # This will work only if the async jobs are done in the right order
146
-        asyncjob_perform(async_email_sender.disconnect)
175
+        send_email_through(async_email_sender.send_mail, message)
147 176
 
148 177
     def _render(self, mako_template_filepath: str, context: dict):
149 178
         """

+ 4 - 18
tracim/tracim/lib/notifications.py Wyświetl plik

@@ -10,6 +10,7 @@ from mako.template import Template
10 10
 
11 11
 from tracim.lib.base import logger
12 12
 from tracim.lib.email import SmtpConfiguration
13
+from tracim.lib.email import send_email_through
13 14
 from tracim.lib.email import EmailSender
14 15
 from tracim.lib.user import UserApi
15 16
 from tracim.lib.workspace import WorkspaceApi
@@ -23,8 +24,6 @@ from tracim.model.data import Content, UserRoleInWorkspace, ContentType, \
23 24
 from tracim.model.auth import User
24 25
 
25 26
 
26
-from tgext.asyncjob import asyncjob_perform
27
-
28 27
 class INotifier(object):
29 28
     """
30 29
     Interface for Notifier instances
@@ -142,7 +141,6 @@ class RealNotifier(object):
142 141
                 # TODO - D.A - 2014-11-06
143 142
                 # This feature must be implemented in order to be able to scale to large communities
144 143
                 raise NotImplementedError('Sending emails through ASYNC mode is not working yet')
145
-                asyncjob_perform(EmailNotifier(self._smtp_config, global_config).notify_content_update, self._user.user_id, content.content_id)
146 144
             else:
147 145
                 logger.info(self, 'Sending email in SYNC mode')
148 146
                 EmailNotifier(self._smtp_config, global_config).notify_content_update(self._user.user_id, content.content_id)
@@ -278,19 +276,7 @@ class EmailNotifier(object):
278 276
             message.attach(part1)
279 277
             message.attach(part2)
280 278
 
281
-            message_str = message.as_string()
282
-            # asyncjob_perform(async_email_sender.send_mail, message)
283
-            # FIXME: Temporary hack to enable email sending in
284
-            # uwsgi/prod environment
285
-            async_email_sender.send_mail(message)
286
-            # s.send_message(message)
287
-
288
-        # Note: The following action allow to close the SMTP connection.
289
-        # This will work only if the async jobs are done in the right order
290
-        # FIXME: Temporary hack to enable email sending in
291
-        # uwsgi/prod environment
292
-        # asyncjob_perform(async_email_sender.disconnect)
293
-
279
+            send_email_through(async_email_sender.send_mail, message)
294 280
 
295 281
     def _build_email_body(self, mako_template_filepath: str, role: UserRoleInWorkspace, content: Content, actor: User) -> str:
296 282
         """
@@ -360,7 +346,7 @@ class EmailNotifier(object):
360 346
                 title_diff = ''
361 347
                 if previous_revision.label != content.label:
362 348
                     title_diff = htmldiff(previous_revision.label, content.label)
363
-                content_text = l_('<p id="content-body-intro">Here is an overview of the changes:</p>')+ \
349
+                content_text = str(l_('<p id="content-body-intro">Here is an overview of the changes:</p>'))+ \
364 350
                     title_diff + \
365 351
                     htmldiff(previous_revision.description, content.description)
366 352
 
@@ -370,7 +356,7 @@ class EmailNotifier(object):
370 356
                 title_diff = ''
371 357
                 if previous_revision.label != content.label:
372 358
                     title_diff = htmldiff(previous_revision.label, content.label)
373
-                content_text = l_('<p id="content-body-intro">Here is an overview of the changes:</p>')+ \
359
+                content_text = str(l_('<p id="content-body-intro">Here is an overview of the changes:</p>'))+ \
374 360
                     title_diff + \
375 361
                     htmldiff(previous_revision.description, content.description)
376 362