Compare commits

..

3 Commits

Author SHA1 Message Date
3d432cd0d7 fix: test
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-04-23 11:44:39 +02:00
0080a48e70 feat: removed legacy code 2024-04-23 11:39:25 +02:00
ea5adfbe8a feat: limited complexity of collection by removing child collections 2024-04-23 11:37:49 +02:00
5 changed files with 40 additions and 121 deletions

View File

@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
from collections import defaultdict from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any
from .parents import OuterProxy from .parents import OuterProxy
from ..utils import object_trace from ..utils import object_trace
@ -22,28 +22,27 @@ class Collection(Generic[T]):
self, self,
data: Optional[Iterable[T]] = None, data: Optional[Iterable[T]] = None,
sync_on_append: Dict[str, Collection] = None, sync_on_append: Dict[str, Collection] = None,
contain_given_in_attribute: Dict[str, Collection] = None, append_object_to_attribute: Dict[str, T] = None,
contain_attribute_in_given: Dict[str, Collection] = None, extend_object_to_attribute: Dict[str, Collection] = None,
append_object_to_attribute: Dict[str, T] = None
) -> None: ) -> None:
self._collection_for: dict = dict() self._collection_for: dict = dict()
self._contains_ids = set() self._contains_ids = set()
self._data = [] self._data = []
self.parents: List[Collection[T]] = []
self.children: List[Collection[T]] = []
# List of collection attributes that should be modified on append # List of collection attributes that should be modified on append
# Key: collection attribute (str) of appended element # Key: collection attribute (str) of appended element
# Value: main collection to sync to # Value: main collection to sync to
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {} self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
self.extend_object_to_attribute: Dict[str, Collection[T]] = extend_object_to_attribute or {}
self.sync_on_append: Dict[str, Collection] = sync_on_append or {} self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self._id_to_index_values: Dict[int, set] = defaultdict(set) self._id_to_index_values: Dict[int, set] = defaultdict(set)
self._indexed_values = defaultdict(lambda: None)
self._indexed_to_objects = defaultdict(lambda: None) # This is to cleanly unmap previously mapped items by their id
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
# this is to keep track and look up the actual objects
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
self.extend(data) self.extend(data)
@ -51,66 +50,34 @@ class Collection(Generic[T]):
return f"Collection({id(self)})" return f"Collection({id(self)})"
def _map_element(self, __object: T, from_map: bool = False): def _map_element(self, __object: T, from_map: bool = False):
self._contains_ids.add(__object.id) self._unmap_element(__object.id)
for name, value in (*__object.indexing_values, ('id', __object.id)): self._indexed_from_id[__object.id]["id"] = __object.id
self._indexed_values["id"][__object.id] = __object
for name, value in __object.indexing_values:
if value is None or value == __object._inner._default_values.get(name): if value is None or value == __object._inner._default_values.get(name):
continue continue
self._indexed_values[name] = value self._indexed_values[name][value] = __object
self._indexed_to_objects[value] = __object self._indexed_from_id[__object.id][name] = value
self._id_to_index_values[__object.id].add((name, value))
def _unmap_element(self, __object: Union[T, int]): def _unmap_element(self, __object: Union[T, int]):
obj_id = __object.id if isinstance(__object, OuterProxy) else __object obj_id = __object.id if isinstance(__object, OuterProxy) else __object
if obj_id in self._contains_ids: if obj_id not in self._indexed_from_id:
self._contains_ids.remove(obj_id) return
for name, value in self._id_to_index_values[obj_id]: for name, value in self._indexed_from_id[obj_id].items():
if name in self._indexed_values: if value in self._indexed_values[name]:
del self._indexed_values[name] del self._indexed_values[name][value]
if value in self._indexed_to_objects:
del self._indexed_to_objects[value]
del self._id_to_index_values[obj_id] del self._indexed_from_id[obj_id]
@property def _find_object(self, __object: T) -> Optional[T]:
def is_root(self) -> bool:
return len(self.parents) <= 0
def _find_object_in_self(self, __object: T) -> Optional[T]:
for name, value in __object.indexing_values: for name, value in __object.indexing_values:
if value == self._indexed_values[name]: if value in self._indexed_values[name]:
return self._indexed_to_objects[value] return self._indexed_values[name][value]
def _find_object(self, __object: T, no_sibling: bool = False) -> Tuple[Collection[T], Optional[T]]:
other_object = self._find_object_in_self(__object)
if other_object is not None:
return self, other_object
for c in self.children:
o, other_object = c._find_object(__object)
if other_object is not None:
return o, other_object
if no_sibling:
return self, None
"""
# find in siblings and all children of siblings
for parent in self.parents:
for sibling in parent.children:
if sibling is self:
continue
o, other_object = sibling._find_object(__object, no_sibling=True)
if other_object is not None:
return o, other_object
"""
return self, None
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False): def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
""" """
@ -127,15 +94,15 @@ class Collection(Generic[T]):
if __object is None: if __object is None:
return return
append_to, existing_object = self._find_object(__object) existing_object = self._find_object(__object)
if existing_object is None: if existing_object is None:
# append # append
append_to._data.append(__object) self._data.append(__object)
append_to._map_element(__object) self._map_element(__object)
for collection_attribute, child_collection in self.contain_given_in_attribute.items(): for collection_attribute, child_collection in self.extend_object_to_attribute.items():
__object.__getattribute__(collection_attribute).contain_collection_inside(child_collection, __object) __object.__getattribute__(collection_attribute).extend(child_collection)
for attribute, new_object in self.append_object_to_attribute.items(): for attribute, new_object in self.append_object_to_attribute.items():
__object.__getattribute__(attribute).append(new_object) __object.__getattribute__(attribute).append(new_object)
@ -164,9 +131,9 @@ class Collection(Generic[T]):
existing_object.merge(__object) existing_object.merge(__object)
if existing_object.id != old_id: if existing_object.id != old_id:
append_to._unmap_element(old_id) self._unmap_element(old_id)
append_to._map_element(existing_object) self._map_element(existing_object)
def extend(self, __iterable: Optional[Generator[T, None, None]]): def extend(self, __iterable: Optional[Generator[T, None, None]]):
if __iterable is None: if __iterable is None:
@ -175,54 +142,22 @@ class Collection(Generic[T]):
for __object in __iterable: for __object in __iterable:
self.append(__object) self.append(__object)
def contain_collection_inside(self, sub_collection: Collection, _object: T):
"""
This collection will ALWAYS contain everything from the passed in collection
"""
if self is sub_collection or sub_collection in self.children:
return
_object._inner._is_collection_child[self] = sub_collection
_object._inner._is_collection_parent[sub_collection] = self
self.children.append(sub_collection)
sub_collection.parents.append(self)
@property @property
def data(self) -> List[T]: def data(self) -> List[T]:
return list(self.__iter__()) return list(self.__iter__())
def __len__(self) -> int: def __len__(self) -> int:
return len(self._data) + sum(len(collection) for collection in self.children) return len(self._data)
@property @property
def empty(self) -> bool: def empty(self) -> bool:
return self.__len__() <= 0 return self.__len__() <= 0
def __iter__(self, finished_ids: set = None) -> Iterator[T]: def __iter__(self) -> Iterator[T]:
_finished_ids = finished_ids or set() yield from self._data
for element in self._data:
if element.id in _finished_ids:
continue
_finished_ids.add(element.id)
yield element
for c in self.children:
yield from c.__iter__(finished_ids=finished_ids)
def __merge__(self, __other: Collection, override: bool = False): def __merge__(self, __other: Collection, override: bool = False):
self.extend(__other) self.extend(__other)
def __getitem__(self, item: int): def __getitem__(self, item: int):
if item < len(self._data): return self._data[item]
return self._data[item]
item = item - len(self._data)
for c in self.children:
if item < len(c):
return c.__getitem__(item)
item = item - len(c._data)
raise IndexError

View File

@ -34,10 +34,6 @@ class InnerData:
self._refers_to_instances = set() self._refers_to_instances = set()
self._fetched_from: dict = {} self._fetched_from: dict = {}
# collection : collection that is a collection of self
self._is_collection_child: Dict[Collection, Collection] = {}
self._is_collection_parent: Dict[Collection, Collection] = {}
# initialize the default values # initialize the default values
self._default_values = {} self._default_values = {}
for name, factory in object_type._default_factories.items(): for name, factory in object_type._default_factories.items():
@ -201,18 +197,6 @@ class OuterProxy:
a, b = b, a a, b = b, a
object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]") object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]")
for collection, child_collection in b._inner._is_collection_child.items():
try:
collection.children.remove(child_collection)
except ValueError:
pass
for collection, parent_collection in b._inner._is_collection_parent.items():
try:
collection.parents.remove(parent_collection)
except ValueError:
pass
old_inner = b._inner old_inner = b._inner

View File

@ -93,7 +93,7 @@ class Song(Base):
self.album_collection.append_object_to_attribute = { self.album_collection.append_object_to_attribute = {
"song_collection": self, "song_collection": self,
} }
self.main_artist_collection.contain_given_in_attribute = { self.main_artist_collection.extend_object_to_attribute = {
"main_album_collection": self.album_collection "main_album_collection": self.album_collection
} }
self.feature_artist_collection.append_object_to_attribute = { self.feature_artist_collection.append_object_to_attribute = {
@ -253,7 +253,7 @@ class Album(Base):
self.artist_collection.append_object_to_attribute = { self.artist_collection.append_object_to_attribute = {
"main_album_collection": self "main_album_collection": self
} }
self.artist_collection.contain_given_in_attribute = { self.artist_collection.extend_object_to_attribute = {
"label_collection": self.label_collection "label_collection": self.label_collection
} }

View File

@ -16,7 +16,7 @@ DEBUG = (__stage__ == "dev") and True
DEBUG_LOGGING = DEBUG and False DEBUG_LOGGING = DEBUG and False
DEBUG_TRACE = DEBUG and True DEBUG_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and True DEBUG_OBJECT_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG and True DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and False DEBUG_DUMP = DEBUG and False

View File

@ -86,7 +86,7 @@ class TestCollection(unittest.TestCase):
] ]
) )
self.assertTrue(artist.id == artist.main_album_collection[0].song_collection[0].artist_collection[0].id) self.assertTrue(artist.id == artist.main_album_collection[0].song_collection[0].main_artist_collection[0].id)
def test_artist_collection_sync(self): def test_artist_collection_sync(self):
album_1 = Album( album_1 = Album(