Source code for jobflow_remote.cli.flow

from typing import Annotated, Optional

import typer
from jobflow.utils.graph import draw_graph
from rich.prompt import Confirm
from rich.text import Text

from jobflow_remote import SETTINGS
from jobflow_remote.cli.formatting import format_flow_info, get_flow_info_table
from jobflow_remote.cli.jf import app
from jobflow_remote.cli.jfr_typer import JFRTyper
from jobflow_remote.cli.types import (
    days_opt,
    db_ids_opt,
    delete_all_opt,
    delete_files_opt,
    delete_output_opt,
    end_date_opt,
    flow_db_id_arg,
    flow_ids_opt,
    flow_state_opt,
    force_opt,
    hours_opt,
    job_flow_id_flag_opt,
    job_ids_opt,
    max_results_opt,
    name_opt,
    reverse_sort_flag_opt,
    sort_opt,
    start_date_opt,
    verbosity_opt,
)
from jobflow_remote.cli.utils import (
    SortOption,
    check_incompatible_opt,
    exit_with_error_msg,
    exit_with_warning_msg,
    get_job_controller,
    get_job_db_ids,
    get_start_date,
    loading_spinner,
    out_console,
)
from jobflow_remote.jobs.graph import get_graph, plot_dash

app_flow = JFRTyper(
    name="flow", help="Commands for managing the flows", no_args_is_help=True
)
app.add_typer(app_flow)


[docs] @app_flow.command(name="list") def flows_list( job_id: job_ids_opt = None, db_id: db_ids_opt = None, flow_id: flow_ids_opt = None, state: flow_state_opt = None, start_date: start_date_opt = None, end_date: end_date_opt = None, name: name_opt = None, days: days_opt = None, hours: hours_opt = None, verbosity: verbosity_opt = 0, max_results: max_results_opt = 100, sort: sort_opt = SortOption.UPDATED_ON, reverse_sort: reverse_sort_flag_opt = False, ) -> None: """Get the list of Jobs in the database.""" check_incompatible_opt({"start_date": start_date, "days": days, "hours": hours}) check_incompatible_opt({"end_date": end_date, "days": days, "hours": hours}) jc = get_job_controller() start_date = get_start_date(start_date, days, hours) db_sort: list[tuple[str, int]] = [(sort.value, 1 if reverse_sort else -1)] with loading_spinner(): flows_info = jc.get_flows_info( job_ids=job_id, db_ids=db_id, flow_ids=flow_id, states=state, start_date=start_date, end_date=end_date, name=name, limit=max_results, sort=db_sort, full=verbosity > 0, ) table = get_flow_info_table(flows_info, verbosity=verbosity) if SETTINGS.cli_suggestions and max_results and len(flows_info) == max_results: out_console.print( f"The number of Flows printed is limited by the maximum selected: {max_results}", style="yellow", ) out_console.print(table)
[docs] @app_flow.command() def delete( job_id: job_ids_opt = None, db_id: db_ids_opt = None, flow_id: flow_ids_opt = None, state: flow_state_opt = None, start_date: start_date_opt = None, end_date: end_date_opt = None, name: name_opt = None, days: days_opt = None, hours: hours_opt = None, force: force_opt = False, max_limit: Annotated[ int, typer.Option( "--max", "-m", help=( "The Flows will be deleted only if the total number is lower than the specified limit. 0 means no limit" ), ), ] = 10, verbosity: Annotated[ int, typer.Option( "--verbosity", "-v", help="Print the list of Flows to be deleted when asking for confirmation. " "Multiple -v options increase the details on the flow. (e.g. -vvv)", count=True, ), ] = False, delete_output: delete_output_opt = False, delete_files: delete_files_opt = False, delete_all: delete_all_opt = False, ) -> None: """Permanently delete Flows from the database""" check_incompatible_opt({"start_date": start_date, "days": days, "hours": hours}) check_incompatible_opt({"end_date": end_date, "days": days, "hours": hours}) if delete_all: delete_files = delete_output = True start_date = get_start_date(start_date, days, hours) jc = get_job_controller() # At variance with flows_list, for the amount of details to be printed, # the verbosity value will be decreased by one: the first is to enable # initial print with loading_spinner(processing=False) as progress: progress.add_task(description="Fetching data...", total=None) flows_info = jc.get_flows_info( job_ids=job_id, db_ids=db_id, flow_ids=flow_id, states=state, start_date=start_date, end_date=end_date, name=name, full=verbosity > 1, ) if not flows_info: exit_with_warning_msg("No flows matching criteria") if flows_info and not force: if verbosity: preamble = Text.from_markup( f"[red]This operation will [bold]delete the following {len(flows_info)} Flow(s)[/bold][/red]" ) out_console.print(preamble) table = get_flow_info_table(flows_info, verbosity=verbosity - 1) out_console.print(table) text = Text.from_markup("[red]Proceed anyway?[/red]") else: text = Text.from_markup( f"[red]This operation will [bold]delete {len(flows_info)} Flow(s)[/bold]. Proceed anyway?[/red]" ) confirmed = Confirm.ask(text, default=False) if not confirmed: raise typer.Exit(0) to_delete = [fi.flow_id for fi in flows_info] with loading_spinner(processing=False) as progress: progress.add_task(description="Deleting flows...", total=None) jc.delete_flows( flow_ids=to_delete, delete_output=delete_output, delete_files=delete_files, max_limit=max_limit, ) out_console.print( f"Deleted Flow(s) with id: {', '.join(str(i) for i in to_delete)}" )
[docs] @app_flow.command(name="info") def flow_info( flow_db_id: flow_db_id_arg, job_id_flag: job_flow_id_flag_opt = False, ) -> None: """Provide detailed information on a Flow.""" db_id, jf_id = get_job_db_ids(flow_db_id, None) db_ids = job_ids = flow_ids = None if db_id is not None: db_ids = [db_id] elif job_id_flag: job_ids = [jf_id] else: flow_ids = [jf_id] with loading_spinner(): jc = get_job_controller() flows_info = jc.get_flows_info( job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, limit=1, full=True, ) if not flows_info: exit_with_error_msg("No data matching the request") out_console.print(format_flow_info(flows_info[0]))
[docs] @app_flow.command() def graph( flow_db_id: flow_db_id_arg, job_id_flag: job_flow_id_flag_opt = False, label: Annotated[ Optional[str], typer.Option( "--label", "-l", help="The label used to identify the nodes", ), ] = "name", file_path: Annotated[ Optional[str], typer.Option( "--path", "-p", help="If defined, the graph will be dumped to a file", ), ] = None, dash_plot: Annotated[ bool, typer.Option( "--dash", "-d", help="Show the graph in a dash app", ), ] = False, print_mermaid: Annotated[ bool, typer.Option( "--mermaid", "-m", help="Print the mermaid graph", ), ] = False, ) -> None: """Provide detailed information on a Flow.""" check_incompatible_opt({"dash": dash_plot, "mermaid": print_mermaid}) db_id, jf_id = get_job_db_ids(flow_db_id, None) db_ids = job_ids = flow_ids = None if db_id is not None: db_ids = [db_id] elif job_id_flag: job_ids = [jf_id] else: flow_ids = [jf_id] with loading_spinner(): jc = get_job_controller() flows_info = jc.get_flows_info( job_ids=job_ids, db_ids=db_ids, flow_ids=flow_ids, limit=1, full=True, ) if not flows_info: exit_with_error_msg("No data matching the request") if print_mermaid: from jobflow_remote.jobs.graph import get_mermaid print(get_mermaid(flows_info[0])) elif dash_plot: plot_dash(flows_info[0]) else: plt = draw_graph(get_graph(flows_info[0], label=label)) if file_path: plt.savefig(file_path) else: plt.show()