from typing import TYPE_CHECKING, Annotated
import typer
from rich.prompt import Confirm
from rich.text import Text
from jobflow_remote.cli.formatting import get_exec_config_table, get_worker_table
from jobflow_remote.cli.jf import app
from jobflow_remote.cli.jfr_typer import JFRTyper
from jobflow_remote.cli.types import force_opt, serialize_file_format_opt, verbosity_opt
from jobflow_remote.cli.utils import (
SerializeFileFormat,
check_incompatible_opt,
exit_with_error_msg,
exit_with_warning_msg,
get_config_manager,
hide_progress,
loading_spinner,
out_console,
print_success_msg,
)
from jobflow_remote.config import ConfigError, ConfigManager
from jobflow_remote.config.helper import (
check_jobstore,
check_queue_store,
check_worker,
generate_dummy_project,
)
if TYPE_CHECKING:
from collections.abc import Iterable
app_project = JFRTyper(
name="project",
help="Commands concerning the project definition",
# no_args_is_help=True,
)
app.add_typer(app_project)
[docs]
@app_project.command(name="list")
def list_projects(
warn: Annotated[
bool,
typer.Option(
"--warn",
"-w",
help="Print the warning for the files that could not be parsed",
),
] = False,
) -> None:
"""List of available projects."""
cm = ConfigManager(warn=warn)
project_name = None
try:
project_data = cm.get_project_data()
project_name = project_data.project.name
except ConfigError:
pass
full_project_list = cm.project_names_from_files()
if not full_project_list:
exit_with_warning_msg(f"No project available in {cm.projects_folder}")
out_console.print(f"List of projects in {cm.projects_folder}")
for pn in sorted(full_project_list):
out_console.print(f" - {pn}", style="green" if pn == project_name else None)
not_parsed_projects = set(full_project_list).difference(cm.projects_data)
if not_parsed_projects:
out_console.print(
"The following project names exist in files in the project folder, "
"but could not properly parsed as projects: "
f"{', '.join(not_parsed_projects)}.",
style="yellow",
)
from jobflow_remote import SETTINGS
if SETTINGS.cli_suggestions:
out_console.print(
"Run the command with -w option to see the parsing errors",
style="yellow",
)
[docs]
@app_project.callback(invoke_without_command=True)
def current_project(ctx: typer.Context) -> None:
"""Print the list of the project currently selected."""
# only run if no other subcommand is executed
if ctx.invoked_subcommand is None:
out_console.print("Run 'jf project -h' to get the list of available commands")
[docs]
@app_project.command()
def generate(
name: Annotated[str, typer.Argument(help="Name of the project")],
file_format: serialize_file_format_opt = SerializeFileFormat.YAML,
full: Annotated[
bool,
typer.Option(
"--full",
help="Generate a configuration file with all the fields and more elements",
),
] = False,
) -> None:
"""Generate a project configuration file with dummy elements to be edited manually."""
cm = ConfigManager(exclude_unset=not full)
if name in cm.projects_data:
exit_with_error_msg(f"Project with name {name} already exists")
filepath = cm.projects_folder / f"{name}.{file_format.value}"
if filepath.exists():
exit_with_error_msg(
f"Project with name {name} does not exist, but file {filepath!s} does and will not be overwritten"
)
project = generate_dummy_project(name=name, full=full)
cm.create_project(project, ext=file_format.value)
print_success_msg(f"Configuration file for project {name} created in {filepath!s}")
[docs]
@app_project.command()
def check(
jobstore: Annotated[
bool,
typer.Option(
"--jobstore",
"-js",
help="Only check the jobstore connection",
),
] = False,
queue: Annotated[
bool,
typer.Option(
"--queue",
"-q",
help="Only check the queue connection",
),
] = False,
worker: Annotated[
str,
typer.Option(
"--worker",
"-w",
help="Only check the connection for the selected worker",
),
] = None,
print_errors: Annotated[
bool,
typer.Option(
"--errors",
"-e",
help="Print the errors at the end of the checks",
),
] = False,
) -> None:
"""Check that the connection to the different elements of the projects are working."""
check_incompatible_opt({"jobstore": jobstore, "queue": queue, "worker": worker})
cm = get_config_manager()
project = cm.get_project()
check_all = all(not v for v in (jobstore, worker, queue))
workers_to_test: Iterable[str] = []
if check_all:
workers_to_test = project.workers
elif worker:
if worker not in project.workers:
exit_with_error_msg(
f"Worker {worker} does not exists in project {project.name}"
)
workers_to_test = [worker]
tick = "[bold green]✓[/] "
cross = "[bold red]x[/] "
errors = []
with loading_spinner(processing=False) as progress:
task_id = progress.add_task("Checking")
for worker_name in workers_to_test:
progress.update(task_id, description=f"Checking worker {worker_name}")
worker_to_test = project.workers[worker_name]
if worker_to_test.get_host().interactive_login:
with hide_progress(progress):
err = check_worker(worker_to_test)
else:
err = check_worker(worker_to_test)
header = tick
if err:
errors.append((f"Worker {worker_name}", err))
header = cross
progress.print(Text.from_markup(header + f"Worker {worker_name}"))
if check_all or jobstore:
progress.update(task_id, description="Checking jobstore")
err = check_jobstore(project.get_jobstore())
header = tick
if err:
errors.append(("Jobstore", err))
header = cross
progress.print(Text.from_markup(header + "Jobstore"))
if check_all or queue:
progress.update(task_id, description="Checking queue store")
err = check_queue_store(project.get_queue_store())
header = tick
if err:
errors.append(("Queue store", err))
header = cross
progress.print(Text.from_markup(header + "Queue store"))
if print_errors and errors:
out_console.print("Errors:", style="red bold")
for e in errors:
out_console.print(e[0], style="bold")
out_console.print(e[1])
[docs]
@app_project.command()
def remove(
name: Annotated[str, typer.Argument(help="Name of the project")],
keep_folders: Annotated[
bool,
typer.Option(
"--keep-folders",
"-k",
help="Project related folders are not deleted",
),
] = False,
force: force_opt = False,
) -> None:
"""Remove a project from the projects' folder, including the related folders."""
cm = get_config_manager()
if name not in cm.projects_data:
exit_with_warning_msg(f"Project {name} does not exist")
p = cm.get_project(name)
if not keep_folders and not force:
msg = f"This will delete also the folders:\n\t{p.base_dir}\n\t{p.log_dir}\n\t{p.tmp_dir}\n\t{p.daemon_dir}\nProceed anyway?"
if not Confirm.ask(msg):
raise typer.Exit(0)
with loading_spinner(processing=False) as progress:
progress.add_task("Deleting project")
cm.remove_project(project_name=name, remove_folders=not keep_folders)
#####################################
# Exec config app
#####################################
app_exec_config = JFRTyper(
name="exec_config",
help="Commands concerning the Execution configurations",
no_args_is_help=True,
)
app_project.add_typer(app_exec_config)
[docs]
@app_exec_config.command(name="list")
def list_exec_config(
verbosity: verbosity_opt = 0,
) -> None:
cm = get_config_manager()
project = cm.get_project()
table = get_exec_config_table(project.exec_config, verbosity)
out_console.print(table)
#####################################
# Worker app
#####################################
app_worker = JFRTyper(
name="worker",
help="Commands concerning the workers",
no_args_is_help=True,
)
app_project.add_typer(app_worker)
[docs]
@app_worker.command(name="list")
def list_worker(
verbosity: verbosity_opt = 0,
) -> None:
cm = get_config_manager()
project = cm.get_project()
table = get_worker_table(project.workers, verbosity)
out_console.print(table)