daemons.py 17KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. import threading
  2. import collections
  3. from configparser import DuplicateSectionError
  4. from wsgiref.simple_server import make_server
  5. from radicale import Application as RadicaleApplication
  6. from radicale import HTTPServer as BaseRadicaleHTTPServer
  7. from radicale import HTTPSServer as BaseRadicaleHTTPSServer
  8. from radicale import RequestHandler as RadicaleRequestHandler
  9. from radicale import config as radicale_config
  10. from rq import Connection as RQConnection
  11. from rq import Worker as BaseRQWorker
  12. from redis import Redis
  13. from rq.dummy import do_nothing
  14. from rq.worker import StopRequested
  15. from tracim.lib.base import logger
  16. from tracim.lib.exceptions import AlreadyRunningDaemon
  17. from tracim.lib.utils import get_rq_queue
  18. from tracim.lib.utils import TracimEnforceHTTPS
  19. from tracim.lib.email_fetcher import MailFetcher
  20. class DaemonsManager(object):
  21. def __init__(self):
  22. self._running_daemons = {}
  23. def run(self, name: str, daemon_class: object, **kwargs) -> None:
  24. """
  25. Start a daemon with given daemon class.
  26. :param name: Name of runned daemon. It's not possible to start two
  27. daemon with same name. In the opposite case, raise
  28. tracim.lib.exceptions.AlreadyRunningDaemon
  29. :param daemon_class: Daemon class to use for daemon instance.
  30. :param kwargs: Other kwargs will be given to daemon class
  31. instantiation.
  32. """
  33. if name in self._running_daemons:
  34. raise AlreadyRunningDaemon(
  35. 'Daemon with name "{0}" already running'.format(name)
  36. )
  37. logger.info(self, 'Starting daemon with name "{0}" and class "{1}" ...'
  38. .format(name, daemon_class))
  39. daemon = daemon_class(name=name, kwargs=kwargs, daemon=True)
  40. daemon.start()
  41. self._running_daemons[name] = daemon
  42. def stop(self, name: str) -> None:
  43. """
  44. Stop daemon with his name and wait for him.
  45. Where name is given name when daemon started
  46. with run method.
  47. :param name:
  48. """
  49. if name in self._running_daemons:
  50. logger.info(self, 'Stopping daemon with name "{0}" ...'
  51. .format(name))
  52. self._running_daemons[name].stop()
  53. self._running_daemons[name].join()
  54. del self._running_daemons[name]
  55. logger.info(self, 'Stopping daemon with name "{0}": OK'
  56. .format(name))
  57. def stop_all(self) -> None:
  58. """
  59. Stop all started daemons and wait for them.
  60. """
  61. logger.info(self, 'Stopping all daemons')
  62. for name, daemon in self._running_daemons.items():
  63. logger.info(self, 'Stopping daemon "{0}" ...'.format(name))
  64. daemon.stop()
  65. for name, daemon in self._running_daemons.items():
  66. logger.info(
  67. self,
  68. 'Stopping daemon "{0}" waiting confirmation'.format(name),
  69. )
  70. daemon.join()
  71. logger.info(self, 'Stopping daemon "{0}" OK'.format(name))
  72. self._running_daemons = {}
  73. def execute_in_thread(self, thread_name, callback):
  74. self._running_daemons[thread_name].append_thread_callback(callback)
  75. class TracimSocketServerMixin(object):
  76. """
  77. Mixin to use with socketserver.BaseServer who add _after_serve_actions
  78. method executed after end of server execution.
  79. """
  80. def __init__(self, *args, **kwargs):
  81. super().__init__(*args, **kwargs)
  82. self._daemon_execute_callbacks = []
  83. def append_thread_callback(self, callback: collections.Callable) -> None:
  84. """
  85. Add callback to self._daemon_execute_callbacks. See service_actions
  86. function to their usages.
  87. :param callback: callback to execute in daemon
  88. """
  89. self._daemon_execute_callbacks.append(callback)
  90. def serve_forever(self, *args, **kwargs):
  91. super().serve_forever(*args, **kwargs)
  92. # After serving (in case of stop) do following:
  93. self._after_serve_actions()
  94. def _after_serve_actions(self):
  95. """
  96. Override (and call super if needed) to execute actions when server
  97. finish it's job.
  98. """
  99. pass
  100. def service_actions(self):
  101. if len(self._daemon_execute_callbacks):
  102. try:
  103. while True:
  104. self._daemon_execute_callbacks.pop()()
  105. except IndexError:
  106. pass # Finished to iter
  107. class Daemon(threading.Thread):
  108. """
  109. Thread who contains daemon. You must implement start and stop methods to
  110. manage daemon life correctly.
  111. """
  112. def run(self) -> None:
  113. """
  114. Place here code who have to be executed in Daemon.
  115. """
  116. raise NotImplementedError()
  117. def stop(self) -> None:
  118. """
  119. Place here code who stop your daemon
  120. """
  121. raise NotImplementedError()
  122. def append_thread_callback(self, callback: collections.Callable) -> None:
  123. """
  124. Place here the logic who permit to execute a callback in your daemon.
  125. To get an exemple of that, take a look at
  126. socketserver.BaseServer#service_actions and how we use it in
  127. tracim.lib.daemons.TracimSocketServerMixin#service_actions .
  128. :param callback: callback to execute in your thread.
  129. """
  130. raise NotImplementedError()
  131. class MailFetcherDaemon(Daemon):
  132. """
  133. Thread containing a daemon who fetch new mail from a mailbox and
  134. send http request to a tracim endpoint to handle them.
  135. """
  136. def __init__(self, *args, **kwargs) -> None:
  137. super().__init__(*args, **kwargs)
  138. self._fetcher = None # type: MailFetcher
  139. self.ok = True
  140. def run(self) -> None:
  141. from tracim.config.app_cfg import CFG
  142. cfg = CFG.get_instance()
  143. self._fetcher = MailFetcher(
  144. host=cfg.EMAIL_REPLY_IMAP_SERVER,
  145. port=cfg.EMAIL_REPLY_IMAP_PORT,
  146. user=cfg.EMAIL_REPLY_IMAP_USER,
  147. password=cfg.EMAIL_REPLY_IMAP_PASSWORD,
  148. use_ssl=cfg.EMAIL_REPLY_IMAP_USE_SSL,
  149. folder=cfg.EMAIL_REPLY_IMAP_FOLDER,
  150. heartbeat=cfg.EMAIL_REPLY_CHECK_HEARTBEAT,
  151. use_idle=cfg.EMAIL_REPLY_IMAP_USE_IDLE,
  152. connection_max_lifetime=cfg.EMAIL_REPLY_CONNECTION_MAX_LIFETIME,
  153. # FIXME - G.M - 2017-11-15 - proper tracim url formatting
  154. endpoint=cfg.WEBSITE_BASE_URL + "/events",
  155. token=cfg.EMAIL_REPLY_TOKEN,
  156. use_html_parsing=cfg.EMAIL_REPLY_USE_HTML_PARSING,
  157. use_txt_parsing=cfg.EMAIL_REPLY_USE_TXT_PARSING,
  158. lockfile_path=cfg.EMAIL_REPLY_LOCKFILE_PATH,
  159. )
  160. self._fetcher.run()
  161. def stop(self) -> None:
  162. if self._fetcher:
  163. self._fetcher.stop()
  164. def append_thread_callback(self, callback: collections.Callable) -> None:
  165. logger.warning('MailFetcherDaemon not implement append_thread_calback')
  166. pass
  167. class MailSenderDaemon(Daemon):
  168. # NOTE: use *args and **kwargs because parent __init__ use strange
  169. # * parameter
  170. def __init__(self, *args, **kwargs):
  171. super().__init__(*args, **kwargs)
  172. self.worker = None # type: RQWorker
  173. def append_thread_callback(self, callback: collections.Callable) -> None:
  174. logger.warning('MailSenderDaemon not implement append_thread_callback')
  175. pass
  176. def stop(self) -> None:
  177. # When _stop_requested at False, tracim.lib.daemons.RQWorker
  178. # will raise StopRequested exception in worker thread after receive a
  179. # job.
  180. self.worker._stop_requested = True
  181. queue = get_rq_queue('mail_sender')
  182. queue.enqueue(do_nothing)
  183. def run(self) -> None:
  184. from tracim.config.app_cfg import CFG
  185. cfg = CFG.get_instance()
  186. with RQConnection(Redis(
  187. host=cfg.EMAIL_SENDER_REDIS_HOST,
  188. port=cfg.EMAIL_SENDER_REDIS_PORT,
  189. db=cfg.EMAIL_SENDER_REDIS_DB,
  190. )):
  191. self.worker = RQWorker(['mail_sender'])
  192. self.worker.work()
  193. class RQWorker(BaseRQWorker):
  194. def _install_signal_handlers(self):
  195. # RQ Worker is designed to work in main thread
  196. # So we have to disable these signals (we implement server stop in
  197. # MailSenderDaemon.stop method).
  198. pass
  199. def dequeue_job_and_maintain_ttl(self, timeout):
  200. # RQ Worker is designed to work in main thread, so we add behaviour
  201. # here: if _stop_requested has been set to True, raise the standard way
  202. # StopRequested exception to stop worker.
  203. if self._stop_requested:
  204. raise StopRequested()
  205. return super().dequeue_job_and_maintain_ttl(timeout)
  206. class RadicaleHTTPSServer(TracimSocketServerMixin, BaseRadicaleHTTPSServer):
  207. pass
  208. class RadicaleHTTPServer(TracimSocketServerMixin, BaseRadicaleHTTPServer):
  209. pass
  210. class RadicaleDaemon(Daemon):
  211. def __init__(self, *args, **kwargs):
  212. super().__init__(*args, **kwargs)
  213. self._prepare_config()
  214. self._server = None
  215. def run(self):
  216. """
  217. To see origin radical server start method, refer to
  218. radicale.__main__.run
  219. """
  220. self._server = self._get_server()
  221. self._server.serve_forever()
  222. def stop(self):
  223. self._server.shutdown()
  224. def _prepare_config(self):
  225. from tracim.config.app_cfg import CFG
  226. cfg = CFG.get_instance()
  227. tracim_auth = 'tracim.lib.radicale.auth'
  228. tracim_rights = 'tracim.lib.radicale.rights'
  229. tracim_storage = 'tracim.lib.radicale.storage'
  230. fs_path = cfg.RADICALE_SERVER_FILE_SYSTEM_FOLDER
  231. allow_origin = cfg.RADICALE_SERVER_ALLOW_ORIGIN
  232. realm_message = cfg.RADICALE_SERVER_REALM_MESSAGE
  233. radicale_config.set('auth', 'type', 'custom')
  234. radicale_config.set('auth', 'custom_handler', tracim_auth)
  235. radicale_config.set('rights', 'type', 'custom')
  236. radicale_config.set('rights', 'custom_handler', tracim_rights)
  237. radicale_config.set('storage', 'type', 'custom')
  238. radicale_config.set('storage', 'custom_handler', tracim_storage)
  239. radicale_config.set('storage', 'filesystem_folder', fs_path)
  240. radicale_config.set('server', 'realm', realm_message)
  241. radicale_config.set(
  242. 'server',
  243. 'base_prefix',
  244. cfg.RADICALE_CLIENT_BASE_URL_PREFIX,
  245. )
  246. try:
  247. radicale_config.add_section('headers')
  248. except DuplicateSectionError:
  249. pass # It is not a problem, we just want it exist
  250. if allow_origin:
  251. radicale_config.set(
  252. 'headers',
  253. 'Access-Control-Allow-Origin',
  254. allow_origin,
  255. )
  256. # Radicale is not 100% CALDAV Compliant, we force some Allow-Methods
  257. radicale_config.set(
  258. 'headers',
  259. 'Access-Control-Allow-Methods',
  260. 'DELETE, HEAD, GET, MKCALENDAR, MKCOL, MOVE, OPTIONS, PROPFIND, '
  261. 'PROPPATCH, PUT, REPORT',
  262. )
  263. # Radicale is not 100% CALDAV Compliant, we force some Allow-Headers
  264. radicale_config.set(
  265. 'headers',
  266. 'Access-Control-Allow-Headers',
  267. 'X-Requested-With,X-Auth-Token,Content-Type,Content-Length,'
  268. 'X-Client,Authorization,depth,Prefer,If-None-Match,If-Match',
  269. )
  270. def _get_server(self):
  271. from tracim.config.app_cfg import CFG
  272. cfg = CFG.get_instance()
  273. return make_server(
  274. cfg.RADICALE_SERVER_HOST,
  275. cfg.RADICALE_SERVER_PORT,
  276. RadicaleApplication(),
  277. RadicaleHTTPSServer if cfg.RADICALE_SERVER_SSL else RadicaleHTTPServer,
  278. RadicaleRequestHandler
  279. )
  280. def append_thread_callback(self, callback: collections.Callable) -> None:
  281. """
  282. Give the callback to running server through
  283. tracim.lib.daemons.TracimSocketServerMixin#append_thread_callback
  284. :param callback: callback to execute in daemon
  285. """
  286. self._server.append_thread_callback(callback)
  287. # TODO : webdav deamon, make it clean !
  288. import sys, os
  289. from wsgidav.wsgidav_app import DEFAULT_CONFIG
  290. from wsgidav.xml_tools import useLxml
  291. from wsgidav.wsgidav_app import WsgiDAVApp
  292. from wsgidav._version import __version__
  293. from tracim.lib.webdav.sql_dav_provider import Provider
  294. from tracim.lib.webdav.sql_domain_controller import TracimDomainController
  295. from inspect import isfunction
  296. import traceback
  297. from cherrypy import wsgiserver
  298. from cherrypy.wsgiserver import CherryPyWSGIServer
  299. DEFAULT_CONFIG_FILE = "wsgidav.conf"
  300. PYTHON_VERSION = "%s.%s.%s" % (sys.version_info[0], sys.version_info[1], sys.version_info[2])
  301. class WsgiDavDaemon(Daemon):
  302. def __init__(self, *args, **kwargs):
  303. super().__init__(*args, **kwargs)
  304. self.config = self._initConfig()
  305. self._server = None
  306. def _initConfig(self):
  307. """Setup configuration dictionary from default, command line and configuration file."""
  308. from tg import config as tg_config
  309. # Set config defaults
  310. config = DEFAULT_CONFIG.copy()
  311. temp_verbose = config["verbose"]
  312. # Configuration file overrides defaults
  313. default_config_file = os.path.abspath(DEFAULT_CONFIG_FILE)
  314. config_file = tg_config.get('wsgidav.config_path', default_config_file)
  315. fileConf = self._readConfigFile(config_file, temp_verbose)
  316. config.update(fileConf)
  317. if not useLxml and config["verbose"] >= 1:
  318. print(
  319. "WARNING: Could not import lxml: using xml instead (slower). Consider installing lxml from http://codespeak.net/lxml/.")
  320. from wsgidav.dir_browser import WsgiDavDirBrowser
  321. from wsgidav.http_authenticator import HTTPAuthenticator
  322. from wsgidav.error_printer import ErrorPrinter
  323. from tracim.lib.webdav.utils import TracimWsgiDavDebugFilter
  324. config['middleware_stack'] = [
  325. TracimEnforceHTTPS,
  326. WsgiDavDirBrowser,
  327. HTTPAuthenticator,
  328. ErrorPrinter,
  329. TracimWsgiDavDebugFilter,
  330. ]
  331. config['provider_mapping'] = {
  332. config['root_path']: Provider(
  333. # TODO: Test to Re enabme archived and deleted
  334. show_archived=False, # config['show_archived'],
  335. show_deleted=False, # config['show_deleted'],
  336. show_history=False, # config['show_history'],
  337. manage_locks=config['manager_locks']
  338. )
  339. }
  340. config['domaincontroller'] = TracimDomainController(presetdomain=None, presetserver=None)
  341. return config
  342. def _readConfigFile(self, config_file, verbose):
  343. """Read configuration file options into a dictionary."""
  344. if not os.path.exists(config_file):
  345. raise RuntimeError("Couldn't open configuration file '%s'." % config_file)
  346. try:
  347. import imp
  348. conf = {}
  349. configmodule = imp.load_source("configuration_module", config_file)
  350. for k, v in vars(configmodule).items():
  351. if k.startswith("__"):
  352. continue
  353. elif isfunction(v):
  354. continue
  355. conf[k] = v
  356. except Exception as e:
  357. exceptioninfo = traceback.format_exception_only(sys.exc_type, sys.exc_value) # @UndefinedVariable
  358. exceptiontext = ""
  359. for einfo in exceptioninfo:
  360. exceptiontext += einfo + "\n"
  361. print("Failed to read configuration file: " + config_file + "\nDue to " + exceptiontext, file=sys.stderr)
  362. raise
  363. return conf
  364. def run(self):
  365. app = WsgiDAVApp(self.config)
  366. # Try running WsgiDAV inside the following external servers:
  367. self._runCherryPy(app, self.config)
  368. def _runCherryPy(self, app, config):
  369. version = "WsgiDAV/%s %s Python/%s" % (
  370. __version__,
  371. wsgiserver.CherryPyWSGIServer.version,
  372. PYTHON_VERSION
  373. )
  374. wsgiserver.CherryPyWSGIServer.version = version
  375. protocol = "http"
  376. if config["verbose"] >= 1:
  377. print("Running %s" % version)
  378. print("Listening on %s://%s:%s ..." % (protocol, config["host"], config["port"]))
  379. self._server = CherryPyWSGIServer(
  380. (config["host"], config["port"]),
  381. app,
  382. server_name=version,
  383. )
  384. self._server.start()
  385. def stop(self):
  386. self._server.stop()
  387. def append_thread_callback(self, callback: collections.Callable) -> None:
  388. """
  389. Place here the logic who permit to execute a callback in your daemon.
  390. To get an exemple of that, take a look at
  391. socketserver.BaseServer#service_actions and how we use it in
  392. tracim.lib.daemons.TracimSocketServerMixin#service_actions .
  393. :param callback: callback to execute in your thread.
  394. """
  395. raise NotImplementedError()