Compare commits

...

16 Commits

Author SHA1 Message Date
fa9fd5d72b Merge pull request 'fix/collections' (#7) from fix/collections into experimental
Reviewed-on: #7
2024-04-16 11:35:48 +00:00
3f3bb77cc9 fix: remove all none metadata from metadata collection 2024-04-16 13:28:16 +02:00
9addcf1862 fix: raised the recursion limit in debug to 500 2024-04-16 13:23:20 +02:00
eec252cb16 feat: cleaned up the previous fix 2024-04-15 14:12:26 +02:00
a5ede2a6ad fix: if 2 proxies refer to multiple objects the merge unsyncs them causing a recursion depth error 2024-04-15 13:56:40 +02:00
5284c1f55c feat: renamed test file 2024-04-15 13:42:58 +02:00
Hellow
4473ff08d3 feat: tested album artist relation 2024-04-13 15:36:16 +02:00
Hellow
bdbaeceda8 feat: cleaned up stuff 2024-04-13 12:41:36 +02:00
9366eb46fd feat: added first test 2024-04-12 17:58:18 +02:00
7af8bacfab feat: added a far better example 2024-04-12 17:23:44 +02:00
60dc5b2558 fix: syncronization of data objects in more complex constelations 2024-04-12 17:11:17 +02:00
70b86b5c47 feat: catching yt-dl errors 2024-04-12 16:00:51 +02:00
6936c9da9d fix: albums are now also contained in the child song collection album attributes 2024-04-12 14:35:02 +02:00
28e1350b01 fix: albums are now also contained in the child song collection album attributes 2024-04-12 14:34:25 +02:00
3aef267608 fix: albums are now also contained in the child song collection album attributes 2024-04-12 14:33:33 +02:00
29e2b4616e fix: dynamic unified title 2024-04-12 14:14:10 +02:00
18 changed files with 270 additions and 541 deletions

View File

@ -19,9 +19,13 @@
"APIC",
"Bandcamp",
"dotenv",
"encyclopaedia",
"levenshtein",
"metallum",
"musify",
"OKBLUE",
"Referer",
"tracksort"
"tracksort",
"youtube"
]
}

View File

@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__":
commands = [
"s: #a Toxoplasma",
"d: 16",
"s: #a And End...",
"d: 10",
]

View File

@ -0,0 +1,92 @@
import music_kraken
from music_kraken.objects import Song, Album, Artist, Collection
if __name__ == "__main__":
artist: Artist = Artist(
name="artist",
main_album_list=[
Album(
title="album",
song_list=[
Song(
title="song",
album_list=[
Album(
title="album",
albumsort=123,
main_artist=Artist(name="artist"),
),
],
),
Song(
title="other_song",
album_list=[
Album(title="album", albumsort=423),
],
),
]
),
Album(title="album", barcode="1234567890123"),
]
)
other_artist: Artist = Artist(
name="artist",
main_album_list=[
Album(
title="album",
song_list=[
Song(
title="song",
album_list=[
Album(
title="album",
albumsort=123,
main_artist=Artist(name="other_artist"),
),
],
),
Song(
title="other_song",
album_list=[
Album(title="album", albumsort=423),
],
),
]
),
Album(title="album", barcode="1234567890123"),
]
)
artist.merge(other_artist)
a = artist.main_album_collection[0]
b = a.song_collection[0].album_collection[0]
c = a.song_collection[1].album_collection[0]
d = b.song_collection[0].album_collection[0]
e = d.song_collection[0].album_collection[0]
f = e.song_collection[0].album_collection[0]
g = f.song_collection[0].album_collection[0]
print(a.id, a.title, a.barcode, a.albumsort)
print(b.id, b.title, b.barcode, b.albumsort)
print(c.id, c.title, c.barcode, c.albumsort)
print(d.id, d.title, d.barcode, d.albumsort)
print(e.id, e.title, e.barcode, e.albumsort)
print(f.id, f.title, f.barcode, f.albumsort)
print(g.id, g.title, g.barcode, g.albumsort)
print()
d.title = "new_title"
print(a.id, a.title, a.barcode, a.albumsort)
print(b.id, b.title, b.barcode, b.albumsort)
print(c.id, c.title, c.barcode, c.albumsort)
print(d.id, d.title, d.barcode, d.albumsort)
print(e.id, e.title, e.barcode, e.albumsort)
print(f.id, f.title, f.barcode, f.albumsort)
print(g.id, g.title, g.barcode, g.albumsort)
print()
print(artist.main_album_collection._indexed_values)

View File

@ -46,7 +46,7 @@ init_logging()
from . import cli
if DEBUG:
sys.setrecursionlimit(100)
sys.setrecursionlimit(500)
if main_settings['modify_gc']:

View File

@ -29,6 +29,8 @@ class AudioMetadata:
"""
https://www.programcreek.com/python/example/84797/mutagen.id3.ID3
"""
if value is None:
continue
self.frames.add(value)
def add_song_metadata(self, song: Song):

View File

@ -110,7 +110,7 @@ class Collection(Generic[T]):
if self._contained_in_self(__object):
return [self]
for collection in self.children:
for collection in (*self.children, *self.parents):
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
if break_at_first:

View File

@ -34,6 +34,6 @@ class Lyrics(OuterProxy):
@property
def metadata(self) -> Metadata:
return Metadata({
id3Mapping.UNSYNCED_LYRICS: self.text.html
id3Mapping.UNSYNCED_LYRICS: [self.text.html]
})

View File

@ -1,257 +0,0 @@
from __future__ import annotations
from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List
from .parents import OuterProxy
T = TypeVar('T', bound=OuterProxy)
class Collection(Generic[T]):
_data: List[T]
_indexed_values: Dict[str, set]
_indexed_to_objects: Dict[any, list]
shallow_list = property(fget=lambda self: self.data)
def __init__(
self,
data: Optional[Iterable[T]] = None,
sync_on_append: Dict[str, "Collection"] = None,
contain_given_in_attribute: Dict[str, "Collection"] = None,
contain_attribute_in_given: Dict[str, "Collection"] = None,
append_object_to_attribute: Dict[str, T] = None
) -> None:
self._contains_ids = set()
self._data = []
self.upper_collections: List[Collection[T]] = []
self.contained_collections: List[Collection[T]] = []
# List of collection attributes that should be modified on append
# Key: collection attribute (str) of appended element
# Value: main collection to sync to
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
self.contain_self_on_append: List[str] = []
self._indexed_values = defaultdict(set)
self._indexed_to_objects = defaultdict(list)
self.extend(data)
def _map_element(self, __object: T, from_map: bool = False):
self._contains_ids.add(__object.id)
for name, value in __object.indexing_values:
if value is None:
continue
self._indexed_values[name].add(value)
self._indexed_to_objects[value].append(__object)
if not from_map:
for attribute, new_object in self.contain_given_in_attribute.items():
__object.__getattribute__(attribute).contain_collection_inside(new_object)
for attribute, new_object in self.contain_given_in_attribute.items():
new_object.contain_collection_inside(__object.__getattribute__(attribute))
for attribute, new_object in self.append_object_to_attribute.items():
__object.__getattribute__(attribute).append(new_object, from_map=True)
def _unmap_element(self, __object: T):
self._contains_ids.remove(__object.id)
for name, value in __object.indexing_values:
if value is None:
continue
if value not in self._indexed_values[name]:
continue
try:
self._indexed_to_objects[value].remove(__object)
except ValueError:
continue
if not len(self._indexed_to_objects[value]):
self._indexed_values[name].remove(value)
def _contained_in_self(self, __object: T) -> bool:
if __object.id in self._contains_ids:
return True
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
return True
return False
def _get_root_collections(self) -> List["Collection"]:
if not len(self.upper_collections):
return [self]
root_collections = []
for upper_collection in self.upper_collections:
root_collections.extend(upper_collection._get_root_collections())
return root_collections
@property
def _is_root(self) -> bool:
return len(self.upper_collections) <= 0
def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
results = []
if self._contained_in_self(__object):
return [self]
for collection in self.contained_collections:
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
if break_at_first:
return results
return results
def _get_parents_of_multiple_contained_children(self, __object: T):
results = []
if len(self.contained_collections) < 2 or self._contained_in_self(__object):
return results
count = 0
for collection in self.contained_collections:
sub_results = collection._get_parents_of_multiple_contained_children(__object)
if len(sub_results) > 0:
count += 1
results.extend(sub_results)
if count >= 2:
results.append(self)
return results
def _merge_in_self(self, __object: T, from_map: bool = False):
"""
1. find existing objects
2. merge into existing object
3. remap existing object
"""
if __object.id in self._contains_ids:
return
existing_object: DatabaseObject = None
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
existing_object = self._indexed_to_objects[value][0]
if existing_object.id == __object.id:
return None
break
if existing_object is None:
return None
existing_object.merge(__object, replace_all_refs=True)
# just a check if it really worked
if existing_object.id != __object.id:
raise ValueError("This should NEVER happen. Merging doesn't work.")
self._map_element(existing_object, from_map=from_map)
def contains(self, __object: T) -> bool:
return len(self._contained_in_sub(__object)) > 0
def _append(self, __object: T, from_map: bool = False):
for attribute, to_sync_with in self.sync_on_append.items():
pass
to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))
self._map_element(__object, from_map=from_map)
self._data.append(__object)
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
if __object is None:
return
if __object.id in self._contains_ids:
return
exists_in_collection = self._contained_in_sub(__object)
if len(exists_in_collection) and self is exists_in_collection[0]:
# assuming that the object already is contained in the correct collections
if not already_is_parent:
self._merge_in_self(__object, from_map=from_map)
return
if not len(exists_in_collection):
self._append(__object, from_map=from_map)
else:
pass
exists_in_collection[0]._merge_in_self(__object, from_map=from_map)
if not already_is_parent or not self._is_root:
for parent_collection in self._get_parents_of_multiple_contained_children(__object):
pass
parent_collection.append(__object, already_is_parent=True, from_map=from_map)
def extend(self, __iterable: Optional[Iterable[T]]):
if __iterable is None:
return
for __object in __iterable:
self.append(__object)
def sync_with_other_collection(self, equal_collection: "Collection"):
"""
If two collections always need to have the same values, this can be used.
Internally:
1. import the data from other to self
- _data
- contained_collections
2. replace all refs from the other object, with refs from this object
"""
if equal_collection is self:
return
# don't add the elements from the subelements from the other collection.
# this will be done in the next step.
self.extend(equal_collection._data)
# add all submodules
for equal_sub_collection in equal_collection.contained_collections:
self.contain_collection_inside(equal_sub_collection)
# now the ugly part
# replace all refs of the other element with this one
self._risky_merge(equal_collection)
def contain_collection_inside(self, sub_collection: "Collection"):
"""
This collection will ALWAYS contain everything from the passed in collection
"""
if sub_collection in self.contained_collections:
return
self.contained_collections.append(sub_collection)
sub_collection.upper_collections.append(self)
@property
def data(self) -> List[T]:
return [*self._data,
*(__object for collection in self.contained_collections for __object in collection.shallow_list)]
def __len__(self) -> int:
return len(self._data) + sum(len(collection) for collection in self.contained_collections)
def __iter__(self) -> Iterator[T]:
for element in self._data:
yield element

View File

@ -1,256 +0,0 @@
from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type
from collections import defaultdict
from .parents import DatabaseObject
from ..utils.support_classes.hacking import MetaClass
T = TypeVar('T', bound=DatabaseObject)
class Collection(Generic[T]):
_data: List[T]
_indexed_values: Dict[str, set]
_indexed_to_objects: Dict[any, list]
shallow_list = property(fget=lambda self: self.data)
def __init__(
self, data: Optional[Iterable[T]] = None,
sync_on_append: Dict[str, "Collection"] = None,
contain_given_in_attribute: Dict[str, "Collection"] = None,
contain_attribute_in_given: Dict[str, "Collection"] = None,
append_object_to_attribute: Dict[str, DatabaseObject] = None
) -> None:
self._contains_ids = set()
self._data = []
self.upper_collections: List[Collection[T]] = []
self.contained_collections: List[Collection[T]] = []
# List of collection attributes that should be modified on append
# Key: collection attribute (str) of appended element
# Value: main collection to sync to
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {}
self.contain_self_on_append: List[str] = []
self._indexed_values = defaultdict(set)
self._indexed_to_objects = defaultdict(list)
self.extend(data)
def _map_element(self, __object: T, from_map: bool = False):
self._contains_ids.add(__object.id)
for name, value in __object.indexing_values:
if value is None:
continue
self._indexed_values[name].add(value)
self._indexed_to_objects[value].append(__object)
if not from_map:
for attribute, new_object in self.contain_given_in_attribute.items():
__object.__getattribute__(attribute).contain_collection_inside(new_object)
for attribute, new_object in self.contain_given_in_attribute.items():
new_object.contain_collection_inside(__object.__getattribute__(attribute))
for attribute, new_object in self.append_object_to_attribute.items():
__object.__getattribute__(attribute).append(new_object, from_map=True)
def _unmap_element(self, __object: T):
self._contains_ids.remove(__object.id)
for name, value in __object.indexing_values:
if value is None:
continue
if value not in self._indexed_values[name]:
continue
try:
self._indexed_to_objects[value].remove(__object)
except ValueError:
continue
if not len(self._indexed_to_objects[value]):
self._indexed_values[name].remove(value)
def _contained_in_self(self, __object: T) -> bool:
if __object.id in self._contains_ids:
return True
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
return True
return False
def _get_root_collections(self) -> List["Collection"]:
if not len(self.upper_collections):
return [self]
root_collections = []
for upper_collection in self.upper_collections:
root_collections.extend(upper_collection._get_root_collections())
return root_collections
@property
def _is_root(self) -> bool:
return len(self.upper_collections) <= 0
def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
results = []
if self._contained_in_self(__object):
return [self]
for collection in self.contained_collections:
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
if break_at_first:
return results
return results
def _get_parents_of_multiple_contained_children(self, __object: T):
results = []
if len(self.contained_collections) < 2 or self._contained_in_self(__object):
return results
count = 0
for collection in self.contained_collections:
sub_results = collection._get_parents_of_multiple_contained_children(__object)
if len(sub_results) > 0:
count += 1
results.extend(sub_results)
if count >= 2:
results.append(self)
return results
def _merge_in_self(self, __object: T, from_map: bool = False):
"""
1. find existing objects
2. merge into existing object
3. remap existing object
"""
if __object.id in self._contains_ids:
return
existing_object: DatabaseObject = None
for name, value in __object.indexing_values:
if value is None:
continue
if value in self._indexed_values[name]:
existing_object = self._indexed_to_objects[value][0]
if existing_object.id == __object.id:
return None
break
if existing_object is None:
return None
existing_object.merge(__object, replace_all_refs=True)
# just a check if it really worked
if existing_object.id != __object.id:
raise ValueError("This should NEVER happen. Merging doesn't work.")
self._map_element(existing_object, from_map=from_map)
def contains(self, __object: T) -> bool:
return len(self._contained_in_sub(__object)) > 0
def _append(self, __object: T, from_map: bool = False):
for attribute, to_sync_with in self.sync_on_append.items():
pass
to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))
self._map_element(__object, from_map=from_map)
self._data.append(__object)
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
if __object is None:
return
if __object.id in self._contains_ids:
return
exists_in_collection = self._contained_in_sub(__object)
if len(exists_in_collection) and self is exists_in_collection[0]:
# assuming that the object already is contained in the correct collections
if not already_is_parent:
self._merge_in_self(__object, from_map=from_map)
return
if not len(exists_in_collection):
self._append(__object, from_map=from_map)
else:
pass
exists_in_collection[0]._merge_in_self(__object, from_map=from_map)
if not already_is_parent or not self._is_root:
for parent_collection in self._get_parents_of_multiple_contained_children(__object):
pass
parent_collection.append(__object, already_is_parent=True, from_map=from_map)
def extend(self, __iterable: Optional[Iterable[T]]):
if __iterable is None:
return
for __object in __iterable:
self.append(__object)
def sync_with_other_collection(self, equal_collection: "Collection"):
"""
If two collections always need to have the same values, this can be used.
Internally:
1. import the data from other to self
- _data
- contained_collections
2. replace all refs from the other object, with refs from this object
"""
if equal_collection is self:
return
# don't add the elements from the subelements from the other collection.
# this will be done in the next step.
self.extend(equal_collection._data)
# add all submodules
for equal_sub_collection in equal_collection.contained_collections:
self.contain_collection_inside(equal_sub_collection)
# now the ugly part
# replace all refs of the other element with this one
self._risky_merge(equal_collection)
def contain_collection_inside(self, sub_collection: "Collection"):
"""
This collection will ALWAYS contain everything from the passed in collection
"""
if sub_collection in self.contained_collections:
return
self.contained_collections.append(sub_collection)
sub_collection.upper_collections.append(self)
@property
def data(self) -> List[T]:
return [*self._data,
*(__object for collection in self.contained_collections for __object in collection.shallow_list)]
def __len__(self) -> int:
return len(self._data) + sum(len(collection) for collection in self.contained_collections)
def __iter__(self) -> Iterator[T]:
for element in self._data:
yield element

View File

@ -7,7 +7,7 @@ from functools import lru_cache
from typing import Optional, Dict, Tuple, List, Type, Generic, Any, TypeVar, Set
from .metadata import Metadata
from ..utils import get_unix_time
from ..utils import get_unix_time, object_trace
from ..utils.config import logging_settings, main_settings
from ..utils.shared import HIGHEST_ID
from ..utils.hacking import MetaClass
@ -26,7 +26,11 @@ class InnerData:
If the data in the wrapper class has to be merged, then this class is just replaced and garbage collected.
"""
_refers_to_instances: set = None
def __init__(self, object_type, **kwargs):
self._refers_to_instances =set()
# initialize the default values
self.__default_values = {}
for name, factory in object_type._default_factories.items():
@ -101,6 +105,9 @@ class OuterProxy:
self._fetched_from: dict = {}
self._inner: InnerData = InnerData(type(self), **kwargs)
self._inner._refers_to_instances.add(self)
object_trace(f"creating {type(self).__name__} [{self.title_string}]")
self.__init_collections__()
for name, data_list in collection_data.items():
@ -174,11 +181,25 @@ class OuterProxy:
:return:
"""
if __other is None:
_ = "debug"
return
self._inner.__merge__(__other._inner, override=override)
__other._inner = self._inner
object_trace(f"merging {type(self).__name__} [{self.title_string}] with {type(__other).__name__} [{__other.title_string}]")
a = self
b = __other
if a._inner is b._inner:
return
# switch instances if more efficient
if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
a, b = b, a
a._inner.__merge__(b._inner, override=override)
a._inner._refers_to_instances.update(b._inner._refers_to_instances)
for instance in b._inner._refers_to_instances:
instance._inner = a._inner
def mark_as_fetched(self, *url_hash_list: List[str]):
for url_hash in url_hash_list:

View File

@ -119,7 +119,7 @@ class Song(Base):
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('title', self.unified_title),
('title', unify(self.unified_title)),
('isrc', self.isrc),
*[('url', source.url) for source in self.source_collection]
]
@ -244,6 +244,9 @@ class Album(Base):
self.song_collection.contain_attribute_in_given = {
"main_artist_collection": self.artist_collection
}
self.song_collection.append_object_to_attribute = {
"album_collection": self
}
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song:
@ -265,7 +268,7 @@ class Album(Base):
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('title', self.unified_title),
('title', unify(self.title)),
('barcode', self.barcode),
*[('url', source.url) for source in self.source_collection]
]
@ -530,7 +533,7 @@ class Artist(Base):
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('name', self.unified_name),
('name', unify(self.name)),
*[('url', source.url) for source in self.source_collection],
*[('contact', contact.value) for contact in self.contact_collection]
]
@ -643,7 +646,7 @@ class Label(Base):
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('name', self.unified_name),
('name', unify(self.name)),
*[('url', source.url) for source in self.source_collection]
]

View File

@ -1,3 +1,5 @@
from __future__ import annotations
from collections import defaultdict
from enum import Enum
from typing import List, Dict, Set, Tuple, Optional, Iterable
@ -103,12 +105,23 @@ class Source(OuterProxy):
('audio_url', self.audio_url),
]
def __merge__(self, __other: Source, override: bool = False):
if override:
self.audio_url = __other.audio_url
if self.audio_url is None or (override and __other.audio_url is not None):
self.audio_url = __other.audio_url
def __str__(self):
return self.__repr__()
def __repr__(self) -> str:
return f"Src({self.page_enum.value}: {self.url}, {self.audio_url})"
@property
def title_string(self) -> str:
return self.url
page_str = property(fget=lambda self: self.page_enum.value)
type_str = property(fget=lambda self: self.type_enum.value)
homepage = property(fget=lambda self: SourcePages.get_homepage(self.page_enum))

View File

@ -11,6 +11,7 @@ from functools import lru_cache
import youtube_dl
from youtube_dl.extractor.youtube import YoutubeIE
from youtube_dl.utils import DownloadError
from ...utils.exception.config import SettingValueError
from ...utils.config import main_settings, youtube_settings, logging_settings
@ -201,6 +202,7 @@ class YoutubeMusic(SuperYouTube):
self.yt_ie = MusicKrakenYoutubeIE(downloader=self.ydl, main_instance=self)
self.download_values_by_url: dict = {}
self.not_download: Dict[str, DownloadError] = {}
def _fetch_from_main_page(self):
"""
@ -483,7 +485,13 @@ class YoutubeMusic(SuperYouTube):
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
ydl_res: dict = self.ydl.extract_info(url=source.url, download=False)
ydl_res: dict = {}
try:
ydl_res: dict = self.ydl.extract_info(url=source.url, download=False)
except DownloadError as e:
self.not_download[source.hash_url] = e
self.LOGGER.error(f"Couldn't fetch song from {source.url}. {e}")
return Song()
self.fetch_media_url(source=source, ydl_res=ydl_res)
@ -556,17 +564,20 @@ class YoutubeMusic(SuperYouTube):
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
media = self.fetch_media_url(source)
result = self.download_connection.stream_into(
media["url"],
target,
name=desc,
raw_url=True,
raw_headers=True,
disable_cache=True,
headers=media.get("headers", {}),
# chunk_size=media.get("chunk_size", main_settings["chunk_size"]),
method="GET",
)
if source.hash_url not in self.not_download:
result = self.download_connection.stream_into(
media["url"],
target,
name=desc,
raw_url=True,
raw_headers=True,
disable_cache=True,
headers=media.get("headers", {}),
# chunk_size=media.get("chunk_size", main_settings["chunk_size"]),
method="GET",
)
else:
result = DownloadResult(error_message=str(self.not_download[source.hash_url]))
if result.is_fatal_error:
result.merge(super().download_song_to_target(source=source, target=target, desc=desc))

View File

@ -3,7 +3,7 @@ from pathlib import Path
import json
import logging
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE
from .config import config, read_config, write_config
from .enums.colors import BColors
from .path_manager import LOCATIONS
@ -52,6 +52,12 @@ def trace(msg: str):
output("trace: " + msg, BColors.OKBLUE)
def object_trace(obj):
if not DEBUG_OBJECT_TRACE:
return
output("object: " + str(obj), BColors.GREY)
"""
misc functions

View File

@ -15,6 +15,7 @@ __stage__ = os.getenv("STAGE", "prod")
DEBUG = (__stage__ == "dev") and True
DEBUG_LOGGING = DEBUG and False
DEBUG_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and True

View File

@ -16,9 +16,12 @@ def unify(string: str) -> str:
"""
returns a unified str, to make comparisons easy.
a unified string has the following attributes:
- is lowercase
- is lowercase
"""
if string is None:
return None
try:
string = translit(string, reversed=True)
except LanguageDetectionError:

86
tests/test_collection.py Normal file
View File

@ -0,0 +1,86 @@
import unittest
from music_kraken.objects import Song, Album, Artist, Collection, Country
class TestCollection(unittest.TestCase):
@staticmethod
def complicated_object() -> Artist:
return Artist(
name="artist",
country=Country.by_alpha_2("DE"),
main_album_list=[
Album(
title="album",
song_list=[
Song(
title="song",
album_list=[
Album(title="album", albumsort=123),
],
),
Song(
title="other_song",
album_list=[
Album(title="album", albumsort=423),
],
),
]
),
Album(title="album", barcode="1234567890123"),
]
)
def test_song_album_relation(self):
"""
Tests that
album = album.any_song.one_album
is the same object
"""
a = self.complicated_object().main_album_collection[0]
b = a.song_collection[0].album_collection[0]
c = a.song_collection[1].album_collection[0]
d = b.song_collection[0].album_collection[0]
e = d.song_collection[0].album_collection[0]
f = e.song_collection[0].album_collection[0]
g = f.song_collection[0].album_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id == e.id == f.id == g.id)
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "album")
self.assertTrue(a.barcode == b.barcode == c.barcode == d.barcode == e.barcode == f.barcode == g.barcode == "1234567890123")
self.assertTrue(a.albumsort == b.albumsort == c.albumsort == d.albumsort == e.albumsort == f.albumsort == g.albumsort == 123)
d.title = "new_title"
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "new_title")
def test_album_artist_relation(self):
"""
Tests that
artist = artist.any_album.any_song.one_artist
is the same object
"""
a = self.complicated_object()
b = a.main_album_collection[0].artist_collection[0]
c = b.main_album_collection[0].artist_collection[0]
d = c.main_album_collection[0].artist_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id)
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
"""
def test_song_artist_relations(self):
a = self.complicated_object()
b = a.main_album_collection[0].song_collection[0].main_artist_collection[0]
c = b.main_album_collection[0].song_collection[0].main_artist_collection[0]
d = c.main_album_collection[0].song_collection[0].main_artist_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id)
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
"""
if __name__ == "__main__":
unittest.main()