diff --git a/.vscode/settings.json b/.vscode/settings.json index 662ba25..64b7f98 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -16,6 +16,7 @@ }, "python.formatting.provider": "none", "cSpell.words": [ + "albumsort", "APIC", "Bandcamp", "dotenv", @@ -28,9 +29,11 @@ "pathvalidate", "Referer", "sponsorblock", + "tracklist", "tracksort", "translit", "unmap", - "youtube" + "youtube", + "youtubei" ] } \ No newline at end of file diff --git a/development/actual_donwload.py b/development/actual_donwload.py index a8eb732..c821734 100644 --- a/development/actual_donwload.py +++ b/development/actual_donwload.py @@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG) if __name__ == "__main__": commands = [ - "s: #a Crystal F", - "d: 20", + "s: #a Psychonaut 4", + "d: 0" ] diff --git a/development/objects_collection.py b/development/objects_collection.py index 642bb18..893e2c5 100644 --- a/development/objects_collection.py +++ b/development/objects_collection.py @@ -2,30 +2,24 @@ import music_kraken from music_kraken.objects import Song, Album, Artist, Collection if __name__ == "__main__": - album_1 = Album( - title="album", - song_list=[ - Song(title="song", main_artist_list=[Artist(name="artist")]), - ], - artist_list=[ - Artist(name="artist 3"), - ] + song_1 = Song( + title="song", + feature_artist_list=[Artist( + name="main_artist" + )] ) - album_2 = Album( - title="album", - song_list=[ - Song(title="song", main_artist_list=[Artist(name="artist 2")]), - ], - artist_list=[ - Artist(name="artist"), - ] + other_artist = Artist(name="other_artist") + + song_2 = Song( + title = "song", + main_artist_list=[other_artist] ) - album_1.merge(album_2) + other_artist.name = "main_artist" - print() - print(*(f"{a.title_string} ; {a.id}" for a in album_1.artist_collection.data), sep=" | ") + song_1.merge(song_2) - print(id(album_1.artist_collection), id(album_2.artist_collection)) - print(id(album_1.song_collection[0].main_artist_collection), id(album_2.song_collection[0].main_artist_collection)) \ No newline at end of file + print("#" * 120) + print("main", *song_1.main_artist_collection) + print("feat", *song_1.feature_artist_collection) diff --git a/music_kraken/audio/metadata.py b/music_kraken/audio/metadata.py index 1d37419..1431112 100644 --- a/music_kraken/audio/metadata.py +++ b/music_kraken/audio/metadata.py @@ -79,7 +79,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song): with temp_target.open("wb") as f: f.write(r.content) - converted_target: Target = Target.temp(name=f"{song.title}.jpeg") + converted_target: Target = Target.temp(name=f"{song.title.replace('/', '_')}") with Image.open(temp_target.file_path) as img: # crop the image if it isn't square in the middle with minimum data loss width, height = img.size diff --git a/music_kraken/objects/artwork.py b/music_kraken/objects/artwork.py index 43ea87e..d5ba54b 100644 --- a/music_kraken/objects/artwork.py +++ b/music_kraken/objects/artwork.py @@ -53,9 +53,9 @@ class Artwork: def get_variant_name(self, variant: ArtworkVariant) -> str: return f"artwork_{variant['width']}x{variant['height']}_{hash_url(variant['url']).replace('/', '_')}" - def __merge__(self, other: Artwork, override: bool = False) -> None: + def __merge__(self, other: Artwork, **kwargs) -> None: for key, value in other._variant_mapping.items(): - if key not in self._variant_mapping or override: + if key not in self._variant_mapping: self._variant_mapping[key] = value def __eq__(self, other: Artwork) -> bool: diff --git a/music_kraken/objects/collection.py b/music_kraken/objects/collection.py index 02bff19..b8b2d4a 100644 --- a/music_kraken/objects/collection.py +++ b/music_kraken/objects/collection.py @@ -1,9 +1,10 @@ from __future__ import annotations from collections import defaultdict -from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any +from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set from .parents import OuterProxy from ..utils import object_trace +from ..utils import output, BColors T = TypeVar('T', bound=OuterProxy) @@ -13,8 +14,8 @@ class Collection(Generic[T]): _data: List[T] - _indexed_values: Dict[str, set] - _indexed_to_objects: Dict[any, list] + _indexed_from_id: Dict[int, Dict[str, Any]] + _indexed_values: Dict[str, Dict[Any, T]] shallow_list = property(fget=lambda self: self.data) @@ -36,9 +37,9 @@ class Collection(Generic[T]): self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {} self.extend_object_to_attribute: Dict[str, Collection[T]] = extend_object_to_attribute or {} self.sync_on_append: Dict[str, Collection] = sync_on_append or {} + self.pull_from: List[Collection] = [] + self.push_to: List[Collection] = [] - self._id_to_index_values: Dict[int, set] = defaultdict(set) - # This is to cleanly unmap previously mapped items by their id self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict) # this is to keep track and look up the actual objects @@ -47,10 +48,11 @@ class Collection(Generic[T]): self.extend(data) def __repr__(self) -> str: - return f"Collection({id(self)})" + return f"Collection({' | '.join(self._collection_for.values())} {id(self)})" - def _map_element(self, __object: T, from_map: bool = False): - self._unmap_element(__object.id) + def _map_element(self, __object: T, no_unmap: bool = False, **kwargs): + if not no_unmap: + self._unmap_element(__object.id) self._indexed_from_id[__object.id]["id"] = __object.id self._indexed_values["id"][__object.id] = __object @@ -74,73 +76,129 @@ class Collection(Generic[T]): del self._indexed_from_id[obj_id] - def _find_object(self, __object: T) -> Optional[T]: + def _remap(self): + # reinitialize the mapping to clean it without time consuming operations + self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict) + self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict) + + for e in self._data: + self._map_element(e, no_unmap=True) + + + def _find_object(self, __object: T, **kwargs) -> Optional[T]: + self._remap() + + if __object.id in self._indexed_from_id: + return self._indexed_values["id"][__object.id] + for name, value in __object.indexing_values: if value in self._indexed_values[name]: return self._indexed_values[name][value] - def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False): + return None + + def _append_new_object(self, other: T, **kwargs): + """ + This function appends the other object to the current collection. + This only works if not another object, which represents the same real life object exists in the collection. + """ + + self._data.append(other) + + # all of the existing hooks to get the defined datastructure + for collection_attribute, generator in self.extend_object_to_attribute.items(): + other.__getattribute__(collection_attribute).extend(generator, **kwargs) + + for attribute, new_object in self.append_object_to_attribute.items(): + other.__getattribute__(attribute).append(new_object, **kwargs) + + for attribute, a in self.sync_on_append.items(): + # syncing two collections by reference + b = other.__getattribute__(attribute) + if a is b: + continue + + object_trace(f"Syncing [{a}] = [{b}]") + + b_data = b.data.copy() + b_collection_for = b._collection_for.copy() + + del b + + for synced_with, key in b_collection_for.items(): + synced_with.__setattr__(key, a) + a._collection_for[synced_with] = key + + a.extend(b_data, **kwargs) + + def append(self, other: Optional[T], **kwargs): """ If an object, that represents the same entity exists in a relevant collection, merge into this object. (and remap) Else append to this collection. - :param __object: - :param already_is_parent: - :param from_map: + :param other: :return: """ - if __object is None: + if other is None: + return + if other.id in self._indexed_from_id: return - existing_object = self._find_object(__object) + object_trace(f"Appending {other.option_string} to {self}") + + for c in self.pull_from: + r = c._find_object(other) + if r is not None: + output("found pull from", r, other, self, color=BColors.RED, sep="\t") + other.merge(r, **kwargs) + c.remove(r, existing=r, **kwargs) + break + + existing_object = self._find_object(other) + + # switching collection in the case of push to + for c in self.push_to: + r = c._find_object(other) + if r is not None: + output("found push to", r, other, self, color=BColors.RED, sep="\t") + return c.append(other, **kwargs) + if existing_object is None: - # append - self._data.append(__object) - self._map_element(__object) + self._append_new_object(other, **kwargs) + else: + existing_object.merge(other, **kwargs) - for collection_attribute, child_collection in self.extend_object_to_attribute.items(): - __object.__getattribute__(collection_attribute).extend(child_collection) + def remove(self, *other_list: List[T], silent: bool = False, existing: Optional[T] = None, **kwargs): + for other in other_list: + existing: Optional[T] = existing or self._indexed_values["id"].get(other.id, None) + if existing is None: + if not silent: + raise ValueError(f"Object {other} not found in {self}") + return other + + """ + for collection_attribute, generator in self.extend_object_to_attribute.items(): + other.__getattribute__(collection_attribute).remove(*generator, silent=silent, **kwargs) for attribute, new_object in self.append_object_to_attribute.items(): - __object.__getattribute__(attribute).append(new_object) + other.__getattribute__(attribute).remove(new_object, silent=silent, **kwargs) + """ - # only modify collections if the object actually has been appended - for attribute, a in self.sync_on_append.items(): - b = __object.__getattribute__(attribute) - object_trace(f"Syncing [{a}{id(a)}] = [{b}{id(b)}]") + self._data.remove(existing) + self._unmap_element(existing) - data_to_extend = b.data + def contains(self, __object: T) -> bool: + return self._find_object(__object) is not None - a._collection_for.update(b._collection_for) - for synced_with, key in b._collection_for.items(): - synced_with.__setattr__(key, a) - - a.extend(data_to_extend) - - - else: - # merge only if the two objects are not the same - if existing_object.id == __object.id: - return - - old_id = existing_object.id - - existing_object.merge(__object) - - if existing_object.id != old_id: - self._unmap_element(old_id) - - self._map_element(existing_object) - - def extend(self, __iterable: Optional[Generator[T, None, None]]): - if __iterable is None: + def extend(self, other_collections: Optional[Generator[T, None, None]], **kwargs): + if other_collections is None: return - for __object in __iterable: - self.append(__object) + for other_object in other_collections: + self.append(other_object, **kwargs) @property def data(self) -> List[T]: @@ -156,8 +214,9 @@ class Collection(Generic[T]): def __iter__(self) -> Iterator[T]: yield from self._data - def __merge__(self, __other: Collection, override: bool = False): - self.extend(__other) + def __merge__(self, other: Collection, **kwargs): + object_trace(f"merging {str(self)} | {str(other)}") + self.extend(other, **kwargs) def __getitem__(self, item: int): return self._data[item] @@ -166,3 +225,9 @@ class Collection(Generic[T]): if item >= len(self._data): return default return self._data[item] + + def __eq__(self, other: Collection) -> bool: + if self.empty and other.empty: + return True + + return self._data == other._data diff --git a/music_kraken/objects/parents.py b/music_kraken/objects/parents.py index 59a3d10..a79887a 100644 --- a/music_kraken/objects/parents.py +++ b/music_kraken/objects/parents.py @@ -9,9 +9,9 @@ from pathlib import Path import inspect from .metadata import Metadata -from ..utils import get_unix_time, object_trace +from ..utils import get_unix_time, object_trace, generate_id from ..utils.config import logging_settings, main_settings -from ..utils.shared import HIGHEST_ID +from ..utils.shared import HIGHEST_ID, DEBUG_PRINT_ID from ..utils.hacking import MetaClass LOGGER = logging_settings["object_logger"] @@ -29,6 +29,9 @@ class InnerData: """ _refers_to_instances: set = None + """ + Attribute versions keep track, of if the attribute has been changed. + """ def __init__(self, object_type, **kwargs): self._refers_to_instances = set() @@ -42,21 +45,28 @@ class InnerData: for key, value in kwargs.items(): if hasattr(value, "__is_collection__"): value._collection_for[self] = key + self.__setattr__(key, value) def __hash__(self): return self.id - def __merge__(self, __other: InnerData, override: bool = False): + def __merge__(self, __other: InnerData, **kwargs): """ :param __other: - :param override: :return: """ self._fetched_from.update(__other._fetched_from) for key, value in __other.__dict__.copy().items(): + if key.startswith("_"): + continue + + if hasattr(value, "__is_collection__") and key in self.__dict__: + self.__getattribute__(key).__merge__(value, **kwargs) + continue + # just set the other value if self doesn't already have it if key not in self.__dict__ or (key in self.__dict__ and self.__dict__[key] == self._default_values.get(key)): self.__setattr__(key, value) @@ -64,13 +74,8 @@ class InnerData: # if the object of value implemented __merge__, it merges existing = self.__getattribute__(key) - if hasattr(type(existing), "__merge__"): - existing.__merge__(value, override) - continue - - # override the existing value if requested - if override: - self.__setattr__(key, value) + if hasattr(existing, "__merge__"): + existing.__merge__(value, **kwargs) class OuterProxy: @@ -84,8 +89,6 @@ class OuterProxy: DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = tuple() UPWARDS_COLLECTION_STRING_ATTRIBUTES = tuple() - TITEL = "id" - def __init__(self, _id: int = None, dynamic: bool = False, **kwargs): _automatic_id: bool = False @@ -94,7 +97,7 @@ class OuterProxy: generates a random integer id the range is defined in the config """ - _id = random.randint(0, HIGHEST_ID) + _id = generate_id() _automatic_id = True kwargs["automatic_id"] = _automatic_id @@ -116,7 +119,7 @@ class OuterProxy: self._inner: InnerData = InnerData(type(self), **kwargs) self._inner._refers_to_instances.add(self) - object_trace(f"creating {type(self).__name__} [{self.title_string}]") + object_trace(f"creating {type(self).__name__} [{self.option_string}]") self.__init_collections__() @@ -173,13 +176,12 @@ class OuterProxy: def __eq__(self, other: Any): return self.__hash__() == other.__hash__() - def merge(self, __other: Optional[OuterProxy], override: bool = False): + def merge(self, __other: Optional[OuterProxy], **kwargs): """ 1. merges the data of __other in self 2. replaces the data of __other with the data of self :param __other: - :param override: :return: """ if __other is None: @@ -196,7 +198,7 @@ class OuterProxy: if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances): a, b = b, a - object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]") + object_trace(f"merging {a.option_string} | {b.option_string}") old_inner = b._inner @@ -204,11 +206,11 @@ class OuterProxy: instance._inner = a._inner a._inner._refers_to_instances.add(instance) - a._inner.__merge__(old_inner, override=override) + a._inner.__merge__(old_inner, **kwargs) del old_inner - def __merge__(self, __other: Optional[OuterProxy], override: bool = False): - self.merge(__other, override) + def __merge__(self, __other: Optional[OuterProxy], **kwargs): + self.merge(__other, **kwargs) def mark_as_fetched(self, *url_hash_list: List[str]): for url_hash in url_hash_list: @@ -235,7 +237,23 @@ class OuterProxy: @property def options(self) -> List[P]: - return [self] + r = [] + + for collection_string_attribute in self.UPWARDS_COLLECTION_STRING_ATTRIBUTES: + r.extend(self.__getattribute__(collection_string_attribute)) + + r.append(self) + + for collection_string_attribute in self.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES: + r.extend(self.__getattribute__(collection_string_attribute)) + + return r + + @property + def option_string(self) -> str: + return self.title_string + + INDEX_DEPENDS_ON: List[str] = [] @property def indexing_values(self) -> List[Tuple[str, object]]: @@ -267,9 +285,10 @@ class OuterProxy: return r + TITEL = "id" @property def title_string(self) -> str: - return str(self.__getattribute__(self.TITEL)) + return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "") def __repr__(self): return f"{type(self).__name__}({self.title_string})" diff --git a/music_kraken/objects/song.py b/music_kraken/objects/song.py index be6d751..9f9ba7e 100644 --- a/music_kraken/objects/song.py +++ b/music_kraken/objects/song.py @@ -22,6 +22,7 @@ from .parents import OuterProxy, P from .source import Source, SourceCollection from .target import Target from .country import Language, Country +from ..utils.shared import DEBUG_PRINT_ID from ..utils.string_processing import unify from .parents import OuterProxy as Base @@ -43,7 +44,8 @@ def get_collection_string( template: str, ignore_titles: Set[str] = None, background: BColors = OPTION_BACKGROUND, - foreground: BColors = OPTION_FOREGROUND + foreground: BColors = OPTION_FOREGROUND, + add_id: bool = DEBUG_PRINT_ID, ) -> str: if collection.empty: return "" @@ -55,8 +57,15 @@ def get_collection_string( r = background + def get_element_str(element) -> str: + nonlocal add_id + r = element.title_string.strip() + if add_id and False: + r += " " + str(element.id) + return r + element: Base - titel_list: List[str] = [element.title_string.strip() for element in collection if element.title_string not in ignore_titles] + titel_list: List[str] = [get_element_str(element) for element in collection if element.title_string not in ignore_titles] for i, titel in enumerate(titel_list): delimiter = ", " @@ -117,7 +126,7 @@ class Song(Base): Base.__init__(**locals()) - UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("album_collection", "main_artist_collection", "feature_artist_collection") + UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_artist_collection", "feature_artist_collection", "album_collection") TITEL = "title" def __init_collections__(self) -> None: @@ -135,6 +144,9 @@ class Song(Base): "feature_song_collection": self } + self.feature_artist_collection.push_to = [self.main_artist_collection] + self.main_artist_collection.pull_from = [self.feature_artist_collection] + def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): if object_type is Song: return @@ -144,20 +156,21 @@ class Song(Base): return if isinstance(object_list, Artist): - self.main_artist_collection.extend(object_list) + self.feature_artist_collection.extend(object_list) return if isinstance(object_list, Album): self.album_collection.extend(object_list) return + INDEX_DEPENDS_ON = ("title", "isrc", "source_collection") + @property def indexing_values(self) -> List[Tuple[str, object]]: return [ - ('id', self.id), ('title', unify(self.title)), ('isrc', self.isrc), - *[('url', source.url) for source in self.source_collection] + *self.source_collection.indexing_values(), ] @property @@ -169,6 +182,8 @@ class Song(Base): id3Mapping.GENRE: [self.genre], id3Mapping.TRACKNUMBER: [self.tracksort_str], id3Mapping.COMMENT: [self.note.markdown], + id3Mapping.FILE_WEBPAGE_URL: self.source_collection.url_list, + id3Mapping.SOURCE_WEBPAGE_URL: self.source_collection.homepage_list, }) # metadata.merge_many([s.get_song_metadata() for s in self.source_collection]) album sources have no relevant metadata for id3 @@ -189,7 +204,7 @@ class Song(Base): @property def option_string(self) -> str: - r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value + r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title}) r += get_collection_string(self.main_artist_collection, " by {}") r += get_collection_string(self.feature_artist_collection, " feat. {}") @@ -269,7 +284,7 @@ class Album(Base): **kwargs) DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection",) - UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "label_collection") + UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection") def __init_collections__(self): self.song_collection.append_object_to_attribute = { @@ -302,13 +317,14 @@ class Album(Base): self.label_collection.extend(object_list) return + INDEX_DEPENDS_ON = ("title", "barcode", "source_collection") + @property def indexing_values(self) -> List[Tuple[str, object]]: return [ - ('id', self.id), ('title', unify(self.title)), ('barcode', self.barcode), - *[('url', source.url) for source in self.source_collection] + *self.source_collection.indexing_values(), ] @property @@ -333,19 +349,13 @@ class Album(Base): @property def option_string(self) -> str: - r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value + r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r += get_collection_string(self.artist_collection, " by {}") r += get_collection_string(self.label_collection, " under {}") if len(self.song_collection) > 0: r += f" with {len(self.song_collection)} songs" return r - - @property - def options(self) -> List[P]: - options = [*self.artist_collection, self, *self.song_collection] - - return options def update_tracksort(self): """ @@ -372,18 +382,6 @@ class Album(Base): tracksort_map[i] = existing_list.pop(0) tracksort_map[i].tracksort = i - def compile(self, merge_into: bool = False): - """ - compiles the recursive structures, - and does depending on the object some other stuff. - - no need to override if only the recursive structure should be built. - override self.build_recursive_structures() instead - """ - - self.update_tracksort() - self._build_recursive_structures(build_version=random.randint(0, 99999), merge=merge_into) - @property def copyright(self) -> str: if self.date is None: @@ -429,7 +427,7 @@ class Artist(Base): lyrical_themes: List[str] general_genre: str - unformated_location: str + unformatted_location: str source_collection: SourceCollection contact_collection: Collection[Contact] @@ -442,7 +440,7 @@ class Artist(Base): "name": str, "unified_name": lambda: None, "country": lambda: None, - "unformated_location": lambda: None, + "unformatted_location": lambda: None, "formed_in": ID3Timestamp, "notes": FormattedText, @@ -461,17 +459,17 @@ class Artist(Base): # This is automatically generated def __init__(self, name: str = "", unified_name: str = None, country: Country = None, formed_in: ID3Timestamp = None, notes: FormattedText = None, lyrical_themes: List[str] = None, - general_genre: str = None, unformated_location: str = None, source_list: List[Source] = None, + general_genre: str = None, unformatted_location: str = None, source_list: List[Source] = None, contact_list: List[Contact] = None, feature_song_list: List[Song] = None, main_album_list: List[Album] = None, label_list: List[Label] = None, **kwargs) -> None: - + super().__init__(name=name, unified_name=unified_name, country=country, formed_in=formed_in, notes=notes, lyrical_themes=lyrical_themes, general_genre=general_genre, - unformated_location=unformated_location, source_list=source_list, contact_list=contact_list, + unformatted_location=unformatted_location, source_list=source_list, contact_list=contact_list, feature_song_list=feature_song_list, main_album_list=main_album_list, label_list=label_list, **kwargs) - DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("feature_song_collection", "main_album_collection") + DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_album_collection", "feature_song_collection") UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",) def __init_collections__(self): @@ -504,12 +502,6 @@ class Artist(Base): self.label_collection.extend(object_list) return - @property - def options(self) -> List[P]: - options = [self, *self.main_album_collection.shallow_list, *self.feature_album] - print(options) - return options - def update_albumsort(self): """ This updates the albumsort attributes, of the albums in @@ -567,40 +559,27 @@ class Artist(Base): # replace the old collection with the new one self.main_album_collection: Collection = Collection(data=album_list, element_type=Album) + INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection") @property def indexing_values(self) -> List[Tuple[str, object]]: return [ - ('id', self.id), ('name', unify(self.name)), - *[('url', source.url) for source in self.source_collection], - *[('contact', contact.value) for contact in self.contact_collection] + *[('contact', contact.value) for contact in self.contact_collection], + *self.source_collection.indexing_values(), ] @property def metadata(self) -> Metadata: metadata = Metadata({ - id3Mapping.ARTIST: [self.name] + id3Mapping.ARTIST: [self.name], + id3Mapping.ARTIST_WEBPAGE_URL: self.source_collection.url_list, }) - metadata.merge_many([s.get_artist_metadata() for s in self.source_collection]) return metadata - """ - def __str__(self, include_notes: bool = False): - string = self.name or "" - if include_notes: - plaintext_notes = self.notes.get_plaintext() - if plaintext_notes is not None: - string += "\n" + plaintext_notes - return string - """ - - def __repr__(self): - return f"Artist(\"{self.name}\")" - @property def option_string(self) -> str: - r = OPTION_FOREGROUND.value + self.name + BColors.ENDC.value + OPTION_BACKGROUND.value + r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r += get_collection_string(self.label_collection, " under {}") r += OPTION_BACKGROUND.value @@ -613,43 +592,6 @@ class Artist(Base): return r - @property - def options(self) -> List[P]: - options = [self] - options.extend(self.main_album_collection) - options.extend(self.feature_song_collection) - return options - - @property - def feature_album(self) -> Album: - return Album( - title="features", - album_status=AlbumStatus.UNRELEASED, - album_type=AlbumType.COMPILATION_ALBUM, - is_split=True, - albumsort=666, - dynamic=True, - song_list=self.feature_song_collection.shallow_list - ) - - def get_all_songs(self) -> List[Song]: - """ - returns a list of all Songs. - probably not that useful, because it is unsorted - """ - collection = self.feature_song_collection.copy() - for album in self.discography: - collection.extend(album.song_collection) - - return collection - - @property - def discography(self) -> List[Album]: - flat_copy_discography = self.main_album_collection.copy() - flat_copy_discography.append(self.feature_album) - - return flat_copy_discography - """ Label @@ -702,7 +644,6 @@ class Label(Base): @property def indexing_values(self) -> List[Tuple[str, object]]: return [ - ('id', self.id), ('name', unify(self.name)), *[('url', source.url) for source in self.source_collection] ] diff --git a/music_kraken/objects/source.py b/music_kraken/objects/source.py index bb2e9e3..ff68d6a 100644 --- a/music_kraken/objects/source.py +++ b/music_kraken/objects/source.py @@ -2,142 +2,176 @@ from __future__ import annotations from collections import defaultdict from enum import Enum -from typing import List, Dict, Set, Tuple, Optional, Iterable -from urllib.parse import urlparse +from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator +from urllib.parse import urlparse, ParseResult +from dataclasses import dataclass, field +from functools import cached_property +from ..utils import generate_id from ..utils.enums.source import SourcePages, SourceTypes from ..utils.config import youtube_settings -from ..utils.string_processing import hash_url +from ..utils.string_processing import hash_url, shorten_display_url from .metadata import Mapping, Metadata from .parents import OuterProxy from .collection import Collection -class Source(OuterProxy): - url: str +@dataclass +class Source: page_enum: SourcePages - referer_page: SourcePages + url: str + referrer_page: SourcePages = None + audio_url: Optional[str] = None - audio_url: str + additional_data: dict = field(default_factory=dict) - _default_factories = { - "audio_url": lambda: None, - } - - # This is automatically generated - def __init__(self, page_enum: SourcePages, url: str, referer_page: SourcePages = None, audio_url: str = None, - **kwargs) -> None: - - if referer_page is None: - referer_page = page_enum - - super().__init__(url=url, page_enum=page_enum, referer_page=referer_page, audio_url=audio_url, **kwargs) + def __post_init__(self): + self.referrer_page = self.referrer_page or self.page_enum + + @property + def parsed_url(self) -> ParseResult: + return urlparse(self.url) @classmethod - def match_url(cls, url: str, referer_page: SourcePages) -> Optional["Source"]: + def match_url(cls, url: str, referrer_page: SourcePages) -> Optional[Source]: """ - this shouldn't be used, unlesse you are not certain what the source is for + this shouldn't be used, unless you are not certain what the source is for the reason is that it is more inefficient """ - parsed = urlparse(url) - url = parsed.geturl() + parsed_url = urlparse(url) + url = parsed_url.geturl() - if "musify" in parsed.netloc: - return cls(SourcePages.MUSIFY, url, referer_page=referer_page) + if "musify" in parsed_url.netloc: + return cls(SourcePages.MUSIFY, url, referrer_page=referrer_page) - if parsed.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]: - return cls(SourcePages.YOUTUBE, url, referer_page=referer_page) + if parsed_url.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]: + return cls(SourcePages.YOUTUBE, url, referrer_page=referrer_page) if url.startswith("https://www.deezer"): - return cls(SourcePages.DEEZER, url, referer_page=referer_page) + return cls(SourcePages.DEEZER, url, referrer_page=referrer_page) if url.startswith("https://open.spotify.com"): - return cls(SourcePages.SPOTIFY, url, referer_page=referer_page) + return cls(SourcePages.SPOTIFY, url, referrer_page=referrer_page) if "bandcamp" in url: - return cls(SourcePages.BANDCAMP, url, referer_page=referer_page) + return cls(SourcePages.BANDCAMP, url, referrer_page=referrer_page) - if "wikipedia" in parsed.netloc: - return cls(SourcePages.WIKIPEDIA, url, referer_page=referer_page) + if "wikipedia" in parsed_url.netloc: + return cls(SourcePages.WIKIPEDIA, url, referrer_page=referrer_page) if url.startswith("https://www.metal-archives.com/"): - return cls(SourcePages.ENCYCLOPAEDIA_METALLUM, url, referer_page=referer_page) + return cls(SourcePages.ENCYCLOPAEDIA_METALLUM, url, referrer_page=referrer_page) # the less important once if url.startswith("https://www.facebook"): - return cls(SourcePages.FACEBOOK, url, referer_page=referer_page) + return cls(SourcePages.FACEBOOK, url, referrer_page=referrer_page) if url.startswith("https://www.instagram"): - return cls(SourcePages.INSTAGRAM, url, referer_page=referer_page) + return cls(SourcePages.INSTAGRAM, url, referrer_page=referrer_page) if url.startswith("https://twitter"): - return cls(SourcePages.TWITTER, url, referer_page=referer_page) + return cls(SourcePages.TWITTER, url, referrer_page=referrer_page) if url.startswith("https://myspace.com"): - return cls(SourcePages.MYSPACE, url, referer_page=referer_page) - - def get_song_metadata(self) -> Metadata: - return Metadata({ - Mapping.FILE_WEBPAGE_URL: [self.url], - Mapping.SOURCE_WEBPAGE_URL: [self.homepage] - }) - - def get_artist_metadata(self) -> Metadata: - return Metadata({ - Mapping.ARTIST_WEBPAGE_URL: [self.url] - }) + return cls(SourcePages.MYSPACE, url, referrer_page=referrer_page) @property def hash_url(self) -> str: return hash_url(self.url) @property - def metadata(self) -> Metadata: - return self.get_song_metadata() - - @property - def indexing_values(self) -> List[Tuple[str, object]]: - return [ - ('id', self.id), - ('url', self.url), - ('audio_url', self.audio_url), - ] - - def __str__(self): - return self.__repr__() + def indexing_values(self) -> list: + r = [hash_url(self.url)] + if self.audio_url: + r.append(hash_url(self.audio_url)) + return r def __repr__(self) -> str: - return f"Src({self.page_enum.value}: {self.url}, {self.audio_url})" + return f"Src({self.page_enum.value}: {shorten_display_url(self.url)})" - @property - def title_string(self) -> str: - return self.url + def __merge__(self, other: Source, **kwargs): + if self.audio_url is None: + self.audio_url = other.audio_url + self.additional_data.update(other.additional_data) page_str = property(fget=lambda self: self.page_enum.value) - type_str = property(fget=lambda self: self.type_enum.value) - homepage = property(fget=lambda self: SourcePages.get_homepage(self.page_enum)) -class SourceCollection(Collection): +class SourceCollection: + __change_version__ = generate_id() + + _indexed_sources: Dict[str, Source] + _page_to_source_list: Dict[SourcePages, List[Source]] + def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs): - self._page_to_source_list: Dict[SourcePages, List[Source]] = defaultdict(list) + self._page_to_source_list = defaultdict(list) + self._indexed_sources = {} - super().__init__(data=data, **kwargs) + self.extend(data or []) - def _map_element(self, __object: Source, **kwargs): - super()._map_element(__object, **kwargs) + def has_source_page(self, *source_pages: SourcePages) -> bool: + return any(source_page in self._page_to_source_list for source_page in source_pages) - self._page_to_source_list[__object.page_enum].append(__object) + def get_sources(self, *source_pages: List[Source]) -> Generator[Source]: + if not len(source_pages): + source_pages = self.source_pages + + for page in source_pages: + yield from self._page_to_source_list[page] + + def append(self, source: Source): + if source is None: + return + + existing_source = None + for key in source.indexing_values: + if key in self._indexed_sources: + existing_source = self._indexed_sources[key] + break + + if existing_source is not None: + existing_source.__merge__(source) + source = existing_source + else: + self._page_to_source_list[source.page_enum].append(source) + + changed = False + for key in source.indexing_values: + if key not in self._indexed_sources: + changed = True + self._indexed_sources[key] = source + + if changed: + self.__change_version__ = generate_id() + + def extend(self, sources: Iterable[Source]): + for source in sources: + self.append(source) + + def __iter__(self): + yield from self.get_sources() + + def __merge__(self, other: SourceCollection, **kwargs): + self.extend(other) @property - def source_pages(self) -> Set[SourcePages]: - return set(source.page_enum for source in self._data) + def source_pages(self) -> Iterable[SourcePages]: + return sorted(self._page_to_source_list.keys(), key=lambda page: page.value) - def get_sources_from_page(self, source_page: SourcePages) -> List[Source]: - """ - getting the sources for a specific page like - YouTube or musify - """ - return self._page_to_source_list[source_page].copy() + @property + def hash_url_list(self) -> List[str]: + return [hash_url(source.url) for source in self.get_sources()] + + @property + def url_list(self) -> List[str]: + return [source.url for source in self.get_sources()] + + @property + def homepage_list(self) -> List[str]: + return [source.homepage for source in self.source_pages] + + def indexing_values(self) -> Generator[Tuple[str, str], None, None]: + for index in self._indexed_sources: + yield "url", index \ No newline at end of file diff --git a/music_kraken/pages/abstract.py b/music_kraken/pages/abstract.py index 468067b..0ea15db 100644 --- a/music_kraken/pages/abstract.py +++ b/music_kraken/pages/abstract.py @@ -89,52 +89,6 @@ class NamingDict(dict): return self.default_value_for_name(attribute_name) -def _clean_music_object(music_object: INDEPENDENT_DB_OBJECTS, collections: Dict[INDEPENDENT_DB_TYPES, Collection]): - if type(music_object) == Label: - return _clean_label(label=music_object, collections=collections) - if type(music_object) == Artist: - return _clean_artist(artist=music_object, collections=collections) - if type(music_object) == Album: - return _clean_album(album=music_object, collections=collections) - if type(music_object) == Song: - return _clean_song(song=music_object, collections=collections) - - -def _clean_collection(collection: Collection, collection_dict: Dict[INDEPENDENT_DB_TYPES, Collection]): - if collection.element_type not in collection_dict: - return - - for i, element in enumerate(collection): - r = collection_dict[collection.element_type].append(element, merge_into_existing=True) - collection[i] = r.current_element - - if not r.was_the_same: - _clean_music_object(r.current_element, collection_dict) - - -def _clean_label(label: Label, collections: Dict[INDEPENDENT_DB_TYPES, Collection]): - _clean_collection(label.current_artist_collection, collections) - _clean_collection(label.album_collection, collections) - - -def _clean_artist(artist: Artist, collections: Dict[INDEPENDENT_DB_TYPES, Collection]): - _clean_collection(artist.main_album_collection, collections) - _clean_collection(artist.feature_song_collection, collections) - _clean_collection(artist.label_collection, collections) - - -def _clean_album(album: Album, collections: Dict[INDEPENDENT_DB_TYPES, Collection]): - _clean_collection(album.label_collection, collections) - _clean_collection(album.song_collection, collections) - _clean_collection(album.artist_collection, collections) - - -def _clean_song(song: Song, collections: Dict[INDEPENDENT_DB_TYPES, Collection]): - _clean_collection(song.album_collection, collections) - _clean_collection(song.feature_artist_collection, collections) - _clean_collection(song.main_artist_collection, collections) - - class Page: """ This is an abstract class, laying out the @@ -246,7 +200,7 @@ class Page: # only certain database objects, have a source list if isinstance(music_object, INDEPENDENT_DB_OBJECTS): source: Source - for source in music_object.source_collection.get_sources_from_page(self.SOURCE_TYPE): + for source in music_object.source_collection.get_sources(self.SOURCE_TYPE): if music_object.already_fetched_from(source.hash_url): continue @@ -419,9 +373,10 @@ class Page: if song.target_collection.empty: song.target_collection.append(new_target) - sources = song.source_collection.get_sources_from_page(self.SOURCE_TYPE) - if len(sources) == 0: - return DownloadResult(error_message=f"No source found for {song.title} as {self.__class__.__name__}.") + if not song.source_collection.has_source_page(self.SOURCE_TYPE): + return DownloadResult(error_message=f"No {self.__class__.__name__} source found for {song.option_string}.") + + sources = song.source_collection.get_sources(self.SOURCE_TYPE) temp_target: Target = Target( relative_to_music_dir=False, @@ -448,14 +403,19 @@ class Page: self.LOGGER.info(f"{song.option_string} already exists, thus not downloading again.") return r - source = sources[0] - if not found_on_disc: - r = self.download_song_to_target(source=source, target=temp_target, desc=song.option_string) + for source in sources: + r = self.download_song_to_target(source=source, target=temp_target, desc=song.option_string) - if not r.is_fatal_error: - r.merge(self._post_process_targets(song, temp_target, - [] if found_on_disc else self.get_skip_intervals(song, source))) + if not r.is_fatal_error: + break + + if temp_target.exists: + r.merge(self._post_process_targets( + song=song, + temp_target=temp_target, + interval_list=[] if found_on_disc else self.get_skip_intervals(song, source) + )) return r diff --git a/music_kraken/pages/bandcamp.py b/music_kraken/pages/bandcamp.py index 90064db..dcfebbf 100644 --- a/music_kraken/pages/bandcamp.py +++ b/music_kraken/pages/bandcamp.py @@ -185,7 +185,7 @@ class Bandcamp(Page): if li is None and li['href'] is not None: continue - source_list.append(Source.match_url(_parse_artist_url(li['href']), referer_page=self.SOURCE_TYPE)) + source_list.append(Source.match_url(_parse_artist_url(li['href']), referrer_page=self.SOURCE_TYPE)) return Artist( name=name, diff --git a/music_kraken/pages/encyclopaedia_metallum.py b/music_kraken/pages/encyclopaedia_metallum.py index d9ce0ca..dba4527 100644 --- a/music_kraken/pages/encyclopaedia_metallum.py +++ b/music_kraken/pages/encyclopaedia_metallum.py @@ -486,7 +486,7 @@ class EncyclopaediaMetallum(Page): href = anchor["href"] if href is not None: - source_list.append(Source.match_url(href, referer_page=self.SOURCE_TYPE)) + source_list.append(Source.match_url(href, referrer_page=self.SOURCE_TYPE)) # The following code is only legacy code, which I just kep because it doesn't harm. # The way ma returns sources changed. @@ -504,7 +504,7 @@ class EncyclopaediaMetallum(Page): if url is None: continue - source_list.append(Source.match_url(url, referer_page=self.SOURCE_TYPE)) + source_list.append(Source.match_url(url, referrer_page=self.SOURCE_TYPE)) return source_list diff --git a/music_kraken/pages/musify.py b/music_kraken/pages/musify.py index 28ac0a9..5f1b7aa 100644 --- a/music_kraken/pages/musify.py +++ b/music_kraken/pages/musify.py @@ -503,7 +503,7 @@ class Musify(Page): source_list.append(Source( SourcePages.YOUTUBE, iframe["src"], - referer_page=self.SOURCE_TYPE + referrer_page=self.SOURCE_TYPE )) return Song( @@ -690,13 +690,6 @@ class Musify(Page): new_song = self._parse_song_card(card_soup) album.song_collection.append(new_song) - if stop_at_level > 1: - song: Song - for song in album.song_collection: - sources = song.source_collection.get_sources_from_page(self.SOURCE_TYPE) - for source in sources: - song.merge(self.fetch_song(source=source)) - album.update_tracksort() return album @@ -812,7 +805,7 @@ class Musify(Page): href = additional_source.get("href") if href is None: continue - new_src = Source.match_url(href, referer_page=self.SOURCE_TYPE) + new_src = Source.match_url(href, referrer_page=self.SOURCE_TYPE) if new_src is None: continue source_list.append(new_src) diff --git a/music_kraken/pages/youtube_music/_list_render.py b/music_kraken/pages/youtube_music/_list_render.py index 8076e54..bb6f40b 100644 --- a/music_kraken/pages/youtube_music/_list_render.py +++ b/music_kraken/pages/youtube_music/_list_render.py @@ -25,7 +25,6 @@ def music_card_shelf_renderer(renderer: dict) -> List[DatabaseObject]: results.extend(parse_renderer(sub_renderer)) return results - def music_responsive_list_item_flex_column_renderer(renderer: dict) -> List[DatabaseObject]: return parse_run_list(renderer.get("text", {}).get("runs", [])) @@ -54,19 +53,11 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]: for result in results: _map[type(result)].append(result) - for song in song_list: + if len(song_list) == 1: + song = song_list[0] + song.feature_artist_collection.extend(artist_list) song.album_collection.extend(album_list) - song.main_artist_collection.extend(artist_list) - - for album in album_list: - album.artist_collection.extend(artist_list) - - if len(song_list) > 0: - return song_list - if len(album_list) > 0: - return album_list - if len(artist_list) > 0: - return artist_list + return [song] return results diff --git a/music_kraken/pages/youtube_music/_music_object_render.py b/music_kraken/pages/youtube_music/_music_object_render.py index f10d11a..831d50d 100644 --- a/music_kraken/pages/youtube_music/_music_object_render.py +++ b/music_kraken/pages/youtube_music/_music_object_render.py @@ -40,7 +40,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]: _temp_nav = run_element.get("navigationEndpoint", {}) is_video = "watchEndpoint" in _temp_nav - navigation_endpoint = _temp_nav.get("watchEndpoint" if is_video else "browseEndpoint", {}) + navigation_endpoint = _temp_nav.get("watchEndpoint", _temp_nav.get("browseEndpoint", {})) element_type = PageType.SONG page_type_string = navigation_endpoint.get("watchEndpointMusicSupportedConfigs", {}).get("watchEndpointMusicConfig", {}).get("musicVideoType", "") @@ -51,7 +51,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]: except ValueError: return - element_id = navigation_endpoint.get("videoId" if is_video else "browseId") + element_id = navigation_endpoint.get("videoId", navigation_endpoint.get("browseId")) element_text = run_element.get("text") if element_id is None or element_text is None: @@ -60,7 +60,11 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]: if element_type == PageType.SONG or (element_type == PageType.VIDEO and not youtube_settings["youtube_music_clean_data"]) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not youtube_settings["youtube_music_clean_data"]): source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}") - return Song(title=clean_song_title(element_text), source_list=[source]) + + return Song( + title=clean_song_title(element_text), + source_list=[source] + ) if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not youtube_settings["youtube_music_clean_data"]): source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}") diff --git a/music_kraken/pages/youtube_music/youtube_music.py b/music_kraken/pages/youtube_music/youtube_music.py index 6ecbeaf..bbb8d22 100644 --- a/music_kraken/pages/youtube_music/youtube_music.py +++ b/music_kraken/pages/youtube_music/youtube_music.py @@ -8,6 +8,7 @@ import json from dataclasses import dataclass import re from functools import lru_cache +from collections import defaultdict import youtube_dl from youtube_dl.extractor.youtube import YoutubeIE @@ -17,7 +18,7 @@ from ...utils.exception.config import SettingValueError from ...utils.config import main_settings, youtube_settings, logging_settings from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING from ...utils.string_processing import clean_song_title -from ...utils import get_current_millis +from ...utils import get_current_millis, traverse_json_path from ...utils import dump_to_file @@ -30,12 +31,16 @@ from ...objects import ( Song, Album, Label, - Target + Target, + Lyrics, + FormattedText ) from ...connection import Connection +from ...utils.enums.album import AlbumType from ...utils.support_classes.download_result import DownloadResult from ._list_render import parse_renderer +from ._music_object_render import parse_run_element from .super_youtube import SuperYouTube @@ -162,6 +167,12 @@ class MusicKrakenYoutubeIE(YoutubeIE): +ALBUM_TYPE_MAP = { + "Single": AlbumType.SINGLE, + "Album": AlbumType.STUDIO_ALBUM, + "EP": AlbumType.EP, +} + class YoutubeMusic(SuperYouTube): # CHANGE @@ -401,7 +412,7 @@ class YoutubeMusic(SuperYouTube): return results def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist: - artist = Artist() + artist = Artist(source_list=[source]) # construct the request url = urlparse(source.url) @@ -421,6 +432,19 @@ class YoutubeMusic(SuperYouTube): if DEBUG: dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False) + # artist details + data: dict = r.json() + header = data.get("header", {}) + musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {}) + + title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", []) + subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", []) + + if len(title_runs) > 0: + artist.name = title_runs[0].get("text", artist.name) + + + # fetch discography renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[ 0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", []) @@ -465,6 +489,46 @@ class YoutubeMusic(SuperYouTube): if DEBUG: dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False) + data = r.json() + + # album details + header = data.get("header", {}) + musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {}) + + title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", []) + subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", []) + + if len(title_runs) > 0: + album.title = title_runs[0].get("text", album.title) + + def other_parse_run(run: dict) -> str: + nonlocal album + + if "text" not in run: + return + text = run["text"] + + is_text_field = len(run.keys()) == 1 + + # regex that text is a year + if is_text_field and re.match(r"\d{4}", text): + album.date = ID3Timestamp.strptime(text, "%Y") + return + + if text in ALBUM_TYPE_MAP: + album.album_type = ALBUM_TYPE_MAP[text] + return + + if not is_text_field: + r = parse_run_element(run) + if r is not None: + album.add_list_of_other_objects([r]) + return + + for _run in subtitle_runs: + other_parse_run(_run) + + # tracklist renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[ 0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", []) @@ -472,20 +536,67 @@ class YoutubeMusic(SuperYouTube): for i, content in enumerate(renderer_list): dump_to_file(f"{i}-album-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False) - results = [] - - """ - cant use fixed indices, because if something has no entries, the list dissappears - instead I have to try parse everything, and just reject community playlists and profiles. - """ for renderer in renderer_list: - results.extend(parse_renderer(renderer)) + album.add_list_of_other_objects(parse_renderer(renderer)) - album.add_list_of_other_objects(results) + for song in album.song_collection: + for song_source in song.source_collection: + song_source.additional_data["playlist_id"] = browse_id return album + def fetch_lyrics(self, video_id: str, playlist_id: str = None) -> str: + request_data = { + "context": {**self.credentials.context, "adSignalsInfo": {"params": []}}, + "videoId": video_id, + } + if playlist_id is not None: + request_data["playlistId"] = playlist_id + + tab_request = self.yt_music_connection.post( + url=get_youtube_url(path="/youtubei/v1/next", query=f"prettyPrint=false"), + json=request_data, + name=f"fetch_song_tabs_{video_id}.json", + ) + + if tab_request is None: + return None + + dump_to_file(f"fetch_song_tabs_{video_id}.json", tab_request.text, is_json=True, exit_after_dump=False) + + tab_data: dict = tab_request.json() + + tabs = traverse_json_path(tab_data, "contents.singleColumnMusicWatchNextResultsRenderer.tabbedRenderer.watchNextTabbedResultsRenderer.tabs", default=[]) + browse_id = None + for tab in tabs: + pageType = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseEndpointContextSupportedConfigs.browseEndpointContextMusicConfig.pageType", default="") + if pageType in ("MUSIC_TAB_TYPE_LYRICS", "MUSIC_PAGE_TYPE_TRACK_LYRICS") or "lyrics" in pageType.lower(): + browse_id = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseId", default=None) + break + + if browse_id is None: + return None + + + r = self.yt_music_connection.post( + url=get_youtube_url(path="/youtubei/v1/browse", query=f"prettyPrint=false"), + json={ + "browseId": browse_id, + "context": {**self.credentials.context, "adSignalsInfo": {"params": []}} + }, + name=f"fetch_song_lyrics_{video_id}.json" + ) + + dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False) + + data = r.json() + lyrics_text = traverse_json_path(data, "contents.sectionListRenderer.contents[0].musicDescriptionShelfRenderer.description.runs[0].text", default=None) + if lyrics_text is None: + return None + + return Lyrics(FormattedText(plain=lyrics_text)) + def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: ydl_res: dict = {} @@ -498,7 +609,19 @@ class YoutubeMusic(SuperYouTube): self.fetch_media_url(source=source, ydl_res=ydl_res) - artist_name = ydl_res.get("artist", ydl_res.get("uploader", "")).rstrip(" - Topic") + artist_names = [] + uploader = ydl_res.get("uploader", "") + if uploader.endswith(" - Topic"): + artist_names = [uploader.rstrip(" - Topic")] + + artist_list = [ + Artist( + name=name, + source_list=[Source( + SourcePages.YOUTUBE_MUSIC, + f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}" + )] + ) for name in artist_names] album_list = [] if "album" in ydl_res: @@ -507,25 +630,57 @@ class YoutubeMusic(SuperYouTube): date=ID3Timestamp.strptime(ydl_res.get("upload_date"), "%Y%m%d"), )) - return Song( + artist_name = artist_names[0] if len(artist_names) > 0 else None + song = Song( title=ydl_res.get("track", clean_song_title(ydl_res.get("title"), artist_name=artist_name)), note=ydl_res.get("descriptions"), album_list=album_list, length=int(ydl_res.get("duration", 0)) * 1000, artwork=Artwork(*ydl_res.get("thumbnails", [])), - main_artist_list=[Artist( - name=artist_name, - source_list=[Source( - SourcePages.YOUTUBE_MUSIC, - f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}" - )] - )], + main_artist_list=artist_list, source_list=[Source( SourcePages.YOUTUBE_MUSIC, f"https://music.youtube.com/watch?v={ydl_res.get('id')}" ), source], ) + # other song details + parsed_url = urlparse(source.url) + browse_id = parse_qs(parsed_url.query)['v'][0] + request_data = { + "captionParams": {}, + "context": {**self.credentials.context, "adSignalsInfo": {"params": []}}, + "videoId": browse_id, + } + if "playlist_id" in source.additional_data: + request_data["playlistId"] = source.additional_data["playlist_id"] + + initial_details = self.yt_music_connection.post( + url=get_youtube_url(path="/youtubei/v1/player", query=f"prettyPrint=false"), + json=request_data, + name=f"fetch_song_{browse_id}.json", + ) + + if initial_details is None: + return song + + dump_to_file(f"fetch_song_{browse_id}.json", initial_details.text, is_json=True, exit_after_dump=False) + + data = initial_details.json() + video_details = data.get("videoDetails", {}) + + browse_id = video_details.get("videoId", browse_id) + song.title = video_details.get("title", song.title) + if video_details.get("isLiveContent", False): + for album in song.album_list: + album.album_type = AlbumType.LIVE_ALBUM + for thumbnail in video_details.get("thumbnails", []): + song.artwork.append(**thumbnail) + + song.lyrics_collection.append(self.fetch_lyrics(browse_id, playlist_id=request_data.get("playlistId"))) + + return song + def fetch_media_url(self, source: Source, ydl_res: dict = None) -> dict: def _get_best_format(format_list: List[Dict]) -> dict: diff --git a/music_kraken/utils/__init__.py b/music_kraken/utils/__init__.py index 9226441..e85fa1a 100644 --- a/music_kraken/utils/__init__.py +++ b/music_kraken/utils/__init__.py @@ -3,24 +3,30 @@ from pathlib import Path import json import logging import inspect +from typing import List, Union from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK from .config import config, read_config, write_config from .enums.colors import BColors from .path_manager import LOCATIONS +from .hacking import merge_args """ IO functions """ def _apply_color(msg: str, color: BColors) -> str: + if not isinstance(msg, str): + msg = str(msg) + if color is BColors.ENDC: return msg return color.value + msg + BColors.ENDC.value -def output(msg: str, color: BColors = BColors.ENDC): - print(_apply_color(msg, color)) +@merge_args(print) +def output(*msg: List[str], color: BColors = BColors.ENDC, **kwargs): + print(*(_apply_color(s, color) for s in msg), **kwargs) def user_input(msg: str, color: BColors = BColors.ENDC): @@ -71,6 +77,43 @@ def object_trace(obj): misc functions """ +def traverse_json_path(data, path: Union[str, List[str]], default=None): + """ + Path parts are concatenated with . or wrapped with [""] for object keys and wrapped in [] for array indices. + """ + + if isinstance(path, str): + path = path.replace('["', '.').replace('"]', '.').replace("[", ".").replace("]", ".") + path = [p for p in path.split(".") if len(p) > 0] + + if len(path) <= 0: + return data + + current = path[0] + path = path[1:] + + new_data = None + + if isinstance(data, dict): + new_data = data.get(current) + + elif isinstance(data, list): + try: + new_data = data[int(current)] + except (IndexError, ValueError): + pass + + if new_data is None: + return default + + return traverse_json_path(data=new_data, path=path, default=default) + +_auto_increment = 0 +def generate_id() -> int: + global _auto_increment + _auto_increment += 1 + return _auto_increment + def get_current_millis() -> int: dt = datetime.now() return int(dt.microsecond / 1_000) diff --git a/music_kraken/utils/enums/source.py b/music_kraken/utils/enums/source.py index a5e213e..be3171f 100644 --- a/music_kraken/utils/enums/source.py +++ b/music_kraken/utils/enums/source.py @@ -9,42 +9,32 @@ class SourceTypes(Enum): class SourcePages(Enum): - YOUTUBE = "youtube" - MUSIFY = "musify" - YOUTUBE_MUSIC = "youtube music" - GENIUS = "genius" - MUSICBRAINZ = "musicbrainz" + YOUTUBE = "youtube", "https://www.youtube.com/" + MUSIFY = "musify", "https://musify.club/" + YOUTUBE_MUSIC = "youtube music", "https://music.youtube.com/" + GENIUS = "genius", "https://genius.com/" + MUSICBRAINZ = "musicbrainz", "https://musicbrainz.org/" ENCYCLOPAEDIA_METALLUM = "encyclopaedia metallum" - BANDCAMP = "bandcamp" - DEEZER = "deezer" - SPOTIFY = "spotify" + BANDCAMP = "bandcamp", "https://bandcamp.com/" + DEEZER = "deezer", "https://www.deezer.com/" + SPOTIFY = "spotify", "https://open.spotify.com/" # This has nothing to do with audio, but bands can be here - WIKIPEDIA = "wikipedia" - INSTAGRAM = "instagram" - FACEBOOK = "facebook" - TWITTER = "twitter" # I will use nitter though lol - MYSPACE = "myspace" # Yes somehow this ancient site is linked EVERYWHERE + WIKIPEDIA = "wikipedia", "https://en.wikipedia.org/wiki/Main_Page" + INSTAGRAM = "instagram", "https://www.instagram.com/" + FACEBOOK = "facebook", "https://www.facebook.com/" + TWITTER = "twitter", "https://twitter.com/" + MYSPACE = "myspace", "https://myspace.com/" # Yes somehow this ancient site is linked EVERYWHERE - MANUAL = "manual" + MANUAL = "manual", "" - PRESET = "preset" + PRESET = "preset", "" - @classmethod - def get_homepage(cls, attribute) -> str: - homepage_map = { - cls.YOUTUBE: "https://www.youtube.com/", - cls.MUSIFY: "https://musify.club/", - cls.MUSICBRAINZ: "https://musicbrainz.org/", - cls.ENCYCLOPAEDIA_METALLUM: "https://www.metal-archives.com/", - cls.GENIUS: "https://genius.com/", - cls.BANDCAMP: "https://bandcamp.com/", - cls.DEEZER: "https://www.deezer.com/", - cls.INSTAGRAM: "https://www.instagram.com/", - cls.FACEBOOK: "https://www.facebook.com/", - cls.SPOTIFY: "https://open.spotify.com/", - cls.TWITTER: "https://twitter.com/", - cls.MYSPACE: "https://myspace.com/", - cls.WIKIPEDIA: "https://en.wikipedia.org/wiki/Main_Page" - } - return homepage_map[attribute] \ No newline at end of file + def __new__(cls, value, homepage = None): + member = object.__new__(cls) + + member._value_ = value + member.homepage = homepage + + return member + \ No newline at end of file diff --git a/music_kraken/utils/hacking.py b/music_kraken/utils/hacking.py index e68356e..0e949d8 100644 --- a/music_kraken/utils/hacking.py +++ b/music_kraken/utils/hacking.py @@ -78,7 +78,14 @@ def _merge( drop_args = [] if drop_kwonlyargs is None: drop_kwonlyargs = [] - source_spec = inspect.getfullargspec(source) + + is_builtin = False + try: + source_spec = inspect.getfullargspec(source) + except TypeError: + is_builtin = True + source_spec = inspect.FullArgSpec(type(source).__name__, [], [], [], [], [], []) + dest_spec = inspect.getfullargspec(dest) if source_spec.varargs or source_spec.varkw: @@ -128,13 +135,15 @@ def _merge( 'co_kwonlyargcount': len(kwonlyargs_merged), 'co_posonlyargcount': dest.__code__.co_posonlyargcount, 'co_nlocals': len(args_all), - 'co_flags': source.__code__.co_flags, 'co_varnames': args_all, 'co_filename': dest.__code__.co_filename, 'co_name': dest.__code__.co_name, 'co_firstlineno': dest.__code__.co_firstlineno, } + if hasattr(source, "__code__"): + replace_kwargs['co_flags'] = source.__code__.co_flags + if PY310: replace_kwargs['co_linetable'] = dest.__code__.co_linetable else: @@ -151,7 +160,7 @@ def _merge( len(kwonlyargs_merged), _blank.__code__.co_nlocals, _blank.__code__.co_stacksize, - source.__code__.co_flags, + source.__code__.co_flags if hasattr(source, "__code__") else dest.__code__.co_flags, _blank.__code__.co_code, (), (), args_all, dest.__code__.co_filename, dest.__code__.co_name, @@ -171,6 +180,9 @@ def _merge( dest_ret = dest.__annotations__['return'] for v in ('__kwdefaults__', '__annotations__'): + if not hasattr(source, v): + continue + out = getattr(source, v) if out is None: out = {} diff --git a/music_kraken/utils/shared.py b/music_kraken/utils/shared.py index a2b06b8..8f671f9 100644 --- a/music_kraken/utils/shared.py +++ b/music_kraken/utils/shared.py @@ -20,6 +20,7 @@ DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False DEBUG_YOUTUBE_INITIALIZING = DEBUG and False DEBUG_PAGES = DEBUG and False DEBUG_DUMP = DEBUG and False +DEBUG_PRINT_ID = DEBUG and True if DEBUG: print("DEBUG ACTIVE") diff --git a/music_kraken/utils/string_processing.py b/music_kraken/utils/string_processing.py index 9acd3c8..22ae63e 100644 --- a/music_kraken/utils/string_processing.py +++ b/music_kraken/utils/string_processing.py @@ -6,6 +6,7 @@ from functools import lru_cache from transliterate.exceptions import LanguageDetectionError from transliterate import translit from pathvalidate import sanitize_filename +from urllib.parse import urlparse, ParseResult, parse_qs COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = ( @@ -21,6 +22,7 @@ def unify(string: str) -> str: returns a unified str, to make comparisons easy. a unified string has the following attributes: - is lowercase + - is transliterated to Latin characters from e.g. Cyrillic """ if string is None: @@ -30,8 +32,9 @@ def unify(string: str) -> str: string = translit(string, reversed=True) except LanguageDetectionError: pass - - return string.lower() + + string = unify_punctuation(string) + return string.lower().strip() def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Union[str, Path]: @@ -49,7 +52,14 @@ def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Uni string = string[1:] string = string.replace("/", "_").replace("\\", "_") + + try: + string = translit(string, reversed=True) + except LanguageDetectionError: + pass + string = sanitize_filename(string) + return string if isinstance(string, Path): @@ -127,13 +137,45 @@ UNIFY_TO = " " ALLOWED_LENGTH_DISTANCE = 20 -def unify_punctuation(to_unify: str) -> str: +def unify_punctuation(to_unify: str, unify_to: str = UNIFY_TO) -> str: for char in string.punctuation: - to_unify = to_unify.replace(char, UNIFY_TO) + to_unify = to_unify.replace(char, unify_to) return to_unify -def hash_url(url: str) -> int: - return url.strip().lower().lstrip("https://").lstrip("http://") +@lru_cache(maxsize=128) +def hash_url(url: Union[str, ParseResult]) -> str: + if isinstance(url, str): + url = urlparse(url) + + unify_to = "-" + + def unify_part(part: str) -> str: + nonlocal unify_to + return unify_punctuation(part.lower(), unify_to=unify_to).strip(unify_to) + + # netloc + netloc = unify_part(url.netloc) + if netloc.startswith("www" + unify_to): + netloc = netloc[3 + len(unify_to):] + + # query + query = url.query + query_dict: Optional[dict] = None + try: + query_dict: dict = parse_qs(url.query, strict_parsing=True) + except ValueError: + # the query couldn't be parsed + pass + + if isinstance(query_dict, dict): + # sort keys alphabetically + query = "" + for key, value in sorted(query_dict.items(), key=lambda i: i[0]): + query += f"{key.strip()}-{''.join(i.strip() for i in value)}" + + r = f"{netloc}_{unify_part(url.path)}_{unify_part(query)}" + r = r.lower().strip() + return r def remove_feature_part_from_track(title: str) -> str: diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_hash_url.py b/tests/test_hash_url.py new file mode 100644 index 0000000..f87b2ff --- /dev/null +++ b/tests/test_hash_url.py @@ -0,0 +1,35 @@ +import unittest + +from music_kraken.utils.string_processing import hash_url + + +class TestCollection(unittest.TestCase): + def test_remove_schema(self): + self.assertFalse(hash_url("https://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("https")) + self.assertFalse(hash_url("ftp://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("https")) + self.assertFalse(hash_url("sftp://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("https")) + self.assertFalse(hash_url("http://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("https")) + + def test_no_punctuation(self): + self.assertNotIn(hash_url("https://www.you_tube.com/watch?v=3jZ_D3ELwOQ"), "you_tube") + self.assertNotIn(hash_url("https://docs.gitea.com/next/install.ation/comparison"), ".") + + def test_three_parts(self): + """ + The url is parsed into three parts [netloc; path; query] + Which are then appended to each other with an underscore between. + """ + + self.assertTrue(hash_url("https://duckduckgo.com/?t=h_&q=dfasf&ia=web").count("_") == 2) + + def test_sort_query(self): + """ + The query is sorted alphabetically + """ + hashed = hash_url("https://duckduckgo.com/?t=h_&q=dfasf&ia=web") + sorted_keys = ["ia-", "q-", "t-"] + + self.assertTrue(hashed.index(sorted_keys[0]) < hashed.index(sorted_keys[1]) < hashed.index(sorted_keys[2])) + +if __name__ == "__main__": + unittest.main()