jobflow_remote.jobs.jobcontroller module#

class jobflow_remote.jobs.jobcontroller.JobController(queue_store: MongoStore, jobstore: JobStore, flows_collection: str = 'flows', auxiliary_collection: str = 'jf_auxiliary', project: Project | None = None)[source]#

Bases: object

Main entry point for all the interactions with the Stores.

Maintains a connection to both the queue Store and the results JobStore. The queue Store is required to be a MongoStore, since the JobController accesses the database directly and works with several collections.

The main functionalities are those for updating the state of the database and querying the Jobs and Flows status information.

Parameters:
  • queue_store – The Store used to save information about the status of the Jobs. Should be a MongoStore; other collections in the same database are also used.

  • jobstore – The JobStore containing the output of the jobflow Flows.

  • flows_collection – The name of the collection used to store the Flows data. Uses the DB defined in the queue_store.

  • auxiliary_collection – The name of the collection used to store other auxiliary data. Uses the DB defined in the queue_store.

  • project – The project where the Stores were defined.
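
A minimal usage sketch is shown below. It assumes a project named "my_project" has already been configured (the name is illustrative); the from_project_name classmethod documented further down is typically the most convenient entry point.

```python
from jobflow_remote.jobs.jobcontroller import JobController

# Build a JobController from a configured project ("my_project" is a placeholder).
# If no name is passed, the default project of the current configuration is used.
jc = JobController.from_project_name("my_project")

try:
    # Interact with the queue database, e.g. count all the Jobs currently tracked.
    print(jc.count_jobs())
finally:
    # Close the connections to the underlying Stores when done.
    jc.close()
```

The same jc instance is reused in the method-level sketches below.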

add_batch_process(process_id: str, process_uuid: str, worker: str) dict[source]#

Add a batch process to the list of running processes.

Two IDs are defined, one to keep track of the actual process number and one to be associated with the Jobs that are being executed. The need for two IDs originates from the fact that the former may not be known at runtime.

Parameters:
  • process_id – The ID of the process obtained from the QueueManager.

  • process_uuid – A unique ID to identify the process.

  • worker – The worker where the process is being executed.

Returns:

The updated document.

Return type:

dict

add_flow(flow: Flow | Job | list[Job], worker: str, allow_external_references: bool = False, exec_config: ExecutionConfig | None = None, resources: dict | QResources | None = None) list[str][source]#
build_indexes(background: bool = True, job_custom_indexes: list[str | list] | None = None, flow_custom_indexes: list[str | list] | None = None, drop: bool = False) None[source]#

Build indexes in the database.

Parameters:
  • background – If True, the indexes should be created in the background.

  • job_custom_indexes – List of custom indexes for the jobs collection. Each element is passed to pymongo’s create_index, thus following those conventions.

  • flow_custom_indexes – List of custom indexes for the flows collection. Same as job_custom_indexes.

  • drop – If True all existing indexes in the collections will be dropped.
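
As an example, the standard indexes can be rebuilt together with a custom index on a metadata key; the key name below is hypothetical, and each custom entry follows pymongo's create_index conventions, as stated above.

```python
# Rebuild the standard indexes and add a custom one on a hypothetical metadata
# key of the jobs collection. Each custom index entry is passed directly to
# pymongo's create_index.
jc.build_indexes(
    background=True,
    job_custom_indexes=["metadata.project_tag"],  # hypothetical metadata key
    drop=False,
)
```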

checkin_job(job_doc: dict, flow_dict: dict, response: dict | None, error: str | None = None, doc_update: dict | None = None)[source]#
checkout_job(query=None, flow_uuid: str = None, sort: list[tuple[str, int]] | None = None) tuple[str, int] | None[source]#

Check out one job.

Set the job state from READY to CHECKED_OUT with an atomic update. Flow state is also updated if needed.

NB: the Flow is never locked during the checkout. No lock on the Job document is required either.

close() None[source]#

Close the connections to all the Stores in JobController.

compact() None[source]#

Compact jobs and flows collections in MongoDB.

complete_job(job_doc: dict, local_path: Path | str, store: JobStore) bool[source]#
count_flows(query: dict | None = None, job_ids: str | list[str] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: FlowState | list[FlowState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None) int[source]#

Count flows based on filter parameters.

Parameters:
  • query – A generic query. Will override all the other parameters.

  • job_ids – One or more strings with uuids of Jobs belonging to the Flow.

  • db_ids – One or more db_ids of Jobs belonging to the Flow.

  • flow_ids – One or more Flow uuids.

  • states – One or more states of the Flows.

  • start_date – Filter Flows that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Flows that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Flow. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

Returns:

Number of Flows matching the criteria.

Return type:

int

count_jobs(query: dict | None = None, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, locked: bool = False, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None) int[source]#

Count Jobs based on filters.

Parameters:
  • query – A generic query. Will override all the other parameters.

  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • locked – If True only locked Jobs will be selected.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

Returns:

Number of Jobs matching the criteria.

Return type:

int
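
A sketch of counting Jobs with a few of these filters; the worker name is a placeholder and JobState is assumed to be importable from jobflow_remote.jobs.state.

```python
from datetime import datetime, timedelta

from jobflow_remote.jobs.state import JobState  # assumed import path

# Count the FAILED Jobs updated in the last day on a given worker
# ("local_worker" is a placeholder name).
n_failed = jc.count_jobs(
    states=JobState.FAILED,
    workers="local_worker",
    start_date=datetime.now() - timedelta(days=1),  # local time, converted to UTC internally
)
print(f"Failed jobs in the last 24h: {n_failed}")
```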

create_indexes(indexes: list[str | list], collection: DbCollection = DbCollection.JOBS, unique: bool = False, background: bool = True) None[source]#

Build the selected indexes.

Parameters:
  • indexes – List of indexes to be added to the collection. Each element is passed to pymongo’s create_index, thus following those conventions.

  • collection – The collection where the index will be created.

  • unique – If True, the indexes will be created as unique (passed to pymongo's create_index).

  • background – If True, the indexes should be created in the background.

delete_flow(flow_id: str, delete_output: bool = False, delete_files: bool = False) bool[source]#

Delete a single Flow based on the uuid.

Parameters:
  • flow_id – The uuid of the Flow to delete.

  • delete_output – If True also delete the associated output in the JobStore.

  • delete_files – If True also delete the files on the worker.

Returns:

True if the flow has been deleted.

Return type:

bool

delete_flows(flow_ids: str | list[str] | None = None, max_limit: int = 10, delete_output: bool = False, delete_files: bool = False) int[source]#

Delete a list of Flows based on the flow uuids.

Parameters:
  • flow_ids – One or more Flow uuids.

  • max_limit – The Flows will be deleted only if the total number is lower than the specified limit. 0 means no limit.

  • delete_output – If True also delete the associated output in the JobStore.

  • delete_files – If True also delete the files on the worker.

Returns:

Number of deleted Flows.

Return type:

int
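
For instance, completed Flows matching a name pattern could be removed as sketched below; the pattern is illustrative and FlowInfo is assumed to expose the Flow uuid as a flow_id attribute.

```python
from jobflow_remote.jobs.state import FlowState  # assumed import path

# Collect the uuids of the COMPLETED Flows whose name matches a pattern.
completed = jc.get_flows_info(states=FlowState.COMPLETED, name="test*")
flow_uuids = [fi.flow_id for fi in completed]  # flow_id attribute is an assumption

# Delete them, refusing to proceed if more than 20 Flows would be removed.
n_deleted = jc.delete_flows(flow_ids=flow_uuids, max_limit=20, delete_output=False)
print(f"Deleted {n_deleted} flows")
```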

delete_job(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, delete_output: bool = False, delete_files: bool = False, wait: int | None = None, break_lock: bool = False) str[source]#

Delete a single job from the queue store and optionally from the job store. The Flow document will be updated accordingly but no consistency check is performed. The Flow may be left in an inconsistent state. For advanced users only.

Parameters:
  • job_id – The uuid of the job to delete.

  • db_id – The db_id of the job to delete.

  • job_index – The index of the job. If None, the job with the largest index will be selected.

  • delete_output (bool, default False) – If True, also delete the job output from the JobStore.

  • delete_files – If True also delete the files on the worker.

  • wait – In case the Flow or Job is locked, wait this time (in seconds) for the lock to be released.

  • break_lock – Forcibly break the lock on locked documents.

Returns:

The db_id of the deleted Job.

Return type:

str

delete_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None, delete_output: bool = False, delete_files: bool = False, max_limit: int = 10) list[str][source]#

Delete selected jobs from the queue store and optionally from the job store. The Flow document will be updated accordingly but no consistency check is performed. The Flow may be left in an inconsistent state. For advanced users only.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise an exception in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

  • wait – In case the Flow or Jobs that need to be deleted are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • delete_output (bool, default False) – If True, also delete the Job output from the JobStore.

  • delete_files – If True also delete the files on the worker.

  • max_limit – The Jobs will be deleted only if the total number is lower than the specified limit. 0 means no limit.

Returns:

List of db_ids of the deleted Jobs.

Return type:

list

classmethod from_project(project: Project) JobController[source]#

Generate an instance of JobController from a Project object.

Parameters:

project – The project used to generate the JobController. If None the default project will be used.

Returns:

An instance of JobController associated with the project.

Return type:

JobController

classmethod from_project_name(project_name: str | None = None) JobController[source]#

Generate an instance of JobController from the project name.

Parameters:

project_name – The name of the project. If None the default project will be used.

Returns:

An instance of JobController associated with the project.

Return type:

JobController

static generate_job_id_query(db_id: str | None = None, job_id: str | None = None, job_index: int | None = None) tuple[dict, list | None][source]#

Generate a query for a single Job based on db_id or uuid+index. Only one among db_id and job_id should be defined.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None, a sort will be added to the query so that the Job with the highest index is selected.

Returns:

A dict and an optional list to be used as query and sort, respectively, in a query for a single Job.

Return type:

dict, list
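
A short sketch of how the returned pair can be combined with the generic query helpers documented below; the uuid is a placeholder.

```python
# Build a query selecting a single Job by uuid; since no index is given,
# a sort is returned so that the highest index is picked.
query, sort = JobController.generate_job_id_query(job_id="abcd-1234-placeholder")

# The pair can then be fed to a generic query helper, with the kwargs
# forwarded to pymongo's find().
job_info_list = jc.get_jobs_info_query(query, sort=sort, limit=1)
```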

get_batch_processes(worker: str) dict[str, str][source]#

Get the batch processes associated with a given worker.

Parameters:

worker – The worker name.

Returns:

A dictionary with the {process_id: process_uuid} of the batch jobs running on the selected worker.

Return type:

dict

get_collection(collection: DbCollection)[source]#

Return the internal collection corresponding to the selected DbCollection.

Parameters:

collection – The collection selected.

Return type:

The internal instance of the MongoDB collection.

get_flow_info_by_flow_uuid(flow_uuid: str, projection: list | dict | None = None)[source]#
get_flow_info_by_job_uuid(job_uuid: str, projection: list | dict | None = None)[source]#
get_flow_job_aggreg(query: dict | None = None, projection_flow: dict | None = None, projection_job: dict | None = None, sort: list[tuple] | None = None, limit: int = 0) list[dict][source]#

Retrieve data about Flows and all their Jobs through an aggregation.

In the aggregation, the list of Jobs is identified as jobs_list.

Parameters:
  • query – A dictionary representing the filter.

  • projection_flow – Projection of the fields for the Flow document passed to the aggregation.

  • projection_job – Projection of the fields for the Job document used in the $lookup.

  • sort – A list of (key, direction) pairs specifying the sort order for this query. Follows pymongo conventions.

  • limit – Maximum number of entries to retrieve. 0 means no limit.

Returns:

The list of dictionaries resulting from the query.

Return type:

list

get_flow_jobs_data(query: dict | None = None, projection: dict | None = None, sort: dict | None = None, limit: int = 0) list[dict][source]#

Get the data of Flows and their Jobs from the DB using an aggregation.

In the aggregation the Jobs are identified as “jobs”.

Parameters:
  • query – The query to filter the Flow.

  • projection – The projection for the Flow and Job data.

  • sort – Sorting passed to the aggregation.

  • limit – The maximum number of results returned.

Return type:

A list of dictionaries with the result of the query.

get_flows_info(job_ids: str | list[str] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: FlowState | list[FlowState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, locked: bool = False, sort: list[tuple] | None = None, limit: int = 0, full: bool = False) list[FlowInfo][source]#

Query for Flows based on standard parameters and return a list of FlowInfo.

Parameters:
  • job_ids – One or more strings with uuids of Jobs belonging to the Flow.

  • db_ids – One or more db_ids of Jobs belonging to the Flow.

  • flow_ids – One or more Flow uuids.

  • states – One or more states of the Flow.

  • start_date – Filter Flows that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Flows that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Flow. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

  • locked – If True only locked Flows will be selected.

  • sort – A list of (key, direction) pairs specifying the sort order for this query. Follows pymongo conventions.

  • limit – Maximum number of entries to retrieve. 0 means no limit.

  • full – If True data is fetched from both the Flow collection and Job collection with an aggregate. Otherwise, only the Job information in the Flow document will be used.

Returns:

A list of FlowInfo.

Return type:

list
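
A sketch of a typical query, listing the most recently updated running Flows; FlowState is assumed to be importable from jobflow_remote.jobs.state and the printed attributes of FlowInfo are assumptions.

```python
from jobflow_remote.jobs.state import FlowState  # assumed import path

# Fetch the 10 most recently updated RUNNING Flows, sorted by "updated_on"
# in descending order (pymongo convention: -1 for descending).
running_flows = jc.get_flows_info(
    states=FlowState.RUNNING,
    sort=[("updated_on", -1)],
    limit=10,
)
for flow_info in running_flows:
    print(flow_info.name, flow_info.state)  # attribute names are assumptions
```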

get_job_doc(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None) JobDoc | None[source]#
get_job_info(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None) JobInfo | None[source]#

Get the JobInfo for a single Job based on db_id or uuid+index. Only one among db_id and job_id should be defined.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None the Job with the largest index will be selected.

Returns:

A JobInfo, or None if no Job matches the criteria.

Return type:

JobInfo

get_job_info_by_job_uuid(job_uuid: str, job_index: int | str = 'last', projection: list | dict | None = None)[source]#
get_job_info_by_pid(pid: int | str) JobInfo | None[source]#

Retrieve job information by process ID (e.g., Slurm job ID).

Parameters:

pid – The process ID of the Job in the queue system (e.g. the Slurm job id).

Returns:

Job information if found, None otherwise.

Return type:

JobInfo | None

get_job_uuids(flow_uuids: list[str]) list[str][source]#

Get the list of Jobs belonging to Flows, based on their uuid.

Parameters:

flow_uuids – A list of Flow uuids.

Return type:

A list of uuids of the Jobs belonging to the selected Flows.

get_jobs(query, projection: list | dict | None = None)[source]#
get_jobs_doc(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, locked: bool = False, sort: list[tuple] | None = None, limit: int = 0) list[JobDoc][source]#

Query for Jobs based on standard parameters and return a list of JobDoc.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • locked – If True only locked Jobs will be selected.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • sort – A list of (key, direction) pairs specifying the sort order for this query. Follows pymongo conventions.

  • limit – Maximum number of entries to retrieve. 0 means no limit.

Returns:

A list of JobDoc objects for the Jobs matching the criteria.

Return type:

list

get_jobs_doc_query(query: dict = None, **kwargs) list[JobDoc][source]#

Query for Jobs based on a generic filter and return a list of JobDoc.

Parameters:
  • query – A dictionary representing the filter.

  • kwargs – All arguments passed to pymongo’s Collection.find() method.

Returns:

A list of JobDoc objects for the Jobs matching the criteria.

Return type:

list

get_jobs_info(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, locked: bool = False, sort: list[tuple[str, int]] | None = None, limit: int = 0) list[JobInfo][source]#

Query for Jobs based on standard parameters and return a list of JobInfo.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • locked – If True only locked Jobs will be selected.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • sort – A list of (key, direction) pairs specifying the sort order for this query. Follows pymongo conventions.

  • limit – Maximum number of entries to retrieve. 0 means no limit.

Returns:

A list of JobInfo objects for the Jobs matching the criteria.

Return type:

list
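
For example, the READY Jobs assigned to a given worker could be listed as sketched below; the worker name is a placeholder and the JobInfo attributes used in the print are assumptions.

```python
from jobflow_remote.jobs.state import JobState  # assumed import path

# List the READY Jobs of a placeholder worker, most recently updated first.
ready_jobs = jc.get_jobs_info(
    states=JobState.READY,
    workers="local_worker",  # placeholder worker name
    sort=[("updated_on", -1)],
)
for job_info in ready_jobs:
    print(job_info.db_id, job_info.name)  # attribute names are assumptions
```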

get_jobs_info_by_flow_uuid(flow_uuid, projection: list | dict | None = None)[source]#
get_jobs_info_query(query: dict = None, **kwargs) list[JobInfo][source]#

Get a list of JobInfo based on a generic query.

Parameters:
  • query – The query to be performed.

  • kwargs – Arguments passed to pymongo's find() method.

Returns:

A list of JobInfo matching the criteria.

Return type:

list

lock_flow(**lock_kwargs) Generator[MongoLock, None, None][source]#

Lock a Flow document.

See MongoLock context manager for more details about the locking options.

Parameters:

lock_kwargs – Kwargs passed to the MongoLock context manager.

Returns:

An instance of MongoLock.

Return type:

MongoLock

lock_job(**lock_kwargs) Generator[MongoLock, None, None][source]#

Lock a Job document.

See MongoLock context manager for more details about the locking options.

Parameters:

lock_kwargs – Kwargs passed to the MongoLock context manager.

Returns:

An instance of MongoLock.

Return type:

MongoLock
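
A minimal sketch of the locking pattern. The exact keyword arguments accepted by MongoLock and the locked_document attribute used below are assumptions; refer to the MongoLock context manager for the actual options.

```python
# Lock a single Job document selected by its db_id (placeholder value) and
# inspect it while holding the lock. The "filter" kwarg and the
# "locked_document" attribute are assumptions about the MongoLock API.
with jc.lock_job(filter={"db_id": "1234"}) as lock:
    job_doc = lock.locked_document  # None if no document matched or it was already locked
    if job_doc is not None:
        print(job_doc["uuid"], job_doc["state"])
```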

lock_job_flow(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, acceptable_states: list[JobState] | None = None, job_lock_kwargs: dict | None = None, flow_lock_kwargs: dict | None = None) Generator[tuple[MongoLock, MongoLock], None, None][source]#

Lock one Job document and the Flow document the Job belongs to.

See MongoLock context manager for more details about the locking options.

Parameters:
  • job_id – The uuid of the Job to lock.

  • db_id – The db_id of the Job to lock.

  • job_index – The index of the Job to lock.

  • wait – The number of seconds to wait for a lock to be released.

  • break_lock – True if the context manager is allowed to forcibly break a lock.

  • acceptable_states – A list of acceptable JobStates for the Job. If the state of the Job is not among these, a ValueError is raised.

  • job_lock_kwargs – Kwargs passed to MongoLock for the Job lock.

  • flow_lock_kwargs – Kwargs passed to MongoLock for the Flow lock.

Returns:

The two MongoLock instances for the Job and the Flow, respectively.

Return type:

MongoLock, MongoLock

lock_job_for_update(query, max_step_attempts, delta_retry, **kwargs) Generator[MongoLock, None, None][source]#

Lock a Job document for state update by the Runner.

See MongoLock context manager for more details about the locking options.

Parameters:
  • query – The query used to select the Job document to lock.

  • max_step_attempts – The maximum number of attempts for a single step after which the Job should be set to the REMOTE_ERROR state.

  • delta_retry – List of increasing delays between subsequent attempts when the advancement of a remote step fails. Used to set the retry time.

  • kwargs – Kwargs passed to the MongoLock context manager.

Returns:

An instance of MongoLock.

Return type:

MongoLock

pause_job(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None) str[source]#

Pause a single Job. Only READY and WAITING Jobs can be paused. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined. The action is reversible.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None: the Job with the highest index.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

Returns:

The db_id of the updated Job.

Return type:

str

pause_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None) list[str][source]#

Pause selected Jobs. Only READY and WAITING Jobs can be paused. The action is reversible.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise an exception in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

Returns:

List of db_ids of the updated Jobs.

Return type:

list

ping_flow_doc(uuid: str) None[source]#

Ping a Flow document to update its “updated_on” value.

Parameters:

uuid – The uuid of the Flow to update.

play_job(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False) str[source]#

Restart a single Job that was previously paused. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None: the Job with the highest index.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

The db_id of the updated Job.

Return type:

str

play_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None, break_lock: bool = False) list[str][source]#

Restart selected Jobs that were previously paused.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise an exception in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

List of db_ids of the updated Jobs.

Return type:

list
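
The two operations are complementary. A sketch that pauses the READY Jobs of a Flow and resumes them later (the Flow uuid is a placeholder):

```python
from jobflow_remote.jobs.state import JobState  # assumed import path

# Pause all the READY Jobs of a given Flow (placeholder uuid).
paused_ids = jc.pause_jobs(flow_ids="flow-uuid-placeholder", states=JobState.READY)
print(f"Paused {len(paused_ids)} jobs")

# ... later, resume exactly those Jobs by their db_ids.
played_ids = jc.play_jobs(db_ids=paused_ids)
```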

refresh_children(job_uuids: list[str]) list[str][source]#

Set the state of the children of a Job to READY following the completion of the Job.

Parameters:

job_uuids – List of Jobs uuids belonging to a Flow.

Return type:

List of db_ids of modified Jobs.

remove_batch_process(process_id: str, worker: str) dict[source]#

Remove a process from the list of running batch processes.

Parameters:
  • process_id – The ID of the process obtained from the QueueManager.

  • worker – The worker where the process was being executed.

Returns:

The updated document.

Return type:

dict

rerun_job(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, force: bool = False, wait: int | None = None, break_lock: bool = False) list[str][source]#

Rerun a single Job, i.e. bring its state back to READY. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined.

By default, only Jobs in one of the running states (CHECKED_OUT, UPLOADED, …), in the REMOTE_ERROR state, or FAILED Jobs whose children are in the READY or WAITING state can be rerun. This should guarantee that no unexpected inconsistencies due to dynamic Job generation appear. This limitation can be bypassed with the force option. In any case, no Job with children with index > 1 can be rerun, as there is no sensible way of handling it.

Rerunning a Job in the REMOTE_ERROR state or in an intermediate running state also resets the remote attempts and errors. When rerunning a Job in the SUBMITTED or RUNNING state, the system also tries to cancel the process on the worker. Rerunning a FAILED Job also leads to a change of state in its children. The full list of modified Jobs is returned.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None: the Job with the highest index.

  • force – Bypass the limitation that only Jobs in a certain state can be rerun.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

List of db_ids of the updated Jobs.

Return type:

list

rerun_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, force: bool = False, wait: int | None = None, break_lock: bool = False) list[str][source]#

Rerun a list of selected Jobs, i.e. bring their state back to READY. See the docs of rerun_job for more details.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise an exception in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

  • force – Bypass the limitation that only Jobs in certain states can be rerun.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

List of db_ids of the updated Jobs.

Return type:

list
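
For example, all the Jobs that ended in REMOTE_ERROR on a given worker could be brought back to READY as sketched below (the worker name is a placeholder).

```python
from jobflow_remote.jobs.state import JobState  # assumed import path

# Rerun the REMOTE_ERROR Jobs of a placeholder worker, waiting up to
# 60 seconds for any lock to be released.
rerun_ids = jc.rerun_jobs(
    states=JobState.REMOTE_ERROR,
    workers="local_worker",
    wait=60,
)
print(f"Rerun {len(rerun_ids)} jobs")
```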

reset(reset_output: bool = False, max_limit: int = 25) bool[source]#

Reset the content of the queue database and build the indexes. Optionally delete the content of the JobStore with the outputs. In that case all the data contained in the JobStore will be removed, not just the data associated with entries in the queue.

Parameters:
  • reset_output – If True also reset the JobStore containing the outputs.

  • max_limit – Maximum number of Flows allowed in the DB for the reset to be performed. If the number of Flows is larger, the database will not be reset. Set to 0 for no limit.

Returns:

True if the database was reset, False otherwise.

Return type:

bool

retry_job(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False) str[source]#

Retry a single Job, i.e. bring it back to its previous state if REMOTE_ERROR, or reset the remote attempts and time of retry if in another running state. Jobs in other states cannot be retried. The Job is selected by db_id or uuid+index. Only one among db_id and job_id should be defined.

Only locking of the retried Job is required.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None: the Job with the highest index.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

The db_id of the updated Job.

Return type:

str

retry_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None, break_lock: bool = False) list[str][source]#

Retry selected Jobs, i.e. bring them back to their previous state if in the REMOTE_ERROR state, or reset the remote attempts and retry time if in another running state.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise an exception in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

List of db_ids of the updated Jobs.

Return type:

list

set_job_doc_properties(values: dict, db_id: str | None = None, job_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, acceptable_states: list[JobState] | None = None, use_pipeline: bool = False) str[source]#

Helper to set multiple values in a JobDoc while locking the Job. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined.

Parameters:
  • values – Dictionary with the values to be set. Will be passed to a pymongo update_one method.

  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None the Job with the largest index will be selected.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents.

  • acceptable_states – List of JobState for which the Job values can be changed. If None all states are acceptable.

  • use_pipeline – If True, a pipeline will be used to update the document.

Returns:

The db_id of the updated Job. None if the Job was not updated.

Return type:

str

set_job_run_properties(worker: str | None = None, exec_config: str | ExecutionConfig | dict | None = None, resources: dict | QResources | None = None, update: bool = True, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True) list[str][source]#

Set execution properties for selected Jobs: worker, exec_config and resources.

Parameters:
  • worker – The name of the worker to set.

  • exec_config – The name of the exec_config to set or an explicit value of ExecutionConfig or dict.

  • resources – The resources to be set, either as a dict or a QResources instance.

  • update – If True, when setting exec_config and resources, a passed dictionary will be used to update the already existing values; if False, it will replace them.

  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise an exception in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

Returns:

List of db_ids of the updated Jobs.

Return type:

list
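
A sketch that moves the READY Jobs of a Flow to a different worker and updates their resources; all names and values are placeholders, and a QResources instance could be passed instead of the plain dict.

```python
from jobflow_remote.jobs.state import JobState  # assumed import path

# Reassign the READY Jobs of a Flow (placeholder uuid) to another worker and
# merge new resource values into the existing ones (update=True).
updated_ids = jc.set_job_run_properties(
    worker="hpc_worker",  # placeholder worker name
    resources={"nodes": 1, "ntasks": 16, "time": "01:00:00"},  # placeholder keys/values
    update=True,
    flow_ids="flow-uuid-placeholder",
    states=JobState.READY,
)
```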

set_job_state(state: JobState, job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False) str[source]#

Set the state of a Job to an arbitrary JobState. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined.

No check is performed! Any job can be set to any state. Only for advanced users or for debugging purposes.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None the Job with the largest index will be selected.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents.

Returns:

The db_id of the updated Job. None if the Job was not updated.

Return type:

str

stop_children(job_uuid: str) int[source]#

Stop the direct children of a Job in the WAITING state.

Parameters:

job_uuid – The uuid of the Job.

Return type:

The number of modified Jobs.

stop_job(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False) str[source]#

Stop a single Job. Only Jobs in the READY state or in one of the running states can be stopped. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined. The action is not reversible.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None: the Job with the highest index.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

The db_id of the updated Job.

Return type:

str

stop_jobflow(job_uuid: str = None, flow_uuid: str = None) int[source]#

Stop all the WAITING Jobs in a Flow.

Parameters:
  • job_uuid – The uuid of Job to identify the Flow. Incompatible with flow_uuid.

  • flow_uuid – The Flow uuid. Incompatible with job_uuid.

Return type:

The number of modified Jobs.

stop_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None, break_lock: bool = False) list[str][source]#

Stop selected Jobs. Only Jobs in the READY state or in one of the running states can be stopped. The action is not reversible.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise an exception in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

List of db_ids of the updated Jobs.

Return type:

list

unlock_flows(job_ids: str | list[str] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: FlowState | list[FlowState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None) int[source]#

Forcibly remove the lock on a locked Flow document. This should be used only if a lock is a leftover of a process that is not running anymore. Doing otherwise may result in inconsistencies.

Parameters:
  • job_ids – One or more strings with uuids of Jobs belonging to the Flow.

  • db_ids – One or more db_ids of Jobs belonging to the Flow.

  • flow_ids – One or more Flow uuids.

  • states – One or more states of the Flows.

  • start_date – Filter Flows that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Flows that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Flow. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

Returns:

Number of modified Flows.

Return type:

int

unlock_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None) int[source]#

Forcibly remove the lock on a locked Job document. This should be used only if a lock is a leftover of a process that is not running anymore. Doing otherwise may result in inconsistencies.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from Python's fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

Returns:

Number of modified Jobs.

Return type:

int
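
A cautious sketch that removes leftover locks on the Jobs and the Flow of a single workflow, to be used only after verifying that no Runner process is still holding them (the uuid is a placeholder).

```python
# Forcibly unlock the Job and Flow documents of a given Flow (placeholder uuid).
# Only do this after checking that the locking process is no longer running.
n_jobs = jc.unlock_jobs(flow_ids="flow-uuid-placeholder")
n_flows = jc.unlock_flows(flow_ids="flow-uuid-placeholder")
print(f"Unlocked {n_jobs} jobs and {n_flows} flows")
```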

update_flow_state(flow_uuid: str, updated_states: dict[str, dict[int, JobState | None]] | None = None) None[source]#

Update the state of a Flow in the DB based on the Job’s states.

The Flow should be locked while performing this operation.

Parameters:
  • flow_uuid – The uuid of the Flow to update.

  • updated_states – A dictionary with the updated states of Jobs that have not been stored in the DB yet, in the form {job_uuid: {job_index: JobState}}. If the value is None, the Job is considered deleted and its state will be ignored while determining the state of the whole Flow.

jobflow_remote.jobs.jobcontroller.get_flow_leafs(job_docs: list[dict]) list[dict][source]#

Get the leaf Jobs from a list of serialized representations of JobDoc.

Parameters:

job_docs – The list of serialized JobDocs in the Flow.

Returns:

The list of serialized JobDocs that are the leaves of the Flow.

Return type:

list