daemons.py 16KB


import collections
import collections.abc
import threading
from configparser import DuplicateSectionError
from wsgiref.simple_server import make_server

from radicale import Application as RadicaleApplication
from radicale import HTTPServer as BaseRadicaleHTTPServer
from radicale import HTTPSServer as BaseRadicaleHTTPSServer
from radicale import RequestHandler as RadicaleRequestHandler
from radicale import config as radicale_config
from redis import Redis
from rq import Connection as RQConnection
from rq import Worker as BaseRQWorker
from rq.dummy import do_nothing
from rq.worker import StopRequested

from tracim.lib.base import logger
from tracim.lib.email_fetcher import MailFetcher
from tracim.lib.exceptions import AlreadyRunningDaemon
from tracim.lib.utils import get_rq_queue
  19. class DaemonsManager(object):
  20. def __init__(self):
  21. self._running_daemons = {}
  22. def run(self, name: str, daemon_class: object, **kwargs) -> None:
  23. """
  24. Start a daemon with given daemon class.
  25. :param name: Name of runned daemon. It's not possible to start two
  26. daemon with same name. In the opposite case, raise
  27. tracim.lib.exceptions.AlreadyRunningDaemon
  28. :param daemon_class: Daemon class to use for daemon instance.
  29. :param kwargs: Other kwargs will be given to daemon class
  30. instantiation.
  31. """
  32. if name in self._running_daemons:
  33. raise AlreadyRunningDaemon(
  34. 'Daemon with name "{0}" already running'.format(name)
  35. )
  36. logger.info(self, 'Starting daemon with name "{0}" and class "{1}" ...'
  37. .format(name, daemon_class))
  38. daemon = daemon_class(name=name, kwargs=kwargs, daemon=True)
  39. daemon.start()
  40. self._running_daemons[name] = daemon
  41. def stop(self, name: str) -> None:
  42. """
  43. Stop daemon with his name and wait for him.
  44. Where name is given name when daemon started
  45. with run method.
  46. :param name:
  47. """
  48. if name in self._running_daemons:
  49. logger.info(self, 'Stopping daemon with name "{0}" ...'
  50. .format(name))
  51. self._running_daemons[name].stop()
  52. self._running_daemons[name].join()
  53. del self._running_daemons[name]
  54. logger.info(self, 'Stopping daemon with name "{0}": OK'
  55. .format(name))
  56. def stop_all(self) -> None:
  57. """
  58. Stop all started daemons and wait for them.
  59. """
  60. logger.info(self, 'Stopping all daemons')
  61. for name, daemon in self._running_daemons.items():
  62. logger.info(self, 'Stopping daemon "{0}" ...'.format(name))
  63. daemon.stop()
  64. for name, daemon in self._running_daemons.items():
  65. logger.info(
  66. self,
  67. 'Stopping daemon "{0}" waiting confirmation'.format(name),
  68. )
  69. daemon.join()
  70. logger.info(self, 'Stopping daemon "{0}" OK'.format(name))
  71. self._running_daemons = {}
  72. def execute_in_thread(self, thread_name, callback):
  73. self._running_daemons[thread_name].append_thread_callback(callback)
  74. class TracimSocketServerMixin(object):
  75. """
  76. Mixin to use with socketserver.BaseServer who add _after_serve_actions
  77. method executed after end of server execution.
  78. """
  79. def __init__(self, *args, **kwargs):
  80. super().__init__(*args, **kwargs)
  81. self._daemon_execute_callbacks = []
  82. def append_thread_callback(self, callback: collections.Callable) -> None:
  83. """
  84. Add callback to self._daemon_execute_callbacks. See service_actions
  85. function to their usages.
  86. :param callback: callback to execute in daemon
  87. """
  88. self._daemon_execute_callbacks.append(callback)
  89. def serve_forever(self, *args, **kwargs):
  90. super().serve_forever(*args, **kwargs)
  91. # After serving (in case of stop) do following:
  92. self._after_serve_actions()
  93. def _after_serve_actions(self):
  94. """
  95. Override (and call super if needed) to execute actions when server
  96. finish it's job.
  97. """
  98. pass
  99. def service_actions(self):
  100. if len(self._daemon_execute_callbacks):
  101. try:
  102. while True:
  103. self._daemon_execute_callbacks.pop()()
  104. except IndexError:
  105. pass # Finished to iter
  106. class Daemon(threading.Thread):
  107. """
  108. Thread who contains daemon. You must implement start and stop methods to
  109. manage daemon life correctly.
  110. """
  111. def run(self) -> None:
  112. """
  113. Place here code who have to be executed in Daemon.
  114. """
  115. raise NotImplementedError()
  116. def stop(self) -> None:
  117. """
  118. Place here code who stop your daemon
  119. """
  120. raise NotImplementedError()
  121. def append_thread_callback(self, callback: collections.Callable) -> None:
  122. """
  123. Place here the logic who permit to execute a callback in your daemon.
  124. To get an exemple of that, take a look at
  125. socketserver.BaseServer#service_actions and how we use it in
  126. tracim.lib.daemons.TracimSocketServerMixin#service_actions .
  127. :param callback: callback to execute in your thread.
  128. """
  129. raise NotImplementedError()
  130. class MailFetcherDaemon(Daemon):
  131. """
  132. Thread containing a daemon who fetch new mail from a mailbox and
  133. send http request to a tracim endpoint to handle them.
  134. """
  135. def __init__(self, *args, **kwargs) -> None:
  136. super().__init__(*args, **kwargs)
  137. self._fetcher = None # type: MailFetcher
  138. self.ok = True
  139. def run(self) -> None:
  140. from tracim.config.app_cfg import CFG
  141. cfg = CFG.get_instance()
  142. self._fetcher = MailFetcher(
  143. host=cfg.EMAIL_REPLY_IMAP_SERVER,
  144. port=cfg.EMAIL_REPLY_IMAP_PORT,
  145. user=cfg.EMAIL_REPLY_IMAP_USER,
  146. password=cfg.EMAIL_REPLY_IMAP_PASSWORD,
  147. use_ssl=cfg.EMAIL_REPLY_IMAP_USE_SSL,
  148. folder=cfg.EMAIL_REPLY_IMAP_FOLDER,
  149. delay=cfg.EMAIL_REPLY_CHECK_HEARTBEAT,
  150. # FIXME - G.M - 2017-11-15 - proper tracim url formatting
  151. endpoint=cfg.WEBSITE_BASE_URL + "/events",
  152. token=cfg.EMAIL_REPLY_TOKEN,
  153. use_html_parsing=cfg.EMAIL_REPLY_USE_HTML_PARSING,
  154. use_txt_parsing=cfg.EMAIL_REPLY_USE_TXT_PARSING,
  155. filelock_path=cfg.EMAIL_REPLY_FILELOCK_PATH,
  156. )
  157. self._fetcher.run()
  158. def stop(self) -> None:
  159. if self._fetcher:
  160. self._fetcher.stop()
  161. def append_thread_callback(self, callback: collections.Callable) -> None:
  162. logger.warning('MailFetcherDaemon not implement append_thread_calback')
  163. pass
  164. class MailSenderDaemon(Daemon):
  165. # NOTE: use *args and **kwargs because parent __init__ use strange
  166. # * parameter
  167. def __init__(self, *args, **kwargs):
  168. super().__init__(*args, **kwargs)
  169. self.worker = None # type: RQWorker
  170. def append_thread_callback(self, callback: collections.Callable) -> None:
  171. logger.warning('MailSenderDaemon not implement append_thread_callback')
  172. pass
  173. def stop(self) -> None:
  174. # When _stop_requested at False, tracim.lib.daemons.RQWorker
  175. # will raise StopRequested exception in worker thread after receive a
  176. # job.
  177. self.worker._stop_requested = True
  178. queue = get_rq_queue('mail_sender')
  179. queue.enqueue(do_nothing)
  180. def run(self) -> None:
  181. from tracim.config.app_cfg import CFG
  182. cfg = CFG.get_instance()
  183. with RQConnection(Redis(
  184. host=cfg.EMAIL_SENDER_REDIS_HOST,
  185. port=cfg.EMAIL_SENDER_REDIS_PORT,
  186. db=cfg.EMAIL_SENDER_REDIS_DB,
  187. )):
  188. self.worker = RQWorker(['mail_sender'])
  189. self.worker.work()
  190. class RQWorker(BaseRQWorker):
  191. def _install_signal_handlers(self):
  192. # RQ Worker is designed to work in main thread
  193. # So we have to disable these signals (we implement server stop in
  194. # MailSenderDaemon.stop method).
  195. pass
  196. def dequeue_job_and_maintain_ttl(self, timeout):
  197. # RQ Worker is designed to work in main thread, so we add behaviour
  198. # here: if _stop_requested has been set to True, raise the standard way
  199. # StopRequested exception to stop worker.
  200. if self._stop_requested:
  201. raise StopRequested()
  202. return super().dequeue_job_and_maintain_ttl(timeout)
  203. class RadicaleHTTPSServer(TracimSocketServerMixin, BaseRadicaleHTTPSServer):
  204. pass
  205. class RadicaleHTTPServer(TracimSocketServerMixin, BaseRadicaleHTTPServer):
  206. pass
  207. class RadicaleDaemon(Daemon):
  208. def __init__(self, *args, **kwargs):
  209. super().__init__(*args, **kwargs)
  210. self._prepare_config()
  211. self._server = None
  212. def run(self):
  213. """
  214. To see origin radical server start method, refer to
  215. radicale.__main__.run
  216. """
  217. self._server = self._get_server()
  218. self._server.serve_forever()
  219. def stop(self):
  220. self._server.shutdown()
  221. def _prepare_config(self):
  222. from tracim.config.app_cfg import CFG
  223. cfg = CFG.get_instance()
  224. tracim_auth = 'tracim.lib.radicale.auth'
  225. tracim_rights = 'tracim.lib.radicale.rights'
  226. tracim_storage = 'tracim.lib.radicale.storage'
  227. fs_path = cfg.RADICALE_SERVER_FILE_SYSTEM_FOLDER
  228. allow_origin = cfg.RADICALE_SERVER_ALLOW_ORIGIN
  229. realm_message = cfg.RADICALE_SERVER_REALM_MESSAGE
  230. radicale_config.set('auth', 'type', 'custom')
  231. radicale_config.set('auth', 'custom_handler', tracim_auth)
  232. radicale_config.set('rights', 'type', 'custom')
  233. radicale_config.set('rights', 'custom_handler', tracim_rights)
  234. radicale_config.set('storage', 'type', 'custom')
  235. radicale_config.set('storage', 'custom_handler', tracim_storage)
  236. radicale_config.set('storage', 'filesystem_folder', fs_path)
  237. radicale_config.set('server', 'realm', realm_message)
  238. radicale_config.set(
  239. 'server',
  240. 'base_prefix',
  241. cfg.RADICALE_CLIENT_BASE_URL_PREFIX,
  242. )
  243. try:
  244. radicale_config.add_section('headers')
  245. except DuplicateSectionError:
  246. pass # It is not a problem, we just want it exist
  247. if allow_origin:
  248. radicale_config.set(
  249. 'headers',
  250. 'Access-Control-Allow-Origin',
  251. allow_origin,
  252. )
  253. # Radicale is not 100% CALDAV Compliant, we force some Allow-Methods
  254. radicale_config.set(
  255. 'headers',
  256. 'Access-Control-Allow-Methods',
  257. 'DELETE, HEAD, GET, MKCALENDAR, MKCOL, MOVE, OPTIONS, PROPFIND, '
  258. 'PROPPATCH, PUT, REPORT',
  259. )
  260. # Radicale is not 100% CALDAV Compliant, we force some Allow-Headers
  261. radicale_config.set(
  262. 'headers',
  263. 'Access-Control-Allow-Headers',
  264. 'X-Requested-With,X-Auth-Token,Content-Type,Content-Length,'
  265. 'X-Client,Authorization,depth,Prefer,If-None-Match,If-Match',
  266. )
  267. def _get_server(self):
  268. from tracim.config.app_cfg import CFG
  269. cfg = CFG.get_instance()
  270. return make_server(
  271. cfg.RADICALE_SERVER_HOST,
  272. cfg.RADICALE_SERVER_PORT,
  273. RadicaleApplication(),
  274. RadicaleHTTPSServer if cfg.RADICALE_SERVER_SSL else RadicaleHTTPServer,
  275. RadicaleRequestHandler
  276. )
  277. def append_thread_callback(self, callback: collections.Callable) -> None:
  278. """
  279. Give the callback to running server through
  280. tracim.lib.daemons.TracimSocketServerMixin#append_thread_callback
  281. :param callback: callback to execute in daemon
  282. """
  283. self._server.append_thread_callback(callback)
# TODO: WebDAV daemon, make it clean!
  285. import sys, os
  286. from wsgidav.wsgidav_app import DEFAULT_CONFIG
  287. from wsgidav.xml_tools import useLxml
  288. from wsgidav.wsgidav_app import WsgiDAVApp
  289. from wsgidav._version import __version__
  290. from tracim.lib.webdav.sql_dav_provider import Provider
  291. from tracim.lib.webdav.sql_domain_controller import TracimDomainController
  292. from inspect import isfunction
  293. import traceback
  294. from cherrypy import wsgiserver
  295. from cherrypy.wsgiserver import CherryPyWSGIServer
  296. DEFAULT_CONFIG_FILE = "wsgidav.conf"
  297. PYTHON_VERSION = "%s.%s.%s" % (sys.version_info[0], sys.version_info[1], sys.version_info[2])
  298. class WsgiDavDaemon(Daemon):
  299. def __init__(self, *args, **kwargs):
  300. super().__init__(*args, **kwargs)
  301. self.config = self._initConfig()
  302. self._server = None
  303. def _initConfig(self):
  304. """Setup configuration dictionary from default, command line and configuration file."""
  305. from tg import config as tg_config
  306. # Set config defaults
  307. config = DEFAULT_CONFIG.copy()
  308. temp_verbose = config["verbose"]
  309. # Configuration file overrides defaults
  310. default_config_file = os.path.abspath(DEFAULT_CONFIG_FILE)
  311. config_file = tg_config.get('wsgidav.config_path', default_config_file)
  312. fileConf = self._readConfigFile(config_file, temp_verbose)
  313. config.update(fileConf)
  314. if not useLxml and config["verbose"] >= 1:
  315. print(
  316. "WARNING: Could not import lxml: using xml instead (slower). Consider installing lxml from http://codespeak.net/lxml/.")
  317. from wsgidav.dir_browser import WsgiDavDirBrowser
  318. from tracim.lib.webdav.tracim_http_authenticator import TracimHTTPAuthenticator
  319. from wsgidav.error_printer import ErrorPrinter
  320. from tracim.lib.webdav.utils import TracimWsgiDavDebugFilter
  321. config['middleware_stack'] = [
  322. WsgiDavDirBrowser,
  323. TracimHTTPAuthenticator,
  324. ErrorPrinter,
  325. TracimWsgiDavDebugFilter,
  326. ]
  327. config['provider_mapping'] = {
  328. config['root_path']: Provider(
  329. # TODO: Test to Re enabme archived and deleted
  330. show_archived=False, # config['show_archived'],
  331. show_deleted=False, # config['show_deleted'],
  332. show_history=False, # config['show_history'],
  333. manage_locks=config['manager_locks']
  334. )
  335. }
  336. config['domaincontroller'] = TracimDomainController(presetdomain=None, presetserver=None)
  337. return config
  338. def _readConfigFile(self, config_file, verbose):
  339. """Read configuration file options into a dictionary."""
  340. if not os.path.exists(config_file):
  341. raise RuntimeError("Couldn't open configuration file '%s'." % config_file)
  342. try:
  343. import imp
  344. conf = {}
  345. configmodule = imp.load_source("configuration_module", config_file)
  346. for k, v in vars(configmodule).items():
  347. if k.startswith("__"):
  348. continue
  349. elif isfunction(v):
  350. continue
  351. conf[k] = v
  352. except Exception as e:
  353. exceptioninfo = traceback.format_exception_only(sys.exc_type, sys.exc_value) # @UndefinedVariable
  354. exceptiontext = ""
  355. for einfo in exceptioninfo:
  356. exceptiontext += einfo + "\n"
  357. print("Failed to read configuration file: " + config_file + "\nDue to " + exceptiontext, file=sys.stderr)
  358. raise
  359. return conf
  360. def run(self):
  361. app = WsgiDAVApp(self.config)
  362. # Try running WsgiDAV inside the following external servers:
  363. self._runCherryPy(app, self.config)
  364. def _runCherryPy(self, app, config):
  365. version = "WsgiDAV/%s %s Python/%s" % (
  366. __version__,
  367. wsgiserver.CherryPyWSGIServer.version,
  368. PYTHON_VERSION
  369. )
  370. wsgiserver.CherryPyWSGIServer.version = version
  371. protocol = "http"
  372. if config["verbose"] >= 1:
  373. print("Running %s" % version)
  374. print("Listening on %s://%s:%s ..." % (protocol, config["host"], config["port"]))
  375. self._server = CherryPyWSGIServer(
  376. (config["host"], config["port"]),
  377. app,
  378. server_name=version,
  379. )
  380. self._server.start()
  381. def stop(self):
  382. self._server.stop()
  383. def append_thread_callback(self, callback: collections.Callable) -> None:
  384. """
  385. Place here the logic who permit to execute a callback in your daemon.
  386. To get an exemple of that, take a look at
  387. socketserver.BaseServer#service_actions and how we use it in
  388. tracim.lib.daemons.TracimSocketServerMixin#service_actions .
  389. :param callback: callback to execute in your thread.
  390. """
  391. raise NotImplementedError()