Source code for jobflow_remote.utils.schedule

"""Scheduling tools based on the schedule module."""

from __future__ import annotations

import logging
from datetime import datetime, timedelta
from typing import TYPE_CHECKING

from schedule import Scheduler

if TYPE_CHECKING:
    from jobflow import Job

logger = logging.getLogger(__name__)


# TODO: consider implementing this with an exponential backoff strategy,
# ending with a failure once the retries are exhausted.
class SafeScheduler(Scheduler):
    """
    Scheduler that keeps running when one of its jobs raises.

    An implementation of Scheduler that catches jobs that fail, logs their
    exception tracebacks as errors, optionally reschedules the jobs for their
    next run time, and keeps going.

    Adapted from https://gist.github.com/mplewis/8483f1c24f2d6259aef6
    """

    def __init__(
        self, reschedule_on_failure: bool = True, seconds_after_failure: int = 0
    ) -> None:
        """
        Parameters
        ----------
        reschedule_on_failure
            If True, a job that raises is kept in the scheduler and given a
            new run time (see ``seconds_after_failure``). If False, a job
            that raises is canceled and will not run again.
        seconds_after_failure
            Only used when ``reschedule_on_failure`` is True. If non-zero,
            a failed job is run again this many seconds after the failure.
            If 0, the failed job is rescheduled for its regular next run,
            as if it had completed successfully.
        """
        self.reschedule_on_failure = reschedule_on_failure
        self.seconds_after_failure = seconds_after_failure
        super().__init__()

    def _run_job(self, job: Job) -> None:
        """
        Run a job, handling any exception according to the failure policy.

        Parameters
        ----------
        job
            The scheduled job to execute.
            NOTE(review): the ``Job`` name used for this annotation is
            imported from ``jobflow`` at module level, while the object
            handled here exposes ``job_func``, ``last_run``, ``next_run``
            and ``_schedule_next_run`` - confirm the intended type.
        """
        try:
            super()._run_job(job)
        except Exception:
            # Not every callable exposes __name__ (e.g. functools.partial
            # objects or callable instances). Fall back to repr() so that
            # the error handling itself cannot raise and stop the scheduler.
            task_name = getattr(job.job_func, "__name__", repr(job.job_func))
            logger.exception(f"Error while running task {task_name}")

            if not self.reschedule_on_failure:
                logger.warning(f"Task {task_name} canceled.")
                self.cancel_job(job)
                return

            if secs := self.seconds_after_failure:
                # Retry after a fixed delay instead of the job's own interval.
                logger.warning(f"Task {task_name} rescheduled in {secs} seconds")
                job.last_run = None
                job.next_run = datetime.now() + timedelta(seconds=secs)
            else:
                # Treat the failed run as completed and let the job compute
                # its regular next run time.
                logger.warning(f"Task {task_name} rescheduled")
                job.last_run = datetime.now()
                job._schedule_next_run()