Source code for jobflow_remote.cli.project

import difflib
from pathlib import Path
from typing import TYPE_CHECKING, Annotated

import typer
from rich.panel import Panel
from rich.prompt import Confirm
from rich.syntax import Syntax
from rich.text import Text

from jobflow_remote.cli.formatting import get_exec_config_table, get_worker_table
from jobflow_remote.cli.jf import app
from jobflow_remote.cli.jfr_typer import JFRTyper
from jobflow_remote.cli.types import (
    force_opt_deprecated,
    serialize_file_format_opt,
    tree_opt,
    verbosity_opt,
    yes_opt,
)
from jobflow_remote.cli.utils import (
    SerializeFileFormat,
    check_incompatible_opt,
    check_stopped_runner,
    exit_with_error_msg,
    exit_with_warning_msg,
    get_config_manager,
    hide_progress,
    loading_spinner,
    out_console,
    print_success_msg,
)
from jobflow_remote.config import ConfigError, ConfigManager, Project
from jobflow_remote.config.helper import (
    check_jobstore,
    check_queue_store,
    check_worker,
    generate_dummy_project,
)

if TYPE_CHECKING:
    from collections.abc import Iterable

app_project = JFRTyper(
    name="project",
    help="Commands concerning the project definition",
    # no_args_is_help=True,
)
app.add_typer(app_project)


[docs] @app_project.command(name="list") def list_projects( warn: Annotated[ bool, typer.Option( "--warn", "-w", help="Print the warning for the files that could not be parsed", ), ] = False, ) -> None: """List of available projects.""" cm = ConfigManager(warn=warn) project_name = None try: project_data = cm.get_project_data() project_name = project_data.project.name except ConfigError: pass full_project_list, erroneous_files = cm.project_names_from_files( suppress_warnings=True ) if not full_project_list: exit_with_warning_msg(f"No project available in {cm.projects_folder}") out_console.print(f"List of projects in {cm.projects_folder}") for pn in sorted(full_project_list): out_console.print(f" - {pn}", style="green" if pn == project_name else None) not_parsed_projects = set(full_project_list).difference(cm.projects_data) if not_parsed_projects: out_console.print( "The following project names exist in files in the project folder, " "but could not properly parsed as projects: " f"{', '.join(not_parsed_projects)}.", style="yellow", ) if erroneous_files: out_console.print( "The following files exist in the project folder, " "but could not properly parsed as projects: " f"{', '.join(erroneous_files)}.", style="yellow", ) if (not_parsed_projects or erroneous_files) and not warn: from jobflow_remote import SETTINGS if SETTINGS.cli_suggestions: out_console.print( "Run the command with -w option to see the parsing errors", style="yellow", )
[docs] @app_project.callback(invoke_without_command=True) def current_project( ctx: typer.Context, print_tree: tree_opt = False, # If selected will print the tree of the CLI and exit ) -> None: """Print the list of the project currently selected.""" # only run if no other subcommand is executed if ctx.invoked_subcommand is None: out_console.print("Run 'jf project -h' to get the list of available commands")
[docs] @app_project.command() def generate( name: Annotated[str, typer.Argument(help="Name of the project")], file_format: serialize_file_format_opt = SerializeFileFormat.YAML, full: Annotated[ bool, typer.Option( "--full", help="Generate a configuration file with all the fields and more elements", ), ] = False, ) -> None: """Generate a project configuration file with dummy elements to be edited manually.""" cm = ConfigManager(exclude_unset=not full) if name in cm.projects_data: exit_with_error_msg(f"Project with name {name} already exists") filepath = cm.projects_folder / f"{name}.{file_format.value}" if filepath.exists(): exit_with_error_msg( f"Project with name {name} does not exist, but file {filepath!s} does and will not be overwritten" ) project = generate_dummy_project(name=name, full=full) cm.create_project(project, ext=file_format.value) print_success_msg(f"Configuration file for project {name} created in {filepath!s}")
[docs] @app_project.command() def check( jobstore: Annotated[ bool, typer.Option( "--jobstore", "-js", help="Only check the jobstore connection", ), ] = False, queue: Annotated[ bool, typer.Option( "--queue", "-q", help="Only check the queue connection", ), ] = False, worker: Annotated[ str, typer.Option( "--worker", "-w", help="Only check the connection for the selected worker", ), ] = None, print_errors: Annotated[ bool, typer.Option( "--errors", "-e", help="Print the errors at the end of the checks", ), ] = False, full: Annotated[ bool, typer.Option( "--full", "-f", help="Perform a full check", ), ] = False, ) -> None: """Check that the connection to the different elements of the projects are working.""" check_incompatible_opt({"jobstore": jobstore, "queue": queue, "worker": worker}) # Check environment variables starting with jfremote_ prefix import difflib import os from jobflow_remote import SETTINGS prefix = SETTINGS.model_config["env_prefix"] extra_vars = [ k for k in os.environ if k.lower().startswith(prefix) and k[len(prefix) :].lower() not in SETTINGS.model_fields ] if extra_vars: out_console.print( "The following environment variables with the JFREMOTE_ prefix were found, " "but they don't match any recognized configuration variables and may be incorrect.:\n - " ) out_console.print("\n - ".join(extra_vars)) out_console.print( "\nCheck documentation of Jobflow-Remote for the available settings in " "https://matgenix.github.io/jobflow-remote/user/projectconf.html#general-settings-environment-variables\n" ) suggestions = {} for ev in extra_vars: if close_matches := difflib.get_close_matches( ev[len(prefix) :].upper(), [f.upper() for f in SETTINGS.model_fields], n=1, ): suggestions[ev] = close_matches[0] if suggestions: out_console.print("Suggested environment variables:\n - ") out_console.print( "\n - ".join( [ f"{ev} -> JFREMOTE_{suggestion}" for ev, suggestion in suggestions.items() ] ) ) cm = get_config_manager() project = cm.get_project() check_all = all(not v for v in (jobstore, worker, queue)) workers_to_test: Iterable[str] = [] if check_all: workers_to_test = project.workers elif worker: if worker not in project.workers: exit_with_error_msg( f"Worker {worker} does not exists in project {project.name}" ) workers_to_test = [worker] # check that jobstore main Store and the queue Store do not share the same collection if (check_all or (jobstore and queue)) and ( project.get_jobstore().docs_store == project.get_queue_store() ): msg_duplicated_stores = ( "It seems that the main docs_store of the JobStore and the queue store point to the " "same database and collection. This will lead to errors. Choose different collection names." ) out_console.print(msg_duplicated_stores, style="red bold") tick = "[bold green]✓[/] " tick_warn = "[bold yellow]✓[/] " cross = "[bold red]x[/] " errors = [] with loading_spinner(processing=False) as progress: task_id = progress.add_task("Checking") for worker_name in workers_to_test: progress.update(task_id, description=f"Checking worker {worker_name}") worker_to_test = project.workers[worker_name] if worker_to_test.get_host().interactive_login: with hide_progress(progress): err, worker_warn = check_worker(worker_to_test, full_check=full) else: err, worker_warn = check_worker(worker_to_test, full_check=full) header = tick # At the moment the check_worker should return either an error or a # warning. The code below also deals with the case where both are # returned in the future. if worker_warn: errors.append((f"Worker {worker_name} warning ", worker_warn)) header = tick_warn if err: errors.append((f"Worker {worker_name} ", err)) header = cross progress.print(Text.from_markup(header + f"Worker {worker_name}")) if check_all or jobstore: progress.update(task_id, description="Checking jobstore") err = check_jobstore(project.get_jobstore()) header = tick if err: errors.append(("Jobstore", err)) header = cross progress.print(Text.from_markup(header + "Jobstore")) if project.optional_jobstores: progress.update(task_id, description="Checking optional jobstores") for jobstore_name in project.optional_jobstores: err = check_jobstore(project.get_jobstore(jobstore_name)) header = tick if err: errors.append((f"Jobstore {jobstore_name}", err)) header = cross progress.print( Text.from_markup(header + f"Optional jobstore {jobstore_name}") ) if check_all or queue: progress.update(task_id, description="Checking queue store") err = check_queue_store(project.get_queue_store()) header = tick if err: errors.append(("Queue store", err)) header = cross progress.print(Text.from_markup(header + "Queue store")) if print_errors and errors: out_console.print("Errors:", style="red bold") for e in errors: out_console.print(e[0], style="bold") out_console.print(e[1])
[docs] @app_project.command() def remove( name: Annotated[str, typer.Argument(help="Name of the project")], keep_folders: Annotated[ bool, typer.Option( "--keep-folders", "-k", help="Project related folders are not deleted", ), ] = False, yes_all: yes_opt = False, force_deprecated: force_opt_deprecated = False, ) -> None: """Remove a project from the projects' folder, including the related folders.""" cm = get_config_manager() if name not in cm.projects_data: exit_with_warning_msg(f"Project {name} does not exist") p = cm.get_project(name) if not keep_folders and not yes_all: msg = f"This will delete also the folders:\n\t{p.base_dir}\n\t{p.log_dir}\n\t{p.tmp_dir}\n\t{p.daemon_dir}\nProceed anyway?" if not Confirm.ask(msg): raise typer.Exit(0) with loading_spinner(processing=False) as progress: progress.add_task("Deleting project") cm.remove_project(project_name=name, remove_folders=not keep_folders)
##################################### # Exec config app ##################################### app_exec_config = JFRTyper( name="exec_config", help="Commands concerning the Execution configurations", no_args_is_help=True, ) app_project.add_typer(app_exec_config)
[docs] @app_exec_config.command(name="list") def list_exec_config( verbosity: verbosity_opt = 0, ) -> None: """ The list of defined Execution configurations """ cm = get_config_manager() project = cm.get_project() table = get_exec_config_table(project.exec_config, verbosity) out_console.print(table)
##################################### # Worker app ##################################### app_worker = JFRTyper( name="worker", help="Commands concerning the workers", no_args_is_help=True, ) app_project.add_typer(app_worker)
[docs] @app_worker.command(name="list") def list_worker( verbosity: verbosity_opt = 0, ) -> None: """ The list of defined workers """ cm = get_config_manager() project = cm.get_project() table = get_worker_table(project.workers, verbosity) out_console.print(table)
##################################### # Edit app ##################################### app_edit = JFRTyper( name="edit", help="Edit the project files", no_args_is_help=True, ) app_project.add_typer(app_edit)
[docs] @app_edit.command() def replace( old_string: Annotated[str, typer.Argument(help="String to search for and replace")], new_string: Annotated[str, typer.Argument(help="String to replace with")], all_projects: Annotated[ bool, typer.Option( "--all", "-a", help="Apply replacement to all project files", ), ] = False, yes_all: yes_opt = False, force_deprecated: force_opt_deprecated = False, no_backup: Annotated[ bool, typer.Option( "--no-backup", "-nb", help="Avoid creating a backup copy of the project file", ), ] = False, ) -> None: """ Replace a string in one or more project files. This command performs a text replacement in the YAML project files, replacing all instances of old_string with new_string. By default, creates a backup of the original file. """ import json import tomlkit from ruamel.yaml import YAML cm = get_config_manager() # Determine which projects to process if all_projects: # sort to make it reproducible projects_to_process = sorted(cm.projects_data) if not projects_to_process: exit_with_error_msg("No valid project files found that can be parsed") else: project_data = cm.get_project_data() projects_to_process = [project_data.project.name] check_stopped_runner(error=True) modified_count = 0 for project_name in projects_to_process: try: project_data = cm.get_project_data(project_name) filepath = Path(project_data.filepath) # Read the file as text original_content = filepath.read_text() modified_content = original_content.replace(old_string, new_string) if original_content != modified_content: # Show diff and ask for confirmation if not forced if not yes_all: out_console.print( f"\n[bold]File: {filepath.name} (Project: {project_name})[/bold]" ) out_console.print(f"[dim]Path: {filepath}[/dim]\n") diff_lines = list( difflib.unified_diff( original_content.splitlines(keepends=True), modified_content.splitlines(keepends=True), fromfile=f"{filepath.name} (original)", tofile=f"{filepath.name} (modified)", ) ) diff_text = "".join(diff_lines) syntax = Syntax( diff_text, "diff", theme="monokai", line_numbers=False ) out_console.print( Panel( syntax, title="[bold]Changes Preview[/bold]", border_style="blue", ) ) try: if project_data.ext == "yaml": model = YAML().load(modified_content) elif project_data.ext == "json": model = json.loads(modified_content) elif project_data.ext == "toml": model = tomlkit.parse(modified_content) else: out_console.print( f"Unknown file format for project: {project_data.ext}" ) continue Project.model_validate(model) except Exception as e: out_console.print( "[bold]WARNING: The modification to the project file will result in " f"an invalid file/project [/bold]: {getattr(e, 'message', str(e))}", style="red", ) if not Confirm.ask(f"Apply these changes to {project_name}?"): continue # create a backup of the project file before overwriting if not no_backup: cm.backup_project(project_name) # Write back the modified content filepath.write_text(modified_content) modified_count += 1 out_console.print( f" ✓ Modified: {project_name} ({filepath.name})", style="green" ) else: out_console.print( f" - No changes: {project_name} ({filepath.name})", style="yellow" ) except Exception as e: out_console.print(f" ✗ Error processing {project_name}: {e}", style="red") if modified_count > 0: print_success_msg( f"Successfully modified {modified_count} project file(s). For the changes " "to be registered the runner of all the modified projects needs to be restarted" ) if modified_count == 0: out_console.print("No replacements were made in any files", style="yellow")