diff --git a/.vscode/settings.json b/.vscode/settings.json index 75710b5..2a9cc2b 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -19,9 +19,13 @@ "APIC", "Bandcamp", "dotenv", + "encyclopaedia", "levenshtein", + "metallum", + "musify", "OKBLUE", "Referer", - "tracksort" + "tracksort", + "youtube" ] } \ No newline at end of file diff --git a/development/__init__.py b/development/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/development/actual_donwload.py b/development/actual_donwload.py index 4b242ed..76cc2a3 100644 --- a/development/actual_donwload.py +++ b/development/actual_donwload.py @@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG) if __name__ == "__main__": commands = [ - "s: #a Toxoplasma", - "d: 16", + "s: #a And End...", + "d: 10", ] diff --git a/development/objects_collection.py b/development/objects_collection.py new file mode 100644 index 0000000..9e147f5 --- /dev/null +++ b/development/objects_collection.py @@ -0,0 +1,92 @@ +import music_kraken +from music_kraken.objects import Song, Album, Artist, Collection + +if __name__ == "__main__": + artist: Artist = Artist( + name="artist", + main_album_list=[ + Album( + title="album", + song_list=[ + Song( + title="song", + album_list=[ + Album( + title="album", + albumsort=123, + main_artist=Artist(name="artist"), + ), + ], + ), + Song( + title="other_song", + album_list=[ + Album(title="album", albumsort=423), + ], + ), + ] + ), + Album(title="album", barcode="1234567890123"), + ] + ) + + + other_artist: Artist = Artist( + name="artist", + main_album_list=[ + Album( + title="album", + song_list=[ + Song( + title="song", + album_list=[ + Album( + title="album", + albumsort=123, + main_artist=Artist(name="other_artist"), + ), + ], + ), + Song( + title="other_song", + album_list=[ + Album(title="album", albumsort=423), + ], + ), + ] + ), + Album(title="album", barcode="1234567890123"), + ] + ) + + artist.merge(other_artist) + + a = artist.main_album_collection[0] + b = a.song_collection[0].album_collection[0] + c = a.song_collection[1].album_collection[0] + d = b.song_collection[0].album_collection[0] + e = d.song_collection[0].album_collection[0] + f = e.song_collection[0].album_collection[0] + g = f.song_collection[0].album_collection[0] + + print(a.id, a.title, a.barcode, a.albumsort) + print(b.id, b.title, b.barcode, b.albumsort) + print(c.id, c.title, c.barcode, c.albumsort) + print(d.id, d.title, d.barcode, d.albumsort) + print(e.id, e.title, e.barcode, e.albumsort) + print(f.id, f.title, f.barcode, f.albumsort) + print(g.id, g.title, g.barcode, g.albumsort) + print() + + d.title = "new_title" + + print(a.id, a.title, a.barcode, a.albumsort) + print(b.id, b.title, b.barcode, b.albumsort) + print(c.id, c.title, c.barcode, c.albumsort) + print(d.id, d.title, d.barcode, d.albumsort) + print(e.id, e.title, e.barcode, e.albumsort) + print(f.id, f.title, f.barcode, f.albumsort) + print(g.id, g.title, g.barcode, g.albumsort) + print() + + print(artist.main_album_collection._indexed_values) \ No newline at end of file diff --git a/music_kraken/__init__.py b/music_kraken/__init__.py index 73dbbf1..7697a3b 100644 --- a/music_kraken/__init__.py +++ b/music_kraken/__init__.py @@ -46,7 +46,7 @@ init_logging() from . import cli if DEBUG: - sys.setrecursionlimit(100) + sys.setrecursionlimit(500) if main_settings['modify_gc']: diff --git a/music_kraken/audio/metadata.py b/music_kraken/audio/metadata.py index 114879a..a0d8386 100644 --- a/music_kraken/audio/metadata.py +++ b/music_kraken/audio/metadata.py @@ -29,6 +29,8 @@ class AudioMetadata: """ https://www.programcreek.com/python/example/84797/mutagen.id3.ID3 """ + if value is None: + continue self.frames.add(value) def add_song_metadata(self, song: Song): diff --git a/music_kraken/objects/collection.py b/music_kraken/objects/collection.py index 5e078fb..92617f0 100644 --- a/music_kraken/objects/collection.py +++ b/music_kraken/objects/collection.py @@ -110,7 +110,7 @@ class Collection(Generic[T]): if self._contained_in_self(__object): return [self] - for collection in self.children: + for collection in (*self.children, *self.parents): results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first)) if break_at_first: diff --git a/music_kraken/objects/lyrics.py b/music_kraken/objects/lyrics.py index 3650bfa..09f7a3a 100644 --- a/music_kraken/objects/lyrics.py +++ b/music_kraken/objects/lyrics.py @@ -34,6 +34,6 @@ class Lyrics(OuterProxy): @property def metadata(self) -> Metadata: return Metadata({ - id3Mapping.UNSYNCED_LYRICS: self.text.html + id3Mapping.UNSYNCED_LYRICS: [self.text.html] }) diff --git a/music_kraken/objects/new_collection.py b/music_kraken/objects/new_collection.py deleted file mode 100644 index 1a556b6..0000000 --- a/music_kraken/objects/new_collection.py +++ /dev/null @@ -1,257 +0,0 @@ -from __future__ import annotations - -from collections import defaultdict -from typing import TypeVar, Generic, Dict, Optional, Iterable, List -from .parents import OuterProxy - -T = TypeVar('T', bound=OuterProxy) - - -class Collection(Generic[T]): - _data: List[T] - - _indexed_values: Dict[str, set] - _indexed_to_objects: Dict[any, list] - - shallow_list = property(fget=lambda self: self.data) - - def __init__( - self, - data: Optional[Iterable[T]] = None, - sync_on_append: Dict[str, "Collection"] = None, - contain_given_in_attribute: Dict[str, "Collection"] = None, - contain_attribute_in_given: Dict[str, "Collection"] = None, - append_object_to_attribute: Dict[str, T] = None - ) -> None: - self._contains_ids = set() - self._data = [] - self.upper_collections: List[Collection[T]] = [] - self.contained_collections: List[Collection[T]] = [] - - # List of collection attributes that should be modified on append - # Key: collection attribute (str) of appended element - # Value: main collection to sync to - self.sync_on_append: Dict[str, Collection] = sync_on_append or {} - self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {} - self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {} - self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {} - - self.contain_self_on_append: List[str] = [] - - self._indexed_values = defaultdict(set) - self._indexed_to_objects = defaultdict(list) - - self.extend(data) - - def _map_element(self, __object: T, from_map: bool = False): - self._contains_ids.add(__object.id) - - for name, value in __object.indexing_values: - if value is None: - continue - - self._indexed_values[name].add(value) - self._indexed_to_objects[value].append(__object) - - if not from_map: - for attribute, new_object in self.contain_given_in_attribute.items(): - __object.__getattribute__(attribute).contain_collection_inside(new_object) - - for attribute, new_object in self.contain_given_in_attribute.items(): - new_object.contain_collection_inside(__object.__getattribute__(attribute)) - - for attribute, new_object in self.append_object_to_attribute.items(): - __object.__getattribute__(attribute).append(new_object, from_map=True) - - def _unmap_element(self, __object: T): - self._contains_ids.remove(__object.id) - - for name, value in __object.indexing_values: - if value is None: - continue - if value not in self._indexed_values[name]: - continue - - try: - self._indexed_to_objects[value].remove(__object) - except ValueError: - continue - - if not len(self._indexed_to_objects[value]): - self._indexed_values[name].remove(value) - - def _contained_in_self(self, __object: T) -> bool: - if __object.id in self._contains_ids: - return True - - for name, value in __object.indexing_values: - if value is None: - continue - if value in self._indexed_values[name]: - return True - return False - - def _get_root_collections(self) -> List["Collection"]: - if not len(self.upper_collections): - return [self] - - root_collections = [] - for upper_collection in self.upper_collections: - root_collections.extend(upper_collection._get_root_collections()) - return root_collections - - @property - def _is_root(self) -> bool: - return len(self.upper_collections) <= 0 - - def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]: - results = [] - - if self._contained_in_self(__object): - return [self] - - for collection in self.contained_collections: - results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first)) - if break_at_first: - return results - - return results - - def _get_parents_of_multiple_contained_children(self, __object: T): - results = [] - if len(self.contained_collections) < 2 or self._contained_in_self(__object): - return results - - count = 0 - - for collection in self.contained_collections: - sub_results = collection._get_parents_of_multiple_contained_children(__object) - - if len(sub_results) > 0: - count += 1 - results.extend(sub_results) - - if count >= 2: - results.append(self) - - return results - - def _merge_in_self(self, __object: T, from_map: bool = False): - """ - 1. find existing objects - 2. merge into existing object - 3. remap existing object - """ - if __object.id in self._contains_ids: - return - - existing_object: DatabaseObject = None - - for name, value in __object.indexing_values: - if value is None: - continue - if value in self._indexed_values[name]: - existing_object = self._indexed_to_objects[value][0] - if existing_object.id == __object.id: - return None - - break - - if existing_object is None: - return None - - existing_object.merge(__object, replace_all_refs=True) - - # just a check if it really worked - if existing_object.id != __object.id: - raise ValueError("This should NEVER happen. Merging doesn't work.") - - self._map_element(existing_object, from_map=from_map) - - def contains(self, __object: T) -> bool: - return len(self._contained_in_sub(__object)) > 0 - - def _append(self, __object: T, from_map: bool = False): - for attribute, to_sync_with in self.sync_on_append.items(): - pass - to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute)) - - self._map_element(__object, from_map=from_map) - self._data.append(__object) - - def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False): - if __object is None: - return - if __object.id in self._contains_ids: - return - - exists_in_collection = self._contained_in_sub(__object) - if len(exists_in_collection) and self is exists_in_collection[0]: - # assuming that the object already is contained in the correct collections - if not already_is_parent: - self._merge_in_self(__object, from_map=from_map) - return - - if not len(exists_in_collection): - self._append(__object, from_map=from_map) - else: - pass - exists_in_collection[0]._merge_in_self(__object, from_map=from_map) - - if not already_is_parent or not self._is_root: - for parent_collection in self._get_parents_of_multiple_contained_children(__object): - pass - parent_collection.append(__object, already_is_parent=True, from_map=from_map) - - def extend(self, __iterable: Optional[Iterable[T]]): - if __iterable is None: - return - - for __object in __iterable: - self.append(__object) - - def sync_with_other_collection(self, equal_collection: "Collection"): - """ - If two collections always need to have the same values, this can be used. - - Internally: - 1. import the data from other to self - - _data - - contained_collections - 2. replace all refs from the other object, with refs from this object - """ - if equal_collection is self: - return - - # don't add the elements from the subelements from the other collection. - # this will be done in the next step. - self.extend(equal_collection._data) - # add all submodules - for equal_sub_collection in equal_collection.contained_collections: - self.contain_collection_inside(equal_sub_collection) - - # now the ugly part - # replace all refs of the other element with this one - self._risky_merge(equal_collection) - - def contain_collection_inside(self, sub_collection: "Collection"): - """ - This collection will ALWAYS contain everything from the passed in collection - """ - if sub_collection in self.contained_collections: - return - - self.contained_collections.append(sub_collection) - sub_collection.upper_collections.append(self) - - @property - def data(self) -> List[T]: - return [*self._data, - *(__object for collection in self.contained_collections for __object in collection.shallow_list)] - - def __len__(self) -> int: - return len(self._data) + sum(len(collection) for collection in self.contained_collections) - - def __iter__(self) -> Iterator[T]: - for element in self._data: - yield element diff --git a/music_kraken/objects/old_collection.py b/music_kraken/objects/old_collection.py deleted file mode 100644 index 4aa8f21..0000000 --- a/music_kraken/objects/old_collection.py +++ /dev/null @@ -1,256 +0,0 @@ -from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type -from collections import defaultdict - -from .parents import DatabaseObject -from ..utils.support_classes.hacking import MetaClass - -T = TypeVar('T', bound=DatabaseObject) - - -class Collection(Generic[T]): - _data: List[T] - - _indexed_values: Dict[str, set] - _indexed_to_objects: Dict[any, list] - - shallow_list = property(fget=lambda self: self.data) - - def __init__( - self, data: Optional[Iterable[T]] = None, - sync_on_append: Dict[str, "Collection"] = None, - contain_given_in_attribute: Dict[str, "Collection"] = None, - contain_attribute_in_given: Dict[str, "Collection"] = None, - append_object_to_attribute: Dict[str, DatabaseObject] = None - ) -> None: - self._contains_ids = set() - self._data = [] - self.upper_collections: List[Collection[T]] = [] - self.contained_collections: List[Collection[T]] = [] - - # List of collection attributes that should be modified on append - # Key: collection attribute (str) of appended element - # Value: main collection to sync to - self.sync_on_append: Dict[str, Collection] = sync_on_append or {} - self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {} - self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {} - self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {} - - self.contain_self_on_append: List[str] = [] - - self._indexed_values = defaultdict(set) - self._indexed_to_objects = defaultdict(list) - - self.extend(data) - - def _map_element(self, __object: T, from_map: bool = False): - self._contains_ids.add(__object.id) - - for name, value in __object.indexing_values: - if value is None: - continue - - self._indexed_values[name].add(value) - self._indexed_to_objects[value].append(__object) - - if not from_map: - for attribute, new_object in self.contain_given_in_attribute.items(): - __object.__getattribute__(attribute).contain_collection_inside(new_object) - - for attribute, new_object in self.contain_given_in_attribute.items(): - new_object.contain_collection_inside(__object.__getattribute__(attribute)) - - for attribute, new_object in self.append_object_to_attribute.items(): - __object.__getattribute__(attribute).append(new_object, from_map=True) - - def _unmap_element(self, __object: T): - self._contains_ids.remove(__object.id) - - for name, value in __object.indexing_values: - if value is None: - continue - if value not in self._indexed_values[name]: - continue - - try: - self._indexed_to_objects[value].remove(__object) - except ValueError: - continue - - if not len(self._indexed_to_objects[value]): - self._indexed_values[name].remove(value) - - def _contained_in_self(self, __object: T) -> bool: - if __object.id in self._contains_ids: - return True - - for name, value in __object.indexing_values: - if value is None: - continue - if value in self._indexed_values[name]: - return True - return False - - def _get_root_collections(self) -> List["Collection"]: - if not len(self.upper_collections): - return [self] - - root_collections = [] - for upper_collection in self.upper_collections: - root_collections.extend(upper_collection._get_root_collections()) - return root_collections - - @property - def _is_root(self) -> bool: - return len(self.upper_collections) <= 0 - - def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]: - results = [] - - if self._contained_in_self(__object): - return [self] - - for collection in self.contained_collections: - results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first)) - if break_at_first: - return results - - return results - - def _get_parents_of_multiple_contained_children(self, __object: T): - results = [] - if len(self.contained_collections) < 2 or self._contained_in_self(__object): - return results - - count = 0 - - for collection in self.contained_collections: - sub_results = collection._get_parents_of_multiple_contained_children(__object) - - if len(sub_results) > 0: - count += 1 - results.extend(sub_results) - - if count >= 2: - results.append(self) - - return results - - def _merge_in_self(self, __object: T, from_map: bool = False): - """ - 1. find existing objects - 2. merge into existing object - 3. remap existing object - """ - if __object.id in self._contains_ids: - return - - existing_object: DatabaseObject = None - - for name, value in __object.indexing_values: - if value is None: - continue - if value in self._indexed_values[name]: - existing_object = self._indexed_to_objects[value][0] - if existing_object.id == __object.id: - return None - - break - - if existing_object is None: - return None - - existing_object.merge(__object, replace_all_refs=True) - - # just a check if it really worked - if existing_object.id != __object.id: - raise ValueError("This should NEVER happen. Merging doesn't work.") - - self._map_element(existing_object, from_map=from_map) - - def contains(self, __object: T) -> bool: - return len(self._contained_in_sub(__object)) > 0 - - def _append(self, __object: T, from_map: bool = False): - for attribute, to_sync_with in self.sync_on_append.items(): - pass - to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute)) - - self._map_element(__object, from_map=from_map) - self._data.append(__object) - - def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False): - if __object is None: - return - if __object.id in self._contains_ids: - return - - exists_in_collection = self._contained_in_sub(__object) - if len(exists_in_collection) and self is exists_in_collection[0]: - # assuming that the object already is contained in the correct collections - if not already_is_parent: - self._merge_in_self(__object, from_map=from_map) - return - - if not len(exists_in_collection): - self._append(__object, from_map=from_map) - else: - pass - exists_in_collection[0]._merge_in_self(__object, from_map=from_map) - - if not already_is_parent or not self._is_root: - for parent_collection in self._get_parents_of_multiple_contained_children(__object): - pass - parent_collection.append(__object, already_is_parent=True, from_map=from_map) - - def extend(self, __iterable: Optional[Iterable[T]]): - if __iterable is None: - return - - for __object in __iterable: - self.append(__object) - - def sync_with_other_collection(self, equal_collection: "Collection"): - """ - If two collections always need to have the same values, this can be used. - - Internally: - 1. import the data from other to self - - _data - - contained_collections - 2. replace all refs from the other object, with refs from this object - """ - if equal_collection is self: - return - - # don't add the elements from the subelements from the other collection. - # this will be done in the next step. - self.extend(equal_collection._data) - # add all submodules - for equal_sub_collection in equal_collection.contained_collections: - self.contain_collection_inside(equal_sub_collection) - - # now the ugly part - # replace all refs of the other element with this one - self._risky_merge(equal_collection) - - def contain_collection_inside(self, sub_collection: "Collection"): - """ - This collection will ALWAYS contain everything from the passed in collection - """ - if sub_collection in self.contained_collections: - return - - self.contained_collections.append(sub_collection) - sub_collection.upper_collections.append(self) - - @property - def data(self) -> List[T]: - return [*self._data, - *(__object for collection in self.contained_collections for __object in collection.shallow_list)] - - def __len__(self) -> int: - return len(self._data) + sum(len(collection) for collection in self.contained_collections) - - def __iter__(self) -> Iterator[T]: - for element in self._data: - yield element \ No newline at end of file diff --git a/music_kraken/objects/parents.py b/music_kraken/objects/parents.py index 80bd237..6385a2d 100644 --- a/music_kraken/objects/parents.py +++ b/music_kraken/objects/parents.py @@ -7,7 +7,7 @@ from functools import lru_cache from typing import Optional, Dict, Tuple, List, Type, Generic, Any, TypeVar, Set from .metadata import Metadata -from ..utils import get_unix_time +from ..utils import get_unix_time, object_trace from ..utils.config import logging_settings, main_settings from ..utils.shared import HIGHEST_ID from ..utils.hacking import MetaClass @@ -26,7 +26,11 @@ class InnerData: If the data in the wrapper class has to be merged, then this class is just replaced and garbage collected. """ + _refers_to_instances: set = None + def __init__(self, object_type, **kwargs): + self._refers_to_instances =set() + # initialize the default values self.__default_values = {} for name, factory in object_type._default_factories.items(): @@ -101,6 +105,9 @@ class OuterProxy: self._fetched_from: dict = {} self._inner: InnerData = InnerData(type(self), **kwargs) + self._inner._refers_to_instances.add(self) + + object_trace(f"creating {type(self).__name__} [{self.title_string}]") self.__init_collections__() for name, data_list in collection_data.items(): @@ -174,11 +181,25 @@ class OuterProxy: :return: """ if __other is None: - _ = "debug" return - self._inner.__merge__(__other._inner, override=override) - __other._inner = self._inner + object_trace(f"merging {type(self).__name__} [{self.title_string}] with {type(__other).__name__} [{__other.title_string}]") + + a = self + b = __other + + if a._inner is b._inner: + return + + # switch instances if more efficient + if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances): + a, b = b, a + + a._inner.__merge__(b._inner, override=override) + a._inner._refers_to_instances.update(b._inner._refers_to_instances) + + for instance in b._inner._refers_to_instances: + instance._inner = a._inner def mark_as_fetched(self, *url_hash_list: List[str]): for url_hash in url_hash_list: diff --git a/music_kraken/objects/song.py b/music_kraken/objects/song.py index c9c8c96..716694c 100644 --- a/music_kraken/objects/song.py +++ b/music_kraken/objects/song.py @@ -119,7 +119,7 @@ class Song(Base): def indexing_values(self) -> List[Tuple[str, object]]: return [ ('id', self.id), - ('title', self.unified_title), + ('title', unify(self.unified_title)), ('isrc', self.isrc), *[('url', source.url) for source in self.source_collection] ] @@ -244,6 +244,9 @@ class Album(Base): self.song_collection.contain_attribute_in_given = { "main_artist_collection": self.artist_collection } + self.song_collection.append_object_to_attribute = { + "album_collection": self + } def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): if object_type is Song: @@ -265,7 +268,7 @@ class Album(Base): def indexing_values(self) -> List[Tuple[str, object]]: return [ ('id', self.id), - ('title', self.unified_title), + ('title', unify(self.title)), ('barcode', self.barcode), *[('url', source.url) for source in self.source_collection] ] @@ -530,7 +533,7 @@ class Artist(Base): def indexing_values(self) -> List[Tuple[str, object]]: return [ ('id', self.id), - ('name', self.unified_name), + ('name', unify(self.name)), *[('url', source.url) for source in self.source_collection], *[('contact', contact.value) for contact in self.contact_collection] ] @@ -643,7 +646,7 @@ class Label(Base): def indexing_values(self) -> List[Tuple[str, object]]: return [ ('id', self.id), - ('name', self.unified_name), + ('name', unify(self.name)), *[('url', source.url) for source in self.source_collection] ] diff --git a/music_kraken/objects/source.py b/music_kraken/objects/source.py index 3a1fec4..5a8a560 100644 --- a/music_kraken/objects/source.py +++ b/music_kraken/objects/source.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from collections import defaultdict from enum import Enum from typing import List, Dict, Set, Tuple, Optional, Iterable @@ -103,12 +105,23 @@ class Source(OuterProxy): ('audio_url', self.audio_url), ] + def __merge__(self, __other: Source, override: bool = False): + if override: + self.audio_url = __other.audio_url + + if self.audio_url is None or (override and __other.audio_url is not None): + self.audio_url = __other.audio_url + def __str__(self): return self.__repr__() def __repr__(self) -> str: return f"Src({self.page_enum.value}: {self.url}, {self.audio_url})" + @property + def title_string(self) -> str: + return self.url + page_str = property(fget=lambda self: self.page_enum.value) type_str = property(fget=lambda self: self.type_enum.value) homepage = property(fget=lambda self: SourcePages.get_homepage(self.page_enum)) diff --git a/music_kraken/pages/youtube_music/youtube_music.py b/music_kraken/pages/youtube_music/youtube_music.py index c2913ab..5e82a22 100644 --- a/music_kraken/pages/youtube_music/youtube_music.py +++ b/music_kraken/pages/youtube_music/youtube_music.py @@ -11,6 +11,7 @@ from functools import lru_cache import youtube_dl from youtube_dl.extractor.youtube import YoutubeIE +from youtube_dl.utils import DownloadError from ...utils.exception.config import SettingValueError from ...utils.config import main_settings, youtube_settings, logging_settings @@ -201,6 +202,7 @@ class YoutubeMusic(SuperYouTube): self.yt_ie = MusicKrakenYoutubeIE(downloader=self.ydl, main_instance=self) self.download_values_by_url: dict = {} + self.not_download: Dict[str, DownloadError] = {} def _fetch_from_main_page(self): """ @@ -483,7 +485,13 @@ class YoutubeMusic(SuperYouTube): def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: - ydl_res: dict = self.ydl.extract_info(url=source.url, download=False) + ydl_res: dict = {} + try: + ydl_res: dict = self.ydl.extract_info(url=source.url, download=False) + except DownloadError as e: + self.not_download[source.hash_url] = e + self.LOGGER.error(f"Couldn't fetch song from {source.url}. {e}") + return Song() self.fetch_media_url(source=source, ydl_res=ydl_res) @@ -556,17 +564,20 @@ class YoutubeMusic(SuperYouTube): def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: media = self.fetch_media_url(source) - result = self.download_connection.stream_into( - media["url"], - target, - name=desc, - raw_url=True, - raw_headers=True, - disable_cache=True, - headers=media.get("headers", {}), - # chunk_size=media.get("chunk_size", main_settings["chunk_size"]), - method="GET", - ) + if source.hash_url not in self.not_download: + result = self.download_connection.stream_into( + media["url"], + target, + name=desc, + raw_url=True, + raw_headers=True, + disable_cache=True, + headers=media.get("headers", {}), + # chunk_size=media.get("chunk_size", main_settings["chunk_size"]), + method="GET", + ) + else: + result = DownloadResult(error_message=str(self.not_download[source.hash_url])) if result.is_fatal_error: result.merge(super().download_song_to_target(source=source, target=target, desc=desc)) diff --git a/music_kraken/utils/__init__.py b/music_kraken/utils/__init__.py index 6b4754e..96b4379 100644 --- a/music_kraken/utils/__init__.py +++ b/music_kraken/utils/__init__.py @@ -3,7 +3,7 @@ from pathlib import Path import json import logging -from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE +from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE from .config import config, read_config, write_config from .enums.colors import BColors from .path_manager import LOCATIONS @@ -52,6 +52,12 @@ def trace(msg: str): output("trace: " + msg, BColors.OKBLUE) +def object_trace(obj): + if not DEBUG_OBJECT_TRACE: + return + + output("object: " + str(obj), BColors.GREY) + """ misc functions diff --git a/music_kraken/utils/shared.py b/music_kraken/utils/shared.py index cf3cda7..b3f30e5 100644 --- a/music_kraken/utils/shared.py +++ b/music_kraken/utils/shared.py @@ -15,6 +15,7 @@ __stage__ = os.getenv("STAGE", "prod") DEBUG = (__stage__ == "dev") and True DEBUG_LOGGING = DEBUG and False DEBUG_TRACE = DEBUG and True +DEBUG_OBJECT_TRACE = DEBUG and False DEBUG_YOUTUBE_INITIALIZING = DEBUG and False DEBUG_PAGES = DEBUG and False DEBUG_DUMP = DEBUG and True diff --git a/music_kraken/utils/string_processing.py b/music_kraken/utils/string_processing.py index f499f9c..570d18e 100644 --- a/music_kraken/utils/string_processing.py +++ b/music_kraken/utils/string_processing.py @@ -16,9 +16,12 @@ def unify(string: str) -> str: """ returns a unified str, to make comparisons easy. a unified string has the following attributes: - - is lowercase + - is lowercase """ + if string is None: + return None + try: string = translit(string, reversed=True) except LanguageDetectionError: diff --git a/tests/test_collection.py b/tests/test_collection.py new file mode 100644 index 0000000..85b1941 --- /dev/null +++ b/tests/test_collection.py @@ -0,0 +1,86 @@ +import unittest + +from music_kraken.objects import Song, Album, Artist, Collection, Country + +class TestCollection(unittest.TestCase): + @staticmethod + def complicated_object() -> Artist: + return Artist( + name="artist", + country=Country.by_alpha_2("DE"), + main_album_list=[ + Album( + title="album", + song_list=[ + Song( + title="song", + album_list=[ + Album(title="album", albumsort=123), + ], + ), + Song( + title="other_song", + album_list=[ + Album(title="album", albumsort=423), + ], + ), + ] + ), + Album(title="album", barcode="1234567890123"), + ] + ) + + def test_song_album_relation(self): + """ + Tests that + album = album.any_song.one_album + is the same object + """ + + a = self.complicated_object().main_album_collection[0] + b = a.song_collection[0].album_collection[0] + c = a.song_collection[1].album_collection[0] + d = b.song_collection[0].album_collection[0] + e = d.song_collection[0].album_collection[0] + f = e.song_collection[0].album_collection[0] + g = f.song_collection[0].album_collection[0] + + self.assertTrue(a.id == b.id == c.id == d.id == e.id == f.id == g.id) + self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "album") + self.assertTrue(a.barcode == b.barcode == c.barcode == d.barcode == e.barcode == f.barcode == g.barcode == "1234567890123") + self.assertTrue(a.albumsort == b.albumsort == c.albumsort == d.albumsort == e.albumsort == f.albumsort == g.albumsort == 123) + + d.title = "new_title" + + self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "new_title") + + def test_album_artist_relation(self): + """ + Tests that + artist = artist.any_album.any_song.one_artist + is the same object + """ + + a = self.complicated_object() + b = a.main_album_collection[0].artist_collection[0] + c = b.main_album_collection[0].artist_collection[0] + d = c.main_album_collection[0].artist_collection[0] + + self.assertTrue(a.id == b.id == c.id == d.id) + self.assertTrue(a.name == b.name == c.name == d.name == "artist") + self.assertTrue(a.country == b.country == c.country == d.country) + + """ + def test_song_artist_relations(self): + a = self.complicated_object() + b = a.main_album_collection[0].song_collection[0].main_artist_collection[0] + c = b.main_album_collection[0].song_collection[0].main_artist_collection[0] + d = c.main_album_collection[0].song_collection[0].main_artist_collection[0] + + self.assertTrue(a.id == b.id == c.id == d.id) + self.assertTrue(a.name == b.name == c.name == d.name == "artist") + self.assertTrue(a.country == b.country == c.country == d.country) + """ + +if __name__ == "__main__": + unittest.main()