From 6a8374d595d3b0dd4ae84fedb94977ad432b5703 Mon Sep 17 00:00:00 2001 From: Lars Noack Date: Mon, 15 Jan 2024 12:48:36 +0100 Subject: [PATCH] fix: fixed youtube music --- src/actual_donwload.py | 5 +- src/music_kraken/cli/main_downloader.py | 129 +++++++++--------- src/music_kraken/download/page_attributes.py | 6 +- src/music_kraken/objects/collection.py | 2 +- src/music_kraken/objects/parents.py | 13 ++ src/music_kraken/objects/song.py | 61 +++++++++ src/music_kraken/pages/abstract.py | 136 ++++++++++--------- 7 files changed, 216 insertions(+), 136 deletions(-) diff --git a/src/actual_donwload.py b/src/actual_donwload.py index b7a468c..6ad3c34 100644 --- a/src/actual_donwload.py +++ b/src/actual_donwload.py @@ -42,9 +42,8 @@ if __name__ == "__main__": ] bandcamp_test = [ - "s: #a Ghost Bath", - "0", - "d: 4" + "s: #a Only Smile", + "d: 1", ] diff --git a/src/music_kraken/cli/main_downloader.py b/src/music_kraken/cli/main_downloader.py index 3958150..52d9847 100644 --- a/src/music_kraken/cli/main_downloader.py +++ b/src/music_kraken/cli/main_downloader.py @@ -16,7 +16,6 @@ from ..download.page_attributes import Pages from ..pages import Page from ..objects import Song, Album, Artist, DatabaseObject - """ This is the implementation of the Shell @@ -107,6 +106,7 @@ def get_existing_genre() -> List[str]: return existing_genres + def get_genre(): existing_genres = get_existing_genre() for i, genre_option in enumerate(existing_genres): @@ -129,19 +129,18 @@ def get_genre(): verification = input(f"create new genre \"{new_genre}\"? 
(Y/N): ").lower() if verification in agree_inputs: return new_genre - - + + def help_message(): print() print(main_settings["happy_messages"]) print() - class Downloader: def __init__( self, - exclude_pages: Set[Type[Page]] = None, + exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, max_displayed_options: int = 10, option_digits: int = 3, @@ -149,23 +148,22 @@ class Downloader: process_metadata_anyway: bool = False, ) -> None: self.pages: Pages = Pages(exclude_pages=exclude_pages, exclude_shady=exclude_shady) - + self.page_dict: Dict[str, Type[Page]] = dict() - + self.max_displayed_options = max_displayed_options self.option_digits: int = option_digits - + self.current_results: Results = None self._result_history: List[Results] = [] - + self.genre = genre or get_genre() self.process_metadata_anyway = process_metadata_anyway - + print() print(f"Downloading to: \"{self.genre}\"") print() - def print_current_options(self): self.page_dict = dict() @@ -176,12 +174,13 @@ class Downloader: if isinstance(option, Option): print(f"{option.index:0{self.option_digits}} {option.music_object.option_string}") else: - prefix = ALPHABET[page_count%len(ALPHABET)] - print(f"({prefix}) ------------------------{option.__name__:{PAGE_NAME_FILL}<{MAX_PAGE_LEN}}------------") - + prefix = ALPHABET[page_count % len(ALPHABET)] + print( + f"({prefix}) ------------------------{option.__name__:{PAGE_NAME_FILL}<{MAX_PAGE_LEN}}------------") + self.page_dict[prefix] = option self.page_dict[option.__name__] = option - + page_count += 1 print() @@ -189,47 +188,47 @@ class Downloader: def set_current_options(self, current_options: Results): if main_settings["result_history"]: self._result_history.append(current_options) - + if main_settings["history_length"] != -1: if len(self._result_history) > main_settings["history_length"]: self._result_history.pop(0) - + self.current_results = current_options - + def previous_option(self) -> bool: if not main_settings["result_history"]: 
print("History is turned of.\nGo to main_settings, and change the value at 'result_history' to 'true'.") return False - + if len(self._result_history) <= 1: print(f"No results in history.") return False self._result_history.pop() self.current_results = self._result_history[-1] return True - + def _process_parsed(self, key_text: Dict[str, str], query: str) -> Query: song = None if not "t" in key_text else Song(title=key_text["t"], dynamic=True) album = None if not "r" in key_text else Album(title=key_text["r"], dynamic=True) artist = None if not "a" in key_text else Artist(name=key_text["a"], dynamic=True) - + if song is not None: if album is not None: song.album_collection.append(album) if artist is not None: song.main_artist_collection.append(artist) return Query(raw_query=query, music_object=song) - + if album is not None: if artist is not None: album.artist_collection.append(artist) return Query(raw_query=query, music_object=album) - + if artist is not None: return Query(raw_query=query, music_object=artist) - + return Query(raw_query=query) - + def search(self, query: str): if re.match(URL_PATTERN, query) is not None: try: @@ -243,58 +242,57 @@ class Downloader: self.set_current_options(PageResults(page, data_object.options)) self.print_current_options() return - + special_characters = "#\\" query = query + " " - + key_text = {} - + skip_next = False escape_next = False new_text = "" latest_key: str = None for i in range(len(query) - 1): current_char = query[i] - next_char = query[i+1] - + next_char = query[i + 1] + if skip_next: skip_next = False continue - + if escape_next: new_text += current_char escape_next = False - + # escaping if current_char == "\\": if next_char in special_characters: escape_next = True continue - + if current_char == "#": if latest_key is not None: key_text[latest_key] = new_text new_text = "" - + latest_key = next_char skip_next = True continue - + new_text += current_char - + if latest_key is not None: key_text[latest_key] = 
new_text - - + parsed_query: Query = self._process_parsed(key_text, query) - + self.set_current_options(self.pages.search(parsed_query)) self.print_current_options() - + def goto(self, index: int): page: Type[Page] music_object: DatabaseObject - + try: page, music_object = self.current_results.get_music_object_by_index(index) except KeyError: @@ -302,23 +300,22 @@ class Downloader: print(f"The option {index} doesn't exist.") print() return - + self.pages.fetch_details(music_object) print(music_object) print(music_object.options) self.set_current_options(PageResults(page, music_object.options)) - + self.print_current_options() - - + def download(self, download_str: str, download_all: bool = False) -> bool: to_download: List[DatabaseObject] = [] if re.match(URL_PATTERN, download_str) is not None: _, music_objects = self.pages.fetch_url(download_str) to_download.append(music_objects) - + else: index: str for index in download_str.split(", "): @@ -327,66 +324,68 @@ class Downloader: print(f"Every download thingie has to be an index, not {index}.") print() return False - + for index in download_str.split(", "): to_download.append(self.current_results.get_music_object_by_index(int(index))[1]) - + print() print("Downloading:") for download_object in to_download: print(download_object.option_string) print() - + _result_map: Dict[DatabaseObject, DownloadResult] = dict() - + for database_object in to_download: - r = self.pages.download(music_object=database_object, genre=self.genre, download_all=download_all, process_metadata_anyway=self.process_metadata_anyway) + r = self.pages.download(music_object=database_object, genre=self.genre, download_all=download_all, + process_metadata_anyway=self.process_metadata_anyway) _result_map[database_object] = r - + for music_object, result in _result_map.items(): print() print(music_object.option_string) print(result) - + return True - + def process_input(self, input_str: str) -> bool: input_str = input_str.strip() processed_input: str = 
input_str.lower() - + if processed_input in EXIT_COMMANDS: return True - + if processed_input == ".": self.print_current_options() return False - + if processed_input == "..": if self.previous_option(): self.print_current_options() return False - + if processed_input.startswith("s: "): self.search(input_str[3:]) return False - + if processed_input.startswith("d: "): return self.download(input_str[3:]) - + if processed_input.isdigit(): self.goto(int(processed_input)) return False - + if processed_input != "help": print("Invalid input.") help_message() return False - + def mainloop(self): while True: if self.process_input(input("> ")): return + @cli_function def download( genre: str = None, @@ -403,9 +402,9 @@ def download( print("Restart the programm to use it.") else: print("Something went wrong configuring.") - + shell = Downloader(genre=genre, process_metadata_anyway=process_metadata_anyway) - + if command_list is not None: for command in command_list: shell.process_input(command) @@ -414,5 +413,5 @@ def download( if direct_download_url is not None: if shell.download(direct_download_url, download_all=download_all): return - + shell.mainloop() diff --git a/src/music_kraken/download/page_attributes.py b/src/music_kraken/download/page_attributes.py index c655d11..27ae19c 100644 --- a/src/music_kraken/download/page_attributes.py +++ b/src/music_kraken/download/page_attributes.py @@ -98,8 +98,10 @@ class Pages: def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False, process_metadata_anyway: bool = False) -> DownloadResult: if not isinstance(music_object, INDEPENDENT_DB_OBJECTS): return DownloadResult(error_message=f"{type(music_object).__name__} can't be downloaded.") - - _page_types = set() + + self.fetch_details(music_object) + + _page_types = set(self._source_to_page) for src in music_object.source_collection.source_pages: if src in self._source_to_page: _page_types.add(self._source_to_page[src]) diff --git 
a/src/music_kraken/objects/collection.py b/src/music_kraken/objects/collection.py index a3b1bbf..21330fa 100644 --- a/src/music_kraken/objects/collection.py +++ b/src/music_kraken/objects/collection.py @@ -315,7 +315,7 @@ class Collection(Generic[T]): yield element def __merge__(self, __other: Collection, override: bool = False): - self.extend(__other.shallow_list, from_map=True) + self.extend(__other._data, from_map=True) def __getitem__(self, item: int): if item < len(self._data): diff --git a/src/music_kraken/objects/parents.py b/src/music_kraken/objects/parents.py index e1d07f5..bdaa960 100644 --- a/src/music_kraken/objects/parents.py +++ b/src/music_kraken/objects/parents.py @@ -1,6 +1,7 @@ from __future__ import annotations import random +from collections import defaultdict from functools import lru_cache from typing import Optional, Dict, Tuple, List, Type, Generic, Any, TypeVar, Set @@ -131,6 +132,18 @@ class OuterProxy: return super().__setattr__(__name, __value) + def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): + pass + + def add_list_of_other_objects(self, object_list: List[OuterProxy]): + d: Dict[Type[OuterProxy], List[OuterProxy]] = defaultdict(list) + + for db_object in object_list: + d[type(db_object)].append(db_object) + + for key, value in d.items(): + self._add_other_db_objects(key, value) + def __hash__(self): """ :raise: IsDynamicException diff --git a/src/music_kraken/objects/song.py b/src/music_kraken/objects/song.py index 0b6b553..f418330 100644 --- a/src/music_kraken/objects/song.py +++ b/src/music_kraken/objects/song.py @@ -95,6 +95,22 @@ class Song(Base): "feature_song_collection": self } + def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): + if object_type is Song: + return + + if object_type is Lyrics: + self.lyrics_collection.extend(object_list) + return + + if object_type is Artist: + 
self.main_artist_collection.extend(object_list) + return + + if object_type is Album: + self.album_collection.extend(object_list) + return + @property def indexing_values(self) -> List[Tuple[str, object]]: return [ @@ -229,6 +245,22 @@ class Album(Base): "main_artist_collection": self.artist_collection } + def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): + if object_type is Song: + self.song_collection.extend(object_list) + return + + if object_type is Artist: + self.artist_collection.extend(object_list) + return + + if object_type is Album: + return + + if object_type is Label: + self.label_collection.extend(object_list) + return + @property def indexing_values(self) -> List[Tuple[str, object]]: return [ @@ -436,6 +468,23 @@ class Artist(Base): "current_artist_collection": self } + def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): + if object_type is Song: + # this doesn't really make sense + # self.feature_song_collection.extend(object_list) + return + + if object_type is Artist: + return + + if object_type is Album: + self.main_album_collection.extend(object_list) + return + + if object_type is Label: + self.label_collection.extend(object_list) + return + @property def options(self) -> List[P]: options = [self, *self.main_album_collection.shallow_list, *self.feature_album] @@ -618,6 +667,18 @@ class Label(Base): *[('url', source.url) for source in self.source_collection] ] + def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): + if object_type is Song: + return + + if object_type is Artist: + self.current_artist_collection.extend(object_list) + return + + if object_type is Album: + self.album_collection.extend(object_list) + return + @property def options(self) -> List[P]: options = [self] diff --git a/src/music_kraken/pages/abstract.py b/src/music_kraken/pages/abstract.py index 7a8751c..50a4e7b 100644 --- 
a/src/music_kraken/pages/abstract.py +++ b/src/music_kraken/pages/abstract.py @@ -28,7 +28,6 @@ from ..utils.support_classes.query import Query from ..utils.support_classes.download_result import DownloadResult from ..utils.string_processing import fit_to_file_system - INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song] INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]] @@ -42,22 +41,22 @@ class NamingDict(dict): "album": "album.title", "album_type": "album.album_type_string" } - + def __init__(self, values: dict, object_mappings: Dict[str, DatabaseObject] = None): self.object_mappings: Dict[str, DatabaseObject] = object_mappings or dict() - + super().__init__(values) self["audio_format"] = main_settings["audio_format"] - + def add_object(self, music_object: DatabaseObject): self.object_mappings[type(music_object).__name__.lower()] = music_object - + def copy(self) -> dict: return type(self)(super().copy(), self.object_mappings.copy()) - + def __getitem__(self, key: str) -> str: return fit_to_file_system(super().__getitem__(key)) - + def default_value_for_name(self, name: str) -> str: return f'Various {name.replace("_", " ").title()}' @@ -67,23 +66,23 @@ class NamingDict(dict): return self.default_value_for_name(key) key = self.CUSTOM_KEYS[key] - + frag_list = key.split(".") - + object_name = frag_list[0].strip().lower() attribute_name = frag_list[-1].strip().lower() if object_name not in self.object_mappings: return self.default_value_for_name(attribute_name) - + music_object = self.object_mappings[object_name] try: value = getattr(music_object, attribute_name) if value is None: return self.default_value_for_name(attribute_name) - + return str(value) - + except AttributeError: return self.default_value_for_name(attribute_name) @@ -133,6 +132,7 @@ def _clean_song(song: Song, collections: Dict[INDEPENDENT_DB_TYPES, Collection]) _clean_collection(song.feature_artist_collection, collections) 
_clean_collection(song.main_artist_collection, collections) + def clean_object(dirty_object: DatabaseObject) -> DatabaseObject: if isinstance(dirty_object, INDEPENDENT_DB_OBJECTS): collections = { @@ -147,20 +147,22 @@ def clean_object(dirty_object: DatabaseObject) -> DatabaseObject: _clean_music_object(dirty_object, collections) return dirty_object - + + def build_new_object(new_object: DatabaseObject) -> DatabaseObject: new_object = clean_object(new_object) new_object.compile(merge_into=False) - + return new_object + def merge_together(old_object: DatabaseObject, new_object: DatabaseObject, do_compile: bool = True) -> DatabaseObject: new_object = clean_object(new_object) - + old_object.merge(new_object) if do_compile and False: old_object.compile(merge_into=False) - + return old_object @@ -169,60 +171,59 @@ class Page: This is an abstract class, laying out the functionality for every other class fetching something """ - + SOURCE_TYPE: SourcePages LOGGER = logging.getLogger("this shouldn't be used") - + # set this to true, if all song details can also be fetched by fetching album details NO_ADDITIONAL_DATA_FROM_SONG = False - def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]: return None - + def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup: return BeautifulSoup(r.content, "html.parser") # to search stuff def search(self, query: Query) -> List[DatabaseObject]: music_object = query.music_object - + search_functions = { Song: self.song_search, Album: self.album_search, Artist: self.artist_search, Label: self.label_search } - + if type(music_object) in search_functions: r = search_functions[type(music_object)](music_object) if r is not None and len(r) > 0: return r - + r = [] for default_query in query.default_search: for single_option in self.general_search(default_query): r.append(single_option) - + return r - + def general_search(self, search_query: str) -> List[DatabaseObject]: return [] - + def label_search(self, 
label: Label) -> List[Label]: return [] - + def artist_search(self, artist: Artist) -> List[Artist]: return [] - + def album_search(self, album: Album) -> List[Album]: return [] - + def song_search(self, song: Song) -> List[Song]: return [] - - def fetch_details(self, music_object: DatabaseObject, stop_at_level: int = 1, post_process: bool = True) -> DatabaseObject: + def fetch_details(self, music_object: DatabaseObject, stop_at_level: int = 1, + post_process: bool = True) -> DatabaseObject: """ when a music object with lacking data is passed in, it returns the SAME object **(no copy)** with more detailed data. @@ -263,7 +264,9 @@ class Page: return music_object - def fetch_object_from_source(self, source: Source, stop_at_level: int = 2, enforce_type: Type[DatabaseObject] = None, post_process: bool = True) -> Optional[DatabaseObject]: + def fetch_object_from_source(self, source: Source, stop_at_level: int = 2, + enforce_type: Type[DatabaseObject] = None, post_process: bool = True) -> Optional[ + DatabaseObject]: obj_type = self.get_source_type(source) if obj_type is None: @@ -272,16 +275,16 @@ class Page: if enforce_type != obj_type and enforce_type is not None: self.LOGGER.warning(f"Object type isn't type to enforce: {enforce_type}, {obj_type}") return None - + music_object: DatabaseObject = None - + fetch_map = { Song: self.fetch_song, Album: self.fetch_album, Artist: self.fetch_artist, Label: self.fetch_label } - + if obj_type in fetch_map: music_object = fetch_map[obj_type](source, stop_at_level) else: @@ -294,10 +297,11 @@ class Page: collection = music_object.__getattribute__(collection_str) for sub_element in collection: - sub_element.merge(self.fetch_details(sub_element, stop_at_level=stop_at_level-1, post_process=False)) - + sub_element.merge( + self.fetch_details(sub_element, stop_at_level=stop_at_level - 1, post_process=False)) + return music_object - + def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: return Song() @@ -310,41 +314,42 
@@ class Page: def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: return Label() - def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False, process_metadata_anyway: bool = False) -> DownloadResult: + def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False, + process_metadata_anyway: bool = False) -> DownloadResult: naming_dict: NamingDict = NamingDict({"genre": genre}) - + def fill_naming_objects(naming_music_object: DatabaseObject): nonlocal naming_dict - + for collection_name in naming_music_object.UPWARDS_COLLECTION_STRING_ATTRIBUTES: collection: Collection = getattr(naming_music_object, collection_name) - + if collection.empty: continue - + dom_ordered_music_object: DatabaseObject = collection[0] naming_dict.add_object(dom_ordered_music_object) return fill_naming_objects(dom_ordered_music_object) - + fill_naming_objects(music_object) - + return self._download(music_object, naming_dict, download_all, process_metadata_anyway=process_metadata_anyway) - - def _download(self, music_object: DatabaseObject, naming_dict: NamingDict, download_all: bool = False, skip_details: bool = False, process_metadata_anyway: bool = False) -> DownloadResult: + def _download(self, music_object: DatabaseObject, naming_dict: NamingDict, download_all: bool = False, + skip_details: bool = False, process_metadata_anyway: bool = False) -> DownloadResult: skip_next_details = skip_details - + # Skips all releases, that are defined in shared.ALBUM_TYPE_BLACKLIST, if download_all is False if isinstance(music_object, Album): if self.NO_ADDITIONAL_DATA_FROM_SONG: skip_next_details = True - + if not download_all and music_object.album_type.value in main_settings["album_type_blacklist"]: return DownloadResult() if not isinstance(music_object, Song) or not self.NO_ADDITIONAL_DATA_FROM_SONG: self.fetch_details(music_object=music_object, stop_at_level=2) - + naming_dict.add_object(music_object) if 
isinstance(music_object, Song): @@ -357,7 +362,9 @@ class Page: sub_ordered_music_object: DatabaseObject for sub_ordered_music_object in collection: - download_result.merge(self._download(sub_ordered_music_object, naming_dict.copy(), download_all, skip_details=skip_next_details, process_metadata_anyway=process_metadata_anyway)) + download_result.merge(self._download(sub_ordered_music_object, naming_dict.copy(), download_all, + skip_details=skip_next_details, + process_metadata_anyway=process_metadata_anyway)) return download_result @@ -378,7 +385,6 @@ class Page: ) ) - if song.target_collection.empty: song.target_collection.append(new_target) @@ -393,7 +399,7 @@ class Page: str(song.id) ) ) - + r = DownloadResult(1) found_on_disc = False @@ -403,10 +409,10 @@ class Page: if process_metadata_anyway: target.copy_content(temp_target) found_on_disc = True - + r.found_on_disk += 1 r.add_target(target) - + if found_on_disc and not process_metadata_anyway: self.LOGGER.info(f"{song.option_string} already exists, thus not downloading again.") return r @@ -415,18 +421,18 @@ class Page: if not found_on_disc: r = self.download_song_to_target(source=source, target=temp_target, desc=song.title) - if not r.is_fatal_error: - r.merge(self._post_process_targets(song, temp_target, [] if found_on_disc else self.get_skip_intervals(song, source))) + r.merge(self._post_process_targets(song, temp_target, + [] if found_on_disc else self.get_skip_intervals(song, source))) return r - + def _post_process_targets(self, song: Song, temp_target: Target, interval_list: List) -> DownloadResult: correct_codec(temp_target, interval_list=interval_list) - + self.post_process_hook(song, temp_target) - + write_metadata_to_target(song.metadata, temp_target) r = DownloadResult() @@ -436,17 +442,17 @@ class Page: if temp_target is not target: temp_target.copy_content(target) r.add_target(target) - + temp_target.delete() r.sponsor_segments += len(interval_list) - + return r - + def get_skip_intervals(self, 
song: Song, source: Source) -> List[Tuple[float, float]]: return [] - + def post_process_hook(self, song: Song, temp_target: Target, **kwargs): pass - + def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: return DownloadResult()