Source code for jobflow_remote.jobs.graph

from __future__ import annotations

from typing import TYPE_CHECKING

from jobflow_remote.jobs.state import JobState

if TYPE_CHECKING:
    from networkx import DiGraph

    from jobflow_remote.jobs.data import FlowInfo


[docs] def get_graph(flow: FlowInfo, label: str = "name") -> DiGraph: import networkx as nx graph = nx.DiGraph() ids_mapping = flow.ids_mapping # Add nodes for job_prop in flow.iter_job_prop(): db_id = job_prop["db_id"] job_prop["label"] = job_prop[label] # change this as the "name" is used in jobflow's graph plotting util job_prop["job_name"] = job_prop.pop("name") graph.add_node(db_id, **job_prop) # Add edges based on parents for child_node, parents in zip(flow.db_ids, flow.parents): for parent_uuid in parents: for parent_node in ids_mapping[parent_uuid].values(): graph.add_edge(parent_node, child_node) return graph
[docs] def get_graph_elements(flow: FlowInfo): ids_mapping = flow.ids_mapping nodes = {} hosts = {} for job_prop in flow.iter_job_prop(): db_id = job_prop["db_id"] nodes[db_id] = job_prop hosts[job_prop["db_id"]] = job_prop["hosts"] # edges based on parents edges = [ (parent_node, child_node) for child_node, parents in zip(flow.db_ids, flow.parents) for parent_uuid in parents for parent_node in ids_mapping[parent_uuid].values() ] # connections between replacements replace_edges = [] for dct in flow.ids_mapping.values(): # skip index 1 for job_index in sorted(dct)[1:]: job_db_id = dct[job_index] previous_index_db_id = dct[job_index - 1] # likely never entering here, because if a single job is used as a # "replace" it still goes through a get_flow in jobflow in prepare_replace. if hosts[job_db_id][0] == hosts[previous_index_db_id][0]: replace_edges.append((previous_index_db_id, job_db_id)) else: replace_edges.append((previous_index_db_id, hosts[job_db_id][0])) return nodes, edges, hosts, replace_edges
[docs] def plot_dash(flow: FlowInfo): nodes, edges, hosts, replace_edges = get_graph_elements(flow) import dash_cytoscape as cyto from dash import Dash, Input, Output, callback, html app = Dash(f"{flow.name} - {flow.flow_id}") elements: list[dict] = [] # parent elements hosts_hierarchy = {} jobs_inner_hosts = {} hosts_set = set() for db_id, job_hosts in hosts.items(): job_hosts = list(reversed(job_hosts)) # noqa: PLW2901 if len(job_hosts) < 2: continue for i, host in enumerate(job_hosts[1:-1], 1): hosts_hierarchy[job_hosts[i + 1]] = host hosts_set.update(job_hosts[1:]) jobs_inner_hosts[db_id] = job_hosts[-1] elements += [ {"data": {"id": host, "parent": hosts_hierarchy.get(host)}} for host in hosts_set ] for db_id, node_info in nodes.items(): node_info["id"] = str(db_id) node_info["label"] = node_info["name"] node_info["parent"] = jobs_inner_hosts.get(db_id) elements.append( { "data": node_info, } ) elements += [ {"data": {"source": str(edge[0]), "target": str(edge[1])}} for edge in edges ] for edge in replace_edges: elements.append( { "data": {"source": str(edge[0]), "target": str(edge[1])}, "classes": "dashed", } ) stylesheet: list[dict] = [ { "selector": f'[state = "{state}"]', "style": { "background-color": color, }, } for state, color in COLOR_MAPPING.items() ] stylesheet.append( { "selector": "node", "style": { "label": "data(name)", }, } ) stylesheet.append( { "selector": "node:parent", "style": { "background-opacity": 0.2, "background-color": "#2B65EC", "border-color": "#2B65EC", }, } ) stylesheet.append( { "selector": ".dashed", "style": { "line-style": "dashed", }, } ) app.layout = html.Div( [ cyto.Cytoscape( id="flow-graph", layout={"name": "breadthfirst", "directed": True}, # layout={'name': 'cose'}, style={"width": "100%", "height": "500px"}, elements=elements, stylesheet=stylesheet, ), html.P(id="job-info-output"), ] ) @callback( Output("job-info-output", "children"), Input("flow-graph", "mouseoverNodeData") ) def displayTapNodeData(data): if data: return str(data) return None app.run(debug=True)
[docs] def get_mermaid(flow: FlowInfo, show_subflows: bool = True): nodes, edges, hosts, replace_edges = get_graph_elements(flow) from monty.collections import tree hosts_hierarchy = tree() for db_id, job_hosts in hosts.items(): d = hosts_hierarchy for host in reversed(job_hosts): d = d[host] d[db_id] = None lines = ["flowchart TD"] # add style classes for state, color in COLOR_MAPPING.items(): # this could be optimised by compressing in one line and using a # same class for states with the same color lines.append(f" classDef {state} fill:{color}") # add edges for parent_db_id, child_db_id in edges: parent = nodes[parent_db_id] child = nodes[child_db_id] line = ( f" {parent_db_id}({parent['name']}) --> {child_db_id}({child['name']})" ) lines.append(line) # add replace edges for parent_db_id, child_id in replace_edges: line = f" {parent_db_id}({nodes[parent_db_id]['name']}) -.-> {child_id}" lines.append(line) subgraph_styles = [] # add subgraphs def add_subgraph(nested_hosts_hierarchy, indent_level=0) -> None: prefix = " " * indent_level if show_subflows else " " for ref_id in sorted(nested_hosts_hierarchy, key=lambda x: str(x)): subhosts = nested_hosts_hierarchy[ref_id] if subhosts: if indent_level > 0 and show_subflows: # don't put any title lines.append(f"{prefix}subgraph {ref_id}[ ]") subgraph_styles.append( f" style {ref_id} fill:#2B65EC,opacity:0.2" ) add_subgraph(subhosts, indent_level=indent_level + 1) if indent_level > 0 and show_subflows: lines.append(f"{prefix}end") else: job = nodes[ref_id] lines.append(f"{prefix}{ref_id}:::{job['state'].value}") add_subgraph(hosts_hierarchy) lines.extend(subgraph_styles) return "\n".join(lines)
BLUE_COLOR = "#5E6BFF" RED_COLOR = "#fC3737" COLOR_MAPPING = { JobState.WAITING.value: "#aaaaaa", JobState.READY.value: "#DAF7A6", JobState.CHECKED_OUT.value: BLUE_COLOR, JobState.UPLOADED.value: BLUE_COLOR, JobState.SUBMITTED.value: BLUE_COLOR, JobState.RUNNING.value: BLUE_COLOR, JobState.TERMINATED.value: BLUE_COLOR, JobState.DOWNLOADED.value: BLUE_COLOR, JobState.REMOTE_ERROR.value: RED_COLOR, JobState.COMPLETED.value: "#47bf00", JobState.FAILED.value: RED_COLOR, JobState.PAUSED.value: "#EAE200", JobState.STOPPED.value: RED_COLOR, JobState.USER_STOPPED.value: RED_COLOR, JobState.BATCH_SUBMITTED.value: BLUE_COLOR, JobState.BATCH_RUNNING.value: BLUE_COLOR, }