# Source code for jobflow_remote.config.helper

from __future__ import annotations

import logging
import traceback
from typing import TYPE_CHECKING

from jobflow_remote.config.base import (
    ExecutionConfig,
    LocalWorker,
    Project,
    RemoteWorker,
    WorkerBase,
)

if TYPE_CHECKING:
    from jobflow import JobStore
    from maggma.core import Store

    from jobflow_remote.remote.host import BaseHost

logger = logging.getLogger(__name__)


def generate_dummy_project(name: str, full: bool = False) -> Project:
    """Build an example ``Project`` configuration with dummy values.

    Parameters
    ----------
    name:
        The name assigned to the generated project.
    full:
        If True, also include an example local worker and an example
        execution configuration.

    Returns
    -------
    Project
        A project populated with dummy workers, queue and job stores.
    """
    workers: dict = {
        "example_worker": generate_dummy_worker(
            scheduler_type="slurm", host_type="remote"
        )
    }
    exec_config: dict = {}
    if full:
        workers["example_local"] = generate_dummy_worker(
            scheduler_type="shell", host_type="local"
        )
        exec_config["example_config"] = generate_dummy_exec_config()
    return Project(
        name=name,
        jobstore=generate_dummy_jobstore(),
        queue={"store": generate_dummy_queue()},
        workers=workers,
        exec_config=exec_config,
    )
def generate_dummy_worker(
    scheduler_type: str = "slurm", host_type: str = "remote"
) -> WorkerBase:
    """Build an example worker configuration with dummy values.

    Parameters
    ----------
    scheduler_type:
        The scheduler type set on the worker (e.g. "slurm" or "shell").
    host_type:
        Either "local" or "remote"; selects the worker class.

    Returns
    -------
    WorkerBase
        A ``LocalWorker`` or ``RemoteWorker`` depending on ``host_type``.

    Raises
    ------
    ValueError
        If ``host_type`` is neither "local" nor "remote".
    """
    # Settings shared by both worker flavors.
    common: dict = {
        "scheduler_type": scheduler_type,
        "work_dir": "/path/to/run/folder",
        "pre_run": "source /path/to/python/environment/activate",
    }
    if host_type == "local":
        return LocalWorker(type="local", timeout_execute=60, **common)
    if host_type == "remote":
        return RemoteWorker(
            type="remote",
            host="remote.host.net",
            user="bob",
            timeout_execute=60,
            **common,
        )
    raise ValueError(f"Unknown/unhandled host type: {host_type}")
def generate_dummy_jobstore() -> dict:
    """Return an example JobStore definition as a plain dictionary.

    The returned dict describes a MongoDB-backed docs store plus one
    GridFS-backed additional store named "data", all with placeholder
    credentials.
    """
    # Connection settings shared by both store definitions.
    mongo_connection = {
        "database": "db_name",
        "host": "host.mongodb.com",
        "port": 27017,
        "username": "bob",
        "password": "secret_password",
    }
    docs_store = {
        "type": "MongoStore",
        "collection_name": "outputs",
        **mongo_connection,
    }
    data_store = {
        "type": "GridFSStore",
        "collection_name": "outputs_blobs",
        **mongo_connection,
    }
    return {
        "docs_store": docs_store,
        "additional_stores": {"data": data_store},
    }
def generate_dummy_exec_config() -> ExecutionConfig:
    """Return an example ``ExecutionConfig`` with placeholder values.

    Includes example module loads, an environment export and a pre-run
    command activating a conda environment.
    """
    settings = dict(
        modules=["GCC/10.2.0", "OpenMPI/4.0.5-GCC-10.2.0"],
        export={"PATH": "/path/to/binaries:$PATH"},
        pre_run="conda activate env_name",
    )
    return ExecutionConfig(**settings)
def generate_dummy_queue() -> dict:
    """Return an example queue-store definition as a plain dictionary.

    Describes a MongoDB store on localhost with placeholder credentials
    and a "jobs" collection.
    """
    return {
        "type": "MongoStore",
        "host": "localhost",
        "database": "db_name",
        "username": "bob",
        "password": "secret_password",
        "collection_name": "jobs",
    }
def _check_workdir(worker: WorkerBase, host: BaseHost) -> str | None:
    """Check that the configured workdir exists or is writable on the worker.

    Creates the work directory (if missing) and writes a small canary file
    into it, removing the file afterwards.

    Parameters
    ----------
    worker:
        The worker configuration.
    host:
        A connected host.

    Returns
    -------
    str | None
        An error message if the host self-test fails, otherwise None.

    Raises
    ------
    FileNotFoundError
        If the canary file cannot be written because the folder is missing.
    PermissionError
        If the canary file cannot be written due to missing access rights.
    """
    # Verify the host is usable before touching the filesystem; any failure
    # here is reported as a string rather than raised.
    try:
        host_error = host.test()
        if host_error:
            return host_error
    except Exception:
        exc = traceback.format_exc()
        return f"Error while testing worker:\n {exc}"
    # NOTE(review): assumes worker.work_dir is a Path-like supporting '/'.
    canary_file = worker.work_dir / ".jf_heartbeat"
    try:
        # First try to create the folder. The runner will create it anyway and
        # it should be less confusing for the user.
        host.mkdir(worker.work_dir)
        host.write_text_file(canary_file, "\n")
        return None  # noqa: TRY300
    except FileNotFoundError as exc:
        raise FileNotFoundError(
            f"Could not write to {canary_file}. Does the folder exist on the remote?\nThe folder should be specified as an absolute path with no shell expansions or environment variables."
        ) from exc
    except PermissionError as exc:
        raise PermissionError(
            f"Could not write to {canary_file}. Do you have the rights to access that folder?"
        ) from exc
    finally:
        # Best-effort cleanup of the canary file, even if the write failed.
        # Must be enclosed in quotes with '!r' as the path may contain spaces
        host.execute(f"rm {str(canary_file)!r}")
def check_worker(worker: WorkerBase) -> str | None:
    """Check that a connection to the configured worker can be made.

    Connects to the worker host, runs the host self-test, queries the
    scheduler for the jobs list and verifies the work directory is writable.

    Parameters
    ----------
    worker:
        The worker configuration to test.

    Returns
    -------
    str | None
        An error message describing the first failure, or None if all
        checks pass.
    """
    host = worker.get_host()
    try:
        host.connect()
        host_error = host.test()
        if host_error:
            return host_error

        # Local import — presumably to avoid an import cycle; TODO confirm.
        from jobflow_remote.remote.queue import QueueManager

        qm = QueueManager(scheduler_io=worker.get_scheduler_io(), host=host)
        qm.get_jobs_list()

        # Fix: the original discarded _check_workdir's return value, so an
        # error message it reported (rather than raised) was silently lost.
        workdir_error = _check_workdir(worker=worker, host=host)
        if workdir_error:
            return workdir_error
    except Exception:
        exc = traceback.format_exc()
        return f"Error while testing worker:\n {exc}"
    finally:
        try:
            host.close()
        except Exception:
            logger.warning(f"error while closing connection to host {host}")
    return None
def _check_store(store: Store) -> str | None: try: store.connect() store.query_one() except Exception: return traceback.format_exc() finally: store.close() return None
def check_queue_store(queue_store: Store) -> str | None:
    """Check that the queue store can be connected to and queried.

    Returns an error message with the traceback on failure, None otherwise.
    """
    error = _check_store(queue_store)
    return f"Error while checking queue store:\n{error}" if error else None
def check_jobstore(jobstore: JobStore) -> str | None:
    """Check that the docs store and every additional store are reachable.

    Returns an error message identifying the first failing store, or None
    if all stores connect and answer a trivial query.
    """
    docs_err = _check_store(jobstore.docs_store)
    if docs_err:
        return f"Error while checking docs_store store:\n{docs_err}"
    for store_name, store in jobstore.additional_stores.items():
        add_err = _check_store(store)
        if add_err:
            return f"Error while checking additional store {store_name}:\n{add_err}"
    return None