jobflow_remote.jobs.runner module

The Runner orchestrating the Jobs execution.

class jobflow_remote.jobs.runner.Runner(project_name: str | None = None, log_level: LogLevel | None = None, runner_id: str | None = None, connect_interactive: bool = False)

Bases: object

Object orchestrating the execution of all the Jobs.

Advances the status of the Jobs, handles the communication with the workers and updates the queue and output databases.

The main entry point is the run method. It is meant to be executed by a daemon, but can also be run directly for testing purposes. It can run all the steps required to advance the Jobs' states, or only a subset of them, so that the different tasks can be parallelized.

The Runner instantiates the pool of workers and hosts given in the project definition. If multiple workers share the same host, a single connection is opened.

The Runner schedules the execution of the specific tasks at regular intervals and relies on objects like QueueManager, BaseHost and JobController to interact with workers and databases.
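The connection sharing described above can be pictured with a small, self-contained sketch. It does not use jobflow_remote: `FakeHost`, `build_host_pool` and the worker names are hypothetical stand-ins, and the only behaviour assumed is that workers pointing at the same host reuse one connection object.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class FakeHost:
    """Hypothetical stand-in for a host connection (not the real BaseHost)."""

    address: str


def build_host_pool(worker_to_address: dict[str, str]) -> dict[str, FakeHost]:
    """Map each worker name to a host object, reusing one object per address."""
    by_address: dict[str, FakeHost] = {}
    pool: dict[str, FakeHost] = {}
    for worker_name, address in worker_to_address.items():
        # Create a host object only the first time this address is seen.
        if address not in by_address:
            by_address[address] = FakeHost(address)
        pool[worker_name] = by_address[address]
    return pool


# Two workers share "cluster-a", so they end up with the same host object.
workers = {"cpu_queue": "cluster-a", "gpu_queue": "cluster-a", "local": "localhost"}
pool = build_host_pool(workers)
```

Keying the pool by worker name keeps lookups simple, while the inner dictionary keyed by address is what guarantees a single object per host.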

Parameters:
  • project_name – Name of the project. Used to retrieve all the configurations required to execute the runner.

  • log_level – Logging level of the Runner.

  • runner_id – A unique identifier for the Runner process. Used to identify the Runner process in logging and in the DB locks. If None, a UUID will be generated.

  • connect_interactive – If True, connections to the hosts marked as interactive are opened during initialization.

advance_state(states: list[str], filter: dict | None = None) → None

Acquire the lock and advance the state of a single job.

Parameters:

states – The states of the Jobs that can be queried.
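As a rough illustration of "acquire the lock and advance a single job", the following sketch uses an in-memory list instead of a database. The `jobs` list, the `lock_id` field and `lock_one_job` are hypothetical; the real Runner relies on a MongoLock, whose interface is not shown here.

```python
from contextlib import contextmanager

# Hypothetical in-memory "queue"; lock_id is None when a job is unlocked.
jobs = [
    {"db_id": "1", "state": "READY", "lock_id": None},
    {"db_id": "2", "state": "CHECKED_OUT", "lock_id": "other-runner"},
    {"db_id": "3", "state": "CHECKED_OUT", "lock_id": None},
]


@contextmanager
def lock_one_job(states, runner_id):
    """Yield the first unlocked job in one of `states`, or None if there is none."""
    job = next(
        (j for j in jobs if j["state"] in states and j["lock_id"] is None),
        None,
    )
    if job is not None:
        job["lock_id"] = runner_id
    try:
        yield job
    finally:
        # Always release the lock, even if advancing the job raised.
        if job is not None:
            job["lock_id"] = None


# Advance a single CHECKED_OUT job; the one locked by another runner is skipped.
with lock_one_job(["CHECKED_OUT"], runner_id="runner-a") as job:
    if job is not None:
        job["state"] = "UPLOADED"
```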

check_run_status(filter: dict | None = None) → None

Check the status of all the jobs submitted to a queue.

If Jobs have started, update their state from SUBMITTED to RUNNING. If Jobs have terminated, set their state to TERMINATED when running on a remote host, or directly to DOWNLOADED when running on a local host.
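The rule above can be written as a small decision function. This is only a sketch of the documented transitions: `state_after_check` and its boolean arguments are hypothetical names, and the enum lists only the states mentioned in this method's description.

```python
from enum import Enum


class JobState(Enum):
    SUBMITTED = "SUBMITTED"
    RUNNING = "RUNNING"
    TERMINATED = "TERMINATED"
    DOWNLOADED = "DOWNLOADED"


def state_after_check(current, started, finished, is_local):
    """Return the state a queued job would move to after a status check."""
    # Only jobs that are in the queue are affected by the check.
    if current not in (JobState.SUBMITTED, JobState.RUNNING):
        return current
    if finished:
        # Local jobs have nothing to download, so they skip TERMINATED.
        return JobState.DOWNLOADED if is_local else JobState.TERMINATED
    if started and current is JobState.SUBMITTED:
        return JobState.RUNNING
    return current


remote_done = state_after_check(JobState.RUNNING, started=True, finished=True, is_local=False)
local_done = state_after_check(JobState.RUNNING, started=True, finished=True, is_local=True)
```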

checkout() → None

Checkout READY Jobs.

cleanup() → None

Close all the connections after stopping the Runner.

complete_job(lock) → None

Complete a locked Job in the DOWNLOADED state. If successful set the state to COMPLETED, otherwise to FAILED.

Parameters:

lock – The MongoLock with the locked Job document.

download(lock) → None

Download the final files for a locked Job in the TERMINATED state. If successful set the state to DOWNLOADED.

Parameters:

lock – The MongoLock with the locked Job document.

get_host(worker_name: str) → BaseHost

Get the host associated to a worker from the pool of hosts instantiated by the Runner.

Parameters:

worker_name – The name of the worker.

Return type:

An instance of the Host associated to the worker.

get_queue_manager(worker_name: str) → QueueManager

Get an instance of the queue manager associated to a worker, based on its host.

Parameters:

worker_name – The name of the worker.

Return type:

An instance of the QueueManager associated to the worker.

get_worker(worker_name: str) → WorkerBase

Get the worker from the pool of workers instantiated by the Runner.

Parameters:

worker_name – The name of the worker.

Return type:

An instance of the corresponding worker.

handle_signal(signum, frame) → None

Handle the SIGTERM signal in the Runner. Sets a variable that will stop the Runner loop.
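The flag-based shutdown described here follows a common pattern, sketched below with the standard library only. `StoppableLoop`, `stop_signal` and `max_iterations` are hypothetical names and not the Runner's attributes. To keep the example deterministic, the handler is called directly instead of being registered with `signal.signal`.

```python
import signal


class StoppableLoop:
    """Hypothetical loop object; only the flag-setting pattern mirrors the text."""

    def __init__(self) -> None:
        self.stop_signal = False
        self.iterations = 0

    def handle_signal(self, signum, frame) -> None:
        # Only record the request; the loop exits at its next check.
        self.stop_signal = True

    def run(self, max_iterations: int = 1000) -> None:
        while not self.stop_signal and self.iterations < max_iterations:
            self.iterations += 1
            if self.iterations == 3:
                # Simulate delivery of SIGTERM by calling the handler directly.
                self.handle_signal(signal.SIGTERM, None)


loop = StoppableLoop()
# In a real process the handler would be registered from the main thread with:
#     signal.signal(signal.SIGTERM, loop.handle_signal)
loop.run()
```

Setting a flag rather than exiting inside the handler lets the current iteration finish before the loop stops.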

refresh_num_current_jobs() → None

Update the number of Jobs currently running for workers with a limited number of Jobs.

run(transfer: bool = True, complete: bool = True, queue: bool = True, checkout: bool = True, ticks: int | None = None) → None

Start the runner.

Which actions are being performed can be tuned by the arguments.

Parameters:
  • transfer – If True actions related to file transfer are performed by the runner.

  • complete – If True Job completion is performed by the runner.

  • queue – If True interactions with the queue manager are handled by the Runner.

  • checkout – If True the checkout of Jobs is performed by the Runner.

  • ticks – If provided, the Runner will run for this number of ticks before exiting.
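How the flags select a subset of the tasks can be sketched as follows. This is a simplified model, not the Runner's scheduler: `run_sketch` is hypothetical, the mapping of each flag to method names is an assumption based on the descriptions in this page, every enabled task runs once per tick, and `ticks` is required so that the example always ends.

```python
def run_sketch(ticks, transfer=True, complete=True, queue=True, checkout=True):
    """Return the (tick, task) pairs that would be executed for the given flags."""
    # Assumed mapping between the flags of run() and the documented methods.
    tasks = [
        ("checkout", checkout),
        ("upload", transfer),
        ("submit", queue),
        ("check_run_status", queue),
        ("download", transfer),
        ("complete_job", complete),
    ]
    enabled = [name for name, is_on in tasks if is_on]
    executed = []
    for tick in range(ticks):
        for name in enabled:
            executed.append((tick, name))
    return executed


# A process that only checks out Jobs and talks to the queue, for two ticks.
queue_only = run_sketch(ticks=2, transfer=False, complete=False)
```

Splitting the flags this way is what would allow, for example, one process to handle file transfers while another handles queue interactions.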

run_all_jobs(max_seconds: int | None = None) → None

Use the runner to run all the jobs in the DB. Mainly used for testing.

run_one_job(db_id: str | None = None, job_id: tuple[str, int] | None = None, max_seconds: int | None = None, raise_at_timeout: bool = True) → bool

Use the runner to run a single Job until it reaches a terminal state. The Job should be in the READY state and there should be no ... Mainly used for testing.
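The interplay of max_seconds and raise_at_timeout can be illustrated with a generic polling loop. Everything here is a hedged sketch: `run_until_terminal`, `get_state` and `step` are hypothetical, COMPLETED and FAILED are assumed to be the terminal states because they are the final states named in this page, and returning True on success and False on timeout is an assumption about the meaning of the bool return value.

```python
import time

# Assumption: the terminal states are the two outcomes of complete_job.
TERMINAL_STATES = {"COMPLETED", "FAILED"}


def run_until_terminal(get_state, step, max_seconds=None, raise_at_timeout=True, sleep=0.0):
    """Call step() until get_state() is terminal or max_seconds has elapsed."""
    start = time.monotonic()
    while True:
        if get_state() in TERMINAL_STATES:
            return True
        if max_seconds is not None and time.monotonic() - start > max_seconds:
            if raise_at_timeout:
                raise TimeoutError("job did not reach a terminal state in time")
            return False
        step()
        time.sleep(sleep)


# A fake job that moves one state forward at every step.
states = ["READY", "CHECKED_OUT", "UPLOADED", "SUBMITTED",
          "RUNNING", "TERMINATED", "DOWNLOADED", "COMPLETED"]
position = {"index": 0}


def get_state():
    return states[position["index"]]


def step():
    position["index"] += 1


finished = run_until_terminal(get_state, step, max_seconds=5)
```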

property runner_options: RunnerOptions

The Runner options defined in the project.

submit(lock: MongoLock) → None

Submit a locked Job in the UPLOADED state to the queue. If successful, set the state to SUBMITTED.

Parameters:

lock – The MongoLock with the locked Job document.

update_batch_jobs() → None

Update the status of batch jobs.

This includes submitting to the remote queue, checking the status of running jobs in the queue and handling the files containing the Jobs' status information.

upload(lock: MongoLock) → None

Upload files for a locked Job in the CHECKED_OUT state. If successful set the state to UPLOADED.

Parameters:

lock – The MongoLock with the locked Job document.
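Taken together, the methods documented in this page describe a chain of state transitions. The sketch below collects them in one table for a Job on a remote host that completes successfully. The `TRANSITIONS` dictionary and `happy_path` are illustrative only; the READY to CHECKED_OUT step is inferred from the descriptions of checkout and upload, and failure branches (such as FAILED) and the local shortcut to DOWNLOADED are left out.

```python
# Method name -> (state before, state after), as described in this page.
TRANSITIONS = {
    "checkout": ("READY", "CHECKED_OUT"),
    "upload": ("CHECKED_OUT", "UPLOADED"),
    "submit": ("UPLOADED", "SUBMITTED"),
    "check_run_status (started)": ("SUBMITTED", "RUNNING"),
    "check_run_status (terminated, remote)": ("RUNNING", "TERMINATED"),
    "download": ("TERMINATED", "DOWNLOADED"),
    "complete_job": ("DOWNLOADED", "COMPLETED"),
}


def happy_path(start="READY"):
    """Follow the transitions from `start` until no further step is defined."""
    next_state = {source: target for source, target in TRANSITIONS.values()}
    path = [start]
    while path[-1] in next_state:
        path.append(next_state[path[-1]])
    return path


path = happy_path()
```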