import io
from pathlib import Path
from typing import Annotated

import typer
from monty.json import jsanitize
from monty.serialization import dumpfn
from qtoolkit.core.data_objects import QResources
from rich.pretty import pprint

from jobflow_remote import SETTINGS
from jobflow_remote.cli.formatting import format_job_info, get_job_info_table
from jobflow_remote.cli.jf import app
from jobflow_remote.cli.jfr_typer import JFRTyper
from jobflow_remote.cli.types import (
    OptionalStr,
    break_lock_opt,
    days_opt,
    db_ids_opt,
    delete_all_opt,
    delete_files_opt,
    delete_output_opt,
    end_date_opt,
    flow_ids_opt,
    hours_opt,
    job_db_id_arg,
    job_ids_indexes_opt,
    job_index_arg,
    job_index_opt,
    job_state_arg,
    job_state_opt,
    locked_opt,
    max_results_opt,
    metadata_opt,
    name_opt,
    query_opt,
    raise_on_error_opt,
    reverse_sort_flag_opt,
    sort_opt,
    start_date_opt,
    verbosity_opt,
    wait_lock_opt,
    worker_name_opt,
)
from jobflow_remote.cli.utils import (
    SortOption,
    check_incompatible_opt,
    check_query_incompatibility,
    check_stopped_runner,
    execute_multi_jobs_cmd,
    exit_with_error_msg,
    exit_with_warning_msg,
    get_config_manager,
    get_job_controller,
    get_job_db_ids,
    get_job_ids_indexes,
    get_start_date,
    hide_progress,
    loading_spinner,
    out_console,
    print_success_msg,
    str_to_dict,
)
from jobflow_remote.jobs.state import JobState
from jobflow_remote.remote.queue import ERR_FNAME, OUT_FNAME

app_job = JFRTyper(
    name="job", help="Commands for managing the jobs", no_args_is_help=True
)
app.add_typer(app_job)

[docs]@app_job.command(name="list")defjobs_list(job_id:job_ids_indexes_opt=None,db_id:db_ids_opt=None,flow_id:flow_ids_opt=None,state:job_state_opt=None,start_date:start_date_opt=None,end_date:end_date_opt=None,name:name_opt=None,metadata:metadata_opt=None,days:days_opt=None,hours:hours_opt=None,worker_name:worker_name_opt=None,verbosity:verbosity_opt=0,max_results:max_results_opt=100,sort:sort_opt=SortOption.UPDATED_ON,reverse_sort:reverse_sort_flag_opt=False,locked:locked_opt=False,custom_query:query_opt=None,error:Annotated[bool,typer.Option("--error","-e",help="Select the jobs in FAILED and REMOTE_ERROR state. Incompatible with the --state option",),]=False,):""" Get the list of Jobs in the database """check_incompatible_opt({"start_date":start_date,"days":days,"hours":hours})check_incompatible_opt({"end_date":end_date,"days":days,"hours":hours})check_incompatible_opt({"state":state,"error":error})check_query_incompatibility(custom_query,[job_id,db_id,flow_id,state,start_date,end_date,name,metadata,days,hours,worker_name,],)job_ids_indexes=get_job_ids_indexes(job_id)jc=get_job_controller()start_date=get_start_date(start_date,days,hours)db_sort:list[tuple[str,int]]=[(sort.value,1ifreverse_sortelse-1)]iferror:state=[JobState.REMOTE_ERROR,JobState.FAILED]withloading_spinner():ifcustom_query:jobs_info=jc.get_jobs_info_query(query=custom_query,limit=max_results,sort=db_sort,)else:jobs_info=jc.get_jobs_info(job_ids=job_ids_indexes,db_ids=db_id,flow_ids=flow_id,states=state,start_date=start_date,locked=locked,end_date=end_date,name=name,metadata=metadata,workers=worker_name,limit=max_results,sort=db_sort,)table=get_job_info_table(jobs_info,verbosity=verbosity)out_console.print(table)ifSETTINGS.cli_suggestions:ifmax_resultsandlen(jobs_info)==max_results:out_console.print(f"The number of Jobs printed may be limited by the maximum selected: {max_results}",style="yellow",)remote_errors=Falseifany(ji.remote.retry_time_limitisnotNoneandji.state!=JobState.REMOTE_ERRORforjiinjobs_info):text=("Some jobs (state in orange) have failed while interacting with"" the worker, but will be retried.")out_console.print(text,style="yellow")remote_errors=Trueifremote_errorsorany(ji.statein(JobState.REMOTE_ERROR,JobState.FAILED)forjiinjobs_info):text="Get more information about the errors with 'jf job info JOB_ID'"out_console.print(text,style="yellow")
[docs]@app_job.command(name="info")defjob_info(job_db_id:job_db_id_arg=None,job_index:job_index_arg=None,pid:Annotated[str,typer.Option("--pid",help="The process ID of the job in the queue system (e.g. Slurm job ID)""The -v flag will be ignored if --pid is used.",),]=None,show_none:Annotated[bool,typer.Option("--show-none","-n",help="Show the data whose values are None. Usually hidden",),]=False,verbosity:verbosity_opt=0,)->None:"""Detailed information on a specific job."""ifpidisnotNoneandjob_db_idisnotNone:raisetyper.BadParameter("Cannot specify both job ID/index and process ID")withloading_spinner():jc=get_job_controller()ifjob_db_idisnotNone:db_id,job_id=get_job_db_ids(job_db_id,job_index)ifverbosity>0:job_data=jc.get_job_doc(job_id=job_id,job_index=job_index,db_id=db_id,)else:job_data=jc.get_job_info(job_id=job_id,job_index=job_index,db_id=db_id,)elifpidisnotNone:job_data=jc.get_job_info_by_pid(pid)else:raisetyper.BadParameter("Must specify either job ID/index or process ID")ifnotjob_data:exit_with_error_msg("No data matching the request")out_console.print(format_job_info(job_data,verbosity,show_none=show_none))
@app_job.command()
def set_state(
    state: job_state_arg,
    job_db_id: job_db_id_arg,
    job_index: job_index_arg = None,
) -> None:
    """
    Set the state of a Job to an arbitrary value.

    WARNING: no checks are performed. This can lead to inconsistencies in the DB.
    Use with care.
    """
    db_id, job_id = get_job_db_ids(job_db_id, job_index)
    with loading_spinner():
        jc = get_job_controller()

        succeeded = jc.set_job_state(
            state=state,
            job_id=job_id,
            job_index=job_index,
            db_id=db_id,
        )

    if not succeeded:
        exit_with_error_msg("Could not change the job state")

    print_success_msg()

@app_job.command()
def rerun(
    job_db_id: job_db_id_arg = None,
    job_index: job_index_arg = None,
    job_id: job_ids_indexes_opt = None,
    db_id: db_ids_opt = None,
    flow_id: flow_ids_opt = None,
    state: job_state_opt = None,
    start_date: start_date_opt = None,
    end_date: end_date_opt = None,
    name: name_opt = None,
    metadata: metadata_opt = None,
    days: days_opt = None,
    hours: hours_opt = None,
    worker_name: worker_name_opt = None,
    custom_query: query_opt = None,
    verbosity: verbosity_opt = 0,
    wait: wait_lock_opt = None,
    break_lock: break_lock_opt = False,
    force: Annotated[
        bool,
        typer.Option(
            "--force",
            "-f",
            help=(
                "Force the rerun even if some conditions would normally prevent it (e.g. "
                "state usually not allowed or children already executed). Can lead to "
                "inconsistencies. Advanced users."
            ),
        ),
    ] = False,
    raise_on_error: raise_on_error_opt = False,
) -> None:
    """
    Rerun a Job. By default this is limited to Jobs that failed and whose children
    did not start, or to Jobs that are running. The rerun Job is set to READY and
    its children Jobs to WAITING. If possible, the associated job submitted to the
    remote queue will be cancelled. Most of the limitations can be overridden by
    the 'force' option; this could lead to inconsistencies in the overall state of
    the Jobs of the Flow.
    """
    if force or break_lock:
        check_stopped_runner(error=False)

    jc = get_job_controller()
    execute_multi_jobs_cmd(
        single_cmd=jc.rerun_job,
        multi_cmd=jc.rerun_jobs,
        job_db_id=job_db_id,
        job_index=job_index,
        job_ids=job_id,
        db_ids=db_id,
        flow_ids=flow_id,
        states=state,
        start_date=start_date,
        end_date=end_date,
        name=name,
        metadata=metadata,
        workers=worker_name,
        custom_query=custom_query,
        days=days,
        hours=hours,
        verbosity=verbosity,
        wait=wait,
        break_lock=break_lock,
        force=force,
        raise_on_error=raise_on_error,
    )

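# A minimal usage sketch (placeholder id); --force is defined above, the other
# selection options come from the shared *_opt types:
#   jf job rerun 215
#   jf job rerun 215 --force   # bypass the usual state checks (advanced users)
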
@app_job.command()
def retry(
    job_db_id: job_db_id_arg = None,
    job_index: job_index_arg = None,
    job_id: job_ids_indexes_opt = None,
    db_id: db_ids_opt = None,
    flow_id: flow_ids_opt = None,
    state: job_state_opt = None,
    start_date: start_date_opt = None,
    end_date: end_date_opt = None,
    name: name_opt = None,
    metadata: metadata_opt = None,
    worker_name: worker_name_opt = None,
    custom_query: query_opt = None,
    days: days_opt = None,
    hours: hours_opt = None,
    verbosity: verbosity_opt = 0,
    wait: wait_lock_opt = None,
    break_lock: break_lock_opt = False,
    raise_on_error: raise_on_error_opt = False,
) -> None:
    """
    Retry the operation that failed for a Job in the REMOTE_ERROR state, or reset
    the number of attempts at the remote action, so that the runner will try it
    again immediately.
    """
    if break_lock:
        check_stopped_runner(error=False)

    jc = get_job_controller()
    execute_multi_jobs_cmd(
        single_cmd=jc.retry_job,
        multi_cmd=jc.retry_jobs,
        job_db_id=job_db_id,
        job_index=job_index,
        job_ids=job_id,
        db_ids=db_id,
        flow_ids=flow_id,
        states=state,
        start_date=start_date,
        end_date=end_date,
        name=name,
        metadata=metadata,
        days=days,
        hours=hours,
        workers=worker_name,
        custom_query=custom_query,
        verbosity=verbosity,
        wait=wait,
        break_lock=break_lock,
        raise_on_error=raise_on_error,
    )

@app_job.command()
def pause(
    job_db_id: job_db_id_arg = None,
    job_index: job_index_arg = None,
    job_id: job_ids_indexes_opt = None,
    db_id: db_ids_opt = None,
    flow_id: flow_ids_opt = None,
    state: job_state_opt = None,
    start_date: start_date_opt = None,
    end_date: end_date_opt = None,
    name: name_opt = None,
    metadata: metadata_opt = None,
    worker_name: worker_name_opt = None,
    custom_query: query_opt = None,
    days: days_opt = None,
    hours: hours_opt = None,
    verbosity: verbosity_opt = 0,
    wait: wait_lock_opt = None,
    raise_on_error: raise_on_error_opt = False,
) -> None:
    """Pause a Job. Only READY and WAITING Jobs can be paused. The operation is reversible."""
    jc = get_job_controller()
    execute_multi_jobs_cmd(
        single_cmd=jc.pause_job,
        multi_cmd=jc.pause_jobs,
        job_db_id=job_db_id,
        job_index=job_index,
        job_ids=job_id,
        db_ids=db_id,
        flow_ids=flow_id,
        states=state,
        start_date=start_date,
        end_date=end_date,
        name=name,
        metadata=metadata,
        days=days,
        hours=hours,
        workers=worker_name,
        custom_query=custom_query,
        verbosity=verbosity,
        wait=wait,
        raise_on_error=raise_on_error,
    )

@app_job.command()
def play(
    job_db_id: job_db_id_arg = None,
    job_index: job_index_arg = None,
    job_id: job_ids_indexes_opt = None,
    db_id: db_ids_opt = None,
    flow_id: flow_ids_opt = None,
    state: job_state_opt = None,
    start_date: start_date_opt = None,
    end_date: end_date_opt = None,
    name: name_opt = None,
    metadata: metadata_opt = None,
    worker_name: worker_name_opt = None,
    custom_query: query_opt = None,
    days: days_opt = None,
    hours: hours_opt = None,
    verbosity: verbosity_opt = 0,
    wait: wait_lock_opt = None,
    raise_on_error: raise_on_error_opt = False,
) -> None:
    """Resume a Job that was previously PAUSED."""
    jc = get_job_controller()
    execute_multi_jobs_cmd(
        single_cmd=jc.play_job,
        multi_cmd=jc.play_jobs,
        job_db_id=job_db_id,
        job_index=job_index,
        job_ids=job_id,
        db_ids=db_id,
        flow_ids=flow_id,
        states=state,
        start_date=start_date,
        end_date=end_date,
        name=name,
        metadata=metadata,
        days=days,
        hours=hours,
        workers=worker_name,
        custom_query=custom_query,
        verbosity=verbosity,
        wait=wait,
        raise_on_error=raise_on_error,
    )

@app_job.command()
def stop(
    job_db_id: job_db_id_arg = None,
    job_index: job_index_arg = None,
    job_id: job_ids_indexes_opt = None,
    db_id: db_ids_opt = None,
    flow_id: flow_ids_opt = None,
    state: job_state_opt = None,
    start_date: start_date_opt = None,
    end_date: end_date_opt = None,
    name: name_opt = None,
    metadata: metadata_opt = None,
    worker_name: worker_name_opt = None,
    custom_query: query_opt = None,
    days: days_opt = None,
    hours: hours_opt = None,
    verbosity: verbosity_opt = 0,
    wait: wait_lock_opt = None,
    break_lock: break_lock_opt = False,
    raise_on_error: raise_on_error_opt = False,
) -> None:
    """
    Stop a Job. Only Jobs that did not complete or had an error can be stopped.
    The operation is irreversible. If possible, the associated job submitted to
    the remote queue will be cancelled.
    """
    if break_lock:
        check_stopped_runner(error=False)

    jc = get_job_controller()
    execute_multi_jobs_cmd(
        single_cmd=jc.stop_job,
        multi_cmd=jc.stop_jobs,
        job_db_id=job_db_id,
        job_index=job_index,
        job_ids=job_id,
        db_ids=db_id,
        flow_ids=flow_id,
        states=state,
        start_date=start_date,
        end_date=end_date,
        name=name,
        metadata=metadata,
        days=days,
        hours=hours,
        workers=worker_name,
        custom_query=custom_query,
        verbosity=verbosity,
        wait=wait,
        break_lock=break_lock,
        raise_on_error=raise_on_error,
    )

@app_job.command()
def delete(
    job_db_id: job_db_id_arg = None,
    job_index: job_index_arg = None,
    job_id: job_ids_indexes_opt = None,
    db_id: db_ids_opt = None,
    flow_id: flow_ids_opt = None,
    state: job_state_opt = None,
    start_date: start_date_opt = None,
    end_date: end_date_opt = None,
    name: name_opt = None,
    metadata: metadata_opt = None,
    worker_name: worker_name_opt = None,
    custom_query: query_opt = None,
    days: days_opt = None,
    hours: hours_opt = None,
    verbosity: verbosity_opt = 0,
    wait: wait_lock_opt = None,
    raise_on_error: raise_on_error_opt = False,
    delete_output: delete_output_opt = False,
    delete_files: delete_files_opt = False,
    delete_all: delete_all_opt = False,
) -> None:
    """
    Delete Jobs individually. The Flow document will be updated accordingly, but
    no consistency check is performed. The Flow may be left in an inconsistent
    state. For advanced users only.
    """
    if delete_all:
        delete_files = delete_output = True
    jc = get_job_controller()
    execute_multi_jobs_cmd(
        single_cmd=jc.delete_job,
        multi_cmd=jc.delete_jobs,
        job_db_id=job_db_id,
        job_index=job_index,
        job_ids=job_id,
        db_ids=db_id,
        flow_ids=flow_id,
        states=state,
        start_date=start_date,
        end_date=end_date,
        name=name,
        metadata=metadata,
        days=days,
        hours=hours,
        workers=worker_name,
        custom_query=custom_query,
        verbosity=verbosity,
        wait=wait,
        raise_on_error=raise_on_error,
        delete_output=delete_output,
        delete_files=delete_files,
    )

@app_job.command()
def queue_out(
    job_db_id: job_db_id_arg,
    job_index: job_index_arg = None,
) -> None:
    """Print the content of the output files produced by the queue manager."""
    db_id, job_id = get_job_db_ids(job_db_id, job_index)

    cm = get_config_manager()

    with loading_spinner(processing=False) as progress:
        progress.add_task(description="Retrieving info...", total=None)
        jc = get_job_controller()

        job_info = jc.get_job_info(
            job_id=job_id,
            job_index=job_index,
            db_id=db_id,
        )

    if not job_info:
        exit_with_error_msg("No data matching the request")

    remote_dir = job_info.run_dir
    if not remote_dir:
        exit_with_warning_msg("The remote folder has not been created yet")

    out_path = Path(remote_dir, OUT_FNAME)
    err_path = Path(remote_dir, ERR_FNAME)
    out = None
    err = None
    out_error = None
    err_error = None
    with loading_spinner(processing=False) as progress:
        progress.add_task(description="Retrieving files...", total=None)
        worker = cm.get_worker(job_info.worker)
        host = worker.get_host()
        try:
            if worker.get_host().interactive_login:
                with hide_progress(progress):
                    host.connect()
            else:
                host.connect()
            try:
                out_bytes = io.BytesIO()
                host.get(out_path, out_bytes)
                out = out_bytes.getvalue().decode("utf-8")
            except Exception as e:
                out_error = getattr(e, "message", str(e))
            try:
                err_bytes = io.BytesIO()
                host.get(err_path, err_bytes)
                err = err_bytes.getvalue().decode("utf-8")
            except Exception as e:
                err_error = getattr(e, "message", str(e))
        finally:
            try:
                host.close()
            except Exception:
                pass

    if out_error:
        out_console.print(
            f"Error while fetching queue output from {out_path!s}: {out_error}",
            style="red",
        )
    else:
        out_console.print(f"Queue output from {out_path!s}:\n")
        out_console.print(out)

    if err_error:
        out_console.print(
            f"Error while fetching queue error from {err_path!s}: {err_error}",
            style="red",
        )
    else:
        out_console.print(f"Queue error from {err_path!s}:\n")
        out_console.print(err)

app_job_set=JFRTyper(name="set",help="Commands for managing the jobs",no_args_is_help=True)app_job.add_typer(app_job_set)
@app_job_set.command()
def worker(
    worker_name: Annotated[
        str,
        typer.Argument(
            help="The name of the worker",
            metavar="WORKER",
        ),
    ],
    job_id: job_ids_indexes_opt = None,
    db_id: db_ids_opt = None,
    flow_id: flow_ids_opt = None,
    state: job_state_opt = None,
    start_date: start_date_opt = None,
    end_date: end_date_opt = None,
    name: name_opt = None,
    metadata: metadata_opt = None,
    select_worker_name: worker_name_opt = None,
    custom_query: query_opt = None,
    days: days_opt = None,
    hours: hours_opt = None,
    verbosity: verbosity_opt = 0,
    raise_on_error: raise_on_error_opt = False,
):
    """
    Set the worker for the selected Jobs. Only Jobs that are not in an evolving
    state (e.g. CHECKED_OUT, UPLOADED, ...) can be modified.
    """
    jc = get_job_controller()
    execute_multi_jobs_cmd(
        single_cmd=jc.set_job_run_properties,
        multi_cmd=jc.set_job_run_properties,
        job_db_id=None,
        job_index=None,
        job_ids=job_id,
        db_ids=db_id,
        flow_ids=flow_id,
        states=state,
        start_date=start_date,
        end_date=end_date,
        name=name,
        metadata=metadata,
        days=days,
        hours=hours,
        workers=select_worker_name,
        custom_query=custom_query,
        verbosity=verbosity,
        raise_on_error=raise_on_error,
        worker=worker_name,
    )

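# A minimal usage sketch (placeholder worker name); the positional WORKER must
# match a worker defined in the project configuration, and --state comes from the
# shared job_state_opt type:
#   jf job set worker my_hpc_worker --state READY
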
@app_job_set.command()
def exec_config(
    exec_config_value: Annotated[
        str,
        typer.Argument(
            help="The name of the exec_config",
            metavar="EXEC_CONFIG",
        ),
    ],
    job_id: job_ids_indexes_opt = None,
    db_id: db_ids_opt = None,
    flow_id: flow_ids_opt = None,
    state: job_state_opt = None,
    start_date: start_date_opt = None,
    end_date: end_date_opt = None,
    name: name_opt = None,
    metadata: metadata_opt = None,
    worker_name: worker_name_opt = None,
    custom_query: query_opt = None,
    days: days_opt = None,
    hours: hours_opt = None,
    verbosity: verbosity_opt = 0,
    raise_on_error: raise_on_error_opt = False,
):
    """
    Set the exec_config for the selected Jobs. Only Jobs that are not in an
    evolving state (e.g. CHECKED_OUT, UPLOADED, ...) can be modified.
    """
    jc = get_job_controller()
    execute_multi_jobs_cmd(
        single_cmd=jc.set_job_run_properties,
        multi_cmd=jc.set_job_run_properties,
        job_db_id=None,
        job_index=None,
        job_ids=job_id,
        db_ids=db_id,
        flow_ids=flow_id,
        states=state,
        start_date=start_date,
        end_date=end_date,
        name=name,
        metadata=metadata,
        days=days,
        hours=hours,
        workers=worker_name,
        custom_query=custom_query,
        verbosity=verbosity,
        raise_on_error=raise_on_error,
        exec_config=exec_config_value,
    )

@app_job_set.command()
def resources(
    resources_value: Annotated[
        str,
        typer.Argument(
            help="The resources to be specified. Can be either a list of "
            "comma separated key=value pairs or a string with the JSON "
            "representation of a dictionary "
            '(e.g. \'{"key1.key2": 1, "key3": "test"}\')',
            metavar="RESOURCES",
        ),
    ],
    replace: Annotated[
        bool,
        typer.Option(
            "--replace",
            "-r",
            help="If present, the given values will entirely replace those in the "
            "DB; otherwise only the selected keys will be updated",
        ),
    ] = False,
    qresources: Annotated[
        bool,
        typer.Option(
            "--qresources",
            "-qr",
            help="If present the values in `resources_value` will be interpreted as arguments for a QResources object",
        ),
    ] = False,
    job_id: job_ids_indexes_opt = None,
    db_id: db_ids_opt = None,
    flow_id: flow_ids_opt = None,
    state: job_state_opt = None,
    start_date: start_date_opt = None,
    end_date: end_date_opt = None,
    name: name_opt = None,
    metadata: metadata_opt = None,
    worker_name: worker_name_opt = None,
    custom_query: query_opt = None,
    days: days_opt = None,
    hours: hours_opt = None,
    verbosity: verbosity_opt = 0,
    raise_on_error: raise_on_error_opt = False,
):
    """
    Set the resources for the selected Jobs. Only Jobs that are not in an evolving
    state (e.g. CHECKED_OUT, UPLOADED, ...) can be modified.
    """
    resources = str_to_dict(resources_value)
    if qresources:
        resources = QResources(**resources)
    jc = get_job_controller()
    execute_multi_jobs_cmd(
        single_cmd=jc.set_job_run_properties,
        multi_cmd=jc.set_job_run_properties,
        job_db_id=None,
        job_index=None,
        job_ids=job_id,
        db_ids=db_id,
        flow_ids=flow_id,
        states=state,
        start_date=start_date,
        end_date=end_date,
        name=name,
        metadata=metadata,
        days=days,
        hours=hours,
        workers=worker_name,
        custom_query=custom_query,
        verbosity=verbosity,
        raise_on_error=raise_on_error,
        resources=resources,
        update=not replace,
    )

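# A minimal usage sketch for the two accepted RESOURCES formats (keys and values
# are placeholders; with -qr the keys must correspond to QResources fields, and
# values that are not plain strings may need the JSON form):
#   jf job set resources nodes=1,ntasks=16 --state READY
#   jf job set resources '{"nodes": 2, "partition": "debug"}' -r
#   jf job set resources -qr nodes=1,processes_per_node=16
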
[docs]@app_job.command(name="dump",hidden=True)defjob_dump(job_id:job_ids_indexes_opt=None,db_id:db_ids_opt=None,flow_id:flow_ids_opt=None,state:job_state_opt=None,start_date:start_date_opt=None,end_date:end_date_opt=None,name:name_opt=None,metadata:metadata_opt=None,worker_name:worker_name_opt=None,custom_query:query_opt=None,days:days_opt=None,hours:hours_opt=None,file_path:Annotated[str,typer.Option("--path","-p",help="Path to where the file should be dumped",),]="jobs_dump.json",)->None:"""Dump to json the documents of the selected Jobs from the DB. For debugging."""check_incompatible_opt({"start_date":start_date,"days":days,"hours":hours})check_incompatible_opt({"end_date":end_date,"days":days,"hours":hours})check_query_incompatibility(custom_query,[job_id,db_id,flow_id,state,start_date,end_date,name,metadata,days,hours,worker_name,],)job_ids_indexes=get_job_ids_indexes(job_id)jc=get_job_controller()start_date=get_start_date(start_date,days,hours)withloading_spinner():ifcustom_query:jobs_doc=jc.get_jobs_doc_query(query=custom_query,)else:jobs_doc=jc.get_jobs_doc(job_ids=job_ids_indexes,db_ids=db_id,flow_ids=flow_id,states=state,start_date=start_date,end_date=end_date,name=name,metadata=metadata,workers=worker_name,)ifjobs_doc:dumpfn(jsanitize(jobs_doc,strict=True,enum_values=True),file_path)ifnotjobs_doc:exit_with_error_msg("No data matching the request")
@app_job.command()
def output(
    job_db_id: job_db_id_arg,
    job_index: job_index_arg = None,
    file_path: Annotated[
        OptionalStr,
        typer.Option(
            "--path",
            "-p",
            help="If defined, the output will be dumped to this file based on the extension (json or yaml)",
        ),
    ] = None,
    load: Annotated[
        bool,
        typer.Option(
            "--load",
            "-l",
            help="If enabled, all the data from additional stores are also loaded",
        ),
    ] = False,
) -> None:
    """Print the output of a specific Job."""
    db_id, job_id = get_job_db_ids(job_db_id, job_index)

    with loading_spinner():
        jc = get_job_controller()
        if db_id:
            job_info = jc.get_job_info(
                job_id=job_id,
                job_index=job_index,
                db_id=db_id,
            )
            if job_info:
                job_id = job_info.uuid
                job_index = job_info.index

        job_output = None
        if job_id:
            job_output = jc.jobstore.get_output(job_id, job_index or "last", load=load)

    if not job_output:
        exit_with_error_msg("No data matching the request")

    if file_path:
        dumpfn(job_output, file_path)
    else:
        pprint(job_output)

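# A minimal usage sketch (placeholder id); --path/-p and --load/-l are defined above:
#   jf job output 215               # pretty-print the stored output
#   jf job output 215 -p out.json   # dump the output to a JSON file instead
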
app_job_files = JFRTyper(
    name="files",
    help="Commands for managing the files associated with a job",
    no_args_is_help=True,
)
app_job.add_typer(app_job_files)

[docs]@app_job_files.command(name="ls")deffiles_list(job_db_id:job_db_id_arg,job_index:job_index_arg=None,)->None:"""List of files in the run_dir of the selected Job."""db_id,job_id=get_job_db_ids(job_db_id,job_index)cm=get_config_manager()withloading_spinner(processing=False)asprogress:progress.add_task(description="Retrieving info...",total=None)jc=get_job_controller()job_info=jc.get_job_info(job_id=job_id,job_index=job_index,db_id=db_id,)ifnotjob_info:exit_with_error_msg("No data matching the request")remote_dir=job_info.run_dirifnotremote_dir:exit_with_warning_msg("The remote folder has not been created yet")remote_files=Nonewithloading_spinner(processing=False)asprogress:progress.add_task(description="Retrieving information...",total=None)worker=cm.get_worker(job_info.worker)host=worker.get_host()try:host.connect()try:remote_files=host.listdir(remote_dir)exceptExceptionase:raiseRuntimeError(f"Error while fetching the list of files from {remote_dir!s}: {getattr(e,'message',str(e))}")fromefinally:try:host.close()exceptException:passout_console.print(f"List of files in {remote_dir}:")forfninremote_files:out_console.print(fn)
[docs]@app_job_files.command(name="get")deffiles_get(job_db_id:job_db_id_arg,filenames:Annotated[list[str],typer.Argument(help="A list of file names to be retrieved from the job run dir",metavar="FILE_NAMES",),],job_index:job_index_opt=None,path:Annotated[OptionalStr,typer.Option("--path","-p",help="If defined, the files will be copied to this path. Otherwise in the local folder.",),]=None,)->None:db_id,job_id=get_job_db_ids(job_db_id,job_index)cm=get_config_manager()ifnotpath:path="."save_path=Path(path)ifnotsave_path.is_dir():raisetyper.BadParameter("The path is not an existing directory")withloading_spinner(processing=False)asprogress:progress.add_task(description="Retrieving info...",total=None)jc=get_job_controller()job_info=jc.get_job_info(job_id=job_id,job_index=job_index,db_id=db_id,)ifnotjob_info:exit_with_error_msg("No data matching the request")remote_dir=job_info.run_dirifnotremote_dir:exit_with_warning_msg("The remote folder has not been created yet")withloading_spinner(processing=False)asprogress:task_id=progress.add_task(description="Retrieving files...",total=None)worker=cm.get_worker(job_info.worker)host=worker.get_host()file_name:strtry:host.connect()forfile_nameinfilenames:progress.update(task_id,description=f"Retrieving {file_name}")host.get(str(Path(remote_dir)/file_name),str(save_path/file_name))exceptExceptionasexc:raiseRuntimeError(f"Error while fetching file {file_name} from {remote_dir!s}: {getattr(exc,'message',str(exc))}")fromexcfinally:try:host.close()exceptException:pass