from functools import wraps

from apscheduler.schedulers.base import BaseScheduler
from apscheduler.util import maybe_ref

try:
    from twisted.internet import reactor as default_reactor
except ImportError as err:  # pragma: nocover
    raise ImportError("TwistedScheduler requires Twisted installed") from err


def run_in_reactor(func):
    """
    Decorate a scheduler method so that it is handed to the reactor.

    Calling the decorated method does not invoke ``func`` directly; it passes
    ``func`` and its arguments to ``self._reactor.callFromThread``. The wrapper
    itself returns ``None``, so any return value of ``func`` is not propagated
    to the caller.
    """

    @wraps(func)
    def dispatch(self, *args, **kwargs):
        self._reactor.callFromThread(func, self, *args, **kwargs)

    return dispatch


class TwistedScheduler(BaseScheduler):
    """
    A scheduler that runs on a Twisted reactor.

    Extra options:

    =========== ========================================================
    ``reactor`` Reactor instance to use (defaults to the global reactor)
    =========== ========================================================
    """

    # Class-level defaults; instances shadow these once configured/scheduled.
    _reactor = None
    _delayedcall = None

    def _configure(self, config):
        """
        Extract the ``reactor`` option and delegate the rest to the base class.

        The ``"reactor"`` key is popped from ``config`` (falling back to the
        globally imported reactor), passed through ``maybe_ref`` and stored on
        the instance before the remaining options are handed to
        ``BaseScheduler._configure``.
        """
        reactor_option = config.pop("reactor", default_reactor)
        self._reactor = maybe_ref(reactor_option)
        super()._configure(config)

    @run_in_reactor
    def shutdown(self, wait=True):
        """
        Shut the scheduler down, then cancel any pending wakeup timer.

        Dispatched through the reactor by ``run_in_reactor``.
        """
        super().shutdown(wait)
        self._stop_timer()

    def _start_timer(self, wait_seconds):
        """
        Replace the current wakeup timer with one firing in ``wait_seconds``.

        Any existing timer is cancelled first. When ``wait_seconds`` is
        ``None`` no new timer is scheduled.
        """
        self._stop_timer()
        if wait_seconds is None:
            return
        self._delayedcall = self._reactor.callLater(wait_seconds, self.wakeup)

    def _stop_timer(self):
        """
        Cancel the pending delayed call, if there is one and it is still active.

        After cancelling, the instance attribute is deleted so that lookups
        fall back to the class-level ``None`` default.
        """
        pending = self._delayedcall
        if not pending or not pending.active():
            return
        pending.cancel()
        del self._delayedcall

    @run_in_reactor
    def wakeup(self):
        """
        Process due jobs and re-arm the timer for the next run.

        Dispatched through the reactor by ``run_in_reactor``. The value
        returned by ``_process_jobs`` is used as the delay for the next timer.
        """
        self._stop_timer()
        self._start_timer(self._process_jobs())

    def _create_default_executor(self):
        """Return a new ``TwistedExecutor`` (imported lazily at call time)."""
        from apscheduler.executors.twisted import TwistedExecutor

        return TwistedExecutor()