music-kraken-core/music_kraken/objects/collection.py
Lars Noack 9c369b421d
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
feat: oh no
2024-05-03 14:52:12 +02:00

253 lines
8.9 KiB
Python

from __future__ import annotations
from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set
from .parents import OuterProxy
from ..utils import object_trace
from ..utils import output, BColors
# Type variable for the elements stored in a Collection; bound to OuterProxy,
# so only OuterProxy (sub)classes are accepted by type checkers.
T = TypeVar('T', bound=OuterProxy)
class Collection(Generic[T]):
    """
    Ordered container of ``OuterProxy`` objects that merges duplicates on insert.

    Every contained object is indexed by its ``id`` and by each of its
    ``indexing_values``. When an object is appended whose indexing values match
    an object that is already contained, it is merged into that object instead
    of being stored a second time.

    ``push_to`` and ``pull_from`` list further collections that are consulted
    on ``append`` before this collection's own index (see ``append``).
    """

    __is_collection__ = True

    _data: List[T]

    _indexed_from_id: Dict[int, Dict[str, Any]]
    _indexed_values: Dict[str, Dict[Any, T]]

    shallow_list = property(fget=lambda self: self.data)

    def __init__(
        self,
        data: Optional[Iterable[T]] = None,
        sync_on_append: Optional[Dict[str, Collection]] = None,
        append_object_to_attribute: Optional[Dict[str, T]] = None,
        extend_object_to_attribute: Optional[Dict[str, Collection]] = None,
    ) -> None:
        """
        :param data: initial objects; each one is passed through ``append``.
        :param sync_on_append: maps an attribute name of an appended element to
            the collection that this attribute is replaced with (synced by reference).
        :param append_object_to_attribute: maps an attribute name of an appended
            element to an object that is appended to that attribute.
        :param extend_object_to_attribute: maps an attribute name of an appended
            element to a collection that the attribute is extended with.
        """
        # maps objects that hold this collection to the attribute name it is stored under
        self._collection_for: dict = dict()

        self._contains_ids = set()
        self._data = []

        # List of collection attributes that should be modified on append
        # Key: collection attribute (str) of appended element
        # Value: main collection to sync to
        self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
        self.extend_object_to_attribute: Dict[str, Collection[T]] = extend_object_to_attribute or {}
        self.sync_on_append: Dict[str, Collection] = sync_on_append or {}

        self.pull_from: List[Collection] = []
        self.push_to: List[Collection] = []

        # This is to cleanly unmap previously mapped items by their id
        self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
        # this is to keep track and look up the actual objects
        self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)

        self.extend(data)

    def __repr__(self) -> str:
        # str() keeps the join from failing should a stored key not be a string
        names = ' | '.join(str(name) for name in self._collection_for.values())
        return f"Collection({names} {id(self)})"

    def _map_element(self, __object: T, no_unmap: bool = False, **kwargs):
        """
        Register the object in both lookup tables.

        :param no_unmap: skip removing the previous mapping of the object first;
            used by ``_remap`` where the tables have just been reset.
        """
        if not no_unmap:
            self._unmap_element(__object.id)

        self._indexed_from_id[__object.id]["id"] = __object.id
        self._indexed_values["id"][__object.id] = __object

        for name, value in __object.indexing_values:
            # unset values and plain defaults don't identify an object
            if value is None or value == __object._inner._default_values.get(name):
                continue

            self._indexed_values[name][value] = __object
            self._indexed_from_id[__object.id][name] = value

    def _unmap_element(self, __object: Union[T, int]):
        """Drop every index entry that was recorded for the object (or object id)."""
        obj_id = __object.id if isinstance(__object, OuterProxy) else __object

        if obj_id not in self._indexed_from_id:
            return

        for name, value in self._indexed_from_id[obj_id].items():
            if value in self._indexed_values[name]:
                del self._indexed_values[name][value]

        del self._indexed_from_id[obj_id]

    def _remap(self):
        """Rebuild both lookup tables from scratch out of ``_data``."""
        # reinitialize the mapping to clean it without time consuming operations
        self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
        self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)

        for e in self._data:
            self._map_element(e, no_unmap=True)

    def _find_object(self, __object: T, **kwargs) -> Optional[T]:
        """
        Return the contained object that shares an indexing value with the
        given object, or ``None`` if there is no such object.

        The index is rebuilt on every call, so the lookup always reflects the
        current indexing values of the contained objects.
        """
        self._remap()

        for name, value in __object.indexing_values:
            if value in self._indexed_values[name]:
                return self._indexed_values[name][value]

        return None

    def _merge_into_contained_object(self, existing: T, other: T, **kwargs):
        """
        This function merges the other object into the existing object, which is contained in the current collection.
        This also modifies the correct mapping.
        """
        if existing.id == other.id:
            return

        # NOTE(review): the mapping is refreshed before the merge, so indexing
        # values gained through the merge are only picked up by the next
        # _remap() (which every _find_object call performs).
        self._map_element(existing)
        existing.merge(other, **kwargs)

    def _append_new_object(self, other: T, **kwargs):
        """
        This function appends the other object to the current collection.
        This only works if not another object, which represents the same real life object exists in the collection.
        """
        self._data.append(other)
        self._map_element(other)

        # all of the existing hooks to get the defined datastructure
        for collection_attribute, generator in self.extend_object_to_attribute.items():
            other.__getattribute__(collection_attribute).extend(generator, **kwargs)

        for attribute, new_object in self.append_object_to_attribute.items():
            other.__getattribute__(attribute).append(new_object, **kwargs)

        for attribute, a in self.sync_on_append.items():
            # syncing two collections by reference
            b = other.__getattribute__(attribute)
            if a is b:
                continue

            object_trace(f"Syncing [{a}] = [{b}]")

            # snapshot b before its owners are re-pointed to a
            b_data = b.data.copy()
            b_collection_for = b._collection_for.copy()

            # every object that held b now holds a instead
            for synced_with, key in b_collection_for.items():
                synced_with.__setattr__(key, a)
                a._collection_for[synced_with] = key

            a.extend(b_data, **kwargs)

    def append(self, other: Optional[T], **kwargs):
        """
        If an object, that represents the same entity exists in a relevant collection,
        merge into this object. (and remap)
        Else append to this collection.

        Lookup order:
        1. the first ``push_to`` collection containing a match receives the merge
           instead of this collection,
        2. the first ``pull_from`` collection containing a match has that match
           removed; the removed object is merged into the final target,
        3. otherwise the object is merged into a match of this collection or
           appended as a new element.

        :param other: the object to add; ``None`` is ignored.
        :return:
        """
        if other is None:
            return
        if other.id in self._indexed_from_id:
            return

        object_trace(f"Appending {other.option_string} to {self}")

        push_to: Optional[Tuple[Collection, T]] = None
        for c in self.push_to:
            r = c._find_object(other)
            if r is not None:
                push_to = (c, r)
                output("found push to", r, other, self, color=BColors.RED, sep="\t")
                break

        pull_from: Optional[Tuple[Collection, T]] = None
        for c in self.pull_from:
            r = c._find_object(other)
            if r is not None:
                pull_from = (c, r)
                output("found pull from", r, other, self, color=BColors.RED, sep="\t")
                break

        if pull_from is not None:
            pull_from[0].remove(pull_from[1])

        existing_object = self._find_object(other, no_push_to=kwargs.get("no_push_to", False))

        if existing_object is not None:
            self._merge_into_contained_object(existing_object, other, **kwargs)
            if pull_from is not None:
                self._merge_into_contained_object(existing_object, pull_from[1], **kwargs)
            return

        if push_to is None:
            self._append_new_object(other, **kwargs)
            target_collection, target = self, other
        else:
            target_collection, target = push_to
            target_collection._merge_into_contained_object(target, other, **kwargs)

        if pull_from is not None:
            # merge through the collection that actually contains the target,
            # so the index of the right collection is the one being refreshed
            target_collection._merge_into_contained_object(target, pull_from[1], **kwargs)

    def remove(self, *other_list: T, silent: bool = False, **kwargs):
        """
        Remove the given objects from this collection and undo the append hooks
        (``extend_object_to_attribute`` / ``append_object_to_attribute``) on them.

        :param other_list: the objects to remove; they are looked up by ``id``.
        :param silent: if ``False`` a missing object raises, if ``True`` the
            missing object is returned and the remaining objects are skipped.
        :param kwargs: forwarded to the ``remove`` calls of the hooked collections.
        :raises ValueError: if an object is not contained and ``silent`` is ``False``.
        """
        for other in other_list:
            existing: Optional[T] = self._indexed_values["id"].get(other.id, None)
            if existing is None:
                if not silent:
                    raise ValueError(f"Object {other} not found in {self}")
                # NOTE(review): this stops at the first miss and leaves the
                # rest of other_list untouched - confirm this is intended.
                return other

            for collection_attribute, generator in self.extend_object_to_attribute.items():
                other.__getattribute__(collection_attribute).remove(*generator, silent=silent, **kwargs)

            for attribute, new_object in self.append_object_to_attribute.items():
                other.__getattribute__(attribute).remove(new_object, silent=silent, **kwargs)

            self._data.remove(existing)
            self._unmap_element(existing)

    def contains(self, other: T) -> bool:
        """Return whether an object sharing an indexing value with ``other`` is contained."""
        return self._find_object(other) is not None

    def extend(self, other_collections: Optional[Iterable[T]], **kwargs):
        """Append every object of the given iterable; ``None`` is ignored."""
        if other_collections is None:
            return

        for element in other_collections:
            self.append(element, **kwargs)

    @property
    def data(self) -> List[T]:
        """A new list holding the contained objects in insertion order."""
        return list(self.__iter__())

    def __len__(self) -> int:
        return len(self._data)

    @property
    def empty(self) -> bool:
        """``True`` if the collection holds no objects."""
        return self.__len__() <= 0

    def __iter__(self) -> Iterator[T]:
        yield from self._data

    def __merge__(self, other: Collection, **kwargs):
        """Merge another collection into this one by appending all of its objects."""
        self.extend(other, **kwargs)

    def __getitem__(self, item: int):
        return self._data[item]

    def get(self, item: int, default=None):
        """
        Return the object at position ``item`` or ``default`` if the position
        is out of range (in either direction).
        """
        try:
            return self._data[item]
        except IndexError:
            return default