jobflow_remote.jobs.runner module

The Runner orchestrating the Jobs execution.

class jobflow_remote.jobs.runner.Runner(project_name: str | None = None, log_level: LogLevel | None = None, runner_id: str | None = None, connect_interactive: bool = False, daemon_id: str | None = None)

Bases: object

Object orchestrating the execution of all the Jobs.

Advances the status of the Jobs, handles the communication with the workers and updates the queue and output databases.

The main entry point is the run method. It is meant to be executed by a daemon, but can also be run directly for testing purposes. It can run all the steps required to advance the Jobs' states, or only a subset of them, which makes it possible to parallelize the different tasks.

The Runner instantiates the pool of workers and hosts given in the project definition. A single connection is opened when multiple workers share the same host.

The Runner schedules the execution of the specific tasks at regular intervals and relies on objects like QueueManager, BaseHost and JobController to interact with workers and databases.

Parameters:
  • project_name – Name of the project. Used to retrieve all the configurations required to execute the runner.

  • log_level – Logging level of the Runner.

  • runner_id – A unique identifier for the Runner process. Used to identify the runner process in logging and in the DB locks. If None, a UUID is generated.

  • connect_interactive – If True, connections to the hosts marked as interactive are opened during initialization.

  • daemon_id – A unique identifier for the daemon process that manages this Runner process.
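The sharing of one connection among workers on the same host can be pictured with a minimal sketch. It does not use the jobflow_remote API: WorkerConfig, FakeConnection and build_host_pool are hypothetical names, and keying the connection on a plain host string is an assumption made for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerConfig:
    """Hypothetical stand-in for a worker entry in the project definition."""

    name: str
    host: str  # hypothetical key identifying the machine the worker uses


class FakeConnection:
    """Hypothetical placeholder for an open connection to a host."""

    def __init__(self, host: str) -> None:
        self.host = host


def build_host_pool(workers: list[WorkerConfig]) -> dict[str, FakeConnection]:
    """Map each worker name to a connection, reusing one connection per host."""
    by_host: dict[str, FakeConnection] = {}
    pool: dict[str, FakeConnection] = {}
    for worker in workers:
        # Open a connection only the first time a host is seen.
        if worker.host not in by_host:
            by_host[worker.host] = FakeConnection(worker.host)
        pool[worker.name] = by_host[worker.host]
    return pool


workers = [
    WorkerConfig("cpu_queue", "cluster_a"),
    WorkerConfig("gpu_queue", "cluster_a"),
    WorkerConfig("local", "localhost"),
]
pool = build_host_pool(workers)
```

Here the two workers on cluster_a end up with the same connection object, while the local worker gets its own.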

advance_state(states: list[str], filter: dict | None = None) → None

Acquire the lock and advance the state of a single job.

Parameters:

states – The states of the Jobs that can be queried.
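The lock-then-advance pattern described for advance_state can be sketched with an in-memory list standing in for the queue database. This is only an illustration under stated assumptions: the advance_one function, the lock_id field and the step callable are hypothetical, and the real Runner locks a Job document in the database rather than a Python dict.

```python
import uuid


def advance_one(jobs, states, runner_id, step):
    """Lock the first unlocked job whose state is in `states`, apply `step`, unlock.

    Returns True if a job was advanced, False if no matching job was available.
    """
    for job in jobs:
        if job["state"] in states and job.get("lock_id") is None:
            job["lock_id"] = runner_id  # acquire the (hypothetical) lock
            try:
                job["state"] = step(job)  # advance the state while locked
            finally:
                job["lock_id"] = None  # always release the lock
            return True
    return False


jobs = [
    {"uuid": "a", "state": "UPLOADED", "lock_id": "other-runner"},
    {"uuid": "b", "state": "UPLOADED", "lock_id": None},
]
runner_id = str(uuid.uuid4())
advanced = advance_one(jobs, ["UPLOADED"], runner_id, lambda job: "SUBMITTED")
```

Only one job is touched per call, and a job already locked by another process is skipped rather than waited for.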

batch_get_process_id(batch_processes: list, batch_uid: str, worker_name: str, worker: WorkerBase) → str | None

batch_update_run_finished_jobs(batch_manager, worker_name, worker)

batch_update_running_jobs(batch_manager: RemoteBatchManager, worker_name: str, worker) → list[tuple[str, int, str]]

Update the status of running jobs.

Parameters:
  • batch_manager – Manager of remote files

  • worker_name – Name of the batch worker

  • worker – Batch worker

Returns:

The list of job ids, job indexes and batch unique ids in the host running directory.

Return type:

list

batch_update_status(batch_manager: RemoteBatchManager, queue_manager: QueueManager, worker_name: str, worker: WorkerBase, running_jobs: list[tuple[str, int, str]]) → list[str] | None

Update the status of the batch processes.

This method:
  • removes the stopped (completed, cancelled or killed) batch processes from the database

  • sets the state of the jobs that were running in these stopped batch processes upon batch process termination to a remote error state

  • deletes the corresponding “running” files in the batch handle directory

  • sets an empty file for the stopped batch processes with

Parameters:
  • batch_manager

  • queue_manager

  • worker_name

  • worker

  • running_jobs

Returns:

List of running batch process ids (e.g. Slurm ids) or None if the list of jobs could not be retrieved from the worker.

Return type:

list or None

check_run_status(filter: dict | None = None) → None

Check the status of all the jobs submitted to a queue.

If Jobs have started, update their state from SUBMITTED to RUNNING. If Jobs have finished running, set their state to RUN_FINISHED when running on a remote host, or directly to DOWNLOADED when running on a local host.
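The transition rule above can be written as a small pure function. This is a sketch, not the Runner's implementation: the next_state function and the simplified queue_status values ("pending", "running", "done") are assumptions, while the state names come from the description.

```python
def next_state(current: str, queue_status: str, is_local: bool) -> str:
    """Return the Job state after one status check.

    `queue_status` is a hypothetical simplified value:
    "pending", "running" or "done".
    """
    if current == "SUBMITTED" and queue_status == "running":
        return "RUNNING"
    if current in ("SUBMITTED", "RUNNING") and queue_status == "done":
        # On a local host the Job goes directly to DOWNLOADED.
        return "DOWNLOADED" if is_local else "RUN_FINISHED"
    # Nothing changed in the queue: keep the current state.
    return current


remote_path = [
    next_state("SUBMITTED", "pending", is_local=False),
    next_state("SUBMITTED", "running", is_local=False),
    next_state("RUNNING", "done", is_local=False),
]
local_result = next_state("RUNNING", "done", is_local=True)
```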

checkout() → None

Checkout READY Jobs.

cleanup() → None

Close all the connections after stopping the Runner.

complete_job(lock) → None

Complete a locked Job in the DOWNLOADED state. If successful set the state to COMPLETED, otherwise to FAILED.

Parameters:

lock – The MongoLock with the locked Job document.

download(lock) → None

Download the final files for a locked Job in the RUN_FINISHED state. If successful set the state to DOWNLOADED.

Parameters:

lock – The MongoLock with the locked Job document.

get_host(worker_name: str) → BaseHost

Get the host associated with a worker from the pool of hosts instantiated by the Runner.

Parameters:

worker_name – The name of the worker.

Returns:

An instance of the Host associated with the worker.

get_jobstore(flow_id: str | None) → JobStore

Get the JobStore associated with a Flow.

Uses a small internal cache to reduce the calls to the DB, assuming that not too many Flows will be updated at the same time.

Parameters:

flow_id – The uuid of the Flow.

Returns:

The JobStore associated to a Flow.

Return type:

JobStore
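The idea of a small internal cache that avoids repeated DB calls can be illustrated with a bounded least-recently-used cache. The sketch below is an assumption about the general technique, not the actual caching code: SmallCache, load_from_db and the eviction policy are hypothetical.

```python
from collections import OrderedDict


class SmallCache:
    """Hypothetical bounded cache keyed by flow id (least recently used eviction)."""

    def __init__(self, loader, max_size: int = 10) -> None:
        self._loader = loader  # function called on a cache miss
        self._max_size = max_size
        self._items: OrderedDict = OrderedDict()

    def get(self, flow_id: str):
        if flow_id in self._items:
            self._items.move_to_end(flow_id)  # mark as recently used
            return self._items[flow_id]
        value = self._loader(flow_id)
        self._items[flow_id] = value
        if len(self._items) > self._max_size:
            self._items.popitem(last=False)  # drop the least recently used entry
        return value


db_calls = []


def load_from_db(flow_id: str) -> dict:
    """Hypothetical expensive lookup; records each call for inspection."""
    db_calls.append(flow_id)
    return {"flow_id": flow_id}


cache = SmallCache(load_from_db, max_size=2)
for flow_id in ["f1", "f1", "f2", "f3", "f1"]:
    cache.get(flow_id)
```

With a capacity of two, the second request for f1 is served from the cache, while the last one hits the loader again because f1 was evicted when f3 arrived. A small cache like this works well only under the stated assumption that few Flows are updated at the same time.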

get_queue_manager(worker_name: str) → QueueManager

Get an instance of the queue manager associated with a worker, based on its host.

Parameters:

worker_name – The name of the worker.

Returns:

An instance of the QueueManager associated with the worker.

get_worker(worker_name: str) → WorkerBase

Get the worker from the pool of workers instantiated by the Runner.

Parameters:

worker_name – The name of the worker.

Returns:

An instance of the corresponding worker.

handle_signal(signum, frame) → None

Handle the SIGTERM signal in the Runner. Sets a variable that will stop the Runner loop.
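The flag-based shutdown described here follows a common pattern with Python's standard signal module. The sketch below shows that pattern in isolation; the Loop class, its stop_requested attribute and its run method are hypothetical and only mirror the idea of a handler that sets a variable checked by the loop.

```python
import signal
import threading


class Loop:
    """Hypothetical loop that stops when a termination signal is received."""

    def __init__(self) -> None:
        self.stop_requested = False

    def handle_signal(self, signum, frame) -> None:
        # Only record the request; the loop checks the flag between iterations.
        self.stop_requested = True

    def run(self, work, max_iterations: int = 1000) -> int:
        """Call `work(self)` until a stop is requested; return the iteration count."""
        iterations = 0
        while not self.stop_requested and iterations < max_iterations:
            work(self)
            iterations += 1
        return iterations


loop = Loop()
# Signal handlers can only be installed from the main thread.
if threading.current_thread() is threading.main_thread():
    signal.signal(signal.SIGTERM, loop.handle_signal)
```

Because the handler only sets a flag, the iteration in progress finishes normally and the loop exits at the next check.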

ping_running_runner(run_options: dict | None = None)

refresh_num_current_jobs() → None

Update the number of jobs currently running for workers with a limited number of Jobs.

run(transfer: bool = True, complete: bool = True, queue: bool = True, checkout: bool = True, ticks: int | None = None) → None

Start the runner.

The arguments select which actions are performed.

Parameters:
  • transfer – If True actions related to file transfer are performed by the runner.

  • complete – If True Job completion is performed by the runner.

  • queue – If True interactions with the queue manager are handled by the Runner.

  • checkout – If True the checkout of Jobs is performed by the Runner.

  • ticks – If provided, the Runner will run for this number of ticks before exiting.
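How the boolean switches and the ticks limit interact can be sketched with a plain loop. This is a simplified illustration, not the Runner's scheduling code: run_loop is a hypothetical function, and treating a tick as one pass over the enabled task groups is an assumption.

```python
def run_loop(tasks, enabled, ticks=None):
    """Run the enabled tasks once per tick; stop after `ticks` ticks if given.

    `tasks` maps a group name to a callable and `enabled` maps the same
    names to booleans. With ticks=None this sketch loops forever.
    """
    executed = []
    tick = 0
    while ticks is None or tick < ticks:
        for name, task in tasks.items():
            if enabled.get(name, False):
                task()
                executed.append(name)
        tick += 1
    return executed


log = []
tasks = {
    "checkout": lambda: log.append("checkout"),
    "transfer": lambda: log.append("transfer"),
    "queue": lambda: log.append("queue"),
    "complete": lambda: log.append("complete"),
}
# Enable only a subset, as when splitting the work among several processes.
enabled = {"checkout": True, "transfer": False, "queue": True, "complete": False}
executed = run_loop(tasks, enabled, ticks=2)
```

Disabling a group in one process and enabling it in another is what allows the different tasks to run in parallel.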

run_all_jobs(max_seconds: int | None = None, wait_for_batches: bool = False) → None

Use the runner to run all the jobs in the DB. Mainly used for testing.

run_one_job(db_id: str | None = None, job_id: tuple[str, int] | None = None, max_seconds: int | None = None, raise_at_timeout: bool = True, target_state: JobState | None = None) → bool

Use the runner to run a single Job until it reaches a terminal state. The job should be in the READY state and there should be no … Mainly used for testing.

property runner_options: RunnerOptions

The Runner options defined in the project.

submit(lock: MongoLock) → None

Submit a locked Job in the UPLOADED state to the queue. If successful, set the state to SUBMITTED.

Parameters:

lock – The MongoLock with the locked Job document.

submit_batch_processes(queue_manager: QueueManager, worker_name: str, worker: WorkerBase, running_batch_processes: list[str])

update_batch_jobs(submit: bool = True) → None

Update the status of batch jobs.

Includes submitting to the remote queue, checking the status of running jobs in the queue and handling the files that carry the Jobs' status information.

Parameters:

submit – Whether to submit new batch processes.

upload(lock: MongoLock) → None

Upload files for a locked Job in the CHECKED_OUT state. If successful set the state to UPLOADED.

Parameters:

lock – The MongoLock with the locked Job document.
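Taken together, the lock-based methods above describe a chain of state transitions. The table below collects the transitions stated in the method descriptions as a sketch; the TRANSITIONS mapping and apply_step are hypothetical helpers, the target state of checkout is inferred (the documentation only says that READY Jobs are checked out), and the failure branches (such as FAILED in complete_job) are not modeled.

```python
# Method name -> (state required on entry, state set on success).
TRANSITIONS = {
    "checkout": ("READY", "CHECKED_OUT"),  # target state inferred, see lead-in
    "upload": ("CHECKED_OUT", "UPLOADED"),
    "submit": ("UPLOADED", "SUBMITTED"),
    "download": ("RUN_FINISHED", "DOWNLOADED"),
    "complete_job": ("DOWNLOADED", "COMPLETED"),
}


def apply_step(method: str, state: str) -> str:
    """Return the state after a successful `method`, checking the entry state."""
    required, result = TRANSITIONS[method]
    if state != required:
        raise ValueError(f"{method} expects {required}, got {state}")
    return result


state = "READY"
history = [state]
for method in ["checkout", "upload", "submit"]:
    state = apply_step(method, state)
    history.append(state)
```

The steps between SUBMITTED and RUN_FINISHED are handled by check_run_status rather than by a lock-based method, which is why they do not appear in the table.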