Source code for

from __future__ import annotations

import logging
import os
import random
from pathlib import Path
from typing import TYPE_CHECKING

from flufl.lock import Lock, LockError

    from import BaseHost

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The constants below are the names of the subdirectories that both managers
# append to their ``files_dir`` to build the paths they operate on.

# Subdirectory where LocalBatchManager.get_job places the lock file used
# while it claims a submitted job.
LOCK_DIR = "lock"

# Subdirectory where LocalBatchManager.terminate_job creates a
# "<job_id>_<index>_<process_id>" file for each job it closes.
TERMINATED_DIR = "terminated"

# Subdirectory where LocalBatchManager.get_job creates a
# "<submitted file name>_<process_id>" file for each job it claims.
RUNNING_DIR = "running"

# Subdirectory listed by LocalBatchManager.get_job to find the jobs that
# are waiting to be picked up; a claimed job has its file removed from here.
SUBMITTED_DIR = "submitted"

[docs] class RemoteBatchManager: """ Manager of remote files containing information about Jobs to be handled by a batch worker. Used by the Runner. """ def __init__( self, host: BaseHost, files_dir: str | Path, ) -> None: """ Parameters ---------- host The host where the files are. files_dir The full path to directory where the files are stored. """ = host self.files_dir = Path(files_dir) self.submitted_dir = self.files_dir / SUBMITTED_DIR self.running_dir = self.files_dir / RUNNING_DIR self.terminated_dir = self.files_dir / TERMINATED_DIR self.lock_dir = self.files_dir / LOCK_DIR self._init_files_dir() def _init_files_dir(self) -> None: """Initialize the file directory, creating all the subdiretories.""" # Note that the check of the creation of the folders on a remote host # slows down the start of the runner by a few seconds. # If this proves to be an issue the folder creation should be moved # somewhere else and guaranteed in some other way (e.g. a CLI command # for the user?).
[docs] def submit_job(self, job_id: str, index: int) -> None: """ Submit a Job by uploading the corresponding file. Parameters ---------- job_id Uuid of the Job. index Index of the Job. """ / f"{job_id}_{index}", "")
[docs] def get_submitted(self) -> list[str]: """ Get a list of files present in the submitted directory. Returns ------- The list of file names in the directory. """ return
[docs] def get_terminated(self) -> list[tuple[str, int, str]]: """ Get job ids and process ids of the terminated jobs from the corresponding directory. Returns ------- """ terminated = [] for i in job_id, _index, process_uuid = i.split("_") index = int(_index) terminated.append((job_id, index, process_uuid)) return terminated
[docs] def get_running(self) -> list[tuple[str, int, str]]: running = [] for filename in job_id, _index, process_uuid = filename.split("_") index = int(_index) running.append((job_id, index, process_uuid)) return running
[docs] def delete_terminated(self, ids: list[tuple[str, int, str]]) -> None: for job_id, index, process_uuid in ids: / f"{job_id}_{index}_{process_uuid}")
class LocalBatchManager:
    """
    Worker-side handler of the batch files stored on the local filesystem.

    Each Job is represented by an empty file, and the subdirectory holding
    that file (submitted, running or terminated) encodes the state of the Job.
    The worker uses this class to pick the Jobs to execute and to flag them
    as finished.
    """

    def __init__(self, files_dir: str | Path, process_id: str) -> None:
        """
        Parameters
        ----------
        files_dir
            The path to the directory containing the batch subdirectories.
            The subdirectories are only referenced here, not created.
        process_id
            Identifier of the current process. It is appended to the names
            of the files created in the running and terminated directories.
        """
        root = Path(files_dir)
        self.process_id = process_id
        self.files_dir = root
        self.submitted_dir = root / SUBMITTED_DIR
        self.running_dir = root / RUNNING_DIR
        self.terminated_dir = root / TERMINATED_DIR
        self.lock_dir = root / LOCK_DIR

    def get_job(self) -> str | None:
        """
        Claim one of the submitted Jobs, chosen at random.

        The claim is done while holding a lock named after the selected file:
        the file is deleted from the submitted directory and a file named
        ``<selected>_<process_id>`` is created in the running directory.
        A candidate that cannot be locked, or whose file has disappeared,
        is logged and discarded, and another candidate is tried.

        Returns
        -------
        The name of the claimed file, or None if the submitted directory is
        empty or none of the candidates could be claimed.
        """
        candidates = os.listdir(self.submitted_dir)
        while candidates:
            pick = random.choice(candidates)
            lock_path = str(self.lock_dir / pick)
            running_marker = self.running_dir / f"{pick}_{self.process_id}"
            try:
                # The lock is requested with a zero timeout and a lifetime
                # of 60 (flufl.lock options).
                with Lock(lock_path, lifetime=60, default_timeout=0):
                    (self.submitted_dir / pick).unlink()
                    running_marker.touch()
                    return pick
            except (LockError, FileNotFoundError):
                logger.exception(
                    f"Error while locking file {pick}. Will be ignored"
                )
                # Drop the failed candidate so that the loop terminates.
                candidates.remove(pick)
        return None

    def terminate_job(self, job_id: str, index: int) -> None:
        """
        Flag a Job as terminated.

        The file of the Job is deleted from the running directory and a file
        with the same name is then created in the terminated directory.

        Parameters
        ----------
        job_id
            Uuid of the Job.
        index
            Index of the Job.
        """
        marker_name = f"{job_id}_{index}_{self.process_id}"
        (self.running_dir / marker_name).unlink()
        (self.terminated_dir / marker_name).touch()