feat: added new implementation

This commit is contained in:
Hazel 2023-12-19 13:58:39 +01:00
parent 0ec1a162be
commit 513054a0fe
13 changed files with 895 additions and 388 deletions

View File

@ -1,4 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.10 (music-downloader)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (music-downloader)" project-jdk-type="Python SDK" />
</project>

View File

@ -3,9 +3,10 @@
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/venv/lib/python3.10/site-packages" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/.venv" />
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="jdk" jdkName="Python 3.10 (music-downloader)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="rythmbox-id3-lyrics-support" />
<orderEntry type="module" module-name="sponsorblock.py" />

View File

@ -2,6 +2,5 @@
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
<mapping directory="$PROJECT_DIR$/../forks/sponsorblock.py" vcs="Git" />
</component>
</project>

View File

@ -9,7 +9,23 @@ from music_kraken.objects import (
from music_kraken.objects.collection import Collection
from music_kraken.utils.enums import SourcePages
song = Song(title="Sad Story", isrc="testTest")
other_song = Song(title="hihi", genre="dsbm")
song.merge(other_song)
print(song.__dict__)
print(other_song.__dict__)
other_song.title = ":3"
print(song.__dict__)
print(other_song.__dict__)
print(song)
"""
only_smile = Artist(
name="Only Smile",
source_list=[Source(SourcePages.BANDCAMP, "https://onlysmile.bandcamp.com/")],
@ -100,10 +116,16 @@ def add_to_objects_dump(db_obj: DatabaseObject):
add_to_objects_dump(only_smile)
for _id, _object in objects_by_id.items():
print(_id, _object, sep=": ")
try:
print(_id, _object.title, sep=": ")
except AttributeError:
try:
print(_id, _object.name, sep=": ")
except AttributeError:
print(_id, _object, sep=": ")
print(only_smile)
"""
"""
c = Collection([Song(title="hi"), Song(title="hi2"), Song(title="hi3")])
c1 = Collection([Song(title="he"), Song(title="hi5")])

View File

@ -1,14 +1,13 @@
from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type
from __future__ import annotations
from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator
from .parents import OuterProxy
from .parents import DatabaseObject
from ..utils.support_classes.hacking import MetaClass
T = TypeVar('T', bound=OuterProxy)
T = TypeVar('T', bound=DatabaseObject)
class Collection(Generic[T], metaclass=MetaClass):
class Collection(Generic[T]):
_data: List[T]
_indexed_values: Dict[str, set]
@ -17,16 +16,18 @@ class Collection(Generic[T], metaclass=MetaClass):
shallow_list = property(fget=lambda self: self.data)
def __init__(
self, data: Optional[Iterable[T]],
sync_on_append: Dict[str, "Collection"] = None,
contain_given_in_attribute: Dict[str, "Collection"] = None,
contain_attribute_in_given: Dict[str, "Collection"] = None,
append_object_to_attribute: Dict[str, DatabaseObject] = None
self,
data: Optional[Iterable[T]] = None,
sync_on_append: Dict[str, Collection] = None,
contain_given_in_attribute: Dict[str, Collection] = None,
contain_attribute_in_given: Dict[str, Collection] = None,
append_object_to_attribute: Dict[str, T] = None
) -> None:
self._contains_ids = set()
self._data = []
self.upper_collections: List[Collection[T]] = []
self.contained_collections: List[Collection[T]] = []
self.parents: List[Collection[T]] = []
self.children: List[Collection[T]] = []
# List of collection attributes that should be modified on append
# Key: collection attribute (str) of appended element
@ -34,13 +35,13 @@ class Collection(Generic[T], metaclass=MetaClass):
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {}
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
self.contain_self_on_append: List[str] = []
self._indexed_values = defaultdict(set)
self._indexed_to_objects = defaultdict(list)
self.extend(data)
def _map_element(self, __object: T, from_map: bool = False):
@ -56,13 +57,13 @@ class Collection(Generic[T], metaclass=MetaClass):
if not from_map:
for attribute, new_object in self.contain_given_in_attribute.items():
__object.__getattribute__(attribute).contain_collection_inside(new_object)
for attribute, new_object in self.contain_given_in_attribute.items():
new_object.contain_collection_inside(__object.__getattribute__(attribute))
for attribute, new_object in self.append_object_to_attribute.items():
__object.__getattribute__(attribute).append(new_object, from_map = True)
__object.__getattribute__(attribute).append(new_object, from_map=True)
def _unmap_element(self, __object: T):
self._contains_ids.remove(__object.id)
@ -71,7 +72,7 @@ class Collection(Generic[T], metaclass=MetaClass):
continue
if value not in self._indexed_values[name]:
continue
try:
self._indexed_to_objects[value].remove(__object)
except ValueError:
@ -81,59 +82,62 @@ class Collection(Generic[T], metaclass=MetaClass):
self._indexed_values[name].remove(value)
def _contained_in_self(self, __object: T) -> bool:
if __object.id in self._contains_ids:
return True
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
return True
return False
def _get_root_collections(self) -> List["Collection"]:
if not len(self.upper_collections):
def _get_root_collections(self) -> List[Collection]:
if not len(self.parents):
return [self]
root_collections = []
for upper_collection in self.upper_collections:
for upper_collection in self.parents:
root_collections.extend(upper_collection._get_root_collections())
return root_collections
@property
def _is_root(self) -> bool:
return len(self.upper_collections) <= 0
return len(self.parents) <= 0
def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List[Collection]:
results = []
if self._contained_in_self(__object):
return [self]
for collection in self.contained_collections:
for collection in self.children:
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
if break_at_first:
return results
return results
def _get_parents_of_multiple_contained_children(self, __object: T):
results = []
if len(self.contained_collections) < 2 or self._contained_in_self(__object):
if len(self.children) < 2 or self._contained_in_self(__object):
return results
count = 0
for collection in self.contained_collections:
for collection in self.children:
sub_results = collection._get_parents_of_multiple_contained_children(__object)
if len(sub_results) > 0:
count += 1
results.extend(sub_results)
if count >= 2:
results.append(self)
return results
def _merge_in_self(self, __object: T, from_map: bool = False):
def merge_into_self(self, __object: T, from_map: bool = False):
"""
1. find existing objects
2. merge into existing object
@ -141,30 +145,30 @@ class Collection(Generic[T], metaclass=MetaClass):
"""
if __object.id in self._contains_ids:
return
existing_object: DatabaseObject = None
existing_object: T = None
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
existing_object = self._indexed_to_objects[value][0]
if existing_object == __object:
if existing_object.id == __object.id:
return None
else:
break
break
if existing_object is None:
return None
existing_object.merge(__object, replace_all_refs=True)
existing_object.merge(__object)
# just a check if it really worked
if existing_object.id != __object.id:
raise ValueError("This should NEVER happen. Merging doesn't work.")
self._map_element(existing_object, from_map = from_map)
self._map_element(existing_object, from_map=from_map)
def contains(self, __object: T) -> bool:
return len(self._contained_in_sub(__object)) > 0
@ -178,37 +182,39 @@ class Collection(Generic[T], metaclass=MetaClass):
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
if __object is None:
return
if __object.id in self._contains_ids:
return
exists_in_collection = self._contained_in_sub(__object)
if len(exists_in_collection) and self is exists_in_collection[0]:
# assuming that the object already is contained in the correct collections
if not already_is_parent:
self._merge_in_self(__object, from_map = from_map)
self.merge_into_self(__object, from_map=from_map)
return
if not len(exists_in_collection):
self._append(__object, from_map=from_map)
else:
exists_in_collection[0]._merge_in_self(__object, from_map = from_map)
pass
exists_in_collection[0].merge_into_self(__object, from_map=from_map)
if not already_is_parent or not self._is_root:
for parent_collection in self._get_parents_of_multiple_contained_children(__object):
pass
parent_collection.append(__object, already_is_parent=True, from_map=from_map)
def extend(self, __iterable: Optional[Iterable[T]]):
if __iterable is None:
return
for __object in __iterable:
self.append(__object)
def sync_with_other_collection(self, equal_collection: "Collection"):
def sync_with_other_collection(self, equal_collection: Collection):
"""
If two collections always need to have the same values, this can be used.
Internally:
1. import the data from other to self
- _data
@ -222,31 +228,31 @@ class Collection(Generic[T], metaclass=MetaClass):
# this will be done in the next step.
self.extend(equal_collection._data)
# add all submodules
for equal_sub_collection in equal_collection.contained_collections:
for equal_sub_collection in equal_collection.children:
self.contain_collection_inside(equal_sub_collection)
# now the ugly part
# replace all refs of the other element with this one
self._risky_merge(equal_collection)
def contain_collection_inside(self, sub_collection: "Collection"):
"""
This collection will ALWAYS contain everything from the passed in collection
"""
if sub_collection in self.contained_collections:
if sub_collection in self.children:
return
self.contained_collections.append(sub_collection)
sub_collection.upper_collections.append(self)
self.children.append(sub_collection)
sub_collection.parents.append(self)
@property
def data(self) -> List[T]:
return [*self._data, *(__object for collection in self.contained_collections for __object in collection.shallow_list)]
return [*self._data,
*(__object for collection in self.children for __object in collection.shallow_list)]
def __len__(self) -> int:
return len(self._data) + sum(len(collection) for collection in self.contained_collections)
return len(self._data) + sum(len(collection) for collection in self.children)
def __iter__(self) -> Iterator[T]:
for element in self._data:
yield element
yield element

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,257 @@
from __future__ import annotations
from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List
from .parents import OuterProxy
T = TypeVar('T', bound=OuterProxy)
class Collection(Generic[T]):
_data: List[T]
_indexed_values: Dict[str, set]
_indexed_to_objects: Dict[any, list]
shallow_list = property(fget=lambda self: self.data)
def __init__(
self,
data: Optional[Iterable[T]] = None,
sync_on_append: Dict[str, "Collection"] = None,
contain_given_in_attribute: Dict[str, "Collection"] = None,
contain_attribute_in_given: Dict[str, "Collection"] = None,
append_object_to_attribute: Dict[str, T] = None
) -> None:
self._contains_ids = set()
self._data = []
self.upper_collections: List[Collection[T]] = []
self.contained_collections: List[Collection[T]] = []
# List of collection attributes that should be modified on append
# Key: collection attribute (str) of appended element
# Value: main collection to sync to
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
self.contain_self_on_append: List[str] = []
self._indexed_values = defaultdict(set)
self._indexed_to_objects = defaultdict(list)
self.extend(data)
def _map_element(self, __object: T, from_map: bool = False):
self._contains_ids.add(__object.id)
for name, value in __object.indexing_values:
if value is None:
continue
self._indexed_values[name].add(value)
self._indexed_to_objects[value].append(__object)
if not from_map:
for attribute, new_object in self.contain_given_in_attribute.items():
__object.__getattribute__(attribute).contain_collection_inside(new_object)
for attribute, new_object in self.contain_given_in_attribute.items():
new_object.contain_collection_inside(__object.__getattribute__(attribute))
for attribute, new_object in self.append_object_to_attribute.items():
__object.__getattribute__(attribute).append(new_object, from_map=True)
def _unmap_element(self, __object: T):
self._contains_ids.remove(__object.id)
for name, value in __object.indexing_values:
if value is None:
continue
if value not in self._indexed_values[name]:
continue
try:
self._indexed_to_objects[value].remove(__object)
except ValueError:
continue
if not len(self._indexed_to_objects[value]):
self._indexed_values[name].remove(value)
def _contained_in_self(self, __object: T) -> bool:
if __object.id in self._contains_ids:
return True
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
return True
return False
def _get_root_collections(self) -> List["Collection"]:
if not len(self.upper_collections):
return [self]
root_collections = []
for upper_collection in self.upper_collections:
root_collections.extend(upper_collection._get_root_collections())
return root_collections
@property
def _is_root(self) -> bool:
return len(self.upper_collections) <= 0
def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
results = []
if self._contained_in_self(__object):
return [self]
for collection in self.contained_collections:
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
if break_at_first:
return results
return results
def _get_parents_of_multiple_contained_children(self, __object: T):
results = []
if len(self.contained_collections) < 2 or self._contained_in_self(__object):
return results
count = 0
for collection in self.contained_collections:
sub_results = collection._get_parents_of_multiple_contained_children(__object)
if len(sub_results) > 0:
count += 1
results.extend(sub_results)
if count >= 2:
results.append(self)
return results
def _merge_in_self(self, __object: T, from_map: bool = False):
"""
1. find existing objects
2. merge into existing object
3. remap existing object
"""
if __object.id in self._contains_ids:
return
existing_object: DatabaseObject = None
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
existing_object = self._indexed_to_objects[value][0]
if existing_object.id == __object.id:
return None
break
if existing_object is None:
return None
existing_object.merge(__object, replace_all_refs=True)
# just a check if it really worked
if existing_object.id != __object.id:
raise ValueError("This should NEVER happen. Merging doesn't work.")
self._map_element(existing_object, from_map=from_map)
def contains(self, __object: T) -> bool:
return len(self._contained_in_sub(__object)) > 0
def _append(self, __object: T, from_map: bool = False):
for attribute, to_sync_with in self.sync_on_append.items():
pass
to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))
self._map_element(__object, from_map=from_map)
self._data.append(__object)
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
if __object is None:
return
if __object.id in self._contains_ids:
return
exists_in_collection = self._contained_in_sub(__object)
if len(exists_in_collection) and self is exists_in_collection[0]:
# assuming that the object already is contained in the correct collections
if not already_is_parent:
self._merge_in_self(__object, from_map=from_map)
return
if not len(exists_in_collection):
self._append(__object, from_map=from_map)
else:
pass
exists_in_collection[0]._merge_in_self(__object, from_map=from_map)
if not already_is_parent or not self._is_root:
for parent_collection in self._get_parents_of_multiple_contained_children(__object):
pass
parent_collection.append(__object, already_is_parent=True, from_map=from_map)
def extend(self, __iterable: Optional[Iterable[T]]):
if __iterable is None:
return
for __object in __iterable:
self.append(__object)
def sync_with_other_collection(self, equal_collection: "Collection"):
"""
If two collections always need to have the same values, this can be used.
Internally:
1. import the data from other to self
- _data
- contained_collections
2. replace all refs from the other object, with refs from this object
"""
if equal_collection is self:
return
# don't add the elements from the subelements from the other collection.
# this will be done in the next step.
self.extend(equal_collection._data)
# add all submodules
for equal_sub_collection in equal_collection.contained_collections:
self.contain_collection_inside(equal_sub_collection)
# now the ugly part
# replace all refs of the other element with this one
self._risky_merge(equal_collection)
def contain_collection_inside(self, sub_collection: "Collection"):
"""
This collection will ALWAYS contain everything from the passed in collection
"""
if sub_collection in self.contained_collections:
return
self.contained_collections.append(sub_collection)
sub_collection.upper_collections.append(self)
@property
def data(self) -> List[T]:
return [*self._data,
*(__object for collection in self.contained_collections for __object in collection.shallow_list)]
def __len__(self) -> int:
return len(self._data) + sum(len(collection) for collection in self.contained_collections)
def __iter__(self) -> Iterator[T]:
for element in self._data:
yield element

View File

@ -1,221 +1,256 @@
from typing import List, Iterable, Dict, TypeVar, Generic, Iterator
from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type
from collections import defaultdict
from dataclasses import dataclass
from .parents import DatabaseObject
from ..utils.hooks import HookEventTypes, Hooks, Event
class CollectionHooks(HookEventTypes):
APPEND_NEW = "append_new"
from ..utils.support_classes.hacking import MetaClass
T = TypeVar('T', bound=DatabaseObject)
@dataclass
class AppendResult:
was_in_collection: bool
current_element: DatabaseObject
was_the_same: bool
class Collection(Generic[T]):
"""
This a class for the iterables
like tracklist or discography
"""
_data: List[T]
_by_url: dict
_by_attribute: dict
_indexed_values: Dict[str, set]
_indexed_to_objects: Dict[any, list]
def __init__(self, data: List[T] = None, element_type=None, *args, **kwargs) -> None:
# Attribute needs to point to
self.element_type = element_type
shallow_list = property(fget=lambda self: self.data)
self._data: List[T] = list()
def __init__(
self, data: Optional[Iterable[T]] = None,
sync_on_append: Dict[str, "Collection"] = None,
contain_given_in_attribute: Dict[str, "Collection"] = None,
contain_attribute_in_given: Dict[str, "Collection"] = None,
append_object_to_attribute: Dict[str, DatabaseObject] = None
) -> None:
self._contains_ids = set()
self._data = []
self.upper_collections: List[Collection[T]] = []
self.contained_collections: List[Collection[T]] = []
"""
example of attribute_to_object_map
the song objects are references pointing to objects
in _data
```python
{
'id': {323: song_1, 563: song_2, 666: song_3},
'url': {'www.song_2.com': song_2}
}
```
"""
self._attribute_to_object_map: Dict[str, Dict[object, T]] = defaultdict(dict)
self._used_ids: set = set()
# List of collection attributes that should be modified on append
# Key: collection attribute (str) of appended element
# Value: main collection to sync to
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {}
self.hooks: Hooks = Hooks(self)
self.contain_self_on_append: List[str] = []
if data is not None:
self.extend(data, merge_on_conflict=True)
self._indexed_values = defaultdict(set)
self._indexed_to_objects = defaultdict(list)
def sort(self, reverse: bool = False, **kwargs):
self._data.sort(reverse=reverse, **kwargs)
self.extend(data)
def map_element(self, element: T):
for name, value in element.indexing_values:
def _map_element(self, __object: T, from_map: bool = False):
self._contains_ids.add(__object.id)
for name, value in __object.indexing_values:
if value is None:
continue
self._attribute_to_object_map[name][value] = element
self._indexed_values[name].add(value)
self._indexed_to_objects[value].append(__object)
self._used_ids.add(element.id)
if not from_map:
for attribute, new_object in self.contain_given_in_attribute.items():
__object.__getattribute__(attribute).contain_collection_inside(new_object)
def unmap_element(self, element: T):
for name, value in element.indexing_values:
for attribute, new_object in self.contain_given_in_attribute.items():
new_object.contain_collection_inside(__object.__getattribute__(attribute))
for attribute, new_object in self.append_object_to_attribute.items():
__object.__getattribute__(attribute).append(new_object, from_map=True)
def _unmap_element(self, __object: T):
self._contains_ids.remove(__object.id)
for name, value in __object.indexing_values:
if value is None:
continue
if value not in self._indexed_values[name]:
continue
if value in self._attribute_to_object_map[name]:
if element is self._attribute_to_object_map[name][value]:
try:
self._attribute_to_object_map[name].pop(value)
except KeyError:
pass
try:
self._indexed_to_objects[value].remove(__object)
except ValueError:
continue
def append(self, element: T, merge_on_conflict: bool = True,
merge_into_existing: bool = True, no_hook: bool = False) -> AppendResult:
if not len(self._indexed_to_objects[value]):
self._indexed_values[name].remove(value)
def _contained_in_self(self, __object: T) -> bool:
if __object.id in self._contains_ids:
return True
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
return True
return False
def _get_root_collections(self) -> List["Collection"]:
if not len(self.upper_collections):
return [self]
root_collections = []
for upper_collection in self.upper_collections:
root_collections.extend(upper_collection._get_root_collections())
return root_collections
@property
def _is_root(self) -> bool:
return len(self.upper_collections) <= 0
def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
results = []
if self._contained_in_self(__object):
return [self]
for collection in self.contained_collections:
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
if break_at_first:
return results
return results
def _get_parents_of_multiple_contained_children(self, __object: T):
results = []
if len(self.contained_collections) < 2 or self._contained_in_self(__object):
return results
count = 0
for collection in self.contained_collections:
sub_results = collection._get_parents_of_multiple_contained_children(__object)
if len(sub_results) > 0:
count += 1
results.extend(sub_results)
if count >= 2:
results.append(self)
return results
def _merge_in_self(self, __object: T, from_map: bool = False):
"""
:param element:
:param merge_on_conflict:
:param merge_into_existing:
:return did_not_exist:
1. find existing objects
2. merge into existing object
3. remap existing object
"""
if element is None:
return AppendResult(False, None, False)
for existing_element in self._data:
if element is existing_element:
return AppendResult(False, None, False)
# if the element type has been defined in the initializer it checks if the type matches
if self.element_type is not None and not isinstance(element, self.element_type):
raise TypeError(f"{type(element)} is not the set type {self.element_type}")
# return if the same instance of the object is in the list
for existing in self._data:
if element is existing:
return AppendResult(True, element, True)
for name, value in element.indexing_values:
if value in self._attribute_to_object_map[name]:
existing_object = self._attribute_to_object_map[name][value]
if not merge_on_conflict:
return AppendResult(True, existing_object, False)
# if the object does already exist
# thus merging and don't add it afterward
if merge_into_existing:
existing_object.merge(element)
# in case any relevant data has been added (e.g. it remaps the old object)
self.map_element(existing_object)
return AppendResult(True, existing_object, False)
element.merge(existing_object)
exists_at = self._data.index(existing_object)
self._data[exists_at] = element
self.unmap_element(existing_object)
self.map_element(element)
return AppendResult(True, existing_object, False)
if not no_hook:
self.hooks.trigger_event(CollectionHooks.APPEND_NEW, new_object=element)
self._data.append(element)
self.map_element(element)
return AppendResult(False, element, False)
def extend(self, element_list: Iterable[T], merge_on_conflict: bool = True,
merge_into_existing: bool = True, no_hook: bool = False):
if element_list is None:
if __object.id in self._contains_ids:
return
if len(element_list) <= 0:
existing_object: DatabaseObject = None
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
existing_object = self._indexed_to_objects[value][0]
if existing_object.id == __object.id:
return None
break
if existing_object is None:
return None
existing_object.merge(__object, replace_all_refs=True)
# just a check if it really worked
if existing_object.id != __object.id:
raise ValueError("This should NEVER happen. Merging doesn't work.")
self._map_element(existing_object, from_map=from_map)
def contains(self, __object: T) -> bool:
return len(self._contained_in_sub(__object)) > 0
def _append(self, __object: T, from_map: bool = False):
for attribute, to_sync_with in self.sync_on_append.items():
pass
to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))
self._map_element(__object, from_map=from_map)
self._data.append(__object)
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
if __object is None:
return
if element_list is self:
if __object.id in self._contains_ids:
return
for element in element_list:
self.append(element, merge_on_conflict=merge_on_conflict, merge_into_existing=merge_into_existing, no_hook=no_hook)
def sync_collection(self, collection_attribute: str):
def on_append(event: Event, new_object: T, *args, **kwargs):
new_collection = new_object.__getattribute__(collection_attribute)
if self is new_collection:
return
exists_in_collection = self._contained_in_sub(__object)
if len(exists_in_collection) and self is exists_in_collection[0]:
# assuming that the object already is contained in the correct collections
if not already_is_parent:
self._merge_in_self(__object, from_map=from_map)
return
self.extend(new_object.__getattribute__(collection_attribute), no_hook=True)
new_object.__setattr__(collection_attribute, self)
if not len(exists_in_collection):
self._append(__object, from_map=from_map)
else:
pass
exists_in_collection[0]._merge_in_self(__object, from_map=from_map)
self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
if not already_is_parent or not self._is_root:
for parent_collection in self._get_parents_of_multiple_contained_children(__object):
pass
parent_collection.append(__object, already_is_parent=True, from_map=from_map)
def sync_main_collection(self, main_collection: "Collection", collection_attribute: str):
def on_append(event: Event, new_object: T, *args, **kwargs):
new_collection = new_object.__getattribute__(collection_attribute)
if main_collection is new_collection:
return
main_collection.extend(new_object.__getattribute__(collection_attribute), no_hook=True)
new_object.__setattr__(collection_attribute, main_collection)
def extend(self, __iterable: Optional[Iterable[T]]):
if __iterable is None:
return
self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
for __object in __iterable:
self.append(__object)
"""
def on_append(event: Event, new_object: T, *args, **kwargs):
new_collection: Collection = new_object.__getattribute__(collection_attribute)
if self is new_collection:
return
self.extend(new_collection.shallow_list, no_hook=False)
new_object.__setattr__(collection_attribute, self)
def sync_with_other_collection(self, equal_collection: "Collection"):
"""
If two collections always need to have the same values, this can be used.
self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
"""
Internally:
1. import the data from other to self
- _data
- contained_collections
2. replace all refs from the other object, with refs from this object
"""
if equal_collection is self:
return
# don't add the elements from the subelements from the other collection.
# this will be done in the next step.
self.extend(equal_collection._data)
# add all submodules
for equal_sub_collection in equal_collection.contained_collections:
self.contain_collection_inside(equal_sub_collection)
# now the ugly part
# replace all refs of the other element with this one
self._risky_merge(equal_collection)
def contain_collection_inside(self, sub_collection: "Collection"):
"""
This collection will ALWAYS contain everything from the passed in collection
"""
if sub_collection in self.contained_collections:
return
self.contained_collections.append(sub_collection)
sub_collection.upper_collections.append(self)
@property
def data(self) -> List[T]:
return [*self._data,
*(__object for collection in self.contained_collections for __object in collection.shallow_list)]
def __len__(self) -> int:
return len(self._data) + sum(len(collection) for collection in self.contained_collections)
def __iter__(self) -> Iterator[T]:
for element in self._data:
yield element
def __str__(self) -> str:
return "\n".join([f"{str(j).zfill(2)}: {i.__repr__()}" for j, i in enumerate(self._data)])
def __len__(self) -> int:
return len(self._data)
def __getitem__(self, key) -> T:
if type(key) != int:
return ValueError("key needs to be an integer")
return self._data[key]
def __setitem__(self, key, value: T):
if type(key) != int:
return ValueError("key needs to be an integer")
old_item = self._data[key]
self.unmap_element(old_item)
self.map_element(value)
self._data[key] = value
@property
def shallow_list(self) -> List[T]:
"""
returns a shallow copy of the data list
"""
return self._data.copy()
@property
def empty(self) -> bool:
return len(self._data) == 0
def clear(self):
self.__init__(element_type=self.element_type)
yield element

View File

@ -9,12 +9,13 @@ from .option import Options
from ..utils.shared import HIGHEST_ID
from ..utils.config import main_settings, logging_settings
from ..utils.support_classes.hacking import MetaClass
from ..utils.exception.objects import IsDynamicException
LOGGER = logging_settings["object_logger"]
P = TypeVar('P')
@dataclass
class StaticAttribute(Generic[P]):
name: str
@ -27,6 +28,139 @@ class StaticAttribute(Generic[P]):
is_upwards_collection: bool = False
class InnerData:
"""
This is the core class, which is used for every Data class.
The attributes are set, and can be merged.
The concept is, that the outer class proxies this class.
If the data in the wrapper class has to be merged, then this class is just replaced and garbage collected.
"""
def __init__(self, **kwargs):
for key, value in kwargs.items():
self.__setattr__(key, value)
def __merge__(self, __other: InnerData, override: bool = False):
"""
TODO
is default is totally ignored
:param __other:
:param override:
:return:
"""
for key, value in __other.__dict__.items():
# just set the other value if self doesn't already have it
if key not in self.__dict__:
self.__setattr__(key, value)
continue
# if the object of value implemented __merge__, it merges
existing = self.__getattribute__(key)
if hasattr(type(existing), "__merge__"):
existing.merge_into_self(value, override)
continue
# override the existing value if requested
if override:
self.__setattr__(key, value)
class OuterProxy:
"""
Wraps the inner data, and provides apis, to naturally access those values.
"""
_default_factories: dict
def __init__(self, _id: int = None, dynamic: bool = False, **kwargs):
_automatic_id: bool = False
if _id is None and not dynamic:
"""
generates a random integer id
the range is defined in the config
"""
_id = random.randint(0, HIGHEST_ID)
_automatic_id = True
kwargs["automatic_id"] = _automatic_id
kwargs["id"] = _id
kwargs["dynamic"] = dynamic
for name, factory in type(self)._default_factories.items():
if name not in kwargs:
kwargs[name] = factory()
self._inner: InnerData = InnerData(**kwargs)
self.__init_collections__()
for name, data_list in kwargs.items():
if isinstance(data_list, list) and name.endswith("_list"):
collection_name = name.replace("_list", "_collection")
if collection_name not in self.__dict__:
continue
collection = self.__getattribute__(collection_name)
collection.extend(data_list)
def __init_collections__(self):
pass
def __getattribute__(self, __name: str) -> Any:
"""
Returns the attribute of _inner if the attribute exists,
else it returns the attribute of self.
That the _inner gets checked first is essential for the type hints.
:param __name:
:return:
"""
_inner: InnerData = super().__getattribute__("_inner")
try:
return _inner.__getattribute__(__name)
except AttributeError:
return super().__getattribute__(__name)
def __setattr__(self, __name, __value):
if not __name.startswith("_") and hasattr(self, "_inner"):
_inner: InnerData = super().__getattribute__("_inner")
return _inner.__setattr__(__name, __value)
return super().__setattr__(__name, __value)
def __hash__(self):
"""
:raise: IsDynamicException
:return:
"""
if self.dynamic:
return id(self._inner)
return self.id
def __eq__(self, other: Any):
return self.__hash__() == other.__hash__()
def merge(self, __other: OuterProxy, override: bool = False):
"""
1. merges the data of __other in self
2. replaces the data of __other with the data of self
:param __other:
:param override:
:return:
"""
self._inner.__merge__(__other._inner, override=override)
__other._inner = self._inner
class Attribute(Generic[P]):
def __init__(self, database_object: "DatabaseObject", static_attribute: StaticAttribute) -> None:
self.database_object: DatabaseObject = database_object
@ -38,12 +172,11 @@ class Attribute(Generic[P]):
def get(self) -> P:
return self.database_object.__getattribute__(self.name)
def set(self, value: P):
self.database_object.__setattr__(self.name, value)
class DatabaseObject(metaclass=MetaClass):
COLLECTION_STRING_ATTRIBUTES: tuple = tuple()
SIMPLE_STRING_ATTRIBUTES: dict = dict()
@ -77,7 +210,7 @@ class DatabaseObject(metaclass=MetaClass):
for static_attribute in self.STATIC_ATTRIBUTES:
attribute: Attribute = Attribute(self, static_attribute)
self._attributes.append(attribute)
if static_attribute.is_collection:
if static_attribute.is_collection:
self._collection_attributes.append(attribute)
@ -94,6 +227,8 @@ class DatabaseObject(metaclass=MetaClass):
self.dynamic = dynamic
self.build_version = -1
super().__init__()
@property
def upwards_collection(self) -> Collection:
for attribute in self._upwards_collection_attributes:
@ -114,10 +249,19 @@ class DatabaseObject(metaclass=MetaClass):
raise TypeError("Dynamic DatabaseObjects are unhashable.")
return self.id
def __deep_eq__(self, other) -> bool:
if not isinstance(other, type(self)):
return False
return super().__eq__(other)
def __eq__(self, other) -> bool:
if not isinstance(other, type(self)):
return False
if super().__eq__(other):
return True
# add the checks for dynamic, to not throw an exception
if not self.dynamic and not other.dynamic and self.id == other.id:
return True
@ -152,10 +296,10 @@ class DatabaseObject(metaclass=MetaClass):
if other is None:
return
if self.id == other.id:
if self.__deep_eq__(other):
return
if not isinstance(other, type(self)):
LOGGER.warning(f"can't merge \"{type(other)}\" into \"{type(self)}\"")
return
@ -163,6 +307,7 @@ class DatabaseObject(metaclass=MetaClass):
for collection in self._collection_attributes:
if hasattr(self, collection.name) and hasattr(other, collection.name):
if collection.get() is not getattr(other, collection.name):
pass
collection.get().extend(getattr(other, collection.name))
for simple_attribute, default_value in type(self).SIMPLE_STRING_ATTRIBUTES.items():
@ -190,7 +335,7 @@ class DatabaseObject(metaclass=MetaClass):
@property
def option_string(self) -> str:
return self.__repr__()
def _build_recursive_structures(self, build_version: int, merge: False):
pass
@ -202,7 +347,7 @@ class DatabaseObject(metaclass=MetaClass):
no need to override if only the recursive structure should be build.
override self.build_recursive_structures() instead
"""
self._build_recursive_structures(build_version=random.randint(0, 99999), merge=merge_into)
def _add_other_db_objects(self, object_type: Type["DatabaseObject"], object_list: List["DatabaseObject"]):

View File

@ -1,3 +1,5 @@
from __future__ import annotations
import random
from collections import defaultdict
from typing import List, Optional, Dict, Tuple, Type
@ -15,11 +17,13 @@ from .metadata import (
Metadata
)
from .option import Options
from .parents import MainObject, DatabaseObject, StaticAttribute
from .parents import DatabaseObject, StaticAttribute
from .source import Source, SourceCollection
from .target import Target
from ..utils.string_processing import unify
from .parents import OuterProxy as Base
from ..utils.config import main_settings
"""
@ -30,12 +34,39 @@ CountryTyping = type(list(pycountry.countries)[0])
OPTION_STRING_DELIMITER = " | "
class Song(MainObject):
class Song(Base):
"""
Class representing a song object, with attributes id, mb_id, title, album_name, isrc, length,
tracksort, genre, source_list, target, lyrics_list, album, main_artist_list, and feature_artist_list.
"""
title: str
unified_title: str
isrc: str
length: int
genre: str
note: FormattedText
source_collection: SourceCollection
target_collection: Collection[Target]
lyrics_collection: Collection[Lyrics]
main_artist_collection: Collection[Artist]
feature_artist_collection: Collection[Artist]
album_collection: Collection[Album]
_default_factories = {
"note": FormattedText,
"length": lambda: 0,
"source_collection": SourceCollection,
"target_collection": Collection,
"lyrics_collection": Collection,
"main_artist_collection": Collection,
"album_collection": Collection,
"feature_artist_collection": Collection
}
"""
COLLECTION_STRING_ATTRIBUTES = (
"lyrics_collection", "album_collection", "main_artist_collection", "feature_artist_collection",
"source_collection")
@ -48,118 +79,38 @@ class Song(MainObject):
"genre": None,
"notes": FormattedText()
}
"""
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("album_collection", "main_artist_collection", "feature_artist_collection")
"""
title: str = None,
unified_title: str = None,
isrc: str = None,
length: int = None,
tracksort: int = None,
genre: str = None,
source_list: List[Source] = None,
target_list: List[Target] = None,
lyrics_list: List[Lyrics] = None,
album_list: List['Album'] = None,
main_artist_list: List['Artist'] = None,
feature_artist_list: List['Artist'] = None,
notes: FormattedText = None,
"""
def __init_collections__(self) -> None:
self.album_collection.contain_given_in_attribute = {
"artist_collection": self.main_artist_collection,
}
self.album_collection.append_object_to_attribute = {
"song_collection": self,
}
STATIC_ATTRIBUTES = [
StaticAttribute(name="title", weight=.5),
StaticAttribute(name="unified_title", weight=.3),
StaticAttribute(name="isrc", weight=1),
StaticAttribute(name="length"),
StaticAttribute(name="tracksort", default_value=0),
StaticAttribute(name="genre"),
StaticAttribute(name="notes", default_value=FormattedText()),
StaticAttribute(name="source_collection", is_collection=True),
StaticAttribute(name="lyrics_collection", is_collection=True),
StaticAttribute(name="album_collection", is_collection=True, is_upwards_collection=True),
StaticAttribute(name="main_artist_collection", is_collection=True, is_upwards_collection=True),
StaticAttribute(name="feature_artist_collection", is_collection=True, is_upwards_collection=True)
]
def __init__(
self,
_id: int = None,
dynamic: bool = False,
title: str = None,
unified_title: str = None,
isrc: str = None,
length: int = None,
tracksort: int = None,
genre: str = None,
source_list: List[Source] = None,
target_list: List[Target] = None,
lyrics_list: List[Lyrics] = None,
album_list: List['Album'] = None,
main_artist_list: List['Artist'] = None,
feature_artist_list: List['Artist'] = None,
notes: FormattedText = None,
**kwargs
) -> None:
super().__init__(_id=_id, dynamic=dynamic, **kwargs)
# attributes
self.title: str = title
self.unified_title: str = unified_title
if unified_title is None and title is not None:
self.unified_title = unify(title)
self.isrc: str = isrc
self.length: int = length
self.tracksort: int = tracksort or 0
self.genre: str = genre
self.notes: FormattedText = notes or FormattedText()
self.source_collection: SourceCollection = SourceCollection(source_list)
self.target_collection: Collection[Target] = Collection(data=target_list)
self.lyrics_collection: Collection[Lyrics] = Collection(data=lyrics_list)
# main_artist_collection = album.artist collection
self.main_artist_collection: Collection[Artist] = Collection(data=[])
# this album_collection equals no collection
self.album_collection: Collection[Album] = Collection(data=album_list,
contain_given_in_attribute={
"artist_collection": self.main_artist_collection
}, append_object_to_attribute={
"song_collection": self
})
self.main_artist_collection.contain_given_in_attribute = {"main_album_collection": self.album_collection}
self.main_artist_collection.extend(main_artist_list)
self.feature_artist_collection: Collection[Artist] = Collection(
data=feature_artist_list,
append_object_to_attribute={
"feature_song_collection": self
}
)
def _build_recursive_structures(self, build_version: int, merge: bool):
if build_version == self.build_version:
return
self.build_version = build_version
album: Album
for album in self.album_collection:
album.song_collection.append(self, merge_on_conflict=merge, merge_into_existing=False)
album._build_recursive_structures(build_version=build_version, merge=merge)
artist: Artist
for artist in self.feature_artist_collection:
artist.feature_song_collection.append(self, merge_on_conflict=merge, merge_into_existing=False)
artist._build_recursive_structures(build_version=build_version, merge=merge)
for artist in self.main_artist_collection:
for album in self.album_collection:
artist.main_album_collection.append(album, merge_on_conflict=merge, merge_into_existing=False)
artist._build_recursive_structures(build_version=build_version, merge=merge)
def _add_other_db_objects(self, object_type: Type["DatabaseObject"], object_list: List["DatabaseObject"]):
if object_type is Song:
return
if object_type is Lyrics:
self.lyrics_collection.extend(object_list)
return
if object_type is Artist:
self.main_artist_collection.extend(object_list)
return
if object_type is Album:
self.album_collection.extend(object_list)
return
self.main_artist_collection.contain_given_in_attribute = {
"main_album_collection": self.album_collection
}
self.feature_artist_collection.append_object_to_attribute = {
"feature_song_collection": self
}
@property
def indexing_values(self) -> List[Tuple[str, object]]:
@ -245,7 +196,7 @@ All objects dependent on Album
"""
class Album(MainObject):
class Album(Base):
COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection", "song_collection")
SIMPLE_STRING_ATTRIBUTES = {
"title": None,
@ -259,6 +210,16 @@ class Album(MainObject):
"notes": FormattedText()
}
title: str
unified_title: str
album_status: str
album_type: AlbumType
language: LanguageSelector
_default_factories = {
"album_type": AlbumType.OTHER
}
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection", )
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "label_collection")
@ -298,7 +259,7 @@ class Album(MainObject):
notes: FormattedText = None,
**kwargs
) -> None:
MainObject.__init__(self, _id=_id, dynamic=dynamic, **kwargs)
Base.__init__(self, _id=_id, dynamic=dynamic, **kwargs)
self.title: str = title
self.unified_title: str = unified_title
@ -512,7 +473,7 @@ All objects dependent on Artist
"""
class Artist(MainObject):
class Artist(Base):
COLLECTION_STRING_ATTRIBUTES = (
"feature_song_collection",
"main_album_collection",
@ -570,7 +531,7 @@ class Artist(MainObject):
unformated_location: str = None,
**kwargs
):
MainObject.__init__(self, _id=_id, dynamic=dynamic, **kwargs)
Base.__init__(self, _id=_id, dynamic=dynamic, **kwargs)
self.name: str = name
self.unified_name: str = unified_name
@ -806,7 +767,7 @@ Label
"""
class Label(MainObject):
class Label(Base):
COLLECTION_STRING_ATTRIBUTES = ("album_collection", "current_artist_collection")
SIMPLE_STRING_ATTRIBUTES = {
"name": None,
@ -837,7 +798,7 @@ class Label(MainObject):
source_list: List[Source] = None,
**kwargs
):
MainObject.__init__(self, _id=_id, dynamic=dynamic, **kwargs)
Base.__init__(self, _id=_id, dynamic=dynamic, **kwargs)
self.name: str = name
self.unified_name: str = unified_name

View File

@ -121,7 +121,8 @@ class Source(DatabaseObject):
class SourceCollection(Collection):
def __init__(self, source_list: List[Source]):
def __init__(self, source_list: List[Source] = None):
source_list = source_list if source_list is not None else []
self._page_to_source_list: Dict[SourcePages, List[Source]] = defaultdict(list)
super().__init__(data=source_list)

View File

@ -0,0 +1,10 @@
class ObjectException(Exception):
pass
class IsDynamicException(Exception):
"""
Gets raised, if a dynamic data object tries to perform an action,
which does not make sense for a dynamic object.
"""
pass

View File

@ -1,7 +1,8 @@
import weakref
from types import FunctionType
from functools import wraps
from typing import Dict
from typing import Dict, Set
class Lake:
def __init__(self):
@ -17,15 +18,34 @@ class Lake:
return self.id_to_object[_id]
except KeyError:
self.add(db_object)
return db_object
return db_object
def add(self, db_object: object):
self.id_to_object[id(db_object)] = db_object
def override(self, to_override: object, new_db_object: object):
self.redirects[id(to_override)] = id(new_db_object)
if id(to_override) in self.id_to_object:
del self.id_to_object[id(to_override)]
_id = id(to_override)
while _id in self.redirects:
_id = self.redirects[_id]
if id(new_db_object) in self.id_to_object:
print("!!!!!")
self.add(new_db_object)
self.redirects[_id] = id(new_db_object)
# if _id in self.id_to_object:
# del self.id_to_object[_id]
def is_same(self, __object: object, other: object) -> bool:
_self_id = id(__object)
while _self_id in self.redirects:
_self_id = self.redirects[_self_id]
_other_id = id(other)
while _other_id in self.redirects:
_other_id = self.redirects[_other_id]
return _self_id == _other_id
lake = Lake()
@ -35,11 +55,20 @@ def wrapper(method):
@wraps(method)
def wrapped(*args, **kwargs):
return method(*(lake.get_real_object(args[0]), *args[1:]), **kwargs)
return wrapped
class BaseClass:
def __new__(cls, *args, **kwargs):
instance = cls(*args, **kwargs)
print("new")
lake.add(instance)
return instance
def __eq__(self, other):
return lake.is_same(self, other)
def _risky_merge(self, to_replace):
lake.override(to_replace, self)
@ -49,17 +78,27 @@ class MetaClass(type):
bases = (*bases, BaseClass)
newClassDict = {}
ignore_functions: Set[str] = {"__new__", "__init__"}
for attributeName, attribute in classDict.items():
if isinstance(attribute, FunctionType) and attributeName not in ("__new__", "__init__"):
if isinstance(attribute, FunctionType) and (attributeName not in ignore_functions):
"""
The funktion new and init shouldn't be accounted for because we can assume the class is
independent on initialization.
"""
attribute = wrapper(attribute)
newClassDict[attributeName] = attribute
for key, value in object.__dict__.items( ):
if hasattr( value, '__call__' ) and value not in newClassDict and key not in ("__new__", "__init__"):
print()
for key, value in object.__dict__.items():
# hasattr( value, '__call__' ) and
if hasattr(value, '__call__') and value not in newClassDict and key not in ("__new__", "__init__"):
newClassDict[key] = wrapper(value)
new_instance = type.__new__(meta, classname, bases, newClassDict)
lake.add(new_instance)
return new_instance
return new_instance