Source code for

from __future__ import annotations

import os
from import Mapping, MutableMapping
from copy import deepcopy
from datetime import datetime, timezone
from typing import Any
from uuid import UUID

import maggma.stores  # required to enable subclass searching
from dateutil.relativedelta import relativedelta
from import gettz
from import Store
from monty.json import MontyDecoder

[docs] def deep_merge_dict( d1: MutableMapping, d2: Mapping, path: list[str] | None = None, raise_on_conflicts: bool = True, inplace: bool = True, ) -> MutableMapping: """ Merge a dictionary d2 into a dictionary d1 recursively. Parameters ---------- d1 d2 path raise_on_conflicts inplace Returns ------- """ if not inplace: d1 = deepcopy(d1) if path is None: path = [] for key in d2: if key in d1: if isinstance(d1[key], Mapping) and isinstance(d2[key], Mapping): deep_merge_dict(d1[key], d2[key], [*path, str(key)]) elif d1[key] == d2[key]: pass # same leaf value elif raise_on_conflicts: raise ValueError(f"Conflict at {'.'.join([*path, str(key)])}") else: d1[key] = d2[key] else: d1[key] = d2[key] return d1
[docs] def remove_none(obj): if isinstance(obj, (list, tuple, set)): return type(obj)(remove_none(x) for x in obj if x is not None) if isinstance(obj, dict): return type(obj)( (remove_none(k), remove_none(v)) for k, v in obj.items() if k is not None and v is not None ) return obj
[docs] def check_dict_keywords(obj: Any, keywords: list[str]) -> bool: if isinstance(obj, (list, tuple, set)): return any(check_dict_keywords(x, keywords) for x in obj) if isinstance(obj, dict): for k, v in obj.items(): if isinstance(k, str) and any(k.startswith(kw) for kw in keywords): return True if check_dict_keywords(v, keywords): return True return False
[docs] def uuid_to_path( uuid: str, index: int | None = 1, num_subdirs: int = 3, subdir_len: int = 2 ): u = UUID(uuid) u_hex = u.hex # Split the digest into groups of "subdir_len" characters subdirs = [ u_hex[i : i + subdir_len] for i in range(0, num_subdirs * subdir_len, subdir_len) ] # add the index to the final dir name dir_name = f"{uuid}" if index is not None: dir_name += f"_{index}" # Combine root directory and subdirectories to form the final path return os.path.join(*subdirs, dir_name)
[docs] def store_from_dict(store_dict: dict) -> Store: if "@class" in store_dict and "@module" in store_dict: store = MontyDecoder().process_decoded(store_dict) if not isinstance(store, Store): raise ValueError( f"The converted object {store} is not an instance of a maggma Store" ) return store def all_subclasses(cl): return set(cl.__subclasses__()).union( [s for c in cl.__subclasses__() for s in all_subclasses(c)] ) all_stores = {s.__name__: s for s in all_subclasses(maggma.stores.Store)} return convert_store(store_dict, all_stores)
[docs] def convert_store(spec_dict: dict, valid_stores) -> Store: """ Build a store based on the dict spec configuration from JobFlow TODO expose the methods from jobflow and don't duplicate the code. """ _spec_dict = dict(spec_dict) store_type = _spec_dict.pop("type") for k, v in _spec_dict.items(): if isinstance(v, dict) and "type" in v: _spec_dict[k] = convert_store(v, valid_stores) return valid_stores[store_type](**_spec_dict)
[docs] def convert_utc_time(datetime_value: datetime) -> datetime: """ Convert a time in UTC (used in the DB) to the time zone of the system where the code is being executed. Parameters ---------- datetime_value a datetime object in UTC Returns ------- The datetime in the zone of the current system """ return datetime_value.replace(tzinfo=timezone.utc).astimezone(tz=None)
[docs] def get_past_time_rounded( interval: str, num_intervals: int, reference: datetime | None = None ) -> datetime: """ Return a datetime object that is the specified number of intervals in the past relative to the given reference datetime. The returned datetime is rounded to the nearest interval start time. Parameters ---------- interval One of 'hours', 'days', 'weeks', 'months', 'years' num_intervals The number of intervals to go back in time reference The datetime to use as the reference for the calculation. If not specified, the current time is used. Returns ------- datetime The datetime object that is the specified number of intervals in the past relative to the given reference datetime. """ if not reference: reference = datetime.utcnow() past = reference - relativedelta(**{interval: num_intervals - 1}) # type: ignore[arg-type] # Define starting point modifications based on quantity type start_modifiers = { "hours": lambda dt: dt.replace(minute=0, second=0, microsecond=0), "days": lambda dt: dt.replace(hour=0, minute=0, second=0, microsecond=0), "weeks": lambda dt: dt.replace(hour=0, minute=0, second=0, microsecond=0) - relativedelta(days=dt.weekday()), "months": lambda dt: dt.replace( day=1, hour=0, minute=0, second=0, microsecond=0 ), "years": lambda dt: dt.replace( month=1, day=1, hour=0, minute=0, second=0, microsecond=0 ), } return start_modifiers[interval](past)
[docs] def get_utc_offset(timezone: str): # Get current time in the specified timezone """ Return the UTC offset of the given timezone as a string. Parameters ---------- timezone The timezone for which to get the UTC offset. Returns ------- str The UTC offset as a string in the format +/-HH:MM. """ tz_info_val = gettz(timezone) if not tz_info_val: raise ValueError(f"Could not determine the timezone for {timezone}") now = # Get the UTC offset utc_offset = now.utcoffset() # Extract hours and minutes hours, remainder = divmod(utc_offset.total_seconds(), 3600) minutes = remainder // 60 return f"{int(hours):+03d}:{int(minutes):02d}"
# TODO imported this from jobflow remote for backward compatibility. # remove this in favor of suid in the next release
[docs] def suuid() -> str: """ Generate a string UUID (universally unique identifier). Uses the UUID4 specification. Returns ------- str A UUID. """ from uuid import uuid4 return str(uuid4())