from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from qtoolkit.core.data_objects import CancelResult, QJob, QResources, SubmissionResult
from qtoolkit.io.shell import ShellIO
if TYPE_CHECKING:
from collections.abc import Sequence
from qtoolkit.io.base import BaseSchedulerIO
from jobflow_remote.remote.host import BaseHost
# Default file names that ``set_name_out`` uses for the ``out_fpath`` and
# ``err_fpath`` arguments when the caller does not provide them.
OUT_FNAME = "queue.out"
ERR_FNAME = "queue.err"
[docs]
def set_name_out(
    resources: dict | QResources,
    name: str,
    out_fpath: str | Path = OUT_FNAME,
    err_fpath: str | Path = ERR_FNAME,
) -> None:
    """Store the job name and the output/error file paths in ``resources``.

    The object is modified in place; nothing is returned.

    Parameters
    ----------
    resources : dict or QResources
        Container to update. A ``QResources`` gets its ``job_name``,
        ``output_filepath`` and ``error_filepath`` attributes set; anything
        else is updated by item assignment with the keys ``job_name``,
        ``qout_path`` and ``qerr_path``.
    name : str
        Name of the job. Spaces are replaced by underscores before storing.
    out_fpath : str or Path
        Path stored as the output file.
    err_fpath : str or Path
        Path stored as the error file.
    """
    # Sanitize the name: every single space becomes an underscore.
    safe_name = "_".join(name.split(" "))

    if not isinstance(resources, QResources):
        # Mapping-like resources: same values, stored under the dict keys.
        entries = (
            ("job_name", safe_name),
            ("qout_path", out_fpath),
            ("qerr_path", err_fpath),
        )
        for key, value in entries:
            resources[key] = value
        return

    resources.job_name = safe_name
    resources.output_filepath = out_fpath
    resources.error_filepath = err_fpath
[docs]
class QueueManager:
    """Base class for job queues.

    Combines a scheduler IO object, which builds scheduler commands and
    parses their output, with a host on which those commands are executed.

    Attributes
    ----------
    scheduler_io : BaseSchedulerIO
        Object used to generate the submission script and the scheduler
        commands, and to parse the output of those commands.
    host : BaseHost
        Host where the command should be executed.
    timeout_exec : int or None
        Default timeout passed to the host when a command is executed
        without an explicit timeout.
    """

    def __init__(
        self,
        scheduler_io: BaseSchedulerIO,
        host: BaseHost,
        timeout_exec: int | None = None,
    ) -> None:
        self.scheduler_io = scheduler_io
        self.host = host
        self.timeout_exec = timeout_exec

    def execute_cmd(
        self, cmd: str, workdir: str | Path | None = None, timeout: int | None = None
    ):
        """Execute a command.

        Parameters
        ----------
        cmd : str
            Command to be executed
        workdir: str or None
            path where the command will be executed.
        timeout : int or None
            Timeout passed to the host. If None, ``self.timeout_exec`` is
            used instead.

        Returns
        -------
        stdout : str
        stderr : str
        exit_code : int
        """
        # Explicit ``is not None`` so that a timeout of 0 is not replaced
        # by the default.
        timeout = timeout if timeout is not None else self.timeout_exec
        return self.host.execute(cmd, workdir, timeout)

    def get_submission_script(
        self,
        commands: str | list[str] | None,
        options: dict | QResources | None = None,
        work_dir: str | Path | None = None,
        pre_run: str | list[str] | None = None,
        post_run: str | list[str] | None = None,
        export: dict | None = None,
        modules: list[str] | None = None,
    ) -> str:
        """Generate the content of the submission script.

        The non-empty sections are passed to the scheduler IO in this order:
        change directory, pre_run, exports, modules, commands, post_run.

        Parameters
        ----------
        commands : str or list of str or None
            Main command(s) of the job.
        options : dict or QResources or None
            Options forwarded unchanged to the scheduler IO.
        work_dir : str or Path or None
            If set, a ``cd`` to this directory is the first section.
        pre_run : str or list of str or None
            Command(s) placed before the exports, modules and main commands.
        post_run : str or list of str or None
            Command(s) placed after the main commands.
        export : dict or None
            Variables to export, as name/value pairs.
        modules : list of str or None
            Names of the modules to load.

        Returns
        -------
        str
            The value returned by ``scheduler_io.get_submission_script``.
        """
        sections = (
            self.get_change_dir(work_dir),
            self.get_pre_run(pre_run),
            self.get_export(export),
            self.get_modules(modules),
            self.get_run_commands(commands),
            self.get_post_run(post_run),
        )
        # Skip the sections that are None or an empty string.
        commands_list = [section for section in sections if section]
        return self.scheduler_io.get_submission_script(commands_list, options)

    @staticmethod
    def _join_lines(lines: str | list[str] | tuple[str, ...] | None) -> str | None:
        """Join a list/tuple with newlines; return any other value unchanged."""
        if isinstance(lines, (list, tuple)):
            return "\n".join(lines)
        return lines

    def get_change_dir(self, dir_path: str | Path | None) -> str:
        """Return the ``cd`` command for ``dir_path`` or "" if it is not set."""
        # NOTE: the path is inserted verbatim (no shell quoting).
        if dir_path:
            return f"cd {dir_path}"
        return ""

    def get_pre_run(self, pre_run: str | list[str] | None) -> str | None:
        """Return the pre_run section as a single string.

        A list or tuple is joined with newlines; a string or None is
        returned as is.
        """
        return self._join_lines(pre_run)

    def get_export(self, exports: dict | None) -> str | None:
        """Return one ``export KEY=VALUE`` line per item, or None if empty."""
        if not exports:
            return None
        return "\n".join(f"export {k}={v}" for k, v in exports.items())

    def get_modules(self, modules: list[str] | None) -> str | None:
        """Return one ``module load NAME`` line per module, or None if empty."""
        if not modules:
            return None
        return "\n".join(f"module load {m}" for m in modules)

    def get_run_commands(self, commands: str | list[str] | None) -> str | None:
        """Return the main commands section as a single string.

        Parameters
        ----------
        commands : str or list of str or None
            A string is returned as is, a list or tuple is joined with
            newlines and None gives None (no commands section).

        Raises
        ------
        ValueError
            If ``commands`` has any other type.
        """
        # None is accepted by the signatures of submit and
        # get_submission_script, so it must not be rejected here.
        if commands is None:
            return None
        if isinstance(commands, str):
            return commands
        # Tuples are accepted for consistency with pre_run and post_run.
        if isinstance(commands, (list, tuple)):
            return "\n".join(commands)
        raise ValueError("commands should be a str or a list of str.")

    def get_post_run(self, post_run: str | list[str] | None) -> str | None:
        """Return the post_run section as a single string.

        A list or tuple is joined with newlines; a string or None is
        returned as is.
        """
        return self._join_lines(post_run)

    def submit(
        self,
        commands: str | list[str] | None,
        options=None,
        work_dir=None,
        pre_run: str | list[str] | None = None,
        post_run: str | list[str] | None = None,
        export: dict | None = None,
        modules: list[str] | None = None,
        script_fname="submit.sh",
        create_submit_dir: bool = False,
        timeout: int | None = None,
    ) -> SubmissionResult:
        """Write the submission script on the host and submit it.

        All the arguments except ``timeout`` are forwarded to
        ``write_submission_script``. The submit command obtained from the
        scheduler IO is then executed in ``work_dir``.

        Parameters
        ----------
        timeout : int or None
            Timeout for the execution of the submit command. If None,
            ``self.timeout_exec`` is used.

        Returns
        -------
        SubmissionResult
            The submit command output parsed by the scheduler IO.
        """
        script_fpath = self.write_submission_script(
            commands=commands,
            options=options,
            work_dir=work_dir,
            pre_run=pre_run,
            post_run=post_run,
            export=export,
            modules=modules,
            script_fname=script_fname,
            create_submit_dir=create_submit_dir,
        )
        submit_cmd = self.scheduler_io.get_submit_cmd(script_fpath)
        stdout, stderr, returncode = self.execute_cmd(
            submit_cmd, work_dir, timeout=timeout
        )
        return self.scheduler_io.parse_submit_output(
            exit_code=returncode, stdout=stdout, stderr=stderr
        )

    def write_submission_script(
        self,
        commands: str | list[str] | None,
        options=None,
        work_dir=None,
        pre_run: str | list[str] | None = None,
        post_run: str | list[str] | None = None,
        export: dict | None = None,
        modules: list[str] | None = None,
        script_fname="submit.sh",
        create_submit_dir: bool = False,
    ) -> Path:
        """Generate the submission script and write it through the host.

        Parameters
        ----------
        script_fname : str
            Name of the script file. It is written inside ``work_dir`` if
            that is set, otherwise the bare file name is used as the path.
        create_submit_dir : bool
            If True and ``work_dir`` is set, the host is asked to create
            ``work_dir`` before the script is written.

        The other arguments are forwarded to ``get_submission_script``.

        Returns
        -------
        Path
            Path of the script that has been written.

        Raises
        ------
        RuntimeError
            If the creation of ``work_dir`` was requested and the host
            reported a failure.
        """
        script_str = self.get_submission_script(
            commands=commands,
            options=options,
            work_dir=work_dir,
            pre_run=pre_run,
            post_run=post_run,
            export=export,
            modules=modules,
        )
        if create_submit_dir and work_dir:
            created = self.host.mkdir(work_dir, recursive=True, exist_ok=True)
            if not created:
                raise RuntimeError("failed to create directory")
        script_fpath = Path(work_dir, script_fname) if work_dir else Path(script_fname)
        self.host.write_text_file(script_fpath, script_str)
        return script_fpath

    def cancel(self, job: QJob | int | str, timeout: int | None = None) -> CancelResult:
        """Run the scheduler's cancel command for ``job`` and parse its output."""
        cancel_cmd = self.scheduler_io.get_cancel_cmd(job)
        stdout, stderr, returncode = self.execute_cmd(cancel_cmd, timeout=timeout)
        return self.scheduler_io.parse_cancel_output(
            exit_code=returncode, stdout=stdout, stderr=stderr
        )

    def get_job(self, job: QJob | int | str, timeout: int | None = None) -> QJob | None:
        """Run the scheduler's job query command for ``job`` and parse its output."""
        job_cmd = self.scheduler_io.get_job_cmd(job)
        stdout, stderr, returncode = self.execute_cmd(job_cmd, timeout=timeout)
        return self.scheduler_io.parse_job_output(
            exit_code=returncode, stdout=stdout, stderr=stderr
        )

    def get_jobs_list(
        self,
        jobs: Sequence[QJob | int | str] | None = None,
        user: str | None = None,
        timeout: int | None = None,
    ) -> list[QJob]:
        """Run the scheduler's jobs list command and parse its output.

        ``jobs`` and ``user`` are forwarded to the scheduler IO to build
        the command.
        """
        job_cmd = self.scheduler_io.get_jobs_list_cmd(jobs, user)
        stdout, stderr, returncode = self.execute_cmd(job_cmd, timeout=timeout)
        return self.scheduler_io.parse_jobs_list_output(
            exit_code=returncode, stdout=stdout, stderr=stderr
        )

    def get_shell_manager(self) -> QueueManager:
        """Return a QueueManager that uses ``ShellIO`` as scheduler IO.

        The new manager shares the host and the default timeout of this one.
        """
        return QueueManager(
            scheduler_io=ShellIO(), host=self.host, timeout_exec=self.timeout_exec
        )