jobflow_remote.utils.db module#

exception jobflow_remote.utils.db.FlowLockedError[source]#

Bases: LockedDocumentError

Exception to signal a problem when locking a Flow document.

classmethod from_flow_doc(doc: dict, additional_msg: str | None = None)[source]#

exception jobflow_remote.utils.db.JobLockedError[source]#

Bases: LockedDocumentError

Exception to signal a problem when locking a Job document.

classmethod from_job_doc(doc: dict, additional_msg: str | None = None)[source]#

exception jobflow_remote.utils.db.LockedDocumentError[source]#

Bases: Exception

Exception to signal a problem when locking the document.
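Because FlowLockedError, JobLockedError and RunnerLockedError all derive from LockedDocumentError, a caller can handle any of them with a single except clause. The following is a minimal sketch of that pattern. It declares stand-in classes with the documented names so that it runs without jobflow_remote installed; the helpers describe_failure and raise_flow_locked are hypothetical and not part of the module.

```python
# Stand-ins mirroring the documented hierarchy (NOT the real classes).
class LockedDocumentError(Exception):
    pass


class JobLockedError(LockedDocumentError):
    pass


class FlowLockedError(LockedDocumentError):
    pass


def describe_failure(action):
    """Run `action` and report which kind of lock problem occurred, if any.

    Hypothetical helper, used only to illustrate the except ordering.
    """
    try:
        action()
    except JobLockedError as exc:
        # Most specific handler first.
        return f"job locked: {exc}"
    except LockedDocumentError as exc:
        # Catches FlowLockedError and any other subclass.
        return f"document locked: {exc}"
    return "ok"


def raise_flow_locked():
    raise FlowLockedError("flow abc is locked")
```

Listing the more specific exception before the base class matters: Python uses the first matching except clause.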

exception jobflow_remote.utils.db.MissingDocumentError[source]#

Bases: Exception

Exception to signal that a document is missing from the DB.

class jobflow_remote.utils.db.MongoLock(collection: Collection, filter: Mapping[str, Any], update: Mapping[str, Any] | None = None, break_lock: bool = False, lock_id: str | None = None, sleep: int | None = None, max_wait: int = 600, projection: Mapping[str, Any] | Iterable[str] | None = None, get_locked_doc: bool = False, **kwargs)[source]#

Bases: object

Context manager to lock a document in a MongoDB database.

Main characteristics and functionalities:
  • Lock is acquired by setting a lock_id and lock_time value in the locked document.

  • Selects the document to lock with a query and optional sorting. It uses find_one_and_update, so at most one document is locked.

  • Can wait for a lock to be released.

  • Can forcibly break an existing lock.

  • Can return the locked document even if the lock could not be acquired (useful for telling apart a document that is locked from a query that matches no document).

  • Accepts all the arguments that can be passed to find_one_and_update

  • A custom update can be performed on the document when acquiring the lock.

  • Allows setting properties that will be written to the document when the lock is released.

  • The lock id value can be customized. If none is given, a randomly generated uuid is used.
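The acquisition step described above can be sketched as a single find_one_and_update call that only matches documents without a lock and writes lock_id and lock_time in the same operation. This is an illustration under stated assumptions, not the actual MongoLock code: the function try_lock and the class InMemoryCollection are hypothetical, and matching on "lock_id": None is an assumed way of detecting an unlocked document. The in-memory class only exists so the sketch runs without a MongoDB server; a pymongo Collection accepts the same call.

```python
import uuid
from datetime import datetime, timezone


class InMemoryCollection:
    """Tiny stand-in for a pymongo Collection (hypothetical, illustration only)."""

    def __init__(self, docs):
        self.docs = docs

    def find_one_and_update(self, filter, update):
        # Supports only equality filters and a "$set" update.
        for doc in self.docs:
            if all(doc.get(key) == value for key, value in filter.items()):
                before = dict(doc)  # like pymongo, return the pre-update document
                doc.update(update.get("$set", {}))
                return before
        return None


def try_lock(collection, filter, lock_id=None):
    """Try to claim one unlocked document matching `filter`.

    Returns (document, lock_id); document is None if nothing could be locked.
    """
    lock_id = lock_id or str(uuid.uuid4())
    # Assumption: an unlocked document has no lock_id (or lock_id set to None).
    lock_filter = {**filter, "lock_id": None}
    lock_update = {
        "$set": {"lock_id": lock_id, "lock_time": datetime.now(timezone.utc)}
    }
    doc = collection.find_one_and_update(lock_filter, lock_update)
    return doc, lock_id
```

Since the check and the write happen in one call, two processes running the same query cannot both obtain the same document when the collection performs the operation atomically, as MongoDB does for a single document.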

Examples

Trying to acquire the lock on a document based on the state:

>>> with MongoLock(collection, {"state": "READY"}) as lock:
...     print(lock.locked_document["state"])
READY

If the lock cannot be acquired (no document matches the filter, or the matching document is already locked), lock.locked_document is None.

>>> with MongoLock(collection, {"state": "READY"}) as lock:
...     print(lock.locked_document)
None

Wait up to 60 seconds if the required document is already locked, checking its status every 10 seconds. If the lock still cannot be acquired, locked_document is None.

>>> with MongoLock(
...     collection,
...     {"uuid": "5b84228b-d019-47fe-b0a0-564b36aa85ed"},
...     sleep=10,
...     max_wait=60,
... ) as lock:
...     print(lock.locked_document)
None

If the lock cannot be acquired, expose the document that is already locked:

>>> with MongoLock(
...     collection,
...     {"uuid": "5b84228b-d019-47fe-b0a0-564b36aa85ed"},
...     get_locked_doc=True,
... ) as lock:
...     print(lock.locked_document)
...     print(lock.unavailable_document["lock_id"])
None
8d68404f-c77a-461b-859c-40bb0af1979f

Set values in the document upon lock release:

>>> with MongoLock(collection, {"state": "READY"}) as lock:
...     if lock.locked_document:
...         # Perform some operations based on the job...
...         lock.update_on_release = {"$set": {"state": "CHECKED_OUT"}}

Delete the locked document upon lock release:

>>> with MongoLock(collection, {"state": "READY"}) as lock:
...     if lock.locked_document:
...         # decide if the document should be deleted
...         lock.delete_on_release = True

Parameters:
  • collection – The MongoDB collection containing the document to lock.

  • filter – A MongoDB query to select the document.

  • update – A dictionary that will be passed to find_one_and_update to update the locked document at the moment of acquiring the lock.

  • break_lock – True if the context manager is allowed to forcibly break a lock.

  • lock_id – The id used for the lock in the document. If None, a randomly generated uuid will be used.

  • sleep – The number of seconds to sleep between consecutive checks while waiting for a lock to be released.

  • max_wait – The maximum number of seconds to wait for a lock to be released.

  • projection – The projection passed to the find_one_and_update that locks the document.

  • get_locked_doc – If True and the lock cannot be acquired because the document matching the filter is already locked, the locked document is fetched and stored in the unavailable_document attribute.

  • kwargs – All the other args are passed to find_one_and_update.
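The interplay of sleep and max_wait can be pictured as a polling loop: retry the acquisition every sleep seconds until it succeeds or max_wait seconds have passed. The sketch below is an illustration, not the MongoLock implementation. The function wait_for_lock and its try_acquire callable are hypothetical, and treating sleep=None as "try once and do not wait" is an assumption.

```python
import time


def wait_for_lock(try_acquire, sleep=None, max_wait=600):
    """Retry `try_acquire` until it returns a document or `max_wait` seconds pass.

    `try_acquire` is any callable returning the locked document or None.
    Assumption: sleep=None means a single attempt with no waiting.
    """
    doc = try_acquire()
    if doc is not None or sleep is None:
        return doc

    # A monotonic clock is not affected by system clock adjustments.
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        time.sleep(sleep)
        doc = try_acquire()
        if doc is not None:
            return doc
    return None
```

With sleep=10 and max_wait=60, as in the example above, the loop makes the initial attempt plus roughly six retries before giving up.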

LOCK_KEY = 'lock_id'#

LOCK_TIME_KEY = 'lock_time'#

acquire() → None[source]#

Acquire the lock.

property delete_on_release: bool#

The delete_on_release property.

Returns:

Whether the document will be deleted upon lock release.

Return type:

bool

classmethod get_lock_id(d: dict)[source]#

Get the lock id from a document dictionary.

classmethod get_lock_time(d: dict)[source]#

Get the time at which the document was locked from a document dictionary.

property is_locked: bool#

Return whether the document was locked before trying to acquire the lock.

Notes

This property should be used only inside the “with” context.

release(exc_type, exc_val, exc_tb) → None[source]#

Release the lock.
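How update_on_release and delete_on_release could feed into the release step is sketched below. It is a guess at the general shape, not the actual release code: the function build_release_operation is hypothetical, clearing lock_id and lock_time by setting them to None is an assumption, and only the dict form of update_on_release is handled (the property also accepts a list).

```python
def build_release_operation(update_on_release=None, delete_on_release=False):
    """Return ("delete", None) or ("update", update_doc) for the release step.

    Hypothetical helper: the real release logic may differ.
    """
    if delete_on_release:
        # The document is removed, so there is no lock left to clear.
        return "delete", None

    # Assumption: releasing the lock resets the two lock fields.
    update = {"$set": {"lock_id": None, "lock_time": None}}

    # Merge the user-supplied operators (e.g. "$set") with the unlock update.
    for operator, fields in (update_on_release or {}).items():
        merged = dict(update.get(operator, {}))
        merged.update(fields)
        update[operator] = merged
    return "update", update
```

The resulting update document could then be passed to update_one, filtered on the id of the locked document, so that unlocking and the custom changes are applied in one write.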

property update_on_release: dict | list#

The update_on_release value.

Returns:

The value of update_on_release.

Return type:

dict | list

exception jobflow_remote.utils.db.RunnerLockedError[source]#

Bases: LockedDocumentError

Exception to signal a problem when locking a Runner document.

classmethod from_runner_doc(doc: dict, additional_msg: str | None = None)[source]#

jobflow_remote.utils.db.mongo_operation(store: MongoStore | MongoURIStore, file_path: str | Path, operation: str, collection: str | None, mongo_bin_path: str | None = None, compress: bool = False) → tuple[str, str][source]#

Execute a mongo operation (mongodump or mongorestore) on a given store.

Parameters:
  • store – The store containing the data to be backed up.

  • file_path – The path of the folder where the backup files are located.

  • operation – The mongo operation to perform. Can be either ‘mongodump’ or ‘mongorestore’.

  • collection – The name of the collection to be backed up. If None the collection defined in the store will be used.

  • mongo_bin_path – The path to the folder containing the mongo executable. If None, the executable is searched in the PATH.

  • compress – If True, the backup files are compressed.

Returns:

The stdout and stderr of the executed command.

Return type:

tuple[str, str]
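As an illustration of what such an operation involves, the sketch below assembles a mongodump command line from connection details. It is not the code used by mongo_operation: the function build_mongodump_command and its host, port and database parameters are hypothetical, chosen here instead of a store object so that the example stays self-contained. The flags --host, --port, --db, --collection, --out and --gzip are standard mongodump options.

```python
from pathlib import Path


def build_mongodump_command(
    host,
    port,
    database,
    collection,
    output_path,
    mongo_bin_path=None,
    compress=False,
):
    """Build the argument list for a mongodump call (hypothetical helper)."""
    executable = "mongodump"
    if mongo_bin_path is not None:
        # Use the executable from the given folder instead of the PATH.
        executable = str(Path(mongo_bin_path) / "mongodump")

    cmd = [
        executable,
        "--host", host,
        "--port", str(port),
        "--db", database,
        "--collection", collection,
        "--out", str(output_path),
    ]
    if compress:
        cmd.append("--gzip")
    return cmd
```

The list can be handed to subprocess.run(cmd, capture_output=True, text=True), whose stdout and stderr attributes correspond to the pair of strings described in the return value above.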

jobflow_remote.utils.db.mongodump_from_store(store: MongoStore | MongoURIStore, output_path: str | Path, collection: str | None = None, mongo_bin_path: str | None = None, compress: bool = False) → int[source]#

Use mongodump to dump a collection from a MongoDB store to a file.

Parameters:
  • store – The store containing the data to be backed up.

  • output_path – The path of the folder where the backup files are located.

  • collection – The name of the collection to be backed up. If None the collection defined in the store will be used.

  • mongo_bin_path – The path to the folder containing the mongo executable. If None, the executable is searched in the PATH.

  • compress – If True, the backup files are compressed.

Returns:

The number of documents dumped.

Return type:

int

jobflow_remote.utils.db.mongorestore_to_store(store: MongoStore | MongoURIStore, input_file: str | Path, collection: str | None = None, mongo_bin_path: str | None = None, compress: bool | None = None) → None[source]#

Restore a collection from a BSON file using mongorestore.

Parameters:
  • store – The store where the data should be restored.

  • input_file – The path of the BSON file containing the data to be restored.

  • collection – The name of the collection to be restored. If None the collection defined in the store will be used.

  • mongo_bin_path – The path to the folder containing the mongo executable. If None, the executable is searched in the PATH.

  • compress – If True, the input file is expected to be compressed. If None it will be determined based on the file extension.
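The compress=None behaviour can be sketched as a small helper that falls back to the file extension when no explicit value is given. The function resolve_compress is hypothetical, and using ".gz" as the marker of a compressed file is an assumption (it is the suffix that mongodump adds when run with --gzip); the actual check may differ.

```python
from pathlib import Path


def resolve_compress(input_file, compress=None):
    """Decide whether `input_file` should be treated as compressed.

    An explicit True/False wins; None falls back to the file extension.
    Assumption: compressed dumps end in ".gz".
    """
    if compress is not None:
        return compress
    return Path(input_file).suffix == ".gz"
```

For instance, resolve_compress("jobs.bson.gz") evaluates to True, while resolve_compress("jobs.bson") evaluates to False.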

jobflow_remote.utils.db.pymongo_dump(collection: Collection, output_path: str | Path, compress: bool = False) → int[source]#

Dump the contents of a PyMongo collection to a BSON file.

Parameters:
  • collection – The PyMongo collection to be dumped.

  • output_path – The path of the folder where the BSON file should be saved.

  • compress (bool) – If True, the output file is compressed.

Returns:

The number of documents dumped.

Return type:

int

jobflow_remote.utils.db.pymongo_restore(collection: Collection, input_file: str | Path) → None[source]#

Restore the contents of a BSON file to a PyMongo collection.

Parameters:
  • collection – The PyMongo collection into which the data is restored.

  • input_file – The path of the BSON file used to restore the data.