jobflow_remote.cli.utils module#

class jobflow_remote.cli.utils.IndexDirection(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

ASC = 'asc'#
DESC = 'desc'#
property as_pymongo#
class jobflow_remote.cli.utils.ReprStr[source]#

Bases: str

Helper class that overrides the standard __repr__ to return the string itself and not its repr(). Used mainly to allow printing of strings with newlines instead of '\n' when repr is used in rich.

class jobflow_remote.cli.utils.SerializeFileFormat(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

JSON = 'json'#
TOML = 'toml'#
YAML = 'yaml'#
class jobflow_remote.cli.utils.SortOption(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

CREATED_ON = 'created_on'#
DB_ID = 'db_id'#
UPDATED_ON = 'updated_on'#
jobflow_remote.cli.utils.check_at_least_one_opt(d: dict) → None[source]#
jobflow_remote.cli.utils.check_incompatible_opt(d: dict) → None[source]#
jobflow_remote.cli.utils.check_only_one_opt(d: dict) → None[source]#
jobflow_remote.cli.utils.check_query_incompatibility(query, incompatible_options)[source]#
jobflow_remote.cli.utils.check_stopped_runner(error: bool = True) → None[source]#
jobflow_remote.cli.utils.check_valid_uuid(uuid_str, raise_on_error: bool = True) → bool[source]#
jobflow_remote.cli.utils.cleanup_job_controller() → None[source]#
jobflow_remote.cli.utils.cli_error_handler(func)[source]#
jobflow_remote.cli.utils.complete_profiling() → None[source]#
jobflow_remote.cli.utils.execute_multi_jobs_cmd(single_cmd: Callable, multi_cmd: Callable, job_db_id: str | None = None, job_index: int | None = None, job_ids: list[str] | None = None, db_ids: str | list[str] | None = None, flow_ids: str | list[str] | None = None, states: JobState | list[JobState] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, name: str | None = None, metadata: dict | None = None, days: int | None = None, hours: int | None = None, workers: list[str] | None = None, custom_query: dict | None = None, verbosity: int = 0, raise_on_error: bool = False, **kwargs) → None[source]#
jobflow_remote.cli.utils.exit_with_error_msg(message: str, code: int = 1, **kwargs) → NoReturn[source]#
jobflow_remote.cli.utils.exit_with_warning_msg(message: str, code: int = 0, **kwargs) → NoReturn[source]#
jobflow_remote.cli.utils.get_config_manager() → ConfigManager[source]#
jobflow_remote.cli.utils.get_job_controller()[source]#
jobflow_remote.cli.utils.get_job_db_ids(job_db_id: str, job_index: int | None)[source]#
jobflow_remote.cli.utils.get_job_ids_indexes(job_ids: list[str] | None) → list[tuple[str, int]] | None[source]#
jobflow_remote.cli.utils.get_start_date(start_date: datetime | None, days: int | None, hours: int | None)[source]#
jobflow_remote.cli.utils.hide_progress(progress: Progress)[source]#

Hide the progress bar or spinning icon if an input is required from the user.

Adapted from a related github issue for rich: Textualize/rich#1535

Parameters:

progress – The Progress object in use

jobflow_remote.cli.utils.initialize_config_manager(*args, **kwargs) → None[source]#
jobflow_remote.cli.utils.loading_spinner(processing: bool = True)[source]#
jobflow_remote.cli.utils.print_success_msg(message: str = 'operation completed', **kwargs) → None[source]#
jobflow_remote.cli.utils.start_profiling() → None[source]#
jobflow_remote.cli.utils.str_to_dict(string: str | None) → dict | None[source]#