Source code for jobflow_remote.jobs.jobcontroller

from __future__ import annotations

import contextlib
import fnmatch
import logging
import traceback
import warnings
from collections import defaultdict
from contextlib import ExitStack
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, cast

import jobflow
import pymongo
from jobflow import JobStore, OnMissing
from monty.dev import deprecated
from monty.json import MontyDecoder
from monty.serialization import loadfn
from qtoolkit.core.data_objects import CancelStatus, QResources

from jobflow_remote.config.base import ConfigError, ExecutionConfig, Project
from jobflow_remote.config.manager import ConfigManager
from jobflow_remote.jobs.data import (
    OUT_FILENAME,
    DbCollection,
    DynamicResponseType,
    FlowDoc,
    FlowInfo,
    JobDoc,
    JobInfo,
    RemoteError,
    get_initial_flow_doc_dict,
    get_initial_job_doc_dict,
    get_reset_job_base_dict,
    projection_flow_info_jobs,
    projection_job_info,
)
from jobflow_remote.jobs.state import (
    DELETABLE_STATES,
    PAUSABLE_STATES,
    RESETTABLE_STATES,
    RUNNING_STATES,
    FlowState,
    JobState,
)
from jobflow_remote.remote.data import get_remote_store, update_store
from jobflow_remote.remote.queue import QueueManager
from jobflow_remote.utils.data import deep_merge_dict
from jobflow_remote.utils.db import FlowLockedError, JobLockedError, MongoLock

if TYPE_CHECKING:
    from collections.abc import Generator

    from maggma.stores import MongoStore

    from jobflow_remote.remote.host import BaseHost


logger = logging.getLogger(__name__)


class JobController:
    """
    Main entry point for all the interactions with the Stores.

    Maintains a connection to both the queue Store and the results JobStore.
    It is required that the queue Store is a MongoStore, as it will access
    the database, and work with different collections.

    The main functionalities are those for updating the state of the database
    and querying the Jobs and Flows status information.
    """

    def __init__(
        self,
        queue_store: MongoStore,
        jobstore: JobStore,
        flows_collection: str = "flows",
        auxiliary_collection: str = "jf_auxiliary",
        project: Project | None = None,
    ) -> None:
        """
        Parameters
        ----------
        queue_store
            The Store used to save information about the status of the Jobs.
            Should be a MongoStore and other collections are used from the
            same database.
        jobstore
            The JobStore containing the output of the jobflow Flows.
        flows_collection
            The name of the collection used to store the Flows data.
            Uses the DB defined in the queue_store.
        auxiliary_collection
            The name of the collection used to store other auxiliary data.
            Uses the DB defined in the queue_store.
        project
            The project where the Stores were defined.
        """
        self.queue_store = queue_store
        self.jobstore = jobstore
        self.jobs_collection = self.queue_store.collection_name
        self.flows_collection = flows_collection
        self.auxiliary_collection = auxiliary_collection
        # TODO should it connect here? Or the passed stores should be connected?
        self.queue_store.connect()
        self.jobstore.connect()
        self.db = self.queue_store._collection.database
        self.jobs = self.queue_store._collection
        self.flows = self.db[self.flows_collection]
        self.auxiliary = self.db[self.auxiliary_collection]
        self.project = project
    @classmethod
    def from_project_name(cls, project_name: str | None = None) -> JobController:
        """
        Generate an instance of JobController from the project name.

        Parameters
        ----------
        project_name
            The name of the project. If None the default project will be used.

        Returns
        -------
        JobController
            An instance of JobController associated with the project.
        """
        config_manager: ConfigManager = ConfigManager()
        project: Project = config_manager.get_project(project_name)
        return cls.from_project(project=project)
    @classmethod
    def from_project(cls, project: Project) -> JobController:
        """
        Generate an instance of JobController from a Project object.

        Parameters
        ----------
        project
            The project used to generate the JobController.

        Returns
        -------
        JobController
            An instance of JobController associated with the project.
        """
        queue_store = project.get_queue_store()
        flows_collection = project.queue.flows_collection
        auxiliary_collection = project.queue.auxiliary_collection
        jobstore = project.get_jobstore()
        return cls(
            queue_store=queue_store,
            jobstore=jobstore,
            flows_collection=flows_collection,
            auxiliary_collection=auxiliary_collection,
            project=project,
        )
    def close(self) -> None:
        """Close the connections to all the Stores in JobController."""
        try:
            self.queue_store.close()
        except Exception:
            logger.exception("Error while closing the connection to the queue store")
        try:
            self.jobstore.close()
        except Exception:
            logger.exception("Error while closing the connection to the job store")
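# Example (illustrative sketch, not part of the module source): creating a
# JobController from the configured default project, or from an explicit
# Project object, and closing the Store connections when done. Assumes a
# jobflow-remote project is already configured; "my_project" is a placeholder.
from jobflow_remote.config.manager import ConfigManager
from jobflow_remote.jobs.jobcontroller import JobController

jc = JobController.from_project_name()  # None selects the default project
try:
    pass  # query and update Jobs/Flows through jc here
finally:
    jc.close()

# Equivalent, starting from an explicit Project object:
project = ConfigManager().get_project("my_project")
jc2 = JobController.from_project(project)
jc2.close()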
def _build_query_job( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, locked: bool = False, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, ) -> dict: """ Build a query to search for Jobs, based on standard parameters. The Jobs will need to satisfy all the defined conditions. Parameters ---------- job_ids One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve. db_ids One or more db_ids of the Jobs to retrieve. flow_ids One or more Flow uuids to which the Jobs to retrieve belong. states One or more states of the Jobs. locked If True only locked Jobs will be selected. start_date Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) metadata A dictionary of the values of the metadata to match. Should be an exact match for all the values provided. workers One or more worker names. Returns ------- dict A dictionary with the query to be applied to a collection containing JobDocs. """ if job_ids and not any(isinstance(ji, (list, tuple)) for ji in job_ids): # without these cast mypy is confused about the type job_ids = cast(list[tuple[str, int]], [job_ids]) if db_ids is not None and not isinstance(db_ids, (list, tuple)): db_ids = [db_ids] if flow_ids and not isinstance(flow_ids, (list, tuple)): flow_ids = [flow_ids] if isinstance(states, JobState): states = [states] if isinstance(workers, str): workers = [workers] query: dict = defaultdict(dict) if db_ids: query["db_id"] = {"$in": db_ids} if job_ids: job_ids = cast(list[tuple[str, int]], job_ids) or_list = [] for job_id, job_index in job_ids: or_list.append({"uuid": job_id, "index": job_index}) query["$or"] = or_list if flow_ids: query["job.hosts"] = {"$in": flow_ids} if states: query["state"] = {"$in": [s.value for s in states]} if start_date: start_date_str = start_date.astimezone(timezone.utc) query["updated_on"] = {"$gte": start_date_str} if end_date: end_date_str = end_date.astimezone(timezone.utc) query["updated_on"]["$lte"] = end_date_str if locked: query["lock_id"] = {"$ne": None} if name: # Add the beginning of the line, so that it will match the string # exactly if no wildcard is given. Otherwise will match substrings. mongo_regex = "^" + fnmatch.translate(name).replace("\\\\", "\\") query["job.name"] = {"$regex": mongo_regex} if metadata: metadata_dict = {f"job.metadata.{k}": v for k, v in metadata.items()} query.update(metadata_dict) if workers: query["worker"] = {"$in": workers} return query def _build_query_flow( self, job_ids: str | list[str] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: FlowState | list[FlowState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, locked: bool = False, ) -> dict: """ Build a query to search for Flows, based on standard parameters. The Flows will need to satisfy all the defined conditions. 
Parameters ---------- job_ids One or more strings with uuids of Jobs belonging to the Flow. db_ids One or more db_ids of Jobs belonging to the Flow. flow_ids One or more Flow uuids. states One or more states of the Flows. start_date Filter Flows that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Flows that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Flow. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) locked If True only locked Flows will be selected. Returns ------- dict A dictionary with the query to be applied to a collection containing FlowDocs. """ if job_ids is not None and not isinstance(job_ids, (list, tuple)): job_ids = [job_ids] if db_ids is not None and not isinstance(db_ids, (list, tuple)): db_ids = [db_ids] if flow_ids is not None and not isinstance(flow_ids, (list, tuple)): flow_ids = [flow_ids] if isinstance(states, FlowState): states = [states] query: dict = {} if db_ids: # the "0" refers to the index in the ids list. # needs to be a string, but is correctly recognized by MongoDB query["ids"] = {"$elemMatch": {"0": {"$in": db_ids}}} if job_ids: query["jobs"] = {"$in": job_ids} if flow_ids: query["uuid"] = {"$in": flow_ids} if states: query["state"] = {"$in": [s.value for s in states]} if start_date: start_date_str = start_date.astimezone(timezone.utc) query["updated_on"] = {"$gte": start_date_str} if end_date: end_date_str = end_date.astimezone(timezone.utc) query["updated_on"] = {"$lte": end_date_str} if name: mongo_regex = "^" + fnmatch.translate(name).replace("\\\\", "\\") query["name"] = {"$regex": mongo_regex} if locked: query["lock_id"] = {"$ne": None} return query
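# Example (illustrative sketch): the kind of MongoDB filter produced by the
# internal _build_query_job helper and consumed by the query methods below.
# Assumes `jc` is a JobController from the sketch above; the worker name is a
# placeholder.
from datetime import datetime, timedelta

from jobflow_remote.jobs.state import JobState

query = jc._build_query_job(
    states=JobState.FAILED,
    name="relax*",                                  # fnmatch-style pattern
    start_date=datetime.now() - timedelta(days=2),  # local time, converted to UTC
    workers="local_worker",
)
# Roughly of the form:
# {"state": {"$in": [JobState.FAILED.value]},
#  "job.name": {"$regex": "^(?s:relax.*)\\Z"},
#  "updated_on": {"$gte": <UTC datetime>},
#  "worker": {"$in": ["local_worker"]}}
print(jc.get_jobs_info_query(query=query, limit=10))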
    def get_jobs_info_query(
        self, query: dict | None = None, **kwargs
    ) -> list[JobInfo]:
        """
        Get a list of JobInfo based on a generic query.

        Parameters
        ----------
        query
            The query to be performed.
        kwargs
            Arguments passed to MongoDB find().

        Returns
        -------
        list
            A list of JobInfo matching the criteria.
        """
        data = self.jobs.find(query, projection=projection_job_info, **kwargs)
        return [JobInfo.from_query_output(d) for d in data]
[docs] def get_jobs_info( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, locked: bool = False, sort: list[tuple[str, int]] | None = None, limit: int = 0, ) -> list[JobInfo]: """ Query for Jobs based on standard parameters and return a list of JobInfo. Parameters ---------- job_ids One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve. db_ids One or more db_ids of the Jobs to retrieve. flow_ids One or more Flow uuids to which the Jobs to retrieve belong. states One or more states of the Jobs. locked If True only locked Jobs will be selected. start_date Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) metadata A dictionary of the values of the metadata to match. Should be an exact match for all the values provided. workers One or more worker names. sort A list of (key, direction) pairs specifying the sort order for this query. Follows pymongo conventions. limit Maximum number of entries to retrieve. 0 means no limit. Returns ------- list A list of JobInfo objects for the Jobs matching the criteria. """ query = self._build_query_job( job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, locked=locked, start_date=start_date, end_date=end_date, name=name, metadata=metadata, workers=workers, ) return self.get_jobs_info_query(query=query, sort=sort, limit=limit)
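# Example (illustrative sketch): listing recently updated failed Jobs as
# JobInfo objects, sorted by update time. Assumes `jc` is a JobController;
# the attribute names follow the projected fields (db_id, uuid, index, state).
from datetime import datetime, timedelta

import pymongo

from jobflow_remote.jobs.state import JobState

failed = jc.get_jobs_info(
    states=[JobState.FAILED, JobState.REMOTE_ERROR],
    start_date=datetime.now() - timedelta(days=1),  # local time zone
    sort=[("updated_on", pymongo.DESCENDING)],
    limit=20,
)
for info in failed:
    print(info.db_id, info.uuid, info.index, info.state)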
    def get_jobs_doc_query(self, query: dict | None = None, **kwargs) -> list[JobDoc]:
        """
        Query for Jobs based on a generic filter and return a list of JobDoc.

        Parameters
        ----------
        query
            A dictionary representing the filter.
        kwargs
            All arguments passed to pymongo's Collection.find() method.

        Returns
        -------
        list
            A list of JobDoc objects for the Jobs matching the criteria.
        """
        data = self.jobs.find(query, **kwargs)
        return [JobDoc.model_validate(d) for d in data]
[docs] def get_jobs_doc( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, locked: bool = False, sort: list[tuple] | None = None, limit: int = 0, ) -> list[JobDoc]: """ Query for Jobs based on standard parameters and return a list of JobDoc. Parameters ---------- job_ids One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve. db_ids One or more db_ids of the Jobs to retrieve. flow_ids One or more Flow uuids to which the Jobs to retrieve belong. states One or more states of the Jobs. locked If True only locked Jobs will be selected. start_date Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) metadata A dictionary of the values of the metadata to match. Should be an exact match for all the values provided. workers One or more worker names. sort A list of (key, direction) pairs specifying the sort order for this query. Follows pymongo conventions. limit Maximum number of entries to retrieve. 0 means no limit. Returns ------- list A list of JobDoc objects for the Jobs matching the criteria. """ query = self._build_query_job( job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, locked=locked, start_date=start_date, end_date=end_date, name=name, metadata=metadata, workers=workers, ) return self.get_jobs_doc_query(query=query, sort=sort, limit=limit)
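# Example (illustrative sketch): fetching the full JobDoc documents (rather
# than the lighter JobInfo) for the Jobs belonging to a Flow. Assumes `jc` is
# a JobController and the Flow uuid is a placeholder.
docs = jc.get_jobs_doc(flow_ids="flow-uuid-placeholder")
for doc in docs:
    print(doc.db_id, doc.uuid, doc.index, doc.state)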
    @staticmethod
    def generate_job_id_query(
        db_id: str | None = None,
        job_id: str | None = None,
        job_index: int | None = None,
    ) -> tuple[dict, list | None]:
        """
        Generate a query for a single Job based on db_id or uuid+index.

        Only one among db_id and job_id should be defined.

        Parameters
        ----------
        db_id
            The db_id of the Job.
        job_id
            The uuid of the Job.
        job_index
            The index of the Job. If None a sort will be added so that the
            Job with the highest index is selected.

        Returns
        -------
        dict, list
            A dict and an optional list to be used as query and sort,
            respectively, in a query for a single Job.
        """
        query: dict = {}
        sort: list | None = None

        if (job_id is None) == (db_id is None):
            raise ValueError(
                "One and only one among job_id and db_id should be defined"
            )

        if db_id:
            query["db_id"] = db_id
        if job_id:
            query["uuid"] = job_id
            if job_index is None:
                # note: this format is suitable for collection.find(sort=.),
                # but not for $sort in an aggregation.
                sort = [["index", pymongo.DESCENDING]]
            else:
                query["index"] = job_index
        if not query:
            raise ValueError("At least one among db_id and job_id should be specified")
        return query, sort
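# Example (illustrative sketch): the (query, sort) pair returned by
# generate_job_id_query when only the Job uuid is given; the sort selects the
# document with the highest index.
import pymongo

from jobflow_remote.jobs.jobcontroller import JobController

query, sort = JobController.generate_job_id_query(job_id="some-job-uuid")
# query == {"uuid": "some-job-uuid"}
# sort  == [["index", pymongo.DESCENDING]]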
    def get_job_info(
        self,
        job_id: str | None = None,
        db_id: str | None = None,
        job_index: int | None = None,
    ) -> JobInfo | None:
        """
        Get the JobInfo for a single Job based on db_id or uuid+index.

        Only one among db_id and job_id should be defined.

        Parameters
        ----------
        db_id
            The db_id of the Job.
        job_id
            The uuid of the Job.
        job_index
            The index of the Job. If None the Job with the largest index
            will be selected.

        Returns
        -------
        JobInfo
            A JobInfo, or None if no Job matches the criteria.
        """
        query, sort = self.generate_job_id_query(db_id, job_id, job_index)
        data = list(
            self.jobs.find(query, projection=projection_job_info, sort=sort, limit=1)
        )
        if not data:
            return None

        return JobInfo.from_query_output(data[0])
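# Example (illustrative sketch): looking up a single Job either by db_id or by
# uuid (highest index by default). Assumes `jc` is a JobController; the ids
# are placeholders.
info = jc.get_job_info(db_id="123")
same_job_older_index = jc.get_job_info(job_id="some-job-uuid", job_index=1)
if info is not None:
    print(info.state)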
def _many_jobs_action( self, method: Callable, action_description: str, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, max_limit: int = 0, **method_kwargs, ) -> list[str]: """ Helper method to query Jobs based on criteria and sequentially apply an action on all those retrieved. Used to provide a common interface between all the methods that should be applied on a list of jobs sequentially. Parameters ---------- method The function that should be applied on a single Job. action_description A description of the action being performed. For logging purposes. job_ids One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve. db_ids One or more db_ids of the Jobs to retrieve. flow_ids One or more Flow uuids to which the Jobs to retrieve belong. states The state of the Jobs. locked If True only locked Jobs will be selected. start_date Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) metadata A dictionary of the values of the metadata to match. Should be an exact match for all the values provided. workers One or more worker names. custom_query A generic query. Incompatible with all the other filtering options. raise_on_error If True raise in case of error on one job error and stop the loop. Otherwise, just log the error and proceed. max_limit The action will be applied to the Jobs only if the total number is lower than the specified limit. 0 means no limit. method_kwargs Kwargs passed to the method called on each Job Returns ------- list List of db_ids of the updated Jobs. """ filtering_options = [ job_ids, db_ids, flow_ids, states, start_date, end_date, name, metadata, workers, ] if custom_query and any(opt is not None for opt in filtering_options): raise ValueError( "The custom query option is incompatible with all the other filtering options" ) if custom_query: query = custom_query else: query = self._build_query_job( job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, start_date=start_date, end_date=end_date, name=name, metadata=metadata, workers=workers, ) result = self.jobs.find(query, projection=["db_id"]) queried_dbs_ids = [r["db_id"] for r in result] if max_limit != 0 and len(queried_dbs_ids) > max_limit: raise ValueError( f"Cannot perform {action_description} on {len(queried_dbs_ids)} Jobs " f"as they exceeds the specified maximum limit ({max_limit})." f"Increase the limit to complete the action on this many Jobs." ) updated_ids = set() for db_id in queried_dbs_ids: try: job_updated_ids = method(db_id=db_id, **method_kwargs) if not isinstance(job_updated_ids, (list, tuple)): job_updated_ids = ( [] if job_updated_ids is None else [job_updated_ids] ) if job_updated_ids: updated_ids.update(job_updated_ids) except Exception: if raise_on_error: raise logger.exception(f"Error while {action_description} for job {db_id}") return list(updated_ids)
[docs] def rerun_jobs( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, force: bool = False, wait: int | None = None, break_lock: bool = False, ) -> list[str]: """ Rerun a list of selected Jobs, i.e. bring their state back to READY. See the docs of `rerun_job` for more details. Parameters ---------- job_ids One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve. db_ids One or more db_ids of the Jobs to retrieve. flow_ids One or more Flow uuids to which the Jobs to retrieve belong. states One or more states of the Jobs. start_date Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) metadata A dictionary of the values of the metadata to match. Should be an exact match for all the values provided. workers One or more worker names. custom_query A generic query. Incompatible with all the other filtering options. raise_on_error If True raise in case of error on one job error and stop the loop. Otherwise, just log the error and proceed. force Bypass the limitation that only failed Jobs can be rerun. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. break_lock Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB. Returns ------- list List of db_ids of the updated Jobs. """ return self._many_jobs_action( method=self.rerun_job, action_description="rerunning", job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, start_date=start_date, end_date=end_date, name=name, metadata=metadata, workers=workers, custom_query=custom_query, raise_on_error=raise_on_error, force=force, wait=wait, break_lock=break_lock, )
[docs] def rerun_job( self, job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, force: bool = False, wait: int | None = None, break_lock: bool = False, ) -> list[str]: """ Rerun a single Job, i.e. bring its state back to READY. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined. By default, only Jobs in one of the running states (CHECKED_OUT, UPLOADED, ...), in the REMOTE_ERROR state or FAILED with children in the READY or WAITING state can be rerun. This should guarantee that no unexpected inconsistencies due to dynamic Jobs generation should appear. This limitation can be bypassed with the `force` option. In any case, no Job with children with index > 1 can be rerun, as there is no sensible way of handling it. Rerunning a Job in a REMOTE_ERROR or on an intermediate STATE also results in a reset of the remote attempts and errors. When rerunning a Job in a SUBMITTED or RUNNING state the system also tries to cancel the process in the worker. Rerunning a FAILED Job also lead to change of state in its children. The full list of modified Jobs is returned. Parameters ---------- db_id The db_id of the Job. job_id The uuid of the Job. job_index The index of the Job. If None: the Job with the highest index. force Bypass the limitation that only Jobs in a certain state can be rerun. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. break_lock Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB. Returns ------- list List of db_ids of the updated Jobs. """ lock_filter, sort = self.generate_job_id_query(db_id, job_id, job_index) sleep = None if wait: sleep = 10 modified_jobs: list[str] = [] # the job to rerun is the last to be released since this prevents # a checkout of the job while the flow is still locked with self.lock_job( filter=lock_filter, break_lock=break_lock, sort=sort, projection=["uuid", "index", "db_id", "state"], sleep=sleep, max_wait=wait, get_locked_doc=True, ) as job_lock: job_doc_dict = job_lock.locked_document if not job_doc_dict: if job_lock.unavailable_document: raise JobLockedError.from_job_doc(job_lock.unavailable_document) raise ValueError(f"No Job document matching criteria {lock_filter}") job_state = JobState(job_doc_dict["state"]) if job_state in [JobState.READY]: raise ValueError("The Job is in the READY state. No need to rerun.") if job_state in RESETTABLE_STATES: # if in one of the resettable states no need to lock the flow or # update children. doc_update = self._reset_remote(job_doc_dict) modified_jobs = [] elif ( job_state not in [JobState.FAILED, JobState.REMOTE_ERROR] and not force ): raise ValueError( f"Job in state {job_doc_dict['state']} cannot be rerun. " "Use the 'force' option to override this check." ) else: # full restart required doc_update, modified_jobs = self._full_rerun( job_doc_dict, sleep=sleep, wait=wait, break_lock=break_lock, force=force, ) modified_jobs.append(job_doc_dict["db_id"]) set_doc = {"$set": doc_update} job_lock.update_on_release = set_doc return modified_jobs
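# Example (illustrative sketch): bringing Jobs back to READY. The bulk method
# filters Jobs and calls rerun_job on each; `force` bypasses the state checks
# and `wait` waits for existing locks to be released. Assumes `jc` is a
# JobController; the db_id is a placeholder.
from jobflow_remote.jobs.state import JobState

updated = jc.rerun_jobs(states=JobState.REMOTE_ERROR, raise_on_error=False)
print(f"Rerun {len(updated)} jobs")

# A Job in a normally non-rerunnable state (e.g. COMPLETED) requires force:
jc.rerun_job(db_id="123", force=True, wait=60)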
def _full_rerun( self, doc: dict, sleep: int | None = None, wait: int | None = None, break_lock: bool = False, force: bool = False, ) -> tuple[dict, list[str]]: """ Perform the full rerun of Job, in case a Job is FAILED or in one of the usually not admissible states. This requires actions on the original Job's children and will need to acquire the lock on all of them as well as on the Flow. Parameters ---------- doc The dict of the JobDoc associated to the Job to rerun. Just the "uuid", "index", "db_id", "state" values are required. sleep Amounts of seconds to wait between checks that the lock has been released. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. break_lock Forcibly break the lock on locked documents. force Bypass the limitation that only Jobs in a certain state can be rerun. Returns ------- dict, list Updates to be set on the rerun Job upon lock release and the list of db_ids of the modified Jobs. """ job_id = doc["uuid"] job_index = doc["index"] modified_jobs: list[str] = [] flow_filter = {"jobs": job_id} with self.lock_flow( filter=flow_filter, sleep=sleep, max_wait=wait, get_locked_doc=True, break_lock=break_lock, ) as flow_lock: if not flow_lock.locked_document: if flow_lock.unavailable_document: raise FlowLockedError.from_flow_doc(flow_lock.unavailable_document) raise ValueError(f"No Flow document matching criteria {flow_filter}") flow_doc = FlowDoc.model_validate(flow_lock.locked_document) # only the job with the largest index currently present in the db # can be rerun to avoid inconsistencies. (rerunning a smaller index # would still leave the job with larger indexes in the DB with no # clear way of how to deal with them) if max(flow_doc.ids_mapping[job_id]) > job_index: raise ValueError( f"Job {job_id} is not the highest index ({job_index}). " "Rerunning it will lead to inconsistencies and is not allowed." ) # check that the all the children only those with the largest index # in the flow are present. # If that is the case the rerun would lead to inconsistencies. # If only the last one is among the children it is acceptable # to rerun, but in case of a child with lower index a dynamical # action that cannot be reverted has been already applied. # Do not allow this even if force==True. # if not force, only the first level children need to be checked if not force: descendants = flow_doc.children.get(job_id, []) else: descendants = flow_doc.descendants(job_id) for dep_id, dep_index in descendants: if max(flow_doc.ids_mapping[dep_id]) > dep_index: raise ValueError( f"Job {job_id} has a child job ({dep_id}) which is not the last index ({dep_index}). " "Rerunning the Job will lead to inconsistencies and is not allowed." ) # TODO should STOPPED be acceptable? acceptable_child_states = [ JobState.READY.value, JobState.WAITING.value, JobState.PAUSED.value, ] # Update the state of the descendants updated_states: dict[str, dict[int, JobState]] = defaultdict(dict) with ExitStack() as stack: # first acquire the lock on all the descendants and # check their state if needed. Break immediately if # the lock cannot be acquired on one of the children # or if the states do not satisfy the requirements children_locks = [] for dep_id, dep_index in descendants: # TODO consider using the db_id for the query. may be faster? 
child_lock = stack.enter_context( self.lock_job( filter={"uuid": dep_id, "index": dep_index}, break_lock=break_lock, projection=["uuid", "index", "db_id", "state"], sleep=sleep, max_wait=wait, get_locked_doc=True, ) ) child_doc_dict = child_lock.locked_document if not child_doc_dict: if child_lock.unavailable_document: raise JobLockedError.from_job_doc( child_lock.unavailable_document, f"The parent Job with uuid {job_id} cannot be rerun", ) raise ValueError( f"The child of Job {job_id} to rerun with uuid {dep_id} and index {dep_index} could not be found in the database" ) # check that the children have not been started yet. # the only case being if some children allow failed parents. # Put a lock on each of the children, so that if they are READY # they will not be checked out if ( not force and child_doc_dict["state"] not in acceptable_child_states ): msg = ( f"The child of Job {job_id} to rerun with uuid {dep_id} and " f"index {dep_index} has state {child_doc_dict['state']} which " "is not acceptable. Use the 'force' option to override this check." ) raise ValueError(msg) children_locks.append(child_lock) # Here all the descendants are locked and could be set to WAITING. # Set the new state for all of them. for child_lock in children_locks: child_doc = child_lock.locked_document if child_doc["state"] != JobState.WAITING.value: modified_jobs.append(child_doc["db_id"]) child_doc_update = get_reset_job_base_dict() child_doc_update["state"] = JobState.WAITING.value child_lock.update_on_release = {"$set": child_doc_update} updated_states[child_doc["uuid"]][child_doc["index"]] = ( JobState.WAITING ) # if everything is fine here, update the state of the flow # before releasing its lock and set the update for the original job # pass explicitly the new state of the job, since it is not updated # in the DB. The Job is the last lock to be released. updated_states[job_id][job_index] = JobState.READY self.update_flow_state( flow_uuid=flow_doc.uuid, updated_states=updated_states ) job_doc_update = get_reset_job_base_dict() job_doc_update["state"] = JobState.READY.value return job_doc_update, modified_jobs def _reset_remote(self, doc: dict) -> dict: """ Simple reset of a Job in a running state or REMOTE_ERROR. Does not require additional locking on the Flow or other Jobs. Parameters ---------- doc The dict of the JobDoc associated to the Job to rerun. Just the "uuid", "index", "state" values are required. Returns ------- dict Updates to be set on the Job upon lock release. """ if doc["state"] in [JobState.SUBMITTED.value, JobState.RUNNING.value]: # try cancelling the job submitted to the remote queue try: self._cancel_queue_process(doc) except Exception: logger.warning( f"Failed cancelling the process for Job {doc['uuid']} {doc['index']}", exc_info=True, ) job_doc_update = get_reset_job_base_dict() job_doc_update["state"] = JobState.CHECKED_OUT.value return job_doc_update @deprecated( message="_set_job_properties will be removed. Use the set_job_doc_properties method instead" ) def _set_job_properties( self, values: dict, db_id: str | None = None, job_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, acceptable_states: list[JobState] | None = None, use_pipeline: bool = False, ) -> str | None: """ Helper to set multiple values in a JobDoc while locking the Job. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined. Parameters ---------- values Dictionary with the values to be set. 
Will be passed to a pymongo `update_one` method. db_id The db_id of the Job. job_id The uuid of the Job. job_index The index of the Job. If None the Job with the largest index will be selected. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. break_lock Forcibly break the lock on locked documents. acceptable_states List of JobState for which the Job values can be changed. If None all states are acceptable. use_pipeline if True a pipeline will be used in the update of the document Returns ------- str The db_id of the updated Job. None if the Job was not updated. """ return self.set_job_doc_properties( values=values, db_id=db_id, job_id=job_id, job_index=job_index, wait=wait, break_lock=break_lock, acceptable_states=acceptable_states, use_pipeline=use_pipeline, )
[docs] def set_job_doc_properties( self, values: dict, db_id: str | None = None, job_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, acceptable_states: list[JobState] | None = None, use_pipeline: bool = False, ) -> str: """ Helper to set multiple values in a JobDoc while locking the Job. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined. Parameters ---------- values Dictionary with the values to be set. Will be passed to a pymongo `update_one` method. db_id The db_id of the Job. job_id The uuid of the Job. job_index The index of the Job. If None the Job with the largest index will be selected. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. break_lock Forcibly break the lock on locked documents. acceptable_states List of JobState for which the Job values can be changed. If None all states are acceptable. use_pipeline if True a pipeline will be used in the update of the document Returns ------- str The db_id of the updated Job. None if the Job was not updated. """ sleep = None if wait: sleep = 10 lock_filter, sort = self.generate_job_id_query(db_id, job_id, job_index) projection = ["db_id", "uuid", "index", "state"] with self.lock_job( filter=lock_filter, break_lock=break_lock, sort=sort, sleep=sleep, max_wait=wait, projection=projection, ) as lock: doc = lock.locked_document if doc: if ( acceptable_states and JobState(doc["state"]) not in acceptable_states ): raise ValueError( f"Job in state {doc['state']}. The action cannot be performed" ) values = dict(values) # values["updated_on"] = datetime.utcnow() lock.update_on_release = ( [{"$set": values}] if use_pipeline else {"$set": values} ) return doc["db_id"] return None
[docs] def set_job_state( self, state: JobState, job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, ) -> str: """ Set the state of a Job to an arbitrary JobState. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined. No check is performed! Any job can be set to any state. Only for advanced users or for debugging purposes. Parameters ---------- db_id The db_id of the Job. job_id The uuid of the Job. job_index The index of the Job. If None the Job with the largest index will be selected. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. break_lock Forcibly break the lock on locked documents. Returns ------- str The db_id of the updated Job. None if the Job was not updated. """ values = { "state": state.value, "remote.step_attempts": 0, "remote.retry_time_limit": None, "previous_state": None, "remote.queue_state": None, "remote.error": None, "error": None, } return self.set_job_doc_properties( values=values, job_id=job_id, db_id=db_id, job_index=job_index, wait=wait, break_lock=break_lock, )
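# Example (illustrative sketch): forcing an arbitrary state on a Job. As the
# docstring notes, no consistency check is performed, so this is intended for
# debugging or manual repair only. Assumes `jc` is a JobController; the db_id
# is a placeholder.
from jobflow_remote.jobs.state import JobState

jc.set_job_state(JobState.READY, db_id="123")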
[docs] def retry_jobs( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None, break_lock: bool = False, ) -> list[str]: """ Retry selected Jobs, i.e. bring them back to its previous state if REMOTE_ERROR, or reset the remote attempts and time of retry if in another running state. Parameters ---------- job_ids One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve. db_ids One or more db_ids of the Jobs to retrieve. flow_ids One or more Flow uuids to which the Jobs to retrieve belong. states One or more states of the Jobs. start_date Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) metadata A dictionary of the values of the metadata to match. Should be an exact match for all the values provided. workers One or more worker names. custom_query A generic query. Incompatible with all the other filtering options. raise_on_error If True raise in case of error on one job error and stop the loop. Otherwise, just log the error and proceed. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. break_lock Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB. Returns ------- list List of db_ids of the updated Jobs. """ return self._many_jobs_action( method=self.retry_job, action_description="retrying", job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, start_date=start_date, end_date=end_date, name=name, metadata=metadata, workers=workers, custom_query=custom_query, raise_on_error=raise_on_error, wait=wait, break_lock=break_lock, )
[docs] def retry_job( self, job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, ) -> str: """ Retry a single Job, i.e. bring it back to its previous state if REMOTE_ERROR, or reset the remote attempts and time of retry if in another running state. Jobs in other states cannot be retried. The Job is selected by db_id or uuid+index. Only one among db_id and job_id should be defined. Only locking of the retried Job is required. Parameters ---------- db_id The db_id of the Job. job_id The uuid of the Job. job_index The index of the Job. If None: the Job with the highest index. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. break_lock Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB. Returns ------- str The db_id of the updated Job. """ lock_filter, sort = self.generate_job_id_query(db_id, job_id, job_index) sleep = None if wait: sleep = 10 with self.lock_job( filter=lock_filter, sort=sort, get_locked_doc=True, sleep=sleep, max_wait=wait, break_lock=break_lock, ) as lock: doc = lock.locked_document if not doc: if lock.unavailable_document: raise JobLockedError( f"The Job matching criteria {lock_filter} is locked." ) raise ValueError(f"No Job matching criteria {lock_filter}") state = JobState(doc["state"]) if state == JobState.REMOTE_ERROR: previous_state = doc["previous_state"] try: JobState(previous_state) except ValueError as exc: raise ValueError( f"The registered previous state: {previous_state} is not a valid state" ) from exc set_dict = get_reset_job_base_dict() set_dict["state"] = previous_state lock.update_on_release = {"$set": set_dict} elif state in RUNNING_STATES: set_dict = { "remote.step_attempts": 0, "remote.retry_time_limit": None, "remote.error": None, } lock.update_on_release = {"$set": set_dict} else: raise ValueError(f"Job in state {state.value} cannot be retried.") return doc["db_id"]
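# Example (illustrative sketch): retrying Jobs that ended up in REMOTE_ERROR,
# which restores their previous state and resets the remote attempt counters.
# Assumes `jc` is a JobController.
from jobflow_remote.jobs.state import JobState

retried = jc.retry_jobs(states=JobState.REMOTE_ERROR, raise_on_error=False)
print(f"Retried {len(retried)} jobs")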
[docs] def pause_jobs( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None, ) -> list[str]: """ Pause selected Jobs. Only READY and WAITING Jobs can be paused. The action is reversible. Parameters ---------- job_ids One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve. db_ids One or more db_ids of the Jobs to retrieve. flow_ids One or more Flow uuids to which the Jobs to retrieve belong. states One or more states of the Jobs. start_date Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) metadata A dictionary of the values of the metadata to match. Should be an exact match for all the values provided. workers One or more worker names. custom_query A generic query. Incompatible with all the other filtering options. raise_on_error If True raise in case of error on one job error and stop the loop. Otherwise, just log the error and proceed. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. Returns ------- list List of db_ids of the updated Jobs. """ return self._many_jobs_action( method=self.pause_job, action_description="pausing", job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, start_date=start_date, end_date=end_date, name=name, metadata=metadata, workers=workers, custom_query=custom_query, raise_on_error=raise_on_error, wait=wait, )
[docs] def stop_jobs( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None, break_lock: bool = False, ) -> list[str]: """ Stop selected Jobs. Only Jobs in the READY and all the running states can be stopped. The action is not reversible. Parameters ---------- job_ids One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve. db_ids One or more db_ids of the Jobs to retrieve. flow_ids One or more Flow uuids to which the Jobs to retrieve belong. states One or more states of the Jobs. start_date Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) metadata A dictionary of the values of the metadata to match. Should be an exact match for all the values provided. workers One or more worker names. custom_query A generic query. Incompatible with all the other filtering options. raise_on_error If True raise in case of error on one job error and stop the loop. Otherwise, just log the error and proceed. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. break_lock Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB. Returns ------- list List of db_ids of the updated Jobs. """ return self._many_jobs_action( method=self.stop_job, action_description="stopping", job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, start_date=start_date, end_date=end_date, name=name, metadata=metadata, workers=workers, custom_query=custom_query, raise_on_error=raise_on_error, wait=wait, break_lock=break_lock, )
[docs] def stop_job( self, job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, ) -> str: """ Stop a single Job. Only Jobs in the READY and all the running states can be stopped. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined. The action is not reversible. Parameters ---------- db_id The db_id of the Job. job_id The uuid of the Job. job_index The index of the Job. If None: the Job with the highest index. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. break_lock Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB. Returns ------- str The db_id of the updated Job. """ job_lock_kwargs = dict( projection=["uuid", "index", "db_id", "state", "remote", "worker"] ) flow_lock_kwargs = dict(projection=["uuid"]) with self.lock_job_flow( acceptable_states=[JobState.READY, *RUNNING_STATES], job_id=job_id, db_id=db_id, job_index=job_index, wait=wait, break_lock=break_lock, job_lock_kwargs=job_lock_kwargs, flow_lock_kwargs=flow_lock_kwargs, ) as (job_lock, flow_lock): job_doc = job_lock.locked_document if job_doc is None: raise RuntimeError("No job document found in lock") job_state = JobState(job_doc["state"]) if job_state in [JobState.SUBMITTED.value, JobState.RUNNING.value]: # try cancelling the job submitted to the remote queue try: self._cancel_queue_process(job_doc) except Exception: logger.warning( f"Failed cancelling the process for Job {job_doc['uuid']} {job_doc['index']}", exc_info=True, ) job_id = job_doc["uuid"] job_index = job_doc["index"] updated_states = {job_id: {job_index: JobState.USER_STOPPED}} if flow_lock.locked_document is None: raise RuntimeError("No document found in flow lock") self.update_flow_state( flow_uuid=flow_lock.locked_document["uuid"], updated_states=updated_states, ) job_lock.update_on_release = { "$set": {"state": JobState.USER_STOPPED.value} } return_doc = job_lock.locked_document if return_doc is None: raise RuntimeError("No document found in final job lock") return return_doc["db_id"]
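# Example (illustrative sketch): stopping a running Job. If the Job is
# SUBMITTED or RUNNING the controller also tries to cancel the process on the
# worker queue; the resulting USER_STOPPED state is not reversible. Assumes
# `jc` is a JobController and the uuid is a placeholder.
jc.stop_job(job_id="some-job-uuid", wait=60)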
[docs] def pause_job( self, job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, ) -> str: """ Pause a single Job. Only READY and WAITING Jobs can be paused. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined. The action is reversible. Parameters ---------- db_id The db_id of the Job. job_id The uuid of the Job. job_index The index of the Job. If None: the Job with the highest index. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. Returns ------- str The db_id of the updated Job. """ job_lock_kwargs = dict(projection=["uuid", "index", "db_id", "state"]) flow_lock_kwargs = dict(projection=["uuid"]) with self.lock_job_flow( acceptable_states=PAUSABLE_STATES, job_id=job_id, db_id=db_id, job_index=job_index, wait=wait, break_lock=False, job_lock_kwargs=job_lock_kwargs, flow_lock_kwargs=flow_lock_kwargs, ) as (job_lock, flow_lock): job_doc = job_lock.locked_document if job_doc is None: raise RuntimeError("No job document found in lock") job_id = job_doc["uuid"] job_index = job_doc["index"] updated_states = {job_id: {job_index: JobState.PAUSED}} flow_doc = flow_lock.locked_document if flow_doc is None: raise RuntimeError("No flow document found in lock") self.update_flow_state( flow_uuid=flow_doc["uuid"], updated_states=updated_states, ) job_lock.update_on_release = {"$set": {"state": JobState.PAUSED.value}} return_doc = job_lock.locked_document if return_doc is None: raise RuntimeError("No document found in final job lock") return return_doc["db_id"]
[docs] def play_jobs( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None, break_lock: bool = False, ) -> list[str]: """ Restart selected Jobs that were previously paused. Parameters ---------- job_ids One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve. db_ids One or more db_ids of the Jobs to retrieve. flow_ids One or more Flow uuids to which the Jobs to retrieve belong. states One or more states of the Jobs. start_date Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) metadata A dictionary of the values of the metadata to match. Should be an exact match for all the values provided. workers One or more worker names. custom_query A generic query. Incompatible with all the other filtering options. raise_on_error If True raise in case of error on one job error and stop the loop. Otherwise, just log the error and proceed. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. break_lock Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB. Returns ------- list List of db_ids of the updated Jobs. """ return self._many_jobs_action( method=self.play_job, action_description="playing", job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, start_date=start_date, end_date=end_date, name=name, metadata=metadata, workers=workers, custom_query=custom_query, raise_on_error=raise_on_error, wait=wait, break_lock=break_lock, )
[docs] def play_job( self, job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, ) -> str: """ Restart a single Jobs that was previously paused. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined. Parameters ---------- db_id The db_id of the Job. job_id The uuid of the Job. job_index The index of the Job. If None: the Job with the highest index. wait In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. break_lock Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB. Returns ------- str The db_id of the updated Job. """ job_lock_kwargs = dict( projection=["uuid", "index", "db_id", "state", "job.config", "parents"] ) flow_lock_kwargs = dict(projection=["uuid"]) with self.lock_job_flow( acceptable_states=[JobState.PAUSED], job_id=job_id, db_id=db_id, job_index=job_index, wait=wait, break_lock=break_lock, job_lock_kwargs=job_lock_kwargs, flow_lock_kwargs=flow_lock_kwargs, ) as (job_lock, flow_lock): job_doc = job_lock.locked_document if job_doc is None: raise RuntimeError("No job document found in lock") job_id = job_doc["uuid"] job_index = job_doc["index"] on_missing = job_doc["job"]["config"]["on_missing_references"] allow_failed = on_missing != OnMissing.ERROR.value # in principle the lock on each of the parent jobs is not needed # since a parent Job cannot change to COMPLETED or FAILED while # the flow is locked for parent in self.jobs.find( {"uuid": {"$in": job_doc["parents"]}}, projection=["state"] ): parent_state = JobState(parent["state"]) if parent_state != JobState.COMPLETED: if parent_state == JobState.FAILED and allow_failed: continue final_state = JobState.WAITING break else: final_state = JobState.READY updated_states = {job_id: {job_index: final_state}} self.update_flow_state( flow_uuid=flow_lock.locked_document["uuid"], updated_states=updated_states, ) job_lock.update_on_release = {"$set": {"state": final_state.value}} return job_lock.locked_document["db_id"]
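# Example (illustrative sketch): pausing the READY/WAITING Jobs of a Flow and
# later resuming them. play_job decides whether each Job goes back to READY or
# WAITING based on the state of its parents. Assumes `jc` is a JobController
# and the Flow uuid is a placeholder.
paused = jc.pause_jobs(flow_ids="flow-uuid-placeholder")
resumed = jc.play_jobs(flow_ids="flow-uuid-placeholder")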
[docs] def set_job_run_properties( self, worker: str | None = None, exec_config: str | ExecutionConfig | dict | None = None, resources: dict | QResources | None = None, update: bool = True, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, ) -> list[str]: """ Set execution properties for selected Jobs: worker, exec_config and resources. Parameters ---------- worker The name of the worker to set. exec_config The name of the exec_config to set or an explicit value of ExecutionConfig or dict. resources The resources to be set, either as a dict or a QResources instance. update If True, when setting exec_config and resources a passed dictionary will be used to update already existing values. If False it will replace the original values. job_ids One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve. db_ids One or more db_ids of the Jobs to retrieve. flow_ids One or more Flow uuids to which the Jobs to retrieve belong. states One or more states of the Jobs. start_date Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) metadata A dictionary of the values of the metadata to match. Should be an exact match for all the values provided. workers One or more worker names. custom_query A generic query. Incompatible with all the other filtering options. raise_on_error If True raise in case of error on one job error and stop the loop. Otherwise, just log the error and proceed. Returns ------- list List of db_ids of the updated Jobs. 
""" set_dict: dict[str, Any] = {} if worker: if worker not in self.project.workers: raise ValueError(f"worker {worker} is not present in the project") set_dict["worker"] = worker if exec_config: if ( isinstance(exec_config, str) and exec_config not in self.project.exec_config ): raise ValueError( f"exec_config {exec_config} is not present in the project" ) if isinstance(exec_config, ExecutionConfig): exec_config = exec_config.model_dump() if update and isinstance(exec_config, dict): # if the content is a string replace even if it is an update, # merging is meaningless cond = { "$cond": { "if": {"$eq": [{"$type": "$exec_config"}, "string"]}, "then": exec_config, "else": {"$mergeObjects": ["$exec_config", exec_config]}, } } set_dict["exec_config"] = cond else: set_dict["exec_config"] = exec_config if resources: if isinstance(resources, QResources): resources = resources.as_dict() # if passing a QResources it is pointless to update # all the keywords will be overwritten and if the previous # value was a generic dictionary the merged dictionary will fail # almost surely lead to failures update = False if update: set_dict["resources"] = {"$mergeObjects": ["$resources", resources]} else: set_dict["resources"] = resources acceptable_states = [ JobState.READY, JobState.WAITING, JobState.COMPLETED, JobState.FAILED, JobState.PAUSED, JobState.REMOTE_ERROR, ] return self._many_jobs_action( method=self.set_job_doc_properties, action_description="setting", job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, start_date=start_date, end_date=end_date, name=name, metadata=metadata, workers=workers, custom_query=custom_query, raise_on_error=raise_on_error, values=set_dict, acceptable_states=acceptable_states, use_pipeline=update, )
[docs] def get_flow_job_aggreg( self, query: dict | None = None, projection_flow: dict | None = None, projection_job: dict | None = None, sort: list[tuple] | None = None, limit: int = 0, ) -> list[dict]: """ Retrieve data about Flows and all their Jobs through an aggregation. In the aggregation the list of Jobs are identified as `jobs_list`. Parameters ---------- query A dictionary representing the filter. projection_flow Projection of the fields for the Flow document passed to the aggregation. projection_job Projection of the fields for the Job document used in the $lookup. sort A list of (key, direction) pairs specifying the sort order for this query. Follows pymongo conventions. limit Maximum number of entries to retrieve. 0 means no limit. Returns ------- list The list of dictionaries resulting from the query. """ pipeline: list[dict] = [ { "$lookup": { "from": self.jobs_collection, "localField": "jobs", "foreignField": "uuid", "as": "jobs_list", } } ] if query: pipeline.append({"$match": query}) if projection_flow: pipeline.append({"$project": projection_flow}) if projection_job: # insert the pipeline for the projection of the Job fields # to reduce the impact of the size of the documents. # This can help reducing the size of the fetched documents and # avoid exceeding the maximum size allowed. Adding the projection # in the general pipeline does not have the same effect. pipeline[0]["$lookup"]["pipeline"] = [{"$project": projection_job}] # if the additional projection is set, the keys need to be specified # in that part of the pipeline as well. if projection_flow: for k in projection_job: pipeline[-1]["$project"][f"jobs_list.{k}"] = 1 if sort: pipeline.append({"$sort": dict(sort)}) if limit: pipeline.append({"$limit": limit}) return list(self.flows.aggregate(pipeline))
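# Sketch of a direct call to get_flow_job_aggreg (it is mainly used internally by
# get_flows_info). The projected field names follow the Flow/Job document schemas
# and are assumptions made only for this example.
from jobflow_remote.jobs.jobcontroller import JobController
from jobflow_remote.jobs.state import FlowState

jc = JobController.from_project_name()
docs = jc.get_flow_job_aggreg(
    query={"state": FlowState.RUNNING.value},
    projection_flow={"uuid": 1, "name": 1, "state": 1},
    projection_job={"uuid": 1, "index": 1, "state": 1},
    sort=[("updated_on", -1)],
    limit=5,
)
for doc in docs:
    # the Jobs of each Flow are exposed under the "jobs_list" key
    print(doc["uuid"], [j["state"] for j in doc["jobs_list"]])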
[docs] def get_flows_info( self, job_ids: str | list[str] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: FlowState | list[FlowState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, locked: bool = False, sort: list[tuple] | None = None, limit: int = 0, full: bool = False, ) -> list[FlowInfo]: """ Query for Flows based on standard parameters and return a list of FlowInfo. Parameters ---------- job_ids One or more strings with uuids of Jobs belonging to the Flow. db_ids One or more db_ids of Jobs belonging to the Flow. flow_ids One or more Flow uuids. states One or more states of the Flow. start_date Filter Flows that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Flows that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Flow. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) locked If True only locked Flows will be selected. sort A list of (key, direction) pairs specifying the sort order for this query. Follows pymongo conventions. limit Maximum number of entries to retrieve. 0 means no limit. full If True data is fetched from both the Flow collection and Job collection with an aggregate. Otherwise, only the Job information in the Flow document will be used. Returns ------- list A list of FlowInfo. """ query = self._build_query_flow( job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, start_date=start_date, end_date=end_date, name=name, locked=locked, ) # Only use the full aggregation if more job details are needed. # The single flow document is enough for basic information if full: projection_job = {f: 1 for f in projection_flow_info_jobs} projection_flow = {k: 1 for k in FlowDoc.model_fields} data = self.get_flow_job_aggreg( query=query, sort=sort, limit=limit, projection_flow=projection_flow, projection_job=projection_job, ) else: data = list(self.flows.find(query, sort=sort, limit=limit)) return [FlowInfo.from_query_dict(d) for d in data]
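# Hedged example of querying FlowInfo objects for recently updated running Flows.
# The attribute names printed at the end (name, state) are assumed from FlowInfo.
from datetime import datetime, timedelta
from jobflow_remote.jobs.jobcontroller import JobController
from jobflow_remote.jobs.state import FlowState

jc = JobController.from_project_name()
flows = jc.get_flows_info(
    states=FlowState.RUNNING,
    start_date=datetime.now() - timedelta(days=1),  # local time, converted to UTC internally
    name="*relax*",  # fnmatch-style pattern
    sort=[("updated_on", -1)],
    limit=10,
)
for flow_info in flows:
    print(flow_info.name, flow_info.state)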
[docs] def delete_flows( self, flow_ids: str | list[str] | None = None, max_limit: int = 10, delete_output: bool = False, delete_files: bool = False, ) -> int: """ Delete a list of Flows based on the flow uuids. Parameters ---------- flow_ids One or more Flow uuids. max_limit The Flows will be deleted only if the total number is lower than the specified limit. 0 means no limit. delete_output If True also delete the associated output in the JobStore. delete_files If True also delete the files on the worker. Returns ------- int Number of deleted Flows. """ if isinstance(flow_ids, str): flow_ids = [flow_ids] if flow_ids is None: flow_ids = [f["uuid"] for f in self.flows.find({}, projection=["uuid"])] if max_limit != 0 and len(flow_ids) > max_limit: raise ValueError( f"Cannot delete {len(flow_ids)} Flows as they exceed the specified maximum " f"limit ({max_limit}). Increase the limit to delete the Flows." ) deleted = 0 for fid in flow_ids: # TODO should it catch errors? if self.delete_flow( fid, delete_output=delete_output, delete_files=delete_files ): deleted += 1 return deleted
[docs] def delete_flow( self, flow_id: str, delete_output: bool = False, delete_files: bool = False, ) -> bool: """ Delete a single Flow based on the uuid. Parameters ---------- flow_id The uuid of the Flow to delete. delete_output If True also delete the associated output in the JobStore. delete_files If True also delete the files on the worker. Returns ------- bool True if the flow has been deleted. """ # TODO should this lock anything (FW does not lock)? flow = self.get_flow_info_by_flow_uuid(flow_id) if not flow: return False job_ids = flow["jobs"] if delete_output: self.jobstore.remove_docs({"uuid": {"$in": job_ids}}) if delete_files: jobs_info = self.get_jobs_info(flow_ids=[flow_id]) self._safe_delete_files(jobs_info) self.jobs.delete_many({"uuid": {"$in": job_ids}}) self.flows.delete_one({"uuid": flow_id}) return True
[docs] def unlock_jobs( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, ) -> int: """ Forcibly remove the lock on a locked Job document. This should be used only if a lock is a leftover of a process that is not running anymore. Doing otherwise may result in inconsistencies. Parameters ---------- job_ids One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve. db_ids One or more db_ids of the Jobs to retrieve. flow_ids One or more Flow uuids to which the Jobs to retrieve belong. states One or more states of the Jobs. start_date Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) metadata A dictionary of the values of the metadata to match. Should be an exact match for all the values provided. workers One or more worker names. Returns ------- int Number of modified Jobs. """ query = self._build_query_job( job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, start_date=start_date, end_date=end_date, locked=True, name=name, metadata=metadata, workers=workers, ) result = self.jobs.update_many( filter=query, update={"$set": {"lock_id": None, "lock_time": None}}, ) return result.modified_count
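# Hedged sketch for unlock_jobs: forcibly clear leftover Job locks, e.g. after a
# runner crash. Only do this after verifying that the locking process is no longer
# running; the flow uuid below is a placeholder.
from jobflow_remote.jobs.jobcontroller import JobController

jc = JobController.from_project_name()
n_unlocked = jc.unlock_jobs(flow_ids="00000000-0000-0000-0000-000000000000")
print(f"Removed the lock from {n_unlocked} Job documents")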
def _safe_delete_files(self, jobs_info: list[JobInfo]) -> list[JobInfo]: """ Delete the files associated to the selected Jobs. Checks that the folder to be deleted contains the jfremote_in.json file to avoid mistakenly deleting other folders. Parameters ---------- jobs_info A list of JobInfo whose files should be deleted. Returns ------- list The list of JobInfo whose files have been actually deleted. """ hosts: dict[str, BaseHost] = {} deleted = [] for job_info in jobs_info: if job_info.run_dir: if job_info.worker in hosts: host = hosts[job_info.worker] else: host = self.project.workers[job_info.worker].get_host() hosts[job_info.worker] = host host.connect() remote_files = host.listdir(job_info.run_dir) # safety measure to avoid mistakenly deleting other folders # maybe too much? if any( fn in remote_files for fn in ("jfremote_in.json", "jfremote_in.json.gz") ): if host.rmtree(path=job_info.run_dir, raise_on_error=False): deleted.append(job_info) else: logger.warning( f"Did not delete folder {job_info.run_dir} " f"since it may not contain a jobflow-remote execution", ) for host in hosts.values(): try: host.close() except Exception: pass return deleted
[docs] def unlock_flows( self, job_ids: str | list[str] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: FlowState | list[FlowState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, ) -> int: """ Forcibly remove the lock on a locked Flow document. This should be used only if a lock is a leftover of a process that is not running anymore. Doing otherwise may result in inconsistencies. Parameters ---------- job_ids One or more strings with uuids of Jobs belonging to the Flow. db_ids One or more db_ids of Jobs belonging to the Flow. flow_ids One or more Flow uuids. states One or more states of the Flows. start_date Filter Flows that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Flows that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Flow. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) Returns ------- int Number of modified Flows. """ query = self._build_query_flow( job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, start_date=start_date, end_date=end_date, locked=True, name=name, ) result = self.flows.update_many( filter=query, update={"$set": {"lock_id": None, "lock_time": None}}, ) return result.modified_count
[docs] def reset(self, reset_output: bool = False, max_limit: int = 25) -> bool: """ Reset the content of the queue database and build the indexes. Optionally deletes the content of the JobStore with the outputs. In this case all the data contained in the JobStore will be removed, not just the data associated with the Jobs in the queue. Parameters ---------- reset_output If True also reset the JobStore containing the outputs. max_limit Maximum number of Flows present in the DB. If the number is larger the database will not be reset. Set to 0 for no limit. Returns ------- bool True if the database was reset, False otherwise. """ # TODO should it just delete docs related to job removed in the reset? # what if the outputs are in other stores? Should take those as well if max_limit: n_flows = self.flows.count_documents({}) if n_flows >= max_limit: logger.warning( f"The database contains {n_flows} flows and will not be reset. " "Increase the max_limit value or set it to 0" ) return False if reset_output: self.jobstore.remove_docs({}) self.jobs.drop() self.flows.drop() self.auxiliary.drop() self.auxiliary.insert_one({"next_id": 1}) self.build_indexes(drop=True) return True
[docs] def build_indexes( self, background: bool = True, job_custom_indexes: list[str | list] | None = None, flow_custom_indexes: list[str | list] | None = None, drop: bool = False, ) -> None: """ Build indexes in the database. Parameters ---------- background If True, the indexes should be created in the background. job_custom_indexes List of custom indexes for the jobs collection. Each element is passed to pymongo's create_index, thus following those conventions. flow_custom_indexes List of custom indexes for the flows collection. Same as job_custom_indexes. drop If True all existing indexes in the collections will be dropped. """ if drop: self.jobs.drop_indexes() self.flows.drop_indexes() self.auxiliary.drop_indexes() self.jobs.create_index("db_id", unique=True, background=background) self.jobs.create_index( [("uuid", pymongo.ASCENDING), ("index", pymongo.ASCENDING)], unique=True, background=background, ) job_indexes = [ "uuid", "index", "state", "updated_on", "name", "worker", [("priority", pymongo.DESCENDING)], ["state", "remote.retry_time_limit"], ] for f in job_indexes: self.jobs.create_index(f, background=background) if job_custom_indexes: for idx in job_custom_indexes: self.jobs.create_index(idx, background=background) self.flows.create_index("uuid", unique=True, background=background) flow_indexes = [ "name", "state", "updated_on", "ids", "jobs", ] for idx in flow_indexes: self.flows.create_index(idx, background=background) if flow_custom_indexes: for idx in flow_custom_indexes: self.flows.create_index(idx, background=background)
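# Sketch of rebuilding the standard indexes and adding custom ones. Index
# specifications are passed straight to pymongo's create_index, so both plain
# field names and explicit (key, direction) lists work. The custom field names
# below are placeholders.
import pymongo
from jobflow_remote.jobs.jobcontroller import JobController

jc = JobController.from_project_name()
jc.build_indexes(
    background=True,
    job_custom_indexes=[
        "metadata.material_id",
        [("worker", pymongo.ASCENDING), ("state", pymongo.ASCENDING)],
    ],
    drop=True,
)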
[docs] def create_indexes( self, indexes: list[str | list], collection: DbCollection = DbCollection.JOBS, unique: bool = False, background: bool = True, ) -> None: """ Build the selected indexes. Parameters ---------- indexes List of indexes to be added to the collection. Each element is passed to pymongo's create_index, thus following those conventions. collection The collection where the index will be created. unique If True, the indexes will be created with the unique constraint. background If True, the indexes should be created in the background. """ coll = self.get_collection(collection) for idx in indexes: coll.create_index(idx, background=background, unique=unique)
[docs] def get_collection(self, collection: DbCollection): """ Return the internal collection corresponding to the selected DbCollection. Parameters ---------- collection The collection selected. Returns ------- The internal instance of the MongoDB collection. """ return { DbCollection.JOBS: self.jobs, DbCollection.FLOWS: self.flows, DbCollection.AUX: self.auxiliary, }[collection]
[docs] def compact(self) -> None: """Compact jobs and flows collections in MongoDB.""" self.db.command({"compact": self.jobs_collection}) self.db.command({"compact": self.flows_collection})
[docs] def get_flow_info_by_flow_uuid( self, flow_uuid: str, projection: list | dict | None = None ): return self.flows.find_one({"uuid": flow_uuid}, projection=projection)
[docs] def get_flow_info_by_job_uuid( self, job_uuid: str, projection: list | dict | None = None ): return self.flows.find_one({"jobs": job_uuid}, projection=projection)
[docs] def get_job_info_by_job_uuid( self, job_uuid: str, job_index: int | str = "last", projection: list | dict | None = None, ): query: dict[str, Any] = {"uuid": job_uuid} sort = None if isinstance(job_index, int): query["index"] = job_index elif job_index == "last": sort = {"index": -1} else: raise ValueError(f"job_index value: {job_index} is not supported") return self.jobs.find_one(query, projection=projection, sort=sort)
[docs] def get_job_info_by_pid(self, pid: int | str) -> JobInfo | None: """ Retrieve the information of a Job based on its process ID in the queue system (e.g. the Slurm job ID). Parameters ---------- pid The process ID of the Job in the queue system. Returns ------- JobInfo | None The JobInfo of the matching Job, or None if no Job is found. """ query = {"remote.process_id": str(pid)} jobs_info = self.get_jobs_info_query(query=query, limit=1) if jobs_info: return jobs_info[0] return None
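# Example lookup of the Job associated with a queue-system process id (e.g. a
# Slurm job id). Hedged sketch; the pid value is a placeholder.
from jobflow_remote.jobs.jobcontroller import JobController

jc = JobController.from_project_name()
job_info = jc.get_job_info_by_pid(1234567)
if job_info is not None:
    print(job_info.db_id, job_info.state)
else:
    print("No Job associated with that process id")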
[docs] def get_job_doc( self, job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, ) -> JobDoc | None: query, sort = self.generate_job_id_query(db_id, job_id, job_index) data = list(self.jobs.find(query, sort=sort, limit=1)) if not data: return None return JobDoc.model_validate(data[0])
[docs] def get_jobs(self, query, projection: list | dict | None = None): return list(self.jobs.find(query, projection=projection))
[docs] def count_jobs( self, query: dict | None = None, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, locked: bool = False, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, ) -> int: """ Count Jobs based on filters. Parameters ---------- query A generic query. Will override all the other parameters. job_ids One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve. db_ids One or more db_ids of the Jobs to retrieve. flow_ids One or more Flow uuids to which the Jobs to retrieve belong. states One or more states of the Jobs. locked If True only locked Jobs will be selected. start_date Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) metadata A dictionary of the values of the metadata to match. Should be an exact match for all the values provided. workers One or more worker names. Returns ------- int Number of Jobs matching the criteria. """ if query is None: query = self._build_query_job( job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, locked=locked, start_date=start_date, end_date=end_date, name=name, metadata=metadata, workers=workers, ) return self.jobs.count_documents(query)
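# Sketch: count Jobs per state on a given worker ("my_hpc_worker" is a placeholder).
from jobflow_remote.jobs.jobcontroller import JobController
from jobflow_remote.jobs.state import JobState

jc = JobController.from_project_name()
for state in (JobState.READY, JobState.RUNNING, JobState.FAILED):
    n_jobs = jc.count_jobs(states=state, workers="my_hpc_worker")
    print(f"{state.name}: {n_jobs}")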
[docs] def count_flows( self, query: dict | None = None, job_ids: str | list[str] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: FlowState | list[FlowState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, ) -> int: """ Count flows based on filter parameters. Parameters ---------- query A generic query. Will override all the other parameters. job_ids One or more strings with uuids of Jobs belonging to the Flow. db_ids One or more db_ids of Jobs belonging to the Flow. flow_ids One or more Flow uuids. states One or more states of the Flows. start_date Filter Flows that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Flows that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Flow. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) Returns ------- int Number of Flows matching the criteria. """ if not query: query = self._build_query_flow( job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, start_date=start_date, end_date=end_date, name=name, ) return self.flows.count_documents(query)
[docs] def get_jobs_info_by_flow_uuid( self, flow_uuid, projection: list | dict | None = None ): query = {"job.hosts": flow_uuid} return list(self.jobs.find(query, projection=projection))
[docs] def add_flow( self, flow: jobflow.Flow | jobflow.Job | list[jobflow.Job], worker: str, allow_external_references: bool = False, exec_config: ExecutionConfig | None = None, resources: dict | QResources | None = None, ) -> list[str]: from jobflow.core.flow import get_flow flow = get_flow(flow, allow_external_references=allow_external_references) jobs_list = list(flow.iterflow()) job_dicts = [] n_jobs = len(jobs_list) doc_next_id = self.auxiliary.find_one_and_update( {"next_id": {"$exists": True}}, {"$inc": {"next_id": n_jobs}} ) if doc_next_id is None: raise ValueError( "It seems that the database has not been initialised. If that is the" " case run `jf admin reset` or use the reset() method of JobController" ) first_id = doc_next_id["next_id"] db_ids = [] for (job, parents), db_id_int in zip( jobs_list, range(first_id, first_id + n_jobs) ): prefix = self.project.queue.db_id_prefix or "" db_id = f"{prefix}{db_id_int}" db_ids.append(db_id) job_dicts.append( get_initial_job_doc_dict( job, parents, db_id, worker=worker, exec_config=exec_config, resources=resources, ) ) flow_doc = get_initial_flow_doc_dict(flow, job_dicts) # inserting first the flow document and, iteratively, all the jobs # should not lead to inconsistencies in the states, even if one of # the jobs is checked out in the meanwhile. The opposite could lead # to errors. try: self.flows.insert_one(flow_doc) self.jobs.insert_many(job_dicts) except pymongo.errors.DuplicateKeyError as exc: # note that if the same Flow is inserted twice, the error will happen # during the insert_one(flow_doc), so no inconsistency will be left in # the DB. Since jobflow prevents the same job to belong to two different # Flows, the only apparently reasonable case where the flow uuid is # different but a job has a duplicated uuid is a random clash between # two generated uuid, which should have a negligible probability of # occurring. If a problem shows up a solution might be enclosing the # previous two operations in a transaction. raise ValueError( "A duplicate key error happened while inserting the flow. Check " "that you are not trying to submit the same flow multiple times." ) from exc logger.info(f"Added flow ({flow.uuid}) with jobs: {flow.job_uuids}") return db_ids
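# Minimal sketch of inserting a new Flow directly with add_flow (the higher-level
# submission utilities are normally used instead). Assumes a worker named
# "my_hpc_worker" is defined in the project; the Flow itself is a toy example.
from jobflow import Flow, job

from jobflow_remote.jobs.jobcontroller import JobController

@job
def add(a, b):
    return a + b

flow = Flow([add(1, 2)], name="toy_flow")
jc = JobController.from_project_name()
db_ids = jc.add_flow(flow, worker="my_hpc_worker")
print(f"Inserted Jobs with db_ids: {db_ids}")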
def _append_flow( self, job_doc: dict, flow_dict: dict, new_flow_dict: dict, worker: str, response_type: DynamicResponseType, exec_config: ExecutionConfig | None = None, resources: QResources | None = None, ) -> None: from jobflow import Flow, Job decoder = MontyDecoder() def deserialize_partial_flow(in_dict: dict): """ Recursively deserialize a Flow dictionary, avoiding the deserialization of all the elements that may require external packages. """ if in_dict.get("@class") == "Flow": jobs = [deserialize_partial_flow(d) for d in in_dict.get("jobs")] flow_init = { k: v for k, v in in_dict.items() if k not in ("@module", "@class", "@version", "job") } flow_init["jobs"] = jobs return Flow(**flow_init) # if it is not a Flow, should be a Job job_init = { k: v for k, v in in_dict.items() if k not in ("@module", "@class", "@version") } job_init["config"] = decoder.process_decoded(job_init["config"]) return Job(**job_init) # It is sure that the new_flow_dict is a serialized Flow (and not Job # or list[Job]), because the get_flow has already been applied at run # time, during the remote execution. # Recursive deserialize the Flow without deserializing function and # arguments to take advantage of standard Flow/Job methods. new_flow = deserialize_partial_flow(new_flow_dict) # get job parents. Job parents are identified only by their uuid. if response_type == DynamicResponseType.REPLACE: job_parents = job_doc["parents"] else: job_parents = [job_doc["uuid"]] # add new jobs to flow flow_dict = dict(flow_dict) flow_updates: dict[str, dict[str, Any]] = { "$addToSet": {"jobs": {"$each": new_flow.job_uuids}}, "$push": {}, } # add new jobs jobs_list = list(new_flow.iterflow()) n_new_jobs = len(jobs_list) first_id = self.auxiliary.find_one_and_update( {"next_id": {"$exists": True}}, {"$inc": {"next_id": n_new_jobs}} )["next_id"] job_dicts = [] flow_updates["$set"] = {} ids_to_push = [] for (job, parents), db_id_int in zip( jobs_list, range(first_id, first_id + n_new_jobs) ): prefix = self.project.queue.db_id_prefix or "" db_id = f"{prefix}{db_id_int}" # inherit the parents of the job to which we are appending parents = parents if parents else job_parents # noqa: PLW2901 job_dicts.append( get_initial_job_doc_dict( job, parents, db_id, worker=worker, exec_config=exec_config, resources=resources, ) ) flow_updates["$set"][f"parents.{job.uuid}.{job.index}"] = parents ids_to_push.append((job_dicts[-1]["db_id"], job.uuid, job.index)) flow_updates["$push"]["ids"] = {"$each": ids_to_push} if response_type == DynamicResponseType.DETOUR: # if detour, update the parents of the child jobs leaf_uuids = [v for v, d in new_flow.graph.out_degree() if d == 0] self.jobs.update_many( {"parents": job_doc["uuid"]}, {"$push": {"parents": {"$each": leaf_uuids}}}, ) # flow_dict["updated_on"] = datetime.utcnow() flow_updates["$set"]["updated_on"] = datetime.utcnow() # TODO, this could be replaced by the actual change, instead of the replace self.flows.update_one({"uuid": flow_dict["uuid"]}, flow_updates) self.jobs.insert_many(job_dicts) logger.info(f"Appended flow ({new_flow.uuid}) with jobs: {new_flow.job_uuids}")
[docs] def checkout_job( self, query=None, flow_uuid: str = None, sort: list[tuple[str, int]] | None = None, ) -> tuple[str, int] | None: """ Check out one job. Set the job state from READY to CHECKED_OUT with an atomic update. Flow state is also updated if needed. NB: flow is not locked during the checkout at any time. Does not require lock of the Job document. """ # comment on locking: lock during check out may serve two purposes: # 1) update the state of the Flow object. With the conditional set # this should be fine even without locking # 2) to prevent checking out jobs while other processes may be working # on the same flow. (e.g. while rerunning a parent of a READY child, # it would be necessary that the job is not started in the meanwhile). # Without a full Flow lock this case may show up. # For the time being do not lock the flow and check if issues are arising. query = {} if query is None else dict(query) query.update({"state": JobState.READY.value}) if flow_uuid is not None: # if flow uuid provided, only include job ids in that flow flow_out = self.get_flow_info_by_flow_uuid(flow_uuid, ["jobs"]) if not flow_out: return None job_uuids = flow_out["jobs"] query["uuid"] = {"$in": job_uuids} if sort is None: sort = [("priority", pymongo.DESCENDING), ("created_on", pymongo.ASCENDING)] result = self.jobs.find_one_and_update( query, { "$set": { "state": JobState.CHECKED_OUT.value, "updated_on": datetime.utcnow(), } }, projection=["uuid", "index"], sort=sort, # return_document=ReturnDocument.AFTER, ) if not result: return None reserved_uuid = result["uuid"] reserved_index = result["index"] # update flow state. If it is READY switch its state, otherwise no change # to the state. The operation is atomic. # Filtering on the index is not needed state_cond = { "$cond": { "if": {"$eq": ["$state", "READY"]}, "then": "RUNNING", "else": "$state", } } updated_cond = { "$cond": { "if": {"$eq": ["$state", "READY"]}, "then": datetime.utcnow(), "else": "$updated_on", } } self.flows.find_one_and_update( {"jobs": reserved_uuid}, [{"$set": {"state": state_cond, "updated_on": updated_cond}}], ) return reserved_uuid, reserved_index
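# Sketch of the checkout step as performed by the Runner: atomically switch one
# READY Job to CHECKED_OUT and retrieve its (uuid, index) pair.
from jobflow_remote.jobs.jobcontroller import JobController

jc = JobController.from_project_name()
checked_out = jc.checkout_job()
if checked_out is None:
    print("No READY Job available")
else:
    job_uuid, job_index = checked_out
    print(f"Checked out Job {job_uuid} (index {job_index})")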
# TODO if jobstore is not an option anymore, the "store" argument # can be removed and just use self.jobstore.
[docs] def complete_job( self, job_doc: dict, local_path: Path | str, store: JobStore ) -> bool: # Don't sleep if the flow is locked. Only the Runner should call this, # and it will handle the fact of having a locked Flow. # Lock before reading the data. locks the Flow for a longer time, but # avoids parsing (potentially large) files to discover that the flow is # already locked. with self.lock_flow( filter={"jobs": job_doc["uuid"]}, get_locked_doc=True ) as flow_lock: if flow_lock.locked_document: local_path = Path(local_path) out_path = local_path / OUT_FILENAME host_flow_id = job_doc["job"]["hosts"][-1] if not out_path.exists(): msg = ( f"The output file {OUT_FILENAME} was not present in the download " f"folder {local_path} and it is required to complete the job" ) self.checkin_job( job_doc, flow_lock.locked_document, response=None, error=msg ) self.update_flow_state(host_flow_id) return True # do not deserialize the response or stored data, saves time and # avoids the need for packages to be installed. out = loadfn(out_path, cls=None) decoder = MontyDecoder() doc_update = {"start_time": decoder.process_decoded(out["start_time"])} # update the time of the JobDoc, will be used in the checking end_time = decoder.process_decoded(out.get("end_time")) if end_time: doc_update["end_time"] = end_time error = out.get("error") if error: self.checkin_job( job_doc, flow_lock.locked_document, response=None, error=error, doc_update=doc_update, ) self.update_flow_state(host_flow_id) return True response = out.get("response") if not response: msg = ( f"The output file {OUT_FILENAME} was downloaded, but it does " "not contain the response. The job was likely killed " "before completing" ) self.checkin_job( job_doc, flow_lock.locked_document, response=None, error=msg, doc_update=doc_update, ) self.update_flow_state(host_flow_id) return True remote_store = get_remote_store( store, local_path, self.project.remote_jobstore ) update_store(store, remote_store, job_doc["db_id"]) self.checkin_job( job_doc, flow_lock.locked_document, response=response, doc_update=doc_update, ) self.update_flow_state(host_flow_id) return True if flow_lock.unavailable_document: # raising the error if the lock could not be acquired leaves # the caller handle the issue. In general, it should be the # runner, that will retry at a later time. raise FlowLockedError.from_flow_doc( flow_lock.unavailable_document, "Could not complete the job" ) return False
[docs] def checkin_job( self, job_doc: dict, flow_dict: dict, response: dict | None, error: str | None = None, doc_update: dict | None = None, ): stored_data = None if response is None: new_state = JobState.FAILED.value # handle response else: new_state = JobState.COMPLETED.value if response["replace"] is not None: self._append_flow( job_doc, flow_dict, response["replace"], response_type=DynamicResponseType.REPLACE, worker=job_doc["worker"], exec_config=job_doc["exec_config"], resources=job_doc["resources"], ) if response["addition"] is not None: self._append_flow( job_doc, flow_dict, response["addition"], response_type=DynamicResponseType.ADDITION, worker=job_doc["worker"], exec_config=job_doc["exec_config"], resources=job_doc["resources"], ) if response["detour"] is not None: self._append_flow( job_doc, flow_dict, response["detour"], response_type=DynamicResponseType.DETOUR, worker=job_doc["worker"], exec_config=job_doc["exec_config"], resources=job_doc["resources"], ) if response["stored_data"] is not None: stored_data = response["stored_data"] if response["stop_children"]: self.stop_children(job_doc["uuid"]) if response["stop_jobflow"]: self.stop_jobflow(job_uuid=job_doc["uuid"]) if not doc_update: doc_update = {} doc_update.update( {"state": new_state, "stored_data": stored_data, "error": error} ) result = self.jobs.update_one( {"uuid": job_doc["uuid"], "index": job_doc["index"]}, {"$set": doc_update} ) if result.modified_count == 0: raise RuntimeError( f"The job {job_doc['uuid']} index {job_doc['index']} has not been updated in the database" ) # TODO it should be fine to replace this query by constructing the list of # job uuids from the original + those added. Should be verified. job_uuids = self.get_flow_info_by_job_uuid(job_doc["uuid"], ["jobs"])["jobs"] return len(self.refresh_children(job_uuids)) + 1
# TODO should this refresh all the kind of states? Or just set to ready?
[docs] def refresh_children(self, job_uuids: list[str]) -> list[str]: """ Set the state of Jobs children to READY following the completion of a Job. Parameters ---------- job_uuids List of Jobs uuids belonging to a Flow. Returns ------- List of db_ids of modified Jobs. """ # go through and look for jobs whose state we can update to ready. # Need to ensure that all parent uuids with all indices are completed # first find state of all jobs; ensure larger indices are returned last. flow_jobs = self.jobs.find( {"uuid": {"$in": job_uuids}}, sort=[("index", 1)], projection=["uuid", "index", "parents", "state", "job.config", "db_id"], ) # the mapping only contains jobs with the larger index jobs_mapping = {j["uuid"]: j for j in flow_jobs} # Now find jobs that are queued and whose parents are all completed # (or allowed to fail) and ready them. Assume that none of the children # can be in a running state and thus no need to lock them. to_ready = [] for job in jobs_mapping.values(): allowed_states = [JobState.COMPLETED.value] on_missing_ref = ( job.get("job", {}).get("config", {}).get("on_missing_references", None) ) if on_missing_ref == jobflow.OnMissing.NONE.value: allowed_states.extend( (JobState.FAILED.value, JobState.USER_STOPPED.value) ) if job["state"] == JobState.WAITING.value and all( jobs_mapping[p]["state"] in allowed_states for p in job["parents"] ): # Use the db_id to identify the children, since the uuid alone is not # enough in some cases. to_ready.append(job["db_id"]) # Here it is assuming that there will be only one job with each uuid, as # it should be when switching state to READY the first time. # The code forbids rerunning a job that have children with index larger than 1, # so this should always be consistent. if len(to_ready) > 0: self.jobs.update_many( {"db_id": {"$in": to_ready}}, {"$set": {"state": JobState.READY.value}} ) return to_ready
[docs] def stop_children(self, job_uuid: str) -> int: """ Stop the direct children of a Job in the WAITING state. Parameters ---------- job_uuid The uuid of the Job. Returns ------- The number of modified Jobs. """ result = self.jobs.update_many( {"parents": job_uuid, "state": JobState.WAITING.value}, {"$set": {"state": JobState.STOPPED.value}}, ) return result.modified_count
[docs] def stop_jobflow(self, job_uuid: str = None, flow_uuid: str = None) -> int: """ Stop all the WAITING Jobs in a Flow. Parameters ---------- job_uuid The uuid of Job to identify the Flow. Incompatible with flow_uuid. flow_uuid The Flow uuid. Incompatible with job_uuid. Returns ------- The number of modified Jobs. """ if job_uuid is None and flow_uuid is None: raise ValueError("Either job_uuid or flow_uuid must be set.") if job_uuid is not None and flow_uuid is not None: raise ValueError("Only one of job_uuid and flow_uuid should be set.") criteria = {"uuid": flow_uuid} if job_uuid is None else {"jobs": job_uuid} # get uuids of jobs in the flow flow_dict = self.flows.find_one(criteria, projection=["jobs"]) if not flow_dict: return 0 job_uuids = flow_dict["jobs"] result = self.jobs.update_many( {"uuid": {"$in": job_uuids}, "state": JobState.WAITING.value}, {"$set": {"state": JobState.STOPPED.value}}, ) return result.modified_count
[docs] def get_job_uuids(self, flow_uuids: list[str]) -> list[str]: """ Get the list of Jobs belonging to Flows, based on their uuid. Parameters ---------- flow_uuids A list of Flow uuids. Returns ------- A list of uuids of the Jobs belonging to the selected Flows. """ job_uuids = [] for flow in self.flows.find( {"uuid": {"$in": flow_uuids}}, projection=["jobs"] ): job_uuids.extend(flow["jobs"]) return job_uuids
[docs] def get_flow_jobs_data( self, query: dict | None = None, projection: dict | None = None, sort: dict | None = None, limit: int = 0, ) -> list[dict]: """ Get the data of Flows and their Jobs from the DB using an aggregation. In the aggregation the Jobs are identified as "jobs". Parameters ---------- query The query to filter the Flow. projection The projection for the Flow and Job data. sort Sorting passed to the aggregation. limit The maximum number of results returned. Returns ------- A list of dictionaries with the result of the query. """ pipeline: list[dict] = [ { "$lookup": { "from": self.jobs_collection, "localField": "jobs", "foreignField": "uuid", "as": "jobs", } } ] if query: pipeline.append({"$match": query}) if projection: pipeline.append({"$project": projection}) if sort: pipeline.append({"$sort": dict(sort)}) if limit: pipeline.append({"$limit": limit}) return list(self.flows.aggregate(pipeline))
[docs] def update_flow_state( self, flow_uuid: str, updated_states: dict[str, dict[int, JobState | None]] | None = None, ) -> None: """ Update the state of a Flow in the DB based on the Job's states. The Flow should be locked while performing this operation. Parameters ---------- flow_uuid The uuid of the Flow to update. updated_states A dictionary with the updated states of Jobs that have not been stored in the DB yet. In the form {job_uuid: JobState value}. If the value is None the Job is considered deleted and the state of that Job will be ignored while determining the state of the whole Flow. """ updated_states = updated_states or {} projection = ["uuid", "index", "parents", "state"] flow_jobs = self.get_jobs_info_by_flow_uuid( flow_uuid=flow_uuid, projection=projection ) # update the full list of states and those of the leafs according # to the updated_states passed. # Ignore the Jobs for which the updated_states value is None. jobs_states = [ updated_states.get(j["uuid"], {}).get(j["index"], JobState(j["state"])) for j in flow_jobs if updated_states.get(j["uuid"], {}).get(j["index"], JobState(j["state"])) is not None ] leafs = get_flow_leafs(flow_jobs) leaf_states = [ updated_states.get(j["uuid"], {}).get(j["index"], JobState(j["state"])) for j in leafs if updated_states.get(j["uuid"], {}).get(j["index"], JobState(j["state"])) is not None ] flow_state = FlowState.from_jobs_states( jobs_states=jobs_states, leaf_states=leaf_states ) # update flow state. If it is changed update the updated_on updated_cond = { "$cond": { "if": {"$eq": ["$state", flow_state.value]}, "then": "$updated_on", "else": datetime.utcnow(), } } self.flows.find_one_and_update( {"uuid": flow_uuid}, [{"$set": {"state": flow_state.value, "updated_on": updated_cond}}], )
[docs] @contextlib.contextmanager def lock_job(self, **lock_kwargs) -> Generator[MongoLock, None, None]: """ Lock a Job document. See MongoLock context manager for more details about the locking options. Parameters ---------- lock_kwargs Kwargs passed to the MongoLock context manager. Returns ------- MongoLock An instance of MongoLock. """ with MongoLock(collection=self.jobs, **lock_kwargs) as lock: yield lock
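# Hedged sketch of using the lock_job context manager directly: lock a Job
# document and defer an update to the moment the lock is released. The filter
# and the target state are placeholders; the dedicated JobController methods
# should normally be preferred over manual locking.
from jobflow_remote.jobs.jobcontroller import JobController
from jobflow_remote.jobs.state import JobState

jc = JobController.from_project_name()
with jc.lock_job(filter={"db_id": "123"}, get_locked_doc=True) as lock:
    if lock.locked_document:
        lock.update_on_release = {"$set": {"state": JobState.USER_STOPPED.value}}
    elif lock.unavailable_document:
        print("The Job document is locked by another process")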
[docs] @contextlib.contextmanager def lock_flow(self, **lock_kwargs) -> Generator[MongoLock, None, None]: """ Lock a Flow document. See MongoLock context manager for more details about the locking options. Parameters ---------- lock_kwargs Kwargs passed to the MongoLock context manager. Returns ------- MongoLock An instance of MongoLock. """ with MongoLock(collection=self.flows, **lock_kwargs) as lock: yield lock
[docs] @contextlib.contextmanager def lock_job_for_update( self, query, max_step_attempts, delta_retry, **kwargs, ) -> Generator[MongoLock, None, None]: """ Lock a Job document for state update by the Runner. See MongoLock context manager for more details about the locking options. Parameters ---------- query The query used to select the Job document to lock. max_step_attempts The maximum number of attempts for a single step after which the Job should be set to the REMOTE_ERROR state. delta_retry List of increasing delay between subsequent attempts when the advancement of a remote step fails. Used to set the retry time. kwargs Kwargs passed to the MongoLock context manager. Returns ------- MongoLock An instance of MongoLock. """ db_filter = dict(query) db_filter["remote.retry_time_limit"] = {"$not": {"$gt": datetime.utcnow()}} if "sort" not in kwargs: kwargs["sort"] = [ ("priority", pymongo.DESCENDING), ("created_on", pymongo.ASCENDING), ] with self.lock_job( filter=db_filter, **kwargs, ) as lock: doc = lock.locked_document no_retry = False error = None try: yield lock except ConfigError: error = traceback.format_exc() warnings.warn(error, stacklevel=2) no_retry = True except RemoteError as e: error = f"Remote error: {e.msg}" no_retry = e.no_retry except Exception: error = traceback.format_exc() warnings.warn(error, stacklevel=2) set_output = lock.update_on_release if lock.locked_document: if not error: succeeded_update = { "$set": { "remote.step_attempts": 0, "remote.retry_time_limit": None, "remote.error": None, } } update_on_release = deep_merge_dict( succeeded_update, set_output or {} ) else: step_attempts = doc["remote"]["step_attempts"] no_retry = no_retry or step_attempts >= max_step_attempts if no_retry: update_on_release = { "$set": { "state": JobState.REMOTE_ERROR.value, "previous_state": doc["state"], "remote.error": error, } } else: step_attempts += 1 ind = min(step_attempts, len(delta_retry)) - 1 delta = delta_retry[ind] retry_time_limit = datetime.utcnow() + timedelta(seconds=delta) update_on_release = { "$set": { "remote.step_attempts": step_attempts, "remote.retry_time_limit": retry_time_limit, "remote.error": error, } } if "$set" in update_on_release: update_on_release["$set"]["updated_on"] = datetime.utcnow() lock.update_on_release = update_on_release
[docs] @contextlib.contextmanager def lock_job_flow( self, job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, acceptable_states: list[JobState] | None = None, job_lock_kwargs: dict | None = None, flow_lock_kwargs: dict | None = None, ) -> Generator[tuple[MongoLock, MongoLock], None, None]: """ Lock one Job document and the Flow document the Job belongs to. See MongoLock context manager for more details about the locking options. Parameters ---------- job_id The uuid of the Job to lock. db_id The db_id of the Job to lock. job_index The index of the Job to lock. wait The amount of seconds to wait for a lock to be released. break_lock True if the context manager is allowed to forcibly break a lock. acceptable_states A list of JobStates. If not among these a ValueError exception is raised. job_lock_kwargs Kwargs passed to MongoLock for the Job lock. flow_lock_kwargs Kwargs passed to MongoLock for the Flow lock. Returns ------- MongoLock, MongoLock An instance of MongoLock. """ lock_filter, sort = self.generate_job_id_query(db_id, job_id, job_index) sleep = None if wait: sleep = 10 job_lock_kwargs = job_lock_kwargs or {} flow_lock_kwargs = flow_lock_kwargs or {} with self.lock_job( filter=lock_filter, break_lock=break_lock, sort=sort, sleep=sleep, max_wait=wait, get_locked_doc=True, **job_lock_kwargs, ) as job_lock: job_doc_dict = job_lock.locked_document if not job_doc_dict: if job_lock.unavailable_document: raise JobLockedError.from_job_doc(job_lock.unavailable_document) raise ValueError(f"No Job document matching criteria {lock_filter}") job_state = JobState(job_doc_dict["state"]) if acceptable_states and job_state not in acceptable_states: raise ValueError( f"Job in state {job_doc_dict['state']}. The action cannot be performed" ) flow_filter = {"jobs": job_doc_dict["uuid"]} with self.lock_flow( filter=flow_filter, sleep=sleep, max_wait=wait, get_locked_doc=True, break_lock=break_lock, **flow_lock_kwargs, ) as flow_lock: if not flow_lock.locked_document: if flow_lock.unavailable_document: raise FlowLockedError.from_flow_doc( flow_lock.unavailable_document ) raise ValueError( f"No Flow document matching criteria {flow_filter}" ) yield job_lock, flow_lock
[docs] def ping_flow_doc(self, uuid: str) -> None: """ Ping a Flow document to update its "updated_on" value. Parameters ---------- uuid The uuid of the Flow to update. """ self.flows.find_one_and_update( {"nodes": uuid}, {"$set": {"updated_on": datetime.utcnow()}} )
def _cancel_queue_process(self, job_doc: dict) -> None: """ Cancel the process in the remote queue. Parameters ---------- job_doc The dict of the JobDoc with the Job to be cancelled. """ queue_process_id = job_doc["remote"]["process_id"] if not queue_process_id: raise ValueError("The process id is not defined in the job document") worker = self.project.workers[job_doc["worker"]] host = worker.get_host() try: host.connect() queue_manager = QueueManager(worker.get_scheduler_io(), host) cancel_result = queue_manager.cancel(queue_process_id) if cancel_result.status != CancelStatus.SUCCESSFUL: raise RuntimeError( f"Cancelling queue process {queue_process_id} failed. stdout: {cancel_result.stdout}. stderr: {cancel_result.stderr}" ) finally: try: host.close() except Exception: logger.warning( f"The connection to host {host} could not be closed.", exc_info=True )
[docs] def get_batch_processes(self, worker: str) -> dict[str, str]: """ Get the batch processes associated with a given worker. Parameters ---------- worker The worker name. Returns ------- dict A dictionary with the {process_id: process_uuid} of the batch jobs running on the selected worker. """ result = self.auxiliary.find_one({"batch_processes": {"$exists": True}}) if result: return result["batch_processes"].get(worker, {}) return {}
[docs] def add_batch_process( self, process_id: str, process_uuid: str, worker: str ) -> dict: """ Add a batch process to the list of running processes. Two IDs are defined, one to keep track of the actual process number and one to be associated to the Jobs that are being executed. The need for two IDs originates from the fact that the former may not be known at runtime. Parameters ---------- process_id The ID of the processes obtained from the QueueManager. process_uuid A unique ID to identify the processes. worker The worker where the process is being executed. Returns ------- dict The updated document. """ return self.auxiliary.find_one_and_update( {"batch_processes": {"$exists": True}}, {"$push": {f"batch_processes.{worker}.{process_id}": process_uuid}}, upsert=True, )
[docs] def remove_batch_process(self, process_id: str, worker: str) -> dict: """ Remove a process from the list of running batch processes. Parameters ---------- process_id The ID of the processes obtained from the QueueManager. worker The worker where the process was being executed. Returns ------- dict The updated document. """ return self.auxiliary.find_one_and_update( {"batch_processes": {"$exists": True}}, {"$unset": {f"batch_processes.{worker}.{process_id}": ""}}, upsert=True, )
[docs] def delete_job( self, job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, delete_output: bool = False, delete_files: bool = False, wait: int | None = None, break_lock: bool = False, ) -> str: """ Delete a single job from the queue store and optionally from the job store. The Flow document will be updated accordingly but no consistency check is performed. The Flow may be left in an inconsistent state. For advanced users only. Parameters ---------- job_id The uuid of the job to delete. db_id The db_id of the job to delete. job_index The index of the job. If None, the job with the largest index will be selected. delete_output : bool, default False If True, also delete the job output from the JobStore. delete_files If True also delete the files on the worker. wait In case the Flow or Job is locked, wait this time (in seconds) for the lock to be released. break_lock Forcibly break the lock on locked documents. Returns ------- str The db_id of the deleted Job. """ job_lock_kwargs = dict(projection=["uuid", "index", "db_id", "state"]) # avoid deleting jobs in batch states. It would require additional # specific handling and it is an unlikely use case. with self.lock_job_flow( job_id=job_id, db_id=db_id, job_index=job_index, acceptable_states=DELETABLE_STATES, wait=wait, break_lock=break_lock, job_lock_kwargs=job_lock_kwargs, ) as (job_lock, flow_lock): job_doc = job_lock.locked_document if job_doc is None: raise RuntimeError("No job document found in lock") if flow_lock.locked_document is None: raise RuntimeError("No document found in flow lock") # Update FlowDoc flow_doc = FlowDoc.model_validate(flow_lock.locked_document) job_uuid, job_index = job_doc["uuid"], job_doc["index"] if len(flow_doc.ids) == 1: raise RuntimeError( "It is not possible to delete the only Job of the Flow. Delete the entire Flow." ) # Remove job from ids list flow_doc.ids = [ id_tuple for id_tuple in flow_doc.ids if id_tuple[1] != job_uuid or id_tuple[2] != job_index ] # Remove job from jobs list and as parent of other jobs if no job # with that id remains in the flow if not any(job_uuid == id_tuple[1] for id_tuple in flow_doc.ids): # Here a flow_doc.jobs.remove could be enough. But due to a previous # bug the list of jobs could contain the same uuid more than once. # Make sure to remove all the instances. 
flow_doc.jobs = [jid for jid in flow_doc.jobs if jid != job_uuid] for parent_dict in flow_doc.parents.values(): for index_list in parent_dict.values(): if job_uuid in index_list: index_list.remove(job_uuid) # Remove job from parents flow_doc.parents[job_uuid].pop(str(job_index), None) # if all the jobs with a given uuid have been removed, also remove # the entry from the parents if not flow_doc.parents[job_uuid]: flow_doc.parents.pop(job_uuid, None) # Update flow state if necessary updated_states = {job_uuid: {job_index: None}} # None indicates job removal self.update_flow_state( flow_uuid=flow_doc.uuid, updated_states=updated_states ) # Prepare flow update flow_update = { "$set": { "jobs": flow_doc.jobs, "ids": flow_doc.ids, "parents": flow_doc.parents, } } flow_update["$set"]["updated_on"] = datetime.utcnow() # Set flow update to be applied on lock release flow_lock.update_on_release = flow_update job_lock.delete_on_release = True # Optionally delete from jobstore if delete_output: try: self.jobstore.remove_docs({"uuid": job_uuid, "index": job_index}) except Exception: warnings.warn( f"Error while deleting the output of job {job_uuid} {job_index}", stacklevel=2, ) if delete_files: job_info = JobInfo.from_query_output(job_doc) self._safe_delete_files([job_info]) return job_doc["db_id"]
[docs] def delete_jobs( self, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None, delete_output: bool = False, delete_files: bool = False, max_limit: int = 10, ) -> list[str]: """ Delete selected jobs from the queue store and optionally from the job store. The Flow document will be updated accordingly but no consistency check is performed. The Flow may be left in an inconsistent state. For advanced users only. Parameters ---------- job_ids One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve. db_ids One or more db_ids of the Jobs to retrieve. flow_ids One or more Flow uuids to which the Jobs to retrieve belong. states One or more states of the Jobs. start_date Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC. end_date Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC. name Pattern matching the name of Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*) metadata A dictionary of the values of the metadata to match. Should be an exact match for all the values provided. workers One or more worker names. custom_query A generic query. Incompatible with all the other filtering options. raise_on_error If True raise in case of error on one job error and stop the loop. Otherwise, just log the error and proceed. wait In case the Flow or Jobs that need to be deleted are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released. delete_output : bool, default False If True, also delete the Job output from the JobStore. delete_files If True also delete the files on the worker. max_limit The Jobs will be deleted only if the total number is lower than the specified limit. 0 means no limit. Returns ------- list List of db_ids of the deleted Jobs. """ return self._many_jobs_action( method=self.delete_job, action_description="deleting", job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, states=states, start_date=start_date, end_date=end_date, name=name, metadata=metadata, workers=workers, custom_query=custom_query, raise_on_error=raise_on_error, wait=wait, delete_output=delete_output, delete_files=delete_files, max_limit=max_limit, )
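# Hedged example: bulk-delete the FAILED Jobs of a Flow, removing their outputs
# and remote files as well. The flow uuid is a placeholder and max_limit guards
# against deleting more Jobs than expected.
from jobflow_remote.jobs.jobcontroller import JobController
from jobflow_remote.jobs.state import JobState

jc = JobController.from_project_name()
deleted_ids = jc.delete_jobs(
    flow_ids="00000000-0000-0000-0000-000000000000",
    states=JobState.FAILED,
    delete_output=True,
    delete_files=True,
    max_limit=20,
)
print(f"Deleted {len(deleted_ids)} Jobs")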
[docs] def get_flow_leafs(job_docs: list[dict]) -> list[dict]: """ Get the leaf jobs from a list of serialized representations of JobDoc. Parameters ---------- job_docs The list of serialized JobDocs in the Flow. Returns ------- list The list of serialized JobDocs that are leaves of the Flow. """ # first sort the list, so that only the largest indexes are kept in the dictionary job_docs = sorted(job_docs, key=lambda j: j["index"]) d = {j["uuid"]: j for j in job_docs} for j in job_docs: if j["parents"]: for parent_id in j["parents"]: d.pop(parent_id, None) return list(d.values())
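# Toy illustration of get_flow_leafs: only the highest index of each uuid is kept
# and any Job that appears as a parent is discarded, leaving "b" as the only leaf.
from jobflow_remote.jobs.jobcontroller import get_flow_leafs

job_docs = [
    {"uuid": "a", "index": 1, "parents": []},
    {"uuid": "a", "index": 2, "parents": []},
    {"uuid": "b", "index": 1, "parents": ["a"]},
]
leafs = get_flow_leafs(job_docs)
assert [(d["uuid"], d["index"]) for d in leafs] == [("b", 1)]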