diff --git a/.vscode/settings.json b/.vscode/settings.json index 662ba25..d8e7fc7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -16,11 +16,13 @@ }, "python.formatting.provider": "none", "cSpell.words": [ + "albumsort", "APIC", "Bandcamp", "dotenv", "encyclopaedia", "ENDC", + "Gitea", "levenshtein", "metallum", "musify", @@ -28,9 +30,11 @@ "pathvalidate", "Referer", "sponsorblock", + "tracklist", "tracksort", "translit", "unmap", - "youtube" + "youtube", + "youtubei" ] } \ No newline at end of file diff --git a/.woodpecker.yml b/.woodpecker.yml index f751a41..a584f8e 100644 --- a/.woodpecker.yml +++ b/.woodpecker.yml @@ -11,7 +11,6 @@ steps: build-stable: image: python commands: - - sed -i 's/name = "music-kraken"/name = "music-kraken-stable"/' pyproject.toml - python -m pip install -r requirements-dev.txt - python3 -m build environment: diff --git a/README.md b/README.md index bc14167..c7b3590 100644 --- a/README.md +++ b/README.md @@ -2,61 +2,43 @@ [![Woodpecker CI Status](https://ci.elara.ws/api/badges/59/status.svg)](https://ci.elara.ws/repos/59) - + -- [Music Kraken](#music-kraken) - - [Installation](#installation) - - [From source](#from-source) - - [Notes for WSL](#notes-for-wsl) - - [Quick-Guide](#quick-guide) - - [Query](#query) - - [CONTRIBUTE](#contribute) - - [Matrix Space](#matrix-space) - - [TODO till the next release](#todo-till-the-next-release) -- [Programming Interface / Use as Library](#programming-interface--use-as-library) - - [Quick Overview](#quick-overview) - - [Data Model](#data-model) - - [Data Objects](#data-objects) - - [Creation](#creation) +- [Installation](#installation) +- [Quick-Guide](#quick-guide) + - [How to search properly](#query) +- [Matrix Space](#matrix-space) + +If you want to use this a library or contribute, check out [the wiki](https://gitea.elara.ws/music-kraken/music-kraken-core/wiki) for more information. 
--- ## Installation -You can find and get this project from either [PyPI](https://pypi.org/project/music-kraken/) as a Python-Package, -or simply the source code from [GitHub](https://github.com/HeIIow2/music-downloader). Note that even though -everything **SHOULD** work cross-platform, I have only tested it on Ubuntu. -If you enjoy this project, feel free to give it a star on GitHub. +You can find and get this project from either [PyPI](https://pypi.org/project/music-kraken/) as a Python-Package, +or simply the source code from [Gitea](https://gitea.elara.ws/music-kraken/music-kraken-core). ** -> THE PyPI PACKAGE IS OUTDATED +**NOTES** + +- Even though everything **SHOULD** work cross-platform, I have only tested it on Ubuntu. +- If you enjoy this project, feel free to give it a star on GitHub. ### From source -if you use Debian or Ubuntu: - ```sh -git clone https://github.com/HeIIow2/music-downloader -sudo apt install pandoc - -cd music-downloader/ -python3 -m pip install -r requirements.txt +git clone https://gitea.elara.ws/music-kraken/music-kraken-core.git +python3 -m pip install -e music-kraken-core/ ``` -then you can add to `~/.bashrc` +To update the program, if installed like this, go into the `music-kraken-core` directory and run `git pull`. -``` -alias music-kraken='cd your/directory/music-downloader/src; python3 -m music_kraken' -alias 🥺='sudo' -``` +### Get it running on other Systems -```sh -source ~/.bashrc -music-kraken -``` +Here are the collected issues, that are related to running the program on different systems. If you have any issues, feel free to open a new one. -### Notes for WSL +#### Windows + WSL -If you choose to run it in WSL, make sure ` ~/.local/bin` is added to your `$PATH` [#2][i2] +Add ` ~/.local/bin` to your `$PATH`. [#2][i2] ## Quick-Guide @@ -87,10 +69,6 @@ The escape character is as usual `\`. --- -## CONTRIBUTE - -I am happy about every pull request. To contribute look [here](contribute.md). 
- ## Matrix Space @@ -99,171 +77,5 @@ I decided against creating a discord server, due to various communities get ofte **Click [this invitation](https://matrix.to/#/#music-kraken:matrix.org) _([https://matrix.to/#/#music-kraken:matrix.org](https://matrix.to/#/#music-kraken:matrix.org))_ to join.** -## TODO till the next release - -> These Points will most likely be in the changelogs. - -- [x] Migrate away from pandoc, to a more lightweight alternative, that can be installed over PiPY. -- [ ] Update the Documentation of the internal structure. _(could be pushed back one release)_ - ---- - -# Programming Interface / Use as Library - -This application is $100\%$ centered around Data. Thus, the most important thing for working with musik kraken is, to understand how I structured the data. - -## Quick Overview - -- explanation of the [Data Model](#data-model) -- how to use the [Data Objects](#data-objects) -- further Dokumentation of _hopefully_ [most relevant classes](documentation/objects.md) -- the [old implementation](documentation/old_implementation.md) - -```mermaid ---- -title: Quick Overview (outdated) ---- -sequenceDiagram - -participant pg as Page (eg. YouTube, MB, Musify, ...) -participant obj as DataObjects (eg. Song, Artist, ...) -participant db as DataBase - -obj ->> db: write -db ->> obj: read - -pg -> obj: find a source for any page, for object. -obj -> pg: add more detailed data from according page. -obj -> pg: if available download audio to target. 
-``` - -## Data Model - -The Data Structure, that the whole programm is built on looks as follows: - -```mermaid ---- -title: Music Data ---- -erDiagram - - - -Target { - -} - -Lyrics { - -} - -Song { - -} - -Album { - -} - -Artist { - -} - -Label { - -} - -Source { - -} - -Source }o--|| Song : "" -Source }o--|| Lyrics : "" -Source }o--|| Album : "" -Source }o--|| Artist : "" -Source }o--|| Label : "" - -Song }o--o{ Album : AlbumSong -Album }o--o{ Artist : ArtistAlbum -Song }o--o{ Artist : "ArtistSong (features)" - -Label }o--o{ Album : LabelAlbum -Label }o--o{ Artist : LabelSong - -Song ||--o{ Lyrics : "" -Song ||--o{ Target : "" -``` - -Ok now this **WILL** look intimidating, thus I break it down quickly. -*That is also the reason I didn't add all Attributes here.* - -The most important Entities are: - -- Song -- Album -- Artist -- Label - -All of them *(and Lyrics)* can have multiple Sources, and every Source can only Point to one of those Element. - -The `Target` Entity represents the location on the hard drive a Song has. One Song can have multiple download Locations. - -The `Lyrics` Entity simply represents the Lyrics of each Song. One Song can have multiple Lyrics, e.g. Translations. - -Here is the simplified Diagramm without only the main Entities. - - -```mermaid ---- -title: simplified Music Data ---- -erDiagram - -Song { - -} - -Album { - -} - -Artist { - -} - -Label { - -} - -Song }o--o{ Album : AlbumSong -Album }o--o{ Artist : ArtistAlbum -Song }o--o{ Artist : "ArtistSong (features)" - -Label }o--o{ Album : LabelAlbum -Label }o--o{ Artist : LabelSong - -``` - -Looks way more manageable, doesn't it? - -The reason every relation here is a `n:m` *(many to many)* relation is not, that it makes sense in the aspekt of modeling reality, but to be able to put data from many Sources in the same Data Model. -Every Service models Data a bit different, and projecting a one-to-many relationship to a many to many relationship without data loss is easy. 
The other way around it is basically impossible - -## Data Objects - -> Not 100% accurate yet and *might* change slightly - -### Creation - -```python -# needs to be added -``` - - - -If you just want to start implementing, then just use the code example I provided, I don't care. -For those who don't want any bugs and use it as intended *(which is recommended, cuz I am only one person so there are defs bugs)* continue reading, and read the whole documentation, which may exist in the future xD - - [i10]: https://github.com/HeIIow2/music-downloader/issues/10 [i2]: https://github.com/HeIIow2/music-downloader/issues/2 diff --git a/development/actual_donwload.py b/development/actual_donwload.py index a8eb732..548e228 100644 --- a/development/actual_donwload.py +++ b/development/actual_donwload.py @@ -7,7 +7,8 @@ logging.getLogger().setLevel(logging.DEBUG) if __name__ == "__main__": commands = [ "s: #a Crystal F", - "d: 20", + "10", + "2", ] diff --git a/development/objects_collection.py b/development/objects_collection.py index 642bb18..893e2c5 100644 --- a/development/objects_collection.py +++ b/development/objects_collection.py @@ -2,30 +2,24 @@ import music_kraken from music_kraken.objects import Song, Album, Artist, Collection if __name__ == "__main__": - album_1 = Album( - title="album", - song_list=[ - Song(title="song", main_artist_list=[Artist(name="artist")]), - ], - artist_list=[ - Artist(name="artist 3"), - ] + song_1 = Song( + title="song", + feature_artist_list=[Artist( + name="main_artist" + )] ) - album_2 = Album( - title="album", - song_list=[ - Song(title="song", main_artist_list=[Artist(name="artist 2")]), - ], - artist_list=[ - Artist(name="artist"), - ] + other_artist = Artist(name="other_artist") + + song_2 = Song( + title = "song", + main_artist_list=[other_artist] ) - album_1.merge(album_2) + other_artist.name = "main_artist" - print() - print(*(f"{a.title_string} ; {a.id}" for a in album_1.artist_collection.data), sep=" | ") + 
song_1.merge(song_2) - print(id(album_1.artist_collection), id(album_2.artist_collection)) - print(id(album_1.song_collection[0].main_artist_collection), id(album_2.song_collection[0].main_artist_collection)) \ No newline at end of file + print("#" * 120) + print("main", *song_1.main_artist_collection) + print("feat", *song_1.feature_artist_collection) diff --git a/music_kraken/audio/metadata.py b/music_kraken/audio/metadata.py index b59bc98..dbcd36a 100644 --- a/music_kraken/audio/metadata.py +++ b/music_kraken/audio/metadata.py @@ -80,7 +80,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song): with temp_target.open("wb") as f: f.write(r.content) - converted_target: Target = Target.temp(name=f"{song.title}.jpeg") + converted_target: Target = Target.temp(name=f"{song.title.replace('/', '_')}") with Image.open(temp_target.file_path) as img: # crop the image if it isn't square in the middle with minimum data loss width, height = img.size diff --git a/music_kraken/cli/main_downloader.py b/music_kraken/cli/main_downloader.py index dad0b5d..f9321b4 100644 --- a/music_kraken/cli/main_downloader.py +++ b/music_kraken/cli/main_downloader.py @@ -6,16 +6,18 @@ import re from .utils import cli_function from .options.first_config import initial_config +from ..utils import output, BColors from ..utils.config import write_config, main_settings from ..utils.shared import URL_PATTERN from ..utils.string_processing import fit_to_file_system from ..utils.support_classes.query import Query from ..utils.support_classes.download_result import DownloadResult +from ..utils.exception import MKInvalidInputException from ..utils.exception.download import UrlNotFoundException from ..utils.enums.colors import BColors from .. 
import console -from ..download.results import Results, Option, PageResults +from ..download.results import Results, Option, PageResults, GoToResults from ..download.page_attributes import Pages from ..pages import Page from ..objects import Song, Album, Artist, DatabaseObject @@ -174,7 +176,7 @@ class Downloader: print() page_count = 0 - for option in self.current_results.formated_generator(max_items_per_page=self.max_displayed_options): + for option in self.current_results.formatted_generator(): if isinstance(option, Option): _downloadable = self.pages.is_downloadable(option.music_object) @@ -249,7 +251,7 @@ class Downloader: f"Recommendations and suggestions on sites to implement appreciated.\n" f"But don't be a bitch if I don't end up implementing it.") return - self.set_current_options(PageResults(page, data_object.options)) + self.set_current_options(PageResults(page, data_object.options, max_items_per_page=self.max_displayed_options)) self.print_current_options() return @@ -299,95 +301,119 @@ class Downloader: self.set_current_options(self.pages.search(parsed_query)) self.print_current_options() - def goto(self, index: int): + def goto(self, data_object: DatabaseObject): page: Type[Page] - music_object: DatabaseObject - try: - page, music_object = self.current_results.get_music_object_by_index(index) - except KeyError: - print() - print(f"The option {index} doesn't exist.") - print() - return + self.pages.fetch_details(data_object, stop_at_level=1) - self.pages.fetch_details(music_object) - - print(music_object) - print(music_object.options) - self.set_current_options(PageResults(page, music_object.options)) + self.set_current_options(GoToResults(data_object.options, max_items_per_page=self.max_displayed_options)) self.print_current_options() - def download(self, download_str: str, download_all: bool = False) -> bool: - to_download: List[DatabaseObject] = [] - - if re.match(URL_PATTERN, download_str) is not None: - _, music_objects = 
self.pages.fetch_url(download_str) - to_download.append(music_objects) - + def download(self, data_objects: List[DatabaseObject], **kwargs) -> bool: + output() + if len(data_objects) == 1: + output(f"Downloading {data_objects[0].option_string}...", color=BColors.BOLD) else: - index: str - for index in download_str.split(", "): - if not index.strip().isdigit(): - print() - print(f"Every download thingie has to be an index, not {index}.") - print() - return False - - for index in download_str.split(", "): - to_download.append(self.current_results.get_music_object_by_index(int(index))[1]) - - print() - print("Downloading:") - for download_object in to_download: - print(download_object.option_string) - print() + output(f"Downloading {len(data_objects)} objects...", *("- " + o.option_string for o in data_objects), color=BColors.BOLD, sep="\n") _result_map: Dict[DatabaseObject, DownloadResult] = dict() - for database_object in to_download: - r = self.pages.download(music_object=database_object, genre=self.genre, download_all=download_all, - process_metadata_anyway=self.process_metadata_anyway) + for database_object in data_objects: + r = self.pages.download( + music_object=database_object, + genre=self.genre, + **kwargs + ) _result_map[database_object] = r for music_object, result in _result_map.items(): - print() - print(music_object.option_string) - print(result) + output() + output(music_object.option_string) + output(result) return True def process_input(self, input_str: str) -> bool: - input_str = input_str.strip() - processed_input: str = input_str.lower() + try: + input_str = input_str.strip() + processed_input: str = input_str.lower() - if processed_input in EXIT_COMMANDS: - return True + if processed_input in EXIT_COMMANDS: + return True - if processed_input == ".": - self.print_current_options() - return False - - if processed_input == "..": - if self.previous_option(): + if processed_input == ".": self.print_current_options() + return False + + if 
processed_input == "..": + if self.previous_option(): + self.print_current_options() + return False + + command = "" + query = processed_input + if ":" in processed_input: + _ = processed_input.split(":") + command, query = _[0], ":".join(_[1:]) + + do_search = "s" in command + do_download = "d" in command + do_merge = "m" in command + + if do_search and do_download: + raise MKInvalidInputException(message="You can't search and download at the same time.") + + if do_search and do_merge: + raise MKInvalidInputException(message="You can't search and merge at the same time.") + + if do_search: + self.search(":".join(input_str.split(":")[1:])) + return False + + indices = [] + for possible_index in query.split(","): + possible_index = possible_index.strip() + if possible_index == "": + continue + + i = 0 + try: + i = int(possible_index) + except ValueError: + raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.") + + if i < 0 or i >= len(self.current_results): + raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results) - 1}.") + + indices.append(i) + + selected_objects = [self.current_results[i] for i in indices] + + if do_merge: + old_selected_objects = selected_objects + + a = old_selected_objects[0] + for b in old_selected_objects[1:]: + if type(a) != type(b): + raise MKInvalidInputException(message="You can't merge different types of objects.") + a.merge(b) + + selected_objects = [a] + + if do_download: + self.download(selected_objects) + return False + + if len(selected_objects) != 1: + raise MKInvalidInputException(message="You can only go to one object at a time without merging.") + + self.goto(selected_objects[0]) return False + except MKInvalidInputException as e: + output("\n" + e.message + "\n", color=BColors.FAIL) + help_message() - if processed_input.startswith("s: "): - self.search(input_str[3:]) - return False - - if processed_input.startswith("d: "): - return 
self.download(input_str[3:]) - - if processed_input.isdigit(): - self.goto(int(processed_input)) - return False - - if processed_input != "help": - print(f"{BColors.WARNING.value}Invalid input.{BColors.ENDC.value}") - help_message() return False def mainloop(self): diff --git a/music_kraken/download/results.py b/music_kraken/download/results.py index c0dff08..a8fead7 100644 --- a/music_kraken/download/results.py +++ b/music_kraken/download/results.py @@ -13,31 +13,35 @@ class Option: class Results: - def __init__(self) -> None: + def __init__(self, max_items_per_page: int = 10, **kwargs) -> None: self._by_index: Dict[int, DatabaseObject] = dict() self._page_by_index: Dict[int: Type[Page]] = dict() + + self.max_items_per_page = max_items_per_page def __iter__(self) -> Generator[DatabaseObject, None, None]: - for option in self.formated_generator(): + for option in self.formatted_generator(): if isinstance(option, Option): yield option.music_object - def formated_generator(self, max_items_per_page: int = 10) -> Generator[Union[Type[Page], Option], None, None]: + def formatted_generator(self) -> Generator[Union[Type[Page], Option], None, None]: self._by_index = dict() self._page_by_index = dict() - - def get_music_object_by_index(self, index: int) -> Tuple[Type[Page], DatabaseObject]: - # if this throws a key error, either the formatted generator needs to be iterated, or the option doesn't exist. - return self._page_by_index[index], self._by_index[index] + + def __len__(self) -> int: + return max(self._by_index.keys()) + + def __getitem__(self, index: int): + return self._by_index[index] class SearchResults(Results): def __init__( self, - pages: Tuple[Type[Page], ...] = None - + pages: Tuple[Type[Page], ...] 
= None, + **kwargs, ) -> None: - super().__init__() + super().__init__(**kwargs) self.pages = pages or [] # this would initialize a list for every page, which I don't think I want @@ -54,9 +58,12 @@ class SearchResults(Results): def get_page_results(self, page: Type[Page]) -> "PageResults": return PageResults(page, self.results.get(page, [])) + + def __len__(self) -> int: + return sum(min(self.max_items_per_page, len(results)) for results in self.results.values()) - def formated_generator(self, max_items_per_page: int = 10): - super().formated_generator() + def formatted_generator(self): + super().formatted_generator() i = 0 for page in self.results: @@ -70,19 +77,37 @@ class SearchResults(Results): i += 1 j += 1 - if j >= max_items_per_page: + if j >= self.max_items_per_page: break +class GoToResults(Results): + def __init__(self, results: List[DatabaseObject], **kwargs): + self.results: List[DatabaseObject] = results + + super().__init__(**kwargs) + + def __getitem__(self, index: int): + return self.results[index] + + def __len__(self) -> int: + return len(self.results) + + def formatted_generator(self): + yield from (Option(i, o) for i, o in enumerate(self.results)) + + + class PageResults(Results): - def __init__(self, page: Type[Page], results: List[DatabaseObject]) -> None: - super().__init__() + def __init__(self, page: Type[Page], results: List[DatabaseObject], **kwargs) -> None: + super().__init__(**kwargs) self.page: Type[Page] = page self.results: List[DatabaseObject] = results + - def formated_generator(self, max_items_per_page: int = 10): - super().formated_generator() + def formatted_generator(self, max_items_per_page: int = 10): + super().formatted_generator() i = 0 yield self.page @@ -92,3 +117,6 @@ class PageResults(Results): self._by_index[i] = option self._page_by_index[i] = self.page i += 1 + + def __len__(self) -> int: + return len(self.results) diff --git a/music_kraken/objects/artwork.py b/music_kraken/objects/artwork.py index 
43ea87e..d5ba54b 100644 --- a/music_kraken/objects/artwork.py +++ b/music_kraken/objects/artwork.py @@ -53,9 +53,9 @@ class Artwork: def get_variant_name(self, variant: ArtworkVariant) -> str: return f"artwork_{variant['width']}x{variant['height']}_{hash_url(variant['url']).replace('/', '_')}" - def __merge__(self, other: Artwork, override: bool = False) -> None: + def __merge__(self, other: Artwork, **kwargs) -> None: for key, value in other._variant_mapping.items(): - if key not in self._variant_mapping or override: + if key not in self._variant_mapping: self._variant_mapping[key] = value def __eq__(self, other: Artwork) -> bool: diff --git a/music_kraken/objects/collection.py b/music_kraken/objects/collection.py index 02bff19..9fd9f90 100644 --- a/music_kraken/objects/collection.py +++ b/music_kraken/objects/collection.py @@ -1,9 +1,12 @@ from __future__ import annotations from collections import defaultdict -from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any +from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set +import copy + from .parents import OuterProxy from ..utils import object_trace +from ..utils import output, BColors T = TypeVar('T', bound=OuterProxy) @@ -13,8 +16,8 @@ class Collection(Generic[T]): _data: List[T] - _indexed_values: Dict[str, set] - _indexed_to_objects: Dict[any, list] + _indexed_from_id: Dict[int, Dict[str, Any]] + _indexed_values: Dict[str, Dict[Any, T]] shallow_list = property(fget=lambda self: self.data) @@ -36,9 +39,9 @@ class Collection(Generic[T]): self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {} self.extend_object_to_attribute: Dict[str, Collection[T]] = extend_object_to_attribute or {} self.sync_on_append: Dict[str, Collection] = sync_on_append or {} + self.pull_from: List[Collection] = [] + self.push_to: List[Collection] = [] - self._id_to_index_values: Dict[int, set] = 
defaultdict(set) - # This is to cleanly unmap previously mapped items by their id self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict) # this is to keep track and look up the actual objects @@ -46,11 +49,19 @@ class Collection(Generic[T]): self.extend(data) - def __repr__(self) -> str: - return f"Collection({id(self)})" + def __hash__(self) -> int: + return id(self) - def _map_element(self, __object: T, from_map: bool = False): - self._unmap_element(__object.id) + @property + def collection_names(self) -> List[str]: + return list(set(self._collection_for.values())) + + def __repr__(self) -> str: + return f"Collection({' | '.join(self.collection_names)} {id(self)})" + + def _map_element(self, __object: T, no_unmap: bool = False, **kwargs): + if not no_unmap: + self._unmap_element(__object.id) self._indexed_from_id[__object.id]["id"] = __object.id self._indexed_values["id"][__object.id] = __object @@ -74,73 +85,125 @@ class Collection(Generic[T]): del self._indexed_from_id[obj_id] - def _find_object(self, __object: T) -> Optional[T]: + def _remap(self): + # reinitialize the mapping to clean it without time consuming operations + self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict) + self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict) + + for e in self._data: + self._map_element(e, no_unmap=True) + + + def _find_object(self, __object: T, **kwargs) -> Optional[T]: + self._remap() + + if __object.id in self._indexed_from_id: + return self._indexed_values["id"][__object.id] + for name, value in __object.indexing_values: if value in self._indexed_values[name]: return self._indexed_values[name][value] - def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False): + return None + + def _append_new_object(self, other: T, **kwargs): + """ + This function appends the other object to the current collection. 
+ This only works if not another object, which represents the same real life object exists in the collection. + """ + + self._data.append(other) + other._inner._is_in_collection.add(self) + + # all of the existing hooks to get the defined datastructures + for collection_attribute, generator in self.extend_object_to_attribute.items(): + other.__getattribute__(collection_attribute).extend(generator, **kwargs) + + for attribute, new_object in self.append_object_to_attribute.items(): + other.__getattribute__(attribute).append(new_object, **kwargs) + + for attribute, a in self.sync_on_append.items(): + # syncing two collections by reference + b = other.__getattribute__(attribute) + if a is b: + continue + + object_trace(f"Syncing [{a}] = [{b}]") + + b_data = b.data.copy() + b_collection_for = b._collection_for.copy() + + del b + + for synced_with, key in b_collection_for.items(): + synced_with.__setattr__(key, a) + a._collection_for[synced_with] = key + + a.extend(b_data, **kwargs) + + def append(self, other: Optional[T], **kwargs): """ If an object, that represents the same entity exists in a relevant collection, merge into this object. (and remap) Else append to this collection. 
- :param __object: - :param already_is_parent: - :param from_map: + :param other: :return: """ - if __object is None: + if other is None: + return + if other.id in self._indexed_from_id: return - existing_object = self._find_object(__object) + object_trace(f"Appending {other.option_string} to {self}") - if existing_object is None: - # append - self._data.append(__object) - self._map_element(__object) - - for collection_attribute, child_collection in self.extend_object_to_attribute.items(): - __object.__getattribute__(collection_attribute).extend(child_collection) - - for attribute, new_object in self.append_object_to_attribute.items(): - __object.__getattribute__(attribute).append(new_object) - - # only modify collections if the object actually has been appended - for attribute, a in self.sync_on_append.items(): - b = __object.__getattribute__(attribute) - object_trace(f"Syncing [{a}{id(a)}] = [{b}{id(b)}]") - - data_to_extend = b.data - - a._collection_for.update(b._collection_for) - for synced_with, key in b._collection_for.items(): - synced_with.__setattr__(key, a) - - a.extend(data_to_extend) + # switching collection in the case of push to + for c in self.push_to: + r = c._find_object(other) + if r is not None: + # output("found push to", r, other, c, self, color=BColors.RED, sep="\t") + return c.append(other, **kwargs) + for c in self.pull_from: + r = c._find_object(other) + if r is not None: + # output("found pull from", r, other, c, self, color=BColors.RED, sep="\t") + c.remove(r, existing=r, **kwargs) + + existing = self._find_object(other) + if existing is None: + self._append_new_object(other, **kwargs) else: - # merge only if the two objects are not the same - if existing_object.id == __object.id: - return + existing.merge(other, **kwargs) - old_id = existing_object.id + def remove(self, *other_list: List[T], silent: bool = False, existing: Optional[T] = None, remove_from_other_collection=True, **kwargs): + other: T + for other in other_list: + existing: 
Optional[T] = existing or self._indexed_values["id"].get(other.id, None) + if existing is None: + if not silent: + raise ValueError(f"Object {other} not found in {self}") + return other - existing_object.merge(__object) + if remove_from_other_collection: + for c in copy.copy(other._inner._is_in_collection): + c.remove(other, silent=True, remove_from_other_collection=False, **kwargs) + other._inner._is_in_collection = set() + else: + self._data.remove(existing) + self._unmap_element(existing) - if existing_object.id != old_id: - self._unmap_element(old_id) + def contains(self, __object: T) -> bool: + return self._find_object(__object) is not None - self._map_element(existing_object) - - def extend(self, __iterable: Optional[Generator[T, None, None]]): - if __iterable is None: + def extend(self, other_collections: Optional[Generator[T, None, None]], **kwargs): + if other_collections is None: return - for __object in __iterable: - self.append(__object) + for other_object in other_collections: + self.append(other_object, **kwargs) @property def data(self) -> List[T]: @@ -156,8 +219,9 @@ class Collection(Generic[T]): def __iter__(self) -> Iterator[T]: yield from self._data - def __merge__(self, __other: Collection, override: bool = False): - self.extend(__other) + def __merge__(self, other: Collection, **kwargs): + object_trace(f"merging {str(self)} | {str(other)}") + self.extend(other, **kwargs) def __getitem__(self, item: int): return self._data[item] @@ -166,3 +230,9 @@ class Collection(Generic[T]): if item >= len(self._data): return default return self._data[item] + + def __eq__(self, other: Collection) -> bool: + if self.empty and other.empty: + return True + + return self._data == other._data diff --git a/music_kraken/objects/parents.py b/music_kraken/objects/parents.py index 59a3d10..b4f867a 100644 --- a/music_kraken/objects/parents.py +++ b/music_kraken/objects/parents.py @@ -9,9 +9,9 @@ from pathlib import Path import inspect from .metadata import Metadata 
-from ..utils import get_unix_time, object_trace +from ..utils import get_unix_time, object_trace, generate_id from ..utils.config import logging_settings, main_settings -from ..utils.shared import HIGHEST_ID +from ..utils.shared import HIGHEST_ID, DEBUG_PRINT_ID from ..utils.hacking import MetaClass LOGGER = logging_settings["object_logger"] @@ -29,9 +29,15 @@ class InnerData: """ _refers_to_instances: set = None + _is_in_collection: set = None + """ + Attribute versions keep track, of if the attribute has been changed. + """ def __init__(self, object_type, **kwargs): self._refers_to_instances = set() + self._is_in_collection = set() + self._fetched_from: dict = {} # initialize the default values @@ -42,21 +48,29 @@ class InnerData: for key, value in kwargs.items(): if hasattr(value, "__is_collection__"): value._collection_for[self] = key + self.__setattr__(key, value) def __hash__(self): return self.id - def __merge__(self, __other: InnerData, override: bool = False): + def __merge__(self, __other: InnerData, **kwargs): """ :param __other: - :param override: :return: """ self._fetched_from.update(__other._fetched_from) + self._is_in_collection.update(__other._is_in_collection) for key, value in __other.__dict__.copy().items(): + if key.startswith("_"): + continue + + if hasattr(value, "__is_collection__") and key in self.__dict__: + self.__getattribute__(key).__merge__(value, **kwargs) + continue + # just set the other value if self doesn't already have it if key not in self.__dict__ or (key in self.__dict__ and self.__dict__[key] == self._default_values.get(key)): self.__setattr__(key, value) @@ -64,13 +78,8 @@ class InnerData: # if the object of value implemented __merge__, it merges existing = self.__getattribute__(key) - if hasattr(type(existing), "__merge__"): - existing.__merge__(value, override) - continue - - # override the existing value if requested - if override: - self.__setattr__(key, value) + if hasattr(existing, "__merge__"): + 
existing.__merge__(value, **kwargs) class OuterProxy: @@ -84,8 +93,6 @@ class OuterProxy: DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = tuple() UPWARDS_COLLECTION_STRING_ATTRIBUTES = tuple() - TITEL = "id" - def __init__(self, _id: int = None, dynamic: bool = False, **kwargs): _automatic_id: bool = False @@ -94,7 +101,7 @@ class OuterProxy: generates a random integer id the range is defined in the config """ - _id = random.randint(0, HIGHEST_ID) + _id = generate_id() _automatic_id = True kwargs["automatic_id"] = _automatic_id @@ -116,7 +123,7 @@ class OuterProxy: self._inner: InnerData = InnerData(type(self), **kwargs) self._inner._refers_to_instances.add(self) - object_trace(f"creating {type(self).__name__} [{self.title_string}]") + object_trace(f"creating {type(self).__name__} [{self.option_string}]") self.__init_collections__() @@ -173,13 +180,12 @@ class OuterProxy: def __eq__(self, other: Any): return self.__hash__() == other.__hash__() - def merge(self, __other: Optional[OuterProxy], override: bool = False): + def merge(self, __other: Optional[OuterProxy], **kwargs): """ 1. merges the data of __other in self 2. 
replaces the data of __other with the data of self :param __other: - :param override: :return: """ if __other is None: @@ -196,7 +202,7 @@ class OuterProxy: if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances): a, b = b, a - object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]") + object_trace(f"merging {a.option_string} | {b.option_string}") old_inner = b._inner @@ -204,11 +210,11 @@ class OuterProxy: instance._inner = a._inner a._inner._refers_to_instances.add(instance) - a._inner.__merge__(old_inner, override=override) + a._inner.__merge__(old_inner, **kwargs) del old_inner - def __merge__(self, __other: Optional[OuterProxy], override: bool = False): - self.merge(__other, override) + def __merge__(self, __other: Optional[OuterProxy], **kwargs): + self.merge(__other, **kwargs) def mark_as_fetched(self, *url_hash_list: List[str]): for url_hash in url_hash_list: @@ -235,7 +241,23 @@ class OuterProxy: @property def options(self) -> List[P]: - return [self] + r = [] + + for collection_string_attribute in self.UPWARDS_COLLECTION_STRING_ATTRIBUTES: + r.extend(self.__getattribute__(collection_string_attribute)) + + r.append(self) + + for collection_string_attribute in self.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES: + r.extend(self.__getattribute__(collection_string_attribute)) + + return r + + @property + def option_string(self) -> str: + return self.title_string + + INDEX_DEPENDS_ON: List[str] = [] @property def indexing_values(self) -> List[Tuple[str, object]]: @@ -267,9 +289,10 @@ class OuterProxy: return r + TITEL = "id" @property def title_string(self) -> str: - return str(self.__getattribute__(self.TITEL)) + return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "") def __repr__(self): return f"{type(self).__name__}({self.title_string})" diff --git a/music_kraken/objects/song.py b/music_kraken/objects/song.py index be6d751..33f68a0 100644 --- 
a/music_kraken/objects/song.py +++ b/music_kraken/objects/song.py @@ -3,6 +3,7 @@ from __future__ import annotations import random from collections import defaultdict from typing import List, Optional, Dict, Tuple, Type, Union +import copy import pycountry @@ -22,6 +23,7 @@ from .parents import OuterProxy, P from .source import Source, SourceCollection from .target import Target from .country import Language, Country +from ..utils.shared import DEBUG_PRINT_ID from ..utils.string_processing import unify from .parents import OuterProxy as Base @@ -43,7 +45,8 @@ def get_collection_string( template: str, ignore_titles: Set[str] = None, background: BColors = OPTION_BACKGROUND, - foreground: BColors = OPTION_FOREGROUND + foreground: BColors = OPTION_FOREGROUND, + add_id: bool = DEBUG_PRINT_ID, ) -> str: if collection.empty: return "" @@ -55,8 +58,15 @@ def get_collection_string( r = background + def get_element_str(element) -> str: + nonlocal add_id + r = element.title_string.strip() + if add_id and False: + r += " " + str(element.id) + return r + element: Base - titel_list: List[str] = [element.title_string.strip() for element in collection if element.title_string not in ignore_titles] + titel_list: List[str] = [get_element_str(element) for element in collection if element.title_string not in ignore_titles] for i, titel in enumerate(titel_list): delimiter = ", " @@ -109,15 +119,29 @@ class Song(Base): "tracksort": lambda: 0, } - def __init__(self, title: str = "", unified_title: str = None, isrc: str = None, length: int = None, - genre: str = None, note: FormattedText = None, source_list: List[Source] = None, - target_list: List[Target] = None, lyrics_list: List[Lyrics] = None, - main_artist_list: List[Artist] = None, feature_artist_list: List[Artist] = None, - album_list: List[Album] = None, tracksort: int = 0, artwork: Optional[Artwork] = None, **kwargs) -> None: + def __init__( + self, + title: str = None, + isrc: str = None, + length: int = None, + genre: str = 
None, + note: FormattedText = None, + source_list: List[Source] = None, + target_list: List[Target] = None, + lyrics_list: List[Lyrics] = None, + main_artist_list: List[Artist] = None, + feature_artist_list: List[Artist] = None, + album_list: List[Album] = None, + tracksort: int = 0, + artwork: Optional[Artwork] = None, + **kwargs + ) -> None: + real_kwargs = copy.copy(locals()) + real_kwargs.update(real_kwargs.pop("kwargs", {})) - Base.__init__(**locals()) + Base.__init__(**real_kwargs) - UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("album_collection", "main_artist_collection", "feature_artist_collection") + UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_artist_collection", "feature_artist_collection", "album_collection") TITEL = "title" def __init_collections__(self) -> None: @@ -135,6 +159,9 @@ class Song(Base): "feature_song_collection": self } + self.feature_artist_collection.push_to = [self.main_artist_collection] + self.main_artist_collection.pull_from = [self.feature_artist_collection] + def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): if object_type is Song: return @@ -144,20 +171,21 @@ class Song(Base): return if isinstance(object_list, Artist): - self.main_artist_collection.extend(object_list) + self.feature_artist_collection.extend(object_list) return if isinstance(object_list, Album): self.album_collection.extend(object_list) return + INDEX_DEPENDS_ON = ("title", "isrc", "source_collection") + @property def indexing_values(self) -> List[Tuple[str, object]]: return [ - ('id', self.id), ('title', unify(self.title)), ('isrc', self.isrc), - *[('url', source.url) for source in self.source_collection] + *self.source_collection.indexing_values(), ] @property @@ -169,6 +197,8 @@ class Song(Base): id3Mapping.GENRE: [self.genre], id3Mapping.TRACKNUMBER: [self.tracksort_str], id3Mapping.COMMENT: [self.note.markdown], + id3Mapping.FILE_WEBPAGE_URL: self.source_collection.url_list, + id3Mapping.SOURCE_WEBPAGE_URL: 
self.source_collection.homepage_list, }) # metadata.merge_many([s.get_song_metadata() for s in self.source_collection]) album sources have no relevant metadata for id3 @@ -189,20 +219,12 @@ class Song(Base): @property def option_string(self) -> str: - r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value + r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title}) r += get_collection_string(self.main_artist_collection, " by {}") r += get_collection_string(self.feature_artist_collection, " feat. {}") return r - @property - def options(self) -> List[P]: - options = self.main_artist_collection.shallow_list - options.extend(self.feature_artist_collection) - options.extend(self.album_collection) - options.append(self) - return options - @property def tracksort_str(self) -> str: """ @@ -258,18 +280,30 @@ class Album(Base): TITEL = "title" # This is automatically generated - def __init__(self, title: str = None, unified_title: str = None, album_status: AlbumStatus = None, - album_type: AlbumType = None, language: Language = None, date: ID3Timestamp = None, - barcode: str = None, albumsort: int = None, notes: FormattedText = None, - source_list: List[Source] = None, artist_list: List[Artist] = None, song_list: List[Song] = None, - label_list: List[Label] = None, **kwargs) -> None: - super().__init__(title=title, unified_title=unified_title, album_status=album_status, album_type=album_type, - language=language, date=date, barcode=barcode, albumsort=albumsort, notes=notes, - source_list=source_list, artist_list=artist_list, song_list=song_list, label_list=label_list, - **kwargs) + def __init__( + self, + title: str = None, + unified_title: str = None, + album_status: AlbumStatus = None, + album_type: AlbumType = None, + language: Language = None, + date: ID3Timestamp = None, + barcode: str = None, + albumsort: int = 
None, + notes: FormattedText = None, + source_list: List[Source] = None, + artist_list: List[Artist] = None, + song_list: List[Song] = None, + label_list: List[Label] = None, + **kwargs + ) -> None: + real_kwargs = copy.copy(locals()) + real_kwargs.update(real_kwargs.pop("kwargs", {})) + + Base.__init__(**real_kwargs) DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection",) - UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "label_collection") + UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection") def __init_collections__(self): self.song_collection.append_object_to_attribute = { @@ -302,13 +336,14 @@ class Album(Base): self.label_collection.extend(object_list) return + INDEX_DEPENDS_ON = ("title", "barcode", "source_collection") + @property def indexing_values(self) -> List[Tuple[str, object]]: return [ - ('id', self.id), ('title', unify(self.title)), ('barcode', self.barcode), - *[('url', source.url) for source in self.source_collection] + *self.source_collection.indexing_values(), ] @property @@ -333,19 +368,13 @@ class Album(Base): @property def option_string(self) -> str: - r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value + r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r += get_collection_string(self.artist_collection, " by {}") r += get_collection_string(self.label_collection, " under {}") if len(self.song_collection) > 0: r += f" with {len(self.song_collection)} songs" return r - - @property - def options(self) -> List[P]: - options = [*self.artist_collection, self, *self.song_collection] - - return options def update_tracksort(self): """ @@ -372,18 +401,6 @@ class Album(Base): tracksort_map[i] = existing_list.pop(0) tracksort_map[i].tracksort = i - def compile(self, merge_into: bool = False): - """ - compiles the recursive structures, - and does depending on the object some other stuff. 
- - no need to override if only the recursive structure should be built. - override self.build_recursive_structures() instead - """ - - self.update_tracksort() - self._build_recursive_structures(build_version=random.randint(0, 99999), merge=merge_into) - @property def copyright(self) -> str: if self.date is None: @@ -415,21 +432,15 @@ class Album(Base): return self.album_type.value -""" -All objects dependent on Artist -""" - - class Artist(Base): name: str - unified_name: str country: Country formed_in: ID3Timestamp notes: FormattedText lyrical_themes: List[str] general_genre: str - unformated_location: str + unformatted_location: str source_collection: SourceCollection contact_collection: Collection[Contact] @@ -439,10 +450,9 @@ class Artist(Base): label_collection: Collection[Label] _default_factories = { - "name": str, - "unified_name": lambda: None, + "name": lambda: None, "country": lambda: None, - "unformated_location": lambda: None, + "unformatted_location": lambda: None, "formed_in": ID3Timestamp, "notes": FormattedText, @@ -459,19 +469,30 @@ class Artist(Base): TITEL = "name" # This is automatically generated - def __init__(self, name: str = "", unified_name: str = None, country: Country = None, - formed_in: ID3Timestamp = None, notes: FormattedText = None, lyrical_themes: List[str] = None, - general_genre: str = None, unformated_location: str = None, source_list: List[Source] = None, - contact_list: List[Contact] = None, feature_song_list: List[Song] = None, - main_album_list: List[Album] = None, label_list: List[Label] = None, **kwargs) -> None: + def __init__( + self, + name: str = None, + unified_name: str = None, + country: Country = None, + formed_in: ID3Timestamp = None, + notes: FormattedText = None, + lyrical_themes: List[str] = None, + general_genre: str = None, + unformatted_location: str = None, + source_list: List[Source] = None, + contact_list: List[Contact] = None, + feature_song_list: List[Song] = None, + main_album_list: List[Album] = 
None, + label_list: List[Label] = None, + **kwargs + ) -> None: + real_kwargs = copy.copy(locals()) + real_kwargs.update(real_kwargs.pop("kwargs", {})) - super().__init__(name=name, unified_name=unified_name, country=country, formed_in=formed_in, notes=notes, - lyrical_themes=lyrical_themes, general_genre=general_genre, - unformated_location=unformated_location, source_list=source_list, contact_list=contact_list, - feature_song_list=feature_song_list, main_album_list=main_album_list, label_list=label_list, - **kwargs) + Base.__init__(**real_kwargs) - DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("feature_song_collection", "main_album_collection") + + DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_album_collection", "feature_song_collection") UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",) def __init_collections__(self): @@ -504,12 +525,6 @@ class Artist(Base): self.label_collection.extend(object_list) return - @property - def options(self) -> List[P]: - options = [self, *self.main_album_collection.shallow_list, *self.feature_album] - print(options) - return options - def update_albumsort(self): """ This updates the albumsort attributes, of the albums in @@ -567,40 +582,27 @@ class Artist(Base): # replace the old collection with the new one self.main_album_collection: Collection = Collection(data=album_list, element_type=Album) + INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection") @property def indexing_values(self) -> List[Tuple[str, object]]: return [ - ('id', self.id), ('name', unify(self.name)), - *[('url', source.url) for source in self.source_collection], - *[('contact', contact.value) for contact in self.contact_collection] + *[('contact', contact.value) for contact in self.contact_collection], + *self.source_collection.indexing_values(), ] @property def metadata(self) -> Metadata: metadata = Metadata({ - id3Mapping.ARTIST: [self.name] + id3Mapping.ARTIST: [self.name], + id3Mapping.ARTIST_WEBPAGE_URL: 
self.source_collection.url_list, }) - metadata.merge_many([s.get_artist_metadata() for s in self.source_collection]) return metadata - """ - def __str__(self, include_notes: bool = False): - string = self.name or "" - if include_notes: - plaintext_notes = self.notes.get_plaintext() - if plaintext_notes is not None: - string += "\n" + plaintext_notes - return string - """ - - def __repr__(self): - return f"Artist(\"{self.name}\")" - @property def option_string(self) -> str: - r = OPTION_FOREGROUND.value + self.name + BColors.ENDC.value + OPTION_BACKGROUND.value + r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r += get_collection_string(self.label_collection, " under {}") r += OPTION_BACKGROUND.value @@ -613,48 +615,6 @@ class Artist(Base): return r - @property - def options(self) -> List[P]: - options = [self] - options.extend(self.main_album_collection) - options.extend(self.feature_song_collection) - return options - - @property - def feature_album(self) -> Album: - return Album( - title="features", - album_status=AlbumStatus.UNRELEASED, - album_type=AlbumType.COMPILATION_ALBUM, - is_split=True, - albumsort=666, - dynamic=True, - song_list=self.feature_song_collection.shallow_list - ) - - def get_all_songs(self) -> List[Song]: - """ - returns a list of all Songs. 
- probably not that useful, because it is unsorted - """ - collection = self.feature_song_collection.copy() - for album in self.discography: - collection.extend(album.song_collection) - - return collection - - @property - def discography(self) -> List[Album]: - flat_copy_discography = self.main_album_collection.copy() - flat_copy_discography.append(self.feature_album) - - return flat_copy_discography - - -""" -Label -""" - class Label(Base): COLLECTION_STRING_ATTRIBUTES = ("album_collection", "current_artist_collection") @@ -683,12 +643,21 @@ class Label(Base): TITEL = "name" - def __init__(self, name: str = None, unified_name: str = None, notes: FormattedText = None, - source_list: List[Source] = None, contact_list: List[Contact] = None, - album_list: List[Album] = None, current_artist_list: List[Artist] = None, **kwargs) -> None: - super().__init__(name=name, unified_name=unified_name, notes=notes, source_list=source_list, - contact_list=contact_list, album_list=album_list, current_artist_list=current_artist_list, - **kwargs) + def __init__( + self, + name: str = None, + unified_name: str = None, + notes: FormattedText = None, + source_list: List[Source] = None, + contact_list: List[Contact] = None, + album_list: List[Album] = None, + current_artist_list: List[Artist] = None, + **kwargs + ) -> None: + real_kwargs = copy.copy(locals()) + real_kwargs.update(real_kwargs.pop("kwargs", {})) + + Base.__init__(**real_kwargs) def __init_collections__(self): self.album_collection.append_object_to_attribute = { @@ -702,7 +671,6 @@ class Label(Base): @property def indexing_values(self) -> List[Tuple[str, object]]: return [ - ('id', self.id), ('name', unify(self.name)), *[('url', source.url) for source in self.source_collection] ] diff --git a/music_kraken/objects/source.py b/music_kraken/objects/source.py index bb2e9e3..ff68d6a 100644 --- a/music_kraken/objects/source.py +++ b/music_kraken/objects/source.py @@ -2,142 +2,176 @@ from __future__ import annotations from 
collections import defaultdict from enum import Enum -from typing import List, Dict, Set, Tuple, Optional, Iterable -from urllib.parse import urlparse +from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator +from urllib.parse import urlparse, ParseResult +from dataclasses import dataclass, field +from functools import cached_property +from ..utils import generate_id from ..utils.enums.source import SourcePages, SourceTypes from ..utils.config import youtube_settings -from ..utils.string_processing import hash_url +from ..utils.string_processing import hash_url, shorten_display_url from .metadata import Mapping, Metadata from .parents import OuterProxy from .collection import Collection -class Source(OuterProxy): - url: str +@dataclass +class Source: page_enum: SourcePages - referer_page: SourcePages + url: str + referrer_page: SourcePages = None + audio_url: Optional[str] = None - audio_url: str + additional_data: dict = field(default_factory=dict) - _default_factories = { - "audio_url": lambda: None, - } - - # This is automatically generated - def __init__(self, page_enum: SourcePages, url: str, referer_page: SourcePages = None, audio_url: str = None, - **kwargs) -> None: - - if referer_page is None: - referer_page = page_enum - - super().__init__(url=url, page_enum=page_enum, referer_page=referer_page, audio_url=audio_url, **kwargs) + def __post_init__(self): + self.referrer_page = self.referrer_page or self.page_enum + + @property + def parsed_url(self) -> ParseResult: + return urlparse(self.url) @classmethod - def match_url(cls, url: str, referer_page: SourcePages) -> Optional["Source"]: + def match_url(cls, url: str, referrer_page: SourcePages) -> Optional[Source]: """ - this shouldn't be used, unlesse you are not certain what the source is for + this shouldn't be used, unless you are not certain what the source is for the reason is that it is more inefficient """ - parsed = urlparse(url) - url = parsed.geturl() + parsed_url = urlparse(url) + 
url = parsed_url.geturl() - if "musify" in parsed.netloc: - return cls(SourcePages.MUSIFY, url, referer_page=referer_page) + if "musify" in parsed_url.netloc: + return cls(SourcePages.MUSIFY, url, referrer_page=referrer_page) - if parsed.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]: - return cls(SourcePages.YOUTUBE, url, referer_page=referer_page) + if parsed_url.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]: + return cls(SourcePages.YOUTUBE, url, referrer_page=referrer_page) if url.startswith("https://www.deezer"): - return cls(SourcePages.DEEZER, url, referer_page=referer_page) + return cls(SourcePages.DEEZER, url, referrer_page=referrer_page) if url.startswith("https://open.spotify.com"): - return cls(SourcePages.SPOTIFY, url, referer_page=referer_page) + return cls(SourcePages.SPOTIFY, url, referrer_page=referrer_page) if "bandcamp" in url: - return cls(SourcePages.BANDCAMP, url, referer_page=referer_page) + return cls(SourcePages.BANDCAMP, url, referrer_page=referrer_page) - if "wikipedia" in parsed.netloc: - return cls(SourcePages.WIKIPEDIA, url, referer_page=referer_page) + if "wikipedia" in parsed_url.netloc: + return cls(SourcePages.WIKIPEDIA, url, referrer_page=referrer_page) if url.startswith("https://www.metal-archives.com/"): - return cls(SourcePages.ENCYCLOPAEDIA_METALLUM, url, referer_page=referer_page) + return cls(SourcePages.ENCYCLOPAEDIA_METALLUM, url, referrer_page=referrer_page) # the less important once if url.startswith("https://www.facebook"): - return cls(SourcePages.FACEBOOK, url, referer_page=referer_page) + return cls(SourcePages.FACEBOOK, url, referrer_page=referrer_page) if url.startswith("https://www.instagram"): - return cls(SourcePages.INSTAGRAM, url, referer_page=referer_page) + return cls(SourcePages.INSTAGRAM, url, referrer_page=referrer_page) if url.startswith("https://twitter"): - return cls(SourcePages.TWITTER, url, referer_page=referer_page) + return cls(SourcePages.TWITTER, url, 
referrer_page=referrer_page) if url.startswith("https://myspace.com"): - return cls(SourcePages.MYSPACE, url, referer_page=referer_page) - - def get_song_metadata(self) -> Metadata: - return Metadata({ - Mapping.FILE_WEBPAGE_URL: [self.url], - Mapping.SOURCE_WEBPAGE_URL: [self.homepage] - }) - - def get_artist_metadata(self) -> Metadata: - return Metadata({ - Mapping.ARTIST_WEBPAGE_URL: [self.url] - }) + return cls(SourcePages.MYSPACE, url, referrer_page=referrer_page) @property def hash_url(self) -> str: return hash_url(self.url) @property - def metadata(self) -> Metadata: - return self.get_song_metadata() - - @property - def indexing_values(self) -> List[Tuple[str, object]]: - return [ - ('id', self.id), - ('url', self.url), - ('audio_url', self.audio_url), - ] - - def __str__(self): - return self.__repr__() + def indexing_values(self) -> list: + r = [hash_url(self.url)] + if self.audio_url: + r.append(hash_url(self.audio_url)) + return r def __repr__(self) -> str: - return f"Src({self.page_enum.value}: {self.url}, {self.audio_url})" + return f"Src({self.page_enum.value}: {shorten_display_url(self.url)})" - @property - def title_string(self) -> str: - return self.url + def __merge__(self, other: Source, **kwargs): + if self.audio_url is None: + self.audio_url = other.audio_url + self.additional_data.update(other.additional_data) page_str = property(fget=lambda self: self.page_enum.value) - type_str = property(fget=lambda self: self.type_enum.value) - homepage = property(fget=lambda self: SourcePages.get_homepage(self.page_enum)) -class SourceCollection(Collection): +class SourceCollection: + __change_version__ = generate_id() + + _indexed_sources: Dict[str, Source] + _page_to_source_list: Dict[SourcePages, List[Source]] + def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs): - self._page_to_source_list: Dict[SourcePages, List[Source]] = defaultdict(list) + self._page_to_source_list = defaultdict(list) + self._indexed_sources = {} - 
super().__init__(data=data, **kwargs) + self.extend(data or []) - def _map_element(self, __object: Source, **kwargs): - super()._map_element(__object, **kwargs) + def has_source_page(self, *source_pages: SourcePages) -> bool: + return any(source_page in self._page_to_source_list for source_page in source_pages) - self._page_to_source_list[__object.page_enum].append(__object) + def get_sources(self, *source_pages: List[Source]) -> Generator[Source]: + if not len(source_pages): + source_pages = self.source_pages + + for page in source_pages: + yield from self._page_to_source_list[page] + + def append(self, source: Source): + if source is None: + return + + existing_source = None + for key in source.indexing_values: + if key in self._indexed_sources: + existing_source = self._indexed_sources[key] + break + + if existing_source is not None: + existing_source.__merge__(source) + source = existing_source + else: + self._page_to_source_list[source.page_enum].append(source) + + changed = False + for key in source.indexing_values: + if key not in self._indexed_sources: + changed = True + self._indexed_sources[key] = source + + if changed: + self.__change_version__ = generate_id() + + def extend(self, sources: Iterable[Source]): + for source in sources: + self.append(source) + + def __iter__(self): + yield from self.get_sources() + + def __merge__(self, other: SourceCollection, **kwargs): + self.extend(other) @property - def source_pages(self) -> Set[SourcePages]: - return set(source.page_enum for source in self._data) + def source_pages(self) -> Iterable[SourcePages]: + return sorted(self._page_to_source_list.keys(), key=lambda page: page.value) - def get_sources_from_page(self, source_page: SourcePages) -> List[Source]: - """ - getting the sources for a specific page like - YouTube or musify - """ - return self._page_to_source_list[source_page].copy() + @property + def hash_url_list(self) -> List[str]: + return [hash_url(source.url) for source in self.get_sources()] + + 
@property + def url_list(self) -> List[str]: + return [source.url for source in self.get_sources()] + + @property + def homepage_list(self) -> List[str]: + return [source.homepage for source in self.source_pages] + + def indexing_values(self) -> Generator[Tuple[str, str], None, None]: + for index in self._indexed_sources: + yield "url", index \ No newline at end of file diff --git a/music_kraken/pages/abstract.py b/music_kraken/pages/abstract.py index 468067b..e322048 100644 --- a/music_kraken/pages/abstract.py +++ b/music_kraken/pages/abstract.py @@ -89,52 +89,6 @@ class NamingDict(dict): return self.default_value_for_name(attribute_name) -def _clean_music_object(music_object: INDEPENDENT_DB_OBJECTS, collections: Dict[INDEPENDENT_DB_TYPES, Collection]): - if type(music_object) == Label: - return _clean_label(label=music_object, collections=collections) - if type(music_object) == Artist: - return _clean_artist(artist=music_object, collections=collections) - if type(music_object) == Album: - return _clean_album(album=music_object, collections=collections) - if type(music_object) == Song: - return _clean_song(song=music_object, collections=collections) - - -def _clean_collection(collection: Collection, collection_dict: Dict[INDEPENDENT_DB_TYPES, Collection]): - if collection.element_type not in collection_dict: - return - - for i, element in enumerate(collection): - r = collection_dict[collection.element_type].append(element, merge_into_existing=True) - collection[i] = r.current_element - - if not r.was_the_same: - _clean_music_object(r.current_element, collection_dict) - - -def _clean_label(label: Label, collections: Dict[INDEPENDENT_DB_TYPES, Collection]): - _clean_collection(label.current_artist_collection, collections) - _clean_collection(label.album_collection, collections) - - -def _clean_artist(artist: Artist, collections: Dict[INDEPENDENT_DB_TYPES, Collection]): - _clean_collection(artist.main_album_collection, collections) - 
_clean_collection(artist.feature_song_collection, collections) - _clean_collection(artist.label_collection, collections) - - -def _clean_album(album: Album, collections: Dict[INDEPENDENT_DB_TYPES, Collection]): - _clean_collection(album.label_collection, collections) - _clean_collection(album.song_collection, collections) - _clean_collection(album.artist_collection, collections) - - -def _clean_song(song: Song, collections: Dict[INDEPENDENT_DB_TYPES, Collection]): - _clean_collection(song.album_collection, collections) - _clean_collection(song.feature_artist_collection, collections) - _clean_collection(song.main_artist_collection, collections) - - class Page: """ This is an abstract class, laying out the @@ -246,7 +200,7 @@ class Page: # only certain database objects, have a source list if isinstance(music_object, INDEPENDENT_DB_OBJECTS): source: Source - for source in music_object.source_collection.get_sources_from_page(self.SOURCE_TYPE): + for source in music_object.source_collection.get_sources(self.SOURCE_TYPE): if music_object.already_fetched_from(source.hash_url): continue @@ -300,7 +254,7 @@ class Page: } if obj_type in fetch_map: - music_object = fetch_map[obj_type](source, stop_at_level) + music_object = fetch_map[obj_type](source, stop_at_level=stop_at_level) else: self.LOGGER.warning(f"Can't fetch details of type: {obj_type}") return None @@ -419,9 +373,10 @@ class Page: if song.target_collection.empty: song.target_collection.append(new_target) - sources = song.source_collection.get_sources_from_page(self.SOURCE_TYPE) - if len(sources) == 0: - return DownloadResult(error_message=f"No source found for {song.title} as {self.__class__.__name__}.") + if not song.source_collection.has_source_page(self.SOURCE_TYPE): + return DownloadResult(error_message=f"No {self.__class__.__name__} source found for {song.option_string}.") + + sources = song.source_collection.get_sources(self.SOURCE_TYPE) temp_target: Target = Target( relative_to_music_dir=False, @@ -448,14 
+403,21 @@ class Page: self.LOGGER.info(f"{song.option_string} already exists, thus not downloading again.") return r - source = sources[0] - + skip_intervals = [] if not found_on_disc: - r = self.download_song_to_target(source=source, target=temp_target, desc=song.option_string) + for source in sources: + r = self.download_song_to_target(source=source, target=temp_target, desc=song.option_string) - if not r.is_fatal_error: - r.merge(self._post_process_targets(song, temp_target, - [] if found_on_disc else self.get_skip_intervals(song, source))) + if not r.is_fatal_error: + skip_intervals = self.get_skip_intervals(song, source) + break + + if temp_target.exists: + r.merge(self._post_process_targets( + song=song, + temp_target=temp_target, + interval_list=skip_intervals, + )) return r diff --git a/music_kraken/pages/bandcamp.py b/music_kraken/pages/bandcamp.py index 90064db..dcfebbf 100644 --- a/music_kraken/pages/bandcamp.py +++ b/music_kraken/pages/bandcamp.py @@ -185,7 +185,7 @@ class Bandcamp(Page): if li is None and li['href'] is not None: continue - source_list.append(Source.match_url(_parse_artist_url(li['href']), referer_page=self.SOURCE_TYPE)) + source_list.append(Source.match_url(_parse_artist_url(li['href']), referrer_page=self.SOURCE_TYPE)) return Artist( name=name, diff --git a/music_kraken/pages/encyclopaedia_metallum.py b/music_kraken/pages/encyclopaedia_metallum.py index d9ce0ca..dba4527 100644 --- a/music_kraken/pages/encyclopaedia_metallum.py +++ b/music_kraken/pages/encyclopaedia_metallum.py @@ -486,7 +486,7 @@ class EncyclopaediaMetallum(Page): href = anchor["href"] if href is not None: - source_list.append(Source.match_url(href, referer_page=self.SOURCE_TYPE)) + source_list.append(Source.match_url(href, referrer_page=self.SOURCE_TYPE)) # The following code is only legacy code, which I just kep because it doesn't harm. # The way ma returns sources changed. 
@@ -504,7 +504,7 @@ class EncyclopaediaMetallum(Page): if url is None: continue - source_list.append(Source.match_url(url, referer_page=self.SOURCE_TYPE)) + source_list.append(Source.match_url(url, referrer_page=self.SOURCE_TYPE)) return source_list diff --git a/music_kraken/pages/musify.py b/music_kraken/pages/musify.py index 28ac0a9..ebcb8e6 100644 --- a/music_kraken/pages/musify.py +++ b/music_kraken/pages/musify.py @@ -1,7 +1,7 @@ from collections import defaultdict from dataclasses import dataclass from enum import Enum -from typing import List, Optional, Type, Union, Generator +from typing import List, Optional, Type, Union, Generator, Dict, Any from urllib.parse import urlparse import pycountry @@ -24,7 +24,7 @@ from ..objects import ( Lyrics, Artwork ) -from ..utils.config import logging_settings +from ..utils.config import logging_settings, main_settings from ..utils import string_processing, shared from ..utils.string_processing import clean_song_title from ..utils.support_classes.query import Query @@ -361,7 +361,7 @@ class Musify(Page): return Song( title=clean_song_title(song_title, artist_name=artist_list[0].name if len(artist_list) > 0 else None), - main_artist_list=artist_list, + feature_artist_list=artist_list, source_list=source_list ) @@ -503,14 +503,14 @@ class Musify(Page): source_list.append(Source( SourcePages.YOUTUBE, iframe["src"], - referer_page=self.SOURCE_TYPE + referrer_page=self.SOURCE_TYPE )) return Song( title=clean_song_title(track_name, artist_name=artist_list[0].name if len(artist_list) > 0 else None), source_list=source_list, lyrics_list=lyrics_list, - main_artist_list=artist_list, + feature_artist_list=artist_list, album_list=album_list, artwork=artwork, ) @@ -652,10 +652,101 @@ class Musify(Page): return Song( title=clean_song_title(song_name, artist_name=artist_list[0].name if len(artist_list) > 0 else None), tracksort=tracksort, - main_artist_list=artist_list, + feature_artist_list=artist_list, source_list=source_list ) + + 
def _parse_album(self, soup: BeautifulSoup) -> Album: + name: str = None + source_list: List[Source] = [] + artist_list: List[Artist] = [] + date: ID3Timestamp = None + + """ + if breadcrumb list has 4 elements, then + the -2 is the artist link, + the -1 is the album + """ + # breadcrumb + breadcrumb_soup: BeautifulSoup = soup.find("ol", {"class", "breadcrumb"}) + breadcrumb_elements: List[BeautifulSoup] = breadcrumb_soup.find_all("li", {"class": "breadcrumb-item"}) + if len(breadcrumb_elements) == 4: + # album + album_crumb: BeautifulSoup = breadcrumb_elements[-1] + name = album_crumb.text.strip() + + # artist + artist_crumb: BeautifulSoup = breadcrumb_elements[-2] + anchor: BeautifulSoup = artist_crumb.find("a") + if anchor is not None: + href = anchor.get("href") + artist_source_list: List[Source] = [] + + if href is not None: + artist_source_list.append(Source(self.SOURCE_TYPE, self.HOST + href.strip())) + + span: BeautifulSoup = anchor.find("span") + if span is not None: + artist_list.append(Artist( + name=span.get_text(strip=True), + source_list=artist_source_list + )) + else: + self.LOGGER.debug("there are not 4 breadcrumb items, which shouldn't be the case") + + # meta + meta_url: BeautifulSoup = soup.find("meta", {"itemprop": "url"}) + if meta_url is not None: + url = meta_url.get("content") + if url is not None: + source_list.append(Source(self.SOURCE_TYPE, self.HOST + url)) + + meta_name: BeautifulSoup = soup.find("meta", {"itemprop": "name"}) + if meta_name is not None: + _name = meta_name.get("content") + if _name is not None: + name = _name + + # album info + album_info_ul: BeautifulSoup = soup.find("ul", {"class": "album-info"}) + if album_info_ul is not None: + artist_anchor: BeautifulSoup + for artist_anchor in album_info_ul.find_all("a", {"itemprop": "byArtist"}): + # line 98 + artist_source_list: List[Source] = [] + + artist_url_meta = artist_anchor.find("meta", {"itemprop": "url"}) + if artist_url_meta is not None: + artist_href = 
artist_url_meta.get("content") + if artist_href is not None: + artist_source_list.append(Source(self.SOURCE_TYPE, url=self.HOST + artist_href)) + + artist_meta_name = artist_anchor.find("meta", {"itemprop": "name"}) + if artist_meta_name is not None: + artist_name = artist_meta_name.get("content") + if artist_name is not None: + artist_list.append(Artist( + name=artist_name, + source_list=artist_source_list + )) + + time_soup: BeautifulSoup = album_info_ul.find("time", {"itemprop": "datePublished"}) + if time_soup is not None: + raw_datetime = time_soup.get("datetime") + if raw_datetime is not None: + try: + date = ID3Timestamp.strptime(raw_datetime, "%Y-%m-%d") + except ValueError: + self.LOGGER.debug(f"Raw datetime doesn't match time format %Y-%m-%d: {raw_datetime}") + + return Album( + title=name, + source_list=source_list, + artist_list=artist_list, + date=date + ) + def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album: """ fetches album from source: @@ -690,30 +781,18 @@ class Musify(Page): new_song = self._parse_song_card(card_soup) album.song_collection.append(new_song) - if stop_at_level > 1: - song: Song - for song in album.song_collection: - sources = song.source_collection.get_sources_from_page(self.SOURCE_TYPE) - for source in sources: - song.merge(self.fetch_song(source=source)) - album.update_tracksort() return album - def _get_artist_attributes(self, url: MusifyUrl) -> Artist: + def _fetch_initial_artist(self, url: MusifyUrl, source: Source, **kwargs) -> Artist: """ - fetches the main Artist attributes from this endpoint https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent - it needs to parse html - - :param url: - :return: """ r = self.connection.get(f"https://musify.club/{url.source_type.value}/{url.name_with_id}?_pjax=#bodyContent", name="artist_attributes_" + url.name_with_id) if r is None: - return Artist() + return Artist(source_list=[source]) soup = self.get_soup_from_response(r) @@ -812,7 +891,7 @@ class 
Musify(Page): href = additional_source.get("href") if href is None: continue - new_src = Source.match_url(href, referer_page=self.SOURCE_TYPE) + new_src = Source.match_url(href, referrer_page=self.SOURCE_TYPE) if new_src is None: continue source_list.append(new_src) @@ -828,7 +907,7 @@ class Musify(Page): notes=notes ) - def _parse_album_card(self, album_card: BeautifulSoup, artist_name: str = None) -> Album: + def _parse_album_card(self, album_card: BeautifulSoup, artist_name: str = None, **kwargs) -> Album: """
""" - _id: Optional[str] = None - name: str = None - source_list: List[Source] = [] - timestamp: Optional[ID3Timestamp] = None - album_status = None - - def set_name(new_name: str): - nonlocal name - nonlocal artist_name - - # example of just setting not working: - # https://musify.club/release/unjoy-eurythmie-psychonaut-4-tired-numb-still-alive-2012-324067 - if new_name.count(" - ") != 1: - name = new_name - return - - potential_artist_list, potential_name = new_name.split(" - ") - unified_artist_list = string_processing.unify(potential_artist_list) - if artist_name is not None: - if string_processing.unify(artist_name) not in unified_artist_list: - name = new_name - return - - name = potential_name - return - - name = new_name + album_kwargs: Dict[str, Any] = { + "source_list": [], + } album_status_id = album_card.get("data-type") if album_status_id.isdigit(): @@ -889,9 +944,7 @@ class Musify(Page): album_status = AlbumStatus.BOOTLEG def parse_release_anchor(_anchor: BeautifulSoup, text_is_name=False): - nonlocal _id - nonlocal name - nonlocal source_list + nonlocal album_kwargs if _anchor is None: return @@ -899,20 +952,13 @@ class Musify(Page): href = _anchor.get("href") if href is not None: # add url to sources - source_list.append(Source( + album_kwargs["source_list"].append(Source( self.SOURCE_TYPE, self.HOST + href )) - # split id from url - split_href = href.split("-") - if len(split_href) > 1: - _id = split_href[-1] - - if not text_is_name: - return - - set_name(_anchor.text) + if text_is_name: + album_kwargs["title"] = clean_song_title(_anchor.text, artist_name) anchor_list = album_card.find_all("a", recursive=False) if len(anchor_list) > 0: @@ -923,7 +969,7 @@ class Musify(Page): if thumbnail is not None: alt = thumbnail.get("alt") if alt is not None: - set_name(alt) + album_kwargs["title"] = clean_song_title(alt, artist_name) image_url = thumbnail.get("src") else: @@ -940,7 +986,7 @@ class Musify(Page): 13.11.2021 """ - nonlocal timestamp + nonlocal 
album_kwargs italic_tagging_soup: BeautifulSoup = small_soup.find("i") if italic_tagging_soup is None: @@ -950,7 +996,7 @@ class Musify(Page): return raw_time = small_soup.text.strip() - timestamp = ID3Timestamp.strptime(raw_time, "%d.%m.%Y") + album_kwargs["date"] = ID3Timestamp.strptime(raw_time, "%d.%m.%Y") # parse small date card_footer_list = album_card.find_all("div", {"class": "card-footer"}) @@ -963,112 +1009,18 @@ class Musify(Page): else: self.LOGGER.debug("there is not even 1 footer in the album card") - return Album( - title=name, - source_list=source_list, - date=timestamp, - album_type=album_type, - album_status=album_status - ) + return Album(**album_kwargs) - def _parse_album(self, soup: BeautifulSoup) -> Album: - name: str = None - source_list: List[Source] = [] - artist_list: List[Artist] = [] - date: ID3Timestamp = None - - """ - if breadcrumb list has 4 elements, then - the -2 is the artist link, - the -1 is the album - """ - # breadcrumb - breadcrumb_soup: BeautifulSoup = soup.find("ol", {"class", "breadcrumb"}) - breadcrumb_elements: List[BeautifulSoup] = breadcrumb_soup.find_all("li", {"class": "breadcrumb-item"}) - if len(breadcrumb_elements) == 4: - # album - album_crumb: BeautifulSoup = breadcrumb_elements[-1] - name = album_crumb.text.strip() - - # artist - artist_crumb: BeautifulSoup = breadcrumb_elements[-2] - anchor: BeautifulSoup = artist_crumb.find("a") - if anchor is not None: - href = anchor.get("href") - artist_source_list: List[Source] = [] - - if href is not None: - artist_source_list.append(Source(self.SOURCE_TYPE, self.HOST + href.strip())) - - span: BeautifulSoup = anchor.find("span") - if span is not None: - artist_list.append(Artist( - name=span.get_text(strip=True), - source_list=artist_source_list - )) - else: - self.LOGGER.debug("there are not 4 breadcrumb items, which shouldn't be the case") - - # meta - meta_url: BeautifulSoup = soup.find("meta", {"itemprop": "url"}) - if meta_url is not None: - url = 
meta_url.get("content") - if url is not None: - source_list.append(Source(self.SOURCE_TYPE, self.HOST + url)) - - meta_name: BeautifulSoup = soup.find("meta", {"itemprop": "name"}) - if meta_name is not None: - _name = meta_name.get("content") - if _name is not None: - name = _name - - # album info - album_info_ul: BeautifulSoup = soup.find("ul", {"class": "album-info"}) - if album_info_ul is not None: - artist_anchor: BeautifulSoup - for artist_anchor in album_info_ul.find_all("a", {"itemprop": "byArtist"}): - # line 98 - artist_source_list: List[Source] = [] - - artist_url_meta = artist_anchor.find("meta", {"itemprop": "url"}) - if artist_url_meta is not None: - artist_href = artist_url_meta.get("content") - if artist_href is not None: - artist_source_list.append(Source(self.SOURCE_TYPE, url=self.HOST + artist_href)) - - artist_meta_name = artist_anchor.find("meta", {"itemprop": "name"}) - if artist_meta_name is not None: - artist_name = artist_meta_name.get("content") - if artist_name is not None: - artist_list.append(Artist( - name=artist_name, - source_list=artist_source_list - )) - - time_soup: BeautifulSoup = album_info_ul.find("time", {"itemprop": "datePublished"}) - if time_soup is not None: - raw_datetime = time_soup.get("datetime") - if raw_datetime is not None: - try: - date = ID3Timestamp.strptime(raw_datetime, "%Y-%m-%d") - except ValueError: - self.LOGGER.debug(f"Raw datetime doesn't match time format %Y-%m-%d: {raw_datetime}") - - return Album( - title=name, - source_list=source_list, - artist_list=artist_list, - date=date - ) - - def _get_discography(self, url: MusifyUrl, artist_name: str = None, stop_at_level: int = 1) -> Generator[Album, None, None]: + def _fetch_artist_discography(self, artist: Artist, url: MusifyUrl, artist_name: str = None, **kwargs): """ POST https://musify.club/artist/filteralbums - ArtistID: 280348 - SortOrder.Property: dateCreated - SortOrder.IsAscending: false - X-Requested-With: XMLHttpRequest + ArtistID: 280348 + 
SortOrder.Property: dateCreated + SortOrder.IsAscending: false + X-Requested-With: XMLHttpRequest """ + _download_all = kwargs.get("download_all", False) + _album_type_blacklist = kwargs.get("album_type_blacklist", main_settings["album_type_blacklist"]) endpoint = self.HOST + "/" + url.source_type.value + "/filteralbums" @@ -1079,33 +1031,29 @@ class Musify(Page): "X-Requested-With": "XMLHttpRequest" }, name="discography_" + url.name_with_id) if r is None: - return [] - soup: BeautifulSoup = BeautifulSoup(r.content, features="html.parser") + return + + soup: BeautifulSoup = self.get_soup_from_response(r) for card_soup in soup.find_all("div", {"class": "card"}): - yield self._parse_album_card(card_soup, artist_name) + album = self._parse_album_card(card_soup, artist_name, **kwargs) + if album.album_type in _album_type_blacklist: + continue - def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist: + artist.main_album_collection.append(album) + + def fetch_artist(self, source: Source, **kwargs) -> Artist: """ - fetches artist from source - + TODO [x] discography [x] attributes [] picture gallery - - Args: - source (Source): the source to fetch - stop_at_level: int = 1: if it is false, every album from discograohy will be fetched. Defaults to False. 
- - Returns: - Artist: the artist fetched """ url = parse_url(source.url) - artist = self._get_artist_attributes(url) - - artist.main_album_collection.extend(self._get_discography(url, artist.name)) + artist = self._fetch_initial_artist(url, source=source, **kwargs) + self._fetch_artist_discography(artist, url, artist.name, **kwargs) return artist diff --git a/music_kraken/pages/youtube_music/_list_render.py b/music_kraken/pages/youtube_music/_list_render.py index 8076e54..bb6f40b 100644 --- a/music_kraken/pages/youtube_music/_list_render.py +++ b/music_kraken/pages/youtube_music/_list_render.py @@ -25,7 +25,6 @@ def music_card_shelf_renderer(renderer: dict) -> List[DatabaseObject]: results.extend(parse_renderer(sub_renderer)) return results - def music_responsive_list_item_flex_column_renderer(renderer: dict) -> List[DatabaseObject]: return parse_run_list(renderer.get("text", {}).get("runs", [])) @@ -54,19 +53,11 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]: for result in results: _map[type(result)].append(result) - for song in song_list: + if len(song_list) == 1: + song = song_list[0] + song.feature_artist_collection.extend(artist_list) song.album_collection.extend(album_list) - song.main_artist_collection.extend(artist_list) - - for album in album_list: - album.artist_collection.extend(artist_list) - - if len(song_list) > 0: - return song_list - if len(album_list) > 0: - return album_list - if len(artist_list) > 0: - return artist_list + return [song] return results diff --git a/music_kraken/pages/youtube_music/_music_object_render.py b/music_kraken/pages/youtube_music/_music_object_render.py index f10d11a..831d50d 100644 --- a/music_kraken/pages/youtube_music/_music_object_render.py +++ b/music_kraken/pages/youtube_music/_music_object_render.py @@ -40,7 +40,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]: _temp_nav = run_element.get("navigationEndpoint", {}) is_video = "watchEndpoint" in _temp_nav 
- navigation_endpoint = _temp_nav.get("watchEndpoint" if is_video else "browseEndpoint", {}) + navigation_endpoint = _temp_nav.get("watchEndpoint", _temp_nav.get("browseEndpoint", {})) element_type = PageType.SONG page_type_string = navigation_endpoint.get("watchEndpointMusicSupportedConfigs", {}).get("watchEndpointMusicConfig", {}).get("musicVideoType", "") @@ -51,7 +51,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]: except ValueError: return - element_id = navigation_endpoint.get("videoId" if is_video else "browseId") + element_id = navigation_endpoint.get("videoId", navigation_endpoint.get("browseId")) element_text = run_element.get("text") if element_id is None or element_text is None: @@ -60,7 +60,11 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]: if element_type == PageType.SONG or (element_type == PageType.VIDEO and not youtube_settings["youtube_music_clean_data"]) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not youtube_settings["youtube_music_clean_data"]): source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}") - return Song(title=clean_song_title(element_text), source_list=[source]) + + return Song( + title=clean_song_title(element_text), + source_list=[source] + ) if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not youtube_settings["youtube_music_clean_data"]): source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}") diff --git a/music_kraken/pages/youtube_music/youtube_music.py b/music_kraken/pages/youtube_music/youtube_music.py index 6ecbeaf..bbb8d22 100644 --- a/music_kraken/pages/youtube_music/youtube_music.py +++ b/music_kraken/pages/youtube_music/youtube_music.py @@ -8,6 +8,7 @@ import json from dataclasses import dataclass import re from functools import lru_cache +from collections import defaultdict import youtube_dl from youtube_dl.extractor.youtube import YoutubeIE @@ -17,7 +18,7 @@ from 
...utils.exception.config import SettingValueError from ...utils.config import main_settings, youtube_settings, logging_settings from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING from ...utils.string_processing import clean_song_title -from ...utils import get_current_millis +from ...utils import get_current_millis, traverse_json_path from ...utils import dump_to_file @@ -30,12 +31,16 @@ from ...objects import ( Song, Album, Label, - Target + Target, + Lyrics, + FormattedText ) from ...connection import Connection +from ...utils.enums.album import AlbumType from ...utils.support_classes.download_result import DownloadResult from ._list_render import parse_renderer +from ._music_object_render import parse_run_element from .super_youtube import SuperYouTube @@ -162,6 +167,12 @@ class MusicKrakenYoutubeIE(YoutubeIE): +ALBUM_TYPE_MAP = { + "Single": AlbumType.SINGLE, + "Album": AlbumType.STUDIO_ALBUM, + "EP": AlbumType.EP, +} + class YoutubeMusic(SuperYouTube): # CHANGE @@ -401,7 +412,7 @@ class YoutubeMusic(SuperYouTube): return results def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist: - artist = Artist() + artist = Artist(source_list=[source]) # construct the request url = urlparse(source.url) @@ -421,6 +432,19 @@ class YoutubeMusic(SuperYouTube): if DEBUG: dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False) + # artist details + data: dict = r.json() + header = data.get("header", {}) + musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {}) + + title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", []) + subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", []) + + if len(title_runs) > 0: + artist.name = title_runs[0].get("text", artist.name) + + + # fetch discography renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[ 0].get("tabRenderer", {}).get("content", 
{}).get("sectionListRenderer", {}).get("contents", []) @@ -465,6 +489,46 @@ class YoutubeMusic(SuperYouTube): if DEBUG: dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False) + data = r.json() + + # album details + header = data.get("header", {}) + musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {}) + + title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", []) + subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", []) + + if len(title_runs) > 0: + album.title = title_runs[0].get("text", album.title) + + def other_parse_run(run: dict) -> str: + nonlocal album + + if "text" not in run: + return + text = run["text"] + + is_text_field = len(run.keys()) == 1 + + # regex that text is a year + if is_text_field and re.match(r"\d{4}", text): + album.date = ID3Timestamp.strptime(text, "%Y") + return + + if text in ALBUM_TYPE_MAP: + album.album_type = ALBUM_TYPE_MAP[text] + return + + if not is_text_field: + r = parse_run_element(run) + if r is not None: + album.add_list_of_other_objects([r]) + return + + for _run in subtitle_runs: + other_parse_run(_run) + + # tracklist renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[ 0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", []) @@ -472,20 +536,67 @@ class YoutubeMusic(SuperYouTube): for i, content in enumerate(renderer_list): dump_to_file(f"{i}-album-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False) - results = [] - - """ - cant use fixed indices, because if something has no entries, the list dissappears - instead I have to try parse everything, and just reject community playlists and profiles. 
- """ for renderer in renderer_list: - results.extend(parse_renderer(renderer)) + album.add_list_of_other_objects(parse_renderer(renderer)) - album.add_list_of_other_objects(results) + for song in album.song_collection: + for song_source in song.source_collection: + song_source.additional_data["playlist_id"] = browse_id return album + def fetch_lyrics(self, video_id: str, playlist_id: str = None) -> str: + request_data = { + "context": {**self.credentials.context, "adSignalsInfo": {"params": []}}, + "videoId": video_id, + } + if playlist_id is not None: + request_data["playlistId"] = playlist_id + + tab_request = self.yt_music_connection.post( + url=get_youtube_url(path="/youtubei/v1/next", query=f"prettyPrint=false"), + json=request_data, + name=f"fetch_song_tabs_{video_id}.json", + ) + + if tab_request is None: + return None + + dump_to_file(f"fetch_song_tabs_{video_id}.json", tab_request.text, is_json=True, exit_after_dump=False) + + tab_data: dict = tab_request.json() + + tabs = traverse_json_path(tab_data, "contents.singleColumnMusicWatchNextResultsRenderer.tabbedRenderer.watchNextTabbedResultsRenderer.tabs", default=[]) + browse_id = None + for tab in tabs: + pageType = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseEndpointContextSupportedConfigs.browseEndpointContextMusicConfig.pageType", default="") + if pageType in ("MUSIC_TAB_TYPE_LYRICS", "MUSIC_PAGE_TYPE_TRACK_LYRICS") or "lyrics" in pageType.lower(): + browse_id = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseId", default=None) + break + + if browse_id is None: + return None + + + r = self.yt_music_connection.post( + url=get_youtube_url(path="/youtubei/v1/browse", query=f"prettyPrint=false"), + json={ + "browseId": browse_id, + "context": {**self.credentials.context, "adSignalsInfo": {"params": []}} + }, + name=f"fetch_song_lyrics_{video_id}.json" + ) + + dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False) + + data = 
r.json() + lyrics_text = traverse_json_path(data, "contents.sectionListRenderer.contents[0].musicDescriptionShelfRenderer.description.runs[0].text", default=None) + if lyrics_text is None: + return None + + return Lyrics(FormattedText(plain=lyrics_text)) + def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: ydl_res: dict = {} @@ -498,7 +609,19 @@ class YoutubeMusic(SuperYouTube): self.fetch_media_url(source=source, ydl_res=ydl_res) - artist_name = ydl_res.get("artist", ydl_res.get("uploader", "")).rstrip(" - Topic") + artist_names = [] + uploader = ydl_res.get("uploader", "") + if uploader.endswith(" - Topic"): + artist_names = [uploader.rstrip(" - Topic")] + + artist_list = [ + Artist( + name=name, + source_list=[Source( + SourcePages.YOUTUBE_MUSIC, + f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}" + )] + ) for name in artist_names] album_list = [] if "album" in ydl_res: @@ -507,25 +630,57 @@ class YoutubeMusic(SuperYouTube): date=ID3Timestamp.strptime(ydl_res.get("upload_date"), "%Y%m%d"), )) - return Song( + artist_name = artist_names[0] if len(artist_names) > 0 else None + song = Song( title=ydl_res.get("track", clean_song_title(ydl_res.get("title"), artist_name=artist_name)), note=ydl_res.get("descriptions"), album_list=album_list, length=int(ydl_res.get("duration", 0)) * 1000, artwork=Artwork(*ydl_res.get("thumbnails", [])), - main_artist_list=[Artist( - name=artist_name, - source_list=[Source( - SourcePages.YOUTUBE_MUSIC, - f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}" - )] - )], + main_artist_list=artist_list, source_list=[Source( SourcePages.YOUTUBE_MUSIC, f"https://music.youtube.com/watch?v={ydl_res.get('id')}" ), source], ) + # other song details + parsed_url = urlparse(source.url) + browse_id = parse_qs(parsed_url.query)['v'][0] + request_data = { + "captionParams": {}, + "context": {**self.credentials.context, "adSignalsInfo": 
{"params": []}}, + "videoId": browse_id, + } + if "playlist_id" in source.additional_data: + request_data["playlistId"] = source.additional_data["playlist_id"] + + initial_details = self.yt_music_connection.post( + url=get_youtube_url(path="/youtubei/v1/player", query=f"prettyPrint=false"), + json=request_data, + name=f"fetch_song_{browse_id}.json", + ) + + if initial_details is None: + return song + + dump_to_file(f"fetch_song_{browse_id}.json", initial_details.text, is_json=True, exit_after_dump=False) + + data = initial_details.json() + video_details = data.get("videoDetails", {}) + + browse_id = video_details.get("videoId", browse_id) + song.title = video_details.get("title", song.title) + if video_details.get("isLiveContent", False): + for album in song.album_list: + album.album_type = AlbumType.LIVE_ALBUM + for thumbnail in video_details.get("thumbnails", []): + song.artwork.append(**thumbnail) + + song.lyrics_collection.append(self.fetch_lyrics(browse_id, playlist_id=request_data.get("playlistId"))) + + return song + def fetch_media_url(self, source: Source, ydl_res: dict = None) -> dict: def _get_best_format(format_list: List[Dict]) -> dict: diff --git a/music_kraken/utils/__init__.py b/music_kraken/utils/__init__.py index 9226441..a8d658b 100644 --- a/music_kraken/utils/__init__.py +++ b/music_kraken/utils/__init__.py @@ -3,24 +3,35 @@ from pathlib import Path import json import logging import inspect +from typing import List, Union from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK from .config import config, read_config, write_config from .enums.colors import BColors from .path_manager import LOCATIONS +from .hacking import merge_args """ IO functions """ def _apply_color(msg: str, color: BColors) -> str: + if not isinstance(msg, str): + msg = str(msg) + + endc = BColors.ENDC.value + if color is BColors.ENDC: return msg + + msg = msg.replace(BColors.ENDC.value, BColors.ENDC.value + 
color.value) + return color.value + msg + BColors.ENDC.value -def output(msg: str, color: BColors = BColors.ENDC): - print(_apply_color(msg, color)) +@merge_args(print) +def output(*msg: List[str], color: BColors = BColors.ENDC, **kwargs): + print(*(_apply_color(s, color) for s in msg), **kwargs) def user_input(msg: str, color: BColors = BColors.ENDC): @@ -71,6 +82,43 @@ def object_trace(obj): misc functions """ +def traverse_json_path(data, path: Union[str, List[str]], default=None): + """ + Path parts are concatenated with . or wrapped with [""] for object keys and wrapped in [] for array indices. + """ + + if isinstance(path, str): + path = path.replace('["', '.').replace('"]', '.').replace("[", ".").replace("]", ".") + path = [p for p in path.split(".") if len(p) > 0] + + if len(path) <= 0: + return data + + current = path[0] + path = path[1:] + + new_data = None + + if isinstance(data, dict): + new_data = data.get(current) + + elif isinstance(data, list): + try: + new_data = data[int(current)] + except (IndexError, ValueError): + pass + + if new_data is None: + return default + + return traverse_json_path(data=new_data, path=path, default=default) + +_auto_increment = 0 +def generate_id() -> int: + global _auto_increment + _auto_increment += 1 + return _auto_increment + def get_current_millis() -> int: dt = datetime.now() return int(dt.microsecond / 1_000) diff --git a/music_kraken/utils/enums/source.py b/music_kraken/utils/enums/source.py index a5e213e..be3171f 100644 --- a/music_kraken/utils/enums/source.py +++ b/music_kraken/utils/enums/source.py @@ -9,42 +9,32 @@ class SourceTypes(Enum): class SourcePages(Enum): - YOUTUBE = "youtube" - MUSIFY = "musify" - YOUTUBE_MUSIC = "youtube music" - GENIUS = "genius" - MUSICBRAINZ = "musicbrainz" + YOUTUBE = "youtube", "https://www.youtube.com/" + MUSIFY = "musify", "https://musify.club/" + YOUTUBE_MUSIC = "youtube music", "https://music.youtube.com/" + GENIUS = "genius", "https://genius.com/" + MUSICBRAINZ = 
"musicbrainz", "https://musicbrainz.org/" ENCYCLOPAEDIA_METALLUM = "encyclopaedia metallum" - BANDCAMP = "bandcamp" - DEEZER = "deezer" - SPOTIFY = "spotify" + BANDCAMP = "bandcamp", "https://bandcamp.com/" + DEEZER = "deezer", "https://www.deezer.com/" + SPOTIFY = "spotify", "https://open.spotify.com/" # This has nothing to do with audio, but bands can be here - WIKIPEDIA = "wikipedia" - INSTAGRAM = "instagram" - FACEBOOK = "facebook" - TWITTER = "twitter" # I will use nitter though lol - MYSPACE = "myspace" # Yes somehow this ancient site is linked EVERYWHERE + WIKIPEDIA = "wikipedia", "https://en.wikipedia.org/wiki/Main_Page" + INSTAGRAM = "instagram", "https://www.instagram.com/" + FACEBOOK = "facebook", "https://www.facebook.com/" + TWITTER = "twitter", "https://twitter.com/" + MYSPACE = "myspace", "https://myspace.com/" # Yes somehow this ancient site is linked EVERYWHERE - MANUAL = "manual" + MANUAL = "manual", "" - PRESET = "preset" + PRESET = "preset", "" - @classmethod - def get_homepage(cls, attribute) -> str: - homepage_map = { - cls.YOUTUBE: "https://www.youtube.com/", - cls.MUSIFY: "https://musify.club/", - cls.MUSICBRAINZ: "https://musicbrainz.org/", - cls.ENCYCLOPAEDIA_METALLUM: "https://www.metal-archives.com/", - cls.GENIUS: "https://genius.com/", - cls.BANDCAMP: "https://bandcamp.com/", - cls.DEEZER: "https://www.deezer.com/", - cls.INSTAGRAM: "https://www.instagram.com/", - cls.FACEBOOK: "https://www.facebook.com/", - cls.SPOTIFY: "https://open.spotify.com/", - cls.TWITTER: "https://twitter.com/", - cls.MYSPACE: "https://myspace.com/", - cls.WIKIPEDIA: "https://en.wikipedia.org/wiki/Main_Page" - } - return homepage_map[attribute] \ No newline at end of file + def __new__(cls, value, homepage = None): + member = object.__new__(cls) + + member._value_ = value + member.homepage = homepage + + return member + \ No newline at end of file diff --git a/music_kraken/utils/exception/__init__.py b/music_kraken/utils/exception/__init__.py index 
4e1f95f..746fe78 100644 --- a/music_kraken/utils/exception/__init__.py +++ b/music_kraken/utils/exception/__init__.py @@ -1 +1,11 @@ -__all__ = ["config"] +class MKBaseException(Exception): + def __init__(self, message: str = None, **kwargs) -> None: + self.message = message + super().__init__(message, **kwargs) + + +class MKFrontendException(MKBaseException): + pass + +class MKInvalidInputException(MKFrontendException): + pass diff --git a/music_kraken/utils/hacking.py b/music_kraken/utils/hacking.py index e68356e..0e949d8 100644 --- a/music_kraken/utils/hacking.py +++ b/music_kraken/utils/hacking.py @@ -78,7 +78,14 @@ def _merge( drop_args = [] if drop_kwonlyargs is None: drop_kwonlyargs = [] - source_spec = inspect.getfullargspec(source) + + is_builtin = False + try: + source_spec = inspect.getfullargspec(source) + except TypeError: + is_builtin = True + source_spec = inspect.FullArgSpec(type(source).__name__, [], [], [], [], [], []) + dest_spec = inspect.getfullargspec(dest) if source_spec.varargs or source_spec.varkw: @@ -128,13 +135,15 @@ def _merge( 'co_kwonlyargcount': len(kwonlyargs_merged), 'co_posonlyargcount': dest.__code__.co_posonlyargcount, 'co_nlocals': len(args_all), - 'co_flags': source.__code__.co_flags, 'co_varnames': args_all, 'co_filename': dest.__code__.co_filename, 'co_name': dest.__code__.co_name, 'co_firstlineno': dest.__code__.co_firstlineno, } + if hasattr(source, "__code__"): + replace_kwargs['co_flags'] = source.__code__.co_flags + if PY310: replace_kwargs['co_linetable'] = dest.__code__.co_linetable else: @@ -151,7 +160,7 @@ def _merge( len(kwonlyargs_merged), _blank.__code__.co_nlocals, _blank.__code__.co_stacksize, - source.__code__.co_flags, + source.__code__.co_flags if hasattr(source, "__code__") else dest.__code__.co_flags, _blank.__code__.co_code, (), (), args_all, dest.__code__.co_filename, dest.__code__.co_name, @@ -171,6 +180,9 @@ def _merge( dest_ret = dest.__annotations__['return'] for v in ('__kwdefaults__', 
'__annotations__'): + if not hasattr(source, v): + continue + out = getattr(source, v) if out is None: out = {} diff --git a/music_kraken/utils/shared.py b/music_kraken/utils/shared.py index a2b06b8..8f671f9 100644 --- a/music_kraken/utils/shared.py +++ b/music_kraken/utils/shared.py @@ -20,6 +20,7 @@ DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False DEBUG_YOUTUBE_INITIALIZING = DEBUG and False DEBUG_PAGES = DEBUG and False DEBUG_DUMP = DEBUG and False +DEBUG_PRINT_ID = DEBUG and True if DEBUG: print("DEBUG ACTIVE") diff --git a/music_kraken/utils/string_processing.py b/music_kraken/utils/string_processing.py index 9acd3c8..22ae63e 100644 --- a/music_kraken/utils/string_processing.py +++ b/music_kraken/utils/string_processing.py @@ -6,6 +6,7 @@ from functools import lru_cache from transliterate.exceptions import LanguageDetectionError from transliterate import translit from pathvalidate import sanitize_filename +from urllib.parse import urlparse, ParseResult, parse_qs COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = ( @@ -21,6 +22,7 @@ def unify(string: str) -> str: returns a unified str, to make comparisons easy. a unified string has the following attributes: - is lowercase + - is transliterated to Latin characters from e.g. 
Cyrillic """ if string is None: @@ -30,8 +32,9 @@ def unify(string: str) -> str: string = translit(string, reversed=True) except LanguageDetectionError: pass - - return string.lower() + + string = unify_punctuation(string) + return string.lower().strip() def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Union[str, Path]: @@ -49,7 +52,14 @@ def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Uni string = string[1:] string = string.replace("/", "_").replace("\\", "_") + + try: + string = translit(string, reversed=True) + except LanguageDetectionError: + pass + string = sanitize_filename(string) + return string if isinstance(string, Path): @@ -127,13 +137,45 @@ UNIFY_TO = " " ALLOWED_LENGTH_DISTANCE = 20 -def unify_punctuation(to_unify: str) -> str: +def unify_punctuation(to_unify: str, unify_to: str = UNIFY_TO) -> str: for char in string.punctuation: - to_unify = to_unify.replace(char, UNIFY_TO) + to_unify = to_unify.replace(char, unify_to) return to_unify -def hash_url(url: str) -> int: - return url.strip().lower().lstrip("https://").lstrip("http://") +@lru_cache(maxsize=128) +def hash_url(url: Union[str, ParseResult]) -> str: + if isinstance(url, str): + url = urlparse(url) + + unify_to = "-" + + def unify_part(part: str) -> str: + nonlocal unify_to + return unify_punctuation(part.lower(), unify_to=unify_to).strip(unify_to) + + # netloc + netloc = unify_part(url.netloc) + if netloc.startswith("www" + unify_to): + netloc = netloc[3 + len(unify_to):] + + # query + query = url.query + query_dict: Optional[dict] = None + try: + query_dict: dict = parse_qs(url.query, strict_parsing=True) + except ValueError: + # the query couldn't be parsed + pass + + if isinstance(query_dict, dict): + # sort keys alphabetically + query = "" + for key, value in sorted(query_dict.items(), key=lambda i: i[0]): + query += f"{key.strip()}-{''.join(i.strip() for i in value)}" + + r = f"{netloc}_{unify_part(url.path)}_{unify_part(query)}" + 
r = r.lower().strip() + return r def remove_feature_part_from_track(title: str) -> str: diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_hash_url.py b/tests/test_hash_url.py new file mode 100644 index 0000000..f87b2ff --- /dev/null +++ b/tests/test_hash_url.py @@ -0,0 +1,35 @@ +import unittest + +from music_kraken.utils.string_processing import hash_url + + +class TestCollection(unittest.TestCase): + def test_remove_schema(self): + self.assertFalse(hash_url("https://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("https")) + self.assertFalse(hash_url("ftp://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("ftp")) + self.assertFalse(hash_url("sftp://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("sftp")) + self.assertFalse(hash_url("http://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("http")) + + def test_no_punctuation(self): + self.assertNotIn("you_tube", hash_url("https://www.you_tube.com/watch?v=3jZ_D3ELwOQ")) + self.assertNotIn(".", hash_url("https://docs.gitea.com/next/install.ation/comparison")) + + def test_three_parts(self): + """ + The url is parsed into three parts [netloc; path; query] + Which are then appended to each other with an underscore between. + """ + + self.assertTrue(hash_url("https://duckduckgo.com/?t=h_&q=dfasf&ia=web").count("_") == 2) + + def test_sort_query(self): + """ + The query is sorted alphabetically + """ + hashed = hash_url("https://duckduckgo.com/?t=h_&q=dfasf&ia=web") + sorted_keys = ["ia-", "q-", "t-"] + + self.assertTrue(hashed.index(sorted_keys[0]) < hashed.index(sorted_keys[1]) < hashed.index(sorted_keys[2])) + +if __name__ == "__main__": + unittest.main()