Source code for jobflow_remote.jobs.runner

"""The Runner orchestrating the Jobs execution."""

from __future__ import annotations

import json
import logging
import shutil
import signal
import time
import traceback
import uuid
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING

from monty.json import MontyDecoder
from monty.os import makedirs_p
from qtoolkit.core.data_objects import QState, SubmissionStatus
from rich.prompt import Confirm

from jobflow_remote import JobController
from jobflow_remote.config.base import (
    ConfigError,
    ExecutionConfig,
    LogLevel,
    Project,
    RunnerOptions,
    WorkerBase,
)
from jobflow_remote.config.manager import ConfigManager
from jobflow_remote.jobs.batch import RemoteBatchManager
from jobflow_remote.jobs.data import IN_FILENAME, OUT_FILENAME, RemoteError
from jobflow_remote.jobs.state import JobState
from jobflow_remote.remote.data import (
    get_job_path,
    get_remote_in_file,
    get_remote_store,
    get_remote_store_filenames,
    resolve_job_dict_args,
)
from jobflow_remote.remote.queue import ERR_FNAME, OUT_FNAME, QueueManager, set_name_out
from jobflow_remote.utils.data import suuid
from jobflow_remote.utils.log import initialize_runner_logger
from jobflow_remote.utils.schedule import SafeScheduler

if TYPE_CHECKING:
    from jobflow_remote.remote.host import BaseHost
    from jobflow_remote.utils.db import MongoLock

logger = logging.getLogger(__name__)


class Runner:
    """
    Object orchestrating the execution of all the Jobs.

    Advances the status of the Jobs, handles the communication with the workers
    and updates the queue and output databases.

    The main entry point is the `run` method. It is supposed to be executed by
    a daemon, but can also be run directly for testing purposes. It allows
    running all the steps required to advance the Job's states, or even a
    subset of them, to parallelize the different tasks.

    The Runner instantiates a pool of workers and hosts given in the project
    definition. A single connection will be opened if multiple workers share
    the same host.

    The Runner schedules the execution of the specific tasks at regular
    intervals and relies on objects like QueueManager, BaseHost and
    JobController to interact with workers and databases.
    """

    def __init__(
        self,
        project_name: str | None = None,
        log_level: LogLevel | None = None,
        runner_id: str | None = None,
        connect_interactive: bool = False,
    ) -> None:
        """
        Parameters
        ----------
        project_name
            Name of the project. Used to retrieve all the configurations
            required to execute the runner.
        log_level
            Logging level of the Runner.
        runner_id
            A unique identifier for the Runner process. Used to identify the
            runner process in logging and in the DB locks. If None a uuid
            will be generated.
        connect_interactive
            If True, during initialization connections will be opened to the
            hosts marked as interactive.
        """
        self.stop_signal = False
        self.runner_id: str = runner_id or str(uuid.uuid4())
        self.config_manager: ConfigManager = ConfigManager()
        self.project_name = project_name
        self.project: Project = self.config_manager.get_project(project_name)
        self.job_controller: JobController = JobController.from_project(self.project)
        self.workers: dict[str, WorkerBase] = self.project.workers
        # Build the dictionary of hosts. The reference is the worker name.
        # If two hosts match, use the same instance
        self.hosts: dict[str, BaseHost] = {}
        for wname, w in self.workers.items():
            new_host = w.get_host()
            for host in self.hosts.values():
                if new_host == host:
                    self.hosts[wname] = host
                    break
            else:
                self.hosts[wname] = new_host
        self.limited_workers = {
            name: {"max": w.max_jobs, "current": 0}
            for name, w in self.workers.items()
            if w.max_jobs and not w.batch
        }
        self.batch_workers = {}
        for wname, w in self.workers.items():
            if w.batch is not None:
                self.batch_workers[wname] = RemoteBatchManager(
                    self.hosts[wname], w.batch.jobs_handle_dir
                )
        self.queue_managers: dict = {}
        log_level = log_level if log_level is not None else self.project.log_level
        initialize_runner_logger(
            log_folder=self.project.log_dir,
            level=log_level.to_logging(),
            runner_id=self.runner_id,
        )
        # TODO it could be better to create a pool of stores that are connected.
        # How to deal with cases where the connection gets closed?
        # How to deal with file based stores?
        self.jobstore = self.project.get_jobstore()

        if connect_interactive:
            for host_name, host in self.hosts.items():
                if (
                    host.interactive_login
                    and not host.is_connected
                    and Confirm.ask(
                        f"Do you want to open the connection for the host of the {host_name} worker?"
                    )
                ):
                    host.connect()

    @property
    def runner_options(self) -> RunnerOptions:
        """The Runner options defined in the project."""
        return self.project.runner
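    # Example usage (a minimal sketch): driving the Runner directly, e.g. for
    # testing, assuming a project named "my_project" is already configured.
    #
    #     runner = Runner(project_name="my_project")
    #     try:
    #         runner.run(ticks=10)  # process ten scheduler ticks, then return
    #     finally:
    #         runner.cleanup()  # close connections to hosts, jobstore and queue DB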
    def handle_signal(self, signum, frame) -> None:
        """
        Handle the SIGTERM signal in the Runner. Sets a variable that will
        stop the Runner loop.
        """
        logger.info(f"Received signal: {signum}")
        self.stop_signal = True
    def get_worker(self, worker_name: str) -> WorkerBase:
        """
        Get the worker from the pool of workers instantiated by the Runner.

        Parameters
        ----------
        worker_name
            The name of the worker.

        Returns
        -------
        An instance of the corresponding worker.
        """
        if worker_name not in self.workers:
            raise ConfigError(
                f"No worker {worker_name} is defined in project {self.project_name}"
            )
        return self.workers[worker_name]
    def get_host(self, worker_name: str) -> BaseHost:
        """
        Get the host associated with a worker from the pool of hosts
        instantiated by the Runner.

        Parameters
        ----------
        worker_name
            The name of the worker.

        Returns
        -------
        An instance of the Host associated with the worker.
        """
        host = self.hosts[worker_name]
        if not host.is_connected:
            host.connect()
        return host
    def get_queue_manager(self, worker_name: str) -> QueueManager:
        """
        Get an instance of the queue manager associated with a worker, based
        on its host.

        Parameters
        ----------
        worker_name
            The name of the worker.

        Returns
        -------
        An instance of the QueueManager associated with the worker.
        """
        if worker_name not in self.queue_managers:
            worker = self.get_worker(worker_name)
            self.queue_managers[worker_name] = QueueManager(
                worker.get_scheduler_io(), self.get_host(worker_name)
            )
        return self.queue_managers[worker_name]
    def run(
        self,
        transfer: bool = True,
        complete: bool = True,
        queue: bool = True,
        checkout: bool = True,
        ticks: int | None = None,
    ) -> None:
        """
        Start the Runner. The actions performed can be tuned by the arguments.

        Parameters
        ----------
        transfer
            If True, actions related to file transfer are performed by the Runner.
        complete
            If True, Job completion is performed by the Runner.
        queue
            If True, interactions with the queue manager are handled by the Runner.
        checkout
            If True, the checkout of Jobs is performed by the Runner.
        ticks
            If provided, the Runner will run for this number of ticks before exiting.
        """
        signal.signal(signal.SIGTERM, self.handle_signal)

        states = []
        if transfer:
            states.append(JobState.CHECKED_OUT.value)
            states.append(JobState.TERMINATED.value)
        if complete:
            states.append(JobState.DOWNLOADED.value)
        if queue:
            states.append(JobState.UPLOADED.value)

        logger.info(
            f"Runner run options: transfer: {transfer} complete: {complete} queue: {queue} checkout: {checkout}"
        )
        scheduler = SafeScheduler(seconds_after_failure=120)

        # run a first call for each case, since schedule will wait for the delay
        # to make the first execution.
        if checkout:
            self.checkout()
            scheduler.every(self.runner_options.delay_checkout).seconds.do(
                self.checkout
            )
        if transfer or queue or complete:
            self.advance_state(states)
            scheduler.every(self.runner_options.delay_advance_status).seconds.do(
                self.advance_state, states=states
            )
        if queue:
            self.check_run_status()
            scheduler.every(self.runner_options.delay_check_run_status).seconds.do(
                self.check_run_status
            )
            # Limited workers will only affect the process interacting with the queue
            # manager. When a job is submitted or terminated the count in the
            # limited_workers can be directly updated, since by construction only one
            # process will take care of the queue state.
            # The refresh can be run on a relatively high delay since it should only
            # account for actions from the user (e.g. rerun, cancel), that can alter
            # the number of submitted/running jobs.
            if self.limited_workers:
                self.refresh_num_current_jobs()
                scheduler.every(self.runner_options.delay_refresh_limited).seconds.do(
                    self.refresh_num_current_jobs
                )
            if self.batch_workers:
                self.update_batch_jobs()
                scheduler.every(self.runner_options.delay_update_batch).seconds.do(
                    self.update_batch_jobs
                )

        ticks_remaining: int | bool = True
        if ticks is not None:
            ticks_remaining = ticks

        while ticks_remaining:
            if self.stop_signal:
                logger.info("stopping due to sigterm")
                break
            scheduler.run_pending()
            time.sleep(1)

            if ticks is not None:
                ticks_remaining -= 1
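    # Example (a minimal sketch): the boolean flags of run() allow dedicating
    # separate Runner processes to different tasks, as mentioned in the class
    # docstring; "my_project" is a placeholder project name.
    #
    #     # process 1: only queue interactions (submission and status checks)
    #     Runner("my_project").run(transfer=False, complete=False, queue=True, checkout=False)
    #     # process 2: checkout, file transfers and Job completion
    #     Runner("my_project").run(transfer=True, complete=True, queue=False, checkout=True)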
    def run_all_jobs(
        self,
        max_seconds: int | None = None,
    ) -> None:
        """
        Use the Runner to run all the Jobs in the DB.

        Mainly used for testing.
        """
        states = [
            JobState.CHECKED_OUT.value,
            JobState.TERMINATED.value,
            JobState.DOWNLOADED.value,
            JobState.UPLOADED.value,
        ]
        scheduler = SafeScheduler(seconds_after_failure=120)
        t0 = time.time()

        # run a first call for each case, since schedule will wait for the delay
        # to make the first execution.
        self.checkout()
        scheduler.every(self.runner_options.delay_checkout).seconds.do(self.checkout)

        self.advance_state(states)
        scheduler.every(self.runner_options.delay_advance_status).seconds.do(
            self.advance_state, states=states
        )

        self.check_run_status()
        scheduler.every(self.runner_options.delay_check_run_status).seconds.do(
            self.check_run_status
        )

        # Limited workers will only affect the process interacting with the queue
        # manager. When a job is submitted or terminated the count in the
        # limited_workers can be directly updated, since by construction only one
        # process will take care of the queue state.
        # The refresh can be run on a relatively high delay since it should only
        # account for actions from the user (e.g. rerun, cancel), that can alter
        # the number of submitted/running jobs.
        if self.limited_workers:
            self.refresh_num_current_jobs()
            scheduler.every(self.runner_options.delay_refresh_limited).seconds.do(
                self.refresh_num_current_jobs
            )
        if self.batch_workers:
            self.update_batch_jobs()
            scheduler.every(self.runner_options.delay_update_batch).seconds.do(
                self.update_batch_jobs
            )

        running_states = [
            JobState.READY.value,
            JobState.CHECKED_OUT.value,
            JobState.TERMINATED.value,
            JobState.DOWNLOADED.value,
            JobState.UPLOADED.value,
            JobState.SUBMITTED.value,
            JobState.RUNNING.value,
            JobState.BATCH_RUNNING.value,
            JobState.BATCH_SUBMITTED.value,
        ]
        query = {"state": {"$in": running_states}}
        jobs_available = 1
        while jobs_available:
            scheduler.run_pending()
            time.sleep(0.2)
            jobs_available = self.job_controller.count_jobs(query=query)
            if max_seconds and time.time() - t0 > max_seconds:
                raise RuntimeError(
                    "Could not execute all the jobs within the selected amount of time"
                )
    def run_one_job(
        self,
        db_id: str | None = None,
        job_id: tuple[str, int] | None = None,
        max_seconds: int | None = None,
        raise_at_timeout: bool = True,
    ) -> bool:
        """
        Use the Runner to run a single Job until it reaches a terminal state.
        The Job should be in the READY state.

        Mainly used for testing.
        """
        states = [
            JobState.CHECKED_OUT.value,
            JobState.TERMINATED.value,
            JobState.DOWNLOADED.value,
            JobState.UPLOADED.value,
        ]
        scheduler = SafeScheduler(seconds_after_failure=120)
        t0 = time.time()

        query: dict = {}
        if db_id:
            query["db_id"] = db_id
        if job_id:
            query["uuid"] = job_id[0]
            query["index"] = job_id[1]

        job_data = self.job_controller.checkout_job(query=query)
        if not job_data:
            if not db_id and not job_id:
                return False
            if not db_id:
                job_data = job_id
            else:
                j_info = self.job_controller.get_job_info(db_id=db_id)
                job_data = (j_info.uuid, j_info.index)

        filters = {"uuid": job_data[0], "index": job_data[1]}

        self.advance_state(states)
        scheduler.every(self.runner_options.delay_advance_status).seconds.do(
            self.advance_state,
            states=states,
            filter=filters,
        )

        self.check_run_status()
        scheduler.every(self.runner_options.delay_check_run_status).seconds.do(
            self.check_run_status, filter=filters
        )

        running_states = [
            JobState.READY.value,
            JobState.CHECKED_OUT.value,
            JobState.TERMINATED.value,
            JobState.DOWNLOADED.value,
            JobState.UPLOADED.value,
            JobState.SUBMITTED.value,
            JobState.RUNNING.value,
        ]

        while True:
            scheduler.run_pending()
            time.sleep(0.2)
            job_info = self.job_controller.get_job_info(
                job_id=job_data[0], job_index=job_data[1]
            )
            if job_info.state.value not in running_states:
                return True
            if max_seconds and time.time() - t0 > max_seconds:
                if raise_at_timeout:
                    raise RuntimeError(
                        "Could not execute the job within the selected amount of time"
                    )
                return False
    def _get_limited_worker_query(self, states: list[str]) -> dict | None:
        """
        Generate the query to be used for fetching Jobs for workers with a
        limited number of Jobs allowed.

        Parameters
        ----------
        states
            The states to be used in the query.

        Returns
        -------
        A dictionary with the query.
        """
        states = [s for s in states if s != JobState.UPLOADED.value]

        available_workers = [w for w in self.workers if w not in self.limited_workers]
        for worker, status in self.limited_workers.items():
            if status["current"] < status["max"]:
                available_workers.append(worker)

        states_query = {"state": {"$in": states}}
        uploaded_query = {
            "state": JobState.UPLOADED.value,
            "worker": {"$in": available_workers},
        }

        if states and available_workers:
            return {"$or": [states_query, uploaded_query]}
        if states:
            return states_query
        if available_workers:
            return uploaded_query

        return None
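    # For instance, in a hypothetical project with an unlimited worker
    # "big_worker" and a limited worker "small_worker" that still has free
    # slots, a call with all four states handled by advance_state would
    # produce a query along these lines (state strings shown for illustration):
    #
    #     {
    #         "$or": [
    #             {"state": {"$in": ["CHECKED_OUT", "TERMINATED", "DOWNLOADED"]}},
    #             {"state": "UPLOADED", "worker": {"$in": ["big_worker", "small_worker"]}},
    #         ]
    #     }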
    def advance_state(self, states: list[str], filter: dict | None = None) -> None:  # noqa: A002
        """
        Acquire the lock and advance the state of a single Job at a time,
        looping until no more Jobs in the given states are available.

        Parameters
        ----------
        states
            The states of the Jobs that can be queried.
        filter
            An additional query filter restricting the Jobs to be advanced.
        """
        states_methods = {
            JobState.CHECKED_OUT: self.upload,
            JobState.UPLOADED: self.submit,
            JobState.TERMINATED: self.download,
            JobState.DOWNLOADED: self.complete_job,
        }

        while True:
            # handle the case of workers with limited number of jobs
            if self.limited_workers and JobState.UPLOADED.value in states:
                query = self._get_limited_worker_query(states=states)
                if not query:
                    return
            else:
                query = {"state": {"$in": states}}
            if filter:
                query.update(filter)

            with self.job_controller.lock_job_for_update(
                query=query,
                max_step_attempts=self.runner_options.max_step_attempts,
                delta_retry=self.runner_options.delta_retry,
            ) as lock:
                doc = lock.locked_document
                if not doc:
                    return

                state = JobState(doc["state"])

                states_methods[state](lock)
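    # The methods dispatched above implement the main remote lifecycle of a Job:
    #
    #     READY --(checkout)--> CHECKED_OUT --(upload)--> UPLOADED --(submit)-->
    #     SUBMITTED --(check_run_status)--> RUNNING/TERMINATED --(download)-->
    #     DOWNLOADED --(complete_job)--> COMPLETED (or FAILED)
    #
    # For local workers the TERMINATED/download step is skipped and Jobs go
    # directly to DOWNLOADED (see check_run_status and download below).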
    def upload(self, lock: MongoLock) -> None:
        """
        Upload files for a locked Job in the CHECKED_OUT state.
        If successful set the state to UPLOADED.

        Parameters
        ----------
        lock
            The MongoLock with the locked Job document.
        """
        doc = lock.locked_document
        if doc is None:
            raise RuntimeError("No document found in the lock.")

        db_id = doc["db_id"]
        logger.debug(f"upload db_id: {db_id}")

        job_dict = doc["job"]

        worker = self.get_worker(doc["worker"])
        host = self.get_host(doc["worker"])

        store = self.jobstore
        # TODO would it be better/feasible to keep a pool of the required
        # Stores already connected, to avoid opening and closing them?
        store.connect()
        try:
            resolve_job_dict_args(job_dict, store)
        finally:
            try:
                store.close()
            except Exception:
                logging.exception(f"error while closing the store {store}")

        remote_path = get_job_path(job_dict["uuid"], job_dict["index"], worker.work_dir)

        # Set the value of the original store for dynamical workflows. Usually it
        # will be None. Don't add the serializer: at this stage the default_orjson
        # serializer could undergo refactoring and this could break deserialization
        # of older FWs. It is set in the FireTask at runtime.
        remote_store = get_remote_store(
            store=store, work_dir=remote_path, config_dict=self.project.remote_jobstore
        )

        created = host.mkdir(remote_path)
        if not created:
            err_msg = (
                f"Could not create remote directory {remote_path} for db_id {db_id}"
            )
            logger.error(err_msg)
            raise RemoteError(err_msg, no_retry=False)

        serialized_input = get_remote_in_file(job_dict, remote_store)

        path_file = Path(remote_path, IN_FILENAME)
        host.put(serialized_input, str(path_file))

        set_output = {
            "$set": {"run_dir": remote_path, "state": JobState.UPLOADED.value}
        }
        lock.update_on_release = set_output
    def submit(self, lock: MongoLock) -> None:
        """
        Submit to the queue for a locked Job in the UPLOADED state.
        If successful set the state to SUBMITTED.

        Parameters
        ----------
        lock
            The MongoLock with the locked Job document.
        """
        doc = lock.locked_document
        if doc is None:
            raise RuntimeError("No document found in the lock.")

        logger.debug(f"submit db_id: {doc['db_id']}")

        job_dict = doc["job"]

        worker_name = doc["worker"]
        worker = self.get_worker(worker_name)

        remote_path = Path(doc["run_dir"])

        script_commands = [f"jf -fe execution run {remote_path}"]

        queue_manager = self.get_queue_manager(worker_name)
        qout_fpath = remote_path / OUT_FNAME
        qerr_fpath = remote_path / ERR_FNAME

        exec_config = doc["exec_config"]
        if isinstance(exec_config, str):
            exec_config = self.config_manager.get_exec_config(
                exec_config_name=exec_config, project_name=self.project_name
            )
        elif isinstance(exec_config, dict):
            exec_config = ExecutionConfig.parse_obj(exec_config)

        # define an empty default if it is not set
        exec_config = exec_config or ExecutionConfig()

        if worker_name in self.batch_workers:
            resources: dict = {}
            set_name_out(
                resources, job_dict["name"], out_fpath=qout_fpath, err_fpath=qerr_fpath
            )
            shell_manager = queue_manager.get_shell_manager()
            shell_manager.write_submission_script(
                commands=script_commands,
                pre_run=exec_config.pre_run,
                post_run=exec_config.post_run,
                options=resources,
                export=exec_config.export,
                modules=exec_config.modules,
                work_dir=remote_path,
                create_submit_dir=False,
            )
            self.batch_workers[worker_name].submit_job(
                job_id=doc["uuid"], index=doc["index"]
            )
            lock.update_on_release = {
                "$set": {
                    "state": JobState.BATCH_SUBMITTED.value,
                }
            }
        else:
            # decode in case it contains a QResources. It was not deserialized before.
            resources = (
                MontyDecoder().process_decoded(doc["resources"])
                or worker.resources
                or {}
            )
            set_name_out(
                resources, job_dict["name"], out_fpath=qout_fpath, err_fpath=qerr_fpath
            )
            pre_run = worker.pre_run or ""
            if exec_config.pre_run:
                pre_run += "\n" + exec_config.pre_run
            post_run = worker.post_run or ""
            if exec_config.post_run:
                post_run += "\n" + exec_config.post_run

            submit_result = queue_manager.submit(
                commands=script_commands,
                pre_run=pre_run,
                post_run=post_run,
                options=resources,
                export=exec_config.export,
                modules=exec_config.modules,
                work_dir=remote_path,
                create_submit_dir=False,
            )
            if submit_result.status == SubmissionStatus.FAILED:
                err_msg = f"submission failed. {submit_result!r}"
                raise RemoteError(err_msg, no_retry=False)
            if submit_result.status == SubmissionStatus.JOB_ID_UNKNOWN:
                err_msg = (
                    "submission succeeded but ID not known. Job may be running "
                    f"but status cannot be checked. {submit_result!r}"
                )
                raise RemoteError(err_msg, no_retry=True)
            if submit_result.status == SubmissionStatus.SUCCESSFUL:
                lock.update_on_release = {
                    "$set": {
                        "remote.process_id": str(submit_result.job_id),
                        "state": JobState.SUBMITTED.value,
                    }
                }
                if worker_name in self.limited_workers:
                    self.limited_workers[worker_name]["current"] += 1
            else:
                raise RemoteError(
                    f"unhandled submission status {submit_result.status}", no_retry=True
                )
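    # Rough sketch of the effect of submit() for a non-batch worker using, for
    # example, a SLURM scheduler (an assumption for illustration; the actual
    # script is generated by qtoolkit from `resources` and the worker settings):
    #
    #     #!/bin/bash
    #     #SBATCH --job-name=<Job name>
    #     #SBATCH --output=<run_dir>/<OUT_FNAME>
    #     #SBATCH --error=<run_dir>/<ERR_FNAME>
    #     <exports, module loads, worker/exec_config pre_run>
    #     jf -fe execution run <run_dir>
    #     <worker/exec_config post_run>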
    def download(self, lock) -> None:
        """
        Download the final files for a locked Job in the TERMINATED state.
        If successful set the state to DOWNLOADED.

        Parameters
        ----------
        lock
            The MongoLock with the locked Job document.
        """
        doc = lock.locked_document
        logger.debug(f"download db_id: {doc['db_id']}")

        # job_doc = JobDoc(**doc)
        job_dict = doc["job"]

        # If the worker is local do not copy the files to the temporary folder.
        # It should not reach this point, since it should go directly
        # from SUBMITTED/RUNNING to DOWNLOADED in case of a local worker.
        worker = self.get_worker(doc["worker"])
        if not worker.is_local:
            host = self.get_host(doc["worker"])

            store = self.jobstore

            remote_path = doc["run_dir"]
            local_base_dir = Path(self.project.tmp_dir, "download")
            local_path = get_job_path(
                job_dict["uuid"], job_dict["index"], local_base_dir
            )

            makedirs_p(local_path)

            fnames = [OUT_FILENAME]
            fnames.extend(
                get_remote_store_filenames(
                    store, config_dict=self.project.remote_jobstore
                )
            )

            for fname in fnames:
                # in principle fabric should work by just passing the
                # destination folder, but it fails
                remote_file_path = str(Path(remote_path, fname))
                try:
                    host.get(remote_file_path, str(Path(local_path, fname)))
                except FileNotFoundError as exc:
                    # if files are missing it should not retry
                    err_msg = f"file {remote_file_path} for job {job_dict['uuid']} does not exist"
                    logger.exception(err_msg)
                    raise RemoteError(err_msg, no_retry=True) from exc

        lock.update_on_release = {"$set": {"state": JobState.DOWNLOADED.value}}
    def complete_job(self, lock) -> None:
        """
        Complete a locked Job in the DOWNLOADED state.
        If successful set the state to COMPLETED, otherwise to FAILED.

        Parameters
        ----------
        lock
            The MongoLock with the locked Job document.
        """
        doc = lock.locked_document
        logger.debug(f"complete job db_id: {doc['db_id']}")

        # if the worker is local the files were not copied to the temporary
        # folder, but can be accessed directly in the run_dir
        worker = self.get_worker(doc["worker"])
        if worker.is_local:
            local_path = doc["run_dir"]
        else:
            local_base_dir = Path(self.project.tmp_dir, "download")
            local_path = get_job_path(doc["uuid"], doc["index"], local_base_dir)

        try:
            store = self.jobstore
            completed = self.job_controller.complete_job(doc, local_path, store)
        except json.JSONDecodeError as exc:
            # if an empty file is copied this error can appear, do not retry
            err_msg = traceback.format_exc()
            raise RemoteError(err_msg, no_retry=True) from exc

        # remove the local folder with downloaded files if successfully completed
        if completed and self.runner_options.delete_tmp_folder and not worker.is_local:
            shutil.rmtree(local_path, ignore_errors=True)

        if not completed:
            err_msg = "the parsed output does not contain the required information to complete the job"
            raise RemoteError(err_msg, no_retry=True)
    def check_run_status(self, filter: dict | None = None) -> None:  # noqa: A002
        """
        Check the status of all the jobs submitted to a queue.

        If Jobs started update their state from SUBMITTED to RUNNING.
        If Jobs terminated set their state to TERMINATED if running on a remote
        host. If on a local host set them directly to DOWNLOADED.
        """
        logger.debug("check_run_status")
        # check for jobs that could have changed state
        workers_ids_docs: dict = defaultdict(dict)
        db_filter = {
            "state": {"$in": [JobState.SUBMITTED.value, JobState.RUNNING.value]},
            "lock_id": None,
            "remote.retry_time_limit": {"$not": {"$gt": datetime.utcnow()}},
        }
        if filter:
            db_filter.update(filter)
        projection = [
            "db_id",
            "uuid",
            "index",
            "remote",
            "worker",
            "state",
        ]
        for doc in self.job_controller.get_jobs(db_filter, projection):
            worker_name = doc["worker"]
            remote_doc = doc["remote"]
            workers_ids_docs[worker_name][remote_doc["process_id"]] = doc

        for worker_name, ids_docs in workers_ids_docs.items():
            error = None

            if not ids_docs:
                continue

            qjobs_dict = {}
            try:
                ids_list = list(ids_docs)
                queue = self.get_queue_manager(worker_name)
                qjobs = queue.get_jobs_list(ids_list)
                qjobs_dict = {qjob.job_id: qjob for qjob in qjobs}
            except Exception:
                logger.warning(
                    f"error trying to get jobs list for worker: {worker_name}",
                    exc_info=True,
                )
                error = traceback.format_exc()

            for doc_id, doc in ids_docs.items():
                # TODO if failed should maybe be handled differently?
                remote_doc = doc["remote"]
                qjob = qjobs_dict.get(doc_id)
                qstate = qjob.state if qjob else None
                next_state = None
                start_time = None
                if (
                    qstate == QState.RUNNING
                    and doc["state"] == JobState.SUBMITTED.value
                ):
                    next_state = JobState.RUNNING
                    start_time = datetime.utcnow()
                    logger.debug(
                        f"remote job with id {remote_doc['process_id']} is running"
                    )
                elif qstate in [None, QState.DONE, QState.FAILED]:
                    worker = self.get_worker(worker_name)
                    # if the worker is local go directly to DOWNLOADED, as files
                    # are not copied locally
                    if not worker.is_local:
                        next_state = JobState.TERMINATED
                    else:
                        next_state = JobState.DOWNLOADED
                    logger.debug(
                        f"terminated remote job with id {remote_doc['process_id']}"
                    )
                elif not error and remote_doc["step_attempts"] > 0:
                    # reset the step attempts if succeeding in case there was
                    # an error earlier. Setting the state to the same as the
                    # current triggers the update that cleans the state
                    next_state = JobState(doc["state"])

                # the document needs to be updated only in case of error or if a
                # next state has been set.
                # Only update if the state did not change in the meanwhile
                if next_state or error:
                    lock_filter = {
                        "uuid": doc["uuid"],
                        "index": doc["index"],
                        "state": doc["state"],
                    }
                    with self.job_controller.lock_job_for_update(
                        query=lock_filter,
                        max_step_attempts=self.runner_options.max_step_attempts,
                        delta_retry=self.runner_options.delta_retry,
                    ) as lock:
                        if lock.locked_document:
                            if error:
                                raise RemoteError(error, no_retry=False)
                            set_output = {
                                "$set": {
                                    "remote.queue_state": (
                                        qstate.value if qstate else None
                                    ),
                                    "state": next_state.value,
                                }
                            }
                            if start_time:
                                set_output["$set"]["start_time"] = start_time
                            lock.update_on_release = set_output

                # decrease the amount of jobs running if it is a limited worker
                if (
                    next_state in (JobState.TERMINATED, JobState.DOWNLOADED)
                    and worker_name in self.limited_workers
                ):
                    self.limited_workers[doc["worker"]]["current"] -= 1
    def checkout(self) -> None:
        """Checkout READY Jobs."""
        logger.debug("checkout jobs")
        n_checked_out = 0
        while True:
            try:
                reserved = self.job_controller.checkout_job()
                if not reserved:
                    break
            except Exception:
                logger.exception("Error while checking out jobs")
                break

            n_checked_out += 1

        logger.debug(f"checked out {n_checked_out} jobs")
    def refresh_num_current_jobs(self) -> None:
        """
        Update the number of Jobs currently running for workers with a limited
        number of Jobs.
        """
        for name, state in self.limited_workers.items():
            query = {
                "state": {"$in": [JobState.SUBMITTED.value, JobState.RUNNING.value]},
                "worker": name,
            }
            state["current"] = self.job_controller.count_jobs(query)
    def update_batch_jobs(self) -> None:
        """
        Update the status of batch Jobs.

        Includes submitting to the remote queue, checking the status of running
        jobs in the queue and handling the files with the information about the
        status of the Jobs.
        """
        logger.debug("update batch jobs")
        for worker_name, batch_manager in self.batch_workers.items():
            worker = self.get_worker(worker_name)

            # first check the processes that are running from the folder
            # and set them to running if needed
            running_jobs = batch_manager.get_running()
            for job_id, job_index, process_running_uuid in running_jobs:
                lock_filter = {
                    "uuid": job_id,
                    "index": job_index,
                    "state": JobState.BATCH_SUBMITTED.value,
                }
                with self.job_controller.lock_job_for_update(
                    query=lock_filter,
                    max_step_attempts=self.runner_options.max_step_attempts,
                    delta_retry=self.runner_options.delta_retry,
                ) as lock:
                    if lock.locked_document:
                        set_output = {
                            "$set": {
                                "state": JobState.BATCH_RUNNING.value,
                                "start_time": datetime.utcnow(),
                                "remote.process_id": process_running_uuid,
                            }
                        }
                        lock.update_on_release = set_output

            # Check the processes that should be running on the remote queue
            # and update the state in the DB if something changed
            batch_processes_data = self.job_controller.get_batch_processes(worker_name)
            processes = list(batch_processes_data)

            queue_manager = self.get_queue_manager(worker_name)

            if processes:
                qjobs = queue_manager.get_jobs_list(processes)
                running_processes = {qjob.job_id for qjob in qjobs}
                stopped_processes = set(processes) - running_processes
                for pid in stopped_processes:
                    self.job_controller.remove_batch_process(pid, worker_name)
                    # check if there are jobs that were in the running folder of a
                    # process that finished and set them to remote error
                    for job_id, job_index, process_running_uuid in running_jobs:
                        if batch_processes_data[pid] == process_running_uuid:
                            lock_filter = {
                                "uuid": job_id,
                                "index": job_index,
                                "state": {
                                    "$in": (
                                        JobState.BATCH_SUBMITTED.value,
                                        JobState.BATCH_RUNNING.value,
                                    )
                                },
                            }
                            with self.job_controller.lock_job_for_update(
                                query=lock_filter,
                                max_step_attempts=self.runner_options.max_step_attempts,
                                delta_retry=self.runner_options.delta_retry,
                            ) as lock:
                                if lock.locked_document:
                                    err_msg = f"The batch process that was running the job (process_id: {pid}, uuid: {process_running_uuid}) was likely killed before terminating the job execution"
                                    raise RuntimeError(err_msg)

                processes = list(running_processes)

            # check that enough processes are submitted and submit the required
            # amount to reach max_jobs, if needed.
            n_jobs = self.job_controller.count_jobs(
                {
                    "state": {
                        "$in": (
                            JobState.BATCH_SUBMITTED.value,
                            JobState.BATCH_RUNNING.value,
                        )
                    },
                    "worker": worker_name,
                }
            )
            n_processes = len(processes)
            n_jobs_to_submit = min(
                max(worker.max_jobs - n_processes, 0), max(n_jobs - n_processes, 0)
            )
            logger.debug(
                f"submitting {n_jobs_to_submit} batch jobs for worker {worker_name}"
            )
            for _ in range(n_jobs_to_submit):
                resources = worker.resources or {}
                process_running_uuid = suuid()

                remote_path = Path(
                    get_job_path(process_running_uuid, None, worker.batch.work_dir)
                )
                qout_fpath = remote_path / OUT_FNAME
                qerr_fpath = remote_path / ERR_FNAME
                set_name_out(
                    resources,
                    f"batch_{process_running_uuid}",
                    out_fpath=qout_fpath,
                    err_fpath=qerr_fpath,
                )
                # note that here the worker.work_dir needs to be passed,
                # not the worker.batch.work_dir
                command = f"jf -fe execution run-batch {worker.work_dir} {worker.batch.jobs_handle_dir} {process_running_uuid}"
                if worker.batch.max_jobs:
                    command += f" -mj {worker.batch.max_jobs}"
                if worker.batch.max_time:
                    command += f" -mt {worker.batch.max_time}"
                if worker.batch.max_wait:
                    command += f" -mw {worker.batch.max_wait}"

                submit_result = queue_manager.submit(
                    commands=[command],
                    pre_run=worker.pre_run,
                    post_run=worker.post_run,
                    options=resources,
                    work_dir=remote_path,
                    create_submit_dir=True,
                )
                if submit_result.status == SubmissionStatus.FAILED:
                    logger.error(f"submission failed. {submit_result!r}")
                elif submit_result.status == SubmissionStatus.JOB_ID_UNKNOWN:
                    logger.error(
                        f"submission succeeded but ID not known. Job may be running but status cannot be checked. {submit_result!r}"
                    )
                elif submit_result.status == SubmissionStatus.SUCCESSFUL:
                    self.job_controller.add_batch_process(
                        submit_result.job_id, process_running_uuid, worker_name
                    )
                else:
                    logger.error(f"unhandled submission status {submit_result.status}")

            # check for jobs that have terminated in the batch runner and
            # update the DB state accordingly
            terminated_jobs = batch_manager.get_terminated()
            for job_id, job_index, process_running_uuid in terminated_jobs:
                lock_filter = {
                    "uuid": job_id,
                    "index": job_index,
                    "state": {
                        "$in": (
                            JobState.BATCH_SUBMITTED.value,
                            JobState.BATCH_RUNNING.value,
                        )
                    },
                }
                with self.job_controller.lock_job_for_update(
                    query=lock_filter,
                    max_step_attempts=self.runner_options.max_step_attempts,
                    delta_retry=self.runner_options.delta_retry,
                ) as lock:
                    if lock.locked_document:
                        if not worker.is_local:
                            next_state = JobState.TERMINATED
                        else:
                            next_state = JobState.DOWNLOADED
                        set_output = {
                            "$set": {
                                "state": next_state.value,
                                "remote.process_id": process_running_uuid,
                            }
                        }
                        lock.update_on_release = set_output
                        batch_manager.delete_terminated(
                            [(job_id, job_index, process_running_uuid)]
                        )
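    # A worker enters this batch-handling path when its project configuration
    # defines a "batch" section; the fields used above map to settings along
    # these lines (YAML sketch, all values are placeholders):
    #
    #     workers:
    #       batch_worker:
    #         ...
    #         max_jobs: 10
    #         batch:
    #           jobs_handle_dir: /remote/path/jf_handle
    #           work_dir: /remote/path/jf_batch_run
    #           max_jobs: 2
    #           max_time: 3600
    #           max_wait: 60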
    def cleanup(self) -> None:
        """Close all the connections after stopping the Runner."""
        for worker_name, host in self.hosts.items():
            try:
                host.close()
            except Exception:
                logging.exception(
                    f"error while closing connection to worker {worker_name}"
                )

        try:
            self.jobstore.close()
        except Exception:
            logging.exception("error while closing connection to jobstore")

        self.job_controller.close()
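# Summary of the periodic tasks scheduled by Runner.run() and the RunnerOptions
# delays controlling them (see jobflow_remote.config.base.RunnerOptions):
#
#     checkout                  -> delay_checkout
#     advance_state             -> delay_advance_status
#     check_run_status          -> delay_check_run_status
#     refresh_num_current_jobs  -> delay_refresh_limited
#     update_batch_jobs         -> delay_update_batch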