Source code for jobflow_remote.config.manager

from __future__ import annotations

import glob
import logging
import os
import shutil
import traceback
from pathlib import Path
from typing import TYPE_CHECKING, NamedTuple

import tomlkit
from monty.json import jsanitize
from monty.os import makedirs_p
from monty.serialization import dumpfn, loadfn

from jobflow_remote.config.base import (
    ConfigError,
    ExecutionConfig,
    Project,
    ProjectUndefinedError,
    WorkerBase,
)
from jobflow_remote.utils.data import deep_merge_dict

if TYPE_CHECKING:
    from jobflow import JobStore
    from maggma.stores import MongoStore

logger = logging.getLogger(__name__)


class ProjectData(NamedTuple):
    """Bundle a parsed project together with the file it is stored in."""

    # Path of the configuration file the project is read from and dumped to.
    filepath: str | Path
    # The parsed project object.
    project: Project
    # Extension of the file; selects the format used when dumping the project.
    ext: str
class WorkerData(NamedTuple):
    """Pair a worker name with its associated worker value."""

    # Name identifying the worker.
    name: str
    # NOTE(review): annotated as ``str``; nothing in this module uses this
    # tuple, so the exact content of the field should be confirmed.
    worker: str
class ConfigManager:
    """
    A manager for the projects configuration files.

    Provides tools to parse project information from the selected projects
    folder as well as methods to update the properties of each project.
    """

    # File extensions recognised as project files, in the order they are scanned.
    projects_ext = ("json", "yaml", "toml")

    def __init__(
        self,
        exclude_unset: bool = False,
        exclude_none: bool = False,
        warn: bool = False,
    ) -> None:
        """
        Parameters
        ----------
        exclude_unset
            when dumping projects determine whether fields which were not
            explicitly set when creating the model should be excluded from
            the dictionary
        exclude_none
            when dumping projects determine whether fields which are equal to
            None should be excluded from the dictionary
        warn
            if True print warnings related to the parsing of the files in the
            projects folder
        """
        # NOTE(review): imported at call time rather than at module level,
        # presumably to avoid a circular import - confirm.
        from jobflow_remote import SETTINGS

        self.exclude_unset = exclude_unset
        self.exclude_none = exclude_none
        self.warn = warn
        self.projects_folder = Path(SETTINGS.projects_folder)
        # Projects are parsed eagerly, once, when the manager is created.
        self.projects_data = self.load_projects_data()

    @property
    def projects(self) -> dict[str, Project]:
        """
        Returns
        -------
        dict
            Dictionary with project name as key and Project as value.
        """
        return {name: pd.project for name, pd in self.projects_data.items()}

    @staticmethod
    def _load_file(filepath: str | Path, ext: str):
        """
        Deserialize a single project file without validating its content.

        Parameters
        ----------
        filepath
            Path of the file to read.
        ext
            Extension of the file. "json" and "yaml" are read with ``loadfn``,
            anything else is parsed as TOML.

        Returns
        -------
        The deserialized content of the file.
        """
        if ext in ("json", "yaml"):
            return loadfn(filepath)
        # TOML documents are UTF-8 by specification: do not depend on the
        # locale default encoding.
        with open(filepath, encoding="utf-8") as f:
            return tomlkit.parse(f.read())

    def load_projects_data(self) -> dict[str, ProjectData]:
        """
        Load projects from the selected projects folder.

        Files that cannot be read or validated as a Project are skipped; a
        warning is logged for them only if ``self.warn`` is True.

        Returns
        -------
        dict
            Dictionary with project name as key and ProjectData as value.

        Raises
        ------
        ConfigError
            If two files define a project with the same name.
        """
        projects_data: dict[str, ProjectData] = {}
        for ext in self.projects_ext:
            for filepath in self.projects_folder.glob(f"*.{ext}"):
                try:
                    project = Project.parse_obj(self._load_file(filepath, ext))
                except Exception:
                    # Deliberately best effort: one broken file must not
                    # prevent the other projects from being loaded.
                    if self.warn:
                        logger.warning(
                            f"File {filepath} could not be parsed as a Project. Error: {traceback.format_exc()}"
                        )
                    continue
                if project.name in projects_data:
                    msg = f"Two projects with the same name '{project.name}' have been defined: {filepath}, {projects_data[project.name].filepath}"
                    raise ConfigError(msg)
                projects_data[project.name] = ProjectData(filepath, project, ext)
        return projects_data

    def select_project_name(self, project_name: str | None = None) -> str:
        """
        Determine the project name to be used based on the passed value and
        on the general settings.

        Parameters
        ----------
        project_name
            The name of the project or None to use the value from the settings

        Returns
        -------
        str
            The name of the selected project.

        Raises
        ------
        ProjectUndefinedError
            If no name is passed or set in the settings and the number of
            known projects is different from one.
        """
        from jobflow_remote import SETTINGS

        selected = project_name or SETTINGS.project
        if selected:
            return selected
        # Without an explicit choice the name is unambiguous only if exactly
        # one project is known.
        if len(self.projects_data) == 1:
            return next(iter(self.projects_data))
        raise ProjectUndefinedError(
            f"A project name should be defined, known projects: {list(self.projects_data)}"
        )

    def get_project_data(self, project_name: str | None = None) -> ProjectData:
        """
        Get the ProjectData object based from the project name.

        Parameters
        ----------
        project_name
            The name of the project or None to use the value from the settings

        Returns
        -------
        ProjectData
            The selected ProjectData

        Raises
        ------
        ConfigError
            If the selected project is not among the loaded projects.
        """
        project_name = self.select_project_name(project_name)
        if project_name not in self.projects_data:
            raise ConfigError(
                f"The selected project {project_name} does not exist "
                "or could not be parsed correctly"
            )
        return self.projects_data[project_name]

    def get_project(self, project_name: str | None = None) -> Project:
        """
        Get the Project object based from the project name.

        Parameters
        ----------
        project_name
            The name of the project or None to use the value from the settings

        Returns
        -------
        Project
            The selected Project
        """
        return self.get_project_data(project_name).project

    def dump_project(self, project_data: ProjectData) -> None:
        """
        Dump the project to filepath specified in the ProjectData.

        Parameters
        ----------
        project_data
            The project data to be dumped

        Raises
        ------
        ConfigError
            If the extension in the ProjectData is not one of ``projects_ext``.
        """
        ext = project_data.ext
        # Fail loudly instead of silently writing nothing for an extension
        # that no branch below handles.
        if ext not in self.projects_ext:
            raise ConfigError(
                f"Unsupported project file extension '{ext}', "
                f"expected one of {self.projects_ext}"
            )

        # TOML has no null value, so None fields are always dropped there.
        exclude_none = ext == "toml" or self.exclude_none
        d = jsanitize(
            project_data.project.dict(
                exclude_none=exclude_none, exclude_unset=self.exclude_unset
            ),
            enum_values=True,
        )
        if ext == "toml":
            with open(project_data.filepath, "w", encoding="utf-8") as f:
                tomlkit.dump(d, f)
        else:
            dumpfn(d, project_data.filepath)

    def create_project(self, project: Project, ext: str = "yaml") -> None:
        """
        Create a new Project in the project folder by dumping the project to
        file.

        All the checks are performed before touching the file system, so a
        rejected project does not leave folders behind.

        Parameters
        ----------
        project
            The data of the project to be created.
        ext
            The extension of the file to which the project will be dumped
            (yaml, json or toml)

        Raises
        ------
        ConfigError
            If a project with the same name is already loaded, if the
            extension is not supported or if the target file already exists.
        """
        if project.name in self.projects_data:
            raise ConfigError(f"Project with name {project.name} already exists")
        if ext not in self.projects_ext:
            raise ConfigError(
                f"Unsupported project file extension '{ext}', "
                f"expected one of {self.projects_ext}"
            )
        filepath = self.projects_folder / f"{project.name}.{ext}"
        if filepath.exists():
            raise ConfigError(
                f"Project with name {project.name} does not exist, but file {filepath!s} does"
            )

        for folder in (project.base_dir, project.tmp_dir, project.log_dir):
            makedirs_p(folder)

        project_data = ProjectData(filepath, project, ext)
        self.dump_project(project_data)
        self.projects_data[project.name] = project_data

    def remove_project(self, project_name: str, remove_folders: bool = True) -> None:
        """
        Remove a project from the projects folder.

        Does nothing if the project is not among the loaded ones.

        Parameters
        ----------
        project_name
            Name of the project to be removed.
        remove_folders
            Optionally remove the folders related to the project (e.g. tmp, log).
        """
        if project_name not in self.projects_data:
            return
        project_data = self.projects_data.pop(project_name)
        if remove_folders:
            # Errors while deleting the folder tree are ignored.
            shutil.rmtree(project_data.project.base_dir, ignore_errors=True)
        # The configuration file is removed regardless of remove_folders.
        os.remove(project_data.filepath)

    def update_project(self, config: dict, project_name: str) -> None:
        """
        Update the project values. The passed dict with values will be
        recursively merged in the current project.

        The stored project is replaced only after the merged configuration
        has been validated and dumped, so a failure leaves the manager
        unchanged.

        Parameters
        ----------
        config
            Dictionary with the project values to be updated.
        project_name
            Name of the project to be updated

        Raises
        ------
        KeyError
            If the project is not among the loaded ones.
        ConfigError
            If the update renames the project to the name of another loaded
            project.
        """
        current = self.projects_data[project_name]
        merged = deep_merge_dict(current.project.dict(), config)
        new_project = Project.parse_obj(merged)
        # Mirror the duplicate-name check done when loading the projects.
        if new_project.name != project_name and new_project.name in self.projects_data:
            raise ConfigError(
                f"Project with name {new_project.name} already exists"
            )
        new_data = ProjectData(current.filepath, new_project, current.ext)
        self.dump_project(new_data)
        self.projects_data.pop(project_name)
        self.projects_data[new_project.name] = new_data

    def project_names_from_files(self) -> list[str]:
        """
        Parses all the parsable files and only checks for the "name"
        attribute to return a list of potential project names.

        Useful in case some projects cannot be properly parsed, but the full
        list needs to be returned. A warning is logged for each file that
        cannot be read.

        Returns
        -------
        list
            List of project names.
        """
        project_names = []
        for ext in self.projects_ext:
            # Same globbing as load_projects_data, so that both methods
            # consider exactly the same set of files (dot-files included).
            for filepath in self.projects_folder.glob(f"*.{ext}"):
                try:
                    d = self._load_file(filepath, ext)
                    # Kept inside the try: the content may not be a mapping.
                    if "name" in d:
                        project_names.append(d["name"])
                except Exception:
                    logger.warning(
                        f"File {filepath} could not be parsed as a Project. Error: {traceback.format_exc()}"
                    )
                    continue
        return project_names

    def set_worker(
        self,
        name: str,
        worker: WorkerBase,
        project_name: str | None = None,
        replace: bool = False,
    ) -> None:
        """
        Set a worker in the selected project. Can add a new worker or replace
        an existing one.

        Parameters
        ----------
        name
            Name of the worker to be added or replaced.
        worker
            Worker to be set.
        project_name
            Name of the project where the Worker is set, or None to use the
            one from the settings.
        replace
            Raise an exception if False and a Worker with the chosen name
            already exists.

        Raises
        ------
        ConfigError
            If ``replace`` is False and the worker is already defined.
        """
        project_data = self.get_project_data(project_name)
        workers = project_data.project.workers
        if not replace and name in workers:
            raise ConfigError(f"Worker with name {name} is already defined")
        workers[name] = worker
        self.dump_project(project_data)

    def remove_worker(self, worker_name: str, project_name: str | None = None) -> None:
        """
        Remove a worker from the selected project.

        Parameters
        ----------
        worker_name
            Name of the worker to be removed
        project_name
            Name of the project from which the Worker should be removed, or
            None to use the one from the settings.

        Raises
        ------
        KeyError
            If no worker with the given name is defined in the project.
        """
        project_data = self.get_project_data(project_name)
        project_data.project.workers.pop(worker_name)
        self.dump_project(project_data)

    def get_worker(
        self, worker_name: str, project_name: str | None = None
    ) -> WorkerBase:
        """
        Return the worker object based on the name.

        Parameters
        ----------
        worker_name
            Name of the worker to retrieve.
        project_name
            Name of the project from which the Worker should be retrieved, or
            None to use the one from the settings.

        Returns
        -------
        WorkerBase
            The selected Worker.

        Raises
        ------
        ConfigError
            If no worker with the given name is defined in the project.
        """
        workers = self.get_project(project_name).workers
        if worker_name not in workers:
            raise ConfigError(f"Worker with name {worker_name} is not defined")
        return workers[worker_name]

    def set_queue_db(self, store: MongoStore, project_name: str | None = None) -> None:
        """
        Set the project specific store used for managing the queue.

        Parameters
        ----------
        store
            A maggma Store
        project_name
            Name of the project where the Store is set, or None to use the
            one from the settings.
        """
        project_data = self.get_project_data(project_name)
        # The store is saved in its serialized (dict) form.
        project_data.project.queue = store.as_dict()
        self.dump_project(project_data)

    def set_jobstore(self, jobstore: JobStore, project_name: str | None = None) -> None:
        """
        Set the project specific store used for jobflow.

        Parameters
        ----------
        jobstore
            The JobStore to be set.
        project_name
            Name of the project where the Store is set, or None to use the
            one from the settings.
        """
        project_data = self.get_project_data(project_name)
        # The store is saved in its serialized (dict) form.
        project_data.project.jobstore = jobstore.as_dict()
        self.dump_project(project_data)

    def set_exec_config(
        self,
        exec_config_name: str,
        exec_config: ExecutionConfig,
        project_name: str | None = None,
        replace: bool = False,
    ) -> None:
        """
        Set an ExecutionConfig in the selected project. Can add a new
        ExecutionConfig or replace an existing one.

        Parameters
        ----------
        exec_config_name
            Name of the ExecutionConfig to be added or replaced.
        exec_config
            The ExecutionConfig.
        project_name
            Name of the project where the ExecutionConfig is set, or None to
            use the one from the settings.
        replace
            Raise an exception if False and an ExecutionConfig with the chosen
            name already exists.

        Raises
        ------
        ConfigError
            If ``replace`` is False and the ExecutionConfig is already defined.
        """
        project_data = self.get_project_data(project_name)
        exec_configs = project_data.project.exec_config
        if not replace and exec_config_name in exec_configs:
            raise ConfigError(
                f"ExecutionConfig with name {exec_config_name} is already defined"
            )
        exec_configs[exec_config_name] = exec_config
        self.dump_project(project_data)

    def remove_exec_config(
        self, exec_config_name: str, project_name: str | None = None
    ) -> None:
        """
        Remove an ExecutionConfig from the selected project.

        A name that is not defined in the project is silently ignored.

        Parameters
        ----------
        exec_config_name
            Name of the ExecutionConfig to be removed
        project_name
            Name of the project from which the ExecutionConfig should be
            removed, or None to use the one from the settings.
        """
        project_data = self.get_project_data(project_name)
        project_data.project.exec_config.pop(exec_config_name, None)
        self.dump_project(project_data)

    def get_exec_config(
        self, exec_config_name: str, project_name: str | None = None
    ) -> ExecutionConfig:
        """
        Return the ExecutionConfig object based on the name.

        Parameters
        ----------
        exec_config_name
            Name of the ExecutionConfig.
        project_name
            Name of the project from which the ExecutionConfig should be
            retrieved, or None to use the one from the settings.

        Returns
        -------
        ExecutionConfig
            The selected ExecutionConfig

        Raises
        ------
        ConfigError
            If no ExecutionConfig with the given name is defined in the project.
        """
        exec_configs = self.get_project(project_name).exec_config
        if exec_config_name not in exec_configs:
            raise ConfigError(
                f"ExecutionConfig with id {exec_config_name} is not defined"
            )
        return exec_configs[exec_config_name]