"""jobflow_remote.jobs.batch: files used to coordinate Jobs executed by batch workers."""

from __future__ import annotations

import logging
import os
import random
from contextlib import ExitStack
from pathlib import Path
from typing import TYPE_CHECKING

from flufl.lock import Lock, LockError

if TYPE_CHECKING:
    from jobflow_remote.remote.host import BaseHost


logger = logging.getLogger(__name__)


# Names of the subdirectories of a batch files directory. Each Job is tracked
# by an empty marker file whose location (submitted / running / terminated)
# reflects its state; the lock subdirectory holds the per-Job lock files.
LOCK_DIR: str = "lock"
TERMINATED_DIR: str = "terminated"
RUNNING_DIR: str = "running"
SUBMITTED_DIR: str = "submitted"


class RemoteBatchManager:
    """
    Manager of remote files containing information about Jobs to be handled
    by a batch worker.

    Used by the Runner.
    """

    def __init__(
        self,
        host: BaseHost,
        files_dir: str | Path,
    ) -> None:
        """
        Parameters
        ----------
        host
            The host where the files are.
        files_dir
            The full path to directory where the files are stored.
        """
        self.host = host
        self.files_dir = Path(files_dir)
        self.submitted_dir = self.files_dir / SUBMITTED_DIR
        self.running_dir = self.files_dir / RUNNING_DIR
        self.terminated_dir = self.files_dir / TERMINATED_DIR
        self.lock_dir = self.files_dir / LOCK_DIR
        # All the directories need to be initialized to check that they exist
        # and the host connected.
        # Doing it here has two downsides: 1) it slows down the
        # start of the runner just for a batch worker being present
        # 2) if the connection cannot be established the runner may not
        # even start due to the connection errors.
        # The initialization is thus done once when the first action is performed.
        self._dir_initialized = False

    def _init_files_dir(self) -> None:
        """
        Connect the host and create the files directory with all its
        subdirectories (submitted, running, terminated and lock).
        """
        self.host.connect()
        self.host.mkdir(self.files_dir)
        self.host.mkdir(self.submitted_dir)
        self.host.mkdir(self.running_dir)
        self.host.mkdir(self.terminated_dir)
        self.host.mkdir(self.lock_dir)
        self._dir_initialized = True

    def _ensure_files_dir(self) -> None:
        """Run the (deferred) directory initialization, only on first use."""
        if not self._dir_initialized:
            self._init_files_dir()

    @staticmethod
    def _parse_job_files(filenames: list[str]) -> list[tuple[str, int, str]]:
        """
        Split file names of the form ``{job_id}_{index}_{process_uuid}``.

        Parameters
        ----------
        filenames
            The file names to parse.

        Returns
        -------
        list
            Tuples of job id, job index (as int) and batch process uuid.

        Raises
        ------
        ValueError
            If a file name does not have exactly three "_"-separated parts
            or the index is not an integer.
        """
        parsed = []
        for filename in filenames:
            job_id, index, process_uuid = filename.split("_")
            parsed.append((job_id, int(index), process_uuid))
        return parsed

    def submit_job(self, job_id: str, index: int) -> None:
        """
        Submit a Job by uploading the corresponding file.

        An empty file named ``{job_id}_{index}`` is written in the submitted
        directory.

        Parameters
        ----------
        job_id
            Uuid of the Job.
        index
            Index of the Job.
        """
        self._ensure_files_dir()
        self.host.write_text_file(self.submitted_dir / f"{job_id}_{index}", "")

    def get_submitted(self) -> list[str]:
        """
        Get a list of files present in the submitted directory.

        Returns
        -------
        The list of file names in the submitted directory.
        """
        self._ensure_files_dir()
        return self.host.listdir(self.submitted_dir)

    def get_terminated(self) -> list[tuple[str, int, str]]:
        """
        Get job ids and process ids of the terminated jobs from the
        corresponding directory on the host.

        Returns
        -------
        list
            The list of job ids, job indexes and batch process uuids in the
            host terminated directory.
        """
        self._ensure_files_dir()
        return self._parse_job_files(self.host.listdir(self.terminated_dir))

    def get_running(self) -> list[tuple[str, int, str]]:
        """
        Get job ids and process ids of the running jobs from the
        corresponding directory on the host.

        Returns
        -------
        list
            The list of job ids, job indexes and batch process uuids in the
            host running directory.
        """
        self._ensure_files_dir()
        return self._parse_job_files(self.host.listdir(self.running_dir))

    def delete_terminated(self, ids: list[tuple[str, int, str]]) -> None:
        """
        Remove the files of terminated Jobs from the host terminated directory.

        Parameters
        ----------
        ids
            Job ids, job indexes and batch process uuids identifying the files
            to remove, in the same format returned by get_terminated.
        """
        self._ensure_files_dir()
        for job_id, index, process_uuid in ids:
            self.host.remove(self.terminated_dir / f"{job_id}_{index}_{process_uuid}")
class LocalBatchManager:
    """
    Manager of local files containing information about Jobs to be handled by
    a batch worker.

    Used in the worker to execute the batch Jobs.
    """

    def __init__(
        self,
        files_dir: str | Path,
        process_id: str,
        multiprocess_lock=None,
    ) -> None:
        """
        Parameters
        ----------
        files_dir
            The full path to directory where the files to handle the jobs
            to be executed in batch processes are stored.
        process_id
            The uuid associated to the batch process.
        multiprocess_lock
            A lock from the multiprocessing module to be used when executing
            jobs in parallel with other processes of the same worker.
        """
        self.process_id = process_id
        self.files_dir = Path(files_dir)
        self.multiprocess_lock = multiprocess_lock
        self.submitted_dir = self.files_dir / SUBMITTED_DIR
        self.running_dir = self.files_dir / RUNNING_DIR
        self.terminated_dir = self.files_dir / TERMINATED_DIR
        self.lock_dir = self.files_dir / LOCK_DIR

    def get_job(self) -> str | None:
        """
        Select randomly a job from the submitted directory to be executed.

        The file is moved to the running directory: it is removed from the
        submitted directory and a ``{selected}_{process_id}`` file is created
        in the running directory. Locks will prevent the same job from being
        executed from other processes. If no job can be executed, None is
        returned.

        Returns
        -------
        str | None
            The name of the job that was selected, or None if no job can be
            executed.

        Raises
        ------
        OSError
            If the running marker file cannot be created after the job has
            been claimed. This is propagated rather than treated as lock
            contention.
        """
        files = os.listdir(self.submitted_dir)
        while files:
            selected = random.choice(files)
            with ExitStack() as lock_stack:
                # if in a multiprocess execution, avoid concurrent interaction
                # from processes belonging to the same job
                if self.multiprocess_lock is not None:
                    lock_stack.enter_context(self.multiprocess_lock)
                try:
                    # default_timeout=0 is meant to give up immediately, rather
                    # than wait, if another process holds the lock for this job.
                    lock_stack.enter_context(
                        Lock(
                            str(self.lock_dir / selected),
                            lifetime=60,
                            default_timeout=0,
                        )
                    )
                    # Removing the submitted file claims the job. If it is
                    # already missing, another process claimed it first.
                    os.remove(self.submitted_dir / selected)
                except (LockError, FileNotFoundError):
                    logger.exception(
                        "Error while locking file %s. Will be ignored", selected
                    )
                    files.remove(selected)
                    continue
                # Kept outside the try on purpose. The submitted file is already
                # gone, so a failure here must not be mistaken for lock
                # contention. Otherwise the loop would go on claiming (and
                # losing) every remaining submitted job.
                (self.running_dir / f"{selected}_{self.process_id}").touch()
                return selected
        return None

    def terminate_job(self, job_id: str, index: int) -> None:
        """
        Terminate a job by removing the corresponding file from the running
        directory and adding a new file in the terminated directory.

        Parameters
        ----------
        job_id
            The uuid of the job to terminate.
        index
            The index of the job to terminate.
        """
        os.remove(self.running_dir / f"{job_id}_{index}_{self.process_id}")
        (self.terminated_dir / f"{job_id}_{index}_{self.process_id}").touch()