Source code for jobflow_remote.remote.host.base
from __future__ import annotations
import abc
import logging
import re
import traceback
from typing import TYPE_CHECKING
from monty.json import MSONable
if TYPE_CHECKING:
from pathlib import Path
logger = logging.getLogger(__name__)
SANITIZE_KEY = r"_-_-_-_-_### JFREMOTE SANITIZE ###_-_-_-_-_"
[docs]
class BaseHost(MSONable):
"""Base Host class."""
def __init__(self, sanitize: bool = False):
"""
Parameters
----------
sanitize
If True text a string will be prepended and appended to the output
of the commands, to ease the parsing and avoid failures due to spurious
text coming from the host shell.
"""
self.sanitize = sanitize
self._sanitize_regex: re.Pattern | None = None
[docs]
@abc.abstractmethod
def execute(
self,
command: str | list[str],
workdir: str | Path | None = None,
timeout: int | None = None,
) -> tuple[str, str, int]:
"""Execute the given command on the host.
Parameters
----------
command: str or list of str
Command to execute, as a str or list of str
workdir: str or None
path where the command will be executed.
timeout
Timeout for the execution of the commands.
"""
raise NotImplementedError
[docs]
@abc.abstractmethod
def mkdir(
self, directory: str | Path, recursive: bool = True, exist_ok: bool = True
) -> bool:
"""Create directory on the host."""
raise NotImplementedError
[docs]
@abc.abstractmethod
def write_text_file(self, filepath, content):
"""Write content to a file on the host."""
raise NotImplementedError
[docs]
@abc.abstractmethod
def connect(self):
raise NotImplementedError
[docs]
@abc.abstractmethod
def close(self) -> bool:
raise NotImplementedError
@property
@abc.abstractmethod
def is_connected(self) -> bool:
raise NotImplementedError
[docs]
@abc.abstractmethod
def put(self, src, dst):
raise NotImplementedError
[docs]
@abc.abstractmethod
def get(self, src, dst):
raise NotImplementedError
[docs]
@abc.abstractmethod
def copy(self, src, dst):
raise NotImplementedError
[docs]
def test(self) -> str | None:
msg = None
try:
cmd = "echo 'test'"
stdout, stderr, returncode = self.execute(cmd)
if returncode != 0:
msg = f"Command was executed but return code was different from zero.\nstdoud: {stdout}\nstderr: {stderr}"
elif stdout.strip() != "test" or stderr.strip() != "":
msg = (
"Command was executed but the output is not the expected one (i.e. a single 'test' "
f"string in both stdout and stderr).\nstdoud: {stdout}\nstderr: {stderr}"
)
if not self.sanitize:
msg += (
"\nIf the output contains additional text the problem may be solved by setting "
"the 'sanitize_command' option to True in the project configuration."
)
except Exception:
exc = traceback.format_exc()
msg = f"Error while executing command:\n {exc}"
return msg
[docs]
@abc.abstractmethod
def listdir(self, path: str | Path) -> list[str]:
raise NotImplementedError
[docs]
@abc.abstractmethod
def remove(self, path: str | Path):
raise NotImplementedError
[docs]
@abc.abstractmethod
def rmtree(self, path: str | Path, raise_on_error: bool = False) -> bool:
"""Recursively delete a directory tree on a host.
This method must be implemented by subclasses of `BaseHost`.
It is intended to remove an entire directory tree, including all files
and subdirectories, on the host represented by the subclass.
Parameters
----------
path : str or Path
The path to the directory tree to be removed.
raise_on_error : bool
If set to `False` (default), errors will be ignored, and the method will
attempt to continue removing remaining files and directories.
Otherwise, any errors encountered during the removal process
will raise an exception.
Returns
-------
bool
True if the directory tree was successfully removed, False otherwise.
"""
raise NotImplementedError
@property
def interactive_login(self) -> bool:
"""
True if the host requires interactive actions upon login.
False by default. Subclasses should override the method to customize the value.
"""
return False
@property
def sanitize_regex(self) -> re.Pattern:
"""
Regular expression to sanitize sensitive info in command outputs.
"""
if not self._sanitize_regex:
escaped_key = re.escape(SANITIZE_KEY)
# Optionally match the newline that comes from the "echo" command.
# The -n option for echo to suppress the newline seems to not be
# supported on all systems
self._sanitize_regex = re.compile(
f"{escaped_key}\r?\n?(.*?)(?:{escaped_key}\r?\n?|$)", re.DOTALL
)
return self._sanitize_regex
[docs]
def sanitize_command(self, cmd: str) -> str:
"""
Sanitizes a command by adding a prefix and suffix to the command string if
sanitization is enabled.
The prefix and suffix are the same and are used to mark the parts of the output
that should be sanitized. The prefix and suffix are defined by `SANITIZE_KEY`.
Parameters
----------
cmd
The command string to be sanitized
Returns
-------
str
The sanitized command string
"""
if self.sanitize:
echo_cmd = f'echo "{SANITIZE_KEY}" | tee /dev/stderr'
cmd = f"{echo_cmd};{cmd};{echo_cmd}"
return cmd
[docs]
def sanitize_output(self, output: str) -> str:
"""
Sanitizes the output of a command by selecting the section between the
SANITIZE_KEY strings.
If the second instance of the key is not found, the part of the output after the key is returned.
If the key is not present, the entire output is returned.
Parameters
----------
output
The output of the command to be sanitized
Returns
-------
str
The sanitized output
"""
if self.sanitize:
match = self.sanitize_regex.search(output)
if not match:
logger.warning(
f"Even if sanitization was required, there was no match for the output: {output}. Returning the complete output"
)
return output
return match.group(1)
return output
[docs]
class HostError(Exception):
pass