jobflow_remote package#

Subpackages#

Module contents#

jobflow-remote is a Python package to run jobflow workflows on remote resources.

class jobflow_remote.ConfigManager(exclude_unset: bool = False, exclude_none: bool = False, warn: bool = False)[source]#

Bases: object

A manager for the projects configuration files.

Provides tools to parse project information from the selected projects folder, as well as methods to update the properties of each project.

Parameters:
  • exclude_unset – when dumping projects, determines whether fields that were not explicitly set when creating the model should be excluded from the dictionary

  • exclude_none – when dumping projects, determines whether fields that are equal to None should be excluded from the dictionary

  • warn – if True, print warnings related to the parsing of the files in the projects folder
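
A minimal usage sketch ("my_project" is a hypothetical project name):

from jobflow_remote import ConfigManager

# Parse the projects folder configured in the jobflow-remote settings.
cm = ConfigManager(warn=True)

# Dictionary of all successfully parsed projects.
for name in cm.projects:
    print(name)

# Retrieve a single project; with None the one selected in the
# settings is used.
project = cm.get_project("my_project")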

create_project(project: Project, ext='yaml') None[source]#

Create a new Project in the project folder by dumping the project to file.

Parameters:
  • project – The data of the project to be created.

  • ext – The extension of the file to which the project will be dumped (yaml, json, or toml).

dump_project(project_data: ProjectData) None[source]#

Dump the project to the filepath specified in the ProjectData.

Parameters:

project_data – The project data to be dumped

get_exec_config(exec_config_name: str, project_name: str | None = None) ExecutionConfig[source]#

Return the ExecutionConfig object based on the name.

Parameters:
  • exec_config_name – Name of the ExecutionConfig.

  • project_name – Name of the project from which the ExecutionConfig should be retrieved, or None to use the one from the settings.

Returns:

The selected ExecutionConfig

Return type:

ExecutionConfig

get_project(project_name: str | None = None) Project[source]#

Get the Project object based on the project name.

Parameters:

project_name – The name of the project or None to use the value from the settings

Returns:

The selected Project

Return type:

Project

get_project_data(project_name: str | None = None) ProjectData[source]#

Get the ProjectData object based on the project name.

Parameters:

project_name – The name of the project or None to use the value from the settings

Returns:

The selected ProjectData

Return type:

ProjectData

get_worker(worker_name: str, project_name: str | None = None) WorkerBase[source]#

Return the worker object based on the name.

Parameters:
  • worker_name – Name of the worker to retrieve.

  • project_name – Name of the project from which the Worker should be retrieved, or None to use the one from the settings.

Returns:

The selected Worker.

Return type:

WorkerBase

load_projects_data() dict[str, ProjectData][source]#

Load projects from the selected projects folder.

Returns:

Dictionary with project name as key and ProjectData as value.

Return type:

dict

project_names_from_files() list[str][source]#

Parses all the parsable files and only checks for the “name” attribute to return a list of potential project names.

Useful in case some projects cannot be properly parsed, but the full list needs to be returned.

Returns:

List of project names.

Return type:

list

property projects: dict[str, Project]#

Returns:

Dictionary with project name as key and Project as value.

Return type:

dict

projects_ext = ('json', 'yaml', 'toml')#

remove_exec_config(exec_config_name: str, project_name: str | None = None) None[source]#

Remove an ExecutionConfig from the selected project.

Parameters:
  • exec_config_name – Name of the ExecutionConfig to be removed

  • project_name – Name of the project from which the ExecutionConfig should be removed, or None to use the one from the settings.

remove_project(project_name: str, remove_folders: bool = True) None[source]#

Remove a project from the projects folder.

Parameters:
  • project_name – Name of the project to be removed.

  • remove_folders – Optionally remove the folders related to the project (e.g. tmp, log).

remove_worker(worker_name: str, project_name: str | None = None) None[source]#

Remove a worker from the selected project.

Parameters:
  • worker_name – Name of the worker to be removed

  • project_name – Name of the project from which the Worker should be removed, or None to use the one from the settings.

select_project_name(project_name: str | None = None) str[source]#

Determine the project name to be used based on the passed value and on the general settings.

Parameters:

project_name – The name of the project or None to use the value from the settings

Returns:

The name of the selected project.

Return type:

str

set_exec_config(exec_config_name: str, exec_config: ExecutionConfig, project_name: str | None = None, replace: bool = False) None[source]#

Set an ExecutionConfig in the selected project. Can add a new ExecutionConfig or replace an existing one.

Parameters:
  • exec_config_name – Name of the ExecutionConfig to be added or replaced.

  • exec_config – The ExecutionConfig.

  • project_name – Name of the project where the ExecutionConfig is set, or None to use the one from the settings.

  • replace – Raise an exception if False and an ExecutionConfig with the chosen name already exists.

set_jobstore(jobstore: JobStore, project_name: str | None = None) None[source]#

Set the project specific store used for jobflow.

Parameters:
  • jobstore – A maggma Store

  • project_name – Name of the project where the Store is set, or None to use the one from the settings.

set_queue_db(store: MongoStore, project_name: str | None = None) None[source]#

Set the project specific store used for managing the queue.

Parameters:
  • store – A maggma Store

  • project_name – Name of the project where the Store is set, or None to use the one from the settings.

set_worker(name: str, worker: WorkerBase, project_name: str | None = None, replace: bool = False) None[source]#

Set a worker in the selected project. Can add a new worker or replace an existing one.

Parameters:
  • name – Name of the worker to be added or replaced.

  • worker – Worker to be set.

  • project_name – Name of the project where the Worker is set, or None to use the one from the settings.

  • replace – Raise an exception if False and a Worker with the chosen name already exists.

update_project(config: dict, project_name: str) None[source]#

Update the project values. The passed dict will be recursively merged into the current project.

Parameters:
  • config – Dictionary with the project values to be updated.

  • project_name – Name of the project to be updated
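
A hedged sketch of a recursive update (log_level is assumed to be a top-level field of the project model):

from jobflow_remote import ConfigManager

cm = ConfigManager()
# Only the keys passed here are changed; the rest of the stored
# project configuration is preserved.
cm.update_project({"log_level": "debug"}, project_name="my_project")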

class jobflow_remote.JobController(queue_store: MongoStore, jobstore: JobStore, flows_collection: str = 'flows', auxiliary_collection: str = 'jf_auxiliary', project: Project | None = None)[source]#

Bases: object

Main entry point for all the interactions with the Stores.

Maintains a connection to both the queue Store and the results JobStore. The queue Store is required to be a MongoStore, as the controller accesses the database directly and works with several collections.

The main functionalities are those for updating the state of the database and querying the Jobs and Flows status information.

Parameters:
  • queue_store – The Store used to save information about the status of the Jobs. Should be a MongoStore and other collections are used from the same database.

  • jobstore – The JobStore containing the output of the jobflow Flows.

  • flows_collection – The name of the collection used to store the Flows data. Uses the DB defined in the queue_store.

  • auxiliary_collection – The name of the collection used to store other auxiliary data. Uses the DB defined in the queue_store.

  • project – The project where the Stores were defined.
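
A short sketch of creating a controller from a configured project (hypothetical project name):

from jobflow_remote import JobController

# With None the default project from the settings is used.
jc = JobController.from_project_name("my_project")
print(jc.count_jobs())

# Close the connections to the Stores when done.
jc.close()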

add_batch_process(process_id: str, process_uuid: str, worker: str) dict[source]#

Add a batch process to the list of running processes.

Two IDs are defined, one to keep track of the actual process number and one to be associated to the Jobs that are being executed. The need for two IDs originates from the fact that the former may not be known at runtime.

Parameters:
  • process_id – The ID of the process obtained from the QueueManager.

  • process_uuid – A unique ID to identify the process.

  • worker – The worker where the process is being executed.

Returns:

The updated document.

Return type:

dict

add_flow(flow: Flow | Job | list[Job], worker: str, allow_external_references: bool = False, exec_config: ExecutionConfig | None = None, resources: dict | QResources | None = None) list[str][source]#
build_indexes(background: bool = True, job_custom_indexes: list[str | list] | None = None, flow_custom_indexes: list[str | list] | None = None, drop: bool = False) None[source]#

Build indexes in the database.

Parameters:
  • background – If True, the indexes should be created in the background.

  • job_custom_indexes – List of custom indexes for the jobs collection. Each element is passed to pymongo’s create_index, thus following those conventions.

  • flow_custom_indexes – List of custom indexes for the flows collection. Same as job_custom_indexes.

  • drop – If True all existing indexes in the collections will be dropped.

checkin_job(job_doc: dict, flow_dict: dict, response: dict | None, error: str | None = None, doc_update: dict | None = None)[source]#
checkout_job(query=None, flow_uuid: str = None, sort: list[tuple[str, int]] | None = None) tuple[str, int] | None[source]#

Check out one job.

Set the job state from READY to CHECKED_OUT with an atomic update. Flow state is also updated if needed.

NB: the Flow is not locked at any time during the checkout. A lock on the Job document is not required either.

close() None[source]#

Close the connections to all the Stores in JobController.

compact() None[source]#

Compact jobs and flows collections in MongoDB.

complete_job(job_doc: dict, local_path: Path | str, store: JobStore) bool[source]#
count_flows(query: dict | None = None, job_ids: str | list[str] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: FlowState | list[FlowState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None) int[source]#

Count flows based on filter parameters.

Parameters:
  • query – A generic query. Will override all the other parameters.

  • job_ids – One or more strings with uuids of Jobs belonging to the Flow.

  • db_ids – One or more db_ids of Jobs belonging to the Flow.

  • flow_ids – One or more Flow uuids.

  • states – One or more states of the Flows.

  • start_date – Filter Flows that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Flows that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Flow. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

Returns:

Number of Flows matching the criteria.

Return type:

int
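
For example, a sketch counting recently updated Flows in a given state, reusing the jc controller from above (the FlowState import path is an assumption):

from datetime import datetime, timedelta

from jobflow_remote.jobs.state import FlowState  # assumed import path

# COMPLETED Flows updated in the last day; the local time is
# converted to UTC internally.
n_completed = jc.count_flows(
    states=FlowState.COMPLETED,
    start_date=datetime.now() - timedelta(days=1),
)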

count_jobs(query: dict | None = None, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, locked: bool = False, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None) int[source]#

Count Jobs based on filters.

Parameters:
  • query – A generic query. Will override all the other parameters.

  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • locked – If True only locked Jobs will be selected.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

Returns:

Number of Jobs matching the criteria.

Return type:

int

create_indexes(indexes: list[str | list], collection: DbCollection = DbCollection.JOBS, unique: bool = False, background: bool = True) None[source]#

Build the selected indexes

Parameters:
  • indexes – List of indexes to be added to the collection. Each element is passed to pymongo’s create_index, thus following those conventions.

  • collection – The collection where the index will be created.

  • unique – If True, the indexes will be created as unique indexes.

  • background – If True, the indexes should be created in the background.
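
A hedged sketch of adding custom indexes (the DbCollection import path is an assumption):

from jobflow_remote.jobs.data import DbCollection  # assumed import path

# Each element follows pymongo's create_index conventions: either a
# single key name or a list of (key, direction) pairs.
jc.create_indexes(
    indexes=["metadata.task_label", [("state", 1), ("updated_on", -1)]],
    collection=DbCollection.JOBS,
    background=True,
)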

delete_flow(flow_id: str, delete_output: bool = False, delete_files: bool = False) bool[source]#

Delete a single Flow based on the uuid.

Parameters:
  • flow_id – The uuid of the Flow to be deleted.

  • delete_output – If True also delete the associated output in the JobStore.

  • delete_files – If True also delete the files on the worker.

Returns:

True if the flow has been deleted.

Return type:

bool

delete_flows(flow_ids: str | list[str] | None = None, max_limit: int = 10, delete_output: bool = False, delete_files: bool = False) int[source]#

Delete a list of Flows based on the flow uuids.

Parameters:
  • flow_ids – One or more Flow uuids.

  • max_limit – The Flows will be deleted only if the total number is lower than the specified limit. 0 means no limit.

  • delete_output – If True also delete the associated output in the JobStore.

  • delete_files – If True also delete the files on the worker.

Returns:

Number of deleted Flows.

Return type:

int

delete_job(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, delete_output: bool = False, delete_files: bool = False, wait: int | None = None, break_lock: bool = False) str[source]#

Delete a single job from the queue store and optionally from the job store. The Flow document will be updated accordingly but no consistency check is performed. The Flow may be left in an inconsistent state. For advanced users only.

Parameters:
  • job_id – The uuid of the job to delete.

  • db_id – The db_id of the job to delete.

  • job_index – The index of the job. If None, the job with the largest index will be selected.

  • delete_output (bool, default False) – If True, also delete the job output from the JobStore.

  • delete_files – If True also delete the files on the worker.

  • wait – In case the Flow or Job is locked, wait this time (in seconds) for the lock to be released.

  • break_lock – Forcibly break the lock on locked documents.

Returns:

The db_id of the deleted Job.

Return type:

str

delete_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None, delete_output: bool = False, delete_files: bool = False, max_limit: int = 10) list[str][source]#

Delete selected jobs from the queue store and optionally from the job store. The Flow document will be updated accordingly but no consistency check is performed. The Flow may be left in an inconsistent state. For advanced users only.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

  • wait – In case the Flow or Jobs that need to be deleted are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • delete_output (bool, default False) – If True, also delete the Job output from the JobStore.

  • delete_files – If True also delete the files on the worker.

  • max_limit – The Jobs will be deleted only if the total number is lower than the specified limit. 0 means no limit.

Returns:

List of db_ids of the deleted Jobs.

Return type:

list

classmethod from_project(project: Project) JobController[source]#

Generate an instance of JobController from a Project object.

Parameters:

project – The project used to generate the JobController. If None the default project will be used.

Returns:

An instance of JobController associated with the project.

Return type:

JobController

classmethod from_project_name(project_name: str | None = None) JobController[source]#

Generate an instance of JobController from the project name.

Parameters:

project_name – The name of the project. If None the default project will be used.

Returns:

An instance of JobController associated with the project.

Return type:

JobController

static generate_job_id_query(db_id: str | None = None, job_id: str | None = None, job_index: int | None = None) tuple[dict, list | None][source]#

Generate a query for a single Job based on db_id or uuid+index. Only one among db_id and job_id should be defined.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None, a sort will be added to the query to select the Job with the highest index.

Returns:

A dict and an optional list to be used as query and sort, respectively, in a query for a single Job.

Return type:

dict, list
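
An illustrative sketch (the DbCollection import path is an assumption; the uuid is hypothetical):

from jobflow_remote.jobs.data import DbCollection  # assumed import path

# Select a Job by uuid; with job_index=None a sort is returned so
# that the document with the highest index is picked first.
query, sort = JobController.generate_job_id_query(job_id="some-uuid")
doc = jc.get_collection(DbCollection.JOBS).find_one(query, sort=sort)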

get_batch_processes(worker: str) dict[str, str][source]#

Get the batch processes associated with a given worker.

Parameters:

worker – The worker name.

Returns:

A dictionary with the {process_id: process_uuid} of the batch jobs running on the selected worker.

Return type:

dict

get_collection(collection: DbCollection)[source]#

Return the internal collection corresponding to the selected DbCollection.

Parameters:

collection – The collection selected.

Return type:

The internal instance of the MongoDB collection.

get_flow_info_by_flow_uuid(flow_uuid: str, projection: list | dict | None = None)[source]#
get_flow_info_by_job_uuid(job_uuid: str, projection: list | dict | None = None)[source]#
get_flow_job_aggreg(query: dict | None = None, projection_flow: dict | None = None, projection_job: dict | None = None, sort: list[tuple] | None = None, limit: int = 0) list[dict][source]#

Retrieve data about Flows and all their Jobs through an aggregation.

In the aggregation the list of Jobs is identified as jobs_list.

Parameters:
  • query – A dictionary representing the filter.

  • projection_flow – Projection of the fields for the Flow document passed to the aggregation.

  • projection_job – Projection of the fields for the Job document used in the $lookup.

  • sort – A list of (key, direction) pairs specifying the sort order for this query. Follows pymongo conventions.

  • limit – Maximum number of entries to retrieve. 0 means no limit.

Returns:

The list of dictionaries resulting from the query.

Return type:

list

get_flow_jobs_data(query: dict | None = None, projection: dict | None = None, sort: dict | None = None, limit: int = 0) list[dict][source]#

Get the data of Flows and their Jobs from the DB using an aggregation.

In the aggregation the Jobs are identified as “jobs”.

Parameters:
  • query – The query to filter the Flow.

  • projection – The projection for the Flow and Job data.

  • sort – Sorting passed to the aggregation.

  • limit – The maximum number of results returned.

Return type:

A list of dictionaries with the result of the query.

get_flows_info(job_ids: str | list[str] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: FlowState | list[FlowState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, locked: bool = False, sort: list[tuple] | None = None, limit: int = 0, full: bool = False) list[FlowInfo][source]#

Query for Flows based on standard parameters and return a list of FlowInfo.

Parameters:
  • job_ids – One or more strings with uuids of Jobs belonging to the Flow.

  • db_ids – One or more db_ids of Jobs belonging to the Flow.

  • flow_ids – One or more Flow uuids.

  • states – One or more states of the Flow.

  • start_date – Filter Flows that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Flows that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Flow. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

  • locked – If True only locked Flows will be selected.

  • sort – A list of (key, direction) pairs specifying the sort order for this query. Follows pymongo conventions.

  • limit – Maximum number of entries to retrieve. 0 means no limit.

  • full – If True data is fetched from both the Flow collection and Job collection with an aggregate. Otherwise, only the Job information in the Flow document will be used.

Returns:

A list of FlowInfo.

Return type:

list

get_job_doc(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None) JobDoc | None[source]#
get_job_info(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None) JobInfo | None[source]#

Get the JobInfo for a single Job based on db_id or uuid+index. Only one among db_id and job_id should be defined.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None the Job with the largest index will be selected.

Returns:

A JobInfo, or None if no Job matches the criteria.

Return type:

JobInfo

get_job_info_by_job_uuid(job_uuid: str, job_index: int | str = 'last', projection: list | dict | None = None)[source]#
get_job_info_by_pid(pid: int | str) JobInfo | None[source]#

Retrieve job information by process ID (e.g., Slurm job ID).

Parameters:

pid (int | str) – The process ID of the job in the queue system.

Returns:

Job information if found, None otherwise.

Return type:

JobInfo | None

get_job_uuids(flow_uuids: list[str]) list[str][source]#

Get the list of Jobs belonging to Flows, based on their uuid.

Parameters:

flow_uuids – A list of Flow uuids.

Return type:

A list of uuids of Jobs belonging to the selected Flows.

get_jobs(query, projection: list | dict | None = None)[source]#
get_jobs_doc(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, locked: bool = False, sort: list[tuple] | None = None, limit: int = 0) list[JobDoc][source]#

Query for Jobs based on standard parameters and return a list of JobDoc.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • locked – If True only locked Jobs will be selected.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • sort – A list of (key, direction) pairs specifying the sort order for this query. Follows pymongo conventions.

  • limit – Maximum number of entries to retrieve. 0 means no limit.

Returns:

A list of JobDoc objects for the Jobs matching the criteria.

Return type:

list

get_jobs_doc_query(query: dict = None, **kwargs) list[JobDoc][source]#

Query for Jobs based on a generic filter and return a list of JobDoc.

Parameters:
  • query – A dictionary representing the filter.

  • kwargs – All arguments passed to pymongo’s Collection.find() method.

Returns:

A list of JobDoc objects for the Jobs matching the criteria.

Return type:

list

get_jobs_info(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, locked: bool = False, sort: list[tuple[str, int]] | None = None, limit: int = 0) list[JobInfo][source]#

Query for Jobs based on standard parameters and return a list of JobInfo.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • locked – If True only locked Jobs will be selected.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • sort – A list of (key, direction) pairs specifying the sort order for this query. Follows pymongo conventions.

  • limit – Maximum number of entries to retrieve. 0 means no limit.

Returns:

A list of JobInfo objects for the Jobs matching the criteria.

Return type:

list
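
A typical query sketch (the JobState import path and the printed JobInfo attributes are assumptions):

from jobflow_remote.jobs.state import JobState  # assumed import path

# The ten most recently updated FAILED Jobs whose name matches a
# fnmatch pattern.
failed = jc.get_jobs_info(
    states=JobState.FAILED,
    name="*relax*",
    sort=[("updated_on", -1)],
    limit=10,
)
for info in failed:
    print(info.db_id, info.name, info.state)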

get_jobs_info_by_flow_uuid(flow_uuid, projection: list | dict | None = None)[source]#
get_jobs_info_query(query: dict = None, **kwargs) list[JobInfo][source]#

Get a list of JobInfo based on a generic query.

Parameters:
  • query – The query to be performed.

  • kwargs – arguments passed to MongoDB find().

Returns:

A list of JobInfo matching the criteria.

Return type:

list

lock_flow(**lock_kwargs) Generator[MongoLock, None, None][source]#

Lock a Flow document.

See MongoLock context manager for more details about the locking options.

Parameters:

lock_kwargs – Kwargs passed to the MongoLock context manager.

Returns:

An instance of MongoLock.

Return type:

MongoLock

lock_job(**lock_kwargs) Generator[MongoLock, None, None][source]#

Lock a Job document.

See MongoLock context manager for more details about the locking options.

Parameters:

lock_kwargs – Kwargs passed to the MongoLock context manager.

Returns:

An instance of MongoLock.

Return type:

MongoLock

lock_job_flow(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, acceptable_states: list[JobState] | None = None, job_lock_kwargs: dict | None = None, flow_lock_kwargs: dict | None = None) Generator[tuple[MongoLock, MongoLock], None, None][source]#

Lock one Job document and the Flow document the Job belongs to.

See MongoLock context manager for more details about the locking options.

Parameters:
  • job_id – The uuid of the Job to lock.

  • db_id – The db_id of the Job to lock.

  • job_index – The index of the Job to lock.

  • wait – The amount of seconds to wait for a lock to be released.

  • break_lock – True if the context manager is allowed to forcibly break a lock.

  • acceptable_states – A list of JobStates. If the state of the Job is not among these, a ValueError exception is raised.

  • job_lock_kwargs – Kwargs passed to MongoLock for the Job lock.

  • flow_lock_kwargs – Kwargs passed to MongoLock for the Flow lock.

Returns:

The instances of MongoLock for the Job and the Flow.

Return type:

MongoLock, MongoLock

lock_job_for_update(query, max_step_attempts, delta_retry, **kwargs) Generator[MongoLock, None, None][source]#

Lock a Job document for state update by the Runner.

See MongoLock context manager for more details about the locking options.

Parameters:
  • query – The query used to select the Job document to lock.

  • max_step_attempts – The maximum number of attempts for a single step after which the Job should be set to the REMOTE_ERROR state.

  • delta_retry – List of increasing delays between subsequent attempts when the advancement of a remote step fails. Used to set the retry time.

  • kwargs – Kwargs passed to the MongoLock context manager.

Returns:

An instance of MongoLock.

Return type:

MongoLock

pause_job(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None) str[source]#

Pause a single Job. Only READY and WAITING Jobs can be paused. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined. The action is reversible.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None: the Job with the highest index.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

Returns:

The db_id of the updated Job.

Return type:

str

pause_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None) list[str][source]#

Pause selected Jobs. Only READY and WAITING Jobs can be paused. The action is reversible.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

Returns:

List of db_ids of the updated Jobs.

Return type:

list

ping_flow_doc(uuid: str) None[source]#

Ping a Flow document to update its “updated_on” value.

Parameters:

uuid – The uuid of the Flow to update.

play_job(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False) str[source]#

Restart a single Job that was previously paused. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None: the Job with the highest index.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

The db_id of the updated Job.

Return type:

str

play_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None, break_lock: bool = False) list[str][source]#

Restart selected Jobs that were previously paused.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

List of db_ids of the updated Jobs.

Return type:

list
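
A sketch combining the two operations (hypothetical Flow uuid):

from jobflow_remote.jobs.state import JobState  # assumed import path

# Pause all READY Jobs of a Flow, then resume exactly those Jobs.
paused = jc.pause_jobs(flow_ids=["flow-uuid"], states=JobState.READY)
played = jc.play_jobs(db_ids=paused)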

refresh_children(job_uuids: list[str]) list[str][source]#

Set the state of the children of the given Jobs to READY following their completion.

Parameters:

job_uuids – List of Jobs uuids belonging to a Flow.

Return type:

List of db_ids of modified Jobs.

remove_batch_process(process_id: str, worker: str) dict[source]#

Remove a process from the list of running batch processes.

Parameters:
  • process_id – The ID of the process obtained from the QueueManager.

  • worker – The worker where the process was being executed.

Returns:

The updated document.

Return type:

dict

rerun_job(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, force: bool = False, wait: int | None = None, break_lock: bool = False) list[str][source]#

Rerun a single Job, i.e. bring its state back to READY. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined.

By default, only Jobs in one of the running states (CHECKED_OUT, UPLOADED, …), in the REMOTE_ERROR state, or FAILED with children in the READY or WAITING state can be rerun. This should guarantee that no unexpected inconsistencies due to dynamic Job generation appear. This limitation can be bypassed with the force option. In any case, no Job with children with index > 1 can be rerun, as there is no sensible way of handling that case.

Rerunning a Job in the REMOTE_ERROR state or in an intermediate state also resets the remote attempts and errors. When rerunning a Job in the SUBMITTED or RUNNING state, the system also tries to cancel the process on the worker. Rerunning a FAILED Job also leads to a change of state in its children. The full list of modified Jobs is returned.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None: the Job with the highest index.

  • force – Bypass the limitation that only Jobs in a certain state can be rerun.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

List of db_ids of the updated Jobs.

Return type:

list
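
For instance, a hedged sketch of rerunning a Job that ended in REMOTE_ERROR (hypothetical db_id):

# Wait up to 60 seconds for any lock to be released; the returned
# list contains the db_ids of all the Jobs modified by the rerun.
modified = jc.rerun_job(db_id="123", wait=60)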

rerun_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, force: bool = False, wait: int | None = None, break_lock: bool = False) list[str][source]#

Rerun a list of selected Jobs, i.e. bring their state back to READY. See the docs of rerun_job for more details.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

  • force – Bypass the limitation that only failed Jobs can be rerun.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

List of db_ids of the updated Jobs.

Return type:

list

reset(reset_output: bool = False, max_limit: int = 25) bool[source]#

Reset the content of the queue database and build the indexes. Optionally delete the content of the JobStore with the outputs. In that case all the data contained in the JobStore will be removed, not just the data associated with the queue.

Parameters:
  • reset_output – If True also reset the JobStore containing the outputs.

  • max_limit – Maximum number of Flows allowed in the DB for the reset to proceed. If the number is larger, the database will not be reset. Set to 0 for no limit.

Returns:

True if the database was reset, False otherwise.

Return type:

bool

retry_job(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False) str[source]#

Retry a single Job, i.e. bring it back to its previous state if in the REMOTE_ERROR state, or reset the remote attempts and time of retry if in another running state. Jobs in other states cannot be retried. The Job is selected by db_id or uuid+index. Only one among db_id and job_id should be defined.

Only locking of the retried Job is required.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None: the Job with the highest index.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

The db_id of the updated Job.

Return type:

str

retry_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None, break_lock: bool = False) list[str][source]#

Retry selected Jobs, i.e. bring them back to their previous state if in the REMOTE_ERROR state, or reset the remote attempts and time of retry if in another running state.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

List of db_ids of the updated Jobs.

Return type:

list

set_job_doc_properties(values: dict, db_id: str | None = None, job_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False, acceptable_states: list[JobState] | None = None, use_pipeline: bool = False) str[source]#

Helper to set multiple values in a JobDoc while locking the Job. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined.

Parameters:
  • values – Dictionary with the values to be set. Will be passed to a pymongo update_one method.

  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None the Job with the largest index will be selected.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents.

  • acceptable_states – List of JobState for which the Job values can be changed. If None all states are acceptable.

  • use_pipeline – If True, a pipeline will be used in the update of the document.

Returns:

The db_id of the updated Job. None if the Job was not updated.

Return type:

str

set_job_run_properties(worker: str | None = None, exec_config: str | ExecutionConfig | dict | None = None, resources: dict | QResources | None = None, update: bool = True, job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True) list[str][source]#

Set execution properties for selected Jobs: worker, exec_config and resources.

Parameters:
  • worker – The name of the worker to set.

  • exec_config – The name of the exec_config to set or an explicit value of ExecutionConfig or dict.

  • resources – The resources to be set, either as a dict or a QResources instance.

  • update – If True, when setting exec_config and resources a passed dictionary will be used to update already existing values. If False it will replace the original values.

  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

Returns:

List of db_ids of the updated Jobs.

Return type:

list
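
A sketch of a common pattern, reassigning FAILED Jobs before a rerun (the worker name and the resource keys are illustrative):

from jobflow_remote.jobs.state import JobState  # assumed import path

# Move failed Jobs to another worker with QResources-style values,
# then bring them back to READY.
jc.set_job_run_properties(
    worker="hpc_worker",
    resources={"nodes": 2, "time_limit": 7200},
    states=JobState.FAILED,
)
jc.rerun_jobs(states=JobState.FAILED)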

set_job_state(state: JobState, job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False) str[source]#

Set the state of a Job to an arbitrary JobState. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined.

No check is performed! Any job can be set to any state. Only for advanced users or for debugging purposes.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None the Job with the largest index will be selected.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents.

Returns:

The db_id of the updated Job. None if the Job was not updated.

Return type:

str

stop_children(job_uuid: str) int[source]#

Stop the direct children of a Job in the WAITING state.

Parameters:

job_uuid – The uuid of the Job.

Return type:

The number of modified Jobs.

stop_job(job_id: str | None = None, db_id: str | None = None, job_index: int | None = None, wait: int | None = None, break_lock: bool = False) str[source]#

Stop a single Job. Only Jobs in the READY and all the running states can be stopped. Selected by db_id or uuid+index. Only one among db_id and job_id should be defined. The action is not reversible.

Parameters:
  • db_id – The db_id of the Job.

  • job_id – The uuid of the Job.

  • job_index – The index of the Job. If None: the Job with the highest index.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

The db_id of the updated Job.

Return type:

str

stop_jobflow(job_uuid: str = None, flow_uuid: str = None) int[source]#

Stop all the WAITING Jobs in a Flow.

Parameters:
  • job_uuid – The uuid of Job to identify the Flow. Incompatible with flow_uuid.

  • flow_uuid – The Flow uuid. Incompatible with job_uuid.

Return type:

The number of modified Jobs.

stop_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None, custom_query: dict | None = None, raise_on_error: bool = True, wait: int | None = None, break_lock: bool = False) list[str][source]#

Stop selected Jobs. Only Jobs in the READY and all the running states can be stopped. The action is not reversible.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

  • custom_query – A generic query. Incompatible with all the other filtering options.

  • raise_on_error – If True, raise in case of an error on one Job and stop the loop. Otherwise, just log the error and proceed.

  • wait – In case the Flow or Jobs that need to be updated are locked, wait this time (in seconds) for the lock to be released. Raise an error if lock is not released.

  • break_lock – Forcibly break the lock on locked documents. Use with care and verify that the lock has been set by a process that is not running anymore. Doing otherwise will likely lead to inconsistencies in the DB.

Returns:

List of db_ids of the updated Jobs.

Return type:

list

unlock_flows(job_ids: str | list[str] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: FlowState | list[FlowState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None) int[source]#

Forcibly remove the lock on a locked Flow document. This should be used only if a lock is a leftover of a process that is not running anymore. Doing otherwise may result in inconsistencies.

Parameters:
  • job_ids – One or more strings with uuids of Jobs belonging to the Flow.

  • db_ids – One or more db_ids of Jobs belonging to the Flow.

  • flow_ids – One or more Flow uuids.

  • states – One or more states of the Flows.

  • start_date – Filter Flows that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Flows that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Flow. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

Returns:

Number of modified Flows.

Return type:

int

unlock_jobs(job_ids: tuple[str, int] | list[tuple[str, int]] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, workers: str | list[str] | None = None) int[source]#

Forcibly remove the lock on a locked Job document. This should be used only if a lock is a leftover of a process that is not running anymore. Doing otherwise may result in inconsistencies.

Parameters:
  • job_ids – One or more tuples, each containing the (uuid, index) pair of the Jobs to retrieve.

  • db_ids – One or more db_ids of the Jobs to retrieve.

  • flow_ids – One or more Flow uuids to which the Jobs to retrieve belong.

  • states – One or more states of the Jobs.

  • start_date – Filter Jobs that were updated_on after this date. Should be in the machine local time zone. It will be converted to UTC.

  • end_date – Filter Jobs that were updated_on before this date. Should be in the machine local time zone. It will be converted to UTC.

  • name – Pattern matching the name of the Job. Default is an exact match, but all conventions from python fnmatch can be used (e.g. *test*).

  • metadata – A dictionary of the values of the metadata to match. Should be an exact match for all the values provided.

  • workers – One or more worker names.

Returns:

Number of modified Jobs.

Return type:

int

update_flow_state(flow_uuid: str, updated_states: dict[str, dict[int, JobState | None]] | None = None) None[source]#

Update the state of a Flow in the DB based on the Job’s states.

The Flow should be locked while performing this operation.

Parameters:
  • flow_uuid – The uuid of the Flow to update.

  • updated_states – A dictionary with the updated states of Jobs that have not been stored in the DB yet, in the form {job_uuid: {job_index: JobState}}. If the value is None the Job is considered deleted and the state of that Job will be ignored while determining the state of the whole Flow.

pydantic settings jobflow_remote.JobflowRemoteSettings[source]#

Bases: BaseSettings

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

JSON schema:
{
   "title": "JobflowRemoteSettings",
   "type": "object",
   "properties": {
      "config_file": {
         "default": "/home/runner/.jfremote.yaml",
         "description": "Location of the config file for jobflow remote.",
         "title": "Config File",
         "type": "string"
      },
      "projects_folder": {
         "default": "/home/runner/.jfremote",
         "description": "Location of the projects files.",
         "title": "Projects Folder",
         "type": "string"
      },
      "project": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The name of the project used.",
         "title": "Project"
      },
      "cli_full_exc": {
         "default": false,
         "description": "If True prints the full stack trace of the exception when raised in the CLI.",
         "title": "Cli Full Exc",
         "type": "boolean"
      },
      "cli_suggestions": {
         "default": true,
         "description": "If True prints some suggestions in the CLI commands.",
         "title": "Cli Suggestions",
         "type": "boolean"
      },
      "cli_log_level": {
         "$ref": "#/$defs/LogLevel",
         "default": "warn",
         "description": "The level set for logging in the CLI"
      }
   },
   "$defs": {
      "LogLevel": {
         "description": "Enumeration of logging level.",
         "enum": [
            "error",
            "warn",
            "info",
            "debug"
         ],
         "title": "LogLevel",
         "type": "string"
      }
   },
   "additionalProperties": false
}

Config:
  • env_prefix: str = jfremote_

Fields:
  • cli_full_exc (bool)
  • cli_log_level (LogLevel)
  • cli_suggestions (bool)
  • config_file (str)
  • project (str | None)
  • projects_folder (str)

Validators:
  • load_default_settings » all fields
field cli_full_exc: bool = False#

If True prints the full stack trace of the exception when raised in the CLI.

Validated by:
  • load_default_settings
field cli_log_level: LogLevel = LogLevel.WARN#

The level set for logging in the CLI

Validated by:
  • load_default_settings
field cli_suggestions: bool = True#

If True prints some suggestions in the CLI commands.

Validated by:
  • load_default_settings
field config_file: str = '/home/runner/.jfremote.yaml'#

Location of the config file for jobflow remote.

Validated by:
  • load_default_settings
field project: str | None = None#

The name of the project used.

Validated by:
  • load_default_settings
field projects_folder: str = '/home/runner/.jfremote'#

Location of the projects files.

Validated by:
  • load_default_settings
validator load_default_settings  »  all fields[source]#

Load settings from file or environment variables.

Loads settings from a root file if available and uses that as defaults in place of built-in defaults.

This allows setting of the config file path through environment variables.
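
A small sketch showing the env_prefix in action (the variable value is hypothetical):

import os

from jobflow_remote import JobflowRemoteSettings

# Fields can be supplied through environment variables with the
# jfremote_ prefix, e.g. the selected project.
os.environ["JFREMOTE_PROJECT"] = "my_project"

settings = JobflowRemoteSettings()
print(settings.project, settings.projects_folder)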

jobflow_remote.get_jobstore(project_name: str | None = None) JobStore[source]#

Helper function to get the jobstore in a project.

Parameters:

project_name – Name of the project or None to use the one from the settings.

Return type:

A JobStore
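
A sketch of fetching an output through the returned store (get_output is part of jobflow's JobStore API; the uuid is hypothetical):

from jobflow_remote import get_jobstore

js = get_jobstore("my_project")
js.connect()
result = js.get_output("job-uuid")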

jobflow_remote.set_run_config(flow_or_job: Flow | Job, name_filter: str = None, function_filter: Callable = None, exec_config: str | ExecutionConfig | None = None, resources: dict | QResources | None = None, worker: str | None = None, dynamic: bool = True) Flow | Job[source]#

Modify in place a Flow or a Job by setting the properties in the “manager_config” entry in the JobConfig associated with each Job matching the filter.

The values left as None will not be set and may be filled by those passed to submit_flow. If you want or need to leave resources or exec_config empty, pass an empty QResources (or an empty dict) or an empty ExecutionConfig.

Uses the Flow/Job update_config() method, so follows the same conventions, also setting the options in the config_updates of the Job, to allow setting the same properties also in dynamically generated Jobs, if the dynamic option is True.

Note that calling this function will override all previously set values for matching Jobs, even if not specified here.

Parameters:
  • flow_or_job – A Flow or a Job to be modified

  • name_filter – A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. “ad” will match a job with the name “adder”.

  • function_filter – A filter for the job function. Only jobs with a matching function will be updated.

  • exec_config – The execution configuration to be added to the selected Jobs.

  • resources – The resources to be set for the selected Jobs.

  • worker – The worker where the selected Jobs will be executed.

  • dynamic – The updates will be propagated to Jobs/Flows dynamically generated at runtime.

Returns:

The modified object.

Return type:

Flow or Job
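
A sketch of routing part of a Flow to a dedicated worker (the worker and exec_config names are hypothetical entries of the project configuration):

from jobflow_remote import set_run_config

# Assuming an existing jobflow Flow object named flow (see the
# submit_flow example below). Only the Jobs whose name matches
# "relax" are affected; the other Jobs keep the values passed to
# submit_flow.
flow = set_run_config(
    flow,
    name_filter="relax",
    worker="hpc_worker",
    exec_config="vasp_config",
)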

jobflow_remote.submit_flow(flow: jobflow.Flow | jobflow.Job | list[jobflow.Job], worker: str | None = None, project: str | None = None, exec_config: str | ExecutionConfig | None = None, resources: dict | QResources | None = None, allow_external_references: bool = False) list[int][source]#

Submit a flow for calculation to the selected Worker.

This will not start the calculation, but just add the calculation to the database of those to be executed.

Parameters:
  • flow – A flow or job.

  • worker – The name of the Worker where the calculation will be submitted. If None, use the first configured worker for this project.

  • project – the name of the project to which the Flow should be submitted. If None the current project will be used.

  • exec_config (str or ExecutionConfig) – the options to set before the execution of the job in the submission script, in addition to those defined in the Worker.

  • resources (Dict or QResources) – information passed to qtoolkit to specify the resources required when submitting to the queue.

  • allow_external_references – If False all the references to other outputs should be from other Jobs of the Flow.

Returns:

The list of db_ids of the submitted Jobs.

Return type:

List of int
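
An end-to-end sketch (the worker name refers to a hypothetical worker configured in the project):

from jobflow import Flow, job

from jobflow_remote import submit_flow


@job
def add(a, b):
    return a + b


first = add(1, 2)
second = add(first.output, 3)
flow = Flow([first, second])

# This only inserts the Jobs in the queue database; a running Runner
# then executes them on the selected worker.
db_ids = submit_flow(flow, worker="local_shell")
print(db_ids)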