from __future__ import annotations
import getpass
import logging
import subprocess
import time
from enum import Enum
from pathlib import Path
from string import Template
from typing import Callable
from xmlrpc.client import Fault
import psutil
from monty.os import makedirs_p
from supervisor import childutils, states, xmlrpc
from supervisor.compat import xmlrpclib
from supervisor.options import ClientOptions
from supervisor.states import RUNNING_STATES, STOPPED_STATES, ProcessStates
from supervisor.supervisorctl import Controller, fgthread
from supervisor.xmlrpc import Faults
from jobflow_remote.config import ConfigManager, Project
logger = logging.getLogger(__name__)
supervisord_conf_str = """
[unix_http_server]
file=$sock_file
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisord]
logfile=$log_file
logfile_maxbytes=10MB
logfile_backups=5
loglevel=$loglevel
pidfile=$pid_file
nodaemon=$nodaemon
[supervisorctl]
serverurl=unix://$sock_file
[program:runner_daemon]
priority=100
command=jf -p $project runner run -pid -log $loglevel $connect_interactive
autostart=true
autorestart=false
numprocs=1
process_name=run_jobflow%(process_num)s
stopwaitsecs=86400
"""
supervisord_conf_str_split = """
[unix_http_server]
file=$sock_file
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisord]
logfile=$log_file
logfile_maxbytes=10MB
logfile_backups=5
loglevel=$loglevel
pidfile=$pid_file
nodaemon=$nodaemon
[supervisorctl]
serverurl=unix://$sock_file
[program:runner_daemon_checkout]
priority=100
command=jf -p $project runner run -pid --checkout -log $loglevel $connect_interactive
autostart=true
autorestart=false
numprocs=1
process_name=run_jobflow_checkout
stopwaitsecs=86400
[program:runner_daemon_transfer]
priority=100
command=jf -p $project runner run -pid --transfer -log $loglevel $connect_interactive
autostart=true
autorestart=false
numprocs=$num_procs_transfer
process_name=run_jobflow_transfer%(process_num)s
stopwaitsecs=86400
[program:runner_daemon_queue]
priority=100
command=jf -p $project runner run -pid --queue -log $loglevel $connect_interactive
autostart=true
autorestart=false
numprocs=1
process_name=run_jobflow_queue
stopwaitsecs=86400
[program:runner_daemon_complete]
priority=100
command=jf -p $project runner run -pid --complete -log $loglevel $connect_interactive
autostart=true
autorestart=false
numprocs=$num_procs_complete
process_name=run_jobflow_complete%(process_num)s
stopwaitsecs=86400
"""
[docs]
class DaemonError(Exception):
pass
[docs]
class DaemonStatus(Enum):
SHUT_DOWN = "SHUT_DOWN"
STOPPED = "STOPPED"
STOPPING = "STOPPING"
PARTIALLY_RUNNING = "PARTIALLY_RUNNING"
STARTING = "STARTING"
RUNNING = "RUNNING"
[docs]
class DaemonManager:
conf_template_single = Template(supervisord_conf_str)
conf_template_split = Template(supervisord_conf_str_split)
def __init__(
self,
daemon_dir: str | Path,
log_dir: str | Path,
project: Project,
) -> None:
self.project = project
self.daemon_dir = Path(daemon_dir).absolute()
self.log_dir = Path(log_dir).absolute()
[docs]
@classmethod
def from_project(cls, project: Project):
daemon_dir = project.daemon_dir
log_dir = project.log_dir
return cls(daemon_dir, log_dir, project)
[docs]
@classmethod
def from_project_name(cls, project_name: str | None = None):
config_manager = ConfigManager()
project = config_manager.get_project(project_name)
return cls.from_project(project)
@property
def conf_filepath(self) -> Path:
return self.daemon_dir / "supervisord.conf"
@property
def pid_filepath(self) -> Path:
return self.daemon_dir / "supervisord.pid"
@property
def log_filepath(self) -> Path:
return self.log_dir / "supervisord.log"
@property
def sock_filepath(self) -> Path:
path = self.daemon_dir / "s.sock"
if len(str(path)) > 97:
msg = f"socket path {path} is too long for UNIX systems. Set the daemon_dir value "
"in the project configuration so that the socket path is shorter"
raise DaemonError(msg)
return path
[docs]
def clean_files(self) -> None:
self.pid_filepath.unlink(missing_ok=True)
self.sock_filepath.unlink(missing_ok=True)
[docs]
def get_interface(self):
env = {
"SUPERVISOR_SERVER_URL": f"unix://{self.sock_filepath!s}",
"SUPERVISOR_USERNAME": "",
"SUPERVISOR_PASSWORD": "",
}
return childutils.getRPCInterface(env)
[docs]
def get_supervisord_pid(self) -> int | None:
pid_fp = self.pid_filepath
if not pid_fp.is_file():
return None
try:
with open(pid_fp) as f:
pid = int(f.read().strip())
except ValueError:
logger.warning(f"The pid file {pid_fp} could not be parsed")
return None
return pid
[docs]
def check_supervisord_process(self) -> bool:
pid = self.get_supervisord_pid()
running = True
if pid is None:
running = False
try:
process = psutil.Process(pid)
for cmdline_element in process.cmdline():
if cmdline_element.endswith("supervisord"):
break
else:
running = False
if process.username() != psutil.Process().username():
logger.warning(
f"pid {pid} is running supervisord, but belongs to a different user"
)
running = False
except psutil.NoSuchProcess:
running = False
if not running and pid is not None:
logger.warning(
f"Process with pid {pid} is not running but daemon files are present. Cleaning them up."
)
self.clean_files()
return running
[docs]
def check_status(self) -> DaemonStatus:
process_active = self.check_supervisord_process()
if not process_active:
return DaemonStatus.SHUT_DOWN
if not self.sock_filepath.is_socket():
raise DaemonError(
"the supervisord process is alive, but the socket is missing"
)
interface = self.get_interface()
try:
proc_info = interface.supervisor.getAllProcessInfo()
except Fault as exc:
# catch this exception as it may be raised if the status is queried while
# the supervisord process is shutting down. The error is quite cryptic, so
# replace with one that is clearer. Also see a related issue in supervisord:
# https://github.com/Supervisor/supervisor/issues/48
if exc.faultString == "SHUTDOWN_STATE":
raise DaemonError(
"The daemon is likely shutting down and the actual state cannot be determined"
) from exc
raise
if not proc_info:
raise DaemonError(
"supervisord process is running but no daemon process is present"
)
if all(pi.get("state") in RUNNING_STATES for pi in proc_info):
if any(pi.get("state") == ProcessStates.STARTING for pi in proc_info):
return DaemonStatus.STARTING
return DaemonStatus.RUNNING
if any(pi.get("state") in RUNNING_STATES for pi in proc_info):
return DaemonStatus.PARTIALLY_RUNNING
if all(pi.get("state") in STOPPED_STATES for pi in proc_info):
return DaemonStatus.STOPPED
if all(
pi.get("state") in (ProcessStates.STOPPED, ProcessStates.STOPPING)
for pi in proc_info
):
return DaemonStatus.STOPPING
raise DaemonError("Could not determine the current status of the daemon")
[docs]
def get_processes_info(self) -> dict[str, dict] | None:
process_active = self.check_supervisord_process()
if not process_active:
return None
pids = {
"supervisord": {
"pid": self.get_supervisord_pid(),
"statename": "RUNNING",
"state": ProcessStates.RUNNING,
"group": None,
}
}
if not self.sock_filepath.is_socket():
raise DaemonError(
"the supervisord process is alive, but the socket is missing"
)
interface = self.get_interface()
proc_info = interface.supervisor.getAllProcessInfo()
if not proc_info:
raise DaemonError(
"supervisord process is running but no daemon process is present"
)
for pi in proc_info:
name = f"{pi.get('group')}:{pi.get('name')}"
pids[name] = pi
return pids
[docs]
def write_config(
self,
num_procs_transfer: int = 1,
num_procs_complete: int = 1,
single: bool = True,
log_level: str = "info",
nodaemon: bool = False,
connect_interactive: bool = False,
) -> None:
if single:
conf = self.conf_template_single.substitute(
sock_file=str(self.sock_filepath),
pid_file=str(self.pid_filepath),
log_file=str(self.log_filepath),
nodaemon="true" if nodaemon else "false",
project=self.project.name,
loglevel=log_level,
connect_interactive=(
"--connect-interactive" if connect_interactive else ""
),
)
else:
conf = self.conf_template_split.substitute(
sock_file=str(self.sock_filepath),
pid_file=str(self.pid_filepath),
log_file=str(self.log_filepath),
num_procs_transfer=num_procs_transfer,
num_procs_complete=num_procs_complete,
nodaemon="true" if nodaemon else "false",
project=self.project.name,
loglevel=log_level,
connect_interactive=(
"--connect-interactive" if connect_interactive else ""
),
)
with open(self.conf_filepath, "w") as f:
f.write(conf)
[docs]
def start_supervisord(
self,
num_procs_transfer: int = 1,
num_procs_complete: int = 1,
single: bool = True,
log_level: str = "info",
nodaemon: bool = False,
connect_interactive: bool = False,
) -> str | None:
makedirs_p(self.daemon_dir)
makedirs_p(self.log_dir)
self.write_config(
num_procs_transfer=num_procs_transfer,
num_procs_complete=num_procs_complete,
single=single,
log_level=log_level,
nodaemon=nodaemon,
connect_interactive=connect_interactive,
)
cp = subprocess.run(
f"supervisord -c {self.conf_filepath!s}",
shell=True,
capture_output=True,
text=True,
check=False,
)
if cp.returncode != 0:
return f"Error staring the supervisord process. stdout: {cp.stdout}. stderr: {cp.stderr}"
# TODO check if actually started?
return None
[docs]
def start_processes(self) -> str | None:
interface = self.get_interface()
result = interface.supervisor.startAllProcesses()
if not result:
return "No process started"
failed = [r for r in result if r.get("status") == Faults.SUCCESS]
if len(failed) == 0:
return None
if len(failed) != len(result):
msg = "Not all the daemon processes started correctly. Details: \n"
for f in failed:
msg += f" - {f.get('description')}\n"
return msg
return None
[docs]
def start(
self,
num_procs_transfer: int = 1,
num_procs_complete: int = 1,
single: bool = True,
log_level: str = "info",
raise_on_error: bool = False,
connect_interactive: bool = False,
) -> bool:
status = self.check_status()
if status == DaemonStatus.RUNNING:
error = "Daemon process is already running"
elif status == DaemonStatus.SHUT_DOWN:
error = self.start_supervisord(
num_procs_transfer=num_procs_transfer,
num_procs_complete=num_procs_complete,
single=single,
log_level=log_level,
connect_interactive=connect_interactive,
)
elif status == DaemonStatus.STOPPED:
self.shut_down(raise_on_error=raise_on_error)
error = self.start_supervisord(
num_procs_transfer=num_procs_transfer,
num_procs_complete=num_procs_complete,
single=single,
log_level=log_level,
connect_interactive=connect_interactive,
)
# else:
# error = self.start_processes()
elif status == DaemonStatus.STOPPING:
error = "Daemon process are stopping. Cannot start."
else:
error = f"Daemon status {status} could not be handled"
if error is not None:
if raise_on_error:
raise DaemonError(error)
logger.error(error)
return False
return True
[docs]
def stop(self, wait: bool = False, raise_on_error: bool = False) -> bool:
status = self.check_status()
if status in (
DaemonStatus.STOPPED,
DaemonStatus.STOPPING,
DaemonStatus.SHUT_DOWN,
):
return True
if status in (DaemonStatus.RUNNING, DaemonStatus.PARTIALLY_RUNNING):
interface = self.get_interface()
if wait:
result = interface.supervisor.stopAllProcesses()
else:
result = interface.supervisor.signalAllProcesses(15)
error = self._verify_call_result(result, "stop", raise_on_error)
return error is None
raise DaemonError(f"Daemon status {status} could not be handled")
def _verify_call_result(
self, result, action: str, raise_on_error: bool = False
) -> str | None:
error = None
if not result:
error = f"The action {action} was not applied to the processes"
else:
failed = [r for r in result if r.get("status") == Faults.SUCCESS]
if len(failed) != len(result):
error = f"The action {action} was not applied to all the processes. Details: \n"
for f in failed:
error += f" - {f.get('description')}\n"
if error is not None:
if raise_on_error:
raise DaemonError(error)
logger.error(error)
return error
return None
[docs]
def kill(self, raise_on_error: bool = False) -> bool:
# If the daemon is shutting down supervisord may not be able to identify
# the state. Try proceeding in that case, since we really want to kill
# the process
status = None
try:
status = self.check_status()
if status == DaemonStatus.SHUT_DOWN:
logger.info("supervisord is not running. No process is running")
return True
if status == DaemonStatus.STOPPED:
logger.info("Processes are already stopped.")
return True
except DaemonError as e:
msg = (
f"Error while determining the state of the runner: {getattr(e, 'message', str(e))}."
f"Proceeding with the kill command."
)
logger.warning(msg)
if status in (
None,
DaemonStatus.RUNNING,
DaemonStatus.STOPPING,
DaemonStatus.PARTIALLY_RUNNING,
):
interface = self.get_interface()
result = interface.supervisor.signalAllProcesses(9)
error = self._verify_call_result(result, "kill", raise_on_error)
return error is None
raise DaemonError(f"Daemon status {status} could not be handled")
[docs]
def shut_down(self, raise_on_error: bool = False) -> bool:
status = self.check_status()
if status == DaemonStatus.SHUT_DOWN:
logger.info("supervisord is already shut down.")
return True
interface = self.get_interface()
try:
interface.supervisor.shutdown()
except Exception:
if raise_on_error:
raise
return False
return True
[docs]
def wait_start(self, timeout: int = 30) -> None:
time_limit = time.time() + timeout
while True:
processes_info = self.get_processes_info()
all_started = True
for name, proc_info in processes_info.items():
if proc_info["state"] not in RUNNING_STATES:
raise DaemonError(f"Process {name} is not in a running state")
if proc_info["state"] != ProcessStates.RUNNING:
all_started = False
break
if all_started:
break
if time.time() > time_limit:
raise DaemonError(
f"The processes did not start within {timeout} seconds"
)
time.sleep(2)
[docs]
def foreground_processes(
self,
processes_names: list | None = None,
print_function: Callable | None = None,
) -> None:
processes_info = self.get_processes_info()
if processes_names is None:
processes_names = [pn for pn in processes_info if pn != "supervisord"]
for name in processes_names:
if name not in processes_info:
raise ValueError(
f"Process with name {name} is not among the available processes"
)
if processes_info[name]["state"] != ProcessStates.RUNNING:
raise RuntimeError(
f"Process {name} is not running. Cannot attach to it."
)
self.foreground_process(name, print_function)
[docs]
def foreground_process(self, name, print_function: Callable | None = None) -> None:
# This is adapted from supervisor.supervisorctl.DefaultControllerPlugin.do_fg
a = None
if print_function is None:
print_function = print
try:
ctl = self.get_controller()
supervisor = ctl.get_supervisor()
print_function(f"Entering foreground for process {name} (CTRL+C to exit)")
# this thread takes care of the output/error messages
a = fgthread(name, ctl)
a.start()
# this takes care of the user input
while True:
# Always avoid echoing the output. It may not be a password, but since
# the daemon process cannot control the choice here, it is safer to
# hide everything
# inp = raw_input() + '\n'
inp = getpass.getpass("") + "\n"
try:
supervisor.sendProcessStdin(name, inp)
except xmlrpclib.Fault as e:
if e.faultCode == xmlrpc.Faults.NOT_RUNNING:
print_function("Process got killed")
else:
print_function("ERROR: " + str(e))
print_function("Exiting foreground")
a.kill()
return
info = supervisor.getProcessInfo(name)
if info["state"] != states.ProcessStates.RUNNING:
print_function("Process got killed")
print_function("Exiting foreground")
a.kill()
return
except (KeyboardInterrupt, EOFError):
print_function("Exiting foreground")
if a:
a.kill()
[docs]
def get_controller(self):
options = ClientOptions()
args = ["-c", self.conf_filepath]
options.realize(args, doc="")
return Controller(options)