Source code for jobflow_remote.jobs.batch

from __future__ import annotations

import logging
import os
import random
from contextlib import ExitStack
from pathlib import Path
from typing import TYPE_CHECKING

from flufl.lock import Lock, LockError
from monty.json import MontyDecoder

if TYPE_CHECKING:
    from jobflow_remote.remote.host import BaseHost

from jobflow_remote.jobs.data import BATCH_INFO_FILENAME

logger = logging.getLogger(__name__)


LOCK_DIR = "lock"

RUN_FINISHED_DIR = "run_finished"

RUNNING_DIR = "running"

SUBMITTED_DIR = "submitted"


[docs] class RemoteBatchManager: """ Manager of remote files containing information about Jobs to be handled by a batch worker. Used by the Runner. """ def __init__( self, host: BaseHost, files_dir: str | Path, ) -> None: """ Parameters ---------- host The host where the files are. files_dir The full path to directory where the files are stored. """ self.host = host self.files_dir = Path(files_dir) self.submitted_dir = self.files_dir / SUBMITTED_DIR self.running_dir = self.files_dir / RUNNING_DIR self.run_finished_dir = self.files_dir / RUN_FINISHED_DIR self.lock_dir = self.files_dir / LOCK_DIR # All the directories need to be initialized to check that they exist # and the host connected. # Doing it here has two downsides: 1) it slows down the # start of the runner just for a batch worker being present # 2) if the connection cannot be established the runner may not # even start due to the connection errors. # The initialization is thus done once when the first action is performed. self._dir_initialized = False def _init_files_dir(self) -> None: """Initialize the file directory, creating all the subdirectories.""" self.host.connect() self.host.mkdir(self.files_dir) self.host.mkdir(self.submitted_dir) self.host.mkdir(self.running_dir) self.host.mkdir(self.run_finished_dir) self.host.mkdir(self.lock_dir) self._dir_initialized = True
[docs] def submit_job(self, job_id: str, index: int) -> None: """ Submit a Job by uploading the corresponding file. Parameters ---------- job_id Uuid of the Job. index Index of the Job. """ if not self._dir_initialized: self._init_files_dir() self.host.write_text_file(self.submitted_dir / f"{job_id}_{index}", "")
[docs] def get_submitted(self) -> list[str]: """ Get a list of files present in the submitted directory. Returns ------- The list of file names in the submitted directory. """ if not self._dir_initialized: self._init_files_dir() return self.host.listdir(self.submitted_dir)
[docs] def get_run_finished(self) -> list[tuple[str, int, str]]: """ Get job ids and batch unique ids of the jobs that finished to run from the corresponding directory on the host. Returns ------- list The list of job ids, job indexes and batch process uuids in the host "run_finished" directory. """ if not self._dir_initialized: self._init_files_dir() run_finished = [] for i in self.host.listdir(self.run_finished_dir): job_id, _index, batch_uid = i.split("_") index = int(_index) run_finished.append((job_id, index, batch_uid)) return run_finished
[docs] def get_running(self) -> list[tuple[str, int, str]]: """ Get job ids and batch unique ids of the running jobs from the corresponding directory on the host. Returns ------- list The list of job ids, job indexes and batch process uuids in the host running directory. """ if not self._dir_initialized: self._init_files_dir() running = [] for filename in self.host.listdir(self.running_dir): job_id, _index, batch_uid = filename.split("_") index = int(_index) running.append((job_id, index, batch_uid)) return running
[docs] def delete_run_finished(self, ids: list[tuple[str, int, str]]) -> None: if not self._dir_initialized: self._init_files_dir() for job_id, index, batch_uid in ids: self.host.remove(self.run_finished_dir / f"{job_id}_{index}_{batch_uid}")
[docs] def delete_running(self, batch_uid: str) -> None: """ Remove job files from the running folder for a specific batch unique id. Should be used only for jobs that failed and left dangling running files. Parameters ---------- batch_uid The uuid of the batch process for the running files to be removed. """ if not self._dir_initialized: self._init_files_dir() running_files = self.host.listdir(self.running_dir) for filename in running_files: if filename.endswith(batch_uid): self.host.remove(self.running_dir / filename)
[docs] def cleanup(self) -> bool: """ Remove the files directory on the host. Returns ------- bool True if the directory was successfully deleted or was not existing. """ if self.host.exists(self.files_dir): return self.host.rmtree(self.files_dir, raise_on_error=False) return True
[docs] def get_batch_info( self, batch_dir: Path | str, batch_info_file: str = BATCH_INFO_FILENAME ) -> dict | None: batch_info_path = Path(batch_dir) / batch_info_file if not self.host.exists(batch_info_path): return None json_str = self.host.read_text_file(batch_info_path) return MontyDecoder().decode(json_str)
[docs] class LocalBatchManager: """ Manager of local files containing information about Jobs to be handled by a batch worker. Used in the worker to executes the batch Jobs. """ def __init__( self, files_dir: str | Path, batch_uid: str, multiprocess_lock=None, ) -> None: """ Parameters ---------- files_dir The full path to directory where the files to handle the jobs to be executed in batch processes are stored. batch_uid The uuid associated to the batch process. multiprocess_lock A lock from the multiprocessing module to be used when executing jobs in parallel with other processes of the same worker. """ self.batch_uid = batch_uid self.files_dir = Path(files_dir) self.multiprocess_lock = multiprocess_lock self.submitted_dir = self.files_dir / SUBMITTED_DIR self.running_dir = self.files_dir / RUNNING_DIR self.run_finished_dir = self.files_dir / RUN_FINISHED_DIR self.lock_dir = self.files_dir / LOCK_DIR
[docs] def get_job(self) -> str | None: """ Select randomly a job from the submitted directory to be executed. Move the file to the running directory. Locks will prevent the same job from being executed from other processes. If no job can be executed, None is returned. Returns ------- str | None The name of the job that was selected, or None if no job can be executed. """ files = os.listdir(self.submitted_dir) while files: selected = random.choice(files) try: with ExitStack() as lock_stack: # if in a multiprocess execution, avoid concurrent interaction # from processes belonging to the same job if self.multiprocess_lock: lock_stack.enter_context(self.multiprocess_lock) lock_stack.enter_context( Lock( str(self.lock_dir / selected), lifetime=60, default_timeout=0, ) ) os.remove(self.submitted_dir / selected) (self.running_dir / f"{selected}_{self.batch_uid}").touch() return selected except (LockError, FileNotFoundError): logger.exception( f"Error while locking file {selected}. Will be ignored" ) files.remove(selected) return None
[docs] def set_job_finished(self, job_id: str, index: int) -> None: """ Set a job as run_finished by removing the corresponding file from the running directory and adding a new file in the "run_finished" directory. Parameters ---------- job_id The uuid of the job to set as run_finished. index The index of the job to set as run_finished. """ os.remove(self.running_dir / f"{job_id}_{index}_{self.batch_uid}") (self.run_finished_dir / f"{job_id}_{index}_{self.batch_uid}").touch()