feat: limited complexity of collection by removing child collections
This commit is contained in:
parent
fa723d7747
commit
ea5adfbe8a
@ -1,7 +1,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union
|
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any
|
||||||
from .parents import OuterProxy
|
from .parents import OuterProxy
|
||||||
from ..utils import object_trace
|
from ..utils import object_trace
|
||||||
|
|
||||||
@ -22,28 +22,27 @@ class Collection(Generic[T]):
|
|||||||
self,
|
self,
|
||||||
data: Optional[Iterable[T]] = None,
|
data: Optional[Iterable[T]] = None,
|
||||||
sync_on_append: Dict[str, Collection] = None,
|
sync_on_append: Dict[str, Collection] = None,
|
||||||
contain_given_in_attribute: Dict[str, Collection] = None,
|
append_object_to_attribute: Dict[str, T] = None,
|
||||||
contain_attribute_in_given: Dict[str, Collection] = None,
|
extend_object_to_attribute: Dict[str, Collection] = None,
|
||||||
append_object_to_attribute: Dict[str, T] = None
|
|
||||||
) -> None:
|
) -> None:
|
||||||
self._collection_for: dict = dict()
|
self._collection_for: dict = dict()
|
||||||
|
|
||||||
self._contains_ids = set()
|
self._contains_ids = set()
|
||||||
self._data = []
|
self._data = []
|
||||||
|
|
||||||
self.parents: List[Collection[T]] = []
|
|
||||||
self.children: List[Collection[T]] = []
|
|
||||||
|
|
||||||
# List of collection attributes that should be modified on append
|
# List of collection attributes that should be modified on append
|
||||||
# Key: collection attribute (str) of appended element
|
# Key: collection attribute (str) of appended element
|
||||||
# Value: main collection to sync to
|
# Value: main collection to sync to
|
||||||
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
|
|
||||||
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
|
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
|
||||||
|
self.extend_object_to_attribute: Dict[str, Collection[T]] = extend_object_to_attribute or {}
|
||||||
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
|
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
|
||||||
|
|
||||||
self._id_to_index_values: Dict[int, set] = defaultdict(set)
|
self._id_to_index_values: Dict[int, set] = defaultdict(set)
|
||||||
self._indexed_values = defaultdict(lambda: None)
|
|
||||||
self._indexed_to_objects = defaultdict(lambda: None)
|
# This is to cleanly unmap previously mapped items by their id
|
||||||
|
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
|
||||||
|
# this is to keep track and look up the actual objects
|
||||||
|
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
|
||||||
|
|
||||||
self.extend(data)
|
self.extend(data)
|
||||||
|
|
||||||
@ -51,66 +50,34 @@ class Collection(Generic[T]):
|
|||||||
return f"Collection({id(self)})"
|
return f"Collection({id(self)})"
|
||||||
|
|
||||||
def _map_element(self, __object: T, from_map: bool = False):
|
def _map_element(self, __object: T, from_map: bool = False):
|
||||||
self._contains_ids.add(__object.id)
|
self._unmap_element(__object.id)
|
||||||
|
|
||||||
for name, value in (*__object.indexing_values, ('id', __object.id)):
|
self._indexed_from_id[__object.id]["id"] = __object.id
|
||||||
|
self._indexed_values["id"][__object.id] = __object
|
||||||
|
|
||||||
|
for name, value in __object.indexing_values:
|
||||||
if value is None or value == __object._inner._default_values.get(name):
|
if value is None or value == __object._inner._default_values.get(name):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
self._indexed_values[name] = value
|
self._indexed_values[name][value] = __object
|
||||||
self._indexed_to_objects[value] = __object
|
self._indexed_from_id[__object.id][name] = value
|
||||||
|
|
||||||
self._id_to_index_values[__object.id].add((name, value))
|
|
||||||
|
|
||||||
def _unmap_element(self, __object: Union[T, int]):
|
def _unmap_element(self, __object: Union[T, int]):
|
||||||
obj_id = __object.id if isinstance(__object, OuterProxy) else __object
|
obj_id = __object.id if isinstance(__object, OuterProxy) else __object
|
||||||
|
|
||||||
if obj_id in self._contains_ids:
|
if obj_id not in self._indexed_from_id:
|
||||||
self._contains_ids.remove(obj_id)
|
return
|
||||||
|
|
||||||
for name, value in self._id_to_index_values[obj_id]:
|
for name, value in self._indexed_from_id[obj_id].items():
|
||||||
if name in self._indexed_values:
|
if value in self._indexed_values[name]:
|
||||||
del self._indexed_values[name]
|
del self._indexed_values[name][value]
|
||||||
if value in self._indexed_to_objects:
|
|
||||||
del self._indexed_to_objects[value]
|
|
||||||
|
|
||||||
del self._id_to_index_values[obj_id]
|
del self._indexed_from_id[obj_id]
|
||||||
|
|
||||||
@property
|
def _find_object(self, __object: T) -> Optional[T]:
|
||||||
def is_root(self) -> bool:
|
|
||||||
return len(self.parents) <= 0
|
|
||||||
|
|
||||||
def _find_object_in_self(self, __object: T) -> Optional[T]:
|
|
||||||
for name, value in __object.indexing_values:
|
for name, value in __object.indexing_values:
|
||||||
if value == self._indexed_values[name]:
|
if value in self._indexed_values[name]:
|
||||||
return self._indexed_to_objects[value]
|
return self._indexed_values[name][value]
|
||||||
|
|
||||||
def _find_object(self, __object: T, no_sibling: bool = False) -> Tuple[Collection[T], Optional[T]]:
|
|
||||||
other_object = self._find_object_in_self(__object)
|
|
||||||
if other_object is not None:
|
|
||||||
return self, other_object
|
|
||||||
|
|
||||||
for c in self.children:
|
|
||||||
o, other_object = c._find_object(__object)
|
|
||||||
if other_object is not None:
|
|
||||||
return o, other_object
|
|
||||||
|
|
||||||
if no_sibling:
|
|
||||||
return self, None
|
|
||||||
|
|
||||||
"""
|
|
||||||
# find in siblings and all children of siblings
|
|
||||||
for parent in self.parents:
|
|
||||||
for sibling in parent.children:
|
|
||||||
if sibling is self:
|
|
||||||
continue
|
|
||||||
|
|
||||||
o, other_object = sibling._find_object(__object, no_sibling=True)
|
|
||||||
if other_object is not None:
|
|
||||||
return o, other_object
|
|
||||||
"""
|
|
||||||
|
|
||||||
return self, None
|
|
||||||
|
|
||||||
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
|
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
|
||||||
"""
|
"""
|
||||||
@ -127,15 +94,15 @@ class Collection(Generic[T]):
|
|||||||
if __object is None:
|
if __object is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
append_to, existing_object = self._find_object(__object)
|
existing_object = self._find_object(__object)
|
||||||
|
|
||||||
if existing_object is None:
|
if existing_object is None:
|
||||||
# append
|
# append
|
||||||
append_to._data.append(__object)
|
self._data.append(__object)
|
||||||
append_to._map_element(__object)
|
self._map_element(__object)
|
||||||
|
|
||||||
for collection_attribute, child_collection in self.contain_given_in_attribute.items():
|
for collection_attribute, child_collection in self.extend_object_to_attribute.items():
|
||||||
__object.__getattribute__(collection_attribute).contain_collection_inside(child_collection, __object)
|
__object.__getattribute__(collection_attribute).extend(child_collection)
|
||||||
|
|
||||||
for attribute, new_object in self.append_object_to_attribute.items():
|
for attribute, new_object in self.append_object_to_attribute.items():
|
||||||
__object.__getattribute__(attribute).append(new_object)
|
__object.__getattribute__(attribute).append(new_object)
|
||||||
@ -164,9 +131,9 @@ class Collection(Generic[T]):
|
|||||||
existing_object.merge(__object)
|
existing_object.merge(__object)
|
||||||
|
|
||||||
if existing_object.id != old_id:
|
if existing_object.id != old_id:
|
||||||
append_to._unmap_element(old_id)
|
self._unmap_element(old_id)
|
||||||
|
|
||||||
append_to._map_element(existing_object)
|
self._map_element(existing_object)
|
||||||
|
|
||||||
def extend(self, __iterable: Optional[Generator[T, None, None]]):
|
def extend(self, __iterable: Optional[Generator[T, None, None]]):
|
||||||
if __iterable is None:
|
if __iterable is None:
|
||||||
@ -175,54 +142,22 @@ class Collection(Generic[T]):
|
|||||||
for __object in __iterable:
|
for __object in __iterable:
|
||||||
self.append(__object)
|
self.append(__object)
|
||||||
|
|
||||||
def contain_collection_inside(self, sub_collection: Collection, _object: T):
|
|
||||||
"""
|
|
||||||
This collection will ALWAYS contain everything from the passed in collection
|
|
||||||
"""
|
|
||||||
if self is sub_collection or sub_collection in self.children:
|
|
||||||
return
|
|
||||||
|
|
||||||
_object._inner._is_collection_child[self] = sub_collection
|
|
||||||
_object._inner._is_collection_parent[sub_collection] = self
|
|
||||||
|
|
||||||
self.children.append(sub_collection)
|
|
||||||
sub_collection.parents.append(self)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def data(self) -> List[T]:
|
def data(self) -> List[T]:
|
||||||
return list(self.__iter__())
|
return list(self.__iter__())
|
||||||
|
|
||||||
def __len__(self) -> int:
|
def __len__(self) -> int:
|
||||||
return len(self._data) + sum(len(collection) for collection in self.children)
|
return len(self._data)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def empty(self) -> bool:
|
def empty(self) -> bool:
|
||||||
return self.__len__() <= 0
|
return self.__len__() <= 0
|
||||||
|
|
||||||
def __iter__(self, finished_ids: set = None) -> Iterator[T]:
|
def __iter__(self) -> Iterator[T]:
|
||||||
_finished_ids = finished_ids or set()
|
yield from self._data
|
||||||
|
|
||||||
for element in self._data:
|
|
||||||
if element.id in _finished_ids:
|
|
||||||
continue
|
|
||||||
_finished_ids.add(element.id)
|
|
||||||
yield element
|
|
||||||
|
|
||||||
for c in self.children:
|
|
||||||
yield from c.__iter__(finished_ids=finished_ids)
|
|
||||||
|
|
||||||
def __merge__(self, __other: Collection, override: bool = False):
|
def __merge__(self, __other: Collection, override: bool = False):
|
||||||
self.extend(__other)
|
self.extend(__other)
|
||||||
|
|
||||||
def __getitem__(self, item: int):
|
def __getitem__(self, item: int):
|
||||||
if item < len(self._data):
|
return self._data[item]
|
||||||
return self._data[item]
|
|
||||||
|
|
||||||
item = item - len(self._data)
|
|
||||||
|
|
||||||
for c in self.children:
|
|
||||||
if item < len(c):
|
|
||||||
return c.__getitem__(item)
|
|
||||||
item = item - len(c._data)
|
|
||||||
|
|
||||||
raise IndexError
|
|
||||||
|
@ -93,7 +93,7 @@ class Song(Base):
|
|||||||
self.album_collection.append_object_to_attribute = {
|
self.album_collection.append_object_to_attribute = {
|
||||||
"song_collection": self,
|
"song_collection": self,
|
||||||
}
|
}
|
||||||
self.main_artist_collection.contain_given_in_attribute = {
|
self.main_artist_collection.extend_object_to_attribute = {
|
||||||
"main_album_collection": self.album_collection
|
"main_album_collection": self.album_collection
|
||||||
}
|
}
|
||||||
self.feature_artist_collection.append_object_to_attribute = {
|
self.feature_artist_collection.append_object_to_attribute = {
|
||||||
@ -253,7 +253,7 @@ class Album(Base):
|
|||||||
self.artist_collection.append_object_to_attribute = {
|
self.artist_collection.append_object_to_attribute = {
|
||||||
"main_album_collection": self
|
"main_album_collection": self
|
||||||
}
|
}
|
||||||
self.artist_collection.contain_given_in_attribute = {
|
self.artist_collection.extend_object_to_attribute = {
|
||||||
"label_collection": self.label_collection
|
"label_collection": self.label_collection
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user