Source code for jobflow_remote.utils.db

from __future__ import annotations

import copy
import logging
import os
import re
import subprocess
import time
import warnings
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from shutil import which
from typing import TYPE_CHECKING, Any

import bson
from maggma.stores.mongolike import MongoStore, MongoURIStore
from monty.io import zopen
from pymongo import ReturnDocument
from pymongo.errors import PyMongoError

from jobflow_remote.utils.data import deep_merge_dict, suuid

if TYPE_CHECKING:
    from collections.abc import Iterable, Mapping

    from pymongo.collection import Collection

logger = logging.getLogger(__name__)


[docs] class MongoLock: """ Context manager to lock a document in a MongoDB database. Main characteristics and functionalities: * Lock is acquired by setting a lock_id and lock_time value in the locked document. * Filter the document to select based on a query and sorting. It uses find_one_and_update, thus resulting in a single document locked. * Can wait for a lock to be released. * Can forcibly break an existing lock * Can return the locked document even if the lock could not be acquired (useful for determining if a document is locked or no document matches the query) * Accepts all the arguments that can be passed to find_one_and_update * A custom update can be performed on the document when acquiring the lock. * Allows to pass properties that will be set in the document at the moment of releasing the lock. * Lock id value can be customized. If not a randomly generated uuid is used. Examples -------- Trying to acquire the lock on a document based on the state >>> with MongoLock(collection, {"state": "READY"}) as lock: ... print(lock.locked_document["state"]) READY If lock cannot be acquired (no document matching filter or that document is locked) the `lock.locked_document` is None. >>> with MongoLock(collection, {"state": "READY"}) as lock: ... print(lock.locked_document) None Wait for 60 seconds in case the required document is already locked. Check the status every 10 seconds. If lock cannot be acquired locked_document is None >>> with MongoLock( collection, {"uuid": "5b84228b-d019-47fe-b0a0-564b36aa85ed"}, sleep=10, max_wait=60, ) as lock: ... print(lock.locked_document) None In case lock cannot be acquired expose the locked document >>> with MongoLock( collection, {"uuid": "5b84228b-d019-47fe-b0a0-564b36aa85ed"}, get_locked_doc=True, ) as lock: ... print(lock.locked_document) None ... print(lock.unavailable_document['lock_id']) 8d68404f-c77a-461b-859c-40bb0af1979f Set values in the document upon lock release >>> with MongoLock(collection, {"state": "READY"}) as lock: ... if lock.locked_document: ... # Perform some operations based on the job... ... lock.update_on_release = {"$set": {"state": "CHECKED_OUT"}} Delete the locked document upon lock release >>> with MongoLock(collection, {"state": "READY"}) as lock: ... if lock.locked_document: ... # decide if the document should be deleted ... lock.delete_on_release = True """ LOCK_KEY = "lock_id" LOCK_TIME_KEY = "lock_time" def __init__( self, collection: Collection, filter: Mapping[str, Any], # noqa: A002 update: Mapping[str, Any] | None = None, break_lock: bool = False, lock_id: str | None = None, sleep: int | None = None, max_wait: int = 600, projection: Mapping[str, Any] | Iterable[str] | None = None, get_locked_doc: bool = False, **kwargs, ) -> None: """ Parameters ---------- collection The MongoDB collection containing the document to lock. filter A MongoDB query to select the document. update A dictionary that will be passed to find_one_and_update to update the locked document at the moment of acquiring the lock. break_lock True if the context manager is allowed to forcibly break a lock. lock_id The is used for the lock in the document. If None a randomly generated uuid will be used. sleep The amount of second to sleep between consecutive checks while waiting for a lock to be released. max_wait The amount of seconds to wait for a lock to be released. projection The projection passed to the find_one_and_update that locks the document. get_locked_doc If True, if the lock cannot be acquired because the document matching the filter is already locked, the locked document will be fetched and set in the unavailable_document attribute. kwargs All the other args are passed to find_one_and_update. """ self.collection = collection self.filter = filter or {} self.update = update self.break_lock = break_lock self.locked_document = None self.unavailable_document = None self.lock_id = lock_id or suuid() self.kwargs = kwargs self._update_on_release: dict | list = {} self._delete_on_release: bool = False self.sleep = sleep self.max_wait = max_wait self.projection = projection self.get_locked_doc = get_locked_doc @property def update_on_release(self) -> dict | list: """ The update_on_release value. Returns: dict | list: The value of update_on_release. """ return self._update_on_release @update_on_release.setter def update_on_release(self, value: dict | list): """ Set the value of update_on_release. If set its value will be used to update the document with additional properties upon lock release. For example: lock.update_on_release = {"$set": {"state": "CHECKED_OUT"}} Cannot be set together with delete_on_release. Parameters ---------- value : dict | list The value to set for update_on_release. """ if self.delete_on_release: raise ValueError( "delete_on_release and update_on_release cannot be set simultaneously" ) self._update_on_release = value @property def delete_on_release(self) -> bool: """ The delete_on_release property. Returns: bool: Whether the document will be deleted upon lock release. """ return self._delete_on_release @delete_on_release.setter def delete_on_release(self, value: bool): """ Set the value of delete_on_release. If True the document will be deleted upon lock release. For example: lock.delete_on_release = True Cannot be set together with update_on_release. Parameters ---------- value : bool The value to set for delete_on_release. """ if self.update_on_release: raise ValueError( "delete_on_release and update_on_release cannot be set simultaneously" ) self._delete_on_release = value @property def is_locked(self) -> bool: """Return whether the document was locked before trying to acquire the lock. Notes ----- This method should be used only inside the "with" context. """ return self.locked_document is None
[docs] @classmethod def get_lock_time(cls, d: dict): """Get the time the document was locked on a dictionary.""" return d.get(cls.LOCK_TIME_KEY)
[docs] @classmethod def get_lock_id(cls, d: dict): """Get the lock id on a dictionary.""" return d.get(cls.LOCK_KEY)
[docs] def acquire(self) -> None: """Acquire the lock.""" # Set the lock expiration time now = datetime.utcnow() db_filter = copy.deepcopy(dict(self.filter)) projection = self.projection # if projecting always get the lock as well if projection: projection = list(projection) projection.extend([self.LOCK_KEY, self.lock_id]) # Modify the filter if the document should not be fetched if # the lock cannot be acquired. Otherwise, keep the original filter. if not self.break_lock and not self.sleep and not self.get_locked_doc: db_filter.update({self.LOCK_KEY: None}) # Prepare the update to be performed when acquiring the lock. # A combination of the input update and the setting of the lock. lock_set = {self.LOCK_KEY: self.lock_id, self.LOCK_TIME_KEY: now} update: dict[str, dict] = defaultdict(dict) if self.update: update.update(copy.deepcopy(self.update)) update["$set"].update(lock_set) # If the document should be fetched even if the lock could not be acquired # the updates should be made conditional. # Note: sleep needs to fetch the document, otherwise it is impossible to # determine if the filter did not return any document or if the document # was locked. if (self.sleep or self.get_locked_doc) and not self.break_lock: for operation, dict_vals in update.items(): for k, v in dict_vals.items(): cond = { "$cond": { "if": {"$gt": [f"${self.LOCK_KEY}", None]}, "then": f"${k}", "else": v, } } update[operation][k] = cond update = [dict(update)] # type: ignore[assignment] # Try to acquire the lock by updating the document with a unique identifier # and the lock expiration time logger.debug(f"try acquiring lock with filter: {db_filter}") t0 = time.time() while True: result = self.collection.find_one_and_update( db_filter, update, upsert=False, return_document=ReturnDocument.AFTER, **self.kwargs, ) if result: lock_acquired = self.get_lock_id(result) == self.lock_id if lock_acquired: self.locked_document = result break # if the lock could not be acquired optionally sleep or # exit if waited for enough time. if self.sleep and (time.time() - t0) < self.max_wait: logger.debug("sleeping") time.sleep(self.sleep) else: self.unavailable_document = result break else: # If no document the conditions could not be met. # Either the requested filter does not find match a document # or those fitting are locked. break
[docs] def release(self, exc_type, exc_val, exc_tb) -> None: """Release the lock.""" # TODO if failed to release the lock maybe retry before failing if self.locked_document is None: return # Release the lock by removing the unique identifier and lock expiration time update: list | dict = {"$set": {self.LOCK_KEY: None, self.LOCK_TIME_KEY: None}} # TODO maybe set on release only if no exception was raised? if self.update_on_release: # if an exception raised inside the context manager do not update the document if exc_type is not None: logger.warning( f"A {type(exc_type)} exception was raised while the document was locked. " f"The update_on_release {self.update_on_release} will not be applied." ) elif isinstance(self.update_on_release, list): update = [update, *self.update_on_release] else: update = deep_merge_dict(update, self.update_on_release) logger.debug(f"release lock with update: {update}") # if an exception raised inside the context manager do not delete the document if self.delete_on_release and exc_type is None: result = self.collection.delete_one( {"_id": self.locked_document["_id"], self.LOCK_KEY: self.lock_id} ) if result.deleted_count == 0: raise RuntimeError("Could not delete the locked document upon release") else: if self.delete_on_release and exc_type is not None: logger.warning( f"A {type(exc_type)} exception was raised while the document was locked. " f"The document will not be deleted, as instead requested by delete_on_release." ) result = self.collection.update_one( {"_id": self.locked_document["_id"], self.LOCK_KEY: self.lock_id}, update, upsert=False, ) # Check if the lock was successfully released if result.modified_count == 0: msg = ( f"Could not release lock for document {self.locked_document['_id']}" ) warnings.warn(msg, stacklevel=2) self.locked_document = None
def __enter__(self): self.acquire() return self def __exit__(self, exc_type, exc_val, exc_tb): if self.locked_document: self.release(exc_type, exc_val, exc_tb)
[docs] class LockedDocumentError(Exception): """Exception to signal a problem when locking the document."""
[docs] class RunnerLockedError(LockedDocumentError): """Exception to signal a problem when locking a Runner document."""
[docs] @classmethod def from_runner_doc(cls, doc: dict, additional_msg: str | None = None): lock_id = doc[MongoLock.LOCK_KEY] lock_date = doc[MongoLock.LOCK_TIME_KEY] date_str = lock_date.isoformat(timespec="seconds") if lock_date else None msg = f"Runner document is locked with lock_id {lock_id} since {date_str} UTC." if additional_msg: msg += " " + additional_msg return cls(msg)
[docs] class JobLockedError(LockedDocumentError): """Exception to signal a problem when locking a Job document."""
[docs] @classmethod def from_job_doc(cls, doc: dict, additional_msg: str | None = None): lock_id = doc[MongoLock.LOCK_KEY] lock_date = doc[MongoLock.LOCK_TIME_KEY] date_str = lock_date.isoformat(timespec="seconds") if lock_date else None msg = f"Job with db_id {doc['db_id']} is locked with lock_id {lock_id} since {date_str} UTC." if additional_msg: msg += " " + additional_msg return cls(msg)
[docs] class FlowLockedError(LockedDocumentError): """Exception to signal a problem when locking a Flow document."""
[docs] @classmethod def from_flow_doc(cls, doc: dict, additional_msg: str | None = None): lock_id = doc[MongoLock.LOCK_KEY] lock_date = doc[MongoLock.LOCK_TIME_KEY] date_str = lock_date.isoformat(timespec="seconds") if lock_date else None msg = f"Flow with uuid {doc['uuid']} is locked with lock_id {lock_id} since {date_str} UTC." if additional_msg: msg += " " + additional_msg return cls(msg)
[docs] class MissingDocumentError(Exception): """Exception to signal that a document is missing from the DB"""
[docs] def mongo_operation( store: MongoStore | MongoURIStore, file_path: str | Path, operation: str, collection: str | None, mongo_bin_path: str | None = None, compress: bool = False, ) -> tuple[str, str]: """ Execute a mongo operation (mongodump or mongorestore) on a given store. Parameters ---------- store The store containing the data to be backed up. file_path The path of the folder where the backup files are located. operation The mongo operation to perform. Can be either 'mongodump' or 'mongorestore'. collection The name of the collection to be backed up. If None the collection defined in the store will be used. mongo_bin_path The path to the folder containing the mongo executable. If None, the executable is searched in the PATH. compress If True, the backup files are compressed. Returns ------- tuple[str, str] The stdout and stderr of the executed command. """ if operation not in ("mongodump", "mongorestore"): raise ValueError(f"Operation {operation} not supported") if mongo_bin_path: operation = os.path.join(mongo_bin_path, operation) if not which(operation): raise RuntimeError( f"It looks like the command {operation} is not available. Check the path or consider " f"installing the mongodb database tools. Alternatively use the pure python implementation." ) # here is not checked with isinstance(), because the current other subclasses will fail if type(store) is MongoURIStore: cmd = [ operation, "--uri", store.uri, ] elif type(store) is MongoStore: cmd = [ operation, "--host", store.host, "--port", str(store.port), ] if store.username and store.password: cmd.extend(["--username", store.username, "--password", store.password]) if store.auth_source: cmd.extend(["--authenticationDatabase", store.auth_source]) elif store.username or store.password: raise ValueError( "To use the mongotools the username and password in the queue store should be " "either both present or both absent." ) else: raise ValueError( f"Unsupported store type {type(store).__name__}. Consider using the python version." ) # check this afterwards, since other stores may not have ssh_tunnel attribute if store.ssh_tunnel is not None: raise NotImplementedError( "SSH tunnel is not supported. Consider using the python version." ) cmd.extend(["--db", store.database, "--collection", collection]) if compress: cmd.append("--gzip") if operation.endswith("mongodump"): cmd.extend(["--out", file_path]) elif operation.endswith("mongorestore"): cmd.append(file_path) str_cmd = " ".join(str(s) for s in cmd) result = subprocess.run( str_cmd, check=True, capture_output=True, text=True, shell=True ) logger.debug( f"output during execution of '{str_cmd}'. Stdout: {result.stdout}. Stderr: {result.stderr}" ) return result.stdout, result.stderr
[docs] def mongodump_from_store( store: MongoStore | MongoURIStore, output_path: str | Path, collection: str | None = None, mongo_bin_path: str | None = None, compress: bool = False, ) -> int: """ Use mongodump to dump a collection from a MongoDB store to a file. Parameters ---------- store The store containing the data to be backed up. output_path The path of the folder where the backup files are located. collection The name of the collection to be backed up. If None the collection defined in the store will be used. mongo_bin_path The path to the folder containing the mongo executable. If None, the executable is searched in the PATH. compress If True, the backup files are compressed. Returns ------- int The number of documents dumped. """ stdout, stderr = mongo_operation( store=store, collection=collection, file_path=output_path, operation="mongodump", mongo_bin_path=mongo_bin_path, compress=compress, ) # Extract the number of documents dumped match = re.search(r"done dumping .*\((\d+) documents*\)", stderr) return int(match.group(1)) if match else 0
[docs] def mongorestore_to_store( store: MongoStore | MongoURIStore, input_file: str | Path, collection: str | None = None, mongo_bin_path: str | None = None, compress: bool | None = None, ) -> None: """ Restore a collection from a BSON file using mongorestore. Parameters ---------- store The store where the data should be restored. input_file The path of the BSON file containing the data to be restored. collection The name of the collection to be backed up. If None the collection defined in the store will be used. mongo_bin_path The path to the folder containing the mongo executable. If None, the executable is searched in the PATH. compress If True, the input file is expected to be compressed. If None it will be determined based on the file extension. """ if compress is None: compress = Path(input_file).name.endswith(".gz") mongo_operation( store=store, collection=collection, file_path=input_file, operation="mongorestore", mongo_bin_path=mongo_bin_path, compress=compress, )
[docs] def pymongo_dump( collection: Collection, output_path: str | Path, compress: bool = False ) -> int: """ Dump the contents of a PyMongo collection to a BSON file. Parameters ---------- collection The PyMongo collection to be dumped. output_path The path of the folder where the BSON file should be saved. compress : bool If True, the output file is compressed. Returns ------- int The number of documents dumped. """ dir_path = Path(output_path) / collection.database.name dir_path.mkdir(exist_ok=True, parents=True) # add the db name for consistency with mongodump and use the collection name as file name file_name = f"{collection.name}.bson" if compress: file_name += ".gz" with zopen(dir_path / file_name, "wb") as f: num_documents = 0 for doc in collection.find(): f.write(bson.BSON.encode(doc)) num_documents += 1 return num_documents
[docs] def pymongo_restore(collection: Collection, input_file: str | Path) -> None: """ Restore the contents of a BSON file to a PyMongo collection. Parameters ---------- collection The PyMongo collection to be dumped. input_file The path of the BSON file used to restore the data. """ try: with zopen(input_file, "rb") as f: collection.insert_many(bson.decode_all(f.read())) except PyMongoError as e: raise RuntimeError(f"Error during PyMongo restore: {e!s}") from e except OSError as e: raise RuntimeError(f"Error reading from file: {e!s}") from e