Source code for jobflow_remote.cli.admin

from typing import Annotated, Optional

import typer
from packaging.version import parse as parse_version
from rich.prompt import Confirm
from rich.text import Text

from jobflow_remote.cli.formatting import format_upgrade_actions
from jobflow_remote.cli.jf import app
from jobflow_remote.cli.jfr_typer import JFRTyper
from jobflow_remote.cli.types import (
    db_ids_opt,
    end_date_opt,
    flow_ids_opt,
    flow_state_opt,
    force_opt,
    foreground_index_opt,
    index_direction_arg,
    index_key_arg,
    job_ids_indexes_opt,
    job_state_opt,
    start_date_opt,
)
from jobflow_remote.cli.utils import (
    IndexDirection,
    check_stopped_runner,
    confirm_project_name,
    exit_with_error_msg,
    exit_with_warning_msg,
    get_config_manager,
    get_job_controller,
    get_job_ids_indexes,
    loading_spinner,
    out_console,
)
from jobflow_remote.jobs.data import DbCollection
from jobflow_remote.jobs.upgrade import DatabaseUpgrader

app_admin = JFRTyper(
    name="admin", help="Commands for administering the database", no_args_is_help=True
)
app.add_typer(app_admin)


@app_admin.command()
def upgrade(
    no_dry_run: Annotated[
        bool,
        typer.Option(
            "--no-dry-run",
            "-ndr",
            help="Skip the dry-run step",
        ),
    ] = False,
    check_env: Annotated[
        bool,
        typer.Option(
            "--check-env",
            "-ce",
            help="Report the changes in the packages installed in the python environment before "
            "proceeding with the upgrade",
        ),
    ] = False,
    target: Annotated[
        Optional[str],
        typer.Option(
            "--target",
            "-t",
            help="Explicitly sets the target version for the upgrade. For testing purposes or development"
            " versions. Not for standard updates.",
        ),
    ] = None,
) -> None:
    """
    Upgrade the jobflow database.
    WARNING: can modify all the data. The previous version cannot be retrieved anymore.
    It is preferable to perform a backup before upgrading.
    """
    check_stopped_runner(error=True)

    jc = get_job_controller()
    upgrader = DatabaseUpgrader(jc)
    target_version = parse_version(target) if target else upgrader.current_version
    if target_version.local or target_version.post:
        # this is likely a developer version installed from source.
        # get the available upgrades for versions higher than the developer one
        base_version = parse_version(target_version.base_version)
        upgrades_available = upgrader.collect_upgrades(
            base_version, upgrader.registered_upgrades[-1]
        )
        msg = (
            f"Target version {target_version} is likely a development version. Explicitly "
            f"specify the target version with the --target option if this is the case."
        )
        if upgrades_available:
            msg += (
                f" Available upgrades larger than {base_version}: "
                f"{','.join(str(v) for v in upgrades_available)}"
            )
        out_console.print(msg, style="gold1")
    db_version = jc.get_current_db_version()
    if db_version >= target_version:
        exit_with_warning_msg(
            f"Current DB version: {db_version}. No upgrade required for target version {target_version}"
        )
    jobflow_check = jc.upgrade_check_jobflow()
    if jobflow_check:
        out_console.print(jobflow_check)
    if check_env:
        full_check = jc.upgrade_full_check()
        if jobflow_check or full_check:
            text = Text.from_markup(
                full_check
                + "This does not necessarily pose a problem. Just check if there is any potential "
                "incompatible upgrade"
            )
            out_console.print(text)
    if not no_dry_run:
        actions = upgrader.dry_run(target_version=target)
        if not actions:
            out_console.print(
                f"No actions will be required for upgrading to version {target_version}"
            )
        else:
            out_console.print(
                f"In order to upgrade the DB to version {target_version} the following actions will be performed:"
            )
            actions_formatted = format_upgrade_actions(actions)
            out_console.print(actions_formatted)

    warn_text = Text.from_markup(
        "[bold red]Performing the upgrade will permanently modify data in the queue DB. "
        "It is advisable to perform a backup before proceeding.[/]"
    )
    out_console.print(warn_text)
    confirm_project_name(style="bold red", exit_on_error=True)

    with loading_spinner(processing=False) as progress:
        progress.add_task(description="Upgrading the DB...", total=None)
        done = upgrader.upgrade(target_version=target)
    not_text = "" if done else "[bold]NOT [/bold]"
    out_console.print(f"The database has {not_text}been upgraded")

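# Illustrative invocations of the command above (a sketch, not authoritative: it assumes
# the "jf" entry point and a configured project; option names are taken from the
# definitions in this function; <target_version> is a placeholder):
#   jf admin upgrade                      # dry-run report, then confirmation prompt
#   jf admin upgrade --check-env          # also report changes in the installed packages
#   jf admin upgrade -t <target_version>  # explicit target, for testing/development only
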
@app_admin.command()
def reset(
    validation: Annotated[
        Optional[str],
        typer.Argument(
            help=(
                "If the number of flows in the DB exceeds 25 it will be required to pass "
                "today's date in the YYYY-MM-DD format to proceed with the reset"
            ),
            metavar="DATE",
        ),
    ] = None,
    reset_output: Annotated[
        bool,
        typer.Option(
            "--output",
            "-o",
            help="Also delete all the documents in the current store",
        ),
    ] = False,
    force: force_opt = False,
) -> None:
    """
    Reset the jobflow database.
    WARNING: deletes all the data. These cannot be retrieved anymore.
    """
    from jobflow_remote import SETTINGS

    check_stopped_runner(error=True)

    if not force:
        cm = get_config_manager()
        project_name = cm.get_project_data().project.name
        text = Text.from_markup(
            "[red]This operation will [bold]delete all the Jobs data[/bold] "
            f"for project [bold]{project_name}[/bold]. Proceed anyway?[/red]"
        )

        confirmed = Confirm.ask(text, default=False)
        if not confirmed:
            raise typer.Exit(0)

    with loading_spinner(processing=False) as progress:
        progress.add_task(description="Resetting the DB...", total=None)
        jc = get_job_controller()
        done = jc.reset(reset_output=reset_output, max_limit=25, validation=validation)
    not_text = "" if done else "[bold]NOT [/bold]"
    out_console.print(f"The database was {not_text}reset")
    if not done and SETTINGS.cli_suggestions:
        out_console.print(
            "Check the amount of Flows and set the DATE argument if this is the correct project to reset",
            style="yellow",
        )

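# Illustrative invocations of the reset command above (a sketch: assumes the "jf" entry
# point; <YYYY-MM-DD> is a placeholder for today's date, required when the DB holds more
# than 25 flows):
#   jf admin reset                        # prompts for confirmation before wiping the DB
#   jf admin reset <YYYY-MM-DD> --output  # also delete the documents in the current store
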
@app_admin.command()
def unlock(
    job_id: job_ids_indexes_opt = None,
    db_id: db_ids_opt = None,
    state: job_state_opt = None,
    start_date: start_date_opt = None,
    end_date: end_date_opt = None,
    force: force_opt = False,
) -> None:
    """
    Forcibly removes the lock from the documents of the selected jobs.
    If no criteria are specified, all the locked jobs will be selected.
    WARNING: can lead to inconsistencies if the process is actually running.
    """
    job_ids_indexes = get_job_ids_indexes(job_id)

    jc = get_job_controller()

    if not force:
        with loading_spinner(processing=False) as progress:
            progress.add_task(
                description="Checking the number of locked documents...", total=None
            )

            jobs_info = jc.get_jobs_info(
                job_ids=job_ids_indexes,
                db_ids=db_id,
                states=state,
                start_date=start_date,
                locked=True,
                end_date=end_date,
            )

        if not jobs_info:
            exit_with_error_msg("No data matching the request")

        text = Text.from_markup(
            f"[red]This operation will [bold]remove the lock[/bold] for (roughly) [bold]{len(jobs_info)} Job(s)[/bold]. Proceed anyway?[/red]"
        )

        confirmed = Confirm.ask(text, default=False)
        if not confirmed:
            raise typer.Exit(0)

    with loading_spinner(processing=False) as progress:
        progress.add_task(description="Unlocking jobs...", total=None)

        num_unlocked = jc.unlock_jobs(
            job_ids=job_id,
            db_ids=db_id,
            states=state,
            start_date=start_date,
            end_date=end_date,
        )

    out_console.print(f"{num_unlocked} jobs were unlocked")

@app_admin.command()
def unlock_flow(
    job_id: job_ids_indexes_opt = None,
    db_id: db_ids_opt = None,
    flow_id: flow_ids_opt = None,
    state: flow_state_opt = None,
    start_date: start_date_opt = None,
    end_date: end_date_opt = None,
    force: force_opt = False,
) -> None:
    """
    Forcibly removes the lock from the documents of the selected flows.
    If no criteria are specified, all the locked flows will be selected.
    WARNING: can lead to inconsistencies if the process is actually running.
    """
    job_ids_indexes = get_job_ids_indexes(job_id)

    jc = get_job_controller()

    if not force:
        with loading_spinner(processing=False) as progress:
            progress.add_task(
                description="Checking the number of locked documents...", total=None
            )

            flows_info = jc.get_flows_info(
                job_ids=job_ids_indexes,
                db_ids=db_id,
                flow_ids=flow_id,
                states=state,
                start_date=start_date,
                locked=True,
                end_date=end_date,
            )

        if not flows_info:
            exit_with_error_msg("No data matching the request")

        text = Text.from_markup(
            f"[red]This operation will [bold]remove the lock[/bold] for (roughly) [bold]{len(flows_info)} Flow(s)[/bold]. Proceed anyway?[/red]"
        )

        confirmed = Confirm.ask(text, default=False)
        if not confirmed:
            raise typer.Exit(0)

    with loading_spinner(processing=False) as progress:
        progress.add_task(description="Unlocking flows...", total=None)

        num_unlocked = jc.unlock_flows(
            job_ids=job_id,
            db_ids=db_id,
            flow_ids=flow_id,
            states=state,
            start_date=start_date,
            end_date=end_date,
        )

    out_console.print(f"{num_unlocked} flows were unlocked")

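# Illustrative invocations of the two unlock commands above (a sketch: assuming Typer's
# default of deriving command names "unlock" and "unlock-flow" from the function names;
# the selection options are imported from jobflow_remote.cli.types and their flag names
# are not repeated here):
#   jf admin unlock       # with no selection, targets all locked jobs after confirmation
#   jf admin unlock-flow  # same, but for locked flows
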
@app_admin.command()
def unlock_runner() -> None:
    """
    Forcibly removes the lock from the runner document.
    WARNING: can lead to inconsistencies if a runner daemon is actually running.
    """
    jc = get_job_controller()
    with loading_spinner(processing=False) as progress:
        progress.add_task(description="Unlocking runner document...", total=None)

        num_docs, num_unlocked = jc.unlock_runner()
    if num_docs == 0:
        out_console.print(
            "No runner document... "
            "Consider upgrading your database using 'jf admin upgrade'"
        )
    elif num_unlocked == 0:
        out_console.print("The runner document was not locked. Nothing changed.")
    else:
        out_console.print("The runner document has been unlocked.")

app_index = JFRTyper(
    name="index",
    help="Commands for managing the indexes of the queue database",
    no_args_is_help=True,
)
app_admin.add_typer(app_index)

@app_index.command()
def rebuild(
    foreground: foreground_index_opt = False,
):
    """
    Rebuild all the standard indexes. Old indexes will be dropped, including the custom ones.
    """
    with loading_spinner(processing=False) as progress:
        progress.add_task(description="Resetting the indexes...", total=None)
        jc = get_job_controller()
        jc.build_indexes(background=not foreground, drop=True)
    if foreground:
        out_console.print("Indexes rebuilt")
    else:
        out_console.print("Indexes rebuild started in background")

@app_index.command()
def create(
    key: index_key_arg,
    direction: index_direction_arg = IndexDirection.ASC,
    foreground: foreground_index_opt = False,
    collection: Annotated[
        Optional[DbCollection],
        typer.Option(
            "--collection",
            "-c",
            help="The collection where the index should be added",
        ),
    ] = DbCollection.JOBS.value,  # type: ignore[assignment]
    unique: Annotated[
        bool,
        typer.Option(
            "--unique",
            "-u",
            help="The index will be unique",
        ),
    ] = False,
):
    """
    Add an index to one of the queue collections
    """
    with loading_spinner(processing=False) as progress:
        progress.add_task(description="Resetting the indexes...", total=None)
        jc = get_job_controller()
        jc.create_indexes(
            indexes=[[(key, direction.as_pymongo)]],
            unique=unique,
            collection=collection,
            background=not foreground,
        )
    if foreground:
        out_console.print("Index created")
    else:
        out_console.print("Index creation started in background")

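# Illustrative invocations of the index commands above (a sketch: assumes the "jf" entry
# point; <key> is a placeholder for the field to index):
#   jf admin index rebuild                # drop and rebuild the standard indexes (in background by default)
#   jf admin index create <key> --unique  # ascending index on the jobs collection by default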