From 513054a0feb81b31d04025e52d2368fd4f7ebcaf Mon Sep 17 00:00:00 2001 From: Lars Noack Date: Tue, 19 Dec 2023 13:58:39 +0100 Subject: [PATCH] feat: added new implementation --- .idea/misc.xml | 3 + .idea/music-downloader.iml | 3 +- .idea/vcs.xml | 1 - src/create_custom_objects.py | 26 +- src/music_kraken/objects/collection.py | 132 +++--- src/music_kraken/objects/country.py | 36 +- src/music_kraken/objects/new_collection.py | 257 ++++++++++++ src/music_kraken/objects/old_collection.py | 395 ++++++++++-------- src/music_kraken/objects/parents.py | 163 +++++++- src/music_kraken/objects/song.py | 195 ++++----- src/music_kraken/objects/source.py | 3 +- src/music_kraken/utils/exception/objects.py | 10 + .../utils/support_classes/hacking.py | 59 ++- 13 files changed, 895 insertions(+), 388 deletions(-) create mode 100644 src/music_kraken/utils/exception/objects.py diff --git a/.idea/misc.xml b/.idea/misc.xml index 6468d4f..df5fc58 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,4 +1,7 @@ + + \ No newline at end of file diff --git a/.idea/music-downloader.iml b/.idea/music-downloader.iml index d3aa814..27eed28 100644 --- a/.idea/music-downloader.iml +++ b/.idea/music-downloader.iml @@ -3,9 +3,10 @@ + - + diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 1a15e6d..35eb1dd 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -2,6 +2,5 @@ - \ No newline at end of file diff --git a/src/create_custom_objects.py b/src/create_custom_objects.py index a61da6c..49d58a9 100644 --- a/src/create_custom_objects.py +++ b/src/create_custom_objects.py @@ -9,7 +9,23 @@ from music_kraken.objects import ( from music_kraken.objects.collection import Collection from music_kraken.utils.enums import SourcePages +song = Song(title="Sad Story", isrc="testTest") +other_song = Song(title="hihi", genre="dsbm") +song.merge(other_song) + +print(song.__dict__) +print(other_song.__dict__) + +other_song.title = ":3" + +print(song.__dict__) +print(other_song.__dict__) + + +print(song) + +""" only_smile = Artist( name="Only Smile", source_list=[Source(SourcePages.BANDCAMP, "https://onlysmile.bandcamp.com/")], @@ -100,10 +116,16 @@ def add_to_objects_dump(db_obj: DatabaseObject): add_to_objects_dump(only_smile) for _id, _object in objects_by_id.items(): - print(_id, _object, sep=": ") + try: + print(_id, _object.title, sep=": ") + except AttributeError: + try: + print(_id, _object.name, sep=": ") + except AttributeError: + print(_id, _object, sep=": ") print(only_smile) - +""" """ c = Collection([Song(title="hi"), Song(title="hi2"), Song(title="hi3")]) c1 = Collection([Song(title="he"), Song(title="hi5")]) diff --git a/src/music_kraken/objects/collection.py b/src/music_kraken/objects/collection.py index f105de9..de98f47 100644 --- a/src/music_kraken/objects/collection.py +++ b/src/music_kraken/objects/collection.py @@ -1,14 +1,13 @@ -from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type +from __future__ import annotations + from collections import defaultdict +from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator +from .parents import OuterProxy -from .parents import DatabaseObject -from ..utils.support_classes.hacking import MetaClass +T = TypeVar('T', bound=OuterProxy) -T = TypeVar('T', bound=DatabaseObject) - - -class Collection(Generic[T], metaclass=MetaClass): +class Collection(Generic[T]): _data: List[T] _indexed_values: Dict[str, set] @@ -17,16 +16,18 @@ class Collection(Generic[T], metaclass=MetaClass): shallow_list = property(fget=lambda self: self.data) def __init__( - self, data: Optional[Iterable[T]], - sync_on_append: Dict[str, "Collection"] = None, - contain_given_in_attribute: Dict[str, "Collection"] = None, - contain_attribute_in_given: Dict[str, "Collection"] = None, - append_object_to_attribute: Dict[str, DatabaseObject] = None + self, + data: Optional[Iterable[T]] = None, + sync_on_append: Dict[str, Collection] = None, + contain_given_in_attribute: Dict[str, Collection] = None, + contain_attribute_in_given: Dict[str, Collection] = None, + append_object_to_attribute: Dict[str, T] = None ) -> None: self._contains_ids = set() self._data = [] - self.upper_collections: List[Collection[T]] = [] - self.contained_collections: List[Collection[T]] = [] + + self.parents: List[Collection[T]] = [] + self.children: List[Collection[T]] = [] # List of collection attributes that should be modified on append # Key: collection attribute (str) of appended element @@ -34,13 +35,13 @@ class Collection(Generic[T], metaclass=MetaClass): self.sync_on_append: Dict[str, Collection] = sync_on_append or {} self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {} self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {} - self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {} + self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {} self.contain_self_on_append: List[str] = [] self._indexed_values = defaultdict(set) self._indexed_to_objects = defaultdict(list) - + self.extend(data) def _map_element(self, __object: T, from_map: bool = False): @@ -56,13 +57,13 @@ class Collection(Generic[T], metaclass=MetaClass): if not from_map: for attribute, new_object in self.contain_given_in_attribute.items(): __object.__getattribute__(attribute).contain_collection_inside(new_object) - + for attribute, new_object in self.contain_given_in_attribute.items(): new_object.contain_collection_inside(__object.__getattribute__(attribute)) for attribute, new_object in self.append_object_to_attribute.items(): - __object.__getattribute__(attribute).append(new_object, from_map = True) - + __object.__getattribute__(attribute).append(new_object, from_map=True) + def _unmap_element(self, __object: T): self._contains_ids.remove(__object.id) @@ -71,7 +72,7 @@ class Collection(Generic[T], metaclass=MetaClass): continue if value not in self._indexed_values[name]: continue - + try: self._indexed_to_objects[value].remove(__object) except ValueError: @@ -81,59 +82,62 @@ class Collection(Generic[T], metaclass=MetaClass): self._indexed_values[name].remove(value) def _contained_in_self(self, __object: T) -> bool: + if __object.id in self._contains_ids: + return True + for name, value in __object.indexing_values: if value is None: continue if value in self._indexed_values[name]: return True return False - - def _get_root_collections(self) -> List["Collection"]: - if not len(self.upper_collections): + + def _get_root_collections(self) -> List[Collection]: + if not len(self.parents): return [self] - + root_collections = [] - for upper_collection in self.upper_collections: + for upper_collection in self.parents: root_collections.extend(upper_collection._get_root_collections()) return root_collections @property def _is_root(self) -> bool: - return len(self.upper_collections) <= 0 + return len(self.parents) <= 0 - def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]: + def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List[Collection]: results = [] if self._contained_in_self(__object): return [self] - - for collection in self.contained_collections: + + for collection in self.children: results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first)) if break_at_first: return results return results - + def _get_parents_of_multiple_contained_children(self, __object: T): results = [] - if len(self.contained_collections) < 2 or self._contained_in_self(__object): + if len(self.children) < 2 or self._contained_in_self(__object): return results - + count = 0 - for collection in self.contained_collections: + for collection in self.children: sub_results = collection._get_parents_of_multiple_contained_children(__object) - + if len(sub_results) > 0: count += 1 results.extend(sub_results) - + if count >= 2: results.append(self) return results - - def _merge_in_self(self, __object: T, from_map: bool = False): + + def merge_into_self(self, __object: T, from_map: bool = False): """ 1. find existing objects 2. merge into existing object @@ -141,30 +145,30 @@ class Collection(Generic[T], metaclass=MetaClass): """ if __object.id in self._contains_ids: return - - existing_object: DatabaseObject = None + + existing_object: T = None for name, value in __object.indexing_values: if value is None: continue if value in self._indexed_values[name]: existing_object = self._indexed_to_objects[value][0] - if existing_object == __object: + if existing_object.id == __object.id: return None - else: - break - + + break + if existing_object is None: return None - existing_object.merge(__object, replace_all_refs=True) + existing_object.merge(__object) # just a check if it really worked if existing_object.id != __object.id: raise ValueError("This should NEVER happen. Merging doesn't work.") - self._map_element(existing_object, from_map = from_map) - + self._map_element(existing_object, from_map=from_map) + def contains(self, __object: T) -> bool: return len(self._contained_in_sub(__object)) > 0 @@ -178,37 +182,39 @@ class Collection(Generic[T], metaclass=MetaClass): def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False): if __object is None: return + if __object.id in self._contains_ids: return - + exists_in_collection = self._contained_in_sub(__object) if len(exists_in_collection) and self is exists_in_collection[0]: # assuming that the object already is contained in the correct collections if not already_is_parent: - self._merge_in_self(__object, from_map = from_map) + self.merge_into_self(__object, from_map=from_map) return if not len(exists_in_collection): self._append(__object, from_map=from_map) else: - exists_in_collection[0]._merge_in_self(__object, from_map = from_map) + pass + exists_in_collection[0].merge_into_self(__object, from_map=from_map) if not already_is_parent or not self._is_root: for parent_collection in self._get_parents_of_multiple_contained_children(__object): + pass parent_collection.append(__object, already_is_parent=True, from_map=from_map) def extend(self, __iterable: Optional[Iterable[T]]): if __iterable is None: return - + for __object in __iterable: self.append(__object) - - def sync_with_other_collection(self, equal_collection: "Collection"): + def sync_with_other_collection(self, equal_collection: Collection): """ If two collections always need to have the same values, this can be used. - + Internally: 1. import the data from other to self - _data @@ -222,31 +228,31 @@ class Collection(Generic[T], metaclass=MetaClass): # this will be done in the next step. self.extend(equal_collection._data) # add all submodules - for equal_sub_collection in equal_collection.contained_collections: + for equal_sub_collection in equal_collection.children: self.contain_collection_inside(equal_sub_collection) # now the ugly part # replace all refs of the other element with this one self._risky_merge(equal_collection) - def contain_collection_inside(self, sub_collection: "Collection"): """ This collection will ALWAYS contain everything from the passed in collection """ - if sub_collection in self.contained_collections: + if sub_collection in self.children: return - - self.contained_collections.append(sub_collection) - sub_collection.upper_collections.append(self) + + self.children.append(sub_collection) + sub_collection.parents.append(self) @property def data(self) -> List[T]: - return [*self._data, *(__object for collection in self.contained_collections for __object in collection.shallow_list)] - + return [*self._data, + *(__object for collection in self.children for __object in collection.shallow_list)] + def __len__(self) -> int: - return len(self._data) + sum(len(collection) for collection in self.contained_collections) + return len(self._data) + sum(len(collection) for collection in self.children) def __iter__(self) -> Iterator[T]: for element in self._data: - yield element \ No newline at end of file + yield element diff --git a/src/music_kraken/objects/country.py b/src/music_kraken/objects/country.py index aed4842..c9aeaa3 100644 --- a/src/music_kraken/objects/country.py +++ b/src/music_kraken/objects/country.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dataclasses import dataclass import pycountry @@ -9,7 +11,6 @@ CountryTyping = type(list(pycountry.countries)[0]) emoji_map = {'AD': '🇦🇩', 'AE': '🇦🇪', 'AF': '🇦🇫', 'AG': '🇦🇬', 'AI': '🇦🇮', 'AL': '🇦🇱', 'AM': '🇦🇲', 'AO': '🇦🇴', 'AQ': '🇦🇶', 'AR': '🇦🇷', 'AS': '🇦🇸', 'AT': '🇦🇹', 'AU': '🇦🇺', 'AW': '🇦🇼', 'AX': '🇦🇽', 'AZ': '🇦🇿', 'BA': '🇧🇦', 'BB': '🇧🇧', 'BD': '🇧🇩', 'BE': '🇧🇪', 'BF': '🇧🇫', 'BG': '🇧🇬', 'BH': '🇧🇭', 'BI': '🇧🇮', 'BJ': '🇧🇯', 'BL': '🇧🇱', 'BM': '🇧🇲', 'BN': '🇧🇳', 'BO': '🇧🇴', 'BQ': '🇧🇶', 'BR': '🇧🇷', 'BS': '🇧🇸', 'BT': '🇧🇹', 'BV': '🇧🇻', 'BW': '🇧🇼', 'BY': '🇧🇾', 'BZ': '🇧🇿', 'CA': '🇨🇦', 'CC': '🇨🇨', 'CD': '🇨🇩', 'CF': '🇨🇫', 'CG': '🇨🇬', 'CH': '🇨🇭', 'CI': '🇨🇮', 'CK': '🇨🇰', 'CL': '🇨🇱', 'CM': '🇨🇲', 'CN': '🇨🇳', 'CO': '🇨🇴', 'CR': '🇨🇷', 'CU': '🇨🇺', 'CV': '🇨🇻', 'CW': '🇨🇼', 'CX': '🇨🇽', 'CY': '🇨🇾', 'CZ': '🇨🇿', 'DE': '🇩🇪', 'DJ': '🇩🇯', 'DK': '🇩🇰', 'DM': '🇩🇲', 'DO': '🇩🇴', 'DZ': '🇩🇿', 'EC': '🇪🇨', 'EE': '🇪🇪', 'EG': '🇪🇬', 'EH': '🇪🇭', 'ER': '🇪🇷', 'ES': '🇪🇸', 'ET': '🇪🇹', 'FI': '🇫🇮', 'FJ': '🇫🇯', 'FK': '🇫🇰', 'FM': '🇫🇲', 'FO': '🇫🇴', 'FR': '🇫🇷', 'GA': '🇬🇦', 'GB': '🇬🇧', 'GD': '🇬🇩', 'GE': '🇬🇪', 'GF': '🇬🇫', 'GG': '🇬🇬', 'GH': '🇬🇭', 'GI': '🇬🇮', 'GL': '🇬🇱', 'GM': '🇬🇲', 'GN': '🇬🇳', 'GP': '🇬🇵', 'GQ': '🇬🇶', 'GR': '🇬🇷', 'GS': '🇬🇸', 'GT': '🇬🇹', 'GU': '🇬🇺', 'GW': '🇬🇼', 'GY': '🇬🇾', 'HK': '🇭🇰', 'HM': '🇭🇲', 'HN': '🇭🇳', 'HR': '🇭🇷', 'HT': '🇭🇹', 'HU': '🇭🇺', 'ID': '🇮🇩', 'IE': '🇮🇪', 'IL': '🇮🇱', 'IM': '🇮🇲', 'IN': '🇮🇳', 'IO': '🇮🇴', 'IQ': '🇮🇶', 'IR': '🇮🇷', 'IS': '🇮🇸', 'IT': '🇮🇹', 'JE': '🇯🇪', 'JM': '🇯🇲', 'JO': '🇯🇴', 'JP': '🇯🇵', 'KE': '🇰🇪', 'KG': '🇰🇬', 'KH': '🇰🇭', 'KI': '🇰🇮', 'KM': '🇰🇲', 'KN': '🇰🇳', 'KP': '🇰🇵', 'KR': '🇰🇷', 'KW': '🇰🇼', 'KY': '🇰🇾', 'KZ': '🇰🇿', 'LA': '🇱🇦', 'LB': '🇱🇧', 'LC': '🇱🇨', 'LI': '🇱🇮', 'LK': '🇱🇰', 'LR': '🇱🇷', 'LS': '🇱🇸', 'LT': '🇱🇹', 'LU': '🇱🇺', 'LV': '🇱🇻', 'LY': '🇱🇾', 'MA': '🇲🇦', 'MC': '🇲🇨', 'MD': '🇲🇩', 'ME': '🇲🇪', 'MF': '🇲🇫', 'MG': '🇲🇬', 'MH': '🇲🇭', 'MK': '🇲🇰', 'ML': '🇲🇱', 'MM': '🇲🇲', 'MN': '🇲🇳', 'MO': '🇲🇴', 'MP': '🇲🇵', 'MQ': '🇲🇶', 'MR': '🇲🇷', 'MS': '🇲🇸', 'MT': '🇲🇹', 'MU': '🇲🇺', 'MV': '🇲🇻', 'MW': '🇲🇼', 'MX': '🇲🇽', 'MY': '🇲🇾', 'MZ': '🇲🇿', 'NA': '🇳🇦', 'NC': '🇳🇨', 'NE': '🇳🇪', 'NF': '🇳🇫', 'NG': '🇳🇬', 'NI': '🇳🇮', 'NL': '🇳🇱', 'NO': '🇳🇴', 'NP': '🇳🇵', 'NR': '🇳🇷', 'NU': '🇳🇺', 'NZ': '🇳🇿', 'OM': '🇴🇲', 'PA': '🇵🇦', 'PE': '🇵🇪', 'PF': '🇵🇫', 'PG': '🇵🇬', 'PH': '🇵🇭', 'PK': '🇵🇰', 'PL': '🇵🇱', 'PM': '🇵🇲', 'PN': '🇵🇳', 'PR': '🇵🇷', 'PS': '🇵🇸', 'PT': '🇵🇹', 'PW': '🇵🇼', 'PY': '🇵🇾', 'QA': '🇶🇦', 'RE': '🇷🇪', 'RO': '🇷🇴', 'RS': '🇷🇸', 'RU': '🇷🇺', 'RW': '🇷🇼', 'SA': '🇸🇦', 'SB': '🇸🇧', 'SC': '🇸🇨', 'SD': '🇸🇩', 'SE': '🇸🇪', 'SG': '🇸🇬', 'SH': '🇸🇭', 'SI': '🇸🇮', 'SJ': '🇸🇯', 'SK': '🇸🇰', 'SL': '🇸🇱', 'SM': '🇸🇲', 'SN': '🇸🇳', 'SO': '🇸🇴', 'SR': '🇸🇷', 'SS': '🇸🇸', 'ST': '🇸🇹', 'SV': '🇸🇻', 'SX': '🇸🇽', 'SY': '🇸🇾', 'SZ': '🇸🇿', 'TC': '🇹🇨', 'TD': '🇹🇩', 'TF': '🇹🇫', 'TG': '🇹🇬', 'TH': '🇹🇭', 'TJ': '🇹🇯', 'TK': '🇹🇰', 'TL': '🇹🇱', 'TM': '🇹🇲', 'TN': '🇹🇳', 'TO': '🇹🇴', 'TR': '🇹🇷', 'TT': '🇹🇹', 'TV': '🇹🇻', 'TW': '🇹🇼', 'TZ': '🇹🇿', 'UA': '🇺🇦', 'UG': '🇺🇬', 'UM': '🇺🇲', 'US': '🇺🇸', 'UY': '🇺🇾', 'UZ': '🇺🇿', 'VA': '🇻🇦', 'VC': '🇻🇨', 'VE': '🇻🇪', 'VG': '🇻🇬', 'VI': '🇻🇮', 'VN': '🇻🇳', 'VU': '🇻🇺', 'WF': '🇼🇫', 'WS': '🇼🇸', 'YE': '🇾🇪', 'YT': '🇾🇹', 'ZA': '🇿🇦', 'ZM': '🇿🇲', 'ZW': '🇿🇼', '🇦🇩': 'AD', '🇦🇪': 'AE', '🇦🇫': 'AF', '🇦🇬': 'AG', '🇦🇮': 'AI', '🇦🇱': 'AL', '🇦🇲': 'AM', '🇦🇴': 'AO', '🇦🇶': 'AQ', '🇦🇷': 'AR', '🇦🇸': 'AS', '🇦🇹': 'AT', '🇦🇺': 'AU', '🇦🇼': 'AW', '🇦🇽': 'AX', '🇦🇿': 'AZ', '🇧🇦': 'BA', '🇧🇧': 'BB', '🇧🇩': 'BD', '🇧🇪': 'BE', '🇧🇫': 'BF', '🇧🇬': 'BG', '🇧🇭': 'BH', '🇧🇮': 'BI', '🇧🇯': 'BJ', '🇧🇱': 'BL', '🇧🇲': 'BM', '🇧🇳': 'BN', '🇧🇴': 'BO', '🇧🇶': 'BQ', '🇧🇷': 'BR', '🇧🇸': 'BS', '🇧🇹': 'BT', '🇧🇻': 'BV', '🇧🇼': 'BW', '🇧🇾': 'BY', '🇧🇿': 'BZ', '🇨🇦': 'CA', '🇨🇨': 'CC', '🇨🇩': 'CD', '🇨🇫': 'CF', '🇨🇬': 'CG', '🇨🇭': 'CH', '🇨🇮': 'CI', '🇨🇰': 'CK', '🇨🇱': 'CL', '🇨🇲': 'CM', '🇨🇳': 'CN', '🇨🇴': 'CO', '🇨🇷': 'CR', '🇨🇺': 'CU', '🇨🇻': 'CV', '🇨🇼': 'CW', '🇨🇽': 'CX', '🇨🇾': 'CY', '🇨🇿': 'CZ', '🇩🇪': 'DE', '🇩🇯': 'DJ', '🇩🇰': 'DK', '🇩🇲': 'DM', '🇩🇴': 'DO', '🇩🇿': 'DZ', '🇪🇨': 'EC', '🇪🇪': 'EE', '🇪🇬': 'EG', '🇪🇭': 'EH', '🇪🇷': 'ER', '🇪🇸': 'ES', '🇪🇹': 'ET', '🇫🇮': 'FI', '🇫🇯': 'FJ', '🇫🇰': 'FK', '🇫🇲': 'FM', '🇫🇴': 'FO', '🇫🇷': 'FR', '🇬🇦': 'GA', '🇬🇧': 'GB', '🇬🇩': 'GD', '🇬🇪': 'GE', '🇬🇫': 'GF', '🇬🇬': 'GG', '🇬🇭': 'GH', '🇬🇮': 'GI', '🇬🇱': 'GL', '🇬🇲': 'GM', '🇬🇳': 'GN', '🇬🇵': 'GP', '🇬🇶': 'GQ', '🇬🇷': 'GR', '🇬🇸': 'GS', '🇬🇹': 'GT', '🇬🇺': 'GU', '🇬🇼': 'GW', '🇬🇾': 'GY', '🇭🇰': 'HK', '🇭🇲': 'HM', '🇭🇳': 'HN', '🇭🇷': 'HR', '🇭🇹': 'HT', '🇭🇺': 'HU', '🇮🇩': 'ID', '🇮🇪': 'IE', '🇮🇱': 'IL', '🇮🇲': 'IM', '🇮🇳': 'IN', '🇮🇴': 'IO', '🇮🇶': 'IQ', '🇮🇷': 'IR', '🇮🇸': 'IS', '🇮🇹': 'IT', '🇯🇪': 'JE', '🇯🇲': 'JM', '🇯🇴': 'JO', '🇯🇵': 'JP', '🇰🇪': 'KE', '🇰🇬': 'KG', '🇰🇭': 'KH', '🇰🇮': 'KI', '🇰🇲': 'KM', '🇰🇳': 'KN', '🇰🇵': 'KP', '🇰🇷': 'KR', '🇰🇼': 'KW', '🇰🇾': 'KY', '🇰🇿': 'KZ', '🇱🇦': 'LA', '🇱🇧': 'LB', '🇱🇨': 'LC', '🇱🇮': 'LI', '🇱🇰': 'LK', '🇱🇷': 'LR', '🇱🇸': 'LS', '🇱🇹': 'LT', '🇱🇺': 'LU', '🇱🇻': 'LV', '🇱🇾': 'LY', '🇲🇦': 'MA', '🇲🇨': 'MC', '🇲🇩': 'MD', '🇲🇪': 'ME', '🇲🇫': 'MF', '🇲🇬': 'MG', '🇲🇭': 'MH', '🇲🇰': 'MK', '🇲🇱': 'ML', '🇲🇲': 'MM', '🇲🇳': 'MN', '🇲🇴': 'MO', '🇲🇵': 'MP', '🇲🇶': 'MQ', '🇲🇷': 'MR', '🇲🇸': 'MS', '🇲🇹': 'MT', '🇲🇺': 'MU', '🇲🇻': 'MV', '🇲🇼': 'MW', '🇲🇽': 'MX', '🇲🇾': 'MY', '🇲🇿': 'MZ', '🇳🇦': 'NA', '🇳🇨': 'NC', '🇳🇪': 'NE', '🇳🇫': 'NF', '🇳🇬': 'NG', '🇳🇮': 'NI', '🇳🇱': 'NL', '🇳🇴': 'NO', '🇳🇵': 'NP', '🇳🇷': 'NR', '🇳🇺': 'NU', '🇳🇿': 'NZ', '🇴🇲': 'OM', '🇵🇦': 'PA', '🇵🇪': 'PE', '🇵🇫': 'PF', '🇵🇬': 'PG', '🇵🇭': 'PH', '🇵🇰': 'PK', '🇵🇱': 'PL', '🇵🇲': 'PM', '🇵🇳': 'PN', '🇵🇷': 'PR', '🇵🇸': 'PS', '🇵🇹': 'PT', '🇵🇼': 'PW', '🇵🇾': 'PY', '🇶🇦': 'QA', '🇷🇪': 'RE', '🇷🇴': 'RO', '🇷🇸': 'RS', '🇷🇺': 'RU', '🇷🇼': 'RW', '🇸🇦': 'SA', '🇸🇧': 'SB', '🇸🇨': 'SC', '🇸🇩': 'SD', '🇸🇪': 'SE', '🇸🇬': 'SG', '🇸🇭': 'SH', '🇸🇮': 'SI', '🇸🇯': 'SJ', '🇸🇰': 'SK', '🇸🇱': 'SL', '🇸🇲': 'SM', '🇸🇳': 'SN', '🇸🇴': 'SO', '🇸🇷': 'SR', '🇸🇸': 'SS', '🇸🇹': 'ST', '🇸🇻': 'SV', '🇸🇽': 'SX', '🇸🇾': 'SY', '🇸🇿': 'SZ', '🇹🇨': 'TC', '🇹🇩': 'TD', '🇹🇫': 'TF', '🇹🇬': 'TG', '🇹🇭': 'TH', '🇹🇯': 'TJ', '🇹🇰': 'TK', '🇹🇱': 'TL', '🇹🇲': 'TM', '🇹🇳': 'TN', '🇹🇴': 'TO', '🇹🇷': 'TR', '🇹🇹': 'TT', '🇹🇻': 'TV', '🇹🇼': 'TW', '🇹🇿': 'TZ', '🇺🇦': 'UA', '🇺🇬': 'UG', '🇺🇲': 'UM', '🇺🇸': 'US', '🇺🇾': 'UY', '🇺🇿': 'UZ', '🇻🇦': 'VA', '🇻🇨': 'VC', '🇻🇪': 'VE', '🇻🇬': 'VG', '🇻🇮': 'VI', '🇻🇳': 'VN', '🇻🇺': 'VU', '🇼🇫': 'WF', '🇼🇸': 'WS', '🇾🇪': 'YE', '🇾🇹': 'YT', '🇿🇦': 'ZA', '🇿🇲': 'ZM', '🇿🇼': 'ZW'} - @dataclass class Country: alpha_2: str @@ -19,7 +20,7 @@ class Country: emoji: str @classmethod - def by_pycountry(cls, country: CountryTyping, emoji: str = "") -> "Country": + def by_pycountry(cls, country: CountryTyping, emoji: str = "") -> "Country": emoji = "" alpha_2 = country.alpha_2.upper() @@ -40,7 +41,7 @@ class Country: return cls.by_pycountry(pycountry.countries.get(alpha_2=alpha_2)) @classmethod - def by_apha_3(cls, alpha_3: str) -> "Country": + def by_alpha_3(cls, alpha_3: str) -> "Country": return cls.by_pycountry(pycountry.countries.get(alpha_3=alpha_3)) @classmethod @@ -63,4 +64,31 @@ class Country: return hash(self.alpha_3) def __eq__(self, __value: object) -> bool: - return self.__hash__() == __value.__hash__() \ No newline at end of file + return self.__hash__() == __value.__hash__() + + +@dataclass +class Language: + alpha_2: str + alpha_3: str + name: str + numeric: int + + @classmethod + def by_pycountry(cls, language) -> Language: + alpha_2 = language.alpha_2.upper() + + return cls( + alpha_2=alpha_2, + alpha_3=language.alpha_3, + name=language.name, + numeric=language.numeric, + ) + + @classmethod + def by_alpha_2(cls, alpha_2: str) -> Language: + return cls.by_pycountry(pycountry.languages.get(alpha_2=alpha_2)) + + @classmethod + def by_alpha_3(cls, alpha_3: str) -> Language: + return cls.by_pycountry(pycountry.languages.get(alpha_3=alpha_3)) diff --git a/src/music_kraken/objects/new_collection.py b/src/music_kraken/objects/new_collection.py index e69de29..1a556b6 100644 --- a/src/music_kraken/objects/new_collection.py +++ b/src/music_kraken/objects/new_collection.py @@ -0,0 +1,257 @@ +from __future__ import annotations + +from collections import defaultdict +from typing import TypeVar, Generic, Dict, Optional, Iterable, List +from .parents import OuterProxy + +T = TypeVar('T', bound=OuterProxy) + + +class Collection(Generic[T]): + _data: List[T] + + _indexed_values: Dict[str, set] + _indexed_to_objects: Dict[any, list] + + shallow_list = property(fget=lambda self: self.data) + + def __init__( + self, + data: Optional[Iterable[T]] = None, + sync_on_append: Dict[str, "Collection"] = None, + contain_given_in_attribute: Dict[str, "Collection"] = None, + contain_attribute_in_given: Dict[str, "Collection"] = None, + append_object_to_attribute: Dict[str, T] = None + ) -> None: + self._contains_ids = set() + self._data = [] + self.upper_collections: List[Collection[T]] = [] + self.contained_collections: List[Collection[T]] = [] + + # List of collection attributes that should be modified on append + # Key: collection attribute (str) of appended element + # Value: main collection to sync to + self.sync_on_append: Dict[str, Collection] = sync_on_append or {} + self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {} + self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {} + self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {} + + self.contain_self_on_append: List[str] = [] + + self._indexed_values = defaultdict(set) + self._indexed_to_objects = defaultdict(list) + + self.extend(data) + + def _map_element(self, __object: T, from_map: bool = False): + self._contains_ids.add(__object.id) + + for name, value in __object.indexing_values: + if value is None: + continue + + self._indexed_values[name].add(value) + self._indexed_to_objects[value].append(__object) + + if not from_map: + for attribute, new_object in self.contain_given_in_attribute.items(): + __object.__getattribute__(attribute).contain_collection_inside(new_object) + + for attribute, new_object in self.contain_given_in_attribute.items(): + new_object.contain_collection_inside(__object.__getattribute__(attribute)) + + for attribute, new_object in self.append_object_to_attribute.items(): + __object.__getattribute__(attribute).append(new_object, from_map=True) + + def _unmap_element(self, __object: T): + self._contains_ids.remove(__object.id) + + for name, value in __object.indexing_values: + if value is None: + continue + if value not in self._indexed_values[name]: + continue + + try: + self._indexed_to_objects[value].remove(__object) + except ValueError: + continue + + if not len(self._indexed_to_objects[value]): + self._indexed_values[name].remove(value) + + def _contained_in_self(self, __object: T) -> bool: + if __object.id in self._contains_ids: + return True + + for name, value in __object.indexing_values: + if value is None: + continue + if value in self._indexed_values[name]: + return True + return False + + def _get_root_collections(self) -> List["Collection"]: + if not len(self.upper_collections): + return [self] + + root_collections = [] + for upper_collection in self.upper_collections: + root_collections.extend(upper_collection._get_root_collections()) + return root_collections + + @property + def _is_root(self) -> bool: + return len(self.upper_collections) <= 0 + + def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]: + results = [] + + if self._contained_in_self(__object): + return [self] + + for collection in self.contained_collections: + results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first)) + if break_at_first: + return results + + return results + + def _get_parents_of_multiple_contained_children(self, __object: T): + results = [] + if len(self.contained_collections) < 2 or self._contained_in_self(__object): + return results + + count = 0 + + for collection in self.contained_collections: + sub_results = collection._get_parents_of_multiple_contained_children(__object) + + if len(sub_results) > 0: + count += 1 + results.extend(sub_results) + + if count >= 2: + results.append(self) + + return results + + def _merge_in_self(self, __object: T, from_map: bool = False): + """ + 1. find existing objects + 2. merge into existing object + 3. remap existing object + """ + if __object.id in self._contains_ids: + return + + existing_object: DatabaseObject = None + + for name, value in __object.indexing_values: + if value is None: + continue + if value in self._indexed_values[name]: + existing_object = self._indexed_to_objects[value][0] + if existing_object.id == __object.id: + return None + + break + + if existing_object is None: + return None + + existing_object.merge(__object, replace_all_refs=True) + + # just a check if it really worked + if existing_object.id != __object.id: + raise ValueError("This should NEVER happen. Merging doesn't work.") + + self._map_element(existing_object, from_map=from_map) + + def contains(self, __object: T) -> bool: + return len(self._contained_in_sub(__object)) > 0 + + def _append(self, __object: T, from_map: bool = False): + for attribute, to_sync_with in self.sync_on_append.items(): + pass + to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute)) + + self._map_element(__object, from_map=from_map) + self._data.append(__object) + + def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False): + if __object is None: + return + if __object.id in self._contains_ids: + return + + exists_in_collection = self._contained_in_sub(__object) + if len(exists_in_collection) and self is exists_in_collection[0]: + # assuming that the object already is contained in the correct collections + if not already_is_parent: + self._merge_in_self(__object, from_map=from_map) + return + + if not len(exists_in_collection): + self._append(__object, from_map=from_map) + else: + pass + exists_in_collection[0]._merge_in_self(__object, from_map=from_map) + + if not already_is_parent or not self._is_root: + for parent_collection in self._get_parents_of_multiple_contained_children(__object): + pass + parent_collection.append(__object, already_is_parent=True, from_map=from_map) + + def extend(self, __iterable: Optional[Iterable[T]]): + if __iterable is None: + return + + for __object in __iterable: + self.append(__object) + + def sync_with_other_collection(self, equal_collection: "Collection"): + """ + If two collections always need to have the same values, this can be used. + + Internally: + 1. import the data from other to self + - _data + - contained_collections + 2. replace all refs from the other object, with refs from this object + """ + if equal_collection is self: + return + + # don't add the elements from the subelements from the other collection. + # this will be done in the next step. + self.extend(equal_collection._data) + # add all submodules + for equal_sub_collection in equal_collection.contained_collections: + self.contain_collection_inside(equal_sub_collection) + + # now the ugly part + # replace all refs of the other element with this one + self._risky_merge(equal_collection) + + def contain_collection_inside(self, sub_collection: "Collection"): + """ + This collection will ALWAYS contain everything from the passed in collection + """ + if sub_collection in self.contained_collections: + return + + self.contained_collections.append(sub_collection) + sub_collection.upper_collections.append(self) + + @property + def data(self) -> List[T]: + return [*self._data, + *(__object for collection in self.contained_collections for __object in collection.shallow_list)] + + def __len__(self) -> int: + return len(self._data) + sum(len(collection) for collection in self.contained_collections) + + def __iter__(self) -> Iterator[T]: + for element in self._data: + yield element diff --git a/src/music_kraken/objects/old_collection.py b/src/music_kraken/objects/old_collection.py index 4f50ff1..4aa8f21 100644 --- a/src/music_kraken/objects/old_collection.py +++ b/src/music_kraken/objects/old_collection.py @@ -1,221 +1,256 @@ -from typing import List, Iterable, Dict, TypeVar, Generic, Iterator +from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type from collections import defaultdict -from dataclasses import dataclass from .parents import DatabaseObject -from ..utils.hooks import HookEventTypes, Hooks, Event - - -class CollectionHooks(HookEventTypes): - APPEND_NEW = "append_new" - +from ..utils.support_classes.hacking import MetaClass T = TypeVar('T', bound=DatabaseObject) -@dataclass -class AppendResult: - was_in_collection: bool - current_element: DatabaseObject - was_the_same: bool - - class Collection(Generic[T]): - """ - This a class for the iterables - like tracklist or discography - """ _data: List[T] - _by_url: dict - _by_attribute: dict + _indexed_values: Dict[str, set] + _indexed_to_objects: Dict[any, list] - def __init__(self, data: List[T] = None, element_type=None, *args, **kwargs) -> None: - # Attribute needs to point to - self.element_type = element_type + shallow_list = property(fget=lambda self: self.data) - self._data: List[T] = list() + def __init__( + self, data: Optional[Iterable[T]] = None, + sync_on_append: Dict[str, "Collection"] = None, + contain_given_in_attribute: Dict[str, "Collection"] = None, + contain_attribute_in_given: Dict[str, "Collection"] = None, + append_object_to_attribute: Dict[str, DatabaseObject] = None + ) -> None: + self._contains_ids = set() + self._data = [] + self.upper_collections: List[Collection[T]] = [] + self.contained_collections: List[Collection[T]] = [] - """ - example of attribute_to_object_map - the song objects are references pointing to objects - in _data - - ```python - { - 'id': {323: song_1, 563: song_2, 666: song_3}, - 'url': {'www.song_2.com': song_2} - } - ``` - """ - self._attribute_to_object_map: Dict[str, Dict[object, T]] = defaultdict(dict) - self._used_ids: set = set() + # List of collection attributes that should be modified on append + # Key: collection attribute (str) of appended element + # Value: main collection to sync to + self.sync_on_append: Dict[str, Collection] = sync_on_append or {} + self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {} + self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {} + self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {} - self.hooks: Hooks = Hooks(self) + self.contain_self_on_append: List[str] = [] - if data is not None: - self.extend(data, merge_on_conflict=True) + self._indexed_values = defaultdict(set) + self._indexed_to_objects = defaultdict(list) - def sort(self, reverse: bool = False, **kwargs): - self._data.sort(reverse=reverse, **kwargs) + self.extend(data) - def map_element(self, element: T): - for name, value in element.indexing_values: + def _map_element(self, __object: T, from_map: bool = False): + self._contains_ids.add(__object.id) + + for name, value in __object.indexing_values: if value is None: continue - self._attribute_to_object_map[name][value] = element + self._indexed_values[name].add(value) + self._indexed_to_objects[value].append(__object) - self._used_ids.add(element.id) + if not from_map: + for attribute, new_object in self.contain_given_in_attribute.items(): + __object.__getattribute__(attribute).contain_collection_inside(new_object) - def unmap_element(self, element: T): - for name, value in element.indexing_values: + for attribute, new_object in self.contain_given_in_attribute.items(): + new_object.contain_collection_inside(__object.__getattribute__(attribute)) + + for attribute, new_object in self.append_object_to_attribute.items(): + __object.__getattribute__(attribute).append(new_object, from_map=True) + + def _unmap_element(self, __object: T): + self._contains_ids.remove(__object.id) + + for name, value in __object.indexing_values: if value is None: continue + if value not in self._indexed_values[name]: + continue - if value in self._attribute_to_object_map[name]: - if element is self._attribute_to_object_map[name][value]: - try: - self._attribute_to_object_map[name].pop(value) - except KeyError: - pass + try: + self._indexed_to_objects[value].remove(__object) + except ValueError: + continue - def append(self, element: T, merge_on_conflict: bool = True, - merge_into_existing: bool = True, no_hook: bool = False) -> AppendResult: + if not len(self._indexed_to_objects[value]): + self._indexed_values[name].remove(value) + + def _contained_in_self(self, __object: T) -> bool: + if __object.id in self._contains_ids: + return True + + for name, value in __object.indexing_values: + if value is None: + continue + if value in self._indexed_values[name]: + return True + return False + + def _get_root_collections(self) -> List["Collection"]: + if not len(self.upper_collections): + return [self] + + root_collections = [] + for upper_collection in self.upper_collections: + root_collections.extend(upper_collection._get_root_collections()) + return root_collections + + @property + def _is_root(self) -> bool: + return len(self.upper_collections) <= 0 + + def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]: + results = [] + + if self._contained_in_self(__object): + return [self] + + for collection in self.contained_collections: + results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first)) + if break_at_first: + return results + + return results + + def _get_parents_of_multiple_contained_children(self, __object: T): + results = [] + if len(self.contained_collections) < 2 or self._contained_in_self(__object): + return results + + count = 0 + + for collection in self.contained_collections: + sub_results = collection._get_parents_of_multiple_contained_children(__object) + + if len(sub_results) > 0: + count += 1 + results.extend(sub_results) + + if count >= 2: + results.append(self) + + return results + + def _merge_in_self(self, __object: T, from_map: bool = False): """ - :param element: - :param merge_on_conflict: - :param merge_into_existing: - :return did_not_exist: + 1. find existing objects + 2. merge into existing object + 3. remap existing object """ - if element is None: - return AppendResult(False, None, False) - - for existing_element in self._data: - if element is existing_element: - return AppendResult(False, None, False) - - # if the element type has been defined in the initializer it checks if the type matches - if self.element_type is not None and not isinstance(element, self.element_type): - raise TypeError(f"{type(element)} is not the set type {self.element_type}") - - # return if the same instance of the object is in the list - for existing in self._data: - if element is existing: - return AppendResult(True, element, True) - - for name, value in element.indexing_values: - if value in self._attribute_to_object_map[name]: - existing_object = self._attribute_to_object_map[name][value] - - if not merge_on_conflict: - return AppendResult(True, existing_object, False) - - # if the object does already exist - # thus merging and don't add it afterward - if merge_into_existing: - existing_object.merge(element) - # in case any relevant data has been added (e.g. it remaps the old object) - self.map_element(existing_object) - return AppendResult(True, existing_object, False) - - element.merge(existing_object) - - exists_at = self._data.index(existing_object) - self._data[exists_at] = element - - self.unmap_element(existing_object) - self.map_element(element) - return AppendResult(True, existing_object, False) - - if not no_hook: - self.hooks.trigger_event(CollectionHooks.APPEND_NEW, new_object=element) - self._data.append(element) - self.map_element(element) - - return AppendResult(False, element, False) - - def extend(self, element_list: Iterable[T], merge_on_conflict: bool = True, - merge_into_existing: bool = True, no_hook: bool = False): - if element_list is None: + if __object.id in self._contains_ids: return - if len(element_list) <= 0: + + existing_object: DatabaseObject = None + + for name, value in __object.indexing_values: + if value is None: + continue + if value in self._indexed_values[name]: + existing_object = self._indexed_to_objects[value][0] + if existing_object.id == __object.id: + return None + + break + + if existing_object is None: + return None + + existing_object.merge(__object, replace_all_refs=True) + + # just a check if it really worked + if existing_object.id != __object.id: + raise ValueError("This should NEVER happen. Merging doesn't work.") + + self._map_element(existing_object, from_map=from_map) + + def contains(self, __object: T) -> bool: + return len(self._contained_in_sub(__object)) > 0 + + def _append(self, __object: T, from_map: bool = False): + for attribute, to_sync_with in self.sync_on_append.items(): + pass + to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute)) + + self._map_element(__object, from_map=from_map) + self._data.append(__object) + + def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False): + if __object is None: return - if element_list is self: + if __object.id in self._contains_ids: return - for element in element_list: - self.append(element, merge_on_conflict=merge_on_conflict, merge_into_existing=merge_into_existing, no_hook=no_hook) - def sync_collection(self, collection_attribute: str): - def on_append(event: Event, new_object: T, *args, **kwargs): - new_collection = new_object.__getattribute__(collection_attribute) - if self is new_collection: - return + exists_in_collection = self._contained_in_sub(__object) + if len(exists_in_collection) and self is exists_in_collection[0]: + # assuming that the object already is contained in the correct collections + if not already_is_parent: + self._merge_in_self(__object, from_map=from_map) + return - self.extend(new_object.__getattribute__(collection_attribute), no_hook=True) - new_object.__setattr__(collection_attribute, self) + if not len(exists_in_collection): + self._append(__object, from_map=from_map) + else: + pass + exists_in_collection[0]._merge_in_self(__object, from_map=from_map) - self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append) + if not already_is_parent or not self._is_root: + for parent_collection in self._get_parents_of_multiple_contained_children(__object): + pass + parent_collection.append(__object, already_is_parent=True, from_map=from_map) - def sync_main_collection(self, main_collection: "Collection", collection_attribute: str): - def on_append(event: Event, new_object: T, *args, **kwargs): - new_collection = new_object.__getattribute__(collection_attribute) - if main_collection is new_collection: - return - - main_collection.extend(new_object.__getattribute__(collection_attribute), no_hook=True) - new_object.__setattr__(collection_attribute, main_collection) + def extend(self, __iterable: Optional[Iterable[T]]): + if __iterable is None: + return - self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append) + for __object in __iterable: + self.append(__object) - """ - def on_append(event: Event, new_object: T, *args, **kwargs): - new_collection: Collection = new_object.__getattribute__(collection_attribute) - if self is new_collection: - return - - self.extend(new_collection.shallow_list, no_hook=False) - new_object.__setattr__(collection_attribute, self) + def sync_with_other_collection(self, equal_collection: "Collection"): + """ + If two collections always need to have the same values, this can be used. - self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append) - """ + Internally: + 1. import the data from other to self + - _data + - contained_collections + 2. replace all refs from the other object, with refs from this object + """ + if equal_collection is self: + return + + # don't add the elements from the subelements from the other collection. + # this will be done in the next step. + self.extend(equal_collection._data) + # add all submodules + for equal_sub_collection in equal_collection.contained_collections: + self.contain_collection_inside(equal_sub_collection) + + # now the ugly part + # replace all refs of the other element with this one + self._risky_merge(equal_collection) + + def contain_collection_inside(self, sub_collection: "Collection"): + """ + This collection will ALWAYS contain everything from the passed in collection + """ + if sub_collection in self.contained_collections: + return + + self.contained_collections.append(sub_collection) + sub_collection.upper_collections.append(self) + + @property + def data(self) -> List[T]: + return [*self._data, + *(__object for collection in self.contained_collections for __object in collection.shallow_list)] + + def __len__(self) -> int: + return len(self._data) + sum(len(collection) for collection in self.contained_collections) def __iter__(self) -> Iterator[T]: for element in self._data: - yield element - - def __str__(self) -> str: - return "\n".join([f"{str(j).zfill(2)}: {i.__repr__()}" for j, i in enumerate(self._data)]) - - def __len__(self) -> int: - return len(self._data) - - def __getitem__(self, key) -> T: - if type(key) != int: - return ValueError("key needs to be an integer") - - return self._data[key] - - def __setitem__(self, key, value: T): - if type(key) != int: - return ValueError("key needs to be an integer") - - old_item = self._data[key] - self.unmap_element(old_item) - self.map_element(value) - - self._data[key] = value - - @property - def shallow_list(self) -> List[T]: - """ - returns a shallow copy of the data list - """ - return self._data.copy() - - @property - def empty(self) -> bool: - return len(self._data) == 0 - - def clear(self): - self.__init__(element_type=self.element_type) + yield element \ No newline at end of file diff --git a/src/music_kraken/objects/parents.py b/src/music_kraken/objects/parents.py index 9431db2..13a90f6 100644 --- a/src/music_kraken/objects/parents.py +++ b/src/music_kraken/objects/parents.py @@ -9,12 +9,13 @@ from .option import Options from ..utils.shared import HIGHEST_ID from ..utils.config import main_settings, logging_settings from ..utils.support_classes.hacking import MetaClass - +from ..utils.exception.objects import IsDynamicException LOGGER = logging_settings["object_logger"] P = TypeVar('P') + @dataclass class StaticAttribute(Generic[P]): name: str @@ -27,6 +28,139 @@ class StaticAttribute(Generic[P]): is_upwards_collection: bool = False +class InnerData: + """ + This is the core class, which is used for every Data class. + The attributes are set, and can be merged. + + The concept is, that the outer class proxies this class. + If the data in the wrapper class has to be merged, then this class is just replaced and garbage collected. + """ + + def __init__(self, **kwargs): + for key, value in kwargs.items(): + self.__setattr__(key, value) + + def __merge__(self, __other: InnerData, override: bool = False): + """ + TODO + is default is totally ignored + + :param __other: + :param override: + :return: + """ + + for key, value in __other.__dict__.items(): + # just set the other value if self doesn't already have it + if key not in self.__dict__: + self.__setattr__(key, value) + continue + + # if the object of value implemented __merge__, it merges + existing = self.__getattribute__(key) + if hasattr(type(existing), "__merge__"): + existing.merge_into_self(value, override) + continue + + # override the existing value if requested + if override: + self.__setattr__(key, value) + + + +class OuterProxy: + """ + Wraps the inner data, and provides apis, to naturally access those values. + """ + + _default_factories: dict + + def __init__(self, _id: int = None, dynamic: bool = False, **kwargs): + _automatic_id: bool = False + + if _id is None and not dynamic: + """ + generates a random integer id + the range is defined in the config + """ + _id = random.randint(0, HIGHEST_ID) + _automatic_id = True + + kwargs["automatic_id"] = _automatic_id + kwargs["id"] = _id + kwargs["dynamic"] = dynamic + + for name, factory in type(self)._default_factories.items(): + if name not in kwargs: + kwargs[name] = factory() + + self._inner: InnerData = InnerData(**kwargs) + self.__init_collections__() + + for name, data_list in kwargs.items(): + if isinstance(data_list, list) and name.endswith("_list"): + collection_name = name.replace("_list", "_collection") + + if collection_name not in self.__dict__: + continue + + collection = self.__getattribute__(collection_name) + collection.extend(data_list) + + def __init_collections__(self): + pass + + def __getattribute__(self, __name: str) -> Any: + """ + Returns the attribute of _inner if the attribute exists, + else it returns the attribute of self. + + That the _inner gets checked first is essential for the type hints. + :param __name: + :return: + """ + + _inner: InnerData = super().__getattribute__("_inner") + try: + return _inner.__getattribute__(__name) + except AttributeError: + return super().__getattribute__(__name) + + def __setattr__(self, __name, __value): + if not __name.startswith("_") and hasattr(self, "_inner"): + _inner: InnerData = super().__getattribute__("_inner") + return _inner.__setattr__(__name, __value) + + return super().__setattr__(__name, __value) + + def __hash__(self): + """ + :raise: IsDynamicException + :return: + """ + + if self.dynamic: + return id(self._inner) + + return self.id + + def __eq__(self, other: Any): + return self.__hash__() == other.__hash__() + + def merge(self, __other: OuterProxy, override: bool = False): + """ + 1. merges the data of __other in self + 2. replaces the data of __other with the data of self + + :param __other: + :param override: + :return: + """ + self._inner.__merge__(__other._inner, override=override) + __other._inner = self._inner + + class Attribute(Generic[P]): def __init__(self, database_object: "DatabaseObject", static_attribute: StaticAttribute) -> None: self.database_object: DatabaseObject = database_object @@ -38,12 +172,11 @@ class Attribute(Generic[P]): def get(self) -> P: return self.database_object.__getattribute__(self.name) - + def set(self, value: P): self.database_object.__setattr__(self.name, value) - class DatabaseObject(metaclass=MetaClass): COLLECTION_STRING_ATTRIBUTES: tuple = tuple() SIMPLE_STRING_ATTRIBUTES: dict = dict() @@ -77,7 +210,7 @@ class DatabaseObject(metaclass=MetaClass): for static_attribute in self.STATIC_ATTRIBUTES: attribute: Attribute = Attribute(self, static_attribute) self._attributes.append(attribute) - + if static_attribute.is_collection: if static_attribute.is_collection: self._collection_attributes.append(attribute) @@ -94,6 +227,8 @@ class DatabaseObject(metaclass=MetaClass): self.dynamic = dynamic self.build_version = -1 + super().__init__() + @property def upwards_collection(self) -> Collection: for attribute in self._upwards_collection_attributes: @@ -114,10 +249,19 @@ class DatabaseObject(metaclass=MetaClass): raise TypeError("Dynamic DatabaseObjects are unhashable.") return self.id + def __deep_eq__(self, other) -> bool: + if not isinstance(other, type(self)): + return False + + return super().__eq__(other) + def __eq__(self, other) -> bool: if not isinstance(other, type(self)): return False + if super().__eq__(other): + return True + # add the checks for dynamic, to not throw an exception if not self.dynamic and not other.dynamic and self.id == other.id: return True @@ -152,10 +296,10 @@ class DatabaseObject(metaclass=MetaClass): if other is None: return - - if self.id == other.id: + + if self.__deep_eq__(other): return - + if not isinstance(other, type(self)): LOGGER.warning(f"can't merge \"{type(other)}\" into \"{type(self)}\"") return @@ -163,6 +307,7 @@ class DatabaseObject(metaclass=MetaClass): for collection in self._collection_attributes: if hasattr(self, collection.name) and hasattr(other, collection.name): if collection.get() is not getattr(other, collection.name): + pass collection.get().extend(getattr(other, collection.name)) for simple_attribute, default_value in type(self).SIMPLE_STRING_ATTRIBUTES.items(): @@ -190,7 +335,7 @@ class DatabaseObject(metaclass=MetaClass): @property def option_string(self) -> str: return self.__repr__() - + def _build_recursive_structures(self, build_version: int, merge: False): pass @@ -202,7 +347,7 @@ class DatabaseObject(metaclass=MetaClass): no need to override if only the recursive structure should be build. override self.build_recursive_structures() instead """ - + self._build_recursive_structures(build_version=random.randint(0, 99999), merge=merge_into) def _add_other_db_objects(self, object_type: Type["DatabaseObject"], object_list: List["DatabaseObject"]): diff --git a/src/music_kraken/objects/song.py b/src/music_kraken/objects/song.py index d7b1cef..0af1489 100644 --- a/src/music_kraken/objects/song.py +++ b/src/music_kraken/objects/song.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import random from collections import defaultdict from typing import List, Optional, Dict, Tuple, Type @@ -15,11 +17,13 @@ from .metadata import ( Metadata ) from .option import Options -from .parents import MainObject, DatabaseObject, StaticAttribute +from .parents import DatabaseObject, StaticAttribute from .source import Source, SourceCollection from .target import Target from ..utils.string_processing import unify +from .parents import OuterProxy as Base + from ..utils.config import main_settings """ @@ -30,12 +34,39 @@ CountryTyping = type(list(pycountry.countries)[0]) OPTION_STRING_DELIMITER = " | " -class Song(MainObject): +class Song(Base): """ Class representing a song object, with attributes id, mb_id, title, album_name, isrc, length, tracksort, genre, source_list, target, lyrics_list, album, main_artist_list, and feature_artist_list. """ + title: str + unified_title: str + isrc: str + length: int + genre: str + note: FormattedText + + source_collection: SourceCollection + target_collection: Collection[Target] + lyrics_collection: Collection[Lyrics] + main_artist_collection: Collection[Artist] + feature_artist_collection: Collection[Artist] + album_collection: Collection[Album] + + _default_factories = { + "note": FormattedText, + "length": lambda: 0, + "source_collection": SourceCollection, + "target_collection": Collection, + "lyrics_collection": Collection, + + "main_artist_collection": Collection, + "album_collection": Collection, + "feature_artist_collection": Collection + } + + """ COLLECTION_STRING_ATTRIBUTES = ( "lyrics_collection", "album_collection", "main_artist_collection", "feature_artist_collection", "source_collection") @@ -48,118 +79,38 @@ class Song(MainObject): "genre": None, "notes": FormattedText() } + """ UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("album_collection", "main_artist_collection", "feature_artist_collection") + """ + title: str = None, + unified_title: str = None, + isrc: str = None, + length: int = None, + tracksort: int = None, + genre: str = None, + source_list: List[Source] = None, + target_list: List[Target] = None, + lyrics_list: List[Lyrics] = None, + album_list: List['Album'] = None, + main_artist_list: List['Artist'] = None, + feature_artist_list: List['Artist'] = None, + notes: FormattedText = None, + """ + def __init_collections__(self) -> None: + self.album_collection.contain_given_in_attribute = { + "artist_collection": self.main_artist_collection, + } + self.album_collection.append_object_to_attribute = { + "song_collection": self, + } - STATIC_ATTRIBUTES = [ - StaticAttribute(name="title", weight=.5), - StaticAttribute(name="unified_title", weight=.3), - StaticAttribute(name="isrc", weight=1), - StaticAttribute(name="length"), - StaticAttribute(name="tracksort", default_value=0), - StaticAttribute(name="genre"), - StaticAttribute(name="notes", default_value=FormattedText()), - - StaticAttribute(name="source_collection", is_collection=True), - StaticAttribute(name="lyrics_collection", is_collection=True), - StaticAttribute(name="album_collection", is_collection=True, is_upwards_collection=True), - StaticAttribute(name="main_artist_collection", is_collection=True, is_upwards_collection=True), - StaticAttribute(name="feature_artist_collection", is_collection=True, is_upwards_collection=True) - ] - - def __init__( - self, - _id: int = None, - dynamic: bool = False, - title: str = None, - unified_title: str = None, - isrc: str = None, - length: int = None, - tracksort: int = None, - genre: str = None, - source_list: List[Source] = None, - target_list: List[Target] = None, - lyrics_list: List[Lyrics] = None, - album_list: List['Album'] = None, - main_artist_list: List['Artist'] = None, - feature_artist_list: List['Artist'] = None, - notes: FormattedText = None, - **kwargs - ) -> None: - super().__init__(_id=_id, dynamic=dynamic, **kwargs) - # attributes - self.title: str = title - self.unified_title: str = unified_title - if unified_title is None and title is not None: - self.unified_title = unify(title) - - self.isrc: str = isrc - self.length: int = length - self.tracksort: int = tracksort or 0 - self.genre: str = genre - self.notes: FormattedText = notes or FormattedText() - - self.source_collection: SourceCollection = SourceCollection(source_list) - self.target_collection: Collection[Target] = Collection(data=target_list) - self.lyrics_collection: Collection[Lyrics] = Collection(data=lyrics_list) - - # main_artist_collection = album.artist collection - self.main_artist_collection: Collection[Artist] = Collection(data=[]) - - # this album_collection equals no collection - self.album_collection: Collection[Album] = Collection(data=album_list, - contain_given_in_attribute={ - "artist_collection": self.main_artist_collection - }, append_object_to_attribute={ - "song_collection": self - }) - - self.main_artist_collection.contain_given_in_attribute = {"main_album_collection": self.album_collection} - self.main_artist_collection.extend(main_artist_list) - - self.feature_artist_collection: Collection[Artist] = Collection( - data=feature_artist_list, - append_object_to_attribute={ - "feature_song_collection": self - } - ) - - def _build_recursive_structures(self, build_version: int, merge: bool): - if build_version == self.build_version: - return - self.build_version = build_version - - album: Album - for album in self.album_collection: - album.song_collection.append(self, merge_on_conflict=merge, merge_into_existing=False) - album._build_recursive_structures(build_version=build_version, merge=merge) - - artist: Artist - for artist in self.feature_artist_collection: - artist.feature_song_collection.append(self, merge_on_conflict=merge, merge_into_existing=False) - artist._build_recursive_structures(build_version=build_version, merge=merge) - - for artist in self.main_artist_collection: - for album in self.album_collection: - artist.main_album_collection.append(album, merge_on_conflict=merge, merge_into_existing=False) - artist._build_recursive_structures(build_version=build_version, merge=merge) - - def _add_other_db_objects(self, object_type: Type["DatabaseObject"], object_list: List["DatabaseObject"]): - if object_type is Song: - return - - if object_type is Lyrics: - self.lyrics_collection.extend(object_list) - return - - if object_type is Artist: - self.main_artist_collection.extend(object_list) - return - - if object_type is Album: - self.album_collection.extend(object_list) - return - + self.main_artist_collection.contain_given_in_attribute = { + "main_album_collection": self.album_collection + } + self.feature_artist_collection.append_object_to_attribute = { + "feature_song_collection": self + } @property def indexing_values(self) -> List[Tuple[str, object]]: @@ -245,7 +196,7 @@ All objects dependent on Album """ -class Album(MainObject): +class Album(Base): COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection", "song_collection") SIMPLE_STRING_ATTRIBUTES = { "title": None, @@ -259,6 +210,16 @@ class Album(MainObject): "notes": FormattedText() } + title: str + unified_title: str + album_status: str + album_type: AlbumType + language: LanguageSelector + + _default_factories = { + "album_type": AlbumType.OTHER + } + DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection", ) UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "label_collection") @@ -298,7 +259,7 @@ class Album(MainObject): notes: FormattedText = None, **kwargs ) -> None: - MainObject.__init__(self, _id=_id, dynamic=dynamic, **kwargs) + Base.__init__(self, _id=_id, dynamic=dynamic, **kwargs) self.title: str = title self.unified_title: str = unified_title @@ -512,7 +473,7 @@ All objects dependent on Artist """ -class Artist(MainObject): +class Artist(Base): COLLECTION_STRING_ATTRIBUTES = ( "feature_song_collection", "main_album_collection", @@ -570,7 +531,7 @@ class Artist(MainObject): unformated_location: str = None, **kwargs ): - MainObject.__init__(self, _id=_id, dynamic=dynamic, **kwargs) + Base.__init__(self, _id=_id, dynamic=dynamic, **kwargs) self.name: str = name self.unified_name: str = unified_name @@ -806,7 +767,7 @@ Label """ -class Label(MainObject): +class Label(Base): COLLECTION_STRING_ATTRIBUTES = ("album_collection", "current_artist_collection") SIMPLE_STRING_ATTRIBUTES = { "name": None, @@ -837,7 +798,7 @@ class Label(MainObject): source_list: List[Source] = None, **kwargs ): - MainObject.__init__(self, _id=_id, dynamic=dynamic, **kwargs) + Base.__init__(self, _id=_id, dynamic=dynamic, **kwargs) self.name: str = name self.unified_name: str = unified_name diff --git a/src/music_kraken/objects/source.py b/src/music_kraken/objects/source.py index a4e489a..4fb1e40 100644 --- a/src/music_kraken/objects/source.py +++ b/src/music_kraken/objects/source.py @@ -121,7 +121,8 @@ class Source(DatabaseObject): class SourceCollection(Collection): - def __init__(self, source_list: List[Source]): + def __init__(self, source_list: List[Source] = None): + source_list = source_list if source_list is not None else [] self._page_to_source_list: Dict[SourcePages, List[Source]] = defaultdict(list) super().__init__(data=source_list) diff --git a/src/music_kraken/utils/exception/objects.py b/src/music_kraken/utils/exception/objects.py new file mode 100644 index 0000000..ff3026d --- /dev/null +++ b/src/music_kraken/utils/exception/objects.py @@ -0,0 +1,10 @@ +class ObjectException(Exception): + pass + + +class IsDynamicException(Exception): + """ + Gets raised, if a dynamic data object tries to perform an action, + which does not make sense for a dynamic object. + """ + pass diff --git a/src/music_kraken/utils/support_classes/hacking.py b/src/music_kraken/utils/support_classes/hacking.py index abcf67f..1125768 100644 --- a/src/music_kraken/utils/support_classes/hacking.py +++ b/src/music_kraken/utils/support_classes/hacking.py @@ -1,7 +1,8 @@ +import weakref from types import FunctionType from functools import wraps -from typing import Dict +from typing import Dict, Set class Lake: def __init__(self): @@ -17,15 +18,34 @@ class Lake: return self.id_to_object[_id] except KeyError: self.add(db_object) - return db_object + return db_object def add(self, db_object: object): self.id_to_object[id(db_object)] = db_object def override(self, to_override: object, new_db_object: object): - self.redirects[id(to_override)] = id(new_db_object) - if id(to_override) in self.id_to_object: - del self.id_to_object[id(to_override)] + _id = id(to_override) + while _id in self.redirects: + _id = self.redirects[_id] + + if id(new_db_object) in self.id_to_object: + print("!!!!!") + + self.add(new_db_object) + self.redirects[_id] = id(new_db_object) + # if _id in self.id_to_object: + # del self.id_to_object[_id] + + def is_same(self, __object: object, other: object) -> bool: + _self_id = id(__object) + while _self_id in self.redirects: + _self_id = self.redirects[_self_id] + + _other_id = id(other) + while _other_id in self.redirects: + _other_id = self.redirects[_other_id] + + return _self_id == _other_id lake = Lake() @@ -35,11 +55,20 @@ def wrapper(method): @wraps(method) def wrapped(*args, **kwargs): return method(*(lake.get_real_object(args[0]), *args[1:]), **kwargs) + return wrapped - class BaseClass: + def __new__(cls, *args, **kwargs): + instance = cls(*args, **kwargs) + print("new") + lake.add(instance) + return instance + + def __eq__(self, other): + return lake.is_same(self, other) + def _risky_merge(self, to_replace): lake.override(to_replace, self) @@ -49,17 +78,27 @@ class MetaClass(type): bases = (*bases, BaseClass) newClassDict = {} + ignore_functions: Set[str] = {"__new__", "__init__"} + for attributeName, attribute in classDict.items(): - if isinstance(attribute, FunctionType) and attributeName not in ("__new__", "__init__"): + if isinstance(attribute, FunctionType) and (attributeName not in ignore_functions): + """ + The funktion new and init shouldn't be accounted for because we can assume the class is + independent on initialization. + """ attribute = wrapper(attribute) + newClassDict[attributeName] = attribute - for key, value in object.__dict__.items( ): - if hasattr( value, '__call__' ) and value not in newClassDict and key not in ("__new__", "__init__"): + print() + + for key, value in object.__dict__.items(): + # hasattr( value, '__call__' ) and + if hasattr(value, '__call__') and value not in newClassDict and key not in ("__new__", "__init__"): newClassDict[key] = wrapper(value) new_instance = type.__new__(meta, classname, bases, newClassDict) lake.add(new_instance) - return new_instance \ No newline at end of file + return new_instance