import contextlib
from datetime import datetime
from typing import Annotated
import typer
from dateutil.tz import tzlocal
from jobflow.utils.graph import draw_graph
from rich.prompt import Confirm
from rich.text import Text
from jobflow_remote.cli.formatting import (
format_flow_info,
get_flow_info_table,
get_flow_report_components,
get_single_flow_report_components,
header_name_data_getter_map,
)
from jobflow_remote.cli.jf import app
from jobflow_remote.cli.jfr_typer import JFRTyper
from jobflow_remote.cli.types import (
break_lock_opt,
cli_output_keys_opt,
count_opt,
days_opt,
db_ids_opt,
delete_all_opt,
delete_files_opt,
delete_output_opt,
end_date_opt,
flow_db_id_arg,
flow_ids_opt,
flow_state_opt,
force_opt_deprecated,
hours_opt,
job_flow_id_flag_opt,
job_ids_opt,
locked_flow_opt,
max_results_opt,
metadata_opt,
name_opt,
reverse_sort_flag_opt,
sort_opt,
start_date_opt,
stored_data_keys_opt,
verbosity_opt,
wait_lock_opt,
yes_opt,
)
from jobflow_remote.cli.utils import (
ReportInterval,
SortOption,
check_incompatible_opt,
check_output_stored_data_keys,
check_stopped_runner,
exit_with_error_msg,
exit_with_warning_msg,
get_job_controller,
get_job_db_ids,
get_start_date,
loading_spinner,
out_console,
)
from jobflow_remote.jobs.graph import get_graph, plot_dash
from jobflow_remote.jobs.report import FlowsReport
from jobflow_remote.jobs.state import JobState
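# Typer sub-application grouping the "flow" CLI commands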
app_flow = JFRTyper(
name="flow", help="Commands for managing the flows", no_args_is_help=True
)
app.add_typer(app_flow)
@app_flow.command(name="list")
def flows_list(
job_id: job_ids_opt = None,
db_id: db_ids_opt = None,
flow_id: flow_ids_opt = None,
state: flow_state_opt = None,
start_date: start_date_opt = None,
end_date: end_date_opt = None,
name: name_opt = None,
days: days_opt = None,
hours: hours_opt = None,
metadata: metadata_opt = None,
locked: locked_flow_opt = False,
verbosity: verbosity_opt = 0,
count: count_opt = False,
max_results: max_results_opt = 100,
sort: sort_opt = SortOption.UPDATED_ON,
reverse_sort: reverse_sort_flag_opt = False,
) -> None:
"""Get the list of Flows in the database."""
from jobflow_remote import SETTINGS
check_incompatible_opt({"start_date": start_date, "days": days, "hours": hours})
check_incompatible_opt({"end_date": end_date, "days": days, "hours": hours})
jc = get_job_controller()
start_date = get_start_date(start_date, days, hours)
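    # pymongo-style sort specification: (field, direction) with 1 ascending, -1 descending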
db_sort: list[tuple[str, int]] = [(sort.value, 1 if reverse_sort else -1)]
if count:
with loading_spinner():
n_flows = jc.count_flows(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
metadata=metadata,
locked=locked,
)
out_console.print(f"Number of Flows: {n_flows}")
else:
with loading_spinner():
flows_info = jc.get_flows_info(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
metadata=metadata,
locked=locked,
limit=max_results,
sort=db_sort,
with_jobs_info=verbosity > 0,
)
table = get_flow_info_table(flows_info, verbosity=verbosity)
if SETTINGS.cli_suggestions and max_results and len(flows_info) == max_results:
out_console.print(
f"The number of Flows printed is limited by the maximum selected: {max_results}",
style="yellow",
)
out_console.print(table)
@app_flow.command()
def delete(
job_id: job_ids_opt = None,
db_id: db_ids_opt = None,
flow_id: flow_ids_opt = None,
state: flow_state_opt = None,
start_date: start_date_opt = None,
end_date: end_date_opt = None,
name: name_opt = None,
days: days_opt = None,
hours: hours_opt = None,
yes_all: yes_opt = False,
force_deprecated: force_opt_deprecated = False,
max_limit: Annotated[
int,
typer.Option(
"--max",
"-m",
help=(
"The Flows will be deleted only if the total number is lower than the specified limit. 0 means no limit"
),
),
] = 10,
verbosity: Annotated[
int,
typer.Option(
"--verbosity",
"-v",
help="Print the list of Flows to be deleted when asking for confirmation. "
"Multiple -v options increase the details on the flow. (e.g. -vvv)",
count=True,
),
    ] = 0,
delete_output: delete_output_opt = False,
delete_files: delete_files_opt = False,
delete_all: delete_all_opt = False,
keep_processes: Annotated[
bool,
typer.Option(
"--keep-processes",
"-kp",
help="Do not attempt to cancel SUBMITTED and RUNNING processes from "
"the worker associated with the deleted Flows",
),
] = False,
wait: wait_lock_opt = None,
break_lock: break_lock_opt = False,
) -> None:
"""Permanently delete Flows from the database"""
check_incompatible_opt({"start_date": start_date, "days": days, "hours": hours})
check_incompatible_opt({"end_date": end_date, "days": days, "hours": hours})
if delete_all:
delete_files = delete_output = True
start_date = get_start_date(start_date, days, hours)
jc = get_job_controller()
    # Unlike flows_list, the verbosity used for the table is decreased by one:
    # the first -v only enables printing the list of Flows before confirmation
with loading_spinner(processing=False) as progress:
progress.add_task(description="Fetching data...", total=None)
flows_info = jc.get_flows_info(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
with_jobs_info=verbosity > 1,
)
if not flows_info:
exit_with_warning_msg("No flows matching criteria")
if flows_info and not yes_all:
if verbosity:
preamble = Text.from_markup(
f"[red]This operation will [bold]delete the following {len(flows_info)} Flow(s)[/bold][/red]"
)
out_console.print(preamble)
table = get_flow_info_table(flows_info, verbosity=verbosity - 1)
out_console.print(table)
text = Text.from_markup("[red]Proceed anyway?[/red]")
else:
text = Text.from_markup(
f"[red]This operation will [bold]delete {len(flows_info)} Flow(s)[/bold]. Proceed anyway?[/red]"
)
confirmed = Confirm.ask(text, default=False)
if not confirmed:
raise typer.Exit(0)
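    # uuids of all the Flows that matched the query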
to_delete = [fi.flow_id for fi in flows_info]
    # if the workers are potentially interactive, do not start the spinner
spinner_cm: contextlib.AbstractContextManager
if delete_files and jc.project.has_interactive_workers:
spinner_cm = contextlib.nullcontext()
out_console.print("Deleting flows...")
else:
spinner_cm = loading_spinner(processing=False)
with spinner_cm as progress:
if progress:
progress.add_task(description="Deleting flows...", total=None)
deleted = jc.delete_flows(
flow_ids=to_delete,
delete_output=delete_output,
delete_files=delete_files,
cancel_processes=not keep_processes,
max_limit=max_limit,
wait=wait,
break_lock=break_lock,
)
if not_deleted := set(to_delete) - set(deleted):
out_console.print(
f"Some of the selected Flows were not deleted: {', '.join(i for i in not_deleted)}"
)
if deleted:
out_console.print(
f"Deleted Flow(s) with id: {', '.join(str(i) for i in deleted)}"
)
@app_flow.command(name="info")
def flow_info(
flow_db_id: flow_db_id_arg,
job_id_flag: job_flow_id_flag_opt = False,
verbosity: verbosity_opt = 0,
stored_data_keys: stored_data_keys_opt = None,
cli_output_keys: cli_output_keys_opt = None,
sort: sort_opt = SortOption.UPDATED_ON,
jobs_sort: Annotated[
        SortOption | None,
typer.Option(
"--jobs-sort",
help="The field on which the jobs will be sorted. In descending order",
),
] = None,
reverse_sort: reverse_sort_flag_opt = False,
reverse_jobs_sort: Annotated[
bool,
typer.Option(
"--reverse-jobs-sort",
"-jrevs",
help="Reverse the sorting order of the jobs",
),
] = False,
print_report: Annotated[
bool,
typer.Option(
"--report",
"-rep",
help="Show a summary report of the Flow status",
),
] = False,
) -> None:
"""Provide detailed information on a Flow."""
output_keys = check_output_stored_data_keys(
cli_output_keys, stored_data_keys, verbosity, header_name_data_getter_map
)
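    # the ID argument may be a numeric db_id or a uuid; resolve which one was given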
db_id, jf_id = get_job_db_ids(flow_db_id, None)
db_ids = job_ids = flow_ids = None
if db_id is not None:
db_ids = [db_id]
elif job_id_flag:
job_ids = [jf_id]
else:
flow_ids = [jf_id]
db_sort: list[tuple[str, int]] = [(sort.value, 1 if reverse_sort else -1)]
db_jobs_sort: list[tuple[str, int]] | None = None
if jobs_sort:
db_jobs_sort = [(jobs_sort.value, 1 if reverse_jobs_sort else -1)]
with loading_spinner():
jc = get_job_controller()
with_jobs_info: bool | list[str] = True
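        # the report only needs the Job timings, not the full jobs info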
        if print_report:
with_jobs_info = ["start_time", "end_time"]
flows_info = jc.get_flows_info(
job_ids=job_ids,
db_ids=db_ids,
flow_ids=flow_ids,
sort=db_sort,
jobs_sort=db_jobs_sort,
limit=1,
with_jobs_info=with_jobs_info,
)
if not flows_info:
exit_with_error_msg("No data matching the request")
if print_report:
out_console.print(*get_single_flow_report_components(flows_info[0]))
else:
out_console.print(
format_flow_info(
flows_info[0],
verbosity=verbosity,
output_keys=output_keys,
stored_data_keys=stored_data_keys,
)
)
@app_flow.command()
def graph(
flow_db_id: flow_db_id_arg,
job_id_flag: job_flow_id_flag_opt = False,
label: Annotated[
str | None,
typer.Option(
"--label",
"-l",
help="The label used to identify the nodes",
),
] = "name",
file_path: Annotated[
str | None,
typer.Option(
"--path",
"-p",
help="If defined, the graph will be dumped to a file",
),
] = None,
dash_plot: Annotated[
bool,
typer.Option(
"--dash",
"-d",
help="Show the graph in a dash app",
),
] = False,
print_mermaid: Annotated[
bool,
typer.Option(
"--mermaid",
"-m",
help="Print the mermaid graph",
),
] = False,
) -> None:
"""Provide detailed information on a Flow."""
check_incompatible_opt({"dash": dash_plot, "mermaid": print_mermaid})
db_id, jf_id = get_job_db_ids(flow_db_id, None)
db_ids = job_ids = flow_ids = None
if db_id is not None:
db_ids = [db_id]
elif job_id_flag:
job_ids = [jf_id]
else:
flow_ids = [jf_id]
with loading_spinner():
jc = get_job_controller()
flows_info = jc.get_flows_info(
job_ids=job_ids,
db_ids=db_ids,
flow_ids=flow_ids,
limit=1,
with_jobs_info=True,
)
if not flows_info:
exit_with_error_msg("No data matching the request")
if print_mermaid:
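        # imported lazily, only when the mermaid output is requested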
from jobflow_remote.jobs.graph import get_mermaid
print(get_mermaid(flows_info[0]))
elif dash_plot:
plot_dash(flows_info[0])
else:
plt = draw_graph(get_graph(flows_info[0], label=label))
if file_path:
plt.savefig(file_path)
else:
plt.show()
@app_flow.command()
def report(
interval: Annotated[
ReportInterval,
typer.Argument(
help="The interval of the trends for the report",
metavar="INTERVAL",
),
] = ReportInterval.DAYS,
num_intervals: Annotated[
int | None,
typer.Argument(
help="The number of intervals to consider. Default depends on the interval type",
metavar="NUM_INTERVALS",
),
] = None,
) -> None:
"""
Generate a report about the Flows in the database.
"""
jc = get_job_controller()
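    # name of the local timezone (e.g. "CET"), forwarded to the report generation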
timezone = datetime.now(tzlocal()).tzname()
    flows_report = FlowsReport.generate_report(
job_controller=jc,
interval=interval.value,
num_intervals=num_intervals,
timezone=timezone,
)
    out_console.print(*get_flow_report_components(flows_report))
@app_flow.command()
def resume(
flow_db_id: flow_db_id_arg,
job_id_flag: job_flow_id_flag_opt = False,
) -> None:
"""Resume a STOPPED or PAUSED Flow."""
job_id = flow_id = None
db_id, jf_id = get_job_db_ids(flow_db_id, None)
if db_id is None:
if job_id_flag:
job_id = jf_id
else:
flow_id = jf_id
with loading_spinner():
jc = get_job_controller()
n_jobs = jc.resume_flow(
job_id=job_id,
db_id=db_id,
flow_id=flow_id,
)
out_console.print(f"{n_jobs} Job(s) resumed")
@app_flow.command()
def clean(
job_id: job_ids_opt = None,
db_id: db_ids_opt = None,
flow_id: flow_ids_opt = None,
state: flow_state_opt = None,
start_date: start_date_opt = None,
end_date: end_date_opt = None,
name: name_opt = None,
days: days_opt = None,
hours: hours_opt = None,
metadata: metadata_opt = None,
locked: locked_flow_opt = False,
verbosity: verbosity_opt = 0,
force: Annotated[
bool,
typer.Option(
"--force",
"-f",
help="Do not check if runner is active",
),
] = False,
yes_all: yes_opt = False,
all_states: Annotated[
bool,
typer.Option(
"--all-states",
"-as",
help="Delete files for Jobs in any state",
),
] = False,
) -> None:
"""
Remove the files of the executed Jobs.
"""
check_incompatible_opt({"start_date": start_date, "days": days, "hours": hours})
check_incompatible_opt({"end_date": end_date, "days": days, "hours": hours})
if all_states and not force:
check_stopped_runner(error=True)
jc = get_job_controller()
start_date = get_start_date(start_date, days, hours)
with loading_spinner():
with_jobs_info = ["run_dir"]
flows_info = jc.get_flows_info(
job_ids=job_id,
db_ids=db_id,
flow_ids=flow_id,
states=state,
start_date=start_date,
end_date=end_date,
name=name,
metadata=metadata,
locked=locked,
with_jobs_info=with_jobs_info,
)
if not flows_info:
exit_with_warning_msg("No flows matching criteria")
if not yes_all:
if verbosity:
preamble = Text.from_markup(
f"[red]This operation will [bold]delete the files of the following {len(flows_info)} Flow(s)[/bold][/red]"
)
out_console.print(preamble)
table = get_flow_info_table(flows_info, verbosity=verbosity - 1)
out_console.print(table)
text = Text.from_markup("[red]Proceed anyway?[/red]")
else:
text = Text.from_markup(
f"[red]This operation will [bold]delete the files of {len(flows_info)} Flow(s)[/bold]. Proceed anyway?[/red]"
)
confirmed = Confirm.ask(text, default=False)
if not confirmed:
raise typer.Exit(0)
    # if the workers are potentially interactive, do not start the spinner
spinner_cm: contextlib.AbstractContextManager
if jc.project.has_interactive_workers:
spinner_cm = contextlib.nullcontext()
out_console.print("Deleting files...")
else:
spinner_cm = loading_spinner(processing=False)
skipped_jobs = []
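    # unless all_states is set, only Jobs in these final states have their files removed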
cleanable_states = (
JobState.COMPLETED,
JobState.FAILED,
JobState.REMOTE_ERROR,
)
with spinner_cm as progress:
if progress:
progress.add_task(description="Deleting files...", total=None)
jobs_info = {}
for fi in flows_info:
for ji in fi.jobs_info:
if ji.run_dir:
if not all_states and ji.state not in cleanable_states:
skipped_jobs.append(ji)
else:
jobs_info[ji.db_id] = ji
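        # safe_delete_files returns the JobInfo of the Jobs whose folders were removed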
deleted = jc.safe_delete_files(list(jobs_info.values()))
deleted_dict = {ji.db_id: ji for ji in deleted}
not_deleted = set(jobs_info) - set(deleted_dict)
if not_deleted:
out_console.print("Folder was not deleted for the following jobs:")
for db_id in not_deleted:
out_console.print(f" - {db_id}: {jobs_info[db_id].run_dir}")
if skipped_jobs:
out_console.print(
f"{len(skipped_jobs)} were not cleaned-up due to their state:"
)
if len(skipped_jobs) < 10:
for ji in skipped_jobs:
out_console.print(f" - {ji.db_id} - {ji.state}")
else:
confirmed = False
if not yes_all:
                text = (
                    "The number of skipped Jobs is too large to be printed. "
                    "The list can be dumped to the `skipped_cleanup.dat` file. "
                    "Do you want to create the file?"
                )
confirmed = Confirm.ask(text, default=False)
if yes_all or confirmed:
with open("skipped_cleanup.dat", "w") as f:
for ji in skipped_jobs:
                        f.write(f" - {ji.db_id} - {ji.state}\n")
out_console.print(f"Deleted execution folders of {len(deleted)} Jobs")
app_flow_set = JFRTyper(
name="set", help="Commands for setting properties for flows", no_args_is_help=True
)
app_flow.add_typer(app_flow_set)
@app_flow_set.command()
def store(
flow_db_id: flow_db_id_arg,
store: Annotated[
str | None,
typer.Argument(
help="The name of the Store to be set. If empty will set the default JobStore",
metavar="STORE",
),
] = None,
job_id_flag: job_flow_id_flag_opt = False,
) -> None:
"""Provide detailed information on a Flow."""
    job_id = flow_id = None
db_id, jf_id = get_job_db_ids(flow_db_id, None)
if db_id is None:
if job_id_flag:
job_id = jf_id
else:
flow_id = jf_id
with loading_spinner():
jc = get_job_controller()
jc.set_flow_store(store=store, flow_id=flow_id, db_id=db_id, job_id=job_id)
out_console.print("Flow has been updated")