Source code for jobflow_remote.jobs.submit
from __future__ import annotations
from typing import TYPE_CHECKING
from jobflow.core.flow import get_flow
from jobflow_remote.config.base import ConfigError, ExecutionConfig
from jobflow_remote.config.manager import ConfigManager
from jobflow_remote.remote.data import check_additional_stores
if TYPE_CHECKING:
import jobflow
from qtoolkit.core.data_objects import QResources
[docs]
def submit_flow(
    flow: jobflow.Flow | jobflow.Job | list[jobflow.Job],
    worker: str | None = None,
    project: str | None = None,
    exec_config: str | ExecutionConfig | None = None,
    resources: dict | QResources | None = None,
    allow_external_references: bool = False,
) -> list[int]:
    """
    Register a Flow in the project database so that it can be executed by a Worker.

    Nothing is run here: the Jobs are only stored in the database as
    calculations waiting to be executed on the selected Worker.

    Parameters
    ----------
    flow
        The calculation to submit: a Flow, a single Job or a list of Jobs.
        It is normalized to a Flow before being stored.
    worker
        Name of the Worker that should execute the calculation. When None,
        the first worker listed in the project configuration is selected.
    project
        Name of the project that receives the Flow. When None, the current
        project is used.
    exec_config: str or ExecutionConfig or None
        Options to set in the submission script before the Job is executed,
        on top of those already defined by the Worker. When given as a string
        it is the name of an exec_config defined in the project, and it is
        looked up to make sure it exists.
    resources: dict or QResources or None
        Resources passed to qtoolkit when the Job is submitted to the queue.
    allow_external_references
        When False, every output reference must point to another Job of the
        same Flow.

    Returns
    -------
    list of int
        The db_ids of the submitted Jobs, as returned by the project's job
        controller.

    Raises
    ------
    ConfigError
        If no worker is given and the project has none configured, or if a
        Job uses additional stores that the project's JobStore does not define.
        Errors raised while loading the worker or the named exec_config are
        not caught and reach the caller unchanged.
    """
    manager = ConfigManager()
    project_config = manager.get_project(project)

    if worker is None:
        configured_workers = project_config.workers
        if not configured_workers:
            raise ConfigError("No workers configured for this project.")
        # Default to the first worker as listed in the project configuration.
        worker = next(iter(configured_workers))

    # These lookups only verify that the names are well defined; the loaded
    # objects are discarded. The raw `project` argument is forwarded on purpose,
    # so None is resolved by the ConfigManager itself.
    manager.get_worker(worker_name=worker, project_name=project)
    if isinstance(exec_config, str):
        manager.get_exec_config(exec_config_name=exec_config, project_name=project)

    flow = get_flow(flow, allow_external_references=allow_external_references)

    # Fail before touching the database if any Job refers to an additional
    # store that the project's JobStore lacks.
    project_jobstore = project_config.get_jobstore()
    for flow_job, _ in flow.iterflow():
        absent_stores = check_additional_stores(flow_job, project_jobstore)
        if absent_stores:
            raise ConfigError(
                f"Additional stores {absent_stores!r} are not configured for this project."
            )

    return project_config.get_job_controller().add_flow(
        flow=flow,
        worker=worker,
        exec_config=exec_config,
        resources=resources,
        allow_external_references=allow_external_references,
    )