Source code for jobflow_remote.config.jobconfig

from __future__ import annotations

from typing import TYPE_CHECKING, Callable

from jobflow_remote.config import ConfigManager

if TYPE_CHECKING:
    from jobflow import Flow, Job, JobStore
    from qtoolkit.core.data_objects import QResources

    from jobflow_remote.config.base import ExecutionConfig


[docs] def set_run_config( flow_or_job: Flow | Job, name_filter: str = None, function_filter: Callable = None, exec_config: str | ExecutionConfig | None = None, resources: dict | QResources | None = None, worker: str | None = None, dynamic: bool = True, ) -> Flow | Job: """ Modify in place a Flow or a Job by setting the properties in the "manager_config" entry in the JobConfig associated to each Job matching the filter. The values left as None will not be set and their value may be filled by those set in the submit_flow. If you want/need to leave resources or exec_config empty pass an empty QResources (or an empty dict) or an empty ExecutionConfig. Uses the Flow/Job update_config() method, so follows the same conventions, also setting the options in the config_updates of the Job, to allow setting the same properties also in dynamically generated Jobs, if the dynamic option is True. Note that calling this function will override all previously set values for matching Jobs, even if not specified here. Parameters ---------- flow_or_job A Flow or a Job to be modified name_filter A filter for the job name. Only jobs with a matching name will be updated. Includes partial matches, e.g. "ad" will match a job with the name "adder". function_filter A filter for the job function. Only jobs with a matching function will be updated. exec_config The execution configuration to be added to the selected Jobs. resources The resources to be set for the selected Jobs. worker The worker where the selected Jobs will be executed. dynamic The updates will be propagated to Jobs/Flows dynamically generated at runtime. Returns ------- Flow or Job The modified object. """ if not exec_config and not resources and not worker: return flow_or_job config: dict = {"manager_config": {}} if exec_config is not None: config["manager_config"]["exec_config"] = exec_config if resources is not None: config["manager_config"]["resources"] = resources if worker is not None: config["manager_config"]["worker"] = worker flow_or_job.update_config( config=config, name_filter=name_filter, function_filter=function_filter, dynamic=dynamic, ) return flow_or_job
[docs] def load_job_store(project: str | None = None) -> JobStore: """ Load the JobStore for the current project. Parameters ---------- project Returns ------- """ cm = ConfigManager() p = cm.get_project(project) return p.get_jobstore()