Merge pull request 'fix/collections' (#7) from fix/collections into experimental

Reviewed-on: #7
Hazel 2024-04-16 11:35:48 +00:00
commit fa9fd5d72b
18 changed files with 270 additions and 541 deletions


@@ -19,9 +19,13 @@
         "APIC",
         "Bandcamp",
         "dotenv",
+        "encyclopaedia",
         "levenshtein",
+        "metallum",
+        "musify",
         "OKBLUE",
         "Referer",
-        "tracksort"
+        "tracksort",
+        "youtube"
     ]
 }


@@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG)
 
 if __name__ == "__main__":
     commands = [
-        "s: #a Toxoplasma",
-        "d: 16",
+        "s: #a And End...",
+        "d: 10",
     ]


@@ -0,0 +1,92 @@
import music_kraken
from music_kraken.objects import Song, Album, Artist, Collection


if __name__ == "__main__":
    artist: Artist = Artist(
        name="artist",
        main_album_list=[
            Album(
                title="album",
                song_list=[
                    Song(
                        title="song",
                        album_list=[
                            Album(
                                title="album",
                                albumsort=123,
                                main_artist=Artist(name="artist"),
                            ),
                        ],
                    ),
                    Song(
                        title="other_song",
                        album_list=[
                            Album(title="album", albumsort=423),
                        ],
                    ),
                ]
            ),
            Album(title="album", barcode="1234567890123"),
        ]
    )

    other_artist: Artist = Artist(
        name="artist",
        main_album_list=[
            Album(
                title="album",
                song_list=[
                    Song(
                        title="song",
                        album_list=[
                            Album(
                                title="album",
                                albumsort=123,
                                main_artist=Artist(name="other_artist"),
                            ),
                        ],
                    ),
                    Song(
                        title="other_song",
                        album_list=[
                            Album(title="album", albumsort=423),
                        ],
                    ),
                ]
            ),
            Album(title="album", barcode="1234567890123"),
        ]
    )

    artist.merge(other_artist)

    a = artist.main_album_collection[0]
    b = a.song_collection[0].album_collection[0]
    c = a.song_collection[1].album_collection[0]
    d = b.song_collection[0].album_collection[0]
    e = d.song_collection[0].album_collection[0]
    f = e.song_collection[0].album_collection[0]
    g = f.song_collection[0].album_collection[0]

    print(a.id, a.title, a.barcode, a.albumsort)
    print(b.id, b.title, b.barcode, b.albumsort)
    print(c.id, c.title, c.barcode, c.albumsort)
    print(d.id, d.title, d.barcode, d.albumsort)
    print(e.id, e.title, e.barcode, e.albumsort)
    print(f.id, f.title, f.barcode, f.albumsort)
    print(g.id, g.title, g.barcode, g.albumsort)
    print()

    d.title = "new_title"

    print(a.id, a.title, a.barcode, a.albumsort)
    print(b.id, b.title, b.barcode, b.albumsort)
    print(c.id, c.title, c.barcode, c.albumsort)
    print(d.id, d.title, d.barcode, d.albumsort)
    print(e.id, e.title, e.barcode, e.albumsort)
    print(f.id, f.title, f.barcode, f.albumsort)
    print(g.id, g.title, g.barcode, g.albumsort)
    print()

    print(artist.main_album_collection._indexed_values)


@@ -46,7 +46,7 @@ init_logging()
 
 from . import cli
 
 if DEBUG:
-    sys.setrecursionlimit(100)
+    sys.setrecursionlimit(500)
 
 if main_settings['modify_gc']:


@@ -29,6 +29,8 @@ class AudioMetadata:
             """
             https://www.programcreek.com/python/example/84797/mutagen.id3.ID3
             """
+            if value is None:
+                continue
             self.frames.add(value)
 
     def add_song_metadata(self, song: Song):
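
Reviewer note: the guard above skips metadata values that are unset before they reach mutagen. A minimal standalone sketch of the same pattern (requires mutagen; hypothetical loop, not this repository's AudioMetadata class):

    from mutagen.id3 import ID3, TIT2

    frames = ID3()
    for value in [TIT2(encoding=3, text=["song"]), None]:
        if value is None:
            continue  # unset mapping, nothing to write
        frames.add(value)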


@@ -110,7 +110,7 @@ class Collection(Generic[T]):
         if self._contained_in_self(__object):
             return [self]
 
-        for collection in self.children:
+        for collection in (*self.children, *self.parents):
             results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
 
             if break_at_first:
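
Reviewer note: the hunk above widens the containment search from children only to children and parents. A self-contained sketch of that idea, with hypothetical names and an explicit visited set to keep the walk finite (the real Collection relies on its own structure for termination):

    from typing import List, Optional, Set

    class Node:
        def __init__(self, values: Optional[Set[str]] = None):
            self.children: List["Node"] = []
            self.parents: List["Node"] = []
            self.values: Set[str] = values or set()

    def holders(node: Node, value: str, seen: Optional[Set[int]] = None) -> List[Node]:
        seen = set() if seen is None else seen
        if id(node) in seen:
            return []
        seen.add(id(node))
        if value in node.values:
            return [node]
        found: List[Node] = []
        for neighbour in (*node.children, *node.parents):  # both directions, as in the diff
            found.extend(holders(neighbour, value, seen))
        return found

    root, leaf = Node(), Node({"song"})
    root.children.append(leaf)
    leaf.parents.append(root)
    assert holders(leaf, "song") == [leaf]
    assert holders(root, "song") == [leaf]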


@@ -34,6 +34,6 @@ class Lyrics(OuterProxy):
     @property
     def metadata(self) -> Metadata:
         return Metadata({
-            id3Mapping.UNSYNCED_LYRICS: self.text.html
+            id3Mapping.UNSYNCED_LYRICS: [self.text.html]
         })
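
Reviewer note: the lyrics text is now wrapped in a list because the metadata mapping collects a list of values per ID3 key. A tiny illustration of why a bare string is a problem when values are extended into such a mapping (simplified, not the project's Metadata class; "USLT" used as an example key):

    from collections import defaultdict

    fields = defaultdict(list)
    fields["USLT"].extend(["<p>lyrics</p>"])  # wrapped: stored as one value
    fields["BAD"].extend("<p>lyrics</p>")     # bare string: iterated character by character
    assert fields["USLT"] == ["<p>lyrics</p>"]
    assert fields["BAD"][0] == "<"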


@@ -1,257 +0,0 @@
from __future__ import annotations

from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List

from .parents import OuterProxy

T = TypeVar('T', bound=OuterProxy)


class Collection(Generic[T]):
    _data: List[T]

    _indexed_values: Dict[str, set]
    _indexed_to_objects: Dict[any, list]

    shallow_list = property(fget=lambda self: self.data)

    def __init__(
            self,
            data: Optional[Iterable[T]] = None,
            sync_on_append: Dict[str, "Collection"] = None,
            contain_given_in_attribute: Dict[str, "Collection"] = None,
            contain_attribute_in_given: Dict[str, "Collection"] = None,
            append_object_to_attribute: Dict[str, T] = None
    ) -> None:
        self._contains_ids = set()
        self._data = []
        self.upper_collections: List[Collection[T]] = []
        self.contained_collections: List[Collection[T]] = []

        # List of collection attributes that should be modified on append
        # Key: collection attribute (str) of appended element
        # Value: main collection to sync to
        self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
        self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
        self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
        self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}

        self.contain_self_on_append: List[str] = []

        self._indexed_values = defaultdict(set)
        self._indexed_to_objects = defaultdict(list)

        self.extend(data)

    def _map_element(self, __object: T, from_map: bool = False):
        self._contains_ids.add(__object.id)

        for name, value in __object.indexing_values:
            if value is None:
                continue

            self._indexed_values[name].add(value)
            self._indexed_to_objects[value].append(__object)

        if not from_map:
            for attribute, new_object in self.contain_given_in_attribute.items():
                __object.__getattribute__(attribute).contain_collection_inside(new_object)

            for attribute, new_object in self.contain_given_in_attribute.items():
                new_object.contain_collection_inside(__object.__getattribute__(attribute))

            for attribute, new_object in self.append_object_to_attribute.items():
                __object.__getattribute__(attribute).append(new_object, from_map=True)

    def _unmap_element(self, __object: T):
        self._contains_ids.remove(__object.id)

        for name, value in __object.indexing_values:
            if value is None:
                continue
            if value not in self._indexed_values[name]:
                continue

            try:
                self._indexed_to_objects[value].remove(__object)
            except ValueError:
                continue

            if not len(self._indexed_to_objects[value]):
                self._indexed_values[name].remove(value)

    def _contained_in_self(self, __object: T) -> bool:
        if __object.id in self._contains_ids:
            return True

        for name, value in __object.indexing_values:
            if value is None:
                continue
            if value in self._indexed_values[name]:
                return True
        return False

    def _get_root_collections(self) -> List["Collection"]:
        if not len(self.upper_collections):
            return [self]

        root_collections = []
        for upper_collection in self.upper_collections:
            root_collections.extend(upper_collection._get_root_collections())
        return root_collections

    @property
    def _is_root(self) -> bool:
        return len(self.upper_collections) <= 0

    def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
        results = []

        if self._contained_in_self(__object):
            return [self]

        for collection in self.contained_collections:
            results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
            if break_at_first:
                return results

        return results

    def _get_parents_of_multiple_contained_children(self, __object: T):
        results = []
        if len(self.contained_collections) < 2 or self._contained_in_self(__object):
            return results

        count = 0
        for collection in self.contained_collections:
            sub_results = collection._get_parents_of_multiple_contained_children(__object)

            if len(sub_results) > 0:
                count += 1
                results.extend(sub_results)

        if count >= 2:
            results.append(self)
        return results

    def _merge_in_self(self, __object: T, from_map: bool = False):
        """
        1. find existing objects
        2. merge into existing object
        3. remap existing object
        """
        if __object.id in self._contains_ids:
            return

        existing_object: DatabaseObject = None

        for name, value in __object.indexing_values:
            if value is None:
                continue
            if value in self._indexed_values[name]:
                existing_object = self._indexed_to_objects[value][0]
                if existing_object.id == __object.id:
                    return None

                break

        if existing_object is None:
            return None

        existing_object.merge(__object, replace_all_refs=True)

        # just a check if it really worked
        if existing_object.id != __object.id:
            raise ValueError("This should NEVER happen. Merging doesn't work.")

        self._map_element(existing_object, from_map=from_map)

    def contains(self, __object: T) -> bool:
        return len(self._contained_in_sub(__object)) > 0

    def _append(self, __object: T, from_map: bool = False):
        for attribute, to_sync_with in self.sync_on_append.items():
            pass
            to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))

        self._map_element(__object, from_map=from_map)
        self._data.append(__object)

    def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
        if __object is None:
            return
        if __object.id in self._contains_ids:
            return

        exists_in_collection = self._contained_in_sub(__object)
        if len(exists_in_collection) and self is exists_in_collection[0]:
            # assuming that the object already is contained in the correct collections
            if not already_is_parent:
                self._merge_in_self(__object, from_map=from_map)
            return

        if not len(exists_in_collection):
            self._append(__object, from_map=from_map)
        else:
            pass
            exists_in_collection[0]._merge_in_self(__object, from_map=from_map)

        if not already_is_parent or not self._is_root:
            for parent_collection in self._get_parents_of_multiple_contained_children(__object):
                pass
                parent_collection.append(__object, already_is_parent=True, from_map=from_map)

    def extend(self, __iterable: Optional[Iterable[T]]):
        if __iterable is None:
            return

        for __object in __iterable:
            self.append(__object)

    def sync_with_other_collection(self, equal_collection: "Collection"):
        """
        If two collections always need to have the same values, this can be used.

        Internally:
        1. import the data from other to self
            - _data
            - contained_collections
        2. replace all refs from the other object, with refs from this object
        """
        if equal_collection is self:
            return

        # don't add the elements from the subelements from the other collection.
        # this will be done in the next step.
        self.extend(equal_collection._data)
        # add all submodules
        for equal_sub_collection in equal_collection.contained_collections:
            self.contain_collection_inside(equal_sub_collection)

        # now the ugly part
        # replace all refs of the other element with this one
        self._risky_merge(equal_collection)

    def contain_collection_inside(self, sub_collection: "Collection"):
        """
        This collection will ALWAYS contain everything from the passed in collection
        """
        if sub_collection in self.contained_collections:
            return

        self.contained_collections.append(sub_collection)
        sub_collection.upper_collections.append(self)

    @property
    def data(self) -> List[T]:
        return [*self._data,
                *(__object for collection in self.contained_collections for __object in collection.shallow_list)]

    def __len__(self) -> int:
        return len(self._data) + sum(len(collection) for collection in self.contained_collections)

    def __iter__(self) -> Iterator[T]:
        for element in self._data:
            yield element


@@ -1,256 +0,0 @@
from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type
from collections import defaultdict

from .parents import DatabaseObject
from ..utils.support_classes.hacking import MetaClass

T = TypeVar('T', bound=DatabaseObject)


class Collection(Generic[T]):
    _data: List[T]

    _indexed_values: Dict[str, set]
    _indexed_to_objects: Dict[any, list]

    shallow_list = property(fget=lambda self: self.data)

    def __init__(
            self, data: Optional[Iterable[T]] = None,
            sync_on_append: Dict[str, "Collection"] = None,
            contain_given_in_attribute: Dict[str, "Collection"] = None,
            contain_attribute_in_given: Dict[str, "Collection"] = None,
            append_object_to_attribute: Dict[str, DatabaseObject] = None
    ) -> None:
        self._contains_ids = set()
        self._data = []
        self.upper_collections: List[Collection[T]] = []
        self.contained_collections: List[Collection[T]] = []

        # List of collection attributes that should be modified on append
        # Key: collection attribute (str) of appended element
        # Value: main collection to sync to
        self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
        self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
        self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
        self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {}

        self.contain_self_on_append: List[str] = []

        self._indexed_values = defaultdict(set)
        self._indexed_to_objects = defaultdict(list)

        self.extend(data)

    def _map_element(self, __object: T, from_map: bool = False):
        self._contains_ids.add(__object.id)

        for name, value in __object.indexing_values:
            if value is None:
                continue

            self._indexed_values[name].add(value)
            self._indexed_to_objects[value].append(__object)

        if not from_map:
            for attribute, new_object in self.contain_given_in_attribute.items():
                __object.__getattribute__(attribute).contain_collection_inside(new_object)

            for attribute, new_object in self.contain_given_in_attribute.items():
                new_object.contain_collection_inside(__object.__getattribute__(attribute))

            for attribute, new_object in self.append_object_to_attribute.items():
                __object.__getattribute__(attribute).append(new_object, from_map=True)

    def _unmap_element(self, __object: T):
        self._contains_ids.remove(__object.id)

        for name, value in __object.indexing_values:
            if value is None:
                continue
            if value not in self._indexed_values[name]:
                continue

            try:
                self._indexed_to_objects[value].remove(__object)
            except ValueError:
                continue

            if not len(self._indexed_to_objects[value]):
                self._indexed_values[name].remove(value)

    def _contained_in_self(self, __object: T) -> bool:
        if __object.id in self._contains_ids:
            return True

        for name, value in __object.indexing_values:
            if value is None:
                continue
            if value in self._indexed_values[name]:
                return True
        return False

    def _get_root_collections(self) -> List["Collection"]:
        if not len(self.upper_collections):
            return [self]

        root_collections = []
        for upper_collection in self.upper_collections:
            root_collections.extend(upper_collection._get_root_collections())
        return root_collections

    @property
    def _is_root(self) -> bool:
        return len(self.upper_collections) <= 0

    def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
        results = []

        if self._contained_in_self(__object):
            return [self]

        for collection in self.contained_collections:
            results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
            if break_at_first:
                return results

        return results

    def _get_parents_of_multiple_contained_children(self, __object: T):
        results = []
        if len(self.contained_collections) < 2 or self._contained_in_self(__object):
            return results

        count = 0
        for collection in self.contained_collections:
            sub_results = collection._get_parents_of_multiple_contained_children(__object)

            if len(sub_results) > 0:
                count += 1
                results.extend(sub_results)

        if count >= 2:
            results.append(self)
        return results

    def _merge_in_self(self, __object: T, from_map: bool = False):
        """
        1. find existing objects
        2. merge into existing object
        3. remap existing object
        """
        if __object.id in self._contains_ids:
            return

        existing_object: DatabaseObject = None

        for name, value in __object.indexing_values:
            if value is None:
                continue
            if value in self._indexed_values[name]:
                existing_object = self._indexed_to_objects[value][0]
                if existing_object.id == __object.id:
                    return None

                break

        if existing_object is None:
            return None

        existing_object.merge(__object, replace_all_refs=True)

        # just a check if it really worked
        if existing_object.id != __object.id:
            raise ValueError("This should NEVER happen. Merging doesn't work.")

        self._map_element(existing_object, from_map=from_map)

    def contains(self, __object: T) -> bool:
        return len(self._contained_in_sub(__object)) > 0

    def _append(self, __object: T, from_map: bool = False):
        for attribute, to_sync_with in self.sync_on_append.items():
            pass
            to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))

        self._map_element(__object, from_map=from_map)
        self._data.append(__object)

    def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
        if __object is None:
            return
        if __object.id in self._contains_ids:
            return

        exists_in_collection = self._contained_in_sub(__object)
        if len(exists_in_collection) and self is exists_in_collection[0]:
            # assuming that the object already is contained in the correct collections
            if not already_is_parent:
                self._merge_in_self(__object, from_map=from_map)
            return

        if not len(exists_in_collection):
            self._append(__object, from_map=from_map)
        else:
            pass
            exists_in_collection[0]._merge_in_self(__object, from_map=from_map)

        if not already_is_parent or not self._is_root:
            for parent_collection in self._get_parents_of_multiple_contained_children(__object):
                pass
                parent_collection.append(__object, already_is_parent=True, from_map=from_map)

    def extend(self, __iterable: Optional[Iterable[T]]):
        if __iterable is None:
            return

        for __object in __iterable:
            self.append(__object)

    def sync_with_other_collection(self, equal_collection: "Collection"):
        """
        If two collections always need to have the same values, this can be used.

        Internally:
        1. import the data from other to self
            - _data
            - contained_collections
        2. replace all refs from the other object, with refs from this object
        """
        if equal_collection is self:
            return

        # don't add the elements from the subelements from the other collection.
        # this will be done in the next step.
        self.extend(equal_collection._data)
        # add all submodules
        for equal_sub_collection in equal_collection.contained_collections:
            self.contain_collection_inside(equal_sub_collection)

        # now the ugly part
        # replace all refs of the other element with this one
        self._risky_merge(equal_collection)

    def contain_collection_inside(self, sub_collection: "Collection"):
        """
        This collection will ALWAYS contain everything from the passed in collection
        """
        if sub_collection in self.contained_collections:
            return

        self.contained_collections.append(sub_collection)
        sub_collection.upper_collections.append(self)

    @property
    def data(self) -> List[T]:
        return [*self._data,
                *(__object for collection in self.contained_collections for __object in collection.shallow_list)]

    def __len__(self) -> int:
        return len(self._data) + sum(len(collection) for collection in self.contained_collections)

    def __iter__(self) -> Iterator[T]:
        for element in self._data:
            yield element


@@ -7,7 +7,7 @@ from functools import lru_cache
 from typing import Optional, Dict, Tuple, List, Type, Generic, Any, TypeVar, Set
 
 from .metadata import Metadata
-from ..utils import get_unix_time
+from ..utils import get_unix_time, object_trace
 from ..utils.config import logging_settings, main_settings
 from ..utils.shared import HIGHEST_ID
 from ..utils.hacking import MetaClass
@@ -26,7 +26,11 @@ class InnerData:
     If the data in the wrapper class has to be merged, then this class is just replaced and garbage collected.
     """
 
+    _refers_to_instances: set = None
+
     def __init__(self, object_type, **kwargs):
+        self._refers_to_instances = set()
+
         # initialize the default values
         self.__default_values = {}
         for name, factory in object_type._default_factories.items():
@@ -101,6 +105,9 @@ class OuterProxy:
         self._fetched_from: dict = {}
 
         self._inner: InnerData = InnerData(type(self), **kwargs)
+        self._inner._refers_to_instances.add(self)
+
+        object_trace(f"creating {type(self).__name__} [{self.title_string}]")
 
         self.__init_collections__()
         for name, data_list in collection_data.items():
@@ -174,11 +181,25 @@ class OuterProxy:
         :return:
         """
 
         if __other is None:
-            _ = "debug"
             return
 
-        self._inner.__merge__(__other._inner, override=override)
-        __other._inner = self._inner
+        object_trace(f"merging {type(self).__name__} [{self.title_string}] with {type(__other).__name__} [{__other.title_string}]")
+
+        a = self
+        b = __other
+
+        if a._inner is b._inner:
+            return
+
+        # switch instances if more efficient
+        if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
+            a, b = b, a
+
+        a._inner.__merge__(b._inner, override=override)
+        a._inner._refers_to_instances.update(b._inner._refers_to_instances)
+
+        for instance in b._inner._refers_to_instances:
+            instance._inner = a._inner
 
     def mark_as_fetched(self, *url_hash_list: List[str]):
         for url_hash in url_hash_list:
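
Reviewer note: this is the core of the fix. Every proxy registers itself in its InnerData, and merge() re-points every proxy of the smaller group at the surviving InnerData, so later edits stay visible through all old references. A stripped-down sketch of the mechanism with hypothetical names (not the project's classes):

    class Inner:
        def __init__(self, **data):
            self.data = data
            self.refers_to = set()

    class Proxy:
        def __init__(self, **data):
            self._inner = Inner(**data)
            self._inner.refers_to.add(self)

        def merge(self, other: "Proxy"):
            a, b = self, other
            if a._inner is b._inner:
                return
            if len(b._inner.refers_to) > len(a._inner.refers_to):
                a, b = b, a  # keep the more widely shared Inner, mirroring the diff
            for key, value in b._inner.data.items():
                a._inner.data.setdefault(key, value)
            a._inner.refers_to.update(b._inner.refers_to)
            for proxy in b._inner.refers_to:
                proxy._inner = a._inner  # re-point every old reference

    x, y = Proxy(title="album"), Proxy(barcode="1234567890123")
    x.merge(y)
    assert x._inner is y._inner
    assert y._inner.data == {"title": "album", "barcode": "1234567890123"}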


@@ -119,7 +119,7 @@ class Song(Base):
     def indexing_values(self) -> List[Tuple[str, object]]:
         return [
             ('id', self.id),
-            ('title', self.unified_title),
+            ('title', unify(self.unified_title)),
             ('isrc', self.isrc),
             *[('url', source.url) for source in self.source_collection]
         ]
@@ -244,6 +244,9 @@ class Album(Base):
         self.song_collection.contain_attribute_in_given = {
             "main_artist_collection": self.artist_collection
         }
+        self.song_collection.append_object_to_attribute = {
+            "album_collection": self
+        }
 
     def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
         if object_type is Song:
@@ -265,7 +268,7 @@ class Album(Base):
     def indexing_values(self) -> List[Tuple[str, object]]:
         return [
             ('id', self.id),
-            ('title', self.unified_title),
+            ('title', unify(self.title)),
             ('barcode', self.barcode),
             *[('url', source.url) for source in self.source_collection]
         ]
@@ -530,7 +533,7 @@ class Artist(Base):
     def indexing_values(self) -> List[Tuple[str, object]]:
         return [
             ('id', self.id),
-            ('name', self.unified_name),
+            ('name', unify(self.name)),
             *[('url', source.url) for source in self.source_collection],
             *[('contact', contact.value) for contact in self.contact_collection]
         ]
@@ -643,7 +646,7 @@ class Label(Base):
     def indexing_values(self) -> List[Tuple[str, object]]:
         return [
             ('id', self.id),
-            ('name', self.unified_name),
+            ('name', unify(self.name)),
             *[('url', source.url) for source in self.source_collection]
         ]
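
Reviewer note: the indexing values are now normalized through unify() so that differently written names land on the same index key inside a Collection. A simplified stand-in for the idea (the real unify() also transliterates non-Latin scripts, see its hunk further down):

    def unify_sketch(string):
        if string is None:
            return None
        return string.strip().lower()

    # differently written titles collapse to one key, so the collection
    # can recognise them as the same album
    assert unify_sketch("Album") == unify_sketch("  ALBUM ") == "album"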


@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 from collections import defaultdict
 from enum import Enum
 from typing import List, Dict, Set, Tuple, Optional, Iterable
@@ -103,12 +105,23 @@ class Source(OuterProxy):
             ('audio_url', self.audio_url),
         ]
 
+    def __merge__(self, __other: Source, override: bool = False):
+        if override:
+            self.audio_url = __other.audio_url
+
+        if self.audio_url is None or (override and __other.audio_url is not None):
+            self.audio_url = __other.audio_url
+
     def __str__(self):
         return self.__repr__()
 
     def __repr__(self) -> str:
         return f"Src({self.page_enum.value}: {self.url}, {self.audio_url})"
 
+    @property
+    def title_string(self) -> str:
+        return self.url
+
     page_str = property(fget=lambda self: self.page_enum.value)
     type_str = property(fget=lambda self: self.type_enum.value)
     homepage = property(fget=lambda self: SourcePages.get_homepage(self.page_enum))
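
Reviewer note: the second condition in __merge__ reads as "take the other value if ours is missing, or if override is set and the other side actually has one". A tiny sketch of that rule in isolation (hypothetical helper, not part of the library):

    def merge_field(mine, theirs, override: bool = False):
        if mine is None or (override and theirs is not None):
            return theirs
        return mine

    assert merge_field(None, "https://host/audio.mp3") == "https://host/audio.mp3"
    assert merge_field("a.mp3", "b.mp3") == "a.mp3"
    assert merge_field("a.mp3", "b.mp3", override=True) == "b.mp3"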


@@ -11,6 +11,7 @@ from functools import lru_cache
 
 import youtube_dl
 from youtube_dl.extractor.youtube import YoutubeIE
+from youtube_dl.utils import DownloadError
 
 from ...utils.exception.config import SettingValueError
 from ...utils.config import main_settings, youtube_settings, logging_settings
@@ -201,6 +202,7 @@ class YoutubeMusic(SuperYouTube):
         self.yt_ie = MusicKrakenYoutubeIE(downloader=self.ydl, main_instance=self)
 
         self.download_values_by_url: dict = {}
+        self.not_download: Dict[str, DownloadError] = {}
 
     def _fetch_from_main_page(self):
         """
@@ -483,7 +485,13 @@ class YoutubeMusic(SuperYouTube):
 
     def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
-        ydl_res: dict = self.ydl.extract_info(url=source.url, download=False)
+        ydl_res: dict = {}
+        try:
+            ydl_res: dict = self.ydl.extract_info(url=source.url, download=False)
+        except DownloadError as e:
+            self.not_download[source.hash_url] = e
+            self.LOGGER.error(f"Couldn't fetch song from {source.url}. {e}")
+            return Song()
 
         self.fetch_media_url(source=source, ydl_res=ydl_res)
@@ -556,6 +564,7 @@ class YoutubeMusic(SuperYouTube):
     def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
         media = self.fetch_media_url(source)
 
+        if source.hash_url not in self.not_download:
             result = self.download_connection.stream_into(
                 media["url"],
                 target,
@@ -567,6 +576,8 @@ class YoutubeMusic(SuperYouTube):
                 # chunk_size=media.get("chunk_size", main_settings["chunk_size"]),
                 method="GET",
             )
+        else:
+            result = DownloadResult(error_message=str(self.not_download[source.hash_url]))
 
         if result.is_fatal_error:
             result.merge(super().download_song_to_target(source=source, target=target, desc=desc))
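
Reviewer note: fetch failures are now remembered per source URL so the later download step fails fast instead of retrying a known-bad source. A compact sketch of that pattern with stand-in names (RuntimeError stands in for youtube_dl's DownloadError, _extract for ydl.extract_info):

    from typing import Dict

    class Downloader:
        def __init__(self):
            self.not_download: Dict[str, Exception] = {}

        def fetch(self, url: str) -> dict:
            try:
                return self._extract(url)
            except RuntimeError as e:
                self.not_download[url] = e  # remember the failure for this URL
                return {}

        def download(self, url: str) -> str:
            if url in self.not_download:
                return f"error: {self.not_download[url]}"  # skip the doomed request
            return "ok"

        def _extract(self, url: str) -> dict:
            raise RuntimeError("video unavailable")

    d = Downloader()
    d.fetch("https://example.invalid/watch?v=x")
    print(d.download("https://example.invalid/watch?v=x"))  # error: video unavailable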


@@ -3,7 +3,7 @@ from pathlib import Path
 import json
 import logging
 
-from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE
+from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE
 from .config import config, read_config, write_config
 from .enums.colors import BColors
 from .path_manager import LOCATIONS
@@ -52,6 +52,12 @@ def trace(msg: str):
     output("trace: " + msg, BColors.OKBLUE)
 
 
+def object_trace(obj):
+    if not DEBUG_OBJECT_TRACE:
+        return
+
+    output("object: " + str(obj), BColors.GREY)
+
 
 """
 misc functions


@@ -15,6 +15,7 @@ __stage__ = os.getenv("STAGE", "prod")
 DEBUG = (__stage__ == "dev") and True
 DEBUG_LOGGING = DEBUG and False
 DEBUG_TRACE = DEBUG and True
+DEBUG_OBJECT_TRACE = DEBUG and False
 DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
 DEBUG_PAGES = DEBUG and False
 DEBUG_DUMP = DEBUG and True


@@ -19,6 +19,9 @@ def unify(string: str) -> str:
     - is lowercase
     """
 
+    if string is None:
+        return None
+
     try:
         string = translit(string, reversed=True)
     except LanguageDetectionError:

tests/test_collection.py (new file, 86 lines)

@@ -0,0 +1,86 @@
import unittest

from music_kraken.objects import Song, Album, Artist, Collection, Country


class TestCollection(unittest.TestCase):
    @staticmethod
    def complicated_object() -> Artist:
        return Artist(
            name="artist",
            country=Country.by_alpha_2("DE"),
            main_album_list=[
                Album(
                    title="album",
                    song_list=[
                        Song(
                            title="song",
                            album_list=[
                                Album(title="album", albumsort=123),
                            ],
                        ),
                        Song(
                            title="other_song",
                            album_list=[
                                Album(title="album", albumsort=423),
                            ],
                        ),
                    ]
                ),
                Album(title="album", barcode="1234567890123"),
            ]
        )

    def test_song_album_relation(self):
        """
        Tests that
        album = album.any_song.one_album
        is the same object
        """
        a = self.complicated_object().main_album_collection[0]
        b = a.song_collection[0].album_collection[0]
        c = a.song_collection[1].album_collection[0]
        d = b.song_collection[0].album_collection[0]
        e = d.song_collection[0].album_collection[0]
        f = e.song_collection[0].album_collection[0]
        g = f.song_collection[0].album_collection[0]

        self.assertTrue(a.id == b.id == c.id == d.id == e.id == f.id == g.id)
        self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "album")
        self.assertTrue(a.barcode == b.barcode == c.barcode == d.barcode == e.barcode == f.barcode == g.barcode == "1234567890123")
        self.assertTrue(a.albumsort == b.albumsort == c.albumsort == d.albumsort == e.albumsort == f.albumsort == g.albumsort == 123)

        d.title = "new_title"

        self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "new_title")

    def test_album_artist_relation(self):
        """
        Tests that
        artist = artist.any_album.any_song.one_artist
        is the same object
        """
        a = self.complicated_object()
        b = a.main_album_collection[0].artist_collection[0]
        c = b.main_album_collection[0].artist_collection[0]
        d = c.main_album_collection[0].artist_collection[0]

        self.assertTrue(a.id == b.id == c.id == d.id)
        self.assertTrue(a.name == b.name == c.name == d.name == "artist")
        self.assertTrue(a.country == b.country == c.country == d.country)

    """
    def test_song_artist_relations(self):
        a = self.complicated_object()
        b = a.main_album_collection[0].song_collection[0].main_artist_collection[0]
        c = b.main_album_collection[0].song_collection[0].main_artist_collection[0]
        d = c.main_album_collection[0].song_collection[0].main_artist_collection[0]

        self.assertTrue(a.id == b.id == c.id == d.id)
        self.assertTrue(a.name == b.name == c.name == d.name == "artist")
        self.assertTrue(a.country == b.country == c.country == d.country)
    """


if __name__ == "__main__":
    unittest.main()
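
Reviewer note: the new test module runs with the standard library runner, assuming music_kraken is importable from the repository root:

    python -m unittest tests.test_collection -v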