Source code for jobflow_remote.utils.data

from __future__ import annotations

import os
from collections.abc import Mapping, MutableMapping
from copy import deepcopy
from datetime import datetime, timezone
from typing import Any
from uuid import UUID

import maggma.stores  # required to enable subclass searching
from maggma.core.store import Store
from monty.json import MontyDecoder


[docs] def deep_merge_dict( d1: MutableMapping, d2: Mapping, path: list[str] | None = None, raise_on_conflicts: bool = True, inplace: bool = True, ) -> MutableMapping: """ Merge a dictionary d2 into a dictionary d1 recursively. Parameters ---------- d1 d2 path raise_on_conflicts inplace Returns ------- """ if not inplace: d1 = deepcopy(d1) if path is None: path = [] for key in d2: if key in d1: if isinstance(d1[key], Mapping) and isinstance(d2[key], Mapping): deep_merge_dict(d1[key], d2[key], [*path, str(key)]) elif d1[key] == d2[key]: pass # same leaf value elif raise_on_conflicts: raise ValueError(f"Conflict at {'.'.join([*path, str(key)])}") else: d1[key] = d2[key] else: d1[key] = d2[key] return d1
[docs] def remove_none(obj): if isinstance(obj, (list, tuple, set)): return type(obj)(remove_none(x) for x in obj if x is not None) if isinstance(obj, dict): return type(obj)( (remove_none(k), remove_none(v)) for k, v in obj.items() if k is not None and v is not None ) return obj
[docs] def check_dict_keywords(obj: Any, keywords: list[str]) -> bool: if isinstance(obj, (list, tuple, set)): return any(check_dict_keywords(x, keywords) for x in obj) if isinstance(obj, dict): for k, v in obj.items(): if isinstance(k, str) and any(k.startswith(kw) for kw in keywords): return True if check_dict_keywords(v, keywords): return True return False
[docs] def uuid_to_path( uuid: str, index: int | None = 1, num_subdirs: int = 3, subdir_len: int = 2 ): u = UUID(uuid) u_hex = u.hex # Split the digest into groups of "subdir_len" characters subdirs = [ u_hex[i : i + subdir_len] for i in range(0, num_subdirs * subdir_len, subdir_len) ] # add the index to the final dir name dir_name = f"{uuid}" if index is not None: dir_name += f"_{index}" # Combine root directory and subdirectories to form the final path return os.path.join(*subdirs, dir_name)
[docs] def store_from_dict(store_dict: dict) -> Store: if "@class" in store_dict and "@module" in store_dict: store = MontyDecoder().process_decoded(store_dict) if not isinstance(store, Store): raise ValueError( f"The converted object {store} is not an instance of a maggma Store" ) return store def all_subclasses(cl): return set(cl.__subclasses__()).union( [s for c in cl.__subclasses__() for s in all_subclasses(c)] ) all_stores = {s.__name__: s for s in all_subclasses(maggma.stores.Store)} return convert_store(store_dict, all_stores)
[docs] def convert_store(spec_dict: dict, valid_stores) -> Store: """ Build a store based on the dict spec configuration from JobFlow TODO expose the methods from jobflow and don't duplicate the code. """ _spec_dict = dict(spec_dict) store_type = _spec_dict.pop("type") for k, v in _spec_dict.items(): if isinstance(v, dict) and "type" in v: _spec_dict[k] = convert_store(v, valid_stores) return valid_stores[store_type](**_spec_dict)
[docs] def convert_utc_time(datetime_value: datetime) -> datetime: """ Convert a time in UTC (used in the DB) to the time zone of the system where the code is being executed. Parameters ---------- datetime_value a datetime object in UTC Returns ------- The datetime in the zone of the current system """ return datetime_value.replace(tzinfo=timezone.utc).astimezone(tz=None)
# TODO imported this from jobflow remote for backward compatibility. # remove this in favor of suid in the next release
[docs] def suuid() -> str: """ Generate a string UUID (universally unique identifier). Uses the UUID4 specification. Returns ------- str A UUID. """ from uuid import uuid4 return str(uuid4())