Compare commits
3 Commits
fa723d7747
...
3d432cd0d7
Author | SHA1 | Date | |
---|---|---|---|
3d432cd0d7 | |||
0080a48e70 | |||
ea5adfbe8a |
@ -1,7 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union
|
||||
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any
|
||||
from .parents import OuterProxy
|
||||
from ..utils import object_trace
|
||||
|
||||
@ -22,28 +22,27 @@ class Collection(Generic[T]):
|
||||
self,
|
||||
data: Optional[Iterable[T]] = None,
|
||||
sync_on_append: Dict[str, Collection] = None,
|
||||
contain_given_in_attribute: Dict[str, Collection] = None,
|
||||
contain_attribute_in_given: Dict[str, Collection] = None,
|
||||
append_object_to_attribute: Dict[str, T] = None
|
||||
append_object_to_attribute: Dict[str, T] = None,
|
||||
extend_object_to_attribute: Dict[str, Collection] = None,
|
||||
) -> None:
|
||||
self._collection_for: dict = dict()
|
||||
|
||||
self._contains_ids = set()
|
||||
self._data = []
|
||||
|
||||
self.parents: List[Collection[T]] = []
|
||||
self.children: List[Collection[T]] = []
|
||||
|
||||
# List of collection attributes that should be modified on append
|
||||
# Key: collection attribute (str) of appended element
|
||||
# Value: main collection to sync to
|
||||
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
|
||||
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
|
||||
self.extend_object_to_attribute: Dict[str, Collection[T]] = extend_object_to_attribute or {}
|
||||
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
|
||||
|
||||
self._id_to_index_values: Dict[int, set] = defaultdict(set)
|
||||
self._indexed_values = defaultdict(lambda: None)
|
||||
self._indexed_to_objects = defaultdict(lambda: None)
|
||||
|
||||
# This is to cleanly unmap previously mapped items by their id
|
||||
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
|
||||
# this is to keep track and look up the actual objects
|
||||
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
|
||||
|
||||
self.extend(data)
|
||||
|
||||
@ -51,66 +50,34 @@ class Collection(Generic[T]):
|
||||
return f"Collection({id(self)})"
|
||||
|
||||
def _map_element(self, __object: T, from_map: bool = False):
|
||||
self._contains_ids.add(__object.id)
|
||||
self._unmap_element(__object.id)
|
||||
|
||||
for name, value in (*__object.indexing_values, ('id', __object.id)):
|
||||
self._indexed_from_id[__object.id]["id"] = __object.id
|
||||
self._indexed_values["id"][__object.id] = __object
|
||||
|
||||
for name, value in __object.indexing_values:
|
||||
if value is None or value == __object._inner._default_values.get(name):
|
||||
continue
|
||||
|
||||
self._indexed_values[name] = value
|
||||
self._indexed_to_objects[value] = __object
|
||||
|
||||
self._id_to_index_values[__object.id].add((name, value))
|
||||
self._indexed_values[name][value] = __object
|
||||
self._indexed_from_id[__object.id][name] = value
|
||||
|
||||
def _unmap_element(self, __object: Union[T, int]):
|
||||
obj_id = __object.id if isinstance(__object, OuterProxy) else __object
|
||||
|
||||
if obj_id in self._contains_ids:
|
||||
self._contains_ids.remove(obj_id)
|
||||
if obj_id not in self._indexed_from_id:
|
||||
return
|
||||
|
||||
for name, value in self._id_to_index_values[obj_id]:
|
||||
if name in self._indexed_values:
|
||||
del self._indexed_values[name]
|
||||
if value in self._indexed_to_objects:
|
||||
del self._indexed_to_objects[value]
|
||||
for name, value in self._indexed_from_id[obj_id].items():
|
||||
if value in self._indexed_values[name]:
|
||||
del self._indexed_values[name][value]
|
||||
|
||||
del self._id_to_index_values[obj_id]
|
||||
del self._indexed_from_id[obj_id]
|
||||
|
||||
@property
|
||||
def is_root(self) -> bool:
|
||||
return len(self.parents) <= 0
|
||||
|
||||
def _find_object_in_self(self, __object: T) -> Optional[T]:
|
||||
def _find_object(self, __object: T) -> Optional[T]:
|
||||
for name, value in __object.indexing_values:
|
||||
if value == self._indexed_values[name]:
|
||||
return self._indexed_to_objects[value]
|
||||
|
||||
def _find_object(self, __object: T, no_sibling: bool = False) -> Tuple[Collection[T], Optional[T]]:
|
||||
other_object = self._find_object_in_self(__object)
|
||||
if other_object is not None:
|
||||
return self, other_object
|
||||
|
||||
for c in self.children:
|
||||
o, other_object = c._find_object(__object)
|
||||
if other_object is not None:
|
||||
return o, other_object
|
||||
|
||||
if no_sibling:
|
||||
return self, None
|
||||
|
||||
"""
|
||||
# find in siblings and all children of siblings
|
||||
for parent in self.parents:
|
||||
for sibling in parent.children:
|
||||
if sibling is self:
|
||||
continue
|
||||
|
||||
o, other_object = sibling._find_object(__object, no_sibling=True)
|
||||
if other_object is not None:
|
||||
return o, other_object
|
||||
"""
|
||||
|
||||
return self, None
|
||||
if value in self._indexed_values[name]:
|
||||
return self._indexed_values[name][value]
|
||||
|
||||
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
|
||||
"""
|
||||
@ -127,15 +94,15 @@ class Collection(Generic[T]):
|
||||
if __object is None:
|
||||
return
|
||||
|
||||
append_to, existing_object = self._find_object(__object)
|
||||
existing_object = self._find_object(__object)
|
||||
|
||||
if existing_object is None:
|
||||
# append
|
||||
append_to._data.append(__object)
|
||||
append_to._map_element(__object)
|
||||
self._data.append(__object)
|
||||
self._map_element(__object)
|
||||
|
||||
for collection_attribute, child_collection in self.contain_given_in_attribute.items():
|
||||
__object.__getattribute__(collection_attribute).contain_collection_inside(child_collection, __object)
|
||||
for collection_attribute, child_collection in self.extend_object_to_attribute.items():
|
||||
__object.__getattribute__(collection_attribute).extend(child_collection)
|
||||
|
||||
for attribute, new_object in self.append_object_to_attribute.items():
|
||||
__object.__getattribute__(attribute).append(new_object)
|
||||
@ -164,9 +131,9 @@ class Collection(Generic[T]):
|
||||
existing_object.merge(__object)
|
||||
|
||||
if existing_object.id != old_id:
|
||||
append_to._unmap_element(old_id)
|
||||
self._unmap_element(old_id)
|
||||
|
||||
append_to._map_element(existing_object)
|
||||
self._map_element(existing_object)
|
||||
|
||||
def extend(self, __iterable: Optional[Generator[T, None, None]]):
|
||||
if __iterable is None:
|
||||
@ -175,54 +142,22 @@ class Collection(Generic[T]):
|
||||
for __object in __iterable:
|
||||
self.append(__object)
|
||||
|
||||
def contain_collection_inside(self, sub_collection: Collection, _object: T):
|
||||
"""
|
||||
This collection will ALWAYS contain everything from the passed in collection
|
||||
"""
|
||||
if self is sub_collection or sub_collection in self.children:
|
||||
return
|
||||
|
||||
_object._inner._is_collection_child[self] = sub_collection
|
||||
_object._inner._is_collection_parent[sub_collection] = self
|
||||
|
||||
self.children.append(sub_collection)
|
||||
sub_collection.parents.append(self)
|
||||
|
||||
@property
|
||||
def data(self) -> List[T]:
|
||||
return list(self.__iter__())
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._data) + sum(len(collection) for collection in self.children)
|
||||
return len(self._data)
|
||||
|
||||
@property
|
||||
def empty(self) -> bool:
|
||||
return self.__len__() <= 0
|
||||
|
||||
def __iter__(self, finished_ids: set = None) -> Iterator[T]:
|
||||
_finished_ids = finished_ids or set()
|
||||
|
||||
for element in self._data:
|
||||
if element.id in _finished_ids:
|
||||
continue
|
||||
_finished_ids.add(element.id)
|
||||
yield element
|
||||
|
||||
for c in self.children:
|
||||
yield from c.__iter__(finished_ids=finished_ids)
|
||||
def __iter__(self) -> Iterator[T]:
|
||||
yield from self._data
|
||||
|
||||
def __merge__(self, __other: Collection, override: bool = False):
|
||||
self.extend(__other)
|
||||
|
||||
def __getitem__(self, item: int):
|
||||
if item < len(self._data):
|
||||
return self._data[item]
|
||||
|
||||
item = item - len(self._data)
|
||||
|
||||
for c in self.children:
|
||||
if item < len(c):
|
||||
return c.__getitem__(item)
|
||||
item = item - len(c._data)
|
||||
|
||||
raise IndexError
|
||||
return self._data[item]
|
||||
|
@ -34,10 +34,6 @@ class InnerData:
|
||||
self._refers_to_instances = set()
|
||||
self._fetched_from: dict = {}
|
||||
|
||||
# collection : collection that is a collection of self
|
||||
self._is_collection_child: Dict[Collection, Collection] = {}
|
||||
self._is_collection_parent: Dict[Collection, Collection] = {}
|
||||
|
||||
# initialize the default values
|
||||
self._default_values = {}
|
||||
for name, factory in object_type._default_factories.items():
|
||||
@ -202,18 +198,6 @@ class OuterProxy:
|
||||
|
||||
object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]")
|
||||
|
||||
for collection, child_collection in b._inner._is_collection_child.items():
|
||||
try:
|
||||
collection.children.remove(child_collection)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
for collection, parent_collection in b._inner._is_collection_parent.items():
|
||||
try:
|
||||
collection.parents.remove(parent_collection)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
old_inner = b._inner
|
||||
|
||||
for instance in b._inner._refers_to_instances.copy():
|
||||
|
@ -93,7 +93,7 @@ class Song(Base):
|
||||
self.album_collection.append_object_to_attribute = {
|
||||
"song_collection": self,
|
||||
}
|
||||
self.main_artist_collection.contain_given_in_attribute = {
|
||||
self.main_artist_collection.extend_object_to_attribute = {
|
||||
"main_album_collection": self.album_collection
|
||||
}
|
||||
self.feature_artist_collection.append_object_to_attribute = {
|
||||
@ -253,7 +253,7 @@ class Album(Base):
|
||||
self.artist_collection.append_object_to_attribute = {
|
||||
"main_album_collection": self
|
||||
}
|
||||
self.artist_collection.contain_given_in_attribute = {
|
||||
self.artist_collection.extend_object_to_attribute = {
|
||||
"label_collection": self.label_collection
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,7 @@ DEBUG = (__stage__ == "dev") and True
|
||||
DEBUG_LOGGING = DEBUG and False
|
||||
DEBUG_TRACE = DEBUG and True
|
||||
DEBUG_OBJECT_TRACE = DEBUG and True
|
||||
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG and True
|
||||
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
|
||||
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
|
||||
DEBUG_PAGES = DEBUG and False
|
||||
DEBUG_DUMP = DEBUG and False
|
||||
|
@ -86,7 +86,7 @@ class TestCollection(unittest.TestCase):
|
||||
]
|
||||
)
|
||||
|
||||
self.assertTrue(artist.id == artist.main_album_collection[0].song_collection[0].artist_collection[0].id)
|
||||
self.assertTrue(artist.id == artist.main_album_collection[0].song_collection[0].main_artist_collection[0].id)
|
||||
|
||||
def test_artist_collection_sync(self):
|
||||
album_1 = Album(
|
||||
|
Loading…
Reference in New Issue
Block a user