from __future__ import annotations
import copy
import os
from collections import UserDict
from io import StringIO
from pathlib import Path
from typing import TYPE_CHECKING
from ruamel.yaml import CommentedMap, scanner, YAML # pyre-ignore[21]
from ruamel.yaml.error import YAMLError # pyre-ignore[21]
from ruamel.yaml.representer import RoundTripRepresenter # pyre-ignore[21]
from ruamel.yaml.dumper import RoundTripDumper # pyre-ignore[21]
from onyo.lib.consts import PSEUDO_KEYS, RESERVED_KEYS
from onyo.lib.exceptions import NotAnAssetError
from onyo.lib.ui import ui
if TYPE_CHECKING:
from typing import (
Dict,
Generator,
Hashable,
Mapping,
Set,
TypeVar,
)
_KT = TypeVar("_KT") # Key type.
_VT = TypeVar("_VT") # Value type.
[docs]
class DotNotationWrapper(UserDict):
"""Dictionary wrapper for providing access to nested dictionaries via hierarchical keys.
This class wraps a dictionary (available from the attribute .data) to allow traversing
multidimensional dictionaries using a dot as the delimiter. In other words, it provides a view on the
flattened dictionary:
> d = {'key': 'value', 'nested': {'key': 'another value'}}
> wrapper = DotNotationWrapper(d)
> wrapper['nested.key']
'another value'
> list(wrapper.keys())
['key', 'nested.key']
Iteration only considers the flattened view, and keys that contain dictionaries will not be yielded when using
`wrapper.keys()`, `wrapper.values()`, and `wrapper.items()`. Whenever the python standard behavior is needed, the
underlying dictionary is available from the `.data` attribute.
"""
[docs]
def __init__(self, __dict: Mapping[_KT, _VT] | None = None, **kwargs: _VT) -> None:
if __dict and isinstance(__dict, dict):
super().__init__()
# If we have any sort of existing dict, we want to wrap it, maintaining the original object (and class).
# Note: Currently would modify wrapped dict w/ kwargs if both are given.
# Would need deepcopy to prevent this, but this kinda contradicts the idea of wrapping.
self.data = __dict
self.update(**kwargs)
else:
# Resort to `UserDict` behavior if we have any sort of sequence or just `kwargs`.
super().__init__(__dict, **kwargs)
def _keys(self) -> Generator[str, None, None]:
"""Recursively yield all keys from nested dicts in dot notation.
Note, that this forces the returned keys to be strings no matter their original type.
"""
def recursive_keys(d: dict):
for k in d.keys():
if hasattr(d[k], "keys"):
yield from (k + "." + sk for sk in recursive_keys(d[k]))
else:
# For the purpose of dot notation access, key types other than string don't make sense.
# One can't have a key 'some.1.more', where 1 remains an integer.
yield str(k)
yield from recursive_keys(self.data)
def __getitem__(self, key):
if isinstance(key, str):
parts = key.split('.')
effective_dict = self.data
if len(parts) > 1:
for lvl in range(len(parts) - 1):
try:
effective_dict = effective_dict[parts[lvl]]
except KeyError as e:
raise KeyError(f"'{'.'.join(parts[:lvl + 1])}'") from e
except TypeError as e:
raise TypeError(f"'{'.'.join(parts[:lvl])}' is not a dictionary.") from e
try:
return effective_dict[parts[-1]]
except KeyError as e:
raise KeyError(f"'{key}'") from e
except TypeError as e:
raise TypeError(f"'{'.'.join(parts[:-1])}' is not a dictionary.") from e
return super().__getitem__(key)
def __setitem__(self, key, item):
if isinstance(key, str):
parts = key.split('.')
effective_dict = self.data
if len(parts) > 1:
for lvl in range(len(parts) - 1):
try:
effective_dict = effective_dict[parts[lvl]]
except KeyError:
# nested dict doesn't exist yet
effective_dict[parts[lvl]] = dict()
effective_dict = effective_dict[parts[lvl]]
effective_dict[parts[-1]] = item
else:
super().__setitem__(key, item)
def __delitem__(self, key):
if isinstance(key, str):
parts = key.split('.')
effective_dict = self.data
if len(parts) > 1:
for lvl in range(len(parts) - 1):
try:
effective_dict = effective_dict[parts[lvl]]
except KeyError as e:
raise KeyError(f"'{'.'.join(parts[:lvl + 1])}'") from e
del effective_dict[parts[-1]]
else:
super().__delitem__(key)
def __contains__(self, key):
"""Whether `key` is in self.
Note, that this is `True` for intermediate keys (dicts), although `self.__iter__` wouldn't yield them.
"""
try:
self.__getitem__(key)
return True
except KeyError:
return False
def __iter__(self):
return self._keys()
def __len__(self):
return len(list(self._keys()))
[docs]
def deduplicate(sequence: list | None) -> list | None:
r"""Deduplicate a list and preserve its order.
The first occurrence of a value is kept. All later occurrences are
discarded.
For convenience, also accepts ``None`` and returns ``None`` in that case.
Parameters
----------
sequence
List to deduplicate.
"""
if not sequence:
return sequence
seen = set()
return [x for x in sequence if not (x in seen or seen.add(x))]
[docs]
def dict_to_asset_yaml(d: Dict | UserDict) -> str:
r"""Convert a dictionary to a YAML string, stripped of reserved-keys.
Dictionaries that contain a map of comments (ruamel, etc) will have those
comments included in the string.
See Also
--------
onyo.lib.consts.RESERVED_KEYS
Parameters
----------
d
Dictionary to strip of reserved-keys and convert to a YAML string.
"""
if isinstance(d, DotNotationWrapper):
d = d.data
# deepcopy to keep comments when `d` is `ruamel.yaml.comments.CommentedMap`.
content = copy.deepcopy(d)
for k in PSEUDO_KEYS + RESERVED_KEYS:
if k in content.keys():
del content[k]
# Empty dicts are serialized to '{}', and I was unable to find any input
# ('', None, etc) that would serialize to nothing. Hardcoding, though ugly,
# seems to be the only option.
if not content:
return '---\n'
from io import StringIO
yaml = YAML(typ='rt')
yaml.explicit_start = True
s = StringIO()
yaml.dump(content,
s)
return s.getvalue()
[docs]
def get_asset_content(asset_file: Path) -> dict[str, bool | float | int | str | Path]:
r"""Get the contents of an asset as a dictionary.
If the asset file's contents are not valid YAML, an error is printed.
Parameters
----------
asset_file
The Path of the asset file to get the contents of.
"""
yaml = YAML(typ='rt', pure=True)
contents = dict()
try:
contents = yaml.load(asset_file)
except YAMLError as e: # pyre-ignore[66]
# Remove ruaml usage pointer (see github issue 436)
if hasattr(e, 'note') and isinstance(e.note, str) and "suppress this check" in e.note:
e.note = ""
raise NotAnAssetError(f"Invalid YAML in {asset_file}:{os.linesep}{str(e)}") from e
if contents is None:
return dict()
if not isinstance(contents, (dict, CommentedMap)):
# For example: a simple text file may technically be valid YAML,
# but we may get a string instead of dict.
raise NotAnAssetError(f"{asset_file} does not appear to be an asset.")
return contents
[docs]
def get_temp_file() -> Path:
r"""Create and return the Path of a new temporary file.
"""
from tempfile import mkstemp
fd, tmp_path = mkstemp(prefix='onyo_', suffix='.yaml', text=True)
return Path(tmp_path)
[docs]
def has_unique_names(asset_files: Set[Path]) -> bool:
r"""Check files for unique file names.
If duplicates are found, an error is printed listing them.
Parameters
----------
asset_files
A set of files to check for the uniqueness of their file names.
"""
asset_names = [a.name for a in asset_files]
duplicates = [a for a in asset_files if asset_names.count(a.name) > 1]
duplicates.sort(key=lambda x: x.name)
if duplicates:
ui.error('The following file names are not unique:\n{}'.format(
'\n'.join(map(str, duplicates))))
return False
return True
[docs]
def validate_yaml(asset_files: list[Path] | None) -> bool:
r"""Check files for valid YAML.
If files with invalid YAML are detected, an error is printed listing them.
Parameters
----------
asset_files
A list of files to check for valid YAML.
"""
# Note: Does not (and cannot) account for asset dirs automatically in this form.
# Thus needs to be done by caller.
# Note: assumes absolute paths!
invalid_yaml = []
asset_files = asset_files or []
for asset in asset_files:
# TODO: use valid_yaml()
try:
YAML(typ='rt').load(asset)
except scanner.ScannerError: # pyre-ignore[66]
invalid_yaml.append(str(asset))
if invalid_yaml:
ui.error('The following files fail YAML validation:\n{}'.format(
'\n'.join(invalid_yaml)))
return False
return True
[docs]
def write_asset_file(path: Path,
asset: Dict | UserDict) -> None:
r"""Write content to an asset file.
All ``RESERVED_KEYS`` will be stripped from the content before writing.
Parameters
----------
path
The Path to write content to.
asset
A dictionary of content to write to the path.
"""
path.open('w').write(dict_to_asset_yaml(asset))
[docs]
class YAMLDumpWrapper(UserDict):
r"""Wrapper class for asset dicts accessing ruamel's representation of data rather than the provided object.
This works around the issue that something like `serial: 001234` yields a `{'serial': 1234}` but is dumped as
`serial: 001234`, which messes up onyo's comparisons for whether there's a modification of an asset.
"""
[docs]
def __init__(self, d: dict | UserDict):
super().__init__(d)
def __getitem__(self, item: Hashable):
data = self.data[item] # potentially raises KeyError
if isinstance(data, dict) and data:
# non-empty dict: recurse
return YAMLDumpWrapper(data)
if isinstance(data, list) and data:
# non-empty list: Implement analogous wrapper
raise NotImplementedError
if isinstance(data, Path):
return data # no representer for `Path`
return RoundTripRepresenter(dumper=RoundTripDumper(stream=StringIO())).represent_data(data).value
[docs]
def is_equal_assets_dict(a: Dict | UserDict, b: Dict | UserDict) -> bool:
r"""Whether two asset dictionaries have the same content.
This accounts for comments in YAML.
For this to return `True`, both assets need to
be equal not only in terms of their key-value pairs,
but also in terms of annotated comments.
This also accounts for nested dicts recursively.
"""
# Note: Checking types here, because of potential recursive calls.
if not isinstance(a, (dict, UserDict)) or not isinstance(b, (dict, UserDict)):
return False
# TODO: This may become part of (thin) Asset class instead,
# if there are more reasons to have such a class.
if isinstance(a, DotNotationWrapper):
a = a.data
if isinstance(b, DotNotationWrapper):
b = b.data
# Recurse into nested dicts:
for k, v in a.items():
if isinstance(v, (dict, UserDict)):
try:
eq_ = is_equal_assets_dict(a[k], b[k])
except (KeyError, TypeError):
eq_ = False
if not eq_:
return False
if not isinstance(a, CommentedMap) and not isinstance(b, CommentedMap):
return a == b
if YAMLDumpWrapper(a) != YAMLDumpWrapper(b):
# not accounting for comments yet
return False
# Note, that ruamel does appear to implement `__eq__` and `__contains__` for the relevant objects.
# However, it all breaks when comparing across files, b/c the `start_mark` attribute of a `CommentToken`
# is a `FileMark` (when the YAML was read from file, ofc) which holds the path. So, comparison is only
# ever equal if both `CommentedMap` are pointing to the same file.
# Hence, we need our own comparison, that ignores the path.
from ruamel.yaml.comments import Comment, comment_attrib # pyre-ignore[21] pyre doesn't find a comments module
from ruamel.yaml import CommentToken
a_comment = getattr(a, comment_attrib, Comment())
b_comment = getattr(b, comment_attrib, Comment())
def contains_all_comments(container: Comment, comment: Comment) -> bool: # pyre-ignore[11] - see `Comment` import
for a_key, a_values in comment.items.items():
try:
b_values = container.items.get(a_key)
except KeyError:
# `a` has an annotation at a key that's not in `b`'s annotations
return False
if not b_values:
return False
if len(a_values) != len(b_values):
# Not sure whether this is necessary (may be a defined, fixed length),
# but better be safe.
return False
for a_v, b_v in zip(a_values, b_values):
if type(a_v) is not type(b_v):
return False
if isinstance(a_v, CommentToken) and \
(a_v.value, a_v.start_mark.line, a_v.start_mark.column) != \
(b_v.value, b_v.start_mark.line, b_v.start_mark.column):
return False
return True
return contains_all_comments(b_comment, a_comment) and contains_all_comments(a_comment, b_comment)