jobflow_remote.jobs.daemon module#
- class jobflow_remote.jobs.daemon.DaemonManager(daemon_dir: str | Path, log_dir: str | Path, project: Project, job_controller: JobController | None = None)[source]#
Bases:
object
A manager for handling the daemonized Runner processes.
Checks are performed to avoid starting a daemon in two different machines. To perform the actions on the daemon process the information about the runner in the DB should match with the one of the current machine.
- Parameters:
daemon_dir – Directory where the supervisord configuration file is written.
log_dir – Directory where the supervisord log file is written.
project – Project configuration.
job_controller – JobController instance, used internally to handle the job queue.
- check_matching_runner(raise_on_error: bool = False, allow_missing: bool = False) str | None [source]#
Checks if the information stored in the runner document in the auxiliary collection matches the current machine.
- Parameters:
raise_on_error – If True and the information does not match, raise an error.
allow_missing – If True, return None instead of raising an error if the document is not present.
- Returns:
If the information matches, return None. Otherwise, return a string with the error message.
- Return type:
str | None
- check_status() DaemonStatus [source]#
Get the current status of the daemon based on the state of the supervisord process and the running processes.
- Returns:
Status of the daemon. Can be one of the following:
- Return type:
- check_supervisord_process() bool [source]#
Check if the supervisord process is running and belongs to the same user.
If the supervisord process is not running but the daemon files are present, clean them up.
- Returns:
True if the supervisord process is running and belongs to the same user, False otherwise.
- Return type:
- conf_template_single = <string.Template object>#
- conf_template_split = <string.Template object>#
- foreground_process(name, print_function: Callable | None = None) None [source]#
Attach to the foreground of one process based on the process name.
- Parameters:
name – Name of the process to attach to.
print_function – A function to use to print the output of the process. If None, the default print function is used.
- foreground_processes(processes_names: list | None = None, print_function: Callable | None = None) None [source]#
Attach to the foreground of the processes of the daemon.
- Parameters:
processes_names – Names of the processes to attach to. If None, all processes are attached to.
print_function – A function to use to print the output of the processes. If None, the default print function is used.
- classmethod from_project(project: Project) DaemonManager [source]#
Generate a DaemonManager instance from a Project configuration.
- Parameters:
project – Project configuration.
- Returns:
An instance of DaemonManager associated with the project.
- Return type:
- classmethod from_project_name(project_name: str | None = None) DaemonManager [source]#
Generate a DaemonManager instance from a project name.
- Parameters:
project_name – Name of the project to use. If None, the default project will be used.
- Returns:
An instance of DaemonManager associated with the project.
- Return type:
- get_controller() Controller [source]#
Return a Supervisor supervisor.supervisorctl.Controller connected to the supervisord configured in the project.
- get_interface()[source]#
Return an interface to the supervisord RPC server using the socket file.
- Returns:
Interface to the supervisord RPC server.
- Return type:
xmlrpc.ServerProxy
- get_processes_info() dict[str, dict] | None [source]#
Get the information about the processes of the daemon.
None if the daemon is not running.
- Returns:
A dictionary with the information about the processes. The keys are the process names and the values are dictionaries with the process information.
- Return type:
- get_supervisord_pid() int | None [source]#
Get the PID of the supervisord process from the PID file.
If the PID file does not exist or can not be parsed, return None.
- Returns:
PID of the supervisord process.
- Return type:
int | None
- property job_controller: JobController#
JobController instance associated with the project.
- lock_runner_doc(allow_missing: bool = False) Generator[MongoLock, None, None] [source]#
Context manager to lock a document in the auxiliary collection that is used to keep track of the currently running runner.
- Parameters:
allow_missing – Determine if it is acceptable that the document is not present in the DB. If not, raise an error. Mainly present to handle databases that have been created with version <=0.1.4.
- parse_config_file() dict [source]#
Parses a supervisord config file and returns the extracted input variables. This includes values for num_procs_transfer, num_procs_complete, single, log_level, nodaemon, and connect_interactive.
- Returns:
Dictionary containing the extracted input variables.
- Return type:
- shut_down(raise_on_error: bool = False) bool [source]#
Shut down the supervisord process and all the processes of the daemon.
- Parameters:
raise_on_error – If True, raise an exception if an error occurs.
- Returns:
True if the daemon is shut down correctly, False otherwise.
- Return type:
- property sock_filepath: Path#
Path to the supervisord socket file.
This path is used by supervisord to communicate with the client. The length of the path is checked to ensure it is not too long for UNIX systems.
- start(num_procs_transfer: int = 1, num_procs_complete: int = 1, single: bool = True, log_level: str = 'info', raise_on_error: bool = False, connect_interactive: bool = False) bool [source]#
Start the daemon by starting the supervisord process and all the processes of the daemon.
- Parameters:
num_procs_transfer – Number of processes to use for the transfer of jobs.
num_procs_complete – Number of processes to use for the completion of jobs.
single – If True, the runner will be started with a single process.
log_level (str) – Log level of the daemon.
raise_on_error (bool) – If True, raise an exception if an error occurs.
connect_interactive (bool) – Allow the daemon to perform an interactive initial setup.
- Returns:
True if the daemon is started correctly, False otherwise.
- Return type:
- start_processes() str | None [source]#
Start all the processes of the daemon when the supervisord processes is running but the daemon processes are stopped.
- Returns:
An error message if not all the processes started correctly, otherwise None.
- Return type:
str | None
- start_supervisord(num_procs_transfer: int = 1, num_procs_complete: int = 1, single: bool = True, log_level: str = 'info', nodaemon: bool = False, connect_interactive: bool = False) str | None [source]#
Start the supervisord daemon and all the processes.
- Parameters:
num_procs_transfer – Number of processes to use for the transfer step.
num_procs_complete – Number of processes to use for the complete step.
single – Write a single configuration file instead of a split one.
log_level – The log level to use for the daemon.
nodaemon (bool, optional) – Run the daemon in foreground.
connect_interactive (bool, optional) – Allow the daemon to perform an interactive initial setup.
- Returns:
An error message if the daemon could not be started, otherwise None.
- Return type:
str | None
- stop(wait: bool = False, raise_on_error: bool = False) bool [source]#
Stop the daemon.
- Parameters:
wait – If True wait until the daemon has stopped.
raise_on_error – Raise an exception if the daemon cannot be stopped.
- Returns:
True if the daemon was stopped successfully, False otherwise.
- Return type:
- wait_start(timeout: int = 30) None [source]#
Wait for all processes of the daemon to start.
- Parameters:
timeout – Maximum time in seconds to wait for all processes to start.
- write_config(num_procs_transfer: int = 1, num_procs_complete: int = 1, single: bool = True, log_level: str = 'info', nodaemon: bool = False, connect_interactive: bool = False) None [source]#
Write the configuration file for the daemon.
If single is True, a configuration to execute all the Runner tasks in a single process is written. Otherwise, a split configuration is written with separate sections for each task of the Runner.
- Parameters:
num_procs_transfer (int) – Number of processes to use for the transfer step.
num_procs_complete (int) – Number of processes to use for the complete step.
single (bool, optional) – Write a single configuration file instead of a split one.
log_level (str, optional) – The log level to use for the daemon.
nodaemon (bool, optional) – Run the daemon in foreground.
connect_interactive (bool, optional) – Allow the daemon to perform an interactive initial setup.
- class jobflow_remote.jobs.daemon.DaemonStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
Enum
Possible states of the daemon.
- PARTIALLY_RUNNING = 'PARTIALLY_RUNNING'#
- RUNNING = 'RUNNING'#
- SHUT_DOWN = 'SHUT_DOWN'#
- STARTING = 'STARTING'#
- STOPPED = 'STOPPED'#
- STOPPING = 'STOPPING'#