diff --git a/.idea/misc.xml b/.idea/misc.xml
index 6468d4f..df5fc58 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,4 +1,7 @@
+
+
+
\ No newline at end of file
diff --git a/.idea/music-downloader.iml b/.idea/music-downloader.iml
index d3aa814..27eed28 100644
--- a/.idea/music-downloader.iml
+++ b/.idea/music-downloader.iml
@@ -3,9 +3,10 @@
+
-
+
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index 1a15e6d..35eb1dd 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -2,6 +2,5 @@
-
\ No newline at end of file
diff --git a/src/create_custom_objects.py b/src/create_custom_objects.py
index a61da6c..49d58a9 100644
--- a/src/create_custom_objects.py
+++ b/src/create_custom_objects.py
@@ -9,7 +9,23 @@ from music_kraken.objects import (
from music_kraken.objects.collection import Collection
from music_kraken.utils.enums import SourcePages
+song = Song(title="Sad Story", isrc="testTest")
+other_song = Song(title="hihi", genre="dsbm")
+song.merge(other_song)
+
+print(song.__dict__)
+print(other_song.__dict__)
+
+other_song.title = ":3"
+
+print(song.__dict__)
+print(other_song.__dict__)
+
+
+print(song)
+
+"""
only_smile = Artist(
name="Only Smile",
source_list=[Source(SourcePages.BANDCAMP, "https://onlysmile.bandcamp.com/")],
@@ -100,10 +116,16 @@ def add_to_objects_dump(db_obj: DatabaseObject):
add_to_objects_dump(only_smile)
for _id, _object in objects_by_id.items():
- print(_id, _object, sep=": ")
+ try:
+ print(_id, _object.title, sep=": ")
+ except AttributeError:
+ try:
+ print(_id, _object.name, sep=": ")
+ except AttributeError:
+ print(_id, _object, sep=": ")
print(only_smile)
-
+"""
"""
c = Collection([Song(title="hi"), Song(title="hi2"), Song(title="hi3")])
c1 = Collection([Song(title="he"), Song(title="hi5")])
diff --git a/src/music_kraken/objects/collection.py b/src/music_kraken/objects/collection.py
index f105de9..de98f47 100644
--- a/src/music_kraken/objects/collection.py
+++ b/src/music_kraken/objects/collection.py
@@ -1,14 +1,13 @@
-from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type
+from __future__ import annotations
+
from collections import defaultdict
+from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator
+from .parents import OuterProxy
-from .parents import DatabaseObject
-from ..utils.support_classes.hacking import MetaClass
+T = TypeVar('T', bound=OuterProxy)
-T = TypeVar('T', bound=DatabaseObject)
-
-
-class Collection(Generic[T], metaclass=MetaClass):
+class Collection(Generic[T]):
_data: List[T]
_indexed_values: Dict[str, set]
@@ -17,16 +16,18 @@ class Collection(Generic[T], metaclass=MetaClass):
shallow_list = property(fget=lambda self: self.data)
def __init__(
- self, data: Optional[Iterable[T]],
- sync_on_append: Dict[str, "Collection"] = None,
- contain_given_in_attribute: Dict[str, "Collection"] = None,
- contain_attribute_in_given: Dict[str, "Collection"] = None,
- append_object_to_attribute: Dict[str, DatabaseObject] = None
+ self,
+ data: Optional[Iterable[T]] = None,
+ sync_on_append: Dict[str, Collection] = None,
+ contain_given_in_attribute: Dict[str, Collection] = None,
+ contain_attribute_in_given: Dict[str, Collection] = None,
+ append_object_to_attribute: Dict[str, T] = None
) -> None:
self._contains_ids = set()
self._data = []
- self.upper_collections: List[Collection[T]] = []
- self.contained_collections: List[Collection[T]] = []
+
+ self.parents: List[Collection[T]] = []
+ self.children: List[Collection[T]] = []
# List of collection attributes that should be modified on append
# Key: collection attribute (str) of appended element
@@ -34,13 +35,13 @@ class Collection(Generic[T], metaclass=MetaClass):
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
- self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {}
+ self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
self.contain_self_on_append: List[str] = []
self._indexed_values = defaultdict(set)
self._indexed_to_objects = defaultdict(list)
-
+
self.extend(data)
def _map_element(self, __object: T, from_map: bool = False):
@@ -56,13 +57,13 @@ class Collection(Generic[T], metaclass=MetaClass):
if not from_map:
for attribute, new_object in self.contain_given_in_attribute.items():
__object.__getattribute__(attribute).contain_collection_inside(new_object)
-
+
for attribute, new_object in self.contain_given_in_attribute.items():
new_object.contain_collection_inside(__object.__getattribute__(attribute))
for attribute, new_object in self.append_object_to_attribute.items():
- __object.__getattribute__(attribute).append(new_object, from_map = True)
-
+ __object.__getattribute__(attribute).append(new_object, from_map=True)
+
def _unmap_element(self, __object: T):
self._contains_ids.remove(__object.id)
@@ -71,7 +72,7 @@ class Collection(Generic[T], metaclass=MetaClass):
continue
if value not in self._indexed_values[name]:
continue
-
+
try:
self._indexed_to_objects[value].remove(__object)
except ValueError:
@@ -81,59 +82,62 @@ class Collection(Generic[T], metaclass=MetaClass):
self._indexed_values[name].remove(value)
def _contained_in_self(self, __object: T) -> bool:
+ if __object.id in self._contains_ids:
+ return True
+
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
return True
return False
-
- def _get_root_collections(self) -> List["Collection"]:
- if not len(self.upper_collections):
+
+ def _get_root_collections(self) -> List[Collection]:
+ if not len(self.parents):
return [self]
-
+
root_collections = []
- for upper_collection in self.upper_collections:
+ for upper_collection in self.parents:
root_collections.extend(upper_collection._get_root_collections())
return root_collections
@property
def _is_root(self) -> bool:
- return len(self.upper_collections) <= 0
+ return len(self.parents) <= 0
- def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
+ def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List[Collection]:
results = []
if self._contained_in_self(__object):
return [self]
-
- for collection in self.contained_collections:
+
+ for collection in self.children:
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
if break_at_first:
return results
return results
-
+
def _get_parents_of_multiple_contained_children(self, __object: T):
results = []
- if len(self.contained_collections) < 2 or self._contained_in_self(__object):
+ if len(self.children) < 2 or self._contained_in_self(__object):
return results
-
+
count = 0
- for collection in self.contained_collections:
+ for collection in self.children:
sub_results = collection._get_parents_of_multiple_contained_children(__object)
-
+
if len(sub_results) > 0:
count += 1
results.extend(sub_results)
-
+
if count >= 2:
results.append(self)
return results
-
- def _merge_in_self(self, __object: T, from_map: bool = False):
+
+ def merge_into_self(self, __object: T, from_map: bool = False):
"""
1. find existing objects
2. merge into existing object
@@ -141,30 +145,30 @@ class Collection(Generic[T], metaclass=MetaClass):
"""
if __object.id in self._contains_ids:
return
-
- existing_object: DatabaseObject = None
+
+ existing_object: T = None
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
existing_object = self._indexed_to_objects[value][0]
- if existing_object == __object:
+ if existing_object.id == __object.id:
return None
- else:
- break
-
+
+ break
+
if existing_object is None:
return None
- existing_object.merge(__object, replace_all_refs=True)
+ existing_object.merge(__object)
# just a check if it really worked
if existing_object.id != __object.id:
raise ValueError("This should NEVER happen. Merging doesn't work.")
- self._map_element(existing_object, from_map = from_map)
-
+ self._map_element(existing_object, from_map=from_map)
+
def contains(self, __object: T) -> bool:
return len(self._contained_in_sub(__object)) > 0
@@ -178,37 +182,39 @@ class Collection(Generic[T], metaclass=MetaClass):
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
if __object is None:
return
+
if __object.id in self._contains_ids:
return
-
+
exists_in_collection = self._contained_in_sub(__object)
if len(exists_in_collection) and self is exists_in_collection[0]:
# assuming that the object already is contained in the correct collections
if not already_is_parent:
- self._merge_in_self(__object, from_map = from_map)
+ self.merge_into_self(__object, from_map=from_map)
return
if not len(exists_in_collection):
self._append(__object, from_map=from_map)
else:
- exists_in_collection[0]._merge_in_self(__object, from_map = from_map)
+ pass
+ exists_in_collection[0].merge_into_self(__object, from_map=from_map)
if not already_is_parent or not self._is_root:
for parent_collection in self._get_parents_of_multiple_contained_children(__object):
+ pass
parent_collection.append(__object, already_is_parent=True, from_map=from_map)
def extend(self, __iterable: Optional[Iterable[T]]):
if __iterable is None:
return
-
+
for __object in __iterable:
self.append(__object)
-
- def sync_with_other_collection(self, equal_collection: "Collection"):
+ def sync_with_other_collection(self, equal_collection: Collection):
"""
If two collections always need to have the same values, this can be used.
-
+
Internally:
1. import the data from other to self
- _data
@@ -222,31 +228,31 @@ class Collection(Generic[T], metaclass=MetaClass):
# this will be done in the next step.
self.extend(equal_collection._data)
# add all submodules
- for equal_sub_collection in equal_collection.contained_collections:
+ for equal_sub_collection in equal_collection.children:
self.contain_collection_inside(equal_sub_collection)
# now the ugly part
# replace all refs of the other element with this one
self._risky_merge(equal_collection)
-
def contain_collection_inside(self, sub_collection: "Collection"):
"""
This collection will ALWAYS contain everything from the passed in collection
"""
- if sub_collection in self.contained_collections:
+ if sub_collection in self.children:
return
-
- self.contained_collections.append(sub_collection)
- sub_collection.upper_collections.append(self)
+
+ self.children.append(sub_collection)
+ sub_collection.parents.append(self)
@property
def data(self) -> List[T]:
- return [*self._data, *(__object for collection in self.contained_collections for __object in collection.shallow_list)]
-
+ return [*self._data,
+ *(__object for collection in self.children for __object in collection.shallow_list)]
+
def __len__(self) -> int:
- return len(self._data) + sum(len(collection) for collection in self.contained_collections)
+ return len(self._data) + sum(len(collection) for collection in self.children)
def __iter__(self) -> Iterator[T]:
for element in self._data:
- yield element
\ No newline at end of file
+ yield element
diff --git a/src/music_kraken/objects/country.py b/src/music_kraken/objects/country.py
index aed4842..c9aeaa3 100644
--- a/src/music_kraken/objects/country.py
+++ b/src/music_kraken/objects/country.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
from dataclasses import dataclass
import pycountry
@@ -9,7 +11,6 @@ CountryTyping = type(list(pycountry.countries)[0])
emoji_map = {'AD': '🇦🇩', 'AE': '🇦🇪', 'AF': '🇦🇫', 'AG': '🇦🇬', 'AI': '🇦🇮', 'AL': '🇦🇱', 'AM': '🇦🇲', 'AO': '🇦🇴', 'AQ': '🇦🇶', 'AR': '🇦🇷', 'AS': '🇦🇸', 'AT': '🇦🇹', 'AU': '🇦🇺', 'AW': '🇦🇼', 'AX': '🇦🇽', 'AZ': '🇦🇿', 'BA': '🇧🇦', 'BB': '🇧🇧', 'BD': '🇧🇩', 'BE': '🇧🇪', 'BF': '🇧🇫', 'BG': '🇧🇬', 'BH': '🇧ðŸ‡', 'BI': '🇧🇮', 'BJ': '🇧🇯', 'BL': '🇧🇱', 'BM': '🇧🇲', 'BN': '🇧🇳', 'BO': '🇧🇴', 'BQ': '🇧🇶', 'BR': '🇧🇷', 'BS': '🇧🇸', 'BT': '🇧🇹', 'BV': '🇧🇻', 'BW': '🇧🇼', 'BY': '🇧🇾', 'BZ': '🇧🇿', 'CA': '🇨🇦', 'CC': '🇨🇨', 'CD': '🇨🇩', 'CF': '🇨🇫', 'CG': '🇨🇬', 'CH': '🇨ðŸ‡', 'CI': '🇨🇮', 'CK': '🇨🇰', 'CL': '🇨🇱', 'CM': '🇨🇲', 'CN': '🇨🇳', 'CO': '🇨🇴', 'CR': '🇨🇷', 'CU': '🇨🇺', 'CV': '🇨🇻', 'CW': '🇨🇼', 'CX': '🇨🇽', 'CY': '🇨🇾', 'CZ': '🇨🇿', 'DE': '🇩🇪', 'DJ': '🇩🇯', 'DK': '🇩🇰', 'DM': '🇩🇲', 'DO': '🇩🇴', 'DZ': '🇩🇿', 'EC': '🇪🇨', 'EE': '🇪🇪', 'EG': '🇪🇬', 'EH': '🇪ðŸ‡', 'ER': '🇪🇷', 'ES': '🇪🇸', 'ET': '🇪🇹', 'FI': '🇫🇮', 'FJ': '🇫🇯', 'FK': '🇫🇰', 'FM': '🇫🇲', 'FO': '🇫🇴', 'FR': '🇫🇷', 'GA': '🇬🇦', 'GB': '🇬🇧', 'GD': '🇬🇩', 'GE': '🇬🇪', 'GF': '🇬🇫', 'GG': '🇬🇬', 'GH': '🇬ðŸ‡', 'GI': '🇬🇮', 'GL': '🇬🇱', 'GM': '🇬🇲', 'GN': '🇬🇳', 'GP': '🇬🇵', 'GQ': '🇬🇶', 'GR': '🇬🇷', 'GS': '🇬🇸', 'GT': '🇬🇹', 'GU': '🇬🇺', 'GW': '🇬🇼', 'GY': '🇬🇾', 'HK': 'ðŸ‡ðŸ‡°', 'HM': 'ðŸ‡ðŸ‡²', 'HN': 'ðŸ‡ðŸ‡³', 'HR': 'ðŸ‡ðŸ‡·', 'HT': 'ðŸ‡ðŸ‡¹', 'HU': 'ðŸ‡ðŸ‡º', 'ID': '🇮🇩', 'IE': '🇮🇪', 'IL': '🇮🇱', 'IM': '🇮🇲', 'IN': '🇮🇳', 'IO': '🇮🇴', 'IQ': '🇮🇶', 'IR': '🇮🇷', 'IS': '🇮🇸', 'IT': '🇮🇹', 'JE': '🇯🇪', 'JM': '🇯🇲', 'JO': '🇯🇴', 'JP': '🇯🇵', 'KE': '🇰🇪', 'KG': '🇰🇬', 'KH': '🇰ðŸ‡', 'KI': '🇰🇮', 'KM': '🇰🇲', 'KN': '🇰🇳', 'KP': '🇰🇵', 'KR': '🇰🇷', 'KW': '🇰🇼', 'KY': '🇰🇾', 'KZ': '🇰🇿', 'LA': '🇱🇦', 'LB': '🇱🇧', 'LC': '🇱🇨', 'LI': '🇱🇮', 'LK': '🇱🇰', 'LR': '🇱🇷', 'LS': '🇱🇸', 'LT': '🇱🇹', 'LU': '🇱🇺', 'LV': '🇱🇻', 'LY': '🇱🇾', 'MA': '🇲🇦', 'MC': '🇲🇨', 'MD': '🇲🇩', 'ME': '🇲🇪', 'MF': '🇲🇫', 'MG': '🇲🇬', 'MH': '🇲ðŸ‡', 'MK': '🇲🇰', 'ML': '🇲🇱', 'MM': '🇲🇲', 'MN': '🇲🇳', 'MO': '🇲🇴', 'MP': '🇲🇵', 'MQ': '🇲🇶', 'MR': '🇲🇷', 'MS': '🇲🇸', 'MT': '🇲🇹', 'MU': '🇲🇺', 'MV': '🇲🇻', 'MW': '🇲🇼', 'MX': '🇲🇽', 'MY': '🇲🇾', 'MZ': '🇲🇿', 'NA': '🇳🇦', 'NC': '🇳🇨', 'NE': '🇳🇪', 'NF': '🇳🇫', 'NG': '🇳🇬', 'NI': '🇳🇮', 'NL': '🇳🇱', 'NO': '🇳🇴', 'NP': '🇳🇵', 'NR': '🇳🇷', 'NU': '🇳🇺', 'NZ': '🇳🇿', 'OM': '🇴🇲', 'PA': '🇵🇦', 'PE': '🇵🇪', 'PF': '🇵🇫', 'PG': '🇵🇬', 'PH': '🇵ðŸ‡', 'PK': '🇵🇰', 'PL': '🇵🇱', 'PM': '🇵🇲', 'PN': '🇵🇳', 'PR': '🇵🇷', 'PS': '🇵🇸', 'PT': '🇵🇹', 'PW': '🇵🇼', 'PY': '🇵🇾', 'QA': '🇶🇦', 'RE': '🇷🇪', 'RO': '🇷🇴', 'RS': '🇷🇸', 'RU': '🇷🇺', 'RW': '🇷🇼', 'SA': '🇸🇦', 'SB': '🇸🇧', 'SC': '🇸🇨', 'SD': '🇸🇩', 'SE': '🇸🇪', 'SG': '🇸🇬', 'SH': '🇸ðŸ‡', 'SI': '🇸🇮', 'SJ': '🇸🇯', 'SK': '🇸🇰', 'SL': '🇸🇱', 'SM': '🇸🇲', 'SN': '🇸🇳', 'SO': '🇸🇴', 'SR': '🇸🇷', 'SS': '🇸🇸', 'ST': '🇸🇹', 'SV': '🇸🇻', 'SX': '🇸🇽', 'SY': '🇸🇾', 'SZ': '🇸🇿', 'TC': '🇹🇨', 'TD': '🇹🇩', 'TF': '🇹🇫', 'TG': '🇹🇬', 'TH': '🇹ðŸ‡', 'TJ': '🇹🇯', 'TK': '🇹🇰', 'TL': '🇹🇱', 'TM': '🇹🇲', 'TN': '🇹🇳', 'TO': '🇹🇴', 'TR': '🇹🇷', 'TT': '🇹🇹', 'TV': '🇹🇻', 'TW': '🇹🇼', 'TZ': '🇹🇿', 'UA': '🇺🇦', 'UG': '🇺🇬', 'UM': '🇺🇲', 'US': '🇺🇸', 'UY': '🇺🇾', 'UZ': '🇺🇿', 'VA': '🇻🇦', 'VC': '🇻🇨', 'VE': '🇻🇪', 'VG': '🇻🇬', 'VI': '🇻🇮', 'VN': '🇻🇳', 'VU': '🇻🇺', 'WF': '🇼🇫', 'WS': '🇼🇸', 'YE': '🇾🇪', 'YT': '🇾🇹', 'ZA': '🇿🇦', 'ZM': '🇿🇲', 'ZW': '🇿🇼', '🇦🇩': 'AD', '🇦🇪': 'AE', '🇦🇫': 'AF', '🇦🇬': 'AG', '🇦🇮': 'AI', '🇦🇱': 'AL', '🇦🇲': 'AM', '🇦🇴': 'AO', '🇦🇶': 'AQ', '🇦🇷': 'AR', '🇦🇸': 'AS', '🇦🇹': 'AT', '🇦🇺': 'AU', '🇦🇼': 'AW', '🇦🇽': 'AX', '🇦🇿': 'AZ', '🇧🇦': 'BA', '🇧🇧': 'BB', '🇧🇩': 'BD', '🇧🇪': 'BE', '🇧🇫': 'BF', '🇧🇬': 'BG', '🇧ðŸ‡': 'BH', '🇧🇮': 'BI', '🇧🇯': 'BJ', '🇧🇱': 'BL', '🇧🇲': 'BM', '🇧🇳': 'BN', '🇧🇴': 'BO', '🇧🇶': 'BQ', '🇧🇷': 'BR', '🇧🇸': 'BS', '🇧🇹': 'BT', '🇧🇻': 'BV', '🇧🇼': 'BW', '🇧🇾': 'BY', '🇧🇿': 'BZ', '🇨🇦': 'CA', '🇨🇨': 'CC', '🇨🇩': 'CD', '🇨🇫': 'CF', '🇨🇬': 'CG', '🇨ðŸ‡': 'CH', '🇨🇮': 'CI', '🇨🇰': 'CK', '🇨🇱': 'CL', '🇨🇲': 'CM', '🇨🇳': 'CN', '🇨🇴': 'CO', '🇨🇷': 'CR', '🇨🇺': 'CU', '🇨🇻': 'CV', '🇨🇼': 'CW', '🇨🇽': 'CX', '🇨🇾': 'CY', '🇨🇿': 'CZ', '🇩🇪': 'DE', '🇩🇯': 'DJ', '🇩🇰': 'DK', '🇩🇲': 'DM', '🇩🇴': 'DO', '🇩🇿': 'DZ', '🇪🇨': 'EC', '🇪🇪': 'EE', '🇪🇬': 'EG', '🇪ðŸ‡': 'EH', '🇪🇷': 'ER', '🇪🇸': 'ES', '🇪🇹': 'ET', '🇫🇮': 'FI', '🇫🇯': 'FJ', '🇫🇰': 'FK', '🇫🇲': 'FM', '🇫🇴': 'FO', '🇫🇷': 'FR', '🇬🇦': 'GA', '🇬🇧': 'GB', '🇬🇩': 'GD', '🇬🇪': 'GE', '🇬🇫': 'GF', '🇬🇬': 'GG', '🇬ðŸ‡': 'GH', '🇬🇮': 'GI', '🇬🇱': 'GL', '🇬🇲': 'GM', '🇬🇳': 'GN', '🇬🇵': 'GP', '🇬🇶': 'GQ', '🇬🇷': 'GR', '🇬🇸': 'GS', '🇬🇹': 'GT', '🇬🇺': 'GU', '🇬🇼': 'GW', '🇬🇾': 'GY', 'ðŸ‡ðŸ‡°': 'HK', 'ðŸ‡ðŸ‡²': 'HM', 'ðŸ‡ðŸ‡³': 'HN', 'ðŸ‡ðŸ‡·': 'HR', 'ðŸ‡ðŸ‡¹': 'HT', 'ðŸ‡ðŸ‡º': 'HU', '🇮🇩': 'ID', '🇮🇪': 'IE', '🇮🇱': 'IL', '🇮🇲': 'IM', '🇮🇳': 'IN', '🇮🇴': 'IO', '🇮🇶': 'IQ', '🇮🇷': 'IR', '🇮🇸': 'IS', '🇮🇹': 'IT', '🇯🇪': 'JE', '🇯🇲': 'JM', '🇯🇴': 'JO', '🇯🇵': 'JP', '🇰🇪': 'KE', '🇰🇬': 'KG', '🇰ðŸ‡': 'KH', '🇰🇮': 'KI', '🇰🇲': 'KM', '🇰🇳': 'KN', '🇰🇵': 'KP', '🇰🇷': 'KR', '🇰🇼': 'KW', '🇰🇾': 'KY', '🇰🇿': 'KZ', '🇱🇦': 'LA', '🇱🇧': 'LB', '🇱🇨': 'LC', '🇱🇮': 'LI', '🇱🇰': 'LK', '🇱🇷': 'LR', '🇱🇸': 'LS', '🇱🇹': 'LT', '🇱🇺': 'LU', '🇱🇻': 'LV', '🇱🇾': 'LY', '🇲🇦': 'MA', '🇲🇨': 'MC', '🇲🇩': 'MD', '🇲🇪': 'ME', '🇲🇫': 'MF', '🇲🇬': 'MG', '🇲ðŸ‡': 'MH', '🇲🇰': 'MK', '🇲🇱': 'ML', '🇲🇲': 'MM', '🇲🇳': 'MN', '🇲🇴': 'MO', '🇲🇵': 'MP', '🇲🇶': 'MQ', '🇲🇷': 'MR', '🇲🇸': 'MS', '🇲🇹': 'MT', '🇲🇺': 'MU', '🇲🇻': 'MV', '🇲🇼': 'MW', '🇲🇽': 'MX', '🇲🇾': 'MY', '🇲🇿': 'MZ', '🇳🇦': 'NA', '🇳🇨': 'NC', '🇳🇪': 'NE', '🇳🇫': 'NF', '🇳🇬': 'NG', '🇳🇮': 'NI', '🇳🇱': 'NL', '🇳🇴': 'NO', '🇳🇵': 'NP', '🇳🇷': 'NR', '🇳🇺': 'NU', '🇳🇿': 'NZ', '🇴🇲': 'OM', '🇵🇦': 'PA', '🇵🇪': 'PE', '🇵🇫': 'PF', '🇵🇬': 'PG', '🇵ðŸ‡': 'PH', '🇵🇰': 'PK', '🇵🇱': 'PL', '🇵🇲': 'PM', '🇵🇳': 'PN', '🇵🇷': 'PR', '🇵🇸': 'PS', '🇵🇹': 'PT', '🇵🇼': 'PW', '🇵🇾': 'PY', '🇶🇦': 'QA', '🇷🇪': 'RE', '🇷🇴': 'RO', '🇷🇸': 'RS', '🇷🇺': 'RU', '🇷🇼': 'RW', '🇸🇦': 'SA', '🇸🇧': 'SB', '🇸🇨': 'SC', '🇸🇩': 'SD', '🇸🇪': 'SE', '🇸🇬': 'SG', '🇸ðŸ‡': 'SH', '🇸🇮': 'SI', '🇸🇯': 'SJ', '🇸🇰': 'SK', '🇸🇱': 'SL', '🇸🇲': 'SM', '🇸🇳': 'SN', '🇸🇴': 'SO', '🇸🇷': 'SR', '🇸🇸': 'SS', '🇸🇹': 'ST', '🇸🇻': 'SV', '🇸🇽': 'SX', '🇸🇾': 'SY', '🇸🇿': 'SZ', '🇹🇨': 'TC', '🇹🇩': 'TD', '🇹🇫': 'TF', '🇹🇬': 'TG', '🇹ðŸ‡': 'TH', '🇹🇯': 'TJ', '🇹🇰': 'TK', '🇹🇱': 'TL', '🇹🇲': 'TM', '🇹🇳': 'TN', '🇹🇴': 'TO', '🇹🇷': 'TR', '🇹🇹': 'TT', '🇹🇻': 'TV', '🇹🇼': 'TW', '🇹🇿': 'TZ', '🇺🇦': 'UA', '🇺🇬': 'UG', '🇺🇲': 'UM', '🇺🇸': 'US', '🇺🇾': 'UY', '🇺🇿': 'UZ', '🇻🇦': 'VA', '🇻🇨': 'VC', '🇻🇪': 'VE', '🇻🇬': 'VG', '🇻🇮': 'VI', '🇻🇳': 'VN', '🇻🇺': 'VU', '🇼🇫': 'WF', '🇼🇸': 'WS', '🇾🇪': 'YE', '🇾🇹': 'YT', '🇿🇦': 'ZA', '🇿🇲': 'ZM', '🇿🇼': 'ZW'}
-
@dataclass
class Country:
alpha_2: str
@@ -19,7 +20,7 @@ class Country:
emoji: str
@classmethod
- def by_pycountry(cls, country: CountryTyping, emoji: str = "") -> "Country":
+ def by_pycountry(cls, country: CountryTyping, emoji: str = "") -> "Country":
emoji = ""
alpha_2 = country.alpha_2.upper()
@@ -40,7 +41,7 @@ class Country:
return cls.by_pycountry(pycountry.countries.get(alpha_2=alpha_2))
@classmethod
- def by_apha_3(cls, alpha_3: str) -> "Country":
+ def by_alpha_3(cls, alpha_3: str) -> "Country":
return cls.by_pycountry(pycountry.countries.get(alpha_3=alpha_3))
@classmethod
@@ -63,4 +64,31 @@ class Country:
return hash(self.alpha_3)
def __eq__(self, __value: object) -> bool:
- return self.__hash__() == __value.__hash__()
\ No newline at end of file
+ return self.__hash__() == __value.__hash__()
+
+
+@dataclass
+class Language:
+ alpha_2: str
+ alpha_3: str
+ name: str
+ numeric: int
+
+ @classmethod
+ def by_pycountry(cls, language) -> Language:
+ alpha_2 = language.alpha_2.upper()
+
+ return cls(
+ alpha_2=alpha_2,
+ alpha_3=language.alpha_3,
+ name=language.name,
+ numeric=language.numeric,
+ )
+
+ @classmethod
+ def by_alpha_2(cls, alpha_2: str) -> Language:
+ return cls.by_pycountry(pycountry.languages.get(alpha_2=alpha_2))
+
+ @classmethod
+ def by_alpha_3(cls, alpha_3: str) -> Language:
+ return cls.by_pycountry(pycountry.languages.get(alpha_3=alpha_3))
diff --git a/src/music_kraken/objects/new_collection.py b/src/music_kraken/objects/new_collection.py
index e69de29..1a556b6 100644
--- a/src/music_kraken/objects/new_collection.py
+++ b/src/music_kraken/objects/new_collection.py
@@ -0,0 +1,257 @@
+from __future__ import annotations
+
+from collections import defaultdict
+from typing import TypeVar, Generic, Dict, Optional, Iterable, List
+from .parents import OuterProxy
+
+T = TypeVar('T', bound=OuterProxy)
+
+
+class Collection(Generic[T]):
+ _data: List[T]
+
+ _indexed_values: Dict[str, set]
+ _indexed_to_objects: Dict[any, list]
+
+ shallow_list = property(fget=lambda self: self.data)
+
+ def __init__(
+ self,
+ data: Optional[Iterable[T]] = None,
+ sync_on_append: Dict[str, "Collection"] = None,
+ contain_given_in_attribute: Dict[str, "Collection"] = None,
+ contain_attribute_in_given: Dict[str, "Collection"] = None,
+ append_object_to_attribute: Dict[str, T] = None
+ ) -> None:
+ self._contains_ids = set()
+ self._data = []
+ self.upper_collections: List[Collection[T]] = []
+ self.contained_collections: List[Collection[T]] = []
+
+ # List of collection attributes that should be modified on append
+ # Key: collection attribute (str) of appended element
+ # Value: main collection to sync to
+ self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
+ self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
+ self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
+ self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
+
+ self.contain_self_on_append: List[str] = []
+
+ self._indexed_values = defaultdict(set)
+ self._indexed_to_objects = defaultdict(list)
+
+ self.extend(data)
+
+ def _map_element(self, __object: T, from_map: bool = False):
+ self._contains_ids.add(__object.id)
+
+ for name, value in __object.indexing_values:
+ if value is None:
+ continue
+
+ self._indexed_values[name].add(value)
+ self._indexed_to_objects[value].append(__object)
+
+ if not from_map:
+ for attribute, new_object in self.contain_given_in_attribute.items():
+ __object.__getattribute__(attribute).contain_collection_inside(new_object)
+
+ for attribute, new_object in self.contain_given_in_attribute.items():
+ new_object.contain_collection_inside(__object.__getattribute__(attribute))
+
+ for attribute, new_object in self.append_object_to_attribute.items():
+ __object.__getattribute__(attribute).append(new_object, from_map=True)
+
+ def _unmap_element(self, __object: T):
+ self._contains_ids.remove(__object.id)
+
+ for name, value in __object.indexing_values:
+ if value is None:
+ continue
+ if value not in self._indexed_values[name]:
+ continue
+
+ try:
+ self._indexed_to_objects[value].remove(__object)
+ except ValueError:
+ continue
+
+ if not len(self._indexed_to_objects[value]):
+ self._indexed_values[name].remove(value)
+
+ def _contained_in_self(self, __object: T) -> bool:
+ if __object.id in self._contains_ids:
+ return True
+
+ for name, value in __object.indexing_values:
+ if value is None:
+ continue
+ if value in self._indexed_values[name]:
+ return True
+ return False
+
+ def _get_root_collections(self) -> List["Collection"]:
+ if not len(self.upper_collections):
+ return [self]
+
+ root_collections = []
+ for upper_collection in self.upper_collections:
+ root_collections.extend(upper_collection._get_root_collections())
+ return root_collections
+
+ @property
+ def _is_root(self) -> bool:
+ return len(self.upper_collections) <= 0
+
+ def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
+ results = []
+
+ if self._contained_in_self(__object):
+ return [self]
+
+ for collection in self.contained_collections:
+ results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
+ if break_at_first:
+ return results
+
+ return results
+
+ def _get_parents_of_multiple_contained_children(self, __object: T):
+ results = []
+ if len(self.contained_collections) < 2 or self._contained_in_self(__object):
+ return results
+
+ count = 0
+
+ for collection in self.contained_collections:
+ sub_results = collection._get_parents_of_multiple_contained_children(__object)
+
+ if len(sub_results) > 0:
+ count += 1
+ results.extend(sub_results)
+
+ if count >= 2:
+ results.append(self)
+
+ return results
+
+ def _merge_in_self(self, __object: T, from_map: bool = False):
+ """
+ 1. find existing objects
+ 2. merge into existing object
+ 3. remap existing object
+ """
+ if __object.id in self._contains_ids:
+ return
+
+ existing_object: DatabaseObject = None
+
+ for name, value in __object.indexing_values:
+ if value is None:
+ continue
+ if value in self._indexed_values[name]:
+ existing_object = self._indexed_to_objects[value][0]
+ if existing_object.id == __object.id:
+ return None
+
+ break
+
+ if existing_object is None:
+ return None
+
+ existing_object.merge(__object, replace_all_refs=True)
+
+ # just a check if it really worked
+ if existing_object.id != __object.id:
+ raise ValueError("This should NEVER happen. Merging doesn't work.")
+
+ self._map_element(existing_object, from_map=from_map)
+
+ def contains(self, __object: T) -> bool:
+ return len(self._contained_in_sub(__object)) > 0
+
+ def _append(self, __object: T, from_map: bool = False):
+ for attribute, to_sync_with in self.sync_on_append.items():
+ pass
+ to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))
+
+ self._map_element(__object, from_map=from_map)
+ self._data.append(__object)
+
+ def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
+ if __object is None:
+ return
+ if __object.id in self._contains_ids:
+ return
+
+ exists_in_collection = self._contained_in_sub(__object)
+ if len(exists_in_collection) and self is exists_in_collection[0]:
+ # assuming that the object already is contained in the correct collections
+ if not already_is_parent:
+ self._merge_in_self(__object, from_map=from_map)
+ return
+
+ if not len(exists_in_collection):
+ self._append(__object, from_map=from_map)
+ else:
+ pass
+ exists_in_collection[0]._merge_in_self(__object, from_map=from_map)
+
+ if not already_is_parent or not self._is_root:
+ for parent_collection in self._get_parents_of_multiple_contained_children(__object):
+ pass
+ parent_collection.append(__object, already_is_parent=True, from_map=from_map)
+
+ def extend(self, __iterable: Optional[Iterable[T]]):
+ if __iterable is None:
+ return
+
+ for __object in __iterable:
+ self.append(__object)
+
+ def sync_with_other_collection(self, equal_collection: "Collection"):
+ """
+ If two collections always need to have the same values, this can be used.
+
+ Internally:
+ 1. import the data from other to self
+ - _data
+ - contained_collections
+ 2. replace all refs from the other object, with refs from this object
+ """
+ if equal_collection is self:
+ return
+
+ # don't add the elements from the subelements from the other collection.
+ # this will be done in the next step.
+ self.extend(equal_collection._data)
+ # add all submodules
+ for equal_sub_collection in equal_collection.contained_collections:
+ self.contain_collection_inside(equal_sub_collection)
+
+ # now the ugly part
+ # replace all refs of the other element with this one
+ self._risky_merge(equal_collection)
+
+ def contain_collection_inside(self, sub_collection: "Collection"):
+ """
+ This collection will ALWAYS contain everything from the passed in collection
+ """
+ if sub_collection in self.contained_collections:
+ return
+
+ self.contained_collections.append(sub_collection)
+ sub_collection.upper_collections.append(self)
+
+ @property
+ def data(self) -> List[T]:
+ return [*self._data,
+ *(__object for collection in self.contained_collections for __object in collection.shallow_list)]
+
+ def __len__(self) -> int:
+ return len(self._data) + sum(len(collection) for collection in self.contained_collections)
+
+ def __iter__(self) -> Iterator[T]:
+ for element in self._data:
+ yield element
diff --git a/src/music_kraken/objects/old_collection.py b/src/music_kraken/objects/old_collection.py
index 4f50ff1..4aa8f21 100644
--- a/src/music_kraken/objects/old_collection.py
+++ b/src/music_kraken/objects/old_collection.py
@@ -1,221 +1,256 @@
-from typing import List, Iterable, Dict, TypeVar, Generic, Iterator
+from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type
from collections import defaultdict
-from dataclasses import dataclass
from .parents import DatabaseObject
-from ..utils.hooks import HookEventTypes, Hooks, Event
-
-
-class CollectionHooks(HookEventTypes):
- APPEND_NEW = "append_new"
-
+from ..utils.support_classes.hacking import MetaClass
T = TypeVar('T', bound=DatabaseObject)
-@dataclass
-class AppendResult:
- was_in_collection: bool
- current_element: DatabaseObject
- was_the_same: bool
-
-
class Collection(Generic[T]):
- """
- This a class for the iterables
- like tracklist or discography
- """
_data: List[T]
- _by_url: dict
- _by_attribute: dict
+ _indexed_values: Dict[str, set]
+ _indexed_to_objects: Dict[any, list]
- def __init__(self, data: List[T] = None, element_type=None, *args, **kwargs) -> None:
- # Attribute needs to point to
- self.element_type = element_type
+ shallow_list = property(fget=lambda self: self.data)
- self._data: List[T] = list()
+ def __init__(
+ self, data: Optional[Iterable[T]] = None,
+ sync_on_append: Dict[str, "Collection"] = None,
+ contain_given_in_attribute: Dict[str, "Collection"] = None,
+ contain_attribute_in_given: Dict[str, "Collection"] = None,
+ append_object_to_attribute: Dict[str, DatabaseObject] = None
+ ) -> None:
+ self._contains_ids = set()
+ self._data = []
+ self.upper_collections: List[Collection[T]] = []
+ self.contained_collections: List[Collection[T]] = []
- """
- example of attribute_to_object_map
- the song objects are references pointing to objects
- in _data
-
- ```python
- {
- 'id': {323: song_1, 563: song_2, 666: song_3},
- 'url': {'www.song_2.com': song_2}
- }
- ```
- """
- self._attribute_to_object_map: Dict[str, Dict[object, T]] = defaultdict(dict)
- self._used_ids: set = set()
+ # List of collection attributes that should be modified on append
+ # Key: collection attribute (str) of appended element
+ # Value: main collection to sync to
+ self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
+ self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
+ self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
+ self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {}
- self.hooks: Hooks = Hooks(self)
+ self.contain_self_on_append: List[str] = []
- if data is not None:
- self.extend(data, merge_on_conflict=True)
+ self._indexed_values = defaultdict(set)
+ self._indexed_to_objects = defaultdict(list)
- def sort(self, reverse: bool = False, **kwargs):
- self._data.sort(reverse=reverse, **kwargs)
+ self.extend(data)
- def map_element(self, element: T):
- for name, value in element.indexing_values:
+ def _map_element(self, __object: T, from_map: bool = False):
+ self._contains_ids.add(__object.id)
+
+ for name, value in __object.indexing_values:
if value is None:
continue
- self._attribute_to_object_map[name][value] = element
+ self._indexed_values[name].add(value)
+ self._indexed_to_objects[value].append(__object)
- self._used_ids.add(element.id)
+ if not from_map:
+ for attribute, new_object in self.contain_given_in_attribute.items():
+ __object.__getattribute__(attribute).contain_collection_inside(new_object)
- def unmap_element(self, element: T):
- for name, value in element.indexing_values:
+ for attribute, new_object in self.contain_given_in_attribute.items():
+ new_object.contain_collection_inside(__object.__getattribute__(attribute))
+
+ for attribute, new_object in self.append_object_to_attribute.items():
+ __object.__getattribute__(attribute).append(new_object, from_map=True)
+
+ def _unmap_element(self, __object: T):
+ self._contains_ids.remove(__object.id)
+
+ for name, value in __object.indexing_values:
if value is None:
continue
+ if value not in self._indexed_values[name]:
+ continue
- if value in self._attribute_to_object_map[name]:
- if element is self._attribute_to_object_map[name][value]:
- try:
- self._attribute_to_object_map[name].pop(value)
- except KeyError:
- pass
+ try:
+ self._indexed_to_objects[value].remove(__object)
+ except ValueError:
+ continue
- def append(self, element: T, merge_on_conflict: bool = True,
- merge_into_existing: bool = True, no_hook: bool = False) -> AppendResult:
+ if not len(self._indexed_to_objects[value]):
+ self._indexed_values[name].remove(value)
+
+ def _contained_in_self(self, __object: T) -> bool:
+ if __object.id in self._contains_ids:
+ return True
+
+ for name, value in __object.indexing_values:
+ if value is None:
+ continue
+ if value in self._indexed_values[name]:
+ return True
+ return False
+
+ def _get_root_collections(self) -> List["Collection"]:
+ if not len(self.upper_collections):
+ return [self]
+
+ root_collections = []
+ for upper_collection in self.upper_collections:
+ root_collections.extend(upper_collection._get_root_collections())
+ return root_collections
+
+ @property
+ def _is_root(self) -> bool:
+ return len(self.upper_collections) <= 0
+
+ def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
+ results = []
+
+ if self._contained_in_self(__object):
+ return [self]
+
+ for collection in self.contained_collections:
+ results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
+ if break_at_first:
+ return results
+
+ return results
+
+ def _get_parents_of_multiple_contained_children(self, __object: T):
+ results = []
+ if len(self.contained_collections) < 2 or self._contained_in_self(__object):
+ return results
+
+ count = 0
+
+ for collection in self.contained_collections:
+ sub_results = collection._get_parents_of_multiple_contained_children(__object)
+
+ if len(sub_results) > 0:
+ count += 1
+ results.extend(sub_results)
+
+ if count >= 2:
+ results.append(self)
+
+ return results
+
+ def _merge_in_self(self, __object: T, from_map: bool = False):
"""
- :param element:
- :param merge_on_conflict:
- :param merge_into_existing:
- :return did_not_exist:
+ 1. find existing objects
+ 2. merge into existing object
+ 3. remap existing object
"""
- if element is None:
- return AppendResult(False, None, False)
-
- for existing_element in self._data:
- if element is existing_element:
- return AppendResult(False, None, False)
-
- # if the element type has been defined in the initializer it checks if the type matches
- if self.element_type is not None and not isinstance(element, self.element_type):
- raise TypeError(f"{type(element)} is not the set type {self.element_type}")
-
- # return if the same instance of the object is in the list
- for existing in self._data:
- if element is existing:
- return AppendResult(True, element, True)
-
- for name, value in element.indexing_values:
- if value in self._attribute_to_object_map[name]:
- existing_object = self._attribute_to_object_map[name][value]
-
- if not merge_on_conflict:
- return AppendResult(True, existing_object, False)
-
- # if the object does already exist
- # thus merging and don't add it afterward
- if merge_into_existing:
- existing_object.merge(element)
- # in case any relevant data has been added (e.g. it remaps the old object)
- self.map_element(existing_object)
- return AppendResult(True, existing_object, False)
-
- element.merge(existing_object)
-
- exists_at = self._data.index(existing_object)
- self._data[exists_at] = element
-
- self.unmap_element(existing_object)
- self.map_element(element)
- return AppendResult(True, existing_object, False)
-
- if not no_hook:
- self.hooks.trigger_event(CollectionHooks.APPEND_NEW, new_object=element)
- self._data.append(element)
- self.map_element(element)
-
- return AppendResult(False, element, False)
-
- def extend(self, element_list: Iterable[T], merge_on_conflict: bool = True,
- merge_into_existing: bool = True, no_hook: bool = False):
- if element_list is None:
+ if __object.id in self._contains_ids:
return
- if len(element_list) <= 0:
+
+ existing_object: DatabaseObject = None
+
+ for name, value in __object.indexing_values:
+ if value is None:
+ continue
+ if value in self._indexed_values[name]:
+ existing_object = self._indexed_to_objects[value][0]
+ if existing_object.id == __object.id:
+ return None
+
+ break
+
+ if existing_object is None:
+ return None
+
+ existing_object.merge(__object, replace_all_refs=True)
+
+ # just a check if it really worked
+ if existing_object.id != __object.id:
+ raise ValueError("This should NEVER happen. Merging doesn't work.")
+
+ self._map_element(existing_object, from_map=from_map)
+
+ def contains(self, __object: T) -> bool:
+ return len(self._contained_in_sub(__object)) > 0
+
+ def _append(self, __object: T, from_map: bool = False):
+ for attribute, to_sync_with in self.sync_on_append.items():
+ pass
+ to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))
+
+ self._map_element(__object, from_map=from_map)
+ self._data.append(__object)
+
+ def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
+ if __object is None:
return
- if element_list is self:
+ if __object.id in self._contains_ids:
return
- for element in element_list:
- self.append(element, merge_on_conflict=merge_on_conflict, merge_into_existing=merge_into_existing, no_hook=no_hook)
- def sync_collection(self, collection_attribute: str):
- def on_append(event: Event, new_object: T, *args, **kwargs):
- new_collection = new_object.__getattribute__(collection_attribute)
- if self is new_collection:
- return
+ exists_in_collection = self._contained_in_sub(__object)
+ if len(exists_in_collection) and self is exists_in_collection[0]:
+ # assuming that the object already is contained in the correct collections
+ if not already_is_parent:
+ self._merge_in_self(__object, from_map=from_map)
+ return
- self.extend(new_object.__getattribute__(collection_attribute), no_hook=True)
- new_object.__setattr__(collection_attribute, self)
+ if not len(exists_in_collection):
+ self._append(__object, from_map=from_map)
+ else:
+ pass
+ exists_in_collection[0]._merge_in_self(__object, from_map=from_map)
- self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
+ if not already_is_parent or not self._is_root:
+ for parent_collection in self._get_parents_of_multiple_contained_children(__object):
+ pass
+ parent_collection.append(__object, already_is_parent=True, from_map=from_map)
- def sync_main_collection(self, main_collection: "Collection", collection_attribute: str):
- def on_append(event: Event, new_object: T, *args, **kwargs):
- new_collection = new_object.__getattribute__(collection_attribute)
- if main_collection is new_collection:
- return
-
- main_collection.extend(new_object.__getattribute__(collection_attribute), no_hook=True)
- new_object.__setattr__(collection_attribute, main_collection)
+ def extend(self, __iterable: Optional[Iterable[T]]):
+ if __iterable is None:
+ return
- self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
+ for __object in __iterable:
+ self.append(__object)
- """
- def on_append(event: Event, new_object: T, *args, **kwargs):
- new_collection: Collection = new_object.__getattribute__(collection_attribute)
- if self is new_collection:
- return
-
- self.extend(new_collection.shallow_list, no_hook=False)
- new_object.__setattr__(collection_attribute, self)
+ def sync_with_other_collection(self, equal_collection: "Collection"):
+ """
+ If two collections always need to have the same values, this can be used.
- self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
- """
+ Internally:
+ 1. import the data from other to self
+ - _data
+ - contained_collections
+ 2. replace all refs from the other object, with refs from this object
+ """
+ if equal_collection is self:
+ return
+
+ # don't add the elements from the subelements from the other collection.
+ # this will be done in the next step.
+ self.extend(equal_collection._data)
+ # add all submodules
+ for equal_sub_collection in equal_collection.contained_collections:
+ self.contain_collection_inside(equal_sub_collection)
+
+ # now the ugly part
+ # replace all refs of the other element with this one
+ self._risky_merge(equal_collection)
+
+ def contain_collection_inside(self, sub_collection: "Collection"):
+ """
+ This collection will ALWAYS contain everything from the passed in collection
+ """
+ if sub_collection in self.contained_collections:
+ return
+
+ self.contained_collections.append(sub_collection)
+ sub_collection.upper_collections.append(self)
+
+ @property
+ def data(self) -> List[T]:
+ return [*self._data,
+ *(__object for collection in self.contained_collections for __object in collection.shallow_list)]
+
+ def __len__(self) -> int:
+ return len(self._data) + sum(len(collection) for collection in self.contained_collections)
def __iter__(self) -> Iterator[T]:
for element in self._data:
- yield element
-
- def __str__(self) -> str:
- return "\n".join([f"{str(j).zfill(2)}: {i.__repr__()}" for j, i in enumerate(self._data)])
-
- def __len__(self) -> int:
- return len(self._data)
-
- def __getitem__(self, key) -> T:
- if type(key) != int:
- return ValueError("key needs to be an integer")
-
- return self._data[key]
-
- def __setitem__(self, key, value: T):
- if type(key) != int:
- return ValueError("key needs to be an integer")
-
- old_item = self._data[key]
- self.unmap_element(old_item)
- self.map_element(value)
-
- self._data[key] = value
-
- @property
- def shallow_list(self) -> List[T]:
- """
- returns a shallow copy of the data list
- """
- return self._data.copy()
-
- @property
- def empty(self) -> bool:
- return len(self._data) == 0
-
- def clear(self):
- self.__init__(element_type=self.element_type)
+ yield element
\ No newline at end of file
diff --git a/src/music_kraken/objects/parents.py b/src/music_kraken/objects/parents.py
index 9431db2..13a90f6 100644
--- a/src/music_kraken/objects/parents.py
+++ b/src/music_kraken/objects/parents.py
@@ -9,12 +9,13 @@ from .option import Options
from ..utils.shared import HIGHEST_ID
from ..utils.config import main_settings, logging_settings
from ..utils.support_classes.hacking import MetaClass
-
+from ..utils.exception.objects import IsDynamicException
LOGGER = logging_settings["object_logger"]
P = TypeVar('P')
+
@dataclass
class StaticAttribute(Generic[P]):
name: str
@@ -27,6 +28,139 @@ class StaticAttribute(Generic[P]):
is_upwards_collection: bool = False
+class InnerData:
+ """
+ This is the core class, which is used for every Data class.
+ The attributes are set, and can be merged.
+
+ The concept is, that the outer class proxies this class.
+ If the data in the wrapper class has to be merged, then this class is just replaced and garbage collected.
+ """
+
+ def __init__(self, **kwargs):
+ for key, value in kwargs.items():
+ self.__setattr__(key, value)
+
+ def __merge__(self, __other: InnerData, override: bool = False):
+ """
+ TODO
+ is default is totally ignored
+
+ :param __other:
+ :param override:
+ :return:
+ """
+
+ for key, value in __other.__dict__.items():
+ # just set the other value if self doesn't already have it
+ if key not in self.__dict__:
+ self.__setattr__(key, value)
+ continue
+
+ # if the object of value implemented __merge__, it merges
+ existing = self.__getattribute__(key)
+ if hasattr(type(existing), "__merge__"):
+ existing.merge_into_self(value, override)
+ continue
+
+ # override the existing value if requested
+ if override:
+ self.__setattr__(key, value)
+
+
+
+class OuterProxy:
+ """
+ Wraps the inner data, and provides apis, to naturally access those values.
+ """
+
+ _default_factories: dict
+
+ def __init__(self, _id: int = None, dynamic: bool = False, **kwargs):
+ _automatic_id: bool = False
+
+ if _id is None and not dynamic:
+ """
+ generates a random integer id
+ the range is defined in the config
+ """
+ _id = random.randint(0, HIGHEST_ID)
+ _automatic_id = True
+
+ kwargs["automatic_id"] = _automatic_id
+ kwargs["id"] = _id
+ kwargs["dynamic"] = dynamic
+
+ for name, factory in type(self)._default_factories.items():
+ if name not in kwargs:
+ kwargs[name] = factory()
+
+ self._inner: InnerData = InnerData(**kwargs)
+ self.__init_collections__()
+
+ for name, data_list in kwargs.items():
+ if isinstance(data_list, list) and name.endswith("_list"):
+ collection_name = name.replace("_list", "_collection")
+
+ if collection_name not in self.__dict__:
+ continue
+
+ collection = self.__getattribute__(collection_name)
+ collection.extend(data_list)
+
+ def __init_collections__(self):
+ pass
+
+ def __getattribute__(self, __name: str) -> Any:
+ """
+ Returns the attribute of _inner if the attribute exists,
+ else it returns the attribute of self.
+
+ That the _inner gets checked first is essential for the type hints.
+ :param __name:
+ :return:
+ """
+
+ _inner: InnerData = super().__getattribute__("_inner")
+ try:
+ return _inner.__getattribute__(__name)
+ except AttributeError:
+ return super().__getattribute__(__name)
+
+ def __setattr__(self, __name, __value):
+ if not __name.startswith("_") and hasattr(self, "_inner"):
+ _inner: InnerData = super().__getattribute__("_inner")
+ return _inner.__setattr__(__name, __value)
+
+ return super().__setattr__(__name, __value)
+
+ def __hash__(self):
+ """
+ :raise: IsDynamicException
+ :return:
+ """
+
+ if self.dynamic:
+ return id(self._inner)
+
+ return self.id
+
+ def __eq__(self, other: Any):
+ return self.__hash__() == other.__hash__()
+
+ def merge(self, __other: OuterProxy, override: bool = False):
+ """
+ 1. merges the data of __other in self
+ 2. replaces the data of __other with the data of self
+
+ :param __other:
+ :param override:
+ :return:
+ """
+ self._inner.__merge__(__other._inner, override=override)
+ __other._inner = self._inner
+
+
class Attribute(Generic[P]):
def __init__(self, database_object: "DatabaseObject", static_attribute: StaticAttribute) -> None:
self.database_object: DatabaseObject = database_object
@@ -38,12 +172,11 @@ class Attribute(Generic[P]):
def get(self) -> P:
return self.database_object.__getattribute__(self.name)
-
+
def set(self, value: P):
self.database_object.__setattr__(self.name, value)
-
class DatabaseObject(metaclass=MetaClass):
COLLECTION_STRING_ATTRIBUTES: tuple = tuple()
SIMPLE_STRING_ATTRIBUTES: dict = dict()
@@ -77,7 +210,7 @@ class DatabaseObject(metaclass=MetaClass):
for static_attribute in self.STATIC_ATTRIBUTES:
attribute: Attribute = Attribute(self, static_attribute)
self._attributes.append(attribute)
-
+
if static_attribute.is_collection:
if static_attribute.is_collection:
self._collection_attributes.append(attribute)
@@ -94,6 +227,8 @@ class DatabaseObject(metaclass=MetaClass):
self.dynamic = dynamic
self.build_version = -1
+ super().__init__()
+
@property
def upwards_collection(self) -> Collection:
for attribute in self._upwards_collection_attributes:
@@ -114,10 +249,19 @@ class DatabaseObject(metaclass=MetaClass):
raise TypeError("Dynamic DatabaseObjects are unhashable.")
return self.id
+ def __deep_eq__(self, other) -> bool:
+ if not isinstance(other, type(self)):
+ return False
+
+ return super().__eq__(other)
+
def __eq__(self, other) -> bool:
if not isinstance(other, type(self)):
return False
+ if super().__eq__(other):
+ return True
+
# add the checks for dynamic, to not throw an exception
if not self.dynamic and not other.dynamic and self.id == other.id:
return True
@@ -152,10 +296,10 @@ class DatabaseObject(metaclass=MetaClass):
if other is None:
return
-
- if self.id == other.id:
+
+ if self.__deep_eq__(other):
return
-
+
if not isinstance(other, type(self)):
LOGGER.warning(f"can't merge \"{type(other)}\" into \"{type(self)}\"")
return
@@ -163,6 +307,7 @@ class DatabaseObject(metaclass=MetaClass):
for collection in self._collection_attributes:
if hasattr(self, collection.name) and hasattr(other, collection.name):
if collection.get() is not getattr(other, collection.name):
+ pass
collection.get().extend(getattr(other, collection.name))
for simple_attribute, default_value in type(self).SIMPLE_STRING_ATTRIBUTES.items():
@@ -190,7 +335,7 @@ class DatabaseObject(metaclass=MetaClass):
@property
def option_string(self) -> str:
return self.__repr__()
-
+
def _build_recursive_structures(self, build_version: int, merge: False):
pass
@@ -202,7 +347,7 @@ class DatabaseObject(metaclass=MetaClass):
no need to override if only the recursive structure should be build.
override self.build_recursive_structures() instead
"""
-
+
self._build_recursive_structures(build_version=random.randint(0, 99999), merge=merge_into)
def _add_other_db_objects(self, object_type: Type["DatabaseObject"], object_list: List["DatabaseObject"]):
diff --git a/src/music_kraken/objects/song.py b/src/music_kraken/objects/song.py
index d7b1cef..0af1489 100644
--- a/src/music_kraken/objects/song.py
+++ b/src/music_kraken/objects/song.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
import random
from collections import defaultdict
from typing import List, Optional, Dict, Tuple, Type
@@ -15,11 +17,13 @@ from .metadata import (
Metadata
)
from .option import Options
-from .parents import MainObject, DatabaseObject, StaticAttribute
+from .parents import DatabaseObject, StaticAttribute
from .source import Source, SourceCollection
from .target import Target
from ..utils.string_processing import unify
+from .parents import OuterProxy as Base
+
from ..utils.config import main_settings
"""
@@ -30,12 +34,39 @@ CountryTyping = type(list(pycountry.countries)[0])
OPTION_STRING_DELIMITER = " | "
-class Song(MainObject):
+class Song(Base):
"""
Class representing a song object, with attributes id, mb_id, title, album_name, isrc, length,
tracksort, genre, source_list, target, lyrics_list, album, main_artist_list, and feature_artist_list.
"""
+ title: str
+ unified_title: str
+ isrc: str
+ length: int
+ genre: str
+ note: FormattedText
+
+ source_collection: SourceCollection
+ target_collection: Collection[Target]
+ lyrics_collection: Collection[Lyrics]
+ main_artist_collection: Collection[Artist]
+ feature_artist_collection: Collection[Artist]
+ album_collection: Collection[Album]
+
+ _default_factories = {
+ "note": FormattedText,
+ "length": lambda: 0,
+ "source_collection": SourceCollection,
+ "target_collection": Collection,
+ "lyrics_collection": Collection,
+
+ "main_artist_collection": Collection,
+ "album_collection": Collection,
+ "feature_artist_collection": Collection
+ }
+
+ """
COLLECTION_STRING_ATTRIBUTES = (
"lyrics_collection", "album_collection", "main_artist_collection", "feature_artist_collection",
"source_collection")
@@ -48,118 +79,38 @@ class Song(MainObject):
"genre": None,
"notes": FormattedText()
}
+ """
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("album_collection", "main_artist_collection", "feature_artist_collection")
+ """
+ title: str = None,
+ unified_title: str = None,
+ isrc: str = None,
+ length: int = None,
+ tracksort: int = None,
+ genre: str = None,
+ source_list: List[Source] = None,
+ target_list: List[Target] = None,
+ lyrics_list: List[Lyrics] = None,
+ album_list: List['Album'] = None,
+ main_artist_list: List['Artist'] = None,
+ feature_artist_list: List['Artist'] = None,
+ notes: FormattedText = None,
+ """
+ def __init_collections__(self) -> None:
+ self.album_collection.contain_given_in_attribute = {
+ "artist_collection": self.main_artist_collection,
+ }
+ self.album_collection.append_object_to_attribute = {
+ "song_collection": self,
+ }
- STATIC_ATTRIBUTES = [
- StaticAttribute(name="title", weight=.5),
- StaticAttribute(name="unified_title", weight=.3),
- StaticAttribute(name="isrc", weight=1),
- StaticAttribute(name="length"),
- StaticAttribute(name="tracksort", default_value=0),
- StaticAttribute(name="genre"),
- StaticAttribute(name="notes", default_value=FormattedText()),
-
- StaticAttribute(name="source_collection", is_collection=True),
- StaticAttribute(name="lyrics_collection", is_collection=True),
- StaticAttribute(name="album_collection", is_collection=True, is_upwards_collection=True),
- StaticAttribute(name="main_artist_collection", is_collection=True, is_upwards_collection=True),
- StaticAttribute(name="feature_artist_collection", is_collection=True, is_upwards_collection=True)
- ]
-
- def __init__(
- self,
- _id: int = None,
- dynamic: bool = False,
- title: str = None,
- unified_title: str = None,
- isrc: str = None,
- length: int = None,
- tracksort: int = None,
- genre: str = None,
- source_list: List[Source] = None,
- target_list: List[Target] = None,
- lyrics_list: List[Lyrics] = None,
- album_list: List['Album'] = None,
- main_artist_list: List['Artist'] = None,
- feature_artist_list: List['Artist'] = None,
- notes: FormattedText = None,
- **kwargs
- ) -> None:
- super().__init__(_id=_id, dynamic=dynamic, **kwargs)
- # attributes
- self.title: str = title
- self.unified_title: str = unified_title
- if unified_title is None and title is not None:
- self.unified_title = unify(title)
-
- self.isrc: str = isrc
- self.length: int = length
- self.tracksort: int = tracksort or 0
- self.genre: str = genre
- self.notes: FormattedText = notes or FormattedText()
-
- self.source_collection: SourceCollection = SourceCollection(source_list)
- self.target_collection: Collection[Target] = Collection(data=target_list)
- self.lyrics_collection: Collection[Lyrics] = Collection(data=lyrics_list)
-
- # main_artist_collection = album.artist collection
- self.main_artist_collection: Collection[Artist] = Collection(data=[])
-
- # this album_collection equals no collection
- self.album_collection: Collection[Album] = Collection(data=album_list,
- contain_given_in_attribute={
- "artist_collection": self.main_artist_collection
- }, append_object_to_attribute={
- "song_collection": self
- })
-
- self.main_artist_collection.contain_given_in_attribute = {"main_album_collection": self.album_collection}
- self.main_artist_collection.extend(main_artist_list)
-
- self.feature_artist_collection: Collection[Artist] = Collection(
- data=feature_artist_list,
- append_object_to_attribute={
- "feature_song_collection": self
- }
- )
-
- def _build_recursive_structures(self, build_version: int, merge: bool):
- if build_version == self.build_version:
- return
- self.build_version = build_version
-
- album: Album
- for album in self.album_collection:
- album.song_collection.append(self, merge_on_conflict=merge, merge_into_existing=False)
- album._build_recursive_structures(build_version=build_version, merge=merge)
-
- artist: Artist
- for artist in self.feature_artist_collection:
- artist.feature_song_collection.append(self, merge_on_conflict=merge, merge_into_existing=False)
- artist._build_recursive_structures(build_version=build_version, merge=merge)
-
- for artist in self.main_artist_collection:
- for album in self.album_collection:
- artist.main_album_collection.append(album, merge_on_conflict=merge, merge_into_existing=False)
- artist._build_recursive_structures(build_version=build_version, merge=merge)
-
- def _add_other_db_objects(self, object_type: Type["DatabaseObject"], object_list: List["DatabaseObject"]):
- if object_type is Song:
- return
-
- if object_type is Lyrics:
- self.lyrics_collection.extend(object_list)
- return
-
- if object_type is Artist:
- self.main_artist_collection.extend(object_list)
- return
-
- if object_type is Album:
- self.album_collection.extend(object_list)
- return
-
+ self.main_artist_collection.contain_given_in_attribute = {
+ "main_album_collection": self.album_collection
+ }
+ self.feature_artist_collection.append_object_to_attribute = {
+ "feature_song_collection": self
+ }
@property
def indexing_values(self) -> List[Tuple[str, object]]:
@@ -245,7 +196,7 @@ All objects dependent on Album
"""
-class Album(MainObject):
+class Album(Base):
COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection", "song_collection")
SIMPLE_STRING_ATTRIBUTES = {
"title": None,
@@ -259,6 +210,16 @@ class Album(MainObject):
"notes": FormattedText()
}
+ title: str
+ unified_title: str
+ album_status: str
+ album_type: AlbumType
+ language: LanguageSelector
+
+ _default_factories = {
+ "album_type": AlbumType.OTHER
+ }
+
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection", )
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "label_collection")
@@ -298,7 +259,7 @@ class Album(MainObject):
notes: FormattedText = None,
**kwargs
) -> None:
- MainObject.__init__(self, _id=_id, dynamic=dynamic, **kwargs)
+ Base.__init__(self, _id=_id, dynamic=dynamic, **kwargs)
self.title: str = title
self.unified_title: str = unified_title
@@ -512,7 +473,7 @@ All objects dependent on Artist
"""
-class Artist(MainObject):
+class Artist(Base):
COLLECTION_STRING_ATTRIBUTES = (
"feature_song_collection",
"main_album_collection",
@@ -570,7 +531,7 @@ class Artist(MainObject):
unformated_location: str = None,
**kwargs
):
- MainObject.__init__(self, _id=_id, dynamic=dynamic, **kwargs)
+ Base.__init__(self, _id=_id, dynamic=dynamic, **kwargs)
self.name: str = name
self.unified_name: str = unified_name
@@ -806,7 +767,7 @@ Label
"""
-class Label(MainObject):
+class Label(Base):
COLLECTION_STRING_ATTRIBUTES = ("album_collection", "current_artist_collection")
SIMPLE_STRING_ATTRIBUTES = {
"name": None,
@@ -837,7 +798,7 @@ class Label(MainObject):
source_list: List[Source] = None,
**kwargs
):
- MainObject.__init__(self, _id=_id, dynamic=dynamic, **kwargs)
+ Base.__init__(self, _id=_id, dynamic=dynamic, **kwargs)
self.name: str = name
self.unified_name: str = unified_name
diff --git a/src/music_kraken/objects/source.py b/src/music_kraken/objects/source.py
index a4e489a..4fb1e40 100644
--- a/src/music_kraken/objects/source.py
+++ b/src/music_kraken/objects/source.py
@@ -121,7 +121,8 @@ class Source(DatabaseObject):
class SourceCollection(Collection):
- def __init__(self, source_list: List[Source]):
+ def __init__(self, source_list: List[Source] = None):
+ source_list = source_list if source_list is not None else []
self._page_to_source_list: Dict[SourcePages, List[Source]] = defaultdict(list)
super().__init__(data=source_list)
diff --git a/src/music_kraken/utils/exception/objects.py b/src/music_kraken/utils/exception/objects.py
new file mode 100644
index 0000000..ff3026d
--- /dev/null
+++ b/src/music_kraken/utils/exception/objects.py
@@ -0,0 +1,10 @@
+class ObjectException(Exception):
+ pass
+
+
+class IsDynamicException(Exception):
+ """
+ Gets raised, if a dynamic data object tries to perform an action,
+ which does not make sense for a dynamic object.
+ """
+ pass
diff --git a/src/music_kraken/utils/support_classes/hacking.py b/src/music_kraken/utils/support_classes/hacking.py
index abcf67f..1125768 100644
--- a/src/music_kraken/utils/support_classes/hacking.py
+++ b/src/music_kraken/utils/support_classes/hacking.py
@@ -1,7 +1,8 @@
+import weakref
from types import FunctionType
from functools import wraps
-from typing import Dict
+from typing import Dict, Set
class Lake:
def __init__(self):
@@ -17,15 +18,34 @@ class Lake:
return self.id_to_object[_id]
except KeyError:
self.add(db_object)
- return db_object
+ return db_object
def add(self, db_object: object):
self.id_to_object[id(db_object)] = db_object
def override(self, to_override: object, new_db_object: object):
- self.redirects[id(to_override)] = id(new_db_object)
- if id(to_override) in self.id_to_object:
- del self.id_to_object[id(to_override)]
+ _id = id(to_override)
+ while _id in self.redirects:
+ _id = self.redirects[_id]
+
+ if id(new_db_object) in self.id_to_object:
+ print("!!!!!")
+
+ self.add(new_db_object)
+ self.redirects[_id] = id(new_db_object)
+ # if _id in self.id_to_object:
+ # del self.id_to_object[_id]
+
+ def is_same(self, __object: object, other: object) -> bool:
+ _self_id = id(__object)
+ while _self_id in self.redirects:
+ _self_id = self.redirects[_self_id]
+
+ _other_id = id(other)
+ while _other_id in self.redirects:
+ _other_id = self.redirects[_other_id]
+
+ return _self_id == _other_id
lake = Lake()
@@ -35,11 +55,20 @@ def wrapper(method):
@wraps(method)
def wrapped(*args, **kwargs):
return method(*(lake.get_real_object(args[0]), *args[1:]), **kwargs)
+
return wrapped
-
class BaseClass:
+ def __new__(cls, *args, **kwargs):
+ instance = cls(*args, **kwargs)
+ print("new")
+ lake.add(instance)
+ return instance
+
+ def __eq__(self, other):
+ return lake.is_same(self, other)
+
def _risky_merge(self, to_replace):
lake.override(to_replace, self)
@@ -49,17 +78,27 @@ class MetaClass(type):
bases = (*bases, BaseClass)
newClassDict = {}
+ ignore_functions: Set[str] = {"__new__", "__init__"}
+
for attributeName, attribute in classDict.items():
- if isinstance(attribute, FunctionType) and attributeName not in ("__new__", "__init__"):
+ if isinstance(attribute, FunctionType) and (attributeName not in ignore_functions):
+ """
+ The funktion new and init shouldn't be accounted for because we can assume the class is
+ independent on initialization.
+ """
attribute = wrapper(attribute)
+
newClassDict[attributeName] = attribute
- for key, value in object.__dict__.items( ):
- if hasattr( value, '__call__' ) and value not in newClassDict and key not in ("__new__", "__init__"):
+ print()
+
+ for key, value in object.__dict__.items():
+ # hasattr( value, '__call__' ) and
+ if hasattr(value, '__call__') and value not in newClassDict and key not in ("__new__", "__init__"):
newClassDict[key] = wrapper(value)
new_instance = type.__new__(meta, classname, bases, newClassDict)
lake.add(new_instance)
- return new_instance
\ No newline at end of file
+ return new_instance