from collections import defaultdict
from datetime import datetime
from enum import Enum
from functools import cached_property
from typing import Optional, Union
from jobflow import Flow, Job
from monty.json import jsanitize
from pydantic import BaseModel, Field
from qtoolkit.core.data_objects import QResources, QState
from jobflow_remote.config.base import ExecutionConfig
from jobflow_remote.jobs.state import FlowState, JobState
# File names for the serialized Job input/output exchanged with the worker.
# NOTE(review): usage not visible in this chunk — presumably written/read in
# the remote run dir; confirm against the runner module.
IN_FILENAME = "jfremote_in.json"
OUT_FILENAME = "jfremote_out.json"
[docs]
def get_initial_job_doc_dict(
    job: Job,
    parents: Optional[list[str]],
    db_id: str,
    worker: str,
    exec_config: Optional[ExecutionConfig],
    resources: Optional[Union[dict, QResources]],
) -> dict:
    """
    Generate a serialized JobDoc for initial insertion in the DB.

    Parameters
    ----------
    job
        The Job of the JobDoc.
    parents
        The parents of the Job.
    db_id
        The db_id.
    worker
        The worker where the Job should be executed.
    exec_config
        The ExecutionConfig used for execution.
    resources
        The resources used to run the Job.

    Returns
    -------
    dict
        The dict representation of a new JobDoc.
    """
    # Take the resources from the job if they are defined (they can be set
    # dynamically through update_config), otherwise fall back to the passed
    # default. Since the resources can be dictionaries, an empty dict is a
    # valid value: only None triggers the fallback.
    config_resources = job.config.manager_config.get("resources")
    job_resources = config_resources if config_resources is not None else resources
    # for exec_config and worker any falsy value falls back to the default
    job_exec_config = job.config.manager_config.get("exec_config") or exec_config
    worker = job.config.manager_config.get("worker") or worker
    job_doc = JobDoc(
        job=jsanitize(job, strict=True, enum_values=True),
        uuid=job.uuid,
        index=job.index,
        db_id=db_id,
        # a Job with no parents can run immediately
        state=JobState.WAITING if parents else JobState.READY,
        parents=parents,
        worker=worker,
        exec_config=job_exec_config,
        resources=job_resources,
    )
    return job_doc.as_db_dict()
[docs]
def get_initial_flow_doc_dict(flow: Flow, job_dicts: list[dict]) -> dict:
    """
    Generate a serialized FlowDoc for initial insertion in the DB.

    Parameters
    ----------
    flow
        The Flow used to generate the FlowDoc.
    job_dicts
        The dictionaries of the Jobs composing the Flow.

    Returns
    -------
    dict
        A serialized version of a new FlowDoc.
    """
    job_uuids = []
    ids = []
    parents = {}
    for job_dict in job_dicts:
        job_uuids.append(job_dict["uuid"])
        ids.append((job_dict["db_id"], job_dict["uuid"], job_dict["index"]))
        # on the first insertion the Jobs are stored with index "1"
        parents[job_dict["uuid"]] = {"1": job_dict["parents"]}
    flow_doc = FlowDoc(
        uuid=flow.uuid,
        jobs=job_uuids,
        state=FlowState.READY,
        name=flow.name,
        ids=ids,
        parents=parents,
    )
    return flow_doc.as_db_dict()
[docs]
class RemoteInfo(BaseModel):
    """Model with data describing the remote state of a Job."""
    # number of attempts performed for the current remote step
    # (reset to 0 by get_reset_job_base_dict)
    step_attempts: int = 0
    # state of the process in the queueing system, if available
    queue_state: Optional[QState] = None
    # id of the process in the queueing system — presumably set once the
    # job is submitted to the worker's queue; confirm against the runner
    process_id: Optional[str] = None
    # time limit after which a retry can be attempted, if set
    retry_time_limit: Optional[datetime] = None
    # error message related to the remote step, if any
    error: Optional[str] = None
[docs]
class JobInfo(BaseModel):
    """
    Model with information extracted from a JobDoc.
    Mainly for visualization purposes.
    """

    uuid: str
    index: int
    db_id: str
    worker: str
    name: str
    state: JobState
    created_on: datetime
    updated_on: datetime
    remote: RemoteInfo = RemoteInfo()
    parents: Optional[list[str]] = None
    previous_state: Optional[JobState] = None
    error: Optional[str] = None
    lock_id: Optional[str] = None
    lock_time: Optional[datetime] = None
    run_dir: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    priority: int = 0
    metadata: Optional[dict] = None

    @property
    def is_locked(self) -> bool:
        """Whether the document is locked (i.e. lock_id is set)."""
        return self.lock_id is not None

    @property
    def run_time(self) -> Optional[float]:
        """
        Calculate the run time based on start and end time.

        Returns
        -------
        float
            The run time in seconds, or None if either time is missing.
        """
        if not self.start_time or not self.end_time:
            return None
        elapsed = self.end_time - self.start_time
        return elapsed.total_seconds()

    @property
    def estimated_run_time(self) -> Optional[float]:
        """
        Estimate the current run time based on the start time and the current time.

        Returns
        -------
        float
            The estimated run time in seconds, or None if no start time is set.
        """
        if not self.start_time:
            return None
        # use the same tzinfo as start_time so the subtraction is valid
        # for both naive and aware datetimes
        now = datetime.now(tz=self.start_time.tzinfo)
        return (now - self.start_time).total_seconds()

    @classmethod
    def from_query_output(cls, d) -> "JobInfo":
        """
        Generate an instance from the output of a query to the JobDoc collection.

        Parameters
        ----------
        d
            The dictionary with the queried data.

        Returns
        -------
        JobInfo
            The instance of JobInfo based on the data.
        """
        # name and metadata live inside the serialized Job, not at the
        # top level of the document
        job_data = d.pop("job")
        d["name"] = job_data["name"]
        d["metadata"] = job_data["metadata"]
        return cls.model_validate(d)
def _projection_db_info() -> list[str]:
    """
    Generate a list of fields used for projection, depending on the JobInfo model.

    Returns
    -------
    list
        The list of fields to use in a query.
    """
    # "name" is not projected at the top level since JobInfo takes it from
    # the serialized Job (see JobInfo.from_query_output); "job.metadata" is
    # fetched for the same reason.
    fields = [field for field in JobInfo.model_fields if field != "name"]
    fields.extend(["job.name", "job.metadata"])
    return fields
# generate the list only once at import time, since the JobInfo model
# does not change at runtime.
projection_job_info = _projection_db_info()
[docs]
class JobDoc(BaseModel):
    """Model for the standard representation of a Job in the queue database."""

    # TODO consider defining this as a dict and provide a get_job() method to
    # get the real Job. This would avoid (de)serializing jobs if this document
    # is used often to interact with the DB.
    job: Job
    uuid: str
    index: int
    db_id: str
    worker: str
    state: JobState
    remote: RemoteInfo = RemoteInfo()
    # only the uuid as list of parents for a JobDoc (i.e. uuid+index) is
    # enough to determine the parents, since once a job with a uuid is
    # among the parents, all the index will still be parents.
    # Note that for just the uuid this condition is not true: JobDocs with
    # the same uuid but different indexes may have different parents
    parents: Optional[list[str]] = None
    previous_state: Optional[JobState] = None
    error: Optional[str] = None  # TODO is there a better way to serialize it?
    lock_id: Optional[str] = None
    lock_time: Optional[datetime] = None
    run_dir: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    created_on: datetime = Field(default_factory=datetime.utcnow)
    updated_on: datetime = Field(default_factory=datetime.utcnow)
    priority: int = 0
    # store: Optional[JobStore] = None
    exec_config: Optional[Union[ExecutionConfig, str]] = None
    resources: Optional[Union[QResources, dict]] = None
    stored_data: Optional[dict] = None
    # history: Optional[list[str]] = None

    def as_db_dict(self) -> dict:
        """
        Generate a dict representation suitable to be inserted in the database.

        Returns
        -------
        dict
            The dict representing the JobDoc.
        """
        dumped = self.model_dump(mode="python")
        db_dict = jsanitize(dumped, strict=True, allow_bson=True, enum_values=True)
        # required since the resources are not serialized otherwise
        if isinstance(self.resources, QResources):
            db_dict["resources"] = self.resources.as_dict()
        return db_dict
[docs]
class FlowDoc(BaseModel):
    """Model for the standard representation of a Flow in the queue database."""

    uuid: str
    jobs: list[str]
    state: FlowState
    name: str
    lock_id: Optional[str] = None
    lock_time: Optional[datetime] = None
    created_on: datetime = Field(default_factory=datetime.utcnow)
    updated_on: datetime = Field(default_factory=datetime.utcnow)
    metadata: dict = Field(default_factory=dict)
    # parents need to include both the uuid and the index.
    # When dynamically replacing a Job with a Flow some new Jobs will
    # be parents of the job with index=i+1, but will not be parents of
    # the job with index i.
    # The index is stored as a string, since mongodb needs string keys.
    # This dictionary contains {job uuid: {job index: [parent's uuids]}}
    parents: dict[str, dict[str, list[str]]] = Field(default_factory=dict)
    # ids correspond to db_id, uuid, index for each JobDoc
    ids: list[tuple[str, str, int]] = Field(default_factory=list)

    def as_db_dict(self) -> dict:
        """
        Generate a dict representation suitable to be inserted in the database.

        Returns
        -------
        dict
            The dict representing the FlowDoc.
        """
        dumped = self.model_dump(mode="python")
        return jsanitize(dumped, strict=True, allow_bson=True, enum_values=True)

    @cached_property
    def int_index_parents(self):
        """The parents dictionary with the job indexes converted to int."""
        converted: dict = defaultdict(dict)
        for child_uuid, parents_by_index in self.parents.items():
            for index_str, parent_uuids in parents_by_index.items():
                converted[child_uuid][int(index_str)] = parent_uuids
        return dict(converted)

    @cached_property
    def children(self) -> dict[str, list[tuple[str, int]]]:
        """Mapping of parent uuid to the list of (child uuid, child index)."""
        child_map: dict = defaultdict(list)
        for child_uuid, parents_by_index in self.parents.items():
            for index_str, parent_uuids in parents_by_index.items():
                for parent_uuid in parent_uuids:
                    child_map[parent_uuid].append((child_uuid, int(index_str)))
        return dict(child_map)

    def descendants(self, job_uuid: str) -> list[tuple[str, int]]:
        """Collect all the (uuid, index) descendants of the given job uuid."""
        collected: set = set()

        # recursively walk the children mapping, accumulating every reachable
        # (uuid, index) pair
        def _walk(uuid) -> None:
            direct_children = self.children.get(uuid)
            if direct_children:
                collected.update(direct_children)
                for child_uuid, _ in direct_children:
                    _walk(child_uuid)

        _walk(job_uuid)
        return list(collected)

    @cached_property
    def ids_mapping(self) -> dict[str, dict[int, str]]:
        """Mapping of job uuid to {index: db_id}, built from the ids list."""
        mapping: dict = defaultdict(dict)
        for db_id, job_uuid, job_index in self.ids:
            mapping[job_uuid][int(job_index)] = db_id
        return dict(mapping)
[docs]
class RemoteError(RuntimeError):
    """An exception signaling errors during the update of the remote states."""

    def __init__(self, msg, no_retry=False) -> None:
        """
        Parameters
        ----------
        msg
            The error message.
        no_retry
            If True the operation that failed should not be retried.
        """
        # forward the message to RuntimeError, so that str(exc) and
        # exc.args carry the message (the original left them empty)
        super().__init__(msg)
        self.msg = msg
        self.no_retry = no_retry
# fields to project from the JobDoc documents associated with a Flow
# (consumed by FlowInfo.from_query_dict through the "jobs_list" key)
projection_flow_info_jobs = [
    "db_id",
    "uuid",
    "index",
    "state",
    "job.name",
    "worker",
    "parents",
    "job.hosts",
]
[docs]
class FlowInfo(BaseModel):
    """
    Model with information extracted from a FlowDoc.
    Mainly for visualization purposes.
    """

    db_ids: list[str]
    job_ids: list[str]
    job_indexes: list[int]
    flow_id: str
    state: FlowState
    name: str
    created_on: datetime
    updated_on: datetime
    workers: list[str]
    job_states: list[JobState]
    job_names: list[str]
    parents: list[list[str]]
    hosts: list[list[str]]

    @classmethod
    def from_query_dict(cls, d) -> "FlowInfo":
        """
        Generate an instance from the dictionary queried from the FlowDoc
        collection, with the Job details optionally joined in "jobs_list".
        """
        jobs_data = d.get("jobs_list") or []
        workers: list = []
        job_states: list = []
        job_names: list = []
        parents: list = []
        job_hosts: list = []
        if jobs_data:
            # full details available: extract them job by job
            db_ids = []
            job_ids = []
            job_indexes = []
            for job_doc in jobs_data:
                db_ids.append(job_doc["db_id"])
                job_ids.append(job_doc["uuid"])
                job_indexes.append(job_doc["index"])
                job_names.append(job_doc["job"]["name"])
                job_states.append(JobState(job_doc["state"]))
                workers.append(job_doc["worker"])
                parents.append(job_doc["parents"] or [])
                job_hosts.append(job_doc["job"]["hosts"] or [])
        else:
            # only the FlowDoc data is available: take the ids from there
            db_ids, job_ids, job_indexes = list(  # type:ignore[assignment]
                zip(*d["ids"])
            )
            # parents could be determined in this case as well from the Flow document.
            # However, to match the correct order it would require looping over them.
            # To keep the generation faster add this only if a use case shows up.
        return cls(
            db_ids=db_ids,
            job_ids=job_ids,
            job_indexes=job_indexes,
            flow_id=d["uuid"],
            state=FlowState(d["state"]),
            name=d["name"],
            created_on=d["created_on"],
            updated_on=d["updated_on"],
            workers=workers,
            job_states=job_states,
            job_names=job_names,
            parents=parents,
            hosts=job_hosts,
        )

    @cached_property
    def ids_mapping(self) -> dict[str, dict[int, str]]:
        """Mapping of job uuid to {index: db_id}."""
        mapping: dict = defaultdict(dict)
        zipped = zip(self.db_ids, self.job_ids, self.job_indexes)
        for db_id, job_uuid, job_index in zipped:
            mapping[job_uuid][int(job_index)] = db_id
        return dict(mapping)

    def iter_job_prop(self):
        """Yield a dict with the properties of each Job belonging to the Flow."""
        for i, job_uuid in enumerate(self.job_ids):
            job_prop = {
                "db_id": self.db_ids[i],
                "uuid": job_uuid,
                "index": self.job_indexes[i],
            }
            # detailed job data is present only if the JobDocs were queried
            if self.job_names:
                job_prop["name"] = self.job_names[i]
                job_prop["state"] = self.job_states[i]
                job_prop["parents"] = self.parents[i]
                job_prop["hosts"] = self.hosts[i]
            yield job_prop
[docs]
class DynamicResponseType(Enum):
    """Types of dynamic responses in jobflow."""
    # presumably mirror the replace/detour/addition options of a jobflow
    # Response — confirm against the jobflow documentation
    REPLACE = "replace"
    DETOUR = "detour"
    ADDITION = "addition"
[docs]
class DbCollection(Enum):
    """Names of the collections in the queue database."""
    JOBS = "jobs"
    FLOWS = "flows"
    AUX = "aux"
[docs]
def get_reset_job_base_dict() -> dict:
    """
    Generate a dictionary with the basic properties to update in case of reset.

    Returns
    -------
    dict
        Data to be reset.
    """
    # all the remote-state and error fields are cleared, while the
    # modification time is refreshed to the current (naive UTC) time
    reset_data: dict = {}
    reset_data["remote.step_attempts"] = 0
    reset_data["remote.retry_time_limit"] = None
    reset_data["previous_state"] = None
    reset_data["remote.queue_state"] = None
    reset_data["remote.error"] = None
    reset_data["error"] = None
    reset_data["updated_on"] = datetime.utcnow()
    reset_data["start_time"] = None
    reset_data["end_time"] = None
    return reset_data