Source code for jobflow_remote.jobs.run
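"""Functions to run Jobs, either individually or in batch mode."""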
from __future__ import annotations
import datetime
import glob
import logging
import os
import subprocess
import time
import traceback
from typing import TYPE_CHECKING
from jobflow import JobStore, initialize_logger
from jobflow.core.flow import get_flow
from monty.os import cd
from monty.serialization import dumpfn, loadfn
from monty.shutil import decompress_file
from jobflow_remote.jobs.batch import LocalBatchManager
from jobflow_remote.jobs.data import IN_FILENAME, OUT_FILENAME
from jobflow_remote.remote.data import get_job_path, get_store_file_paths
from jobflow_remote.utils.log import initialize_remote_run_log
if TYPE_CHECKING:
from pathlib import Path
from jobflow.core.job import Job
logger = logging.getLogger(__name__)
def run_remote_job(run_dir: str | Path = ".") -> None:
"""Run the job."""
initialize_remote_run_log()
start_time = datetime.datetime.utcnow()
with cd(run_dir):
error = None
try:
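            # Dump a preliminary output file containing only the start time;
            # it is overwritten below with the full output or with the error.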
dumpfn({"start_time": start_time}, OUT_FILENAME)
in_data = loadfn(IN_FILENAME)
job: Job = in_data["job"]
store = in_data["store"]
store.connect()
initialize_logger()
try:
response = job.run(store=store)
finally:
                # Some jobs may have compressed the output and store files
                # while being executed; decompress them if that is the case.
decompress_files(store)
# Close the store explicitly, as minimal stores may require it.
try:
store.close()
except Exception:
logger.exception("Error while closing the store")
# The output of the response has already been stored in the store.
response.output = None
            # Convert the dynamic responses to Flows before dumping the
            # output. This is required so that the runner does not need to
            # deserialize the response and convert it to Flows.
if response.addition:
response.addition = get_flow(response.addition)
if response.detour:
response.detour = get_flow(response.detour)
if response.replace:
response.replace = get_flow(response.replace)
output = {
"response": response,
"error": error,
"start_time": start_time,
"end_time": datetime.datetime.utcnow(),
}
dumpfn(output, OUT_FILENAME)
except Exception:
            # Dump the output here as well, to also catch potential errors
            # raised while serializing/dumping the response above.
error = traceback.format_exc()
output = {
"response": None,
"error": error,
"start_time": start_time,
"end_time": datetime.datetime.utcnow(),
}
dumpfn(output, OUT_FILENAME)
def run_batch_jobs(
base_run_dir: str | Path,
files_dir: str | Path,
process_uuid: str,
max_time: int | None = None,
max_wait: int = 60,
max_jobs: int | None = None,
) -> None:
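    """Run a batch of Jobs, fetching them from a LocalBatchManager.

    The loop stops when the maximum running time is reached, when no new Job
    has been available for more than ``max_wait`` seconds, or when
    ``max_jobs`` Jobs have been executed.

    Parameters
    ----------
    base_run_dir
        Base path containing the run folders of the Jobs.
    files_dir
        Directory used by the LocalBatchManager to retrieve the Jobs to be
        executed.
    process_uuid
        Unique id associated with the batch process.
    max_time
        Maximum running time of the batch process, in seconds.
    max_wait
        Maximum time to wait for a new Job before stopping, in seconds.
    max_jobs
        Maximum number of Jobs to be executed by this process.
    """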
initialize_remote_run_log()
# TODO the ID should be somehow linked to the queue job
bm = LocalBatchManager(files_dir=files_dir, process_id=process_uuid)
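    # t0: start time of the batch process; wait: seconds since a Job was last
    # available; count: number of Jobs executed so far.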
t0 = time.time()
wait = 0
sleep_time = 10
count = 0
while True:
if max_time and max_time < time.time() - t0:
logger.info("Stopping due to max_time")
return
if max_wait and wait > max_wait:
logger.info(
f"No jobs available for more than {max_wait} seconds. Stopping."
)
return
if max_jobs and count >= max_jobs:
logger.info(f"Maximum number of jobs reached ({max_jobs}). Stopping.")
return
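        # Fetch the id of the next Job to execute; an empty value means that
        # no Job is currently available.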
job_str = bm.get_job()
if not job_str:
time.sleep(sleep_time)
wait += sleep_time
else:
wait = 0
count += 1
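            # The Job is identified by a string of the form "<uuid>_<index>".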
job_id, _index = job_str.split("_")
index: int = int(_index)
logger.info(f"Starting job with id {job_id} and index {index}")
job_path = get_job_path(job_id=job_id, index=index, base_path=base_run_dir)
try:
with cd(job_path):
result = subprocess.run(
["bash", "submit.sh"], # noqa: S603, S607
                        check=False,  # do not raise on failure: a non-zero exit code is handled below
text=True,
capture_output=True,
)
if result.returncode:
logger.warning(
f"Process for job with id {job_id} and index {index} finished with an error"
)
bm.terminate_job(job_id, index)
except Exception:
logger.exception(
"Error while running job with id {job_id} and index {index}"
)
else:
logger.info(f"Completed job with id {job_id} and index {index}")
def decompress_files(store: JobStore) -> None:
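    """Decompress the output file and the additional store files.

    Files are decompressed only if the uncompressed version is not already
    present in the current directory.
    """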
file_names = [OUT_FILENAME]
file_names.extend(os.path.basename(p) for p in get_store_file_paths(store))
for fn in file_names:
# If the file is already present do not decompress it, even if
# a compressed version is present.
if os.path.isfile(fn):
continue
for f in glob.glob(fn + ".*"):
decompress_file(f)
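# Illustrative usage sketch (the path is hypothetical): run_remote_job is meant
# to be pointed at a run directory already prepared by the jobflow-remote
# runner, i.e. one containing the IN_FILENAME file with the serialized Job and
# JobStore:
#
#     from jobflow_remote.jobs.run import run_remote_job
#
#     run_remote_job("/path/to/prepared/run_dir")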