jobflow_remote.utils.db module
- exception jobflow_remote.utils.db.FlowLockedError
Bases:
LockedDocumentError
Exception to signal a problem when locking a Flow document.
- exception jobflow_remote.utils.db.JobLockedError
Bases:
LockedDocumentError
Exception to signal a problem when locking a Job document.
- exception jobflow_remote.utils.db.LockedDocumentError
Bases:
Exception
Exception to signal a problem when locking the document.
- exception jobflow_remote.utils.db.MissingDocumentError
Bases:
Exception
Exception to signal that a document is missing from the DB.
- class jobflow_remote.utils.db.MongoLock(collection: Collection, filter: Mapping[str, Any], update: Mapping[str, Any] | None = None, break_lock: bool = False, lock_id: str | None = None, sleep: int | None = None, max_wait: int = 600, projection: Mapping[str, Any] | Iterable[str] | None = None, get_locked_doc: bool = False, **kwargs)
Bases:
object
Context manager to lock a document in a MongoDB database.
- Main characteristics and functionalities:
Lock is acquired by setting a lock_id and lock_time value in the locked document.
Selects the document to lock based on a query and sorting. It uses find_one_and_update, so at most one document is locked.
Can wait for a lock to be released.
Can forcibly break an existing lock.
Can return the locked document even if the lock could not be acquired (useful for determining if a document is locked or no document matches the query)
Accepts all the arguments that can be passed to find_one_and_update
A custom update can be performed on the document when acquiring the lock.
Properties can be passed that will be set in the document when the lock is released.
The lock id value can be customized. If it is not provided, a randomly generated uuid is used.
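The locking mechanism described above can be illustrated with a minimal sketch. This is not the library's implementation: the helper `build_lock_request` is hypothetical, and it assumes that an unlocked document has no `lock_id` (or a null one) and that the lock fields are merged into the `$set` part of the custom update. It only builds the query and update that would then be handed to `find_one_and_update`.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Mapping, Optional

LOCK_KEY = "lock_id"
LOCK_TIME_KEY = "lock_time"


def build_lock_request(
    filter: Mapping[str, Any],
    update: Optional[Mapping[str, Any]] = None,
    lock_id: Optional[str] = None,
    break_lock: bool = False,
) -> tuple[dict, dict]:
    """Hypothetical helper: return the (query, update) pair for find_one_and_update."""
    # Fall back to a random uuid when no lock id is given.
    lock_id = lock_id or str(uuid.uuid4())
    query = dict(filter)
    if not break_lock:
        # Assumption: in MongoDB, {"lock_id": None} matches documents where
        # the field is null or missing, i.e. documents that are not locked.
        query[LOCK_KEY] = None
    # Copy the caller's custom update, then add the lock fields to "$set".
    full_update = {op: dict(values) for op, values in (update or {}).items()}
    full_update.setdefault("$set", {}).update(
        {LOCK_KEY: lock_id, LOCK_TIME_KEY: datetime.now(timezone.utc)}
    )
    return query, full_update


query, lock_update = build_lock_request(
    {"state": "READY"}, {"$set": {"state": "CHECKED_OUT"}}, lock_id="my-lock"
)
print(query)
print(sorted(lock_update["$set"]))
```

Because the lock check is part of the query and the lock fields are part of the update, selecting and locking happen in a single atomic database operation, so two processes cannot both lock the same document.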
Examples
Trying to acquire the lock on a document based on the state
>>> with MongoLock(collection, {"state": "READY"}) as lock:
...     print(lock.locked_document["state"])
READY
If the lock cannot be acquired (no document matches the filter, or the matching document is locked), lock.locked_document is None.
>>> with MongoLock(collection, {"state": "READY"}) as lock:
...     print(lock.locked_document)
None
Wait up to 60 seconds in case the required document is already locked, checking the status every 10 seconds. If the lock cannot be acquired, locked_document is None.
>>> with MongoLock(
...     collection,
...     {"uuid": "5b84228b-d019-47fe-b0a0-564b36aa85ed"},
...     sleep=10,
...     max_wait=60,
... ) as lock:
...     print(lock.locked_document)
None
In case the lock cannot be acquired, expose the locked document
>>> with MongoLock(
...     collection,
...     {"uuid": "5b84228b-d019-47fe-b0a0-564b36aa85ed"},
...     get_locked_doc=True,
... ) as lock:
...     print(lock.locked_document)
...     print(lock.unavailable_document['lock_id'])
None
8d68404f-c77a-461b-859c-40bb0af1979f
Set values in the document upon lock release
>>> with MongoLock(collection, {"state": "READY"}) as lock:
...     if lock.locked_document:
...         # Perform some operations based on the job...
...         lock.update_on_release = {"$set": {"state": "CHECKED_OUT"}}
Delete the locked document upon lock release
>>> with MongoLock(collection, {"state": "READY"}) as lock:
...     if lock.locked_document:
...         # decide if the document should be deleted
...         lock.delete_on_release = True
- Parameters:
collection – The MongoDB collection containing the document to lock.
filter – A MongoDB query to select the document.
update – A dictionary that will be passed to find_one_and_update to update the locked document at the moment of acquiring the lock.
break_lock – True if the context manager is allowed to forcibly break a lock.
lock_id – The id used for the lock in the document. If None, a randomly generated uuid will be used.
sleep – The number of seconds to sleep between consecutive checks while waiting for a lock to be released.
max_wait – The maximum number of seconds to wait for a lock to be released.
projection – The projection passed to the find_one_and_update that locks the document.
get_locked_doc – If True and the lock cannot be acquired because the document matching the filter is already locked, the locked document is fetched and stored in the unavailable_document attribute.
kwargs – All the other args are passed to find_one_and_update.
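The interplay of sleep and max_wait can be sketched as a simple polling loop. This is only an illustration under stated assumptions, not the library's code: `wait_for_lock` and its `try_acquire` callable are hypothetical names, and the sketch assumes that leaving `sleep` unset means a single attempt with no waiting.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_for_lock(
    try_acquire: Callable[[], Optional[T]],
    sleep: Optional[float] = None,
    max_wait: float = 600,
) -> Optional[T]:
    """Hypothetical helper: retry try_acquire until it succeeds or max_wait elapses."""
    doc = try_acquire()
    if doc is not None or not sleep:
        # Either locked on the first attempt, or waiting is disabled (assumption).
        return doc
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        time.sleep(sleep)
        doc = try_acquire()
        if doc is not None:
            return doc
    # The lock was never released within max_wait seconds.
    return None


# Simulated lock that becomes available on the third attempt.
attempts = []


def fake_acquire() -> Optional[dict]:
    attempts.append(1)
    return {"state": "READY"} if len(attempts) >= 3 else None


print(wait_for_lock(fake_acquire, sleep=0.01, max_wait=1))
```

Using a monotonic clock for the deadline keeps the wait unaffected by system clock adjustments.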
- LOCK_KEY = 'lock_id'
- LOCK_TIME_KEY = 'lock_time'
- property delete_on_release: bool
The delete_on_release property.
- Returns:
Whether the document will be deleted upon lock release.
- Return type:
bool
- exception jobflow_remote.utils.db.RunnerLockedError
Bases:
LockedDocumentError
Exception to signal a problem when locking a Runner document.
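Since FlowLockedError, JobLockedError and RunnerLockedError all derive from LockedDocumentError, a caller can handle any locking failure through the base class. The sketch below uses stand-in classes that only mirror the documented hierarchy so that it runs without jobflow-remote installed; in real code the exceptions would be imported from jobflow_remote.utils.db. The `describe` function is a hypothetical helper.

```python
# Stand-in classes mirroring the documented hierarchy (not the real ones).
class LockedDocumentError(Exception):
    pass


class FlowLockedError(LockedDocumentError):
    pass


class JobLockedError(LockedDocumentError):
    pass


class RunnerLockedError(LockedDocumentError):
    pass


class MissingDocumentError(Exception):
    pass


def describe(exc: Exception) -> str:
    """Hypothetical helper: classify an error the way a caller might handle it."""
    try:
        raise exc
    except LockedDocumentError as e:
        # Catches Flow, Job and Runner lock errors alike.
        return f"locked: {type(e).__name__}"
    except MissingDocumentError:
        # Not a subclass of LockedDocumentError, so it needs its own handler.
        return "missing"


print(describe(JobLockedError()))
print(describe(MissingDocumentError()))
```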
- jobflow_remote.utils.db.mongo_operation(store: MongoStore | MongoURIStore, file_path: str | Path, operation: str, collection: str | None, mongo_bin_path: str | None = None, compress: bool = False) → tuple[str, str]
Execute a mongo operation (mongodump or mongorestore) on a given store.
- Parameters:
store – The store containing the data to be backed up.
file_path – The path of the folder where the backup files are located.
operation – The mongo operation to perform. Can be either ‘mongodump’ or ‘mongorestore’.
collection – The name of the collection to be backed up. If None the collection defined in the store will be used.
mongo_bin_path – The path to the folder containing the mongo executable. If None, the executable is searched in the PATH.
compress – If True, the backup files are compressed.
- Returns:
The stdout and stderr of the executed command.
- Return type:
tuple[str, str]
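To give an idea of what such an operation involves, here is a rough sketch of assembling and running a mongodump or mongorestore command line. It is not how jobflow-remote builds the command: `build_mongo_command` and `run_mongo_command` are hypothetical helpers, and the connection is assumed to be given as a plain URI plus database and collection names rather than a store object. Only standard options of the MongoDB database tools are used (`--uri`, `--db`, `--collection`, `--gzip`, `--out`).

```python
import subprocess
from pathlib import Path
from typing import Optional, Union


def build_mongo_command(
    operation: str,
    uri: str,
    db: str,
    collection: str,
    file_path: Union[str, Path],
    mongo_bin_path: Optional[str] = None,
    compress: bool = False,
) -> list[str]:
    """Hypothetical helper: build the argument list for mongodump/mongorestore."""
    if operation not in ("mongodump", "mongorestore"):
        raise ValueError(f"Unsupported operation: {operation}")
    # Use the given folder if provided, otherwise rely on the PATH lookup.
    executable = str(Path(mongo_bin_path) / operation) if mongo_bin_path else operation
    cmd = [executable, f"--uri={uri}", f"--db={db}", f"--collection={collection}"]
    if compress:
        cmd.append("--gzip")
    if operation == "mongodump":
        # mongodump writes into an output folder.
        cmd.append(f"--out={file_path}")
    else:
        # mongorestore takes the path to restore from as a positional argument.
        cmd.append(str(file_path))
    return cmd


def run_mongo_command(cmd: list[str]) -> tuple[str, str]:
    """Run the command and return its stdout and stderr."""
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout, result.stderr


print(
    build_mongo_command(
        "mongodump", "mongodb://localhost:27017", "jobflow", "jobs", "backup", compress=True
    )
)
```

Passing the arguments as a list rather than a single shell string avoids quoting problems with paths and connection strings.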
- jobflow_remote.utils.db.mongodump_from_store(store: MongoStore | MongoURIStore, output_path: str | Path, collection: str | None = None, mongo_bin_path: str | None = None, compress: bool = False) → int
Use mongodump to dump a collection from a MongoDB store to a file.
- Parameters:
store – The store containing the data to be backed up.
output_path – The path of the folder where the backup files are located.
collection – The name of the collection to be backed up. If None the collection defined in the store will be used.
mongo_bin_path – The path to the folder containing the mongo executable. If None, the executable is searched in the PATH.
compress – If True, the backup files are compressed.
- Returns:
The number of documents dumped.
- Return type:
int
- jobflow_remote.utils.db.mongorestore_to_store(store: MongoStore | MongoURIStore, input_file: str | Path, collection: str | None = None, mongo_bin_path: str | None = None, compress: bool | None = None) → None
Restore a collection from a BSON file using mongorestore.
- Parameters:
store – The store where the data should be restored.
input_file – The path of the BSON file containing the data to be restored.
collection – The name of the collection to be restored. If None the collection defined in the store will be used.
mongo_bin_path – The path to the folder containing the mongo executable. If None, the executable is searched in the PATH.
compress – If True, the input file is expected to be compressed. If None it will be determined based on the file extension.
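The fallback for compress=None can be pictured with a small sketch. The exact rule used by the library is not stated here, so this assumes that a `.gz` suffix marks a compressed file; `resolve_compress` is a hypothetical helper.

```python
from pathlib import Path
from typing import Optional, Union


def resolve_compress(
    input_file: Union[str, Path], compress: Optional[bool] = None
) -> bool:
    """Hypothetical helper: decide whether the input file is compressed."""
    if compress is not None:
        # An explicit value always wins over the file extension.
        return compress
    # Assumption: gzip-compressed dumps end in ".gz" (e.g. "jobs.bson.gz").
    return Path(input_file).suffix.lower() == ".gz"


print(resolve_compress("jobs.bson.gz"))
print(resolve_compress("jobs.bson"))
```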