Compare commits

..

3 Commits

Author SHA1 Message Date
3d432cd0d7 fix: test
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-04-23 11:44:39 +02:00
0080a48e70 feat: removed legacy code 2024-04-23 11:39:25 +02:00
ea5adfbe8a feat: limited complexity of collection by removing child collections 2024-04-23 11:37:49 +02:00
5 changed files with 40 additions and 121 deletions

View File

@ -1,7 +1,7 @@
from __future__ import annotations
from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any
from .parents import OuterProxy
from ..utils import object_trace
@ -22,28 +22,27 @@ class Collection(Generic[T]):
self,
data: Optional[Iterable[T]] = None,
sync_on_append: Dict[str, Collection] = None,
contain_given_in_attribute: Dict[str, Collection] = None,
contain_attribute_in_given: Dict[str, Collection] = None,
append_object_to_attribute: Dict[str, T] = None
append_object_to_attribute: Dict[str, T] = None,
extend_object_to_attribute: Dict[str, Collection] = None,
) -> None:
self._collection_for: dict = dict()
self._contains_ids = set()
self._data = []
self.parents: List[Collection[T]] = []
self.children: List[Collection[T]] = []
# List of collection attributes that should be modified on append
# Key: collection attribute (str) of appended element
# Value: main collection to sync to
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
self.extend_object_to_attribute: Dict[str, Collection[T]] = extend_object_to_attribute or {}
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self._id_to_index_values: Dict[int, set] = defaultdict(set)
self._indexed_values = defaultdict(lambda: None)
self._indexed_to_objects = defaultdict(lambda: None)
# This is to cleanly unmap previously mapped items by their id
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
# this is to keep track and look up the actual objects
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
self.extend(data)
@ -51,66 +50,34 @@ class Collection(Generic[T]):
return f"Collection({id(self)})"
def _map_element(self, __object: T, from_map: bool = False):
self._contains_ids.add(__object.id)
self._unmap_element(__object.id)
for name, value in (*__object.indexing_values, ('id', __object.id)):
self._indexed_from_id[__object.id]["id"] = __object.id
self._indexed_values["id"][__object.id] = __object
for name, value in __object.indexing_values:
if value is None or value == __object._inner._default_values.get(name):
continue
self._indexed_values[name] = value
self._indexed_to_objects[value] = __object
self._id_to_index_values[__object.id].add((name, value))
self._indexed_values[name][value] = __object
self._indexed_from_id[__object.id][name] = value
def _unmap_element(self, __object: Union[T, int]):
obj_id = __object.id if isinstance(__object, OuterProxy) else __object
if obj_id in self._contains_ids:
self._contains_ids.remove(obj_id)
if obj_id not in self._indexed_from_id:
return
for name, value in self._id_to_index_values[obj_id]:
if name in self._indexed_values:
del self._indexed_values[name]
if value in self._indexed_to_objects:
del self._indexed_to_objects[value]
for name, value in self._indexed_from_id[obj_id].items():
if value in self._indexed_values[name]:
del self._indexed_values[name][value]
del self._id_to_index_values[obj_id]
del self._indexed_from_id[obj_id]
@property
def is_root(self) -> bool:
return len(self.parents) <= 0
def _find_object_in_self(self, __object: T) -> Optional[T]:
def _find_object(self, __object: T) -> Optional[T]:
for name, value in __object.indexing_values:
if value == self._indexed_values[name]:
return self._indexed_to_objects[value]
def _find_object(self, __object: T, no_sibling: bool = False) -> Tuple[Collection[T], Optional[T]]:
other_object = self._find_object_in_self(__object)
if other_object is not None:
return self, other_object
for c in self.children:
o, other_object = c._find_object(__object)
if other_object is not None:
return o, other_object
if no_sibling:
return self, None
"""
# find in siblings and all children of siblings
for parent in self.parents:
for sibling in parent.children:
if sibling is self:
continue
o, other_object = sibling._find_object(__object, no_sibling=True)
if other_object is not None:
return o, other_object
"""
return self, None
if value in self._indexed_values[name]:
return self._indexed_values[name][value]
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
"""
@ -127,15 +94,15 @@ class Collection(Generic[T]):
if __object is None:
return
append_to, existing_object = self._find_object(__object)
existing_object = self._find_object(__object)
if existing_object is None:
# append
append_to._data.append(__object)
append_to._map_element(__object)
self._data.append(__object)
self._map_element(__object)
for collection_attribute, child_collection in self.contain_given_in_attribute.items():
__object.__getattribute__(collection_attribute).contain_collection_inside(child_collection, __object)
for collection_attribute, child_collection in self.extend_object_to_attribute.items():
__object.__getattribute__(collection_attribute).extend(child_collection)
for attribute, new_object in self.append_object_to_attribute.items():
__object.__getattribute__(attribute).append(new_object)
@ -164,9 +131,9 @@ class Collection(Generic[T]):
existing_object.merge(__object)
if existing_object.id != old_id:
append_to._unmap_element(old_id)
self._unmap_element(old_id)
append_to._map_element(existing_object)
self._map_element(existing_object)
def extend(self, __iterable: Optional[Generator[T, None, None]]):
if __iterable is None:
@ -175,54 +142,22 @@ class Collection(Generic[T]):
for __object in __iterable:
self.append(__object)
def contain_collection_inside(self, sub_collection: Collection, _object: T):
"""
This collection will ALWAYS contain everything from the passed in collection
"""
if self is sub_collection or sub_collection in self.children:
return
_object._inner._is_collection_child[self] = sub_collection
_object._inner._is_collection_parent[sub_collection] = self
self.children.append(sub_collection)
sub_collection.parents.append(self)
@property
def data(self) -> List[T]:
return list(self.__iter__())
def __len__(self) -> int:
return len(self._data) + sum(len(collection) for collection in self.children)
return len(self._data)
@property
def empty(self) -> bool:
return self.__len__() <= 0
def __iter__(self, finished_ids: set = None) -> Iterator[T]:
_finished_ids = finished_ids or set()
for element in self._data:
if element.id in _finished_ids:
continue
_finished_ids.add(element.id)
yield element
for c in self.children:
yield from c.__iter__(finished_ids=finished_ids)
def __iter__(self) -> Iterator[T]:
yield from self._data
def __merge__(self, __other: Collection, override: bool = False):
self.extend(__other)
def __getitem__(self, item: int):
if item < len(self._data):
return self._data[item]
item = item - len(self._data)
for c in self.children:
if item < len(c):
return c.__getitem__(item)
item = item - len(c._data)
raise IndexError
return self._data[item]

View File

@ -34,10 +34,6 @@ class InnerData:
self._refers_to_instances = set()
self._fetched_from: dict = {}
# collection : collection that is a collection of self
self._is_collection_child: Dict[Collection, Collection] = {}
self._is_collection_parent: Dict[Collection, Collection] = {}
# initialize the default values
self._default_values = {}
for name, factory in object_type._default_factories.items():
@ -201,18 +197,6 @@ class OuterProxy:
a, b = b, a
object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]")
for collection, child_collection in b._inner._is_collection_child.items():
try:
collection.children.remove(child_collection)
except ValueError:
pass
for collection, parent_collection in b._inner._is_collection_parent.items():
try:
collection.parents.remove(parent_collection)
except ValueError:
pass
old_inner = b._inner

View File

@ -93,7 +93,7 @@ class Song(Base):
self.album_collection.append_object_to_attribute = {
"song_collection": self,
}
self.main_artist_collection.contain_given_in_attribute = {
self.main_artist_collection.extend_object_to_attribute = {
"main_album_collection": self.album_collection
}
self.feature_artist_collection.append_object_to_attribute = {
@ -253,7 +253,7 @@ class Album(Base):
self.artist_collection.append_object_to_attribute = {
"main_album_collection": self
}
self.artist_collection.contain_given_in_attribute = {
self.artist_collection.extend_object_to_attribute = {
"label_collection": self.label_collection
}

View File

@ -16,7 +16,7 @@ DEBUG = (__stage__ == "dev") and True
DEBUG_LOGGING = DEBUG and False
DEBUG_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG and True
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and False

View File

@ -86,7 +86,7 @@ class TestCollection(unittest.TestCase):
]
)
self.assertTrue(artist.id == artist.main_album_collection[0].song_collection[0].artist_collection[0].id)
self.assertTrue(artist.id == artist.main_album_collection[0].song_collection[0].main_artist_collection[0].id)
def test_artist_collection_sync(self):
album_1 = Album(