Source code for jobflow_remote.jobs.data

from collections import defaultdict
from datetime import datetime, timezone
from enum import Enum
from functools import cached_property
from typing import Any

from jobflow import Flow, Job
from monty.json import jsanitize
from pydantic import BaseModel, Field, field_validator
from qtoolkit.core.data_objects import QResources, QState

from jobflow_remote.config.base import ExecutionConfig
from jobflow_remote.jobs.state import (
    BatchState,
    DeprecatedStateError,
    FlowState,
    JobState,
)

IN_FILENAME = "jfremote_in.json"
OUT_FILENAME = "jfremote_out.json"
BATCH_INFO_FILENAME = "batch_info.json"


def get_initial_job_doc_dict(
    job: Job,
    parents: list[str] | None,
    db_id: str,
    worker: str,
    exec_config: ExecutionConfig | None,
    resources: dict | QResources | None,
    priority: int,
) -> dict:
    """
    Generate an instance of JobDoc for initial insertion in the DB.

    Parameters
    ----------
    job
        The Job of the JobDoc.
    parents
        The parents of the Job.
    db_id
        The db_id.
    worker
        The worker where the Job should be executed.
    exec_config
        The ExecutionConfig used for execution.
    resources
        The resources used to run the Job.
    priority
        The priority of the Job.

    Returns
    -------
    dict
        The dict representation of a new JobDoc.
    """
    # Take the resources from the job if they are defined there (they can be
    # set dynamically by update_config), otherwise use the value passed in.
    # Since the resources can be dictionaries, an empty dictionary is also
    # allowed.
    config_resources = job.config.manager_config.get("resources")
    job_resources = config_resources if config_resources is not None else resources
    job_exec_config = job.config.manager_config.get("exec_config") or exec_config
    worker = job.config.manager_config.get("worker") or worker
    priority = job.config.manager_config.get("priority") or priority
    job_doc = JobDoc(
        job=job,
        uuid=job.uuid,
        index=job.index,
        db_id=db_id,
        state=JobState.WAITING if parents else JobState.READY,
        parents=parents,
        worker=worker,
        exec_config=job_exec_config,
        resources=job_resources,
        priority=priority,
    )
    return job_doc.as_db_dict()
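

# Illustrative sketch (not part of the original module): values set in a
# Job's manager_config take precedence over the arguments passed to
# get_initial_job_doc_dict. The job function and all values are hypothetical.
def _example_initial_job_doc() -> dict:
    from jobflow import job

    @job
    def add(a, b):
        return a + b

    j = add(1, 2)
    # mimic what update_config would store for jobflow-remote
    j.config.manager_config["worker"] = "local_shell"
    doc = get_initial_job_doc_dict(
        j,
        parents=None,
        db_id="1",
        worker="default_worker",
        exec_config=None,
        resources={"nodes": 1},
        priority=0,
    )
    # the manager_config entry wins over the "default_worker" argument
    assert doc["worker"] == "local_shell"
    return doc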


def get_initial_flow_doc_dict(
    flow: Flow, job_dicts: list[dict], jobstore: str | None = None
) -> dict:
    """
    Generate a serialized FlowDoc for initial insertion in the DB.

    Parameters
    ----------
    flow
        The Flow used to generate the FlowDoc.
    job_dicts
        The dictionaries of the Jobs composing the Flow.
    jobstore
        The name of the JobStore used for the output of the submitted Flow.
        If None the default is used.

    Returns
    -------
    dict
        A serialized version of a new FlowDoc.
    """
    jobs = [j["uuid"] for j in job_dicts]
    ids = [(j["db_id"], j["uuid"], j["index"]) for j in job_dicts]
    parents = {j["uuid"]: {"1": j["parents"]} for j in job_dicts}
    flow_doc = FlowDoc(
        uuid=flow.uuid,
        jobs=jobs,
        state=FlowState.READY,
        name=flow.name,
        ids=ids,
        parents=parents,
        metadata=flow.metadata or {},
        jobstore=jobstore,
    )
    return flow_doc.as_db_dict()


def get_initial_batch_doc_dict(batch_uid, process_id, worker):
    """
    Generate a serialized BatchDoc for initial insertion in the DB.

    Parameters
    ----------
    batch_uid
        The unique id of the batch process.
    process_id
        The id of the queue process executing the batch jobs.
    worker
        The worker where the batch process is running.

    Returns
    -------
    dict
        A serialized version of a new BatchDoc.
    """
    batch_doc = BatchDoc(
        batch_uid=batch_uid,
        process_id=process_id,
        batch_state=BatchState.SUBMITTED,
        worker=worker,
    )
    return batch_doc.as_db_dict()


class RemoteInfo(BaseModel):
    """Model with data describing the remote state of a Job."""

    step_attempts: int = 0
    queue_state: QState | None = None
    process_id: str | None = None
    retry_time_limit: datetime | None = None
    error: str | None = None
    prerun_cleanup: bool = False
    queue_out: str | None = None
    queue_err: str | None = None


def _validate_job_state(value: Any) -> Any:
    """
    Shared validation logic for the JobState fields.

    Raises DeprecatedStateError if applicable, otherwise lets pydantic
    handle the standard Enum validation.
    """
    try:
        JobState(value)
    except DeprecatedStateError:
        raise
    except Exception:
        pass
    return value
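

# Minimal sketch of the validator behavior (not part of the original module):
# anything that is not a deprecated state passes through unchanged, leaving
# the actual Enum validation to pydantic.
def _example_validate_job_state() -> None:
    # an unknown value is returned as-is; pydantic will reject it later
    assert _validate_job_state("not-a-state") == "not-a-state"
    # a valid member also passes through untouched
    assert _validate_job_state(JobState.READY) is JobState.READY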


class JobInfo(BaseModel):
    """
    Model with information extracted from a JobDoc.

    Mainly for visualization purposes.
    """

    uuid: str
    index: int
    db_id: str
    worker: str
    name: str
    state: JobState
    created_on: datetime
    updated_on: datetime
    remote: RemoteInfo = RemoteInfo()
    parents: list[str] | None = None
    previous_state: JobState | None = None
    error: str | None = None
    lock_id: str | None = None
    lock_time: datetime | None = None
    run_dir: str | None = None
    start_time: datetime | None = None
    end_time: datetime | None = None
    priority: int = 0
    metadata: dict | None = None
    stored_data: dict | None = None

    @property
    def is_locked(self) -> bool:
        return self.lock_id is not None

    @property
    def run_time(self) -> float | None:
        """
        Calculate the run time based on the start and end times.

        Returns
        -------
        float
            The run time in seconds.
        """
        if self.start_time and self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return None

    @property
    def estimated_run_time(self) -> float | None:
        """
        Estimate the current run time based on the start time and the current time.

        Returns
        -------
        float
            The estimated run time in seconds.
        """
        if self.start_time:
            # needs to be timezone aware to compute the difference
            start_time = self.start_time.replace(tzinfo=timezone.utc)
            return (datetime.now(timezone.utc) - start_time).total_seconds()
        return None

    @classmethod
    def from_query_output(cls, d) -> "JobInfo":
        """
        Generate an instance from the output of a query to the JobDoc collection.

        Parameters
        ----------
        d
            The dictionary with the queried data.

        Returns
        -------
        JobInfo
            The instance of JobInfo based on the data.
        """
        job = d.pop("job")
        for k in ["name", "metadata", "hosts"]:
            d[k] = job[k]
        return cls.model_validate(d)
[docs] @field_validator("state", mode="before") @classmethod def state_validator(cls, value): return _validate_job_state(value)
[docs] @field_validator("previous_state", mode="before") @classmethod def previous_state_validator(cls, value): return _validate_job_state(value)


def _projection_db_info() -> list[str]:
    """
    Generate a list of fields used for projection, depending on the JobInfo model.

    Returns
    -------
    list
        The list of fields to use in a query.
    """
    projection = list(JobInfo.model_fields)
    projection.remove("name")
    projection.append("job.name")
    projection.append("job.metadata")
    projection.append("job.hosts")
    return projection


# generate the list only once
projection_job_info = _projection_db_info()
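

# Sketch (not part of the original module): the projection list can be passed
# directly to a MongoDB find so that only the fields needed to build a
# JobInfo are fetched. The collection handle is a hypothetical argument.
def _example_projected_query(jobs_collection) -> list[JobInfo]:
    docs = jobs_collection.find({}, projection=projection_job_info)
    return [JobInfo.from_query_output(d) for d in docs]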


class JobDoc(BaseModel):
    """Model for the standard representation of a Job in the queue database."""

    # TODO consider defining this as a dict and provide a get_job() method to
    # get the real Job. This would avoid (de)serializing jobs if this document
    # is used often to interact with the DB.
    job: Job
    uuid: str
    index: int
    db_id: str
    worker: str
    state: JobState
    remote: RemoteInfo = RemoteInfo()
    # A list of only the uuids is enough to determine the parents of a
    # JobDoc (i.e. uuid+index): once a job with a given uuid is among the
    # parents, the jobs with that uuid at all indexes are also parents.
    # Note that this does not hold for a bare uuid: JobDocs with the same
    # uuid but different indexes may have different parents.
    parents: list[str] | None = None
    previous_state: JobState | None = None
    error: str | None = None  # TODO is there a better way to serialize it?
    lock_id: str | None = None
    lock_time: datetime | None = None
    run_dir: str | None = None
    start_time: datetime | None = None
    end_time: datetime | None = None
    created_on: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    updated_on: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    priority: int = 0
    exec_config: ExecutionConfig | str | None = None
    resources: QResources | dict | None = None
    stored_data: dict | None = None

    def as_db_dict(self) -> dict:
        """
        Generate a dict representation suitable to be inserted in the database.

        Returns
        -------
        dict
            The dict representing the JobDoc.
        """
        # Serialize the job separately: Enums in the job inputs should stay
        # Enums rather than be reduced to their values, since the job
        # contents are not pydantic models that would reconstruct them
        # during deserialization.
        dump = self.model_dump(mode="python")
        job = dump.pop("job")
        d = jsanitize(
            dump,
            strict=True,
            allow_bson=True,
            enum_values=True,
        )
        d["job"] = jsanitize(job, strict=True, allow_bson=True)
        # required since the resources are not serialized otherwise
        if isinstance(self.resources, QResources):
            d["resources"] = self.resources.as_dict()
        return d
[docs] @field_validator("state", mode="before") @classmethod def state_validator(cls, value): return _validate_job_state(value)
[docs] @field_validator("previous_state", mode="before") @classmethod def previous_state_validator(cls, value): return _validate_job_state(value)


class FlowDoc(BaseModel):
    """Model for the standard representation of a Flow in the queue database."""

    uuid: str
    jobs: list[str]
    state: FlowState
    name: str
    lock_id: str | None = None
    lock_time: datetime | None = None
    created_on: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    updated_on: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: dict = Field(default_factory=dict)
    # Parents need to include both the uuid and the index:
    # when dynamically replacing a Job with a Flow, some new Jobs will be
    # parents of the job with index=i+1, but will not be parents of the job
    # with index i.
    # The index is stored as a string, since MongoDB needs string keys.
    # This dictionary has the form {job uuid: {job index: [parent uuids]}}.
    parents: dict[str, dict[str, list[str]]] = Field(default_factory=dict)
    # ids correspond to the (db_id, uuid, index) of each JobDoc
    ids: list[tuple[str, str, int]] = Field(default_factory=list)
    jobstore: str | None = None

    def as_db_dict(self) -> dict:
        """
        Generate a dict representation suitable to be inserted in the database.

        Returns
        -------
        dict
            The dict representing the FlowDoc.
        """
        return jsanitize(
            self.model_dump(mode="python"),
            strict=True,
            allow_bson=True,
            enum_values=True,
        )

    @cached_property
    def int_index_parents(self):
        # the parents dict with the indexes converted from str (MongoDB
        # keys) to int
        d = defaultdict(dict)
        for child_id, index_parents in self.parents.items():
            for index, parents in index_parents.items():
                d[child_id][int(index)] = parents
        return dict(d)

    @cached_property
    def children(self) -> dict[str, list[tuple[str, int]]]:
        # the inverse of parents: {parent uuid: [(child uuid, child index)]}
        d = defaultdict(list)
        for job_id, index_parents in self.parents.items():
            for index, parents in index_parents.items():
                for parent_id in parents:
                    d[parent_id].append((job_id, int(index)))
        return dict(d)

    def descendants(self, job_uuid: str) -> list[tuple[str, int]]:
        """Return all the (uuid, index) descendants of the given job uuid."""
        descendants = set()

        def add_descendants(uuid) -> None:
            children = self.children.get(uuid)
            if children:
                descendants.update(children)
                for child in children:
                    add_descendants(child[0])

        add_descendants(job_uuid)
        return list(descendants)

    @cached_property
    def ids_mapping(self) -> dict[str, dict[int, str]]:
        # mapping of the form {uuid: {index: db_id}}
        d: dict = defaultdict(dict)
        for db_id, job_id, index in self.ids:
            d[job_id][int(index)] = db_id
        return dict(d)
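

# Sketch (not part of the original module) of the parents bookkeeping:
# job "c" depends on "b", which depends on "a", so both are descendants
# of "a". All uuids are placeholders.
def _example_flow_descendants() -> list[tuple[str, int]]:
    flow = FlowDoc(
        uuid="flow-1",
        jobs=["a", "b", "c"],
        state=FlowState.READY,
        name="example",
        parents={"a": {"1": []}, "b": {"1": ["a"]}, "c": {"1": ["b"]}},
    )
    assert flow.children == {"a": [("b", 1)], "b": [("c", 1)]}
    return flow.descendants("a")  # {("b", 1), ("c", 1)} in arbitrary order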


class BatchDoc(BaseModel):
    """Model for the standard representation of a batch process in the batch database."""

    batch_uid: str
    process_id: str
    batch_state: BatchState
    worker: str
    jobs: list = Field(default_factory=list)
    created_on: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    updated_on: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    start_time: datetime | None = None
    end_time: datetime | None = None
    last_ping_time: datetime | None = None

    def as_db_dict(self) -> dict:
        """
        Generate a dict representation suitable to be inserted in the database.

        Returns
        -------
        dict
            The dict representing the BatchDoc.
        """
        return jsanitize(
            self.model_dump(mode="python"),
            strict=True,
            allow_bson=True,
            enum_values=True,
        )
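

# Sketch (not part of the original module): the initial batch document,
# assuming that BatchState.SUBMITTED has the value "SUBMITTED". The ids and
# worker name are placeholders.
def _example_initial_batch_doc() -> dict:
    d = get_initial_batch_doc_dict(
        batch_uid="batch-1", process_id="12345", worker="batch_worker"
    )
    assert d["batch_state"] == "SUBMITTED"
    return d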


class RemoteError(RuntimeError):
    """An exception signaling errors during the update of the remote states."""

    def __init__(self, msg, no_retry=False) -> None:
        self.msg = msg
        self.no_retry = no_retry


projection_flow_info_jobs = [
    "db_id",
    "uuid",
    "index",
    "state",
    "job.metadata",
    "job.name",
    "worker",
    "parents",
    "job.hosts",
    "created_on",
    "updated_on",
]


class FlowInfo(BaseModel):
    """
    Model with information extracted from a FlowDoc.

    Mainly for visualization purposes.
    """

    db_ids: list[str]
    job_ids: list[str]
    job_indexes: list[int]
    flow_id: str
    state: FlowState
    name: str
    created_on: datetime
    updated_on: datetime
    workers: list[str]
    job_states: list[JobState]
    job_names: list[str]
    parents: list[list[str]]
    hosts: list[list[str]]
    flow_metadata: dict
    jobs_info: list[JobInfo] | None = None

    @classmethod
    def from_query_dict(cls, d) -> "FlowInfo":
        created_on = d["created_on"]
        updated_on = d["updated_on"]
        flow_id = d["uuid"]
        jobs_data = d.get("jobs_list") or []
        workers = []
        job_states = []
        job_names = []
        parents = []
        job_hosts = []
        jobs_info = []
        if jobs_data:
            db_ids = []
            job_ids = []
            job_indexes = []
            for job_doc in jobs_data:
                db_ids.append(job_doc["db_id"])
                job_ids.append(job_doc["uuid"])
                job_indexes.append(job_doc["index"])
                job_names.append(job_doc["job"]["name"])
                state = job_doc["state"]
                job_states.append(JobState(state))
                workers.append(job_doc["worker"])
                parents.append(job_doc["parents"] or [])
                job_hosts.append(job_doc["job"]["hosts"] or [])
                jobs_info.append(JobInfo.from_query_output(job_doc))
        else:
            db_ids, job_ids, job_indexes = list(  # type:ignore[assignment]
                zip(*d["ids"], strict=True)
            )
            # The parents could be determined in this case as well from the
            # Flow document. However, matching the correct order would
            # require looping over them. To keep the generation faster, add
            # this only if a use case shows up.

        state = FlowState(d["state"])

        return cls(
            db_ids=db_ids,
            job_ids=job_ids,
            job_indexes=job_indexes,
            flow_id=flow_id,
            state=state,
            name=d["name"],
            created_on=created_on,
            updated_on=updated_on,
            workers=workers,
            job_states=job_states,
            job_names=job_names,
            parents=parents,
            hosts=job_hosts,
            flow_metadata=d["metadata"],
            jobs_info=jobs_info or None,
        )

    @cached_property
    def ids_mapping(self) -> dict[str, dict[int, str]]:
        # mapping of the form {uuid: {index: db_id}}
        d: dict = defaultdict(dict)
        for db_id, job_id, index in zip(
            self.db_ids, self.job_ids, self.job_indexes, strict=True
        ):
            d[job_id][int(index)] = db_id
        return dict(d)

    def iter_job_prop(self):
        """Iterate over the main properties of each job, as dictionaries."""
        n_jobs = len(self.job_ids)
        for i in range(n_jobs):
            d = {
                "db_id": self.db_ids[i],
                "uuid": self.job_ids[i],
                "index": self.job_indexes[i],
            }
            if self.job_names:
                d["name"] = self.job_names[i]
                d["state"] = self.job_states[i]
                d["parents"] = self.parents[i]
                d["hosts"] = self.hosts[i]
            yield d
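

# Sketch (not part of the original module): building a FlowInfo from a
# minimal query result without an embedded "jobs_list", so the job data is
# recovered from the stored (db_id, uuid, index) triples. This assumes the
# READY member of FlowState has the value "READY"; all other values are
# placeholders.
def _example_flow_info_from_query() -> FlowInfo:
    now = datetime.now(timezone.utc)
    d = {
        "uuid": "flow-1",
        "name": "example",
        "state": "READY",
        "created_on": now,
        "updated_on": now,
        "metadata": {},
        "ids": [("1", "a", 1), ("2", "b", 1)],
    }
    info = FlowInfo.from_query_dict(d)
    assert info.ids_mapping == {"a": {1: "1"}, "b": {1: "2"}}
    return info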


class DynamicResponseType(Enum):
    """Types of dynamic responses in jobflow."""

    REPLACE = "replace"
    DETOUR = "detour"
    ADDITION = "addition"


class DbCollection(Enum):
    """The collections used in the queue database."""

    JOBS = "jobs"
    FLOWS = "flows"
    AUX = "aux"


def get_reset_job_base_dict() -> dict:
    """
    Generate a dictionary with the basic properties to update in case of reset.

    Returns
    -------
    dict
        Data to be reset.
    """
    return {
        "remote.step_attempts": 0,
        "remote.retry_time_limit": None,
        "previous_state": None,
        "remote.queue_state": None,
        "remote.error": None,
        "remote.queue_out": None,
        "remote.queue_err": None,
        "error": None,
        "updated_on": datetime.now(timezone.utc),
        "start_time": None,
        "end_time": None,
        "stored_data": None,
    }
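

# Sketch (not part of the original module): the dotted keys are meant for a
# MongoDB $set update on a JobDoc document. The collection handle and db_id
# are hypothetical.
def _example_reset_job(jobs_collection) -> None:
    reset = get_reset_job_base_dict()
    reset["state"] = JobState.READY.value
    jobs_collection.update_one({"db_id": "1"}, {"$set": reset})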