"""The Runner orchestrating the Jobs execution."""
from __future__ import annotations
import getpass
import json
import logging
import math
import shutil
import signal
import socket
import time
import traceback
import uuid
from collections import OrderedDict, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING
from monty.json import MontyDecoder
from monty.os import makedirs_p
from qtoolkit.core.data_objects import QState, SubmissionStatus
from rich.prompt import Confirm
from jobflow_remote import JobController
from jobflow_remote.config.base import (
ConfigError,
ExecutionConfig,
LogLevel,
Project,
RunnerOptions,
WorkerBase,
)
from jobflow_remote.config.manager import ConfigManager
from jobflow_remote.jobs.batch import RemoteBatchManager
from jobflow_remote.jobs.data import (
BATCH_INFO_FILENAME,
IN_FILENAME,
OUT_FILENAME,
RemoteError,
)
from jobflow_remote.jobs.state import BatchState, JobState
from jobflow_remote.remote.data import (
get_job_path,
get_local_data_path,
get_remote_in_file,
get_remote_store,
get_remote_store_filenames,
resolve_job_dict_args,
)
from jobflow_remote.remote.queue import ERR_FNAME, OUT_FNAME, QueueManager, set_name_out
from jobflow_remote.utils.data import suuid
from jobflow_remote.utils.db import MissingDocumentError
from jobflow_remote.utils.log import initialize_runner_logger
from jobflow_remote.utils.remote import UnsafeDeletionError, safe_remove_job_files
from jobflow_remote.utils.schedule import SafeScheduler
if TYPE_CHECKING:
from jobflow.core.store import JobStore
from jobflow_remote.remote.host import BaseHost
from jobflow_remote.utils.db import MongoLock
logger = logging.getLogger(__name__)
class Runner:
"""
Object orchestrating the execution of all the Jobs.
Advances the status of the Jobs, handles the communication with the workers
and updates the queue and output databases.
The main entry point is the `run` method. It is supposed to be executed
by a daemon, but can also be run directly for testing purposes.
It allows running all the steps required to advance the Jobs' states, or only
a subset of them, in order to parallelize the different tasks.
The runner instantiates a pool of workers and hosts given in the project
definition. A single connection will be opened if multiple workers share
the same host.
The Runner schedules the execution of the specific tasks at regular intervals
and relies on objects like QueueManager, BaseHost and JobController to
interact with workers and databases.
"""
def __init__(
self,
project_name: str | None = None,
log_level: LogLevel | None = None,
runner_id: str | None = None,
connect_interactive: bool = False,
daemon_id: str | None = None,
) -> None:
"""
Parameters
----------
project_name
Name of the project. Used to retrieve all the configurations required
to execute the runner.
log_level
Logging level of the Runner.
runner_id
A unique identifier for the Runner process. Used to identify the
runner process in logging and in the DB locks.
If None a uuid will be generated.
connect_interactive
If True, connections to the hosts marked as interactive will be opened
during initialization.
daemon_id
A unique identifier for the daemon process that manages this Runner process.
"""
self.stop_signal = False
self.runner_id: str = runner_id or str(uuid.uuid4())
self.daemon_id = daemon_id
self.config_manager: ConfigManager = ConfigManager()
self.project: Project = self.config_manager.get_project(project_name)
self.project_name = self.project.name
self.job_controller: JobController = JobController.from_project(self.project)
self.workers: dict[str, WorkerBase] = self.project.workers
# Build the dictionary of hosts. The reference is the worker name.
# If two hosts match, use the same instance
self.hosts: dict[str, BaseHost] = {}
for wname, w in self.workers.items():
new_host = w.get_host()
for host in self.hosts.values():
if new_host == host:
self.hosts[wname] = host
break
else:
self.hosts[wname] = new_host
self.limited_workers = {
name: {"max": w.max_jobs, "current": 0}
for name, w in self.workers.items()
if w.max_jobs and not w.batch
}
self.batch_workers = {}
for wname, w in self.workers.items():
if w.batch is not None:
self.batch_workers[wname] = RemoteBatchManager(
self.hosts[wname], w.batch.jobs_handle_dir
)
self.queue_managers: dict = {}
log_level = log_level if log_level is not None else self.project.log_level
initialize_runner_logger(
log_folder=self.project.log_dir,
level=log_level.to_logging(),
runner_id=self.runner_id,
)
# TODO it could be better to create a pool of stores that are connected
# How to deal with cases where the connection gets closed?
# how to deal with file based stores?
self.jobstore = self.project.get_jobstore()
self.optional_jobstores = {}
if self.project.optional_jobstores:
self.optional_jobstores = {
name: self.project.get_jobstore(name)
for name in self.project.optional_jobstores
}
# create a cache for the jobstores to be used in the get_jobstore method
self._cached_jobstores: OrderedDict[str, JobStore] = OrderedDict()
# create a cache for the batches, mapping batch_uid to process_id
self._cached_batches: dict = {}
# create a cache of running batch pids: set of running pids for each worker name
self._cached_running_batch_pids: dict[str, set] = {}
if connect_interactive:
for host_name, host in self.hosts.items():
if (
host.interactive_login
and not host.is_connected
and Confirm.ask(
f"Do you want to open the connection for the host of the {host_name} worker?"
)
):
host.connect()
self.runner_host = socket.gethostname()
self.pinged_db = False
@property
def runner_options(self) -> RunnerOptions:
"""The Runner options defined in the project."""
return self.project.runner
def handle_signal(self, signum, frame) -> None:
"""
Handle the SIGTERM signal in the Runner.
Sets a variable that will stop the Runner loop.
"""
logger.info(f"Received signal: {signum}")
self.stop_signal = True
def get_worker(self, worker_name: str) -> WorkerBase:
"""
Get the worker from the pool of workers instantiated by the Runner.
Parameters
----------
worker_name
The name of the worker.
Returns
-------
An instance of the corresponding worker.
"""
if worker_name not in self.workers:
raise ConfigError(
f"No worker {worker_name} is defined in project {self.project_name}"
)
return self.workers[worker_name]
def get_host(self, worker_name: str) -> BaseHost:
"""
Get the host associated with a worker from the pool of hosts instantiated
by the Runner.
Parameters
----------
worker_name
The name of the worker.
Returns
-------
An instance of the Host associated with the worker.
"""
host = self.hosts[worker_name]
if not host.is_connected:
host.connect()
return host
def get_queue_manager(self, worker_name: str) -> QueueManager:
"""
Get an instance of the queue manager associated with a worker, based on its host.
Parameters
----------
worker_name
The name of the worker.
Returns
-------
An instance of the QueueManager associated with the worker.
"""
if worker_name not in self.queue_managers:
worker = self.get_worker(worker_name)
self.queue_managers[worker_name] = QueueManager(
worker.get_scheduler_io(), self.get_host(worker_name)
)
return self.queue_managers[worker_name]
def get_jobstore(self, flow_id: str | None) -> JobStore:
"""
Get the JobStore associated with a Flow.
Uses a small internal cache to reduce the calls to the DB, assuming
that not too many Flows will be updated at the same time.
Parameters
----------
flow_id
The uuid of the Flow.
Returns
-------
JobStore
The JobStore associated with a Flow.
"""
# A simple manual cache based on an OrderedDict is used here.
# The lru_cache decorator is discouraged since it will prevent
# garbage collection of the object. In principle this is likely
# not an issue for the Runner, but to avoid potential issues it is
# not used.
# Other packages exist that offer these functionalities (e.g. cachetools),
# but adding a dependency for this trivial caching seems overkill.
# Note that an OrderedDict is needed because popitem() for a standard
# dict does not have the `last` argument.
if flow_id in self._cached_jobstores:
return self._cached_jobstores[flow_id]
jobstore = self.jobstore
if flow_id and self.optional_jobstores:
store_name = self.job_controller.get_flow_store(flow_id=flow_id)
if store_name:
jobstore = self.optional_jobstores[store_name]
self._cached_jobstores[flow_id] = jobstore
if len(self._cached_jobstores) > 20:
self._cached_jobstores.popitem(last=False)
return jobstore
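# Illustration of the bounded cache used above (a sketch, not part of the
# Runner): popitem(last=False) evicts the oldest inserted entry once more
# than 20 Flows have been cached.
#
#     from collections import OrderedDict
#     cache: OrderedDict[str, str] = OrderedDict()
#     for i in range(25):
#         cache[f"flow-{i}"] = f"store-{i}"
#         if len(cache) > 20:
#             cache.popitem(last=False)  # drops "flow-0", "flow-1", ...
#     assert len(cache) == 20 and "flow-0" not in cache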
def run(
self,
transfer: bool = True,
complete: bool = True,
queue: bool = True,
checkout: bool = True,
ticks: int | None = None,
) -> None:
"""
Start the runner.
Which actions are being performed can be tuned by the arguments.
Parameters
----------
transfer
If True, actions related to file transfer are performed by the Runner.
complete
If True, Job completion is performed by the Runner.
queue
If True, interactions with the queue manager are handled by the Runner.
checkout
If True, the checkout of Jobs is performed by the Runner.
ticks
If provided, the Runner will run for this number of ticks before exiting.
"""
signal.signal(signal.SIGTERM, self.handle_signal)
states = []
if transfer:
states.append(JobState.CHECKED_OUT.value)
states.append(JobState.RUN_FINISHED.value)
if complete:
states.append(JobState.DOWNLOADED.value)
if queue:
states.append(JobState.UPLOADED.value)
logger.info(
f"Runner run options: transfer: {transfer} complete: {complete} queue: {queue} checkout: {checkout}"
)
scheduler = SafeScheduler(seconds_after_failure=120)
# run a first call for each case, since schedule will wait for the delay
# before making the first execution.
if checkout:
try:
self.checkout()
except Exception:
logger.exception("Error during initial checkout")
scheduler.every(self.runner_options.delay_checkout).seconds.do(
self.checkout
)
# first run check_run_status and update the status of the limited
# workers (if any). Otherwise, upon start of the runner the initial
# count of the currently submitted jobs would be 0.
if queue:
try:
self.check_run_status()
except Exception:
logger.exception("Error during initial check_run_status")
scheduler.every(self.runner_options.delay_check_run_status).seconds.do(
self.check_run_status
)
# Limited workers will only affect the process interacting with the queue
# manager. When a job is submitted or finishes running, the count in the
# limited_workers can be directly updated, since by construction only one
# process will take care of the queue state.
# The refresh can be run on a relatively high delay since it should only
# account for actions from the user (e.g. rerun, cancel), that can alter
# the number of submitted/running jobs.
if self.limited_workers:
try:
self.refresh_num_current_jobs()
except Exception:
logger.exception("Error during initial refresh_num_current_jobs")
scheduler.every(self.runner_options.delay_refresh_limited).seconds.do(
self.refresh_num_current_jobs
)
if self.batch_workers:
try:
self.update_batch_jobs()
except Exception:
logger.exception("Error during initial update_batch_jobs")
scheduler.every(self.runner_options.delay_update_batch).seconds.do(
self.update_batch_jobs
)
if transfer or queue or complete:
try:
self.advance_state(states)
except Exception:
logger.exception("Error during initial advance_state")
scheduler.every(self.runner_options.delay_advance_status).seconds.do(
self.advance_state, states=states
)
# all the processes will ping the running_runner document to signal
# that at least one is still active.
run_options = dict(
transfer=transfer,
complete=complete,
queue=queue,
checkout=checkout,
)
try:
self.ping_running_runner(run_options=run_options)
except Exception:
logger.exception("Error during initial ping running runner")
scheduler.every(self.project.runner.delay_ping_db).seconds.do(
self.ping_running_runner, run_options=run_options
)
ticks_remaining: int | bool = True
if ticks is not None:
ticks_remaining = ticks
while ticks_remaining:
if self.stop_signal:
logger.info("stopping due to sigterm")
break
scheduler.run_pending()
time.sleep(1)
if ticks is not None:
ticks_remaining -= 1
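# A sketch of how the run() flags can be split across separate Runner
# processes to parallelize the different tasks, as mentioned in the class
# docstring (the project name is a placeholder assumption):
#
#     # process 1: checkout new Jobs and interact with the queue manager
#     Runner(project_name="my_project").run(
#         transfer=False, complete=False, queue=True, checkout=True
#     )
#     # process 2: handle file transfers and Job completion
#     Runner(project_name="my_project").run(
#         transfer=True, complete=True, queue=False, checkout=False
#     )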
def run_all_jobs(
self,
max_seconds: int | None = None,
wait_for_batches: bool = False,
) -> None:
"""
Use the runner to run all the jobs in the DB.
Mainly used for testing.
"""
states = [
JobState.CHECKED_OUT.value,
JobState.RUN_FINISHED.value,
JobState.DOWNLOADED.value,
JobState.UPLOADED.value,
]
scheduler = SafeScheduler(seconds_after_failure=120)
t0 = time.time()
# run a first call for each case, since schedule will wait for the delay
# before making the first execution.
self.checkout()
scheduler.every(self.runner_options.delay_checkout).seconds.do(self.checkout)
self.check_run_status()
scheduler.every(self.runner_options.delay_check_run_status).seconds.do(
self.check_run_status
)
# Limited workers will only affect the process interacting with the queue
# manager. When a job is submitted or finishes running, the count in the
# limited_workers can be directly updated, since by construction only one
# process will take care of the queue state.
# The refresh can be run on a relatively high delay since it should only
# account for actions from the user (e.g. rerun, cancel), that can alter
# the number of submitted/running jobs.
if self.limited_workers:
self.refresh_num_current_jobs()
scheduler.every(self.runner_options.delay_refresh_limited).seconds.do(
self.refresh_num_current_jobs
)
if self.batch_workers:
self.update_batch_jobs()
scheduler.every(self.runner_options.delay_update_batch).seconds.do(
self.update_batch_jobs
)
self.advance_state(states)
scheduler.every(self.runner_options.delay_advance_status).seconds.do(
self.advance_state, states=states
)
running_states = [
JobState.READY.value,
JobState.CHECKED_OUT.value,
JobState.RUN_FINISHED.value,
JobState.DOWNLOADED.value,
JobState.UPLOADED.value,
JobState.SUBMITTED.value,
JobState.RUNNING.value,
JobState.BATCH_RUNNING.value,
JobState.BATCH_SUBMITTED.value,
]
query = {"state": {"$in": running_states}}
jobs_available = 1
unfinished_batches = 1 if wait_for_batches else 0
while jobs_available + unfinished_batches:
scheduler.run_pending()
time.sleep(0.2)
jobs_available = self.job_controller.count_jobs(query=query)
if self.job_controller.batches is None:
unfinished_batches = 0
else:
unfinished_batches = len(
self.job_controller.get_batches(
batch_state=[BatchState.SUBMITTED, BatchState.RUNNING]
)
)
if max_seconds and time.time() - t0 > max_seconds:
raise RuntimeError(
"Could not execute all the jobs within the selected amount of time"
)
def run_one_job(
self,
db_id: str | None = None,
job_id: tuple[str, int] | None = None,
max_seconds: int | None = None,
raise_at_timeout: bool = True,
target_state: JobState | None = None,
) -> bool:
"""
Use the runner to run a single Job until it reaches a terminal state.
The Job should be in the READY state.
Mainly used for testing.
"""
states = [
JobState.CHECKED_OUT.value,
JobState.RUN_FINISHED.value,
JobState.DOWNLOADED.value,
JobState.UPLOADED.value,
]
scheduler = SafeScheduler(seconds_after_failure=120)
t0 = time.time()
query: dict = {}
if db_id:
query["db_id"] = db_id
if job_id:
query["uuid"] = job_id[0]
query["index"] = job_id[1]
job_data = self.job_controller.checkout_job(query=query)
if not job_data:
if not db_id and not job_id:
return False
if not db_id:
job_data = job_id
else:
j_info = self.job_controller.get_job_info(db_id=db_id)
job_data = (j_info.uuid, j_info.index)
filters = {"uuid": job_data[0], "index": job_data[1]}
self.advance_state(states)
scheduler.every(self.runner_options.delay_advance_status).seconds.do(
self.advance_state,
states=states,
filter=filters,
)
self.check_run_status()
scheduler.every(self.runner_options.delay_check_run_status).seconds.do(
self.check_run_status, filter=filters
)
running_states = [
JobState.READY.value,
JobState.CHECKED_OUT.value,
JobState.RUN_FINISHED.value,
JobState.DOWNLOADED.value,
JobState.UPLOADED.value,
JobState.SUBMITTED.value,
JobState.RUNNING.value,
]
while True:
scheduler.run_pending()
time.sleep(0.2)
job_info = self.job_controller.get_job_info(
job_id=job_data[0], job_index=job_data[1]
)
if target_state and job_info.state == target_state:
return True
if job_info.state.value not in running_states:
# if the target state is defined and the code got
# here, it means it missed the target state, so
# the target was not achieved.
if target_state:
raise RuntimeError(
f"The target state {target_state.value} was not achieved. "
f"Final state: {job_info.state.value}"
)
return True
if max_seconds and time.time() - t0 > max_seconds:
if raise_at_timeout:
raise RuntimeError(
"Could not execute the job within the selected amount of time"
)
return False
def _get_limited_worker_query(self, states: list[str]) -> dict | None:
"""
Generate the query to be used for fetching Jobs for workers with a limited
number of allowed Jobs.
Parameters
----------
states
The states to be used in the query.
Returns
-------
A dictionary with the query.
"""
states = [s for s in states if s != JobState.UPLOADED.value]
available_workers = [w for w in self.workers if w not in self.limited_workers]
for worker, status in self.limited_workers.items():
if status["current"] < status["max"]:
available_workers.append(worker)
states_query = {"state": {"$in": states}}
uploaded_query = {
"state": JobState.UPLOADED.value,
"worker": {"$in": available_workers},
}
if states and available_workers:
return {"$or": [states_query, uploaded_query]}
if states:
return states_query
if available_workers:
return uploaded_query
return None
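# Example of the query generated above (a sketch): with states
# [JobState.CHECKED_OUT.value, JobState.UPLOADED.value] and a single limited
# worker "small_worker" (a hypothetical name) that still has free slots,
# the returned filter would look like
#
#     {
#         "$or": [
#             {"state": {"$in": [JobState.CHECKED_OUT.value]}},
#             {
#                 "state": JobState.UPLOADED.value,
#                 "worker": {"$in": ["small_worker"]},
#             },
#         ]
#     }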
def advance_state(self, states: list[str], filter: dict | None = None) -> None: # noqa: A002
"""
Acquire the lock and advance the state of a single job.
Parameters
----------
states
The states of the Jobs that can be queried.
filter
An optional additional filter to select the Jobs to be advanced.
"""
states_methods = {
JobState.CHECKED_OUT: self.upload,
JobState.UPLOADED: self.submit,
JobState.RUN_FINISHED: self.download,
JobState.DOWNLOADED: self.complete_job,
}
while True:
# handle the case of workers with limited number of jobs
if self.limited_workers and JobState.UPLOADED.value in states:
query = self._get_limited_worker_query(states=states)
if not query:
return
else:
query = {"state": {"$in": states}}
if filter:
query.update(filter)
with self.job_controller.lock_job_for_update(
query=query,
max_step_attempts=self.runner_options.max_step_attempts,
delta_retry=self.runner_options.delta_retry,
) as lock:
doc = lock.locked_document
if not doc:
return
state = JobState(doc["state"])
states_methods[state](lock)
def upload(self, lock: MongoLock) -> None:
"""
Upload files for a locked Job in the CHECKED_OUT state.
If successful set the state to UPLOADED.
Parameters
----------
lock
The MongoLock with the locked Job document.
"""
doc = lock.locked_document
if doc is None:
raise RuntimeError("No document found in the lock.")
db_id = doc["db_id"]
logger.debug(f"upload db_id: {db_id}")
job_dict = doc["job"]
worker = self.get_worker(doc["worker"])
host = self.get_host(doc["worker"])
# if prerun_cleanup is specified (job was likely rerun) delete the folder before
# reuploading the data
run_dir = doc["run_dir"]
if doc.get("remote", {}).get("prerun_cleanup", False):
try:
deleted = safe_remove_job_files(
host=host, run_dir=run_dir, raise_on_error=True
)
# Log as debug. If deletion failed because of an error it will raise.
not_string = "" if deleted else "not "
logger.debug(
f"Folder for job {db_id} ({run_dir}) was {not_string}deleted"
)
except UnsafeDeletionError as e:
raise RemoteError(
f"Error while performing cleanup of the run_dir folder for job {db_id}: {run_dir}",
no_retry=True,
) from e
except Exception as e:
raise RemoteError(
f"Error while performing cleanup of the run_dir folder for job {db_id}: {run_dir}",
no_retry=False,
) from e
store = self.get_jobstore(job_dict["hosts"][-1])
# TODO would it be better/feasible to keep a pool of the required
# Stores already connected, to avoid opening and closing them?
store.connect()
try:
resolve_job_dict_args(job_dict, store)
finally:
try:
store.close()
except Exception:
logging.exception(f"error while closing the store {store}")
remote_path = get_job_path(job_dict["uuid"], job_dict["index"], worker.work_dir)
# Set the value of the original store for dynamical workflows. Usually it
# will be None. Don't add the serializer at this stage: the default_orjson
# serializer could undergo refactoring and this could break deserialization
# of older FWs. It is set in the FireTask at runtime.
remote_store = get_remote_store(
store=store, work_dir=remote_path, config_dict=self.project.remote_jobstore
)
created = host.mkdir(remote_path)
if not created:
err_msg = (
f"Could not create remote directory {remote_path} for db_id {db_id}"
)
logger.error(err_msg)
raise RemoteError(err_msg, no_retry=False)
serialized_input = get_remote_in_file(job_dict, remote_store, doc)
path_file = Path(remote_path, IN_FILENAME)
host.put(serialized_input, str(path_file))
set_output = {
"$set": {
"run_dir": remote_path,
"state": JobState.UPLOADED.value,
"remote.prerun_cleanup": False,
}
}
lock.update_on_release = set_output
def submit(self, lock: MongoLock) -> None:
"""
Submit to the queue for a locked Job in the UPLOADED state.
If successful set the state to SUBMITTED.
Parameters
----------
lock
The MongoLock with the locked Job document.
"""
doc = lock.locked_document
if doc is None:
raise RuntimeError("No document found in the lock.")
logger.debug(f"submit db_id: {doc['db_id']}")
job_dict = doc["job"]
worker_name = doc["worker"]
worker = self.get_worker(worker_name)
remote_path = Path(doc["run_dir"])
execution_cmd = worker.execution_cmd or "jf -fe execution run {}"
script_commands = [execution_cmd.format(remote_path)]
queue_manager = self.get_queue_manager(worker_name)
qout_fpath = remote_path / OUT_FNAME
qerr_fpath = remote_path / ERR_FNAME
exec_config = doc["exec_config"]
if isinstance(exec_config, str):
exec_config = self.config_manager.get_exec_config(
exec_config_name=exec_config, project_name=self.project_name
)
elif isinstance(exec_config, dict):
exec_config = ExecutionConfig.parse_obj(exec_config)
# define an empty default if it is not set
exec_config = exec_config or ExecutionConfig()
if worker_name in self.batch_workers:
resources: dict = {}
set_name_out(
resources, job_dict["name"], out_fpath=qout_fpath, err_fpath=qerr_fpath
)
shell_manager = queue_manager.get_shell_manager()
shell_manager.write_submission_script(
commands=script_commands,
pre_run=exec_config.pre_run,
post_run=exec_config.post_run,
options=resources,
export=exec_config.export,
modules=exec_config.modules,
work_dir=remote_path,
create_submit_dir=False,
)
self.batch_workers[worker_name].submit_job(
job_id=doc["uuid"], index=doc["index"]
)
lock.update_on_release = {
"$set": {
"state": JobState.BATCH_SUBMITTED.value,
}
}
else:
# decode in case it contains a QResources. It was not deserialized before.
resources = (
MontyDecoder().process_decoded(doc["resources"])
or worker.resources
or {}
)
set_name_out(
resources, job_dict["name"], out_fpath=qout_fpath, err_fpath=qerr_fpath
)
pre_run = worker.pre_run or ""
if exec_config.pre_run:
pre_run += "\n" + exec_config.pre_run
post_run = worker.post_run or ""
if exec_config.post_run:
post_run += "\n" + exec_config.post_run
submit_result = queue_manager.submit(
commands=script_commands,
pre_run=pre_run,
post_run=post_run,
options=resources,
export=exec_config.export,
modules=exec_config.modules,
work_dir=remote_path,
create_submit_dir=False,
)
if submit_result.status == SubmissionStatus.FAILED:
err_msg = f"submission failed. {submit_result!r}"
raise RemoteError(err_msg, no_retry=False)
if submit_result.status == SubmissionStatus.JOB_ID_UNKNOWN:
err_msg = (
"submission succeeded but ID not known. Job may be running "
f"but status cannot be checked. {submit_result!r}"
)
raise RemoteError(err_msg, no_retry=True)
if submit_result.status == SubmissionStatus.SUCCESSFUL:
lock.update_on_release = {
"$set": {
"remote.process_id": str(submit_result.job_id),
"state": JobState.SUBMITTED.value,
}
}
if worker_name in self.limited_workers:
self.limited_workers[worker_name]["current"] += 1
else:
raise RemoteError(
f"unhandled submission status {submit_result.status}", no_retry=True
)
def download(self, lock) -> None:
"""
Download the final files for a locked Job in the RUN_FINISHED state.
If successful set the state to DOWNLOADED.
Parameters
----------
lock
The MongoLock with the locked Job document.
"""
doc = lock.locked_document
logger.debug(f"download db_id: {doc['db_id']}")
# job_doc = JobDoc(**doc)
job_dict = doc["job"]
# If the worker is local do not copy the files in the temporary folder
# Execution should not reach this point, since the Job should go directly
# from SUBMITTED/RUNNING to DOWNLOADED in the case of a local worker
worker = self.get_worker(doc["worker"])
if not worker.is_local:
host = self.get_host(doc["worker"])
store = self.get_jobstore(job_dict["hosts"][-1])
remote_path = doc["run_dir"]
local_path = get_local_data_path(
project=self.project,
worker=worker,
job_id=doc["uuid"],
index=doc["index"],
run_dir=remote_path,
)
makedirs_p(local_path)
def download_file(fname: str, mandatory: bool):
# in principle fabric should work by just passing the
# destination folder, but it fails
remote_file_path = str(Path(remote_path, fname))
local_file_path = Path(local_path, fname)
try:
host.get(remote_file_path, str(local_file_path))
except FileNotFoundError as exc:
# fabric may still create an empty local file even if the remote
# file does not exist. Remove it to avoid errors when checking
# the file existence
local_file_path.unlink(missing_ok=True)
err_msg = f"file {remote_file_path} for job {job_dict['uuid']} does not exist"
if mandatory:
logger.exception(err_msg)
# if files are missing it should not retry
raise RemoteError(err_msg, no_retry=True) from exc
err_msg += ". Allow continuing to the next state"
logger.warning(err_msg)
# download the queue files first, so if an error is triggered
# afterwards they can be inserted in the DB
for fn in ("queue.out", "queue.err"):
download_file(fn, mandatory=False)
# only the output file is mandatory. If the others are missing
# they will be dealt with during completion. The output file may contain
# an error and the fact that they are missing could be expected.
# The analysis of the output file is left to the completion procedure.
download_file(OUT_FILENAME, mandatory=True)
for fn in get_remote_store_filenames(
store, config_dict=self.project.remote_jobstore
):
download_file(fn, mandatory=False)
lock.update_on_release = {"$set": {"state": JobState.DOWNLOADED.value}}
def complete_job(self, lock) -> None:
"""
Complete a locked Job in the DOWNLOADED state.
If successful set the state to COMPLETED, otherwise to FAILED.
Parameters
----------
lock
The MongoLock with the locked Job document.
"""
doc = lock.locked_document
logger.debug(f"complete job db_id: {doc['db_id']}")
# if the worker is local the files were not copied to the temporary
# folder, but the files could be directly accessed
worker = self.get_worker(doc["worker"])
local_path = get_local_data_path(
project=self.project,
worker=worker,
job_id=doc["uuid"],
index=doc["index"],
run_dir=doc["run_dir"],
)
try:
store = self.get_jobstore(doc["job"]["hosts"][-1])
completed = self.job_controller.complete_job(doc, local_path, store)
except json.JSONDecodeError as exc:
# if an empty file is copied this error can appear, do not retry
err_msg = traceback.format_exc()
raise RemoteError(err_msg, no_retry=True) from exc
# remove local folder with downloaded files if successfully completed
if completed and self.runner_options.delete_tmp_folder and not worker.is_local:
shutil.rmtree(local_path, ignore_errors=True)
if not completed:
err_msg = "the parsed output does not contain the required information to complete the job"
raise RemoteError(err_msg, no_retry=True)
def check_run_status(self, filter: dict | None = None) -> None: # noqa: A002
"""
Check the status of all the jobs submitted to a queue.
If Jobs started, update their state from SUBMITTED to RUNNING.
If Jobs have finished running, set their state to RUN_FINISHED if on a remote
host. If on a local host, set them directly to DOWNLOADED.
"""
logger.debug("check_run_status")
# check for jobs that could have changed state
workers_ids_docs: dict = defaultdict(dict)
db_filter = {
"state": {"$in": [JobState.SUBMITTED.value, JobState.RUNNING.value]},
"lock_id": None,
"remote.retry_time_limit": {"$not": {"$gt": datetime.now(timezone.utc)}},
}
if filter:
db_filter.update(filter)
projection = [
"db_id",
"uuid",
"index",
"remote",
"worker",
"state",
]
for doc in self.job_controller.get_jobs(db_filter, projection):
worker_name = doc["worker"]
remote_doc = doc["remote"]
workers_ids_docs[worker_name][remote_doc["process_id"]] = doc
for worker_name, ids_docs in workers_ids_docs.items():
error = None
if not ids_docs:
continue
worker = self.get_worker(worker_name)
qjobs_dict = {}
try:
ids_list = list(ids_docs)
queue = self.get_queue_manager(worker_name)
qjobs = queue.get_jobs_list(
jobs=ids_list, user=worker.scheduler_username
)
qjobs_dict = {
qjob.job_id: qjob for qjob in qjobs if qjob.job_id in ids_list
}
except Exception:
logger.warning(
f"error trying to get jobs list for worker: {worker_name}",
exc_info=True,
)
error = traceback.format_exc()
for doc_id, doc in ids_docs.items():
# TODO if failed should maybe be handled differently?
remote_doc = doc["remote"]
qjob = qjobs_dict.get(doc_id)
qstate = qjob.state if qjob else None
next_state = None
start_time = None
next_step_delay = None
if (
qstate == QState.RUNNING
and doc["state"] == JobState.SUBMITTED.value
):
next_state = JobState.RUNNING
start_time = datetime.now(timezone.utc)
logger.debug(
f"remote job with id {remote_doc['process_id']} is running"
)
elif qstate in [None, QState.DONE, QState.FAILED]:
# if the worker is local go directly to DOWNLOADED, as files
# are not copied locally
if not worker.is_local:
next_state = JobState.RUN_FINISHED
else:
next_state = JobState.DOWNLOADED
# the delay is applied if the job is finished on the worker
next_step_delay = worker.delay_download
logger.debug(
f"finished run for remote job with id {remote_doc['process_id']}"
)
elif not error and remote_doc["step_attempts"] > 0:
# reset the step attempts if the check succeeded after an earlier
# error. Setting the state to the same value as the current one
# triggers the update that cleans the state
next_state = JobState(doc["state"])
# the document needs to be updated only in case of error or if a
# next state has been set.
# Only update if the state did not change in the meanwhile
if next_state or error:
lock_filter = {
"uuid": doc["uuid"],
"index": doc["index"],
"state": doc["state"],
}
with self.job_controller.lock_job_for_update(
query=lock_filter,
max_step_attempts=self.runner_options.max_step_attempts,
delta_retry=self.runner_options.delta_retry,
next_step_delay=next_step_delay,
) as lock:
if lock.locked_document:
if error:
raise RemoteError(error, no_retry=False)
set_output = {
"$set": {
"remote.queue_state": (
qstate.value if qstate else None
),
"state": next_state.value,
}
}
if start_time:
set_output["$set"]["start_time"] = start_time
lock.update_on_release = set_output
# decrease the amount of jobs running if it is a limited worker
if (
next_state in (JobState.RUN_FINISHED, JobState.DOWNLOADED)
and worker_name in self.limited_workers
):
self.limited_workers[doc["worker"]]["current"] -= 1
def checkout(self) -> None:
"""Checkout READY Jobs."""
logger.debug("checkout jobs")
n_checked_out = 0
while True:
try:
reserved = self.job_controller.checkout_job()
if not reserved:
break
except Exception:
logger.exception("Error while checking out jobs")
break
n_checked_out += 1
logger.debug(f"checked out {n_checked_out} jobs")
def refresh_num_current_jobs(self) -> None:
"""
Update the number of jobs currently running for workers with a limited
number of Jobs.
"""
for name, state in self.limited_workers.items():
query = {
"state": {"$in": [JobState.SUBMITTED.value, JobState.RUNNING.value]},
"worker": name,
}
state["current"] = self.job_controller.count_jobs(query)
def update_batch_jobs(self, submit: bool = True) -> None:
"""
Update the status of batch jobs.
Includes submitting to the remote queue, checking the status of
running jobs in the queue and handling the files with the information
about the Jobs' status.
Parameters
----------
submit
Whether to submit new batch processes.
"""
logger.debug("update batch jobs")
for worker_name, batch_manager in self.batch_workers.items():
worker = self.get_worker(worker_name)
queue_manager = self.get_queue_manager(worker_name)
# first check the processes that are running from the folder
# and set them to running if needed
running_jobs = self.batch_update_running_jobs(
batch_manager, worker_name, worker
)
# Check the batch processes that should be running on the remote queue
# and update the state in the DB if something changed (both for batch processes and
# for the states of the jobs that were running in these batch processes)
running_batch_processes = self.batch_update_status(
batch_manager, queue_manager, worker_name, worker, running_jobs
)
# check that enough processes are submitted and submit the required
# amount to reach max_jobs, if needed.
if submit and (running_batch_processes is not None):
self.submit_batch_processes(
queue_manager, worker_name, worker, running_batch_processes
)
# check for jobs that have finished running in the batch runner and
# update the DB state accordingly
self.batch_update_run_finished_jobs(batch_manager, worker_name, worker)
def batch_update_running_jobs(
self, batch_manager: RemoteBatchManager, worker_name: str, worker
) -> list[tuple[str, int, str]]:
"""Update the status of running jobs.
Parameters
----------
batch_manager
Manager of remote files
worker_name
Name of the batch worker
worker
Batch worker
Returns
-------
list
The list of job ids, job indexes and batch unique ids in the host
running directory.
"""
logger.debug("update batch jobs: update running jobs")
batch_processes = self.job_controller.get_batches(
worker=worker_name,
batch_state=[BatchState.SUBMITTED, BatchState.RUNNING],
limit=0,
)
running_jobs = []
try:
running_jobs = batch_manager.get_running()
except Exception:
logger.warning(
f"error trying to get the list of batch running jobs for worker: {worker_name}",
exc_info=True,
)
# Set the process state to BATCH_RUNNING for those that are
# in the BATCH_SUBMITTED state and have started.
# This requires running the query at each check; consider devising
# a cheaper option in terms of DB queries.
for job_id, job_index, batch_uid in running_jobs:
lock_filter = {
"uuid": job_id,
"index": job_index,
"state": JobState.BATCH_SUBMITTED.value,
}
with self.job_controller.lock_job_for_update(
query=lock_filter,
max_step_attempts=self.runner_options.max_step_attempts,
delta_retry=self.runner_options.delta_retry,
) as lock:
if lock.locked_document:
self.job_controller.update_job_in_batch(
job_id=job_id,
job_index=job_index,
batch_uid=batch_uid,
db_id=lock.locked_document.get("db_id"),
)
set_output = {
"$set": {
"state": JobState.BATCH_RUNNING.value,
"start_time": datetime.now(timezone.utc),
"remote.process_id": self.batch_get_process_id(
batch_processes, batch_uid, worker_name, worker
),
}
}
lock.update_on_release = set_output
return running_jobs
def _cache_batch(
self, worker_name: str, worker: WorkerBase, batch_uid: str, process_id: str
):
if worker_name not in self._cached_batches:
self._cached_batches[worker_name] = OrderedDict()
self._cached_batches[worker_name][batch_uid] = process_id
while len(self._cached_batches[worker_name]) > 2 * worker.max_jobs + 5:
self._cached_batches[worker_name].popitem(last=False)
def batch_get_process_id(
self,
batch_processes: list,
batch_uid: str,
worker_name: str,
worker: WorkerBase,
) -> str | None:
# First, try to take from the cached batches info
if worker_batches := self._cached_batches.get(worker_name): # noqa: SIM102
if batch_uid in worker_batches:
return worker_batches[batch_uid]
# Then try to take it from the running batch processes
process_id = next(
(bp.process_id for bp in batch_processes if bp.batch_uid == batch_uid), None
)
if process_id is not None:
self._cache_batch(
worker_name=worker_name,
worker=worker,
batch_uid=batch_uid,
process_id=process_id,
)
return process_id
try:
process_id, wk_name = self.job_controller.get_batch_process_id(batch_uid)
if wk_name != worker_name:
raise RuntimeError("Wrong worker")
except MissingDocumentError:
# This situation should not normally occur unless multiple runners are active at once.
# Although running multiple runners is currently disallowed, we keep this fallback to
# handle unexpected cases, either due to a bug or if multiple runners are intentionally
# started by the user.
process_id = None
logger.warning(
f"error trying to get the process id and worker for batch with unique id: {batch_uid}",
exc_info=True,
)
return process_id
def batch_update_status(
self,
batch_manager: RemoteBatchManager,
queue_manager: QueueManager,
worker_name: str,
worker: WorkerBase,
running_jobs: list[tuple[str, int, str]],
) -> list[str] | None:
"""Update the status of the batch processes.
This method
- removes the stopped (completed, cancelled or killed) batch processes from the database
- sets the state of the jobs that were still running in these stopped batch
processes to a remote error state upon batch process termination
- deletes the corresponding "running" files in the batch handle directory
- sets an empty file for the stopped batch processes
Parameters
----------
batch_manager
Manager of remote files for the batch worker.
queue_manager
The queue manager associated with the worker.
worker_name
Name of the batch worker.
worker
Batch worker.
running_jobs
The list of job ids, job indexes and batch unique ids in the host
running directory.
Returns
-------
list or None
List of running batch process ids (e.g. Slurm ids) or None if the list of jobs could not be retrieved
from the worker.
"""
logger.debug("update batch jobs: update status")
batch_processes_data = self.job_controller.get_batches(
worker=worker_name,
batch_state=[BatchState.SUBMITTED, BatchState.RUNNING],
limit=0,
)
if not batch_processes_data:
return []
processes = [batch_process.process_id for batch_process in batch_processes_data]
if processes:
try:
qjobs = queue_manager.get_jobs_list(
jobs=processes, user=worker.scheduler_username
)
running_processes = {
qjob.job_id for qjob in qjobs if qjob.job_id in processes
}
stopped_processes = set(processes) - running_processes
except Exception:
logger.warning(
f"error trying to get the list of batch processes for worker: {worker_name}",
exc_info=True,
)
# If the actual list of batch processes cannot be retrieved from the worker, return None
# and deal with that in the update_batch_jobs method
return None
for pid in running_processes:
if pid in self._cached_running_batch_pids.get(worker_name, set()):
continue
batch_uid = next(
(b.batch_uid for b in batch_processes_data if b.process_id == pid),
None,
)
if batch_uid is None:
logger.warning(
f"failed to find batch unique id for running batch with process id: {pid}",
)
continue
batch_dir = get_job_path(
job_id=batch_uid,
index=None,
base_path=worker.batch.work_dir,
)
batch_info = batch_manager.get_batch_info(
batch_dir, BATCH_INFO_FILENAME
)
start_time = batch_info.get("start_time", None) if batch_info else None
self.job_controller.set_running_batch_process(
batch_uid, start_time=start_time
)
if start_time:
if worker_name not in self._cached_running_batch_pids:
self._cached_running_batch_pids[worker_name] = set()
self._cached_running_batch_pids[worker_name].add(pid)
for pid in stopped_processes:
batch_uid = next(
(b.batch_uid for b in batch_processes_data if b.process_id == pid),
None,
)
if batch_uid is None:
logger.warning(
f"failed to find batch unique id for stopped batch with process id: {pid}",
)
continue
batch_dir = get_job_path(
job_id=batch_uid,
index=None,
base_path=worker.batch.work_dir,
)
batch_info = batch_manager.get_batch_info(
batch_dir, BATCH_INFO_FILENAME
)
if batch_info:
# If the batch process was killed abruptly, there may be no end_time in the batch info file
end_time = batch_info.get(
"end_time", batch_info.get("last_ping_time", None)
)
else:
end_time = None
self.job_controller.set_finished_batch_process(
batch_uid, end_time=end_time
)
if pid in self._cached_running_batch_pids.get(worker_name, set()):
self._cached_running_batch_pids[worker_name].remove(pid)
# check if there are jobs that were in the running folder of a
# process that finished and set them to remote error
for job_id, job_index, job_batch_uid in running_jobs:
if batch_uid == job_batch_uid:
lock_filter = {
"uuid": job_id,
"index": job_index,
"state": {
"$in": (
JobState.BATCH_SUBMITTED.value,
JobState.BATCH_RUNNING.value,
)
},
}
with self.job_controller.lock_job_for_update(
query=lock_filter,
max_step_attempts=self.runner_options.max_step_attempts,
delta_retry=self.runner_options.delta_retry,
) as lock:
if lock.locked_document:
err_msg = f"The batch process that was running the job (process_id: {pid}, uuid: {batch_uid} was likely killed before terminating the job execution"
# Note here that this RemoteError is caught by the locking mechanism
# The above loop will thus proceed normally
# This is done to automatically pass down the error message
raise RemoteError(err_msg, no_retry=True)
# Also remove the corresponding files from the running folder of batch manager
batch_manager.delete_running(batch_uid)
return list(running_processes)
return []
def submit_batch_processes(
self,
queue_manager: QueueManager,
worker_name: str,
worker: WorkerBase,
running_batch_processes: list[str],
):
logger.debug("update batch jobs: submit batch processes")
dict_n_jobs = self.job_controller.count_jobs_states(
[JobState.BATCH_SUBMITTED, JobState.BATCH_RUNNING],
worker=worker_name,
)
n_jobs_submitted = dict_n_jobs[JobState.BATCH_SUBMITTED]
n_jobs_running = dict_n_jobs[JobState.BATCH_RUNNING]
n_processes_running = len(running_batch_processes)
n_parallel = worker.batch.parallel_jobs or 1
# TODO for n_parallel > 1 here a new job is submitted to the queue even if only
# one jobflow Job is available to run. This may result in the submitted job
# being partially empty. Would it be better to submit only if more jobs need to be run?
# (in that case a single job may remain dangling in the queue)
# The purpose of dealing with running and submitted jobs separately is to avoid
# submitting processes if a job remains stuck in a BATCH_RUNNING state.
# In principle this should not happen, but if it did the runner would keep
# submitting batch processes to the queue.
available_jobs = max(
n_jobs_submitted
- max((n_processes_running * n_parallel) - n_jobs_running, 0),
0,
)
n_processes_to_submit = min(
max(worker.max_jobs - n_processes_running, 0),
math.ceil(available_jobs / n_parallel),
)
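# Worked example of the two expressions above (illustrative numbers):
# with worker.max_jobs = 4, parallel_jobs = 2, one batch process already
# running, 5 Jobs in BATCH_SUBMITTED and 1 Job in BATCH_RUNNING:
#   spare capacity of running processes: max(1 * 2 - 1, 0) = 1
#   available_jobs: max(5 - 1, 0) = 4
#   n_processes_to_submit: min(max(4 - 1, 0), ceil(4 / 2)) = min(3, 2) = 2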
logger.debug(
f"submitting {n_processes_to_submit} batch jobs for worker {worker_name}"
)
for _ in range(n_processes_to_submit):
resources = worker.resources or {}
batch_uid = suuid()
remote_path = Path(get_job_path(batch_uid, None, worker.batch.work_dir))
qout_fpath = remote_path / OUT_FNAME
qerr_fpath = remote_path / ERR_FNAME
set_name_out(
resources,
f"batch_{batch_uid}",
out_fpath=qout_fpath,
err_fpath=qerr_fpath,
)
# note that here the worker.work_dir needs to be passed,
# not the worker.batch.work_dir
command = f"jf -fe execution run-batch {worker.work_dir} {worker.batch.jobs_handle_dir} {batch_uid}"
if worker.batch.max_jobs_per_batch:
command += f" -mj {worker.batch.max_jobs_per_batch}"
if worker.batch.max_time:
command += f" -mt {worker.batch.max_time}"
if worker.batch.max_wait:
command += f" -mw {worker.batch.max_wait}"
if worker.batch.parallel_jobs:
command += f" -pj {worker.batch.parallel_jobs}"
if worker.batch.sleep_time:
command += f" -st {worker.batch.sleep_time}"
submit_result = queue_manager.submit(
commands=[command],
pre_run=worker.pre_run,
post_run=worker.post_run,
options=resources,
work_dir=remote_path,
create_submit_dir=True,
)
if submit_result.status == SubmissionStatus.FAILED:
logger.error(f"submission failed. {submit_result!r}")
elif submit_result.status == SubmissionStatus.JOB_ID_UNKNOWN:
logger.error(
f"submission succeeded but ID not known. Job may be running but status cannot be checked. {submit_result!r}"
)
# TODO: should there be anything here to check with the uuid somehow on the worker ?
# More specifically, maybe we could just add the batch process to the batches collection with None as an ID ?
# Is it useful somehow ?
elif submit_result.status == SubmissionStatus.SUCCESSFUL:
logger.debug(
f"Batch job submitted to worker {worker_name} in folder {remote_path}. Queue id: {submit_result.job_id}"
)
self._cache_batch(worker_name, worker, batch_uid, submit_result.job_id)
self.job_controller.add_batch_process(
submit_result.job_id, batch_uid, worker_name
)
else:
logger.error(f"unhandled submission status {submit_result.status}")
def batch_update_run_finished_jobs(self, batch_manager, worker_name, worker):
logger.debug("update batch jobs: update jobs which have finished to run")
run_finished_jobs = []
try:
run_finished_jobs = batch_manager.get_run_finished()
except Exception:
logger.warning(
f"error trying to get the list of batch jobs that have finished to run for worker: {worker_name}",
exc_info=True,
)
for job_id, job_index, batch_uid in run_finished_jobs:
lock_filter = {
"uuid": job_id,
"index": job_index,
"state": {
"$in": (
JobState.BATCH_SUBMITTED.value,
JobState.BATCH_RUNNING.value,
)
},
}
with self.job_controller.lock_job_for_update(
query=lock_filter,
max_step_attempts=self.runner_options.max_step_attempts,
delta_retry=self.runner_options.delta_retry,
) as lock:
if lock.locked_document:
if not worker.is_local:
next_state = JobState.RUN_FINISHED
else:
next_state = JobState.DOWNLOADED
set_output = {
"$set": {
"state": next_state.value,
"remote.process_id": self.batch_get_process_id(
[],
batch_uid,
worker_name,
worker,
),
}
}
self.job_controller.update_job_in_batch(
job_id=job_id,
job_index=job_index,
batch_uid=batch_uid,
db_id=lock.locked_document.get("db_id"),
)
lock.update_on_release = set_output
batch_manager.delete_run_finished([(job_id, job_index, batch_uid)])
def ping_running_runner(self, run_options: dict | None = None):
ping_data = {
"daemon_id": self.daemon_id,
"runner_id": self.runner_id,
"project_name": self.project_name,
"hostname": self.runner_host,
"run_options": run_options,
"user": getpass.getuser(),
"daemon_dir": self.project.daemon_dir,
}
ping_result, runner_pings = self.job_controller.ping_running_runner(
data=ping_data
)
if not ping_result:
logger.info("Could not ping the running_runner document")
# The returned value includes the latest ping from this process.
# Skip the check for the first ping, as older pings may be in the DB.
if self.pinged_db and runner_pings and len(runner_pings) > 1:
prev_ping = runner_pings[-2]
daemon_id = prev_ping.get("daemon_id")
# note that here the remote daemon_id could be None, if it was not
# set. This would happen for example in the case of a runner started without
# a daemon. Warn anyway, since it can still lead to inconsistencies.
if daemon_id != self.daemon_id:
msg = f"A ping from a different set of Runner processes was found in the database: {prev_ping}"
logger.warning(msg)
if runner_pings:
self.pinged_db = True
def cleanup(self) -> None:
"""Close all the connections after stopping the Runner."""
for worker_name, host in self.hosts.items():
try:
host.close()
except Exception:
logging.exception(
f"error while closing connection to worker {worker_name}"
)
try:
self.jobstore.close()
except Exception:
logging.exception("error while closing connection to jobstore")
if self.optional_jobstores:
for name, optional_jobstore in self.optional_jobstores.items():
try:
optional_jobstore.close()
except Exception:
logging.exception(
f"error while closing connection to optional jobstore {name}"
)
self.job_controller.close()
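# A minimal sketch of running the Runner directly, outside the daemon (e.g.
# for debugging). The default project is used and the number of ticks is an
# arbitrary choice; SIGINT is registered in addition to the SIGTERM handler
# that run() installs, so Ctrl+C also stops the loop cleanly.
if __name__ == "__main__":
    runner = Runner()
    signal.signal(signal.SIGINT, runner.handle_signal)
    try:
        runner.run(ticks=60)  # advance Jobs for a limited number of ticks
    finally:
        runner.cleanup()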