jobflow_remote.remote.data module#

jobflow_remote.remote.data.JOB_INIT_ARGS = {'config', 'config_updates', 'function', 'function_args', 'function_kwargs', 'hosts', 'index', 'metadata', 'metadata_updates', 'name', 'name_updates', 'output_schema', 'uuid'}#

A set of the arguments of the Job constructor, which can be used to detect additional custom arguments.
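
For illustration, a minimal sketch of how this set could be used to single out non-standard keyword arguments among a Job's constructor arguments (the find_custom_args helper and the "priority" key are hypothetical):

    from jobflow_remote.remote.data import JOB_INIT_ARGS

    def find_custom_args(job_init_kwargs: dict) -> set[str]:
        """Return the keys that are not standard Job constructor arguments."""
        return set(job_init_kwargs) - JOB_INIT_ARGS

    # Hypothetical example: "priority" is not a Job constructor argument.
    find_custom_args({"function": None, "function_args": (), "priority": 10})
    # -> {"priority"}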

class jobflow_remote.remote.data.MinimalFileStore(path: str, **kwargs)[source]#

Bases: Store

A minimal Store for accessing a single file. Only the methods required by jobflow-remote are implemented.

Parameters:

path – path to the file backing the Store.

close() None[source]#

Closes any connections.

connect(force_reset: bool = False) None[source]#

Loads the file into the collection in memory.

count(criteria: dict | None = None) int[source]#

Counts the number of documents matching the query criteria.

Parameters:

criteria – PyMongo filter for documents to count.

ensure_index(key: str, unique: bool = False) bool[source]#

Tries to create an index and returns True if it succeeded.

Parameters:
  • key – single key to index

  • unique – Whether or not this index contains only unique keys

Returns:

bool indicating if the index exists/was created

groupby(keys: list[str] | str, criteria: dict | None = None, properties: dict | list | None = None, sort: dict[str, Sort | int] | None = None, skip: int = 0, limit: int = 0) Iterator[tuple[dict, list[dict]]][source]#

Simple grouping function that will group documents by keys.

Parameters:
  • keys – fields to group documents by

  • criteria – PyMongo filter for documents to search in

  • properties – properties to return in grouped documents

  • sort – Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

  • skip – number of documents to skip

  • limit – limit on total number of documents returned

Returns:

generator returning tuples of (dict, list of docs)

query(criteria: dict | None = None, properties: dict | list | None = None, sort: dict[str, Sort | int] | None = None, skip: int = 0, limit: int = 0) Iterator[dict][source]#

Queries the Store for a set of documents.

Parameters:
  • criteria – PyMongo filter for documents to search in

  • properties – properties to return in the queried documents

  • sort – Dictionary of sort order for fields. Keys are field names and values are 1 for ascending or -1 for descending.

  • skip – number of documents to skip

  • limit – limit on total number of documents returned

read_file() list[source]#

Helper method to read the contents of the backing file and generate a list of docs.

remove_docs(criteria: dict)[source]#

Remove docs matching the query dictionary.

For a file-writable store, the backing file is updated.

Parameters:

criteria – query dictionary to match

update(docs: list[dict] | dict, key: list | str | None = None) None[source]#

Update documents into the Store.

For a file-writable store, the backing file is updated.

Parameters:
  • docs – the document or list of documents to update

  • key – field name(s) to determine uniqueness for a document, can be a list of multiple fields, a single field, or None if the Store’s key field is to be used

update_file()[source]#

Updates the backing file when a write-like operation is performed.
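
As an illustration, a minimal sketch of a concrete subclass, assuming a file format only needs to provide name, read_file and update_file (the JSON-lines format and the self.path attribute are assumptions, not part of the documented API):

    import json

    from jobflow_remote.remote.data import MinimalFileStore

    class MinimalJSONLinesStore(MinimalFileStore):
        """Hypothetical subclass storing one JSON document per line."""

        @property
        def name(self) -> str:
            return f"jsonl://{self.path}"

        def read_file(self) -> list:
            # Read every non-empty line of the file as a separate document.
            with open(self.path) as f:
                return [json.loads(line) for line in f if line.strip()]

        def update_file(self) -> None:
            # Rewrite the whole file from the in-memory collection.
            with open(self.path, "w") as f:
                for doc in self.query():
                    f.write(json.dumps(doc, default=str) + "\n")
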
class jobflow_remote.remote.data.MinimalMsgpackStore(path: str, **kwargs)[source]#

Bases: MinimalFileStore

Parameters:

path – path to the msgpack file backing the Store.

property name: str#

Return a string representing this data source.

read_file() list[source]#

Helper method to read the contents of a msgpack file and generate a list of docs.

update_file() None[source]#

Updates the msgpack file when a write-like operation is performed.

class jobflow_remote.remote.data.MinimalMsgspecJSONStore(path: str, **kwargs)[source]#

Bases: MinimalFileStore

Parameters:

path – path to the JSON file backing the Store.

property name: str#

Return a string representing this data source.

read_file() list[source]#

Helper method to read the contents of a JSON file and generate a list of docs.

update_file() None[source]#

Updates the json file when a write-like operation is performed.

class jobflow_remote.remote.data.MinimalORJSONStore(path: str, **kwargs)[source]#

Bases: MinimalFileStore

Parameters:

path – path to the JSON file backing the Store.

property name: str#

Return a string representing this data source.

read_file() list[source]#

Helper method to read the contents of a JSON file and generate a list of docs.

update_file() None[source]#

Updates the json file when a write-like operation is performed.
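
A hedged usage sketch, assuming the store follows the usual maggma connect/update/query workflow (the file name, document contents and key field are made up, and the file is assumed to exist or be created on connect):

    from jobflow_remote.remote.data import MinimalORJSONStore

    store = MinimalORJSONStore("job_output.json")  # hypothetical file name
    store.connect()
    store.update({"uuid": "abc-123", "index": 1, "result": 42}, key="uuid")
    for doc in store.query(criteria={"uuid": "abc-123"}):
        print(doc["result"])
    store.close()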

class jobflow_remote.remote.data.StdJSONStore(paths, **kwargs)[source]#

Bases: JSONStore

Simple subclass of JSONStore defining a serialization_default to handle objects that cannot be dumped to JSON.

Parameters:
  • paths – paths for json files to turn into a Store

  • read_only

    whether this JSONStore is read only. When read_only=True, the JSONStore can still apply MongoDB-like writable operations (e.g. an update) because it behaves like a MemoryStore, but it will not write those changes to the file. On the other hand, if read_only=False (i.e., it is writeable), the JSON file will be automatically updated every time a write-like operation is performed.

    Note that when read_only=False, JSONStore only supports a single JSON file. If the file does not exist, it will be automatically created when the JSONStore is initialized.

  • serialization_option – option that will be passed to orjson.dumps when saving to the JSON file.

  • serialization_default – default that will be passed to orjson.dumps when saving to the JSON file.

  • encoding – Character encoding of files to be tracked by the store. The default (None) follows python’s default behavior, which is to determine the character encoding from the platform. This should work in the great majority of cases. However, if you encounter a UnicodeDecodeError, consider setting the encoding explicitly to ‘utf8’ or another encoding as appropriate.
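
A minimal construction sketch, assuming StdJSONStore is used like the underlying maggma JSONStore (the file name is hypothetical and the file is assumed to exist):

    from jobflow_remote.remote.data import StdJSONStore

    # A single JSON file backing the store (hypothetical file name).
    store = StdJSONStore(["remote_job_data.json"])
    store.connect()
    print(store.count())
    store.close()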

jobflow_remote.remote.data.check_additional_stores(job: dict | Job, store: JobStore) list[str][source]#

Check if all the required additional stores have been defined in the output JobStore. If some are missing, return the names of the missing stores.

Parameters:
  • job – A Job or its serialized version.

  • store – The JobStore from where the references should be resolved.

Returns:

  • The list of names of the missing additional stores.

  • An empty list if no store is missing.
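
For illustration, a hedged sketch of how the check could be used before running a job; the job and job_store objects are assumed to be defined elsewhere (a jobflow Job and the project's output JobStore):

    from jobflow_remote.remote.data import check_additional_stores

    missing = check_additional_stores(job, job_store)
    if missing:
        raise ValueError(f"Additional stores not defined in the JobStore: {missing}")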

jobflow_remote.remote.data.decode_datetime(obj)[source]#
jobflow_remote.remote.data.default_orjson_serializer(obj: Any) Any[source]#
jobflow_remote.remote.data.encode_datetime(obj)[source]#
jobflow_remote.remote.data.get_job_path(job_id: str, index: int | None, base_path: str | Path | None = None) str[source]#
jobflow_remote.remote.data.get_remote_in_file(job, remote_store)[source]#
jobflow_remote.remote.data.get_remote_store(store: JobStore, work_dir: str | Path, config_dict: dict | None) JobStore[source]#
jobflow_remote.remote.data.get_remote_store_filenames(store: JobStore, config_dict: dict | None) list[str][source]#
jobflow_remote.remote.data.get_single_store(config_dict: dict | None, file_name: str, dir_path: str | Path) Store[source]#
jobflow_remote.remote.data.get_single_store_file_name(config_dict: dict | None, file_name: str) str[source]#
jobflow_remote.remote.data.get_store_file_paths(store: JobStore) list[str][source]#
jobflow_remote.remote.data.resolve_job_dict_args(job_dict: dict, store: JobStore) dict[source]#

Resolve the references in a serialized Job.

Similar to Job.resolve_args, but without the need to deserialize the Job. The references are resolved in place.

Parameters:
  • job_dict – The serialized version of a Job.

  • store – The JobStore from where the references should be resolved.

Returns:

The updated version of the input dictionary with references resolved.
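
A hedged usage sketch, assuming job_dict is a serialized Job (for example loaded from the remote input file) and job_store is a connected jobflow JobStore; both are assumed to be defined elsewhere:

    from jobflow_remote.remote.data import resolve_job_dict_args

    resolved = resolve_job_dict_args(job_dict, job_store)
    # References are resolved in place; the updated dictionary is also returned.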

jobflow_remote.remote.data.update_store(store: JobStore, remote_store: JobStore, db_id: int) None[source]#