feat: start

This commit is contained in:
Hazel 2023-10-24 11:44:00 +02:00
parent 0db00f934b
commit b1ac14f2f2
6 changed files with 168 additions and 360 deletions

View File

@ -107,7 +107,7 @@ print(only_smile)
c = Collection([Song(title="hi"), Song(title="hi2"), Song(title="hi3")])
c1 = Collection([Song(title="he"), Song(title="hi5")])
c11 = Collection([Song(title="wow how ultra subby")])
c11 = Collection([Song(title="wow how ultra subby"),Song(title="wow how ultra subby")])
c2 = Collection([Song(title="heeee")])
b = Collection([Song(title="some b"), Song(title="other b")])

View File

@ -1,221 +1,151 @@
from typing import List, Iterable, Dict, TypeVar, Generic, Iterator
from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type
from collections import defaultdict
from dataclasses import dataclass
from .parents import DatabaseObject
from ..utils.hooks import HookEventTypes, Hooks, Event
class CollectionHooks(HookEventTypes):
APPEND_NEW = "append_new"
from ..utils.support_classes.hacking import MetaClass
T = TypeVar('T', bound=DatabaseObject)
@dataclass
class AppendResult:
was_in_collection: bool
current_element: DatabaseObject
was_the_same: bool
class Collection(Generic[T]):
"""
This a class for the iterables
like tracklist or discography
"""
class Collection(Generic[T], metaclass=MetaClass):
_data: List[T]
_by_url: dict
_by_attribute: dict
_indexed_values: Dict[str, set]
_indexed_to_objects: Dict[any, list]
def __init__(self, data: List[T] = None, element_type=None, *args, **kwargs) -> None:
# Attribute needs to point to
self.element_type = element_type
shallow_list = property(fget=lambda self: self.data)
self._data: List[T] = list()
def __init__(
self, data: Optional[Iterable[T]],
sync_on_append: Dict[str, Collection] = None,
contain_given_in_attribute: Dict[str, Collection] = None,
contain_attribute_in_given: Dict[str, Collection] = None,
append_object_to_attribute: Dict[str, DatabaseObject] = None
) -> None:
self._data = []
self.upper_collections: List[Collection[T]] = []
self.contained_collections: List[Collection[T]] = []
"""
example of attribute_to_object_map
the song objects are references pointing to objects
in _data
# List of collection attributes that should be modified on append
# Key: collection attribute (str) of appended element
# Value: main collection to sync to
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
self.contain_attribute_in_given: Dict[str, Collection] = contain_given_in_attribute or {}
self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {}
```python
{
'id': {323: song_1, 563: song_2, 666: song_3},
'url': {'www.song_2.com': song_2}
}
```
"""
self._attribute_to_object_map: Dict[str, Dict[object, T]] = defaultdict(dict)
self._used_ids: set = set()
self.contain_self_on_append: List[str] = []
self.hooks: Hooks = Hooks(self)
self._indexed_values = defaultdict(set)
self._indexed_to_objects = defaultdict(list)
if data is not None:
self.extend(data, merge_on_conflict=True)
self.extend(data)
def sort(self, reverse: bool = False, **kwargs):
self._data.sort(reverse=reverse, **kwargs)
def map_element(self, element: T):
for name, value in element.indexing_values:
def _map_element(self, __object: T):
for name, value in __object.indexing_values:
if value is None:
continue
self._attribute_to_object_map[name][value] = element
self._indexed_values[name].add(value)
self._indexed_to_objects[value].append(__object)
self._used_ids.add(element.id)
def unmap_element(self, element: T):
for name, value in element.indexing_values:
def _unmap_element(self, __object: T):
for name, value in __object.indexing_values:
if value is None:
continue
if value not in self._indexed_values[name]:
continue
if value in self._attribute_to_object_map[name]:
if element is self._attribute_to_object_map[name][value]:
try:
self._attribute_to_object_map[name].pop(value)
except KeyError:
pass
try:
self._indexed_to_objects[value].remove(__object)
except ValueError:
continue
def append(self, element: T, merge_on_conflict: bool = True,
merge_into_existing: bool = True, no_hook: bool = False) -> AppendResult:
if not len(self._indexed_to_objects[value]):
self._indexed_values[name].remove(value)
def _contained_in_self(self, __object: T) -> bool:
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
return True
return False
def _contained_in(self, __object: T) -> Optional["Collection"]:
if self._contained_in_self(__object):
return self
for collection in self.contained_collections:
if collection._contained_in_self(__object):
return collection
return None
def contains(self, __object: T) -> bool:
return self._contained_in(__object) is not None
def _append(self, __object: T):
self._map_element(__object)
self._data.append(__object)
def append(self, __object: Optional[T]):
if __object is None:
return
self._append(__object)
def extend(self, __iterable: Optional[Iterable[T]]):
if __iterable is None:
return
for __object in __iterable:
self.append(__object)
def sync_with_other_collection(self, equal_collection: "Collection"):
"""
:param element:
:param merge_on_conflict:
:param merge_into_existing:
:return did_not_exist:
If two collections always need to have the same values, this can be used.
Internally:
1. import the data from other to self
- _data
- contained_collections
2. replace all refs from the other object, with refs from this object
"""
if element is None:
return AppendResult(False, None, False)
for existing_element in self._data:
if element is existing_element:
return AppendResult(False, None, False)
# if the element type has been defined in the initializer it checks if the type matches
if self.element_type is not None and not isinstance(element, self.element_type):
raise TypeError(f"{type(element)} is not the set type {self.element_type}")
# return if the same instance of the object is in the list
for existing in self._data:
if element is existing:
return AppendResult(True, element, True)
for name, value in element.indexing_values:
if value in self._attribute_to_object_map[name]:
existing_object = self._attribute_to_object_map[name][value]
if not merge_on_conflict:
return AppendResult(True, existing_object, False)
# if the object does already exist
# thus merging and don't add it afterward
if merge_into_existing:
existing_object.merge(element)
# in case any relevant data has been added (e.g. it remaps the old object)
self.map_element(existing_object)
return AppendResult(True, existing_object, False)
element.merge(existing_object)
exists_at = self._data.index(existing_object)
self._data[exists_at] = element
self.unmap_element(existing_object)
self.map_element(element)
return AppendResult(True, existing_object, False)
if not no_hook:
self.hooks.trigger_event(CollectionHooks.APPEND_NEW, new_object=element)
self._data.append(element)
self.map_element(element)
return AppendResult(False, element, False)
def extend(self, element_list: Iterable[T], merge_on_conflict: bool = True,
merge_into_existing: bool = True, no_hook: bool = False):
if element_list is None:
if equal_collection is self:
return
if len(element_list) <= 0:
# don't add the elements from the subelements from the other collection.
# this will be done in the next step.
self.extend(equal_collection._data)
# add all submodules
for equal_sub_collection in equal_collection.contained_collections:
self.contain_collection_inside(equal_sub_collection)
# now the ugly part
# replace all refs of the other element with this one
self.merge(equal_collection)
def contain_collection_inside(self, sub_collection: "Collection"):
"""
This collection will ALWAYS contain everything from the passed in collection
"""
if sub_collection in self.contained_collections:
return
if element_list is self:
return
for element in element_list:
self.append(element, merge_on_conflict=merge_on_conflict, merge_into_existing=merge_into_existing, no_hook=no_hook)
def sync_collection(self, collection_attribute: str):
def on_append(event: Event, new_object: T, *args, **kwargs):
new_collection = new_object.__getattribute__(collection_attribute)
if self is new_collection:
return
self.contained_collections.append(sub_collection)
sub_collection.upper_collections.append(self)
self.extend(new_object.__getattribute__(collection_attribute), no_hook=True)
new_object.__setattr__(collection_attribute, self)
@property
def data(self) -> List[T]:
return [*self._data, *(__object for collection in self.contained_collections for __object in collection.shallow_list)]
self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
def sync_main_collection(self, main_collection: "Collection", collection_attribute: str):
def on_append(event: Event, new_object: T, *args, **kwargs):
new_collection = new_object.__getattribute__(collection_attribute)
if main_collection is new_collection:
return
main_collection.extend(new_object.__getattribute__(collection_attribute), no_hook=True)
new_object.__setattr__(collection_attribute, main_collection)
self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
"""
def on_append(event: Event, new_object: T, *args, **kwargs):
new_collection: Collection = new_object.__getattribute__(collection_attribute)
if self is new_collection:
return
self.extend(new_collection.shallow_list, no_hook=False)
new_object.__setattr__(collection_attribute, self)
self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
"""
def __len__(self) -> int:
return len(self._data) + sum(len(collection) for collection in self.contained_collections)
def __iter__(self) -> Iterator[T]:
for element in self._data:
yield element
def __str__(self) -> str:
return "\n".join([f"{str(j).zfill(2)}: {i.__repr__()}" for j, i in enumerate(self._data)])
def __len__(self) -> int:
return len(self._data)
def __getitem__(self, key) -> T:
if type(key) != int:
return ValueError("key needs to be an integer")
return self._data[key]
def __setitem__(self, key, value: T):
if type(key) != int:
return ValueError("key needs to be an integer")
old_item = self._data[key]
self.unmap_element(old_item)
self.map_element(value)
self._data[key] = value
@property
def shallow_list(self) -> List[T]:
"""
returns a shallow copy of the data list
"""
return self._data.copy()
@property
def empty(self) -> bool:
return len(self._data) == 0
def clear(self):
self.__init__(element_type=self.element_type)

View File

@ -1,133 +0,0 @@
from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type
from collections import defaultdict
from .parents import DatabaseObject
from ..utils.support_classes.hacking import MetaClass
T = TypeVar('T', bound=DatabaseObject)
class Collection(Generic[T], metaclass=MetaClass):
_data: List[T]
_indexed_values: Dict[str, set]
_indexed_to_objects: Dict[any, list]
shallow_list = property(fget=lambda self: self.data)
def __init__(self, data: Optional[Iterable[T]]) -> None:
self._data = []
self.contained_collections: List[Collection[T]] = []
self._indexed_values = defaultdict(set)
self._indexed_to_objects = defaultdict(list)
self.extend(data)
def _map_element(self, __object: T):
for name, value in __object.indexing_values:
if value is None:
continue
self._indexed_values[name].add(value)
self._indexed_to_objects[value].append(__object)
def _unmap_element(self, __object: T):
for name, value in __object.indexing_values:
if value is None:
continue
if value not in self._indexed_values[name]:
continue
try:
self._indexed_to_objects[value].remove(__object)
except ValueError:
continue
if not len(self._indexed_to_objects[value]):
self._indexed_values[name].remove(value)
def _contained_in_self(self, __object: T) -> bool:
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
return True
return False
def _contained_in(self, __object: T) -> Optional["Collection"]:
if self._contained_in_self(__object):
return self
for collection in self.contained_collections:
if collection._contained_in_self(__object):
return collection
return None
def contains(self, __object: T) -> bool:
return self._contained_in(__object) is not None
def _append(self, __object: T):
self._map_element(__object)
self._data.append(__object)
def append(self, __object: Optional[T]):
if __object is None:
return
self._append(__object)
def extend(self, __iterable: Optional[Iterable[T]]):
if __iterable is None:
return
for __object in __iterable:
self.append(__object)
def sync_with_other_collection(self, equal_collection: "Collection"):
"""
If two collections always need to have the same values, this can be used.
Internally:
1. import the data from other to self
- _data
- contained_collections
2. replace all refs from the other object, with refs from this object
"""
if equal_collection is self:
return
# don't add the elements from the subelements from the other collection.
# this will be done in the next step.
self.extend(equal_collection._data)
# add all submodules
for equal_sub_collection in equal_collection.contained_collections:
self.contain_collection_inside(equal_sub_collection)
# now the ugly part
# replace all refs of the other element with this one
self.merge(equal_collection)
def contain_collection_inside(self, sub_collection: "Collection"):
"""
This collection will ALWAYS contain everything from the passed in collection
"""
if sub_collection in self.contained_collections:
return
self.contained_collections.append(sub_collection)
@property
def data(self) -> List[T]:
return [*self._data, *(__object for collection in self.contained_collections for __object in collection.shallow_list)]
def __len__(self) -> int:
return len(self._data) + sum(len(collection) for collection in self.contained_collections)
def __iter__(self) -> Iterator[T]:
for element in self._data:
yield element

View File

@ -147,6 +147,8 @@ class DatabaseObject(metaclass=MetaClass):
return list()
def merge(self, other, override: bool = False, replace_all_refs: bool = False):
print("merge")
if other is None:
return

View File

@ -5,7 +5,7 @@ from typing import List, Optional, Dict, Tuple, Type
import pycountry
from ..utils.enums.album import AlbumType, AlbumStatus
from .collection import Collection, CollectionHooks
from .collection import Collection
from .formatted_text import FormattedText
from .lyrics import Lyrics
from .contact import Contact
@ -100,25 +100,29 @@ class Song(MainObject):
self.notes: FormattedText = notes or FormattedText()
self.source_collection: SourceCollection = SourceCollection(source_list)
self.target_collection: Collection[Target] = Collection(data=target_list, element_type=Target)
self.lyrics_collection: Collection[Lyrics] = Collection(data=lyrics_list, element_type=Lyrics)
self.target_collection: Collection[Target] = Collection(data=target_list)
self.lyrics_collection: Collection[Lyrics] = Collection(data=lyrics_list)
# main_artist_collection = album.artist collection
self.main_artist_collection: Collection[Artist] = Collection(data=main_artist_list, element_type=Artist)
self.main_artist_collection: Collection[Artist] = Collection(data=[])
# this album_collection equals no collection
self.album_collection: Collection[Album] = Collection(data=[], element_type=Album)
self.album_collection.sync_main_collection(self.main_artist_collection, "artist_collection")
self.album_collection.extend(album_list)
# self.album_collection.sync_collection("song_collection")
# self.album_collection.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_album_append)
self.album_collection: Collection[Album] = Collection(data=album_list,
contain_given_in_attribute={
"artist_collection": self.main_artist_collection
}, append_object_to_attribute={
"song_collection": self
})
# on feature_artist_collection append, append self to artist self
self.feature_artist_collection: Collection[Artist] = Collection(data=[], element_type=Artist)
def on_feature_artist_append(event, new_object: Artist, *args, **kwargs):
new_object.feature_song_collection.append(self, no_hook=True)
self.feature_artist_collection.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_feature_artist_append)
self.feature_artist_collection.extend(feature_artist_list)
self.main_artist_collection.contain_given_in_attribute = {"main_album_collection": self.album_collection}
self.main_artist_collection.extend(main_artist_list)
self.feature_artist_collection: Collection[Artist] = Collection(
data=feature_artist_list,
append_object_to_attribute={
"feature_song_collection": self
}
)
def _build_recursive_structures(self, build_version: int, merge: bool):
if build_version == self.build_version:
@ -322,13 +326,16 @@ class Album(MainObject):
self.source_collection: SourceCollection = SourceCollection(source_list)
self.artist_collection: Collection[Artist] = Collection(data=artist_list, element_type=Artist)
self.artist_collection: Collection[Artist] = Collection(data=artist_list)
self.song_collection: Collection[Song] = Collection(data=[], element_type=Song)
self.song_collection.sync_main_collection(self.artist_collection, "main_artist_collection")
self.song_collection.extend(song_list)
self.song_collection: Collection[Song] = Collection(
data=song_list,
contain_attribute_in_given={
"main_artist_collection": self.artist_collection
}
)
self.label_collection: Collection[Label] = Collection(data=label_list, element_type=Label)
self.label_collection: Collection[Label] = Collection(data=label_list)
def _build_recursive_structures(self, build_version: int, merge: bool):
if build_version == self.build_version:
@ -589,22 +596,26 @@ class Artist(MainObject):
self.source_collection: SourceCollection = SourceCollection(source_list)
self.contact_collection: Collection[Label] = Collection(data=contact_list, element_type=Contact)
self.feature_song_collection: Collection[Song] = Collection(data=[], element_type=Song)
def on_feature_song_append(event, new_object: Song, *args, **kwargs):
new_object.feature_artist_collection.append(self, no_hook=True)
self.feature_song_collection.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_feature_song_append)
self.feature_song_collection.extend(feature_song_list)
self.feature_song_collection: Collection[Song] = Collection(
data=feature_song_list,
append_object_to_attribute={
"feature_artist_collection": self
}
)
self.main_album_collection: Collection[Album] = Collection(data=[], element_type=Album)
def on_album_append(event, new_object: Album, *args, **kwargs):
new_object.artist_collection.append(self, no_hook=True)
self.main_album_collection.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_album_append)
self.main_album_collection.extend(main_album_list)
self.main_album_collection: Collection[Album] = Collection(
data=main_album_list,
append_object_to_attribute={
"artist_collection": self
}
)
self.label_collection: Collection[Label] = Collection(data=label_list, element_type=Label)
def on_label_append(event, new_object: Label, *args, **kwargs):
new_object.current_artist_collection.append(self, no_hook=True)
self.label_collection.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_label_append)
self.label_collection: Collection[Label] = Collection(
data=label_list,
append_object_to_attribute={
"current_artist_collection": self
}
)
def _add_other_db_objects(self, object_type: Type["DatabaseObject"], object_list: List["DatabaseObject"]):

View File

@ -10,9 +10,7 @@ class Lake:
def get_real_object(self, db_object: object) -> object:
def _get_real_id(_id: int) -> int:
if _id in self.redirects:
return _get_real_id(self.redirects[_id])
return _id
return self.redirects.get(_id, _id)
_id = _get_real_id(id(db_object))
if _id not in self.id_to_object: