import pickle from apscheduler.job import Job from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError from apscheduler.util import ( datetime_to_utc_timestamp, maybe_ref, utc_timestamp_to_datetime, ) try: from rethinkdb import RethinkDB except ImportError as exc: # pragma: nocover raise ImportError("RethinkDBJobStore requires rethinkdb installed") from exc class RethinkDBJobStore(BaseJobStore): """ Stores jobs in a RethinkDB database. Any leftover keyword arguments are directly passed to rethinkdb's `RethinkdbClient `_. Plugin alias: ``rethinkdb`` :param str database: database to store jobs in :param str collection: collection to store jobs in :param client: a :class:`rethinkdb.net.Connection` instance to use instead of providing connection arguments :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the highest available """ def __init__( self, database="apscheduler", table="jobs", client=None, pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args, ): super().__init__() if not database: raise ValueError('The "database" parameter must not be empty') if not table: raise ValueError('The "table" parameter must not be empty') self.database = database self.table_name = table self.table = None self.client = client self.pickle_protocol = pickle_protocol self.connect_args = connect_args self.r = RethinkDB() self.conn = None def start(self, scheduler, alias): super().start(scheduler, alias) if self.client: self.conn = maybe_ref(self.client) else: self.conn = self.r.connect(db=self.database, **self.connect_args) if self.database not in self.r.db_list().run(self.conn): self.r.db_create(self.database).run(self.conn) if self.table_name not in self.r.table_list().run(self.conn): self.r.table_create(self.table_name).run(self.conn) if "next_run_time" not in self.r.table(self.table_name).index_list().run( self.conn ): self.r.table(self.table_name).index_create("next_run_time").run(self.conn) self.table = self.r.db(self.database).table(self.table_name) def lookup_job(self, job_id): results = list(self.table.get_all(job_id).pluck("job_state").run(self.conn)) return self._reconstitute_job(results[0]["job_state"]) if results else None def get_due_jobs(self, now): return self._get_jobs( self.r.row["next_run_time"] <= datetime_to_utc_timestamp(now) ) def get_next_run_time(self): results = list( self.table.filter(self.r.row["next_run_time"] != None) .order_by(self.r.asc("next_run_time")) .map(lambda x: x["next_run_time"]) .limit(1) .run(self.conn) ) return utc_timestamp_to_datetime(results[0]) if results else None def get_all_jobs(self): jobs = self._get_jobs() self._fix_paused_jobs_sorting(jobs) return jobs def add_job(self, job): job_dict = { "id": job.id, "next_run_time": datetime_to_utc_timestamp(job.next_run_time), "job_state": self.r.binary( pickle.dumps(job.__getstate__(), self.pickle_protocol) ), } results = self.table.insert(job_dict).run(self.conn) if results["errors"] > 0: raise ConflictingIdError(job.id) def update_job(self, job): changes = { "next_run_time": datetime_to_utc_timestamp(job.next_run_time), "job_state": self.r.binary( pickle.dumps(job.__getstate__(), self.pickle_protocol) ), } results = self.table.get_all(job.id).update(changes).run(self.conn) skipped = False in map(lambda x: results[x] == 0, results.keys()) if results["skipped"] > 0 or results["errors"] > 0 or not skipped: raise JobLookupError(job.id) def remove_job(self, job_id): results = self.table.get_all(job_id).delete().run(self.conn) if results["deleted"] + results["skipped"] != 1: raise JobLookupError(job_id) def remove_all_jobs(self): self.table.delete().run(self.conn) def shutdown(self): self.conn.close() def _reconstitute_job(self, job_state): job_state = pickle.loads(job_state) job = Job.__new__(Job) job.__setstate__(job_state) job._scheduler = self._scheduler job._jobstore_alias = self._alias return job def _get_jobs(self, predicate=None): jobs = [] failed_job_ids = [] query = ( self.table.filter(self.r.row["next_run_time"] != None).filter(predicate) if predicate else self.table ) query = query.order_by("next_run_time", "id").pluck("id", "job_state") for document in query.run(self.conn): try: jobs.append(self._reconstitute_job(document["job_state"])) except Exception: self._logger.exception( 'Unable to restore job "%s" -- removing it', document["id"] ) failed_job_ids.append(document["id"]) # Remove all the jobs we failed to restore if failed_job_ids: self.r.expr(failed_job_ids).for_each( lambda job_id: self.table.get_all(job_id).delete() ).run(self.conn) return jobs def __repr__(self): connection = self.conn return f"<{self.__class__.__name__} (connection={connection})>"