jobflow_remote.utils.db module#

exception jobflow_remote.utils.db.FlowLockedError[source]#

Bases: LockedDocumentError

Exception to signal a problem when locking a Flow document.

classmethod from_flow_doc(doc: dict, additional_msg: str | None = None)[source]#
exception jobflow_remote.utils.db.JobLockedError[source]#

Bases: LockedDocumentError

Exception to signal a problem when locking a Job document.

classmethod from_job_doc(doc: dict, additional_msg: str | None = None)[source]#
exception jobflow_remote.utils.db.LockedDocumentError[source]#

Bases: Exception

Exception to signal a problem when locking the document.

class jobflow_remote.utils.db.MongoLock(collection: Collection, filter: Mapping[str, Any], update: Mapping[str, Any] | None = None, break_lock: bool = False, lock_id: str | None = None, sleep: int | None = None, max_wait: int = 600, projection: Mapping[str, Any] | Iterable[str] | None = None, get_locked_doc: bool = False, **kwargs)[source]#

Bases: object

Context manager to lock a document in a MongoDB database.

Main characteristics and functionalities:
  • Lock is acquired by setting a lock_id and lock_time value in the locked document.

  • Filter the document to select based on a query and sorting. It uses find_one_and_update, thus resulting in a single document locked.

  • Can wait for a lock to be released.

  • Can forcibly break an existing lock

  • Can return the locked document even if the lock could not be acquired (useful for determining if a document is locked or no document matches the query)

  • Accepts all the arguments that can be passed to find_one_and_update

  • A custom update can be performed on the document when acquiring the lock.

  • Allows to pass properties that will be set in the document at the moment of releasing the lock.

  • Lock id value can be customized. If not a randomly generated uuid is used.

Examples

Trying to acquire the lock on a document based on the state >>> with MongoLock(collection, {“state”: “READY”}) as lock: … print(lock.locked_document[“state”]) READY

If lock cannot be acquired (no document matching filter or that document is locked) the lock.locked_document is None.

>>> with MongoLock(collection, {"state": "READY"}) as lock:
...     print(lock.locked_document)
None

Wait for 60 seconds in case the required document is already locked. Check the status every 10 seconds. If lock cannot be acquired locked_document is None

>>> with MongoLock(
        collection,
        {"uuid": "5b84228b-d019-47fe-b0a0-564b36aa85ed"},
        sleep=10,
        max_wait=60,
    ) as lock:
...     print(lock.locked_document)
None

In case lock cannot be acquired expose the locked document

>>> with MongoLock(
        collection,
        {"uuid": "5b84228b-d019-47fe-b0a0-564b36aa85ed"},
        get_locked_doc=True,
    ) as lock:
...     print(lock.locked_document)
None
...     print(lock.unavailable_document['lock_id'])
8d68404f-c77a-461b-859c-40bb0af1979f

Set values in the document upon lock release

>>> with MongoLock(collection, {"state": "READY"}) as lock:
...     if lock.locked_document:
...         # Perform some operations based on the job...
...         lock.update_on_release = {"$set": {"state": "CHECKED_OUT"}}

Delete the locked document upon lock release

>>> with MongoLock(collection, {"state": "READY"}) as lock:
...     if lock.locked_document:
...         # decide if the document should be deleted
...         lock.delete_on_release = True
Parameters:
  • collection – The MongoDB collection containing the document to lock.

  • filter – A MongoDB query to select the document.

  • update – A dictionary that will be passed to find_one_and_update to update the locked document at the moment of acquiring the lock.

  • break_lock – True if the context manager is allowed to forcibly break a lock.

  • lock_id – The is used for the lock in the document. If None a randomly generated uuid will be used.

  • sleep – The amount of second to sleep between consecutive checks while waiting for a lock to be released.

  • max_wait – The amount of seconds to wait for a lock to be released.

  • projection – The projection passed to the find_one_and_update that locks the document.

  • get_locked_doc – If True, if the lock cannot be acquired because the document matching the filter is already locked, the locked document will be fetched and set in the unavailable_document attribute.

  • kwargs – All the other args are passed to find_one_and_update.

LOCK_KEY = 'lock_id'#
LOCK_TIME_KEY = 'lock_time'#
acquire() None[source]#

Acquire the lock.

property delete_on_release: bool#

The delete_on_release property.

Returns:

Whether the document will be deleted upon lock release.

Return type:

bool

classmethod get_lock_id(d: dict)[source]#

Get the lock id on a dictionary.

classmethod get_lock_time(d: dict)[source]#

Get the time the document was locked on a dictionary.

release(exc_type, exc_val, exc_tb) None[source]#

Release the lock.

property update_on_release: dict | list#

The update_on_release value.

Returns:

The value of update_on_release.

Return type:

dict | list