jobflow_remote.config package#

Submodules#

Module contents#

exception jobflow_remote.config.ConfigError[source]#

Bases: Exception

A generic Exception related to the configuration.

class jobflow_remote.config.ConfigManager(exclude_unset: bool = False, exclude_none: bool = False, warn: bool = False)[source]#

Bases: object

A manager for the projects configuration files.

Provides tools to parse project information from the selected projects folder as well as methods to update the properties of each project.

Parameters:
  • exclude_unset – when dumping projects determine whether fields which were not explicitly set when creating the model should be excluded from the dictionary

  • exclude_none – when dumping projects determine whether fields which are equal to None should be excluded from the dictionary

  • warn – if True print warnings related to the parsing of the files in the projects folder

create_project(project: Project, ext='yaml') None[source]#

Create a new Project in the project folder by dumping the project to file.

Parameters:
  • project – The data of the project to be created.

  • ext – The extension of the file to which the project will be dumped (yaml, json or toml)

dump_project(project_data: ProjectData) None[source]#

Dump the project to the filepath specified in the ProjectData.

Parameters:

project_data – The project data to be dumped

get_exec_config(exec_config_name: str, project_name: str | None = None) ExecutionConfig[source]#

Return the ExecutionConfig object based on the name.

Parameters:
  • exec_config_name – Name of the ExecutionConfig.

  • project_name – Name of the project from which the ExecutionConfig should be retrieved, or None to use the one from the settings.

Returns:

The selected ExecutionConfig

Return type:

ExecutionConfig

get_project(project_name: str | None = None) Project[source]#

Get the Project object based on the project name.

Parameters:

project_name – The name of the project or None to use the value from the settings

Returns:

The selected Project

Return type:

Project

get_project_data(project_name: str | None = None) ProjectData[source]#

Get the ProjectData object based on the project name.

Parameters:

project_name – The name of the project or None to use the value from the settings

Returns:

The selected ProjectData

Return type:

ProjectData

get_worker(worker_name: str, project_name: str | None = None) WorkerBase[source]#

Return the worker object based on the name.

Parameters:
  • worker_name – Name of the worker to retrieve.

  • project_name – Name of the project from which the Worker should be retrieved, or None to use the one from the settings.

Returns:

The selected Worker.

Return type:

WorkerBase

load_projects_data() dict[str, ProjectData][source]#

Load projects from the selected projects folder.

Returns:

Dictionary with project name as key and ProjectData as value.

Return type:

dict

project_names_from_files() list[str][source]#

Parses all the parsable files and only checks for the “name” attribute to return a list of potential project file names.

Useful in case some projects cannot be properly parsed, but the full list needs to be returned.

Returns:

List of project names.

Return type:

list

property projects: dict[str, Project]#

Returns:

Dictionary with project name as key and Project as value.

Return type:

dict

projects_ext = ('json', 'yaml', 'toml')#
remove_exec_config(exec_config_name: str, project_name: str | None = None) None[source]#

Remove an ExecutionConfig from the selected project.

Parameters:
  • exec_config_name – Name of the ExecutionConfig to be removed

  • project_name – Name of the project from which the ExecutionConfig should be removed, or None to use the one from the settings.

remove_project(project_name: str, remove_folders: bool = True) None[source]#

Remove a project from the projects folder.

Parameters:
  • project_name – Name of the project to be removed.

  • remove_folders – Optionally remove the folders related to the project (e.g. tmp, log).

remove_worker(worker_name: str, project_name: str | None = None) None[source]#

Remove a worker from the selected project.

Parameters:
  • worker_name – Name of the worker to be removed

  • project_name – Name of the project from which the Worker should be removed, or None to use the one from the settings.

select_project_name(project_name: str | None = None) str[source]#

Determine the project name to be used based on the passed value and on the general settings.

Parameters:

project_name – The name of the project or None to use the value from the settings

Returns:

The name of the selected project.

Return type:

str

set_exec_config(exec_config_name: str, exec_config: ExecutionConfig, project_name: str | None = None, replace: bool = False) None[source]#

Set an ExecutionConfig in the selected project. Can add a new ExecutionConfig or replace an existing one.

Parameters:
  • exec_config_name – Name of the ExecutionConfig to be added or replaced.

  • exec_config – The ExecutionConfig.

  • project_name – Name of the project where the ExecutionConfig is set, or None to use the one from the settings.

  • replace – Raise an exception if False and an ExecutionConfig with the chosen name already exists.

set_jobstore(jobstore: JobStore, project_name: str | None = None) None[source]#

Set the project specific store used for jobflow.

Parameters:
  • jobstore – A maggma Store

  • project_name – Name of the project where the Store is set, or None to use the one from the settings.

set_queue_db(store: MongoStore, project_name: str | None = None) None[source]#

Set the project specific store used for managing the queue.

Parameters:
  • store – A maggma Store

  • project_name – Name of the project where the Store is set, or None to use the one from the settings.

set_worker(name: str, worker: WorkerBase, project_name: str | None = None, replace: bool = False) None[source]#

Set a worker in the selected project. Can add a new worker or replace an existing one.

Parameters:
  • name – Name of the worker to be added or replaced.

  • worker – Worker to be set.

  • project_name – Name of the project where the Worker is set, or None to use the one from the settings.

  • replace – Raise an exception if False and a Worker with the chosen name already exists.

update_project(config: dict, project_name: str) None[source]#

Update the project values. The passed dict with values will be recursively merged in the current project.

Parameters:
  • config – Dictionary with the project values to be updated.

  • project_name – Name of the project to be updated

pydantic model jobflow_remote.config.LocalWorker[source]#

Bases: WorkerBase

Worker representing the local host.

Executes command directly.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Show JSON schema
{
   "title": "LocalWorker",
   "description": "Worker representing the local host.\n\nExecutes command directly.",
   "type": "object",
   "properties": {
      "type": {
         "const": "local",
         "default": "local",
         "description": "The discriminator field to determine the worker type",
         "enum": [
            "local"
         ],
         "title": "Type",
         "type": "string"
      },
      "scheduler_type": {
         "description": "Type of the scheduler. Depending on the values supported by QToolKit",
         "title": "Scheduler Type",
         "type": "string"
      },
      "work_dir": {
         "description": "Absolute path of the directory of the worker where subfolders for executing the calculation will be created",
         "format": "path",
         "title": "Work Dir",
         "type": "string"
      },
      "resources": {
         "anyOf": [
            {
               "type": "object"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "A dictionary defining the default resources requested to the scheduler. Used to fill in the QToolKit template",
         "title": "Resources"
      },
      "pre_run": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "String with commands that will be executed before the execution of the Job",
         "title": "Pre Run"
      },
      "post_run": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "String with commands that will be executed after the execution of the Job",
         "title": "Post Run"
      },
      "timeout_execute": {
         "default": 60,
         "description": "Timeout for the execution of the commands in the worker (e.g. submitting a job)",
         "title": "Timeout Execute",
         "type": "integer"
      },
      "max_jobs": {
         "anyOf": [
            {
               "minimum": 0,
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The maximum number of jobs that can be submitted to the queue.",
         "title": "Max Jobs"
      },
      "batch": {
         "anyOf": [
            {
               "$ref": "#/$defs/BatchConfig"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Options for batch execution. If defined, the worker will be considered a batch worker"
      }
   },
   "$defs": {
      "BatchConfig": {
         "additionalProperties": false,
         "description": "Configuration for execution of batch jobs.\n\nAllows to execute multiple Jobs in a single process executed on the\nworker (e.g. SLURM job).",
         "properties": {
            "jobs_handle_dir": {
               "description": "Absolute path to a folder that will be used to store information to share with the jobs being executed",
               "format": "path",
               "title": "Jobs Handle Dir",
               "type": "string"
            },
            "work_dir": {
               "description": "Absolute path to a folder where the batch jobs will be executed. This refers to the jobs submitted to the queue. Jobflow's Job will still be executed in the standard folders.",
               "format": "path",
               "title": "Work Dir",
               "type": "string"
            },
            "max_jobs": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Maximum number of jobs executed in a single run in the queue",
               "title": "Max Jobs"
            },
            "max_wait": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": 60,
               "description": "Maximum time to wait before stopping if no new jobs are available to run (seconds)",
               "title": "Max Wait"
            },
            "max_time": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Maximum time after which a job will not submit more jobs (seconds). To help avoid hitting the walltime",
               "title": "Max Time"
            }
         },
         "required": [
            "jobs_handle_dir",
            "work_dir"
         ],
         "title": "BatchConfig",
         "type": "object"
      }
   },
   "additionalProperties": false,
   "required": [
      "scheduler_type",
      "work_dir"
   ]
}

Config:
  • extra: str = forbid

Fields:
Validators:

field type: Literal['local'] = 'local'#

The discriminator field to determine the worker type

get_host() BaseHost[source]#

Return the LocalHost.

Returns:

The LocalHost.

property cli_info: dict#

Short information about the worker to be displayed in the command line interface.

Returns:

A dictionary with the Worker short information.

pydantic model jobflow_remote.config.Project[source]#

Bases: BaseModel

The configurations of a Project.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Show JSON schema
{
   "title": "Project",
   "description": "The configurations of a Project.",
   "type": "object",
   "properties": {
      "name": {
         "description": "The name of the project",
         "title": "Name",
         "type": "string"
      },
      "base_dir": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The base directory containing the project related files. Default is a folder with the project name inside the projects folder",
         "title": "Base Dir"
      },
      "tmp_dir": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Folder where remote files are copied. Default a 'tmp' folder in base_dir",
         "title": "Tmp Dir"
      },
      "log_dir": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Folder containing all the logs. Default a 'log' folder in base_dir",
         "title": "Log Dir"
      },
      "daemon_dir": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Folder containing daemon related files. Default to a 'daemon' folder in base_dir",
         "title": "Daemon Dir"
      },
      "log_level": {
         "$ref": "#/$defs/LogLevel",
         "default": "info",
         "description": "The level set for logging"
      },
      "runner": {
         "$ref": "#/$defs/RunnerOptions",
         "description": "The options for the Runner"
      },
      "workers": {
         "additionalProperties": {
            "discriminator": {
               "mapping": {
                  "local": "#/$defs/LocalWorker",
                  "remote": "#/$defs/RemoteWorker"
               },
               "propertyName": "type"
            },
            "oneOf": [
               {
                  "$ref": "#/$defs/LocalWorker"
               },
               {
                  "$ref": "#/$defs/RemoteWorker"
               }
            ]
         },
         "description": "A dictionary with the worker name as keys and the worker configuration as values",
         "title": "Workers",
         "type": "object"
      },
      "queue": {
         "$ref": "#/$defs/QueueConfig",
         "description": "The configuration of the Store used to store the states of the Jobs and the Flows"
      },
      "exec_config": {
         "additionalProperties": {
            "$ref": "#/$defs/ExecutionConfig"
         },
         "description": "A dictionary with the ExecutionConfig name as keys and the ExecutionConfig configuration as values",
         "title": "Exec Config",
         "type": "object"
      },
      "jobstore": {
         "description": "The JobStore used for the output. Can contain the monty serialized dictionary or the Store in the Jobflow format",
         "title": "Jobstore",
         "type": "object"
      },
      "remote_jobstore": {
         "anyOf": [
            {
               "type": "object"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The JobStore used for the data transfer between the Runner and the workers. Can be a string with the standard values",
         "title": "Remote Jobstore"
      },
      "metadata": {
         "anyOf": [
            {
               "type": "object"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "A dictionary with metadata associated to the project",
         "title": "Metadata"
      }
   },
   "$defs": {
      "BatchConfig": {
         "additionalProperties": false,
         "description": "Configuration for execution of batch jobs.\n\nAllows to execute multiple Jobs in a single process executed on the\nworker (e.g. SLURM job).",
         "properties": {
            "jobs_handle_dir": {
               "description": "Absolute path to a folder that will be used to store information to share with the jobs being executed",
               "format": "path",
               "title": "Jobs Handle Dir",
               "type": "string"
            },
            "work_dir": {
               "description": "Absolute path to a folder where the batch jobs will be executed. This refers to the jobs submitted to the queue. Jobflow's Job will still be executed in the standard folders.",
               "format": "path",
               "title": "Work Dir",
               "type": "string"
            },
            "max_jobs": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Maximum number of jobs executed in a single run in the queue",
               "title": "Max Jobs"
            },
            "max_wait": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": 60,
               "description": "Maximum time to wait before stopping if no new jobs are available to run (seconds)",
               "title": "Max Wait"
            },
            "max_time": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Maximum time after which a job will not submit more jobs (seconds). To help avoid hitting the walltime",
               "title": "Max Time"
            }
         },
         "required": [
            "jobs_handle_dir",
            "work_dir"
         ],
         "title": "BatchConfig",
         "type": "object"
      },
      "ConnectionData": {
         "description": "The representation of a fabric connection.\nMainly used in case of nested gateways.",
         "properties": {
            "host": {
               "description": "The host to which to connect",
               "title": "Host",
               "type": "string"
            },
            "user": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Login username",
               "title": "User"
            },
            "port": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Port number",
               "title": "Port"
            },
            "password": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Login password",
               "title": "Password"
            },
            "key_filename": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "items": {
                        "type": "string"
                     },
                     "type": "array"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "The filename, or list of filenames, of optional private key(s) and/or certs to try for authentication",
               "title": "Key Filename"
            },
            "passphrase": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Passphrase used for decrypting private keys",
               "title": "Passphrase"
            },
            "gateway": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "$ref": "#/$defs/ConnectionData"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "A shell command string to use as a proxy or gateway",
               "title": "Gateway"
            },
            "connect_kwargs": {
               "anyOf": [
                  {
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Other keyword arguments passed to paramiko.client.SSHClient.connect",
               "title": "Connect Kwargs"
            }
         },
         "required": [
            "host"
         ],
         "title": "ConnectionData",
         "type": "object"
      },
      "ExecutionConfig": {
         "additionalProperties": false,
         "description": "Configuration to be set before and after the execution of a Job.",
         "properties": {
            "modules": {
               "anyOf": [
                  {
                     "items": {
                        "type": "string"
                     },
                     "type": "array"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "list of modules to be loaded",
               "title": "Modules"
            },
            "export": {
               "anyOf": [
                  {
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "dictionary with variable to be exported",
               "title": "Export"
            },
            "pre_run": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Other commands to be executed before the execution of a job",
               "title": "Pre Run"
            },
            "post_run": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Commands to be executed after the execution of a job",
               "title": "Post Run"
            }
         },
         "title": "ExecutionConfig",
         "type": "object"
      },
      "LocalWorker": {
         "additionalProperties": false,
         "description": "Worker representing the local host.\n\nExecutes command directly.",
         "properties": {
            "type": {
               "const": "local",
               "default": "local",
               "description": "The discriminator field to determine the worker type",
               "enum": [
                  "local"
               ],
               "title": "Type",
               "type": "string"
            },
            "scheduler_type": {
               "description": "Type of the scheduler. Depending on the values supported by QToolKit",
               "title": "Scheduler Type",
               "type": "string"
            },
            "work_dir": {
               "description": "Absolute path of the directory of the worker where subfolders for executing the calculation will be created",
               "format": "path",
               "title": "Work Dir",
               "type": "string"
            },
            "resources": {
               "anyOf": [
                  {
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "A dictionary defining the default resources requested to the scheduler. Used to fill in the QToolKit template",
               "title": "Resources"
            },
            "pre_run": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "String with commands that will be executed before the execution of the Job",
               "title": "Pre Run"
            },
            "post_run": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "String with commands that will be executed after the execution of the Job",
               "title": "Post Run"
            },
            "timeout_execute": {
               "default": 60,
               "description": "Timeout for the execution of the commands in the worker (e.g. submitting a job)",
               "title": "Timeout Execute",
               "type": "integer"
            },
            "max_jobs": {
               "anyOf": [
                  {
                     "minimum": 0,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "The maximum number of jobs that can be submitted to the queue.",
               "title": "Max Jobs"
            },
            "batch": {
               "anyOf": [
                  {
                     "$ref": "#/$defs/BatchConfig"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Options for batch execution. If defined, the worker will be considered a batch worker"
            }
         },
         "required": [
            "scheduler_type",
            "work_dir"
         ],
         "title": "LocalWorker",
         "type": "object"
      },
      "LogLevel": {
         "description": "Enumeration of logging level.",
         "enum": [
            "error",
            "warn",
            "info",
            "debug"
         ],
         "title": "LogLevel",
         "type": "string"
      },
      "QueueConfig": {
         "additionalProperties": false,
         "properties": {
            "store": {
               "description": "Dictionary describing a maggma Store used for the queue data. Can contain the monty serialized dictionary or a dictionary with a 'type' specifying the Store subclass. Should be subclass of a MongoStore, as it requires to perform MongoDB actions. The collection is used to store the jobs",
               "title": "Store",
               "type": "object"
            },
            "flows_collection": {
               "default": "flows",
               "description": "The name of the collection containing information about the flows. Taken from the same database as the one defined in the store",
               "title": "Flows Collection",
               "type": "string"
            },
            "auxiliary_collection": {
               "default": "jf_auxiliary",
               "description": "The name of the collection containing auxiliary information. Taken from the same database as the one defined in the store",
               "title": "Auxiliary Collection",
               "type": "string"
            },
            "db_id_prefix": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "a string defining the prefix added to the integer ID associated to each Job in the database",
               "title": "Db Id Prefix"
            }
         },
         "title": "QueueConfig",
         "type": "object"
      },
      "RemoteWorker": {
         "additionalProperties": false,
         "description": "Worker representing a remote host reached through an SSH connection.\n\nUses a Fabric Connection. Check Fabric documentation for more details on the\noptions defining a Connection.",
         "properties": {
            "type": {
               "const": "remote",
               "default": "remote",
               "description": "The discriminator field to determine the worker type",
               "enum": [
                  "remote"
               ],
               "title": "Type",
               "type": "string"
            },
            "scheduler_type": {
               "description": "Type of the scheduler. Depending on the values supported by QToolKit",
               "title": "Scheduler Type",
               "type": "string"
            },
            "work_dir": {
               "description": "Absolute path of the directory of the worker where subfolders for executing the calculation will be created",
               "format": "path",
               "title": "Work Dir",
               "type": "string"
            },
            "resources": {
               "anyOf": [
                  {
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "A dictionary defining the default resources requested to the scheduler. Used to fill in the QToolKit template",
               "title": "Resources"
            },
            "pre_run": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "String with commands that will be executed before the execution of the Job",
               "title": "Pre Run"
            },
            "post_run": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "String with commands that will be executed after the execution of the Job",
               "title": "Post Run"
            },
            "timeout_execute": {
               "default": 60,
               "description": "Timeout for the execution of the commands in the worker (e.g. submitting a job)",
               "title": "Timeout Execute",
               "type": "integer"
            },
            "max_jobs": {
               "anyOf": [
                  {
                     "minimum": 0,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "The maximum number of jobs that can be submitted to the queue.",
               "title": "Max Jobs"
            },
            "batch": {
               "anyOf": [
                  {
                     "$ref": "#/$defs/BatchConfig"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Options for batch execution. If defined, the worker will be considered a batch worker"
            },
            "host": {
               "description": "The host to which to connect",
               "title": "Host",
               "type": "string"
            },
            "user": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Login username",
               "title": "User"
            },
            "port": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Port number",
               "title": "Port"
            },
            "password": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Login password",
               "title": "Password"
            },
            "key_filename": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "items": {
                        "type": "string"
                     },
                     "type": "array"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "The filename, or list of filenames, of optional private key(s) and/or certs to try for authentication",
               "title": "Key Filename"
            },
            "passphrase": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Passphrase used for decrypting private keys",
               "title": "Passphrase"
            },
            "gateway": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "$ref": "#/$defs/ConnectionData"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "A shell command string to use as a proxy or gateway",
               "title": "Gateway"
            },
            "forward_agent": {
               "anyOf": [
                  {
                     "type": "boolean"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Whether to enable SSH agent forwarding",
               "title": "Forward Agent"
            },
            "connect_timeout": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Connection timeout, in seconds",
               "title": "Connect Timeout"
            },
            "connect_kwargs": {
               "anyOf": [
                  {
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Other keyword arguments passed to paramiko.client.SSHClient.connect",
               "title": "Connect Kwargs"
            },
            "inline_ssh_env": {
               "anyOf": [
                  {
                     "type": "boolean"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Whether to send environment variables 'inline' as prefixes in front of command strings",
               "title": "Inline Ssh Env"
            },
            "keepalive": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": 60,
               "description": "Keepalive value in seconds passed to paramiko's transport",
               "title": "Keepalive"
            },
            "shell_cmd": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": "bash",
               "description": "The shell command used to execute the command remotely. If None the command is executed directly",
               "title": "Shell Cmd"
            },
            "login_shell": {
               "default": true,
               "description": "Whether to use a login shell when executing the command",
               "title": "Login Shell",
               "type": "boolean"
            },
            "interactive_login": {
               "default": false,
               "description": "Whether the authentication to the host should be interactive",
               "title": "Interactive Login",
               "type": "boolean"
            }
         },
         "required": [
            "scheduler_type",
            "work_dir",
            "host"
         ],
         "title": "RemoteWorker",
         "type": "object"
      },
      "RunnerOptions": {
         "additionalProperties": false,
         "description": "Options to tune the execution of the Runner.",
         "properties": {
            "delay_checkout": {
               "default": 30,
               "description": "Delay between subsequent execution of the checkout from database (seconds)",
               "title": "Delay Checkout",
               "type": "integer"
            },
            "delay_check_run_status": {
               "default": 30,
               "description": "Delay between subsequent checks of the status of jobs that are submitted to the scheduler (seconds)",
               "title": "Delay Check Run Status",
               "type": "integer"
            },
            "delay_advance_status": {
               "default": 30,
               "description": "Delay between subsequent advancement of the job's remote state (seconds)",
               "title": "Delay Advance Status",
               "type": "integer"
            },
            "delay_refresh_limited": {
               "default": 600,
               "description": "Delay between subsequent refresh from the DB of the number of submitted and running jobs (seconds). Only used if a worker with max_jobs is present",
               "title": "Delay Refresh Limited",
               "type": "integer"
            },
            "delay_update_batch": {
               "default": 60,
               "description": "Delay between subsequent refresh from the DB of the number of submitted and running jobs (seconds). Only used if a batch worker is present",
               "title": "Delay Update Batch",
               "type": "integer"
            },
            "lock_timeout": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": 86400,
               "description": "Time after which the lock on a document is considered expired and can be overridden (seconds)",
               "title": "Lock Timeout"
            },
            "delete_tmp_folder": {
               "default": true,
               "description": "Whether to delete the local temporary folder after a job has completed",
               "title": "Delete Tmp Folder",
               "type": "boolean"
            },
            "max_step_attempts": {
               "default": 3,
               "description": "Maximum number of attempts performed before failing an advancement of a remote state",
               "title": "Max Step Attempts",
               "type": "integer"
            },
            "delta_retry": {
               "default": [
                  30,
                  300,
                  1200
               ],
               "description": "List of increasing delays between subsequent attempts when the advancement of a remote step fails",
               "items": {
                  "type": "integer"
               },
               "title": "Delta Retry",
               "type": "array"
            }
         },
         "title": "RunnerOptions",
         "type": "object"
      }
   },
   "additionalProperties": false,
   "required": [
      "name",
      "queue"
   ]
}

Config:
  • extra: str = forbid

Fields:
Validators:
field base_dir: str | None = None#

The base directory containing the project related files. Default is a folder with the project name inside the projects folder

Validated by:
field daemon_dir: str | None = None#

Folder containing daemon related files. Defaults to a ‘daemon’ folder in base_dir

Validated by:
field exec_config: dict[str, ExecutionConfig] [Optional]#

A dictionary with the ExecutionConfig name as keys and the ExecutionConfig configuration as values

field jobstore: dict [Optional]#

The JobStore used for the output. Can contain the monty serialized dictionary or the Store in the Jobflow format

Validated by:
field log_dir: str | None = None#

Folder containing all the logs. Defaults to a ‘log’ folder in base_dir

Validated by:
field log_level: LogLevel = LogLevel.INFO#

The level set for logging

field metadata: dict | None = None#

A dictionary with metadata associated to the project

field name: str [Required]#

The name of the project

field queue: QueueConfig [Required]#

The configuration of the Store used to store the states of the Jobs and the Flows

field remote_jobstore: dict | None = None#

The JobStore used for the data transfer between the Runner and the workers. Can be a string with the standard values

field runner: RunnerOptions [Optional]#

The options for the Runner

field tmp_dir: str | None = None#

Folder where remote files are copied. Defaults to a ‘tmp’ folder in base_dir

Validated by:
field workers: dict[str, Annotated[LocalWorker | RemoteWorker, FieldInfo(annotation=NoneType, required=True, discriminator='type')]] [Optional]#

A dictionary with the worker name as keys and the worker configuration as values

validator check_base_dir  »  base_dir[source]#

Validator to set the default of base_dir based on the project name.

validator check_daemon_dir  »  daemon_dir[source]#

Validator to set the default of daemon_dir based on the base_dir.

validator check_jobstore  »  jobstore[source]#

Check that the jobstore configuration could be converted to a JobStore.

validator check_log_dir  »  log_dir[source]#

Validator to set the default of log_dir based on the base_dir.

validator check_tmp_dir  »  tmp_dir[source]#

Validator to set the default of tmp_dir based on the base_dir.

get_job_controller()[source]#
get_jobstore() JobStore | None[source]#

Generate an instance of the JobStore based on the configuration.

Return type:

A JobStore

get_queue_store()[source]#

Generate an instance of a maggma Store based on the queue configuration.

Return type:

A maggma Store

class jobflow_remote.config.ProjectData(filepath, project, ext)[source]#

Bases: NamedTuple

Create new instance of ProjectData(filepath, project, ext)

ext: str#

Alias for field number 2

filepath: str | Path#

Alias for field number 0

project: Project#

Alias for field number 1

pydantic model jobflow_remote.config.RemoteWorker[source]#

Bases: WorkerBase

Worker representing a remote host reached through an SSH connection.

Uses a Fabric Connection. Check Fabric documentation for more details on the options defining a Connection.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Show JSON schema
{
   "title": "RemoteWorker",
   "description": "Worker representing a remote host reached through an SSH connection.\n\nUses a Fabric Connection. Check Fabric documentation for more details on the\noptions defining a Connection.",
   "type": "object",
   "properties": {
      "type": {
         "const": "remote",
         "default": "remote",
         "description": "The discriminator field to determine the worker type",
         "enum": [
            "remote"
         ],
         "title": "Type",
         "type": "string"
      },
      "scheduler_type": {
         "description": "Type of the scheduler. The available values depend on those supported by QToolKit",
         "title": "Scheduler Type",
         "type": "string"
      },
      "work_dir": {
         "description": "Absolute path of the directory of the worker where subfolders for executing the calculation will be created",
         "format": "path",
         "title": "Work Dir",
         "type": "string"
      },
      "resources": {
         "anyOf": [
            {
               "type": "object"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "A dictionary defining the default resources requested to the scheduler. Used to fill in the QToolKit template",
         "title": "Resources"
      },
      "pre_run": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "String with commands that will be executed before the execution of the Job",
         "title": "Pre Run"
      },
      "post_run": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "String with commands that will be executed after the execution of the Job",
         "title": "Post Run"
      },
      "timeout_execute": {
         "default": 60,
         "description": "Timeout for the execution of the commands in the worker (e.g. submitting a job)",
         "title": "Timeout Execute",
         "type": "integer"
      },
      "max_jobs": {
         "anyOf": [
            {
               "minimum": 0,
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The maximum number of jobs that can be submitted to the queue.",
         "title": "Max Jobs"
      },
      "batch": {
         "anyOf": [
            {
               "$ref": "#/$defs/BatchConfig"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Options for batch execution. If defined, the worker will be considered a batch worker"
      },
      "host": {
         "description": "The host to which to connect",
         "title": "Host",
         "type": "string"
      },
      "user": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Login username",
         "title": "User"
      },
      "port": {
         "anyOf": [
            {
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Port number",
         "title": "Port"
      },
      "password": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Login password",
         "title": "Password"
      },
      "key_filename": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "items": {
                  "type": "string"
               },
               "type": "array"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The filename, or list of filenames, of optional private key(s) and/or certs to try for authentication",
         "title": "Key Filename"
      },
      "passphrase": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Passphrase used for decrypting private keys",
         "title": "Passphrase"
      },
      "gateway": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "$ref": "#/$defs/ConnectionData"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "A shell command string to use as a proxy or gateway",
         "title": "Gateway"
      },
      "forward_agent": {
         "anyOf": [
            {
               "type": "boolean"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Whether to enable SSH agent forwarding",
         "title": "Forward Agent"
      },
      "connect_timeout": {
         "anyOf": [
            {
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Connection timeout, in seconds",
         "title": "Connect Timeout"
      },
      "connect_kwargs": {
         "anyOf": [
            {
               "type": "object"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Other keyword arguments passed to paramiko.client.SSHClient.connect",
         "title": "Connect Kwargs"
      },
      "inline_ssh_env": {
         "anyOf": [
            {
               "type": "boolean"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Whether to send environment variables 'inline' as prefixes in front of command strings",
         "title": "Inline Ssh Env"
      },
      "keepalive": {
         "anyOf": [
            {
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": 60,
         "description": "Keepalive value in seconds passed to paramiko's transport",
         "title": "Keepalive"
      },
      "shell_cmd": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": "bash",
         "description": "The shell command used to execute the command remotely. If None the command is executed directly",
         "title": "Shell Cmd"
      },
      "login_shell": {
         "default": true,
         "description": "Whether to use a login shell when executing the command",
         "title": "Login Shell",
         "type": "boolean"
      },
      "interactive_login": {
         "default": false,
         "description": "Whether the authentication to the host should be interactive",
         "title": "Interactive Login",
         "type": "boolean"
      }
   },
   "$defs": {
      "BatchConfig": {
         "additionalProperties": false,
         "description": "Configuration for execution of batch jobs.\n\nAllows to execute multiple Jobs in a single process executed on the\nworker (e.g. SLURM job).",
         "properties": {
            "jobs_handle_dir": {
               "description": "Absolute path to a folder that will be used to store information to share with the jobs being executed",
               "format": "path",
               "title": "Jobs Handle Dir",
               "type": "string"
            },
            "work_dir": {
               "description": "Absolute path to a folder where the batch jobs will be executed. This refers to the jobs submitted to the queue. Jobflow's Job will still be executed in the standard folders.",
               "format": "path",
               "title": "Work Dir",
               "type": "string"
            },
            "max_jobs": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Maximum number of jobs executed in a single run in the queue",
               "title": "Max Jobs"
            },
            "max_wait": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": 60,
               "description": "Maximum time to wait before stopping if no new jobs are available to run (seconds)",
               "title": "Max Wait"
            },
            "max_time": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Maximum time after which a job will not submit more jobs (seconds). To help avoid hitting the walltime",
               "title": "Max Time"
            }
         },
         "required": [
            "jobs_handle_dir",
            "work_dir"
         ],
         "title": "BatchConfig",
         "type": "object"
      },
      "ConnectionData": {
         "description": "The representation of a fabric connection.\nMainly used in case of nested gateways.",
         "properties": {
            "host": {
               "description": "The host to which to connect",
               "title": "Host",
               "type": "string"
            },
            "user": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Login username",
               "title": "User"
            },
            "port": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Port number",
               "title": "Port"
            },
            "password": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Login password",
               "title": "Password"
            },
            "key_filename": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "items": {
                        "type": "string"
                     },
                     "type": "array"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "The filename, or list of filenames, of optional private key(s) and/or certs to try for authentication",
               "title": "Key Filename"
            },
            "passphrase": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Passphrase used for decrypting private keys",
               "title": "Passphrase"
            },
            "gateway": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "$ref": "#/$defs/ConnectionData"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "A shell command string to use as a proxy or gateway",
               "title": "Gateway"
            },
            "connect_kwargs": {
               "anyOf": [
                  {
                     "type": "object"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Other keyword arguments passed to paramiko.client.SSHClient.connect",
               "title": "Connect Kwargs"
            }
         },
         "required": [
            "host"
         ],
         "title": "ConnectionData",
         "type": "object"
      }
   },
   "additionalProperties": false,
   "required": [
      "scheduler_type",
      "work_dir",
      "host"
   ]
}

Config:
  • extra: str = forbid

Fields:
Validators:

field connect_kwargs: dict | None = None#

Other keyword arguments passed to paramiko.client.SSHClient.connect

field connect_timeout: int | None = None#

Connection timeout, in seconds

field forward_agent: bool | None = None#

Whether to enable SSH agent forwarding

field gateway: str | ConnectionData | None = None#

A shell command string to use as a proxy or gateway

field host: str [Required]#

The host to which to connect

field inline_ssh_env: bool | None = None#

Whether to send environment variables ‘inline’ as prefixes in front of command strings

field interactive_login: bool = False#

Whether the authentication to the host should be interactive

field keepalive: int | None = 60#

Keepalive value in seconds passed to paramiko’s transport

field key_filename: str | list[str] | None = None#

The filename, or list of filenames, of optional private key(s) and/or certs to try for authentication

field login_shell: bool = True#

Whether to use a login shell when executing the command

field passphrase: str | None = None#

Passphrase used for decrypting private keys

field password: str | None = None#

Login password

field port: int | None = None#

Port number

field shell_cmd: str | None = 'bash'#

The shell command used to execute the command remotely. If None the command is executed directly

field type: Literal['remote'] = 'remote'#

The discriminator field to determine the worker type

field user: str | None = None#

Login username

get_host() BaseHost[source]#

Return the RemoteHost.

Return type:

The RemoteHost.

property cli_info: dict#

Short information about the worker to be displayed in the command line interface.

Return type:

A dictionary with the Worker short information.

pydantic model jobflow_remote.config.RunnerOptions[source]#

Bases: BaseModel

Options to tune the execution of the Runner.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Show JSON schema
{
   "title": "RunnerOptions",
   "description": "Options to tune the execution of the Runner.",
   "type": "object",
   "properties": {
      "delay_checkout": {
         "default": 30,
         "description": "Delay between subsequent execution of the checkout from database (seconds)",
         "title": "Delay Checkout",
         "type": "integer"
      },
      "delay_check_run_status": {
         "default": 30,
         "description": "Delay between subsequent checks of the status of jobs that are submitted to the scheduler (seconds)",
         "title": "Delay Check Run Status",
         "type": "integer"
      },
      "delay_advance_status": {
         "default": 30,
         "description": "Delay between subsequent advancement of the job's remote state (seconds)",
         "title": "Delay Advance Status",
         "type": "integer"
      },
      "delay_refresh_limited": {
         "default": 600,
         "description": "Delay between subsequent refresh from the DB of the number of submitted and running jobs (seconds). Only used if a worker with max_jobs is present",
         "title": "Delay Refresh Limited",
         "type": "integer"
      },
      "delay_update_batch": {
         "default": 60,
         "description": "Delay between subsequent refresh from the DB of the number of submitted and running jobs (seconds). Only used if a batch worker is present",
         "title": "Delay Update Batch",
         "type": "integer"
      },
      "lock_timeout": {
         "anyOf": [
            {
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": 86400,
         "description": "Time after which the lock on a document is considered expired and can be overridden (seconds)",
         "title": "Lock Timeout"
      },
      "delete_tmp_folder": {
         "default": true,
         "description": "Whether to delete the local temporary folder after a job has completed",
         "title": "Delete Tmp Folder",
         "type": "boolean"
      },
      "max_step_attempts": {
         "default": 3,
         "description": "Maximum number of attempts performed before failing the advancement of a remote state",
         "title": "Max Step Attempts",
         "type": "integer"
      },
      "delta_retry": {
         "default": [
            30,
            300,
            1200
         ],
         "description": "List of increasing delays (seconds) between subsequent attempts when the advancement of a remote step fails",
         "items": {
            "type": "integer"
         },
         "title": "Delta Retry",
         "type": "array"
      }
   },
   "additionalProperties": false
}

Config:
  • extra: str = forbid

Fields:
field delay_advance_status: int = 30#

Delay between subsequent advancement of the job’s remote state (seconds)

field delay_check_run_status: int = 30#

Delay between subsequent checks of the status of jobs that are submitted to the scheduler (seconds)

field delay_checkout: int = 30#

Delay between subsequent execution of the checkout from database (seconds)

field delay_refresh_limited: int = 600#

Delay between subsequent refresh from the DB of the number of submitted and running jobs (seconds). Only used if a worker with max_jobs is present

field delay_update_batch: int = 60#

Delay between subsequent refresh from the DB of the number of submitted and running jobs (seconds). Only used if a batch worker is present

field delete_tmp_folder: bool = True#

Whether to delete the local temporary folder after a job has completed

field delta_retry: tuple[int, ...] = (30, 300, 1200)#

List of increasing delays (seconds) between subsequent attempts when the advancement of a remote step fails

field lock_timeout: int | None = 86400#

Time after which the lock on a document is considered expired and can be overridden (seconds)

field max_step_attempts: int = 3#

Maximum number of attempts performed before failing the advancement of a remote state

get_delta_retry(step_attempts: int) int[source]#

The time to wait before retrying a failed advancement of the remote state, based on the number of attempts.

If exceeding the size of the list delta_retry, the last value is returned.

Parameters:

step_attempts – The number of attempts advancing a remote state.

Returns:

The delay in seconds.

Return type:

int