from __future__ import annotations
import copy
import logging
import time
import warnings
from collections import defaultdict
from datetime import datetime
from typing import TYPE_CHECKING, Any
from pymongo import ReturnDocument
from jobflow_remote.utils.data import deep_merge_dict, suuid
if TYPE_CHECKING:
from collections.abc import Iterable, Mapping
from pymongo.collection import Collection
logger = logging.getLogger(__name__)
[docs]
class MongoLock:
"""
Context manager to lock a document in a MongoDB database.
Main characteristics and functionalities:
* Lock is acquired by setting a lock_id and lock_time value in the
locked document.
* Filter the document to select based on a query and sorting.
It uses find_one_and_update, thus resulting in a single document locked.
* Can wait for a lock to be released.
* Can forcibly break an existing lock
* Can return the locked document even if the lock could not be acquired
(useful for determining if a document is locked or no document matches
the query)
* Accepts all the arguments that can be passed to find_one_and_update
* A custom update can be performed on the document when acquiring the lock.
* Allows to pass properties that will be set in the document at the moment
of releasing the lock.
* Lock id value can be customized. If not a randomly generated uuid is used.
Examples
--------
Trying to acquire the lock on a document based on the state
>>> with MongoLock(collection, {"state": "READY"}) as lock:
... print(lock.locked_document["state"])
READY
If lock cannot be acquired (no document matching filter or that
document is locked) the `lock.locked_document` is None.
>>> with MongoLock(collection, {"state": "READY"}) as lock:
... print(lock.locked_document)
None
Wait for 60 seconds in case the required document is already locked.
Check the status every 10 seconds. If lock cannot be acquired
locked_document is None
>>> with MongoLock(
collection,
{"uuid": "5b84228b-d019-47fe-b0a0-564b36aa85ed"},
sleep=10,
max_wait=60,
) as lock:
... print(lock.locked_document)
None
In case lock cannot be acquired expose the locked document
>>> with MongoLock(
collection,
{"uuid": "5b84228b-d019-47fe-b0a0-564b36aa85ed"},
get_locked_doc=True,
) as lock:
... print(lock.locked_document)
None
... print(lock.unavailable_document['lock_id'])
8d68404f-c77a-461b-859c-40bb0af1979f
Set values in the document upon lock release
>>> with MongoLock(collection, {"state": "READY"}) as lock:
... if lock.locked_document:
... # Perform some operations based on the job...
... lock.update_on_release = {"$set": {"state": "CHECKED_OUT"}}
Delete the locked document upon lock release
>>> with MongoLock(collection, {"state": "READY"}) as lock:
... if lock.locked_document:
... # decide if the document should be deleted
... lock.delete_on_release = True
"""
LOCK_KEY = "lock_id"
LOCK_TIME_KEY = "lock_time"
def __init__(
self,
collection: Collection,
filter: Mapping[str, Any], # noqa: A002
update: Mapping[str, Any] | None = None,
break_lock: bool = False,
lock_id: str | None = None,
sleep: int | None = None,
max_wait: int = 600,
projection: Mapping[str, Any] | Iterable[str] | None = None,
get_locked_doc: bool = False,
**kwargs,
) -> None:
"""
Parameters
----------
collection
The MongoDB collection containing the document to lock.
filter
A MongoDB query to select the document.
update
A dictionary that will be passed to find_one_and_update to update the
locked document at the moment of acquiring the lock.
break_lock
True if the context manager is allowed to forcibly break a lock.
lock_id
The is used for the lock in the document. If None a randomly generated
uuid will be used.
sleep
The amount of second to sleep between consecutive checks while waiting
for a lock to be released.
max_wait
The amount of seconds to wait for a lock to be released.
projection
The projection passed to the find_one_and_update that locks the document.
get_locked_doc
If True, if the lock cannot be acquired because the document matching
the filter is already locked, the locked document will be fetched and
set in the unavailable_document attribute.
kwargs
All the other args are passed to find_one_and_update.
"""
self.collection = collection
self.filter = filter or {}
self.update = update
self.break_lock = break_lock
self.locked_document = None
self.unavailable_document = None
self.lock_id = lock_id or suuid()
self.kwargs = kwargs
self._update_on_release: dict | list = {}
self._delete_on_release: bool = False
self.sleep = sleep
self.max_wait = max_wait
self.projection = projection
self.get_locked_doc = get_locked_doc
@property
def update_on_release(self) -> dict | list:
"""
The update_on_release value.
Returns:
dict | list: The value of update_on_release.
"""
return self._update_on_release
@update_on_release.setter
def update_on_release(self, value: dict | list):
"""
Set the value of update_on_release.
If set its value will be used to update the document with additional
properties upon lock release.
For example:
lock.update_on_release = {"$set": {"state": "CHECKED_OUT"}}
Cannot be set together with delete_on_release.
Parameters
----------
value : dict | list
The value to set for update_on_release.
"""
if self.delete_on_release:
raise ValueError(
"delete_on_release and update_on_release cannot be set simultaneously"
)
self._update_on_release = value
@property
def delete_on_release(self) -> bool:
"""
The delete_on_release property.
Returns:
bool: Whether the document will be deleted upon lock release.
"""
return self._delete_on_release
@delete_on_release.setter
def delete_on_release(self, value: bool):
"""
Set the value of delete_on_release.
If True the document will be deleted upon lock release.
For example:
lock.delete_on_release = True
Cannot be set together with update_on_release.
Parameters
----------
value : bool
The value to set for delete_on_release.
"""
if self.update_on_release:
raise ValueError(
"delete_on_release and update_on_release cannot be set simultaneously"
)
self._delete_on_release = value
[docs]
@classmethod
def get_lock_time(cls, d: dict):
"""Get the time the document was locked on a dictionary."""
return d.get(cls.LOCK_TIME_KEY)
[docs]
@classmethod
def get_lock_id(cls, d: dict):
"""Get the lock id on a dictionary."""
return d.get(cls.LOCK_KEY)
[docs]
def acquire(self) -> None:
"""Acquire the lock."""
# Set the lock expiration time
now = datetime.utcnow()
db_filter = copy.deepcopy(dict(self.filter))
projection = self.projection
# if projecting always get the lock as well
if projection:
projection = list(projection)
projection.extend([self.LOCK_KEY, self.lock_id])
# Modify the filter if the document should not be fetched if
# the lock cannot be acquired. Otherwise, keep the original filter.
if not self.break_lock and not self.sleep and not self.get_locked_doc:
db_filter.update({self.LOCK_KEY: None})
# Prepare the update to be performed when acquiring the lock.
# A combination of the input update and the setting of the lock.
lock_set = {self.LOCK_KEY: self.lock_id, self.LOCK_TIME_KEY: now}
update: dict[str, dict] = defaultdict(dict)
if self.update:
update.update(copy.deepcopy(self.update))
update["$set"].update(lock_set)
# If the document should be fetched even if the lock could not be acquired
# the updates should be made conditional.
# Note: sleep needs to fetch the document, otherwise it is impossible to
# determine if the filter did not return any document or if the document
# was locked.
if (self.sleep or self.get_locked_doc) and not self.break_lock:
for operation, dict_vals in update.items():
for k, v in dict_vals.items():
cond = {
"$cond": {
"if": {"$gt": [f"${self.LOCK_KEY}", None]},
"then": f"${k}",
"else": v,
}
}
update[operation][k] = cond
update = [dict(update)] # type: ignore[assignment]
# Try to acquire the lock by updating the document with a unique identifier
# and the lock expiration time
logger.debug(f"try acquiring lock with filter: {db_filter}")
t0 = time.time()
while True:
result = self.collection.find_one_and_update(
db_filter,
update,
upsert=False,
return_document=ReturnDocument.AFTER,
**self.kwargs,
)
if result:
lock_acquired = self.get_lock_id(result) == self.lock_id
if lock_acquired:
self.locked_document = result
break
# if the lock could not be acquired optionally sleep or
# exit if waited for enough time.
if self.sleep and (time.time() - t0) < self.max_wait:
logger.debug("sleeping")
time.sleep(self.sleep)
else:
self.unavailable_document = result
break
else:
# If no document the conditions could not be met.
# Either the requested filter does not find match a document
# or those fitting are locked.
break
[docs]
def release(self, exc_type, exc_val, exc_tb) -> None:
"""Release the lock."""
# TODO if failed to release the lock maybe retry before failing
if self.locked_document is None:
return
# Release the lock by removing the unique identifier and lock expiration time
update: list | dict = {"$set": {self.LOCK_KEY: None, self.LOCK_TIME_KEY: None}}
# TODO maybe set on release only if no exception was raised?
if self.update_on_release:
# if an exception raised inside the context manager do not update the document
if exc_type is not None:
logger.warning(
f"A {type(exc_type)} exception was raised while the document was locked. "
f"The update_on_release {self.update_on_release} will not be applied."
)
elif isinstance(self.update_on_release, list):
update = [update, *self.update_on_release]
else:
update = deep_merge_dict(update, self.update_on_release)
logger.debug(f"release lock with update: {update}")
# if an exception raised inside the context manager do not delete the document
if self.delete_on_release and exc_type is None:
result = self.collection.delete_one(
{"_id": self.locked_document["_id"], self.LOCK_KEY: self.lock_id}
)
if result.deleted_count == 0:
raise RuntimeError("Could not delete the locked document upon release")
else:
if self.delete_on_release and exc_type is not None:
logger.warning(
f"A {type(exc_type)} exception was raised while the document was locked. "
f"The document will not be deleted, as instead requested by delete_on_release."
)
result = self.collection.update_one(
{"_id": self.locked_document["_id"], self.LOCK_KEY: self.lock_id},
update,
upsert=False,
)
# Check if the lock was successfully released
if result.modified_count == 0:
msg = (
f"Could not release lock for document {self.locked_document['_id']}"
)
warnings.warn(msg, stacklevel=2)
self.locked_document = None
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.locked_document:
self.release(exc_type, exc_val, exc_tb)
[docs]
class LockedDocumentError(Exception):
"""Exception to signal a problem when locking the document."""
[docs]
class JobLockedError(LockedDocumentError):
"""Exception to signal a problem when locking a Job document."""
[docs]
@classmethod
def from_job_doc(cls, doc: dict, additional_msg: str | None = None):
lock_id = doc[MongoLock.LOCK_KEY]
lock_date = doc[MongoLock.LOCK_TIME_KEY]
date_str = lock_date.isoformat(timespec="seconds") if lock_date else None
msg = f"Job with db_id {doc['db_id']} is locked with lock_id {lock_id} since {date_str} UTC."
if additional_msg:
msg += " " + additional_msg
return cls(msg)
[docs]
class FlowLockedError(LockedDocumentError):
"""Exception to signal a problem when locking a Flow document."""
[docs]
@classmethod
def from_flow_doc(cls, doc: dict, additional_msg: str | None = None):
lock_id = doc[MongoLock.LOCK_KEY]
lock_date = doc[MongoLock.LOCK_TIME_KEY]
date_str = lock_date.isoformat(timespec="seconds") if lock_date else None
msg = f"Flow with uuid {doc['uuid']} is locked with lock_id {lock_id} since {date_str} UTC."
if additional_msg:
msg += " " + additional_msg
return cls(msg)