jobflow_remote.config.base module
- pydantic model jobflow_remote.config.base.BatchConfig[source]#
Configuration for execution of batch jobs.
Allows multiple Jobs to be executed in a single process running on the worker (e.g. a SLURM job).
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
JSON schema:
{ "title": "BatchConfig", "description": "Configuration for execution of batch jobs.\n\nAllows to execute multiple Jobs in a single process executed on the\nworker (e.g. SLURM job).", "type": "object", "properties": { "jobs_handle_dir": { "description": "Absolute path to a folder that will be used to store information to share with the jobs being executed", "format": "path", "title": "Jobs Handle Dir", "type": "string" }, "work_dir": { "description": "Absolute path to a folder where the batch jobs will be executed. This refers to the jobs submittedto the queue. Jobflow's Job will still be executed in the standard folders.", "format": "path", "title": "Work Dir", "type": "string" }, "max_jobs_per_batch": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum number of jobs executed in a single batch process", "title": "Max Jobs Per Batch" }, "max_wait": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": 60, "description": "Maximum time to wait before stopping if no new jobs are available to run (seconds)", "title": "Max Wait" }, "max_time": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum time after which a job will not start more jobs (seconds). To help avoid hitting the walltime", "title": "Max Time" }, "parallel_jobs": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Number of jobs executed in parallel in the same process", "title": "Parallel Jobs" } }, "additionalProperties": false, "required": [ "jobs_handle_dir", "work_dir" ] }
- Config:
extra: str = forbid
- Fields:
- field jobs_handle_dir: Path [Required]#
Absolute path to a folder that will be used to store information to share with the jobs being executed
- field max_jobs_per_batch: int | None = None#
Maximum number of jobs executed in a single batch process
- field max_time: int | None = None#
Maximum time, in seconds, after which a job will not start more jobs. This helps avoid hitting the walltime.
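As a rough illustration of how these limits could interact, the sketch below models a batch process deciding whether it may start one more job. It is not jobflow-remote's implementation: BatchLimits and may_start_another are hypothetical names, and the comparison semantics are an assumption based only on the field descriptions (max_wait appears in the JSON schema above with a default of 60).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BatchLimits:
    """Hypothetical stand-in holding the BatchConfig limits; None means no limit."""

    max_jobs_per_batch: Optional[int] = None
    max_wait: Optional[int] = 60  # seconds, default taken from the schema
    max_time: Optional[int] = None  # seconds


def may_start_another(limits: BatchLimits, started: int, elapsed: float, idle: float) -> bool:
    """Return True if the batch process may start one more job.

    started: number of jobs already started by this batch process
    elapsed: seconds since the batch process began
    idle:    seconds spent waiting with no job available
    """
    if limits.max_jobs_per_batch is not None and started >= limits.max_jobs_per_batch:
        return False
    if limits.max_time is not None and elapsed >= limits.max_time:
        return False
    if limits.max_wait is not None and idle >= limits.max_wait:
        return False
    return True


limits = BatchLimits(max_jobs_per_batch=10, max_time=3600)
print(may_start_another(limits, started=3, elapsed=1200, idle=0))  # under every limit
print(may_start_another(limits, started=3, elapsed=4000, idle=0))  # past max_time
```

Setting max_time somewhat below the scheduler walltime leaves room for the last started job to finish, which is the purpose the field description hints at.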
- exception jobflow_remote.config.base.ConfigError[source]#
A generic Exception related to the configuration.
- pydantic model jobflow_remote.config.base.ConnectionData[source]#
The representation of a fabric connection. Mainly used in case of nested gateways.
JSON schema:
{ "$defs": { "ConnectionData": { "description": "The representation of a fabric connection.\nMainly used in case of nested gateways.", "properties": { "host": { "description": "The host to which to connect", "title": "Host", "type": "string" }, "user": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Login username", "title": "User" }, "port": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Port number", "title": "Port" }, "password": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Login password", "title": "Password" }, "key_filename": { "anyOf": [ { "type": "string" }, { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "description": "The filename, or list of filenames, of optional private key(s) and/or certs to try for authentication", "title": "Key Filename" }, "passphrase": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Passphrase used for decrypting private keys", "title": "Passphrase" }, "gateway": { "anyOf": [ { "type": "string" }, { "$ref": "#/$defs/ConnectionData" }, { "type": "null" } ], "default": null, "description": "A shell command string to use as a proxy or gateway", "title": "Gateway" }, "connect_kwargs": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "Other keyword arguments passed to paramiko.client.SSHClient.connect", "title": "Connect Kwargs" } }, "required": [ "host" ], "title": "ConnectionData", "type": "object" } }, "$ref": "#/$defs/ConnectionData" }
- Fields:
- field connect_kwargs: dict | None = None#
Other keyword arguments passed to paramiko.client.SSHClient.connect
- field gateway: str | ConnectionData | None = None#
A shell command string to use as a proxy or gateway
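Because gateway may itself be a ConnectionData, gateways can be nested to describe a chain of jump hosts. The sketch below uses plain dictionaries shaped like the JSON schema above and walks such a chain. The host names and the gateway_chain helper are hypothetical, and a string gateway (a shell command) is simply treated as the outermost hop.

```python
from typing import List


def gateway_chain(conn: dict) -> List[str]:
    """List the hops from the outermost gateway to the final host.

    conn is a dict shaped like ConnectionData: it needs "host" and may
    carry "gateway", which is None, a shell command string, or another
    dict of the same shape.
    """
    hops: List[str] = []
    current = conn
    while isinstance(current, dict):
        hops.append(current["host"])
        current = current.get("gateway")
    if isinstance(current, str):
        # A string gateway is a proxy command rather than a nested connection.
        hops.append(f"proxy command: {current}")
    return list(reversed(hops))


target = {
    "host": "compute.example.org",
    "user": "alice",
    "gateway": {
        "host": "login.example.org",
        "gateway": {"host": "bastion.example.org", "port": 2222},
    },
}
print(gateway_chain(target))
# ['bastion.example.org', 'login.example.org', 'compute.example.org']
```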
- pydantic model jobflow_remote.config.base.ExecutionConfig[source]#
Configuration to be set before and after the execution of a Job.
JSON schema:
{ "title": "ExecutionConfig", "description": "Configuration to be set before and after the execution of a Job.", "type": "object", "properties": { "modules": { "anyOf": [ { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "description": "list of modules to be loaded", "title": "Modules" }, "export": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "dictionary with variable to be exported", "title": "Export" }, "pre_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Other commands to be executed before the execution of a job", "title": "Pre Run" }, "post_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Commands to be executed after the execution of a job", "title": "Post Run" } }, "additionalProperties": false }
- Config:
extra: str = forbid
- Fields:
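To make the four options in the schema concrete, the sketch below renders a dictionary shaped like an ExecutionConfig into shell lines that could precede and follow a job. The rendering (module load, export, and their ordering) is an assumption for illustration and not necessarily how jobflow-remote or QToolKit builds the submission script; render_exec_config is a hypothetical helper.

```python
import shlex
from typing import List, Tuple


def render_exec_config(cfg: dict) -> Tuple[List[str], List[str]]:
    """Return (lines to run before the job, lines to run after the job)."""
    before: List[str] = []
    for module in cfg.get("modules") or []:
        before.append(f"module load {shlex.quote(module)}")
    for name, value in (cfg.get("export") or {}).items():
        before.append(f"export {name}={shlex.quote(str(value))}")
    if cfg.get("pre_run"):
        before.append(cfg["pre_run"])
    after = [cfg["post_run"]] if cfg.get("post_run") else []
    return before, after


cfg = {
    "modules": ["gcc/12.2", "openmpi"],
    "export": {"OMP_NUM_THREADS": 4},
    "pre_run": "source ~/venv/bin/activate",
    "post_run": None,
}
before, after = render_exec_config(cfg)
print("\n".join(before))
```

Every key is optional in the schema, so the helper treats a missing key and an explicit null the same way.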
- pydantic model jobflow_remote.config.base.LocalWorker[source]#
Worker representing the local host.
Executes commands directly.
JSON schema:
{ "title": "LocalWorker", "description": "Worker representing the local host.\n\nExecutes command directly.", "type": "object", "properties": { "type": { "const": "local", "default": "local", "description": "The discriminator field to determine the worker type", "title": "Type", "type": "string" }, "scheduler_type": { "description": "Type of the scheduler. Depending on the values supported by QToolKit", "title": "Scheduler Type", "type": "string" }, "work_dir": { "description": "Absolute path of the directory of the worker where subfolders for executing the calculation will be created", "format": "path", "title": "Work Dir", "type": "string" }, "resources": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "A dictionary defining the default resources requested to the scheduler. Used to fill in the QToolKit template", "title": "Resources" }, "pre_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands that will be executed before the execution of the Job", "title": "Pre Run" }, "post_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands that will be executed after the execution of the Job", "title": "Post Run" }, "execution_cmd": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands to execute the Job on the worker. By default will be set to `jf -fe execution run {}`. The `{}` part will be used to insert the path to the execution directory and it is mandatory. Change only for specific needs (e.g. thejf command needs to be executed in a container).", "title": "Execution Cmd" }, "timeout_execute": { "default": 60, "description": "Timeout for the execution of the commands in the worker (e.g. 
submitting a job)", "title": "Timeout Execute", "type": "integer" }, "max_jobs": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "The maximum number of jobs that can be submitted to the queue.", "title": "Max Jobs" }, "batch": { "anyOf": [ { "$ref": "#/$defs/BatchConfig" }, { "type": "null" } ], "default": null, "description": "Options for batch execution. If define the worker will be considered a batch worker" }, "scheduler_username": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "If defined, the list of jobs running on the worker will be fetched based on theusername instead that from the list of job ids. May be necessary for some scheduler_type (e.g. SGE)", "title": "Scheduler Username" }, "sanitize_command": { "default": false, "description": "Sanitize the output of commands in case of failures due to spurious text producedby the worker shell.", "title": "Sanitize Command", "type": "boolean" }, "delay_download": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Amount of seconds to wait to start the download after the Runner marked a Job as TERMINATED. To account for delays in the writing of the file on the worker file system (e.g. NFS).", "title": "Delay Download" } }, "$defs": { "BatchConfig": { "additionalProperties": false, "description": "Configuration for execution of batch jobs.\n\nAllows to execute multiple Jobs in a single process executed on the\nworker (e.g. SLURM job).", "properties": { "jobs_handle_dir": { "description": "Absolute path to a folder that will be used to store information to share with the jobs being executed", "format": "path", "title": "Jobs Handle Dir", "type": "string" }, "work_dir": { "description": "Absolute path to a folder where the batch jobs will be executed. This refers to the jobs submittedto the queue. 
Jobflow's Job will still be executed in the standard folders.", "format": "path", "title": "Work Dir", "type": "string" }, "max_jobs_per_batch": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum number of jobs executed in a single batch process", "title": "Max Jobs Per Batch" }, "max_wait": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": 60, "description": "Maximum time to wait before stopping if no new jobs are available to run (seconds)", "title": "Max Wait" }, "max_time": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum time after which a job will not start more jobs (seconds). To help avoid hitting the walltime", "title": "Max Time" }, "parallel_jobs": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Number of jobs executed in parallel in the same process", "title": "Parallel Jobs" } }, "required": [ "jobs_handle_dir", "work_dir" ], "title": "BatchConfig", "type": "object" } }, "additionalProperties": false, "required": [ "scheduler_type", "work_dir" ] }
- Config:
extra: str = forbid
- Fields:
- Validators:
- field batch: BatchConfig | None = None#
Options for batch execution. If defined, the worker will be considered a batch worker.
- field delay_download: int | None = None#
Number of seconds to wait before starting the download after the Runner has marked a Job as TERMINATED. This accounts for delays in writing files on the worker file system (e.g. NFS).
- field execution_cmd: str | None = None#
String with the commands used to execute the Job on the worker. By default it is set to jf -fe execution run {}. The {} part is mandatory and is used to insert the path to the execution directory. Change only for specific needs (e.g. the jf command needs to be executed in a container).
- Validated by:
- field max_jobs: int | None = None#
The maximum number of jobs that can be submitted to the queue.
- Constraints:
ge = 0
- field post_run: str | None = None#
String with commands that will be executed after the execution of the Job
- field pre_run: str | None = None#
String with commands that will be executed before the execution of the Job
- field resources: dict | None = None#
A dictionary defining the default resources requested from the scheduler. Used to fill in the QToolKit template.
- field sanitize_command: bool = False#
Sanitize the output of commands in case of failures due to spurious text produced by the worker shell.
- field scheduler_type: str [Required]#
Type of the scheduler. The accepted values depend on those supported by QToolKit.
- Validated by:
- field scheduler_username: str | None = None#
If defined, the list of jobs running on the worker will be fetched based on the username instead of from the list of job ids. May be necessary for some scheduler_type values (e.g. SGE).
- field timeout_execute: int = 60#
Timeout for the execution of the commands in the worker (e.g. submitting a job)
- field work_dir: Path [Required]#
Absolute path of the directory of the worker where subfolders for executing the calculation will be created
- Validated by:
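The execution_cmd description above says that the {} placeholder receives the path to the execution directory. The sketch below shows that substitution with Python's str.format, plus a check that the placeholder is present. Whether jobflow-remote uses str.format internally is an assumption; build_execution_cmd, the run directory, and the container wrapper command are made up for illustration.

```python
from typing import Optional

# Default command quoted in the execution_cmd field description.
DEFAULT_EXECUTION_CMD = "jf -fe execution run {}"


def build_execution_cmd(run_dir: str, execution_cmd: Optional[str] = None) -> str:
    """Insert run_dir into the command template, falling back to the default."""
    template = execution_cmd or DEFAULT_EXECUTION_CMD
    if "{}" not in template:
        raise ValueError("execution_cmd must contain the '{}' placeholder")
    return template.format(run_dir)


print(build_execution_cmd("/scratch/run_001"))

# Hypothetical container wrapper around the jf command.
custom = "apptainer exec my_image.sif jf -fe execution run {}"
print(build_execution_cmd("/scratch/run_001", custom))
```

A custom command that omits {} would give the worker no way to know which directory to run in, which is why the description calls the placeholder mandatory.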
- class jobflow_remote.config.base.LogLevel(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Enumeration of logging levels.
- DEBUG = 'debug'#
- ERROR = 'error'#
- INFO = 'info'#
- WARN = 'warn'#
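The string values can be mapped onto the standard library's logging levels. The sketch below re-declares the enumeration locally so that it runs without jobflow-remote installed; the to_logging helper is hypothetical, and the library may expose its own conversion.

```python
import logging
from enum import Enum


class LogLevel(Enum):
    """Local stand-in mirroring the values listed above."""

    ERROR = "error"
    WARN = "warn"
    INFO = "info"
    DEBUG = "debug"


def to_logging(level: LogLevel) -> int:
    """Translate a LogLevel member into the matching logging constant."""
    return {
        LogLevel.ERROR: logging.ERROR,
        LogLevel.WARN: logging.WARNING,
        LogLevel.INFO: logging.INFO,
        LogLevel.DEBUG: logging.DEBUG,
    }[level]


# A value read from a configuration file arrives as a plain string.
logging.basicConfig(level=to_logging(LogLevel("info")))
```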
- pydantic model jobflow_remote.config.base.Project[source]#
The configurations of a Project.
JSON schema:
{ "title": "Project", "description": "The configurations of a Project.", "type": "object", "properties": { "name": { "description": "The name of the project", "title": "Name", "type": "string" }, "base_dir": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "The base directory containing the project related files. Default is a folder with the project name inside the projects folder", "title": "Base Dir" }, "tmp_dir": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Folder where remote files are copied. Default a 'tmp' folder in base_dir", "title": "Tmp Dir" }, "log_dir": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Folder containing all the logs. Default a 'log' folder in base_dir", "title": "Log Dir" }, "daemon_dir": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Folder containing daemon related files. Default to a 'daemon' folder in base_dir", "title": "Daemon Dir" }, "log_level": { "$ref": "#/$defs/LogLevel", "default": "info", "description": "The level set for logging" }, "runner": { "$ref": "#/$defs/RunnerOptions", "description": "The options for the Runner" }, "workers": { "additionalProperties": { "discriminator": { "mapping": { "local": "#/$defs/LocalWorker", "remote": "#/$defs/RemoteWorker" }, "propertyName": "type" }, "oneOf": [ { "$ref": "#/$defs/LocalWorker" }, { "$ref": "#/$defs/RemoteWorker" } ] }, "description": "A dictionary with the worker name as keys and the worker configuration as values", "title": "Workers", "type": "object" }, "queue": { "$ref": "#/$defs/QueueConfig", "description": "The configuration of the Store used to store the states of the Jobs and the Flows" }, "exec_config": { "additionalProperties": { "$ref": "#/$defs/ExecutionConfig" }, "description": "A dictionary with the ExecutionConfig name as keys and the ExecutionConfig configuration as values", "title": "Exec 
Config", "type": "object" }, "jobstore": { "description": "The JobStore used for the output. Can contain the monty serialized dictionary or the Store in the Jobflow format", "title": "Jobstore", "type": "object" }, "remote_jobstore": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "The JobStore used for the data transfer between the Runnerand the workers. Can be a string with the standard values", "title": "Remote Jobstore" }, "metadata": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "A dictionary with metadata associated to the project", "title": "Metadata" } }, "$defs": { "BatchConfig": { "additionalProperties": false, "description": "Configuration for execution of batch jobs.\n\nAllows to execute multiple Jobs in a single process executed on the\nworker (e.g. SLURM job).", "properties": { "jobs_handle_dir": { "description": "Absolute path to a folder that will be used to store information to share with the jobs being executed", "format": "path", "title": "Jobs Handle Dir", "type": "string" }, "work_dir": { "description": "Absolute path to a folder where the batch jobs will be executed. This refers to the jobs submittedto the queue. Jobflow's Job will still be executed in the standard folders.", "format": "path", "title": "Work Dir", "type": "string" }, "max_jobs_per_batch": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum number of jobs executed in a single batch process", "title": "Max Jobs Per Batch" }, "max_wait": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": 60, "description": "Maximum time to wait before stopping if no new jobs are available to run (seconds)", "title": "Max Wait" }, "max_time": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum time after which a job will not start more jobs (seconds). 
To help avoid hitting the walltime", "title": "Max Time" }, "parallel_jobs": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Number of jobs executed in parallel in the same process", "title": "Parallel Jobs" } }, "required": [ "jobs_handle_dir", "work_dir" ], "title": "BatchConfig", "type": "object" }, "ConnectionData": { "description": "The representation of a fabric connection.\nMainly used in case of nested gateways.", "properties": { "host": { "description": "The host to which to connect", "title": "Host", "type": "string" }, "user": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Login username", "title": "User" }, "port": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Port number", "title": "Port" }, "password": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Login password", "title": "Password" }, "key_filename": { "anyOf": [ { "type": "string" }, { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "description": "The filename, or list of filenames, of optional private key(s) and/or certs to try for authentication", "title": "Key Filename" }, "passphrase": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Passphrase used for decrypting private keys", "title": "Passphrase" }, "gateway": { "anyOf": [ { "type": "string" }, { "$ref": "#/$defs/ConnectionData" }, { "type": "null" } ], "default": null, "description": "A shell command string to use as a proxy or gateway", "title": "Gateway" }, "connect_kwargs": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "Other keyword arguments passed to paramiko.client.SSHClient.connect", "title": "Connect Kwargs" } }, "required": [ "host" ], "title": "ConnectionData", "type": "object" }, "ExecutionConfig": { "additionalProperties": false, 
"description": "Configuration to be set before and after the execution of a Job.", "properties": { "modules": { "anyOf": [ { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "description": "list of modules to be loaded", "title": "Modules" }, "export": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "dictionary with variable to be exported", "title": "Export" }, "pre_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Other commands to be executed before the execution of a job", "title": "Pre Run" }, "post_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Commands to be executed after the execution of a job", "title": "Post Run" } }, "title": "ExecutionConfig", "type": "object" }, "LocalWorker": { "additionalProperties": false, "description": "Worker representing the local host.\n\nExecutes command directly.", "properties": { "type": { "const": "local", "default": "local", "description": "The discriminator field to determine the worker type", "title": "Type", "type": "string" }, "scheduler_type": { "description": "Type of the scheduler. Depending on the values supported by QToolKit", "title": "Scheduler Type", "type": "string" }, "work_dir": { "description": "Absolute path of the directory of the worker where subfolders for executing the calculation will be created", "format": "path", "title": "Work Dir", "type": "string" }, "resources": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "A dictionary defining the default resources requested to the scheduler. 
Used to fill in the QToolKit template", "title": "Resources" }, "pre_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands that will be executed before the execution of the Job", "title": "Pre Run" }, "post_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands that will be executed after the execution of the Job", "title": "Post Run" }, "execution_cmd": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands to execute the Job on the worker. By default will be set to `jf -fe execution run {}`. The `{}` part will be used to insert the path to the execution directory and it is mandatory. Change only for specific needs (e.g. thejf command needs to be executed in a container).", "title": "Execution Cmd" }, "timeout_execute": { "default": 60, "description": "Timeout for the execution of the commands in the worker (e.g. submitting a job)", "title": "Timeout Execute", "type": "integer" }, "max_jobs": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "The maximum number of jobs that can be submitted to the queue.", "title": "Max Jobs" }, "batch": { "anyOf": [ { "$ref": "#/$defs/BatchConfig" }, { "type": "null" } ], "default": null, "description": "Options for batch execution. If define the worker will be considered a batch worker" }, "scheduler_username": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "If defined, the list of jobs running on the worker will be fetched based on theusername instead that from the list of job ids. May be necessary for some scheduler_type (e.g. 
SGE)", "title": "Scheduler Username" }, "sanitize_command": { "default": false, "description": "Sanitize the output of commands in case of failures due to spurious text producedby the worker shell.", "title": "Sanitize Command", "type": "boolean" }, "delay_download": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Amount of seconds to wait to start the download after the Runner marked a Job as TERMINATED. To account for delays in the writing of the file on the worker file system (e.g. NFS).", "title": "Delay Download" } }, "required": [ "scheduler_type", "work_dir" ], "title": "LocalWorker", "type": "object" }, "LogLevel": { "description": "Enumeration of logging level.", "enum": [ "error", "warn", "info", "debug" ], "title": "LogLevel", "type": "string" }, "QueueConfig": { "additionalProperties": false, "properties": { "store": { "description": "Dictionary describing a maggma Store used for the queue data. Can contain the monty serialized dictionary or a dictionary with a 'type' specifying the Store subclass. Should be subclass of a MongoStore, as it requires to perform MongoDB actions. The collection is used to store the jobs", "title": "Store", "type": "object" }, "flows_collection": { "default": "flows", "description": "The name of the collection containing information about the flows. Taken from the same database as the one defined in the store", "title": "Flows Collection", "type": "string" }, "auxiliary_collection": { "default": "jf_auxiliary", "description": "The name of the collection containing auxiliary information. 
Taken from the same database as the one defined in the store", "title": "Auxiliary Collection", "type": "string" }, "db_id_prefix": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "a string defining the prefix added to the integer ID associated to each Job in the database", "title": "Db Id Prefix" } }, "title": "QueueConfig", "type": "object" }, "RemoteWorker": { "additionalProperties": false, "description": "Worker representing a remote host reached through an SSH connection.\n\nUses a Fabric Connection. Check Fabric documentation for more details on the\noptions defining a Connection.", "properties": { "type": { "const": "remote", "default": "remote", "description": "The discriminator field to determine the worker type", "title": "Type", "type": "string" }, "scheduler_type": { "description": "Type of the scheduler. Depending on the values supported by QToolKit", "title": "Scheduler Type", "type": "string" }, "work_dir": { "description": "Absolute path of the directory of the worker where subfolders for executing the calculation will be created", "format": "path", "title": "Work Dir", "type": "string" }, "resources": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "A dictionary defining the default resources requested to the scheduler. Used to fill in the QToolKit template", "title": "Resources" }, "pre_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands that will be executed before the execution of the Job", "title": "Pre Run" }, "post_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands that will be executed after the execution of the Job", "title": "Post Run" }, "execution_cmd": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands to execute the Job on the worker. 
By default will be set to `jf -fe execution run {}`. The `{}` part will be used to insert the path to the execution directory and it is mandatory. Change only for specific needs (e.g. thejf command needs to be executed in a container).", "title": "Execution Cmd" }, "timeout_execute": { "default": 60, "description": "Timeout for the execution of the commands in the worker (e.g. submitting a job)", "title": "Timeout Execute", "type": "integer" }, "max_jobs": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "The maximum number of jobs that can be submitted to the queue.", "title": "Max Jobs" }, "batch": { "anyOf": [ { "$ref": "#/$defs/BatchConfig" }, { "type": "null" } ], "default": null, "description": "Options for batch execution. If define the worker will be considered a batch worker" }, "scheduler_username": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "If defined, the list of jobs running on the worker will be fetched based on theusername instead that from the list of job ids. May be necessary for some scheduler_type (e.g. SGE)", "title": "Scheduler Username" }, "sanitize_command": { "default": false, "description": "Sanitize the output of commands in case of failures due to spurious text producedby the worker shell.", "title": "Sanitize Command", "type": "boolean" }, "delay_download": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Amount of seconds to wait to start the download after the Runner marked a Job as TERMINATED. To account for delays in the writing of the file on the worker file system (e.g. 
NFS).", "title": "Delay Download" }, "host": { "description": "The host to which to connect", "title": "Host", "type": "string" }, "user": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Login username", "title": "User" }, "port": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Port number", "title": "Port" }, "password": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Login password", "title": "Password" }, "key_filename": { "anyOf": [ { "type": "string" }, { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "description": "The filename, or list of filenames, of optional private key(s) and/or certs to try for authentication", "title": "Key Filename" }, "passphrase": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Passphrase used for decrypting private keys", "title": "Passphrase" }, "gateway": { "anyOf": [ { "type": "string" }, { "$ref": "#/$defs/ConnectionData" }, { "type": "null" } ], "default": null, "description": "A shell command string to use as a proxy or gateway", "title": "Gateway" }, "forward_agent": { "anyOf": [ { "type": "boolean" }, { "type": "null" } ], "default": null, "description": "Whether to enable SSH agent forwarding", "title": "Forward Agent" }, "connect_timeout": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Connection timeout, in seconds", "title": "Connect Timeout" }, "connect_kwargs": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "Other keyword arguments passed to paramiko.client.SSHClient.connect", "title": "Connect Kwargs" }, "inline_ssh_env": { "anyOf": [ { "type": "boolean" }, { "type": "null" } ], "default": null, "description": "Whether to send environment variables 'inline' as prefixes in front of command strings", "title": "Inline Ssh Env" 
}, "keepalive": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": 60, "description": "Keepalive value in seconds passed to paramiko's transport", "title": "Keepalive" }, "shell_cmd": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": "bash", "description": "The shell command used to execute the command remotely. If None the command is executed directly", "title": "Shell Cmd" }, "login_shell": { "default": true, "description": "Whether to use a login shell when executing the command", "title": "Login Shell", "type": "boolean" }, "interactive_login": { "default": false, "description": "Whether the authentication to the host should be interactive", "title": "Interactive Login", "type": "boolean" } }, "required": [ "scheduler_type", "work_dir", "host" ], "title": "RemoteWorker", "type": "object" }, "RunnerOptions": { "additionalProperties": false, "description": "Options to tune the execution of the Runner.", "properties": { "delay_checkout": { "default": 30, "description": "Delay between subsequent execution of the checkout from database (seconds)", "title": "Delay Checkout", "type": "integer" }, "delay_check_run_status": { "default": 30, "description": "Delay between subsequent execution of the checking the status of jobs that are submitted to the scheduler (seconds)", "title": "Delay Check Run Status", "type": "integer" }, "delay_advance_status": { "default": 30, "description": "Delay between subsequent advancement of the job's remote state (seconds)", "title": "Delay Advance Status", "type": "integer" }, "delay_refresh_limited": { "default": 600, "description": "Delay between subsequent refresh from the DB of the number of submitted and running jobs (seconds). Only used if a worker with max_jobs is present", "title": "Delay Refresh Limited", "type": "integer" }, "delay_update_batch": { "default": 60, "description": "Delay between subsequent refresh from the DB of the number of submitted and running jobs (seconds). 
Only used if a batch worker is present", "title": "Delay Update Batch", "type": "integer" }, "delay_ping_db": { "default": 7200, "description": "Delay between subsequent pings to the running runner document.", "title": "Delay Ping Db", "type": "integer" }, "lock_timeout": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": 86400, "description": "Time to consider the lock on a document expired and can be overridden (seconds)", "title": "Lock Timeout" }, "delete_tmp_folder": { "default": true, "description": "Whether to delete the local temporary folder after a job has completed", "title": "Delete Tmp Folder", "type": "boolean" }, "max_step_attempts": { "default": 3, "description": "Maximum number of attempt performed before failing an advancement of a remote state", "title": "Max Step Attempts", "type": "integer" }, "delta_retry": { "default": [ 30, 300, 1200 ], "description": "List of increasing delay between subsequent attempts when the advancement of a remote step fails", "items": { "type": "integer" }, "title": "Delta Retry", "type": "array" } }, "title": "RunnerOptions", "type": "object" } }, "additionalProperties": false, "required": [ "name", "queue" ] }
- Config:
extra: str = forbid
- Fields:
- Validators:
- field base_dir: str | None = None#
The base directory containing the project-related files. Defaults to a folder with the project name inside the projects folder.
- Validated by:
- field daemon_dir: str | None = None#
Folder containing daemon-related files. Defaults to a ‘daemon’ folder in base_dir.
- Validated by:
- field exec_config: dict[str, ExecutionConfig] [Optional]#
A dictionary with the ExecutionConfig name as keys and the ExecutionConfig configuration as values
- field jobstore: dict [Optional]#
The JobStore used for the output. Can contain the monty serialized dictionary or the Store in the Jobflow format
- Validated by:
- field log_dir: str | None = None#
Folder containing all the logs. Defaults to a ‘log’ folder in base_dir.
- Validated by:
- field queue: QueueConfig [Required]#
The configuration of the Store used to store the states of the Jobs and the Flows
- field remote_jobstore: dict | None = None#
The JobStore used for the data transfer between the Runner and the workers. Can be a string with the standard values.
- field runner: RunnerOptions [Optional]#
The options for the Runner
- field tmp_dir: str | None = None#
Folder where remote files are copied. Defaults to a ‘tmp’ folder in base_dir.
- Validated by:
- field workers: dict[str, Annotated[LocalWorker | RemoteWorker, FieldInfo(annotation=NoneType, required=True, discriminator='type')]] [Optional]#
A dictionary with the worker name as keys and the worker configuration as values
- validator check_base_dir » base_dir[source]#
Validator to set the default of base_dir based on the project name.
- validator check_daemon_dir » daemon_dir[source]#
Validator to set the default of daemon_dir based on the base_dir.
- validator check_jobstore » jobstore[source]#
Check that the jobstore configuration can be converted to a JobStore.
- validator check_log_dir » log_dir[source]#
Validator to set the default of log_dir based on the base_dir.
- validator check_tmp_dir » tmp_dir[source]#
Validator to set the default of tmp_dir based on the base_dir.
- get_jobstore() JobStore | None [source]#
Generate an instance of the JobStore based on the configuration.
- Return type:
A JobStore
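The directory defaults described for base_dir, daemon_dir, log_dir and tmp_dir can be sketched in plain Python. This is only an illustration of the documented fallback rules, not the code of the validators: the function name `resolve_project_dirs` and the `projects_folder` argument are hypothetical, and the location of the projects folder is an assumption chosen for the example.

```python
from __future__ import annotations

from pathlib import PurePosixPath


def resolve_project_dirs(
    name: str,
    projects_folder: str,  # hypothetical: where the projects folder lives
    base_dir: str | None = None,
    daemon_dir: str | None = None,
    log_dir: str | None = None,
    tmp_dir: str | None = None,
) -> dict[str, str]:
    """Fill in unset directories following the documented defaults."""
    # base_dir falls back to a folder named after the project.
    if base_dir:
        base = PurePosixPath(base_dir)
    else:
        base = PurePosixPath(projects_folder) / name
    # The other folders fall back to fixed subfolders of base_dir.
    return {
        "base_dir": str(base),
        "daemon_dir": daemon_dir or str(base / "daemon"),
        "log_dir": log_dir or str(base / "log"),
        "tmp_dir": tmp_dir or str(base / "tmp"),
    }


dirs = resolve_project_dirs("my_project", "/home/user/projects")
custom = resolve_project_dirs(
    "my_project", "/home/user/projects", log_dir="/var/log/jfr"
)
```

Explicitly set values are kept, so in `custom` only log_dir differs from the defaults.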
- exception jobflow_remote.config.base.ProjectUndefinedError[source]#
Exception raised if the Project has not been defined or could not be determined.
- pydantic model jobflow_remote.config.base.QueueConfig[source]#
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Show JSON schema
{ "title": "QueueConfig", "type": "object", "properties": { "store": { "description": "Dictionary describing a maggma Store used for the queue data. Can contain the monty serialized dictionary or a dictionary with a 'type' specifying the Store subclass. Should be subclass of a MongoStore, as it requires to perform MongoDB actions. The collection is used to store the jobs", "title": "Store", "type": "object" }, "flows_collection": { "default": "flows", "description": "The name of the collection containing information about the flows. Taken from the same database as the one defined in the store", "title": "Flows Collection", "type": "string" }, "auxiliary_collection": { "default": "jf_auxiliary", "description": "The name of the collection containing auxiliary information. Taken from the same database as the one defined in the store", "title": "Auxiliary Collection", "type": "string" }, "db_id_prefix": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "a string defining the prefix added to the integer ID associated to each Job in the database", "title": "Db Id Prefix" } }, "additionalProperties": false }
- Config:
extra: str = forbid
- Fields:
- Validators:
- field auxiliary_collection: str = 'jf_auxiliary'#
The name of the collection containing auxiliary information. Taken from the same database as the one defined in the store
- field db_id_prefix: str | None = None#
A string defining the prefix added to the integer ID associated with each Job in the database.
- field flows_collection: str = 'flows'#
The name of the collection containing information about the flows. Taken from the same database as the one defined in the store
- field store: dict [Optional]#
Dictionary describing a maggma Store used for the queue data. Can contain the monty serialized dictionary or a dictionary with a ‘type’ specifying the Store subclass. Should be a subclass of MongoStore, since MongoDB actions need to be performed. The collection is used to store the jobs.
- Validated by:
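As a rough illustration of the QueueConfig fields, the dictionary below uses the ‘type’ form of the store description. The host, database and collection names are placeholder values, and the helper `unknown_keys` is hypothetical: it only mimics, in plain Python, the effect of `extra = forbid`, under which keys that are not fields of the model are rejected.

```python
# Placeholder values; only the key names come from the documented fields.
queue_config = {
    "store": {
        "type": "MongoStore",       # Store subclass, selected through 'type'
        "host": "localhost",
        "database": "db_name",
        "collection_name": "jobs",  # collection used to store the jobs
    },
    "flows_collection": "flows",             # documented default
    "auxiliary_collection": "jf_auxiliary",  # documented default
    "db_id_prefix": None,
}

# Field names listed in the QueueConfig schema.
ALLOWED_KEYS = {"store", "flows_collection", "auxiliary_collection", "db_id_prefix"}


def unknown_keys(config: dict) -> set:
    """Return the keys that a model with extra='forbid' would reject."""
    return set(config) - ALLOWED_KEYS


typo_config = {**queue_config, "flow_collection": "flows"}  # misspelled key
```

Here `unknown_keys(typo_config)` flags the misspelled `flow_collection`, which is the kind of mistake that `extra = forbid` turns into a validation error instead of a silently ignored option.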
- pydantic model jobflow_remote.config.base.RemoteWorker[source]#
Worker representing a remote host reached through an SSH connection.
Uses a Fabric Connection. Check the Fabric documentation for more details on the options defining a Connection.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Show JSON schema
{ "title": "RemoteWorker", "description": "Worker representing a remote host reached through an SSH connection.\n\nUses a Fabric Connection. Check Fabric documentation for more details on the\noptions defining a Connection.", "type": "object", "properties": { "type": { "const": "remote", "default": "remote", "description": "The discriminator field to determine the worker type", "title": "Type", "type": "string" }, "scheduler_type": { "description": "Type of the scheduler. Depending on the values supported by QToolKit", "title": "Scheduler Type", "type": "string" }, "work_dir": { "description": "Absolute path of the directory of the worker where subfolders for executing the calculation will be created", "format": "path", "title": "Work Dir", "type": "string" }, "resources": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "A dictionary defining the default resources requested to the scheduler. Used to fill in the QToolKit template", "title": "Resources" }, "pre_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands that will be executed before the execution of the Job", "title": "Pre Run" }, "post_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands that will be executed after the execution of the Job", "title": "Post Run" }, "execution_cmd": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands to execute the Job on the worker. By default will be set to `jf -fe execution run {}`. The `{}` part will be used to insert the path to the execution directory and it is mandatory. Change only for specific needs (e.g. thejf command needs to be executed in a container).", "title": "Execution Cmd" }, "timeout_execute": { "default": 60, "description": "Timeout for the execution of the commands in the worker (e.g. 
submitting a job)", "title": "Timeout Execute", "type": "integer" }, "max_jobs": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "The maximum number of jobs that can be submitted to the queue.", "title": "Max Jobs" }, "batch": { "anyOf": [ { "$ref": "#/$defs/BatchConfig" }, { "type": "null" } ], "default": null, "description": "Options for batch execution. If define the worker will be considered a batch worker" }, "scheduler_username": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "If defined, the list of jobs running on the worker will be fetched based on theusername instead that from the list of job ids. May be necessary for some scheduler_type (e.g. SGE)", "title": "Scheduler Username" }, "sanitize_command": { "default": false, "description": "Sanitize the output of commands in case of failures due to spurious text producedby the worker shell.", "title": "Sanitize Command", "type": "boolean" }, "delay_download": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Amount of seconds to wait to start the download after the Runner marked a Job as TERMINATED. To account for delays in the writing of the file on the worker file system (e.g. 
NFS).", "title": "Delay Download" }, "host": { "description": "The host to which to connect", "title": "Host", "type": "string" }, "user": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Login username", "title": "User" }, "port": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Port number", "title": "Port" }, "password": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Login password", "title": "Password" }, "key_filename": { "anyOf": [ { "type": "string" }, { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "description": "The filename, or list of filenames, of optional private key(s) and/or certs to try for authentication", "title": "Key Filename" }, "passphrase": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Passphrase used for decrypting private keys", "title": "Passphrase" }, "gateway": { "anyOf": [ { "type": "string" }, { "$ref": "#/$defs/ConnectionData" }, { "type": "null" } ], "default": null, "description": "A shell command string to use as a proxy or gateway", "title": "Gateway" }, "forward_agent": { "anyOf": [ { "type": "boolean" }, { "type": "null" } ], "default": null, "description": "Whether to enable SSH agent forwarding", "title": "Forward Agent" }, "connect_timeout": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Connection timeout, in seconds", "title": "Connect Timeout" }, "connect_kwargs": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "Other keyword arguments passed to paramiko.client.SSHClient.connect", "title": "Connect Kwargs" }, "inline_ssh_env": { "anyOf": [ { "type": "boolean" }, { "type": "null" } ], "default": null, "description": "Whether to send environment variables 'inline' as prefixes in front of command strings", "title": "Inline Ssh Env" 
}, "keepalive": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": 60, "description": "Keepalive value in seconds passed to paramiko's transport", "title": "Keepalive" }, "shell_cmd": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": "bash", "description": "The shell command used to execute the command remotely. If None the command is executed directly", "title": "Shell Cmd" }, "login_shell": { "default": true, "description": "Whether to use a login shell when executing the command", "title": "Login Shell", "type": "boolean" }, "interactive_login": { "default": false, "description": "Whether the authentication to the host should be interactive", "title": "Interactive Login", "type": "boolean" } }, "$defs": { "BatchConfig": { "additionalProperties": false, "description": "Configuration for execution of batch jobs.\n\nAllows to execute multiple Jobs in a single process executed on the\nworker (e.g. SLURM job).", "properties": { "jobs_handle_dir": { "description": "Absolute path to a folder that will be used to store information to share with the jobs being executed", "format": "path", "title": "Jobs Handle Dir", "type": "string" }, "work_dir": { "description": "Absolute path to a folder where the batch jobs will be executed. This refers to the jobs submittedto the queue. 
Jobflow's Job will still be executed in the standard folders.", "format": "path", "title": "Work Dir", "type": "string" }, "max_jobs_per_batch": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum number of jobs executed in a single batch process", "title": "Max Jobs Per Batch" }, "max_wait": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": 60, "description": "Maximum time to wait before stopping if no new jobs are available to run (seconds)", "title": "Max Wait" }, "max_time": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum time after which a job will not start more jobs (seconds). To help avoid hitting the walltime", "title": "Max Time" }, "parallel_jobs": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Number of jobs executed in parallel in the same process", "title": "Parallel Jobs" } }, "required": [ "jobs_handle_dir", "work_dir" ], "title": "BatchConfig", "type": "object" }, "ConnectionData": { "description": "The representation of a fabric connection.\nMainly used in case of nested gateways.", "properties": { "host": { "description": "The host to which to connect", "title": "Host", "type": "string" }, "user": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Login username", "title": "User" }, "port": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Port number", "title": "Port" }, "password": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Login password", "title": "Password" }, "key_filename": { "anyOf": [ { "type": "string" }, { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "description": "The filename, or list of filenames, of optional private key(s) and/or certs to try for authentication", "title": "Key Filename" }, "passphrase": { 
"anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Passphrase used for decrypting private keys", "title": "Passphrase" }, "gateway": { "anyOf": [ { "type": "string" }, { "$ref": "#/$defs/ConnectionData" }, { "type": "null" } ], "default": null, "description": "A shell command string to use as a proxy or gateway", "title": "Gateway" }, "connect_kwargs": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "Other keyword arguments passed to paramiko.client.SSHClient.connect", "title": "Connect Kwargs" } }, "required": [ "host" ], "title": "ConnectionData", "type": "object" } }, "additionalProperties": false, "required": [ "scheduler_type", "work_dir", "host" ] }
- Config:
extra: str = forbid
- Fields:
- Validators:
- field batch: BatchConfig | None = None#
Options for batch execution. If defined, the worker will be considered a batch worker.
- field connect_kwargs: dict | None = None#
Other keyword arguments passed to paramiko.client.SSHClient.connect
- field delay_download: int | None = None#
Number of seconds to wait before starting the download after the Runner has marked a Job as TERMINATED. Accounts for delays in writing the files on the worker file system (e.g. NFS).
- field execution_cmd: str | None = None#
String with commands to execute the Job on the worker. By default it is set to jf -fe execution run {}. The {} part is mandatory and is used to insert the path to the execution directory. Change only for specific needs (e.g. the jf command needs to be executed in a container).
- Validated by:
- field gateway: str | ConnectionData | None = None#
A shell command string to use as a proxy or gateway
- field inline_ssh_env: bool | None = None#
Whether to send environment variables ‘inline’ as prefixes in front of command strings
- field key_filename: str | list[str] | None = None#
The filename, or list of filenames, of optional private key(s) and/or certs to try for authentication
- field max_jobs: int | None = None#
The maximum number of jobs that can be submitted to the queue.
- Constraints:
ge = 0
- field post_run: str | None = None#
String with commands that will be executed after the execution of the Job
- field pre_run: str | None = None#
String with commands that will be executed before the execution of the Job
- field resources: dict | None = None#
A dictionary defining the default resources requested to the scheduler. Used to fill in the QToolKit template
- field sanitize_command: bool = False#
Sanitize the output of commands in case of failures due to spurious text produced by the worker shell.
- field scheduler_type: str [Required]#
Type of the scheduler. Must be one of the values supported by QToolKit.
- Validated by:
- field scheduler_username: str | None = None#
If defined, the list of jobs running on the worker will be fetched based on the username instead of the list of job ids. May be necessary for some scheduler_type values (e.g. SGE).
- field shell_cmd: str | None = 'bash'#
The shell command used to execute the command remotely. If None the command is executed directly
- field timeout_execute: int = 60#
Timeout for the execution of the commands in the worker (e.g. submitting a job)
- field work_dir: Path [Required]#
Absolute path of the directory of the worker where subfolders for executing the calculation will be created
- Validated by:
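A RemoteWorker configuration can be pictured as a dictionary like the one below. This is a sketch with placeholder values: the host, user and paths are invented, and "slurm" is assumed to be among the scheduler types supported by QToolKit. The helper `missing_required` is hypothetical and only checks the three fields that the JSON schema lists as required.

```python
# Fields listed under "required" in the RemoteWorker JSON schema.
REQUIRED_FIELDS = ("scheduler_type", "work_dir", "host")

# Placeholder values for illustration only.
remote_worker = {
    "type": "remote",              # discriminator for the worker type
    "scheduler_type": "slurm",     # assumed to be supported by QToolKit
    "work_dir": "/scratch/user/jfr_runs",  # absolute path on the worker
    "host": "cluster.example.org",
    "user": "user",
    "timeout_execute": 60,         # documented default
    "max_jobs": 10,                # must be >= 0 when set
}


def missing_required(worker: dict) -> list:
    """Return the required RemoteWorker fields absent from the dictionary."""
    return [key for key in REQUIRED_FIELDS if key not in worker]


incomplete = {"type": "remote", "host": "cluster.example.org"}
```

All the connection options (key_filename, gateway, connect_kwargs and so on) are optional and can be left out when the defaults are suitable.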
- pydantic model jobflow_remote.config.base.RunnerOptions[source]#
Options to tune the execution of the Runner.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Show JSON schema
{ "title": "RunnerOptions", "description": "Options to tune the execution of the Runner.", "type": "object", "properties": { "delay_checkout": { "default": 30, "description": "Delay between subsequent execution of the checkout from database (seconds)", "title": "Delay Checkout", "type": "integer" }, "delay_check_run_status": { "default": 30, "description": "Delay between subsequent execution of the checking the status of jobs that are submitted to the scheduler (seconds)", "title": "Delay Check Run Status", "type": "integer" }, "delay_advance_status": { "default": 30, "description": "Delay between subsequent advancement of the job's remote state (seconds)", "title": "Delay Advance Status", "type": "integer" }, "delay_refresh_limited": { "default": 600, "description": "Delay between subsequent refresh from the DB of the number of submitted and running jobs (seconds). Only used if a worker with max_jobs is present", "title": "Delay Refresh Limited", "type": "integer" }, "delay_update_batch": { "default": 60, "description": "Delay between subsequent refresh from the DB of the number of submitted and running jobs (seconds). 
Only used if a batch worker is present", "title": "Delay Update Batch", "type": "integer" }, "delay_ping_db": { "default": 7200, "description": "Delay between subsequent pings to the running runner document.", "title": "Delay Ping Db", "type": "integer" }, "lock_timeout": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": 86400, "description": "Time to consider the lock on a document expired and can be overridden (seconds)", "title": "Lock Timeout" }, "delete_tmp_folder": { "default": true, "description": "Whether to delete the local temporary folder after a job has completed", "title": "Delete Tmp Folder", "type": "boolean" }, "max_step_attempts": { "default": 3, "description": "Maximum number of attempt performed before failing an advancement of a remote state", "title": "Max Step Attempts", "type": "integer" }, "delta_retry": { "default": [ 30, 300, 1200 ], "description": "List of increasing delay between subsequent attempts when the advancement of a remote step fails", "items": { "type": "integer" }, "title": "Delta Retry", "type": "array" } }, "additionalProperties": false }
- Config:
extra: str = forbid
- Fields:
- field delay_advance_status: int = 30#
Delay between subsequent advancement of the job’s remote state (seconds)
- field delay_check_run_status: int = 30#
Delay between subsequent checks of the status of jobs that are submitted to the scheduler (seconds)
- field delay_checkout: int = 30#
Delay between subsequent execution of the checkout from database (seconds)
- field delay_refresh_limited: int = 600#
Delay between subsequent refreshes from the DB of the number of submitted and running jobs (seconds). Only used if a worker with max_jobs is present.
- field delay_update_batch: int = 60#
Delay between subsequent refreshes from the DB of the number of submitted and running jobs (seconds). Only used if a batch worker is present.
- field delete_tmp_folder: bool = True#
Whether to delete the local temporary folder after a job has completed
- field delta_retry: tuple[int, ...] = (30, 300, 1200)#
List of increasing delays between subsequent attempts when the advancement of a remote step fails (seconds)
- field lock_timeout: int | None = 86400#
Time after which the lock on a document is considered expired and can be overridden (seconds)
- field max_step_attempts: int = 3#
Maximum number of attempts performed before failing an advancement of a remote state
- get_delta_retry(step_attempts: int) int [source]#
The time to wait before retrying a failed advancement of the remote state, based on the number of attempts.
If exceeding the size of the list delta_retry, the last value is returned.
- Parameters:
step_attempts – The number of attempts advancing a remote state.
- Return type:
The delay in seconds.
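The lookup described for get_delta_retry can be sketched as a standalone function. This is an approximation written for illustration, not the library's source: it assumes that step_attempts counts attempts starting from 1, and it takes the delay list as an explicit argument instead of reading it from the model.

```python
from typing import Sequence


def get_delta_retry(
    step_attempts: int,
    delta_retry: Sequence[int] = (30, 300, 1200),  # documented default
) -> int:
    """Return the delay (seconds) to wait before the next retry."""
    # Assumption: attempts are 1-based, so attempt 1 maps to index 0.
    # Attempts beyond the length of the list reuse the last value.
    index = max(min(step_attempts, len(delta_retry)) - 1, 0)
    return delta_retry[index]


delays = [get_delta_retry(n) for n in (1, 2, 3, 4, 10)]
```

With the default values the delay grows from 30 to 300 to 1200 seconds and then stays at 1200 for any further attempt.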
- pydantic model jobflow_remote.config.base.WorkerBase[source]#
Base class defining the common fields for the different types of Worker.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Show JSON schema
{ "title": "WorkerBase", "description": "Base class defining the common field for the different types of Worker.", "type": "object", "properties": { "type": { "description": "The discriminator field to determine the worker type", "title": "Type", "type": "string" }, "scheduler_type": { "description": "Type of the scheduler. Depending on the values supported by QToolKit", "title": "Scheduler Type", "type": "string" }, "work_dir": { "description": "Absolute path of the directory of the worker where subfolders for executing the calculation will be created", "format": "path", "title": "Work Dir", "type": "string" }, "resources": { "anyOf": [ { "type": "object" }, { "type": "null" } ], "default": null, "description": "A dictionary defining the default resources requested to the scheduler. Used to fill in the QToolKit template", "title": "Resources" }, "pre_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands that will be executed before the execution of the Job", "title": "Pre Run" }, "post_run": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands that will be executed after the execution of the Job", "title": "Post Run" }, "execution_cmd": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "String with commands to execute the Job on the worker. By default will be set to `jf -fe execution run {}`. The `{}` part will be used to insert the path to the execution directory and it is mandatory. Change only for specific needs (e.g. thejf command needs to be executed in a container).", "title": "Execution Cmd" }, "timeout_execute": { "default": 60, "description": "Timeout for the execution of the commands in the worker (e.g. 
submitting a job)", "title": "Timeout Execute", "type": "integer" }, "max_jobs": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "The maximum number of jobs that can be submitted to the queue.", "title": "Max Jobs" }, "batch": { "anyOf": [ { "$ref": "#/$defs/BatchConfig" }, { "type": "null" } ], "default": null, "description": "Options for batch execution. If define the worker will be considered a batch worker" }, "scheduler_username": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "If defined, the list of jobs running on the worker will be fetched based on theusername instead that from the list of job ids. May be necessary for some scheduler_type (e.g. SGE)", "title": "Scheduler Username" }, "sanitize_command": { "default": false, "description": "Sanitize the output of commands in case of failures due to spurious text producedby the worker shell.", "title": "Sanitize Command", "type": "boolean" }, "delay_download": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Amount of seconds to wait to start the download after the Runner marked a Job as TERMINATED. To account for delays in the writing of the file on the worker file system (e.g. NFS).", "title": "Delay Download" } }, "$defs": { "BatchConfig": { "additionalProperties": false, "description": "Configuration for execution of batch jobs.\n\nAllows to execute multiple Jobs in a single process executed on the\nworker (e.g. SLURM job).", "properties": { "jobs_handle_dir": { "description": "Absolute path to a folder that will be used to store information to share with the jobs being executed", "format": "path", "title": "Jobs Handle Dir", "type": "string" }, "work_dir": { "description": "Absolute path to a folder where the batch jobs will be executed. This refers to the jobs submittedto the queue. 
Jobflow's Job will still be executed in the standard folders.", "format": "path", "title": "Work Dir", "type": "string" }, "max_jobs_per_batch": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum number of jobs executed in a single batch process", "title": "Max Jobs Per Batch" }, "max_wait": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": 60, "description": "Maximum time to wait before stopping if no new jobs are available to run (seconds)", "title": "Max Wait" }, "max_time": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum time after which a job will not start more jobs (seconds). To help avoid hitting the walltime", "title": "Max Time" }, "parallel_jobs": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "description": "Number of jobs executed in parallel in the same process", "title": "Parallel Jobs" } }, "required": [ "jobs_handle_dir", "work_dir" ], "title": "BatchConfig", "type": "object" } }, "additionalProperties": false, "required": [ "type", "scheduler_type", "work_dir" ] }
- Config:
extra: str = forbid
- Fields:
- Validators:
- field batch: BatchConfig | None = None#
Options for batch execution. If defined, the worker will be considered a batch worker.
- field delay_download: int | None = None#
Number of seconds to wait before starting the download after the Runner has marked a Job as TERMINATED. Accounts for delays in writing the files on the worker file system (e.g. NFS).
- field execution_cmd: str | None = None#
String with commands to execute the Job on the worker. By default it is set to jf -fe execution run {}. The {} part is mandatory and is used to insert the path to the execution directory. Change only for specific needs (e.g. the jf command needs to be executed in a container).
- Validated by:
- field max_jobs: int | None = None#
The maximum number of jobs that can be submitted to the queue.
- Constraints:
ge = 0
- field post_run: str | None = None#
String with commands that will be executed after the execution of the Job
- field pre_run: str | None = None#
String with commands that will be executed before the execution of the Job
- field resources: dict | None = None#
A dictionary defining the default resources requested to the scheduler. Used to fill in the QToolKit template
- field sanitize_command: bool = False#
Sanitize the output of commands in case of failures due to spurious text produced by the worker shell.
- field scheduler_type: str [Required]#
Type of the scheduler. Must be one of the values supported by QToolKit.
- Validated by:
- field scheduler_username: str | None = None#
If defined, the list of jobs running on the worker will be fetched based on the username instead of the list of job ids. May be necessary for some scheduler_type values (e.g. SGE).
- field timeout_execute: int = 60#
Timeout for the execution of the commands in the worker (e.g. submitting a job)
- field work_dir: Path [Required]#
Absolute path of the directory of the worker where subfolders for executing the calculation will be created
- Validated by:
- validator check_execution_cmd » execution_cmd[source]#
- validator check_scheduler_type » scheduler_type[source]#
Validator to set the default of scheduler_type.
- get_scheduler_io() BaseSchedulerIO [source]#
Get the BaseSchedulerIO from QToolKit depending on scheduler_type.
- Return type:
The instance of the scheduler_type.
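The role of the mandatory {} placeholder in execution_cmd can be illustrated with a small sketch. The function `build_execution_cmd` is hypothetical and is not the check_execution_cmd validator; it only shows one plausible way the placeholder could be checked and filled. The container command in the usage lines is likewise an invented example.

```python
from __future__ import annotations

# Default command quoted in the description of execution_cmd.
DEFAULT_EXECUTION_CMD = "jf -fe execution run {}"


def build_execution_cmd(run_dir: str, execution_cmd: str | None = None) -> str:
    """Insert the execution directory into the command template."""
    template = execution_cmd or DEFAULT_EXECUTION_CMD
    if "{}" not in template:
        # The placeholder is documented as mandatory.
        raise ValueError("execution_cmd must contain the '{}' placeholder")
    # Plain replacement, so other braces in the command are left untouched.
    return template.replace("{}", run_dir)


default_cmd = build_execution_cmd("/scratch/run_1")
container_cmd = build_execution_cmd(
    "/scratch/run_1",
    "apptainer exec image.sif jf -fe execution run {}",  # invented example
)
```

A template without the placeholder, such as "jf -fe execution run", raises a ValueError in this sketch, since the command would have no way to receive the execution directory.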