fix: fixed youtube music

This commit is contained in:
Hazel 2024-01-15 12:48:36 +01:00
parent 564621b332
commit 6a8374d595
7 changed files with 216 additions and 136 deletions

View File

@ -42,9 +42,8 @@ if __name__ == "__main__":
]
bandcamp_test = [
"s: #a Ghost Bath",
"0",
"d: 4"
"s: #a Only Smile",
"d: 1",
]

View File

@ -16,7 +16,6 @@ from ..download.page_attributes import Pages
from ..pages import Page
from ..objects import Song, Album, Artist, DatabaseObject
"""
This is the implementation of the Shell
@ -107,6 +106,7 @@ def get_existing_genre() -> List[str]:
return existing_genres
def get_genre():
existing_genres = get_existing_genre()
for i, genre_option in enumerate(existing_genres):
@ -129,19 +129,18 @@ def get_genre():
verification = input(f"create new genre \"{new_genre}\"? (Y/N): ").lower()
if verification in agree_inputs:
return new_genre
def help_message():
print()
print(main_settings["happy_messages"])
print()
class Downloader:
def __init__(
self,
exclude_pages: Set[Type[Page]] = None,
exclude_pages: Set[Type[Page]] = None,
exclude_shady: bool = False,
max_displayed_options: int = 10,
option_digits: int = 3,
@ -149,23 +148,22 @@ class Downloader:
process_metadata_anyway: bool = False,
) -> None:
self.pages: Pages = Pages(exclude_pages=exclude_pages, exclude_shady=exclude_shady)
self.page_dict: Dict[str, Type[Page]] = dict()
self.max_displayed_options = max_displayed_options
self.option_digits: int = option_digits
self.current_results: Results = None
self._result_history: List[Results] = []
self.genre = genre or get_genre()
self.process_metadata_anyway = process_metadata_anyway
print()
print(f"Downloading to: \"{self.genre}\"")
print()
def print_current_options(self):
self.page_dict = dict()
@ -176,12 +174,13 @@ class Downloader:
if isinstance(option, Option):
print(f"{option.index:0{self.option_digits}} {option.music_object.option_string}")
else:
prefix = ALPHABET[page_count%len(ALPHABET)]
print(f"({prefix}) ------------------------{option.__name__:{PAGE_NAME_FILL}<{MAX_PAGE_LEN}}------------")
prefix = ALPHABET[page_count % len(ALPHABET)]
print(
f"({prefix}) ------------------------{option.__name__:{PAGE_NAME_FILL}<{MAX_PAGE_LEN}}------------")
self.page_dict[prefix] = option
self.page_dict[option.__name__] = option
page_count += 1
print()
@ -189,47 +188,47 @@ class Downloader:
def set_current_options(self, current_options: Results):
if main_settings["result_history"]:
self._result_history.append(current_options)
if main_settings["history_length"] != -1:
if len(self._result_history) > main_settings["history_length"]:
self._result_history.pop(0)
self.current_results = current_options
def previous_option(self) -> bool:
if not main_settings["result_history"]:
print("History is turned of.\nGo to main_settings, and change the value at 'result_history' to 'true'.")
return False
if len(self._result_history) <= 1:
print(f"No results in history.")
return False
self._result_history.pop()
self.current_results = self._result_history[-1]
return True
def _process_parsed(self, key_text: Dict[str, str], query: str) -> Query:
song = None if not "t" in key_text else Song(title=key_text["t"], dynamic=True)
album = None if not "r" in key_text else Album(title=key_text["r"], dynamic=True)
artist = None if not "a" in key_text else Artist(name=key_text["a"], dynamic=True)
if song is not None:
if album is not None:
song.album_collection.append(album)
if artist is not None:
song.main_artist_collection.append(artist)
return Query(raw_query=query, music_object=song)
if album is not None:
if artist is not None:
album.artist_collection.append(artist)
return Query(raw_query=query, music_object=album)
if artist is not None:
return Query(raw_query=query, music_object=artist)
return Query(raw_query=query)
def search(self, query: str):
if re.match(URL_PATTERN, query) is not None:
try:
@ -243,58 +242,57 @@ class Downloader:
self.set_current_options(PageResults(page, data_object.options))
self.print_current_options()
return
special_characters = "#\\"
query = query + " "
key_text = {}
skip_next = False
escape_next = False
new_text = ""
latest_key: str = None
for i in range(len(query) - 1):
current_char = query[i]
next_char = query[i+1]
next_char = query[i + 1]
if skip_next:
skip_next = False
continue
if escape_next:
new_text += current_char
escape_next = False
# escaping
if current_char == "\\":
if next_char in special_characters:
escape_next = True
continue
if current_char == "#":
if latest_key is not None:
key_text[latest_key] = new_text
new_text = ""
latest_key = next_char
skip_next = True
continue
new_text += current_char
if latest_key is not None:
key_text[latest_key] = new_text
parsed_query: Query = self._process_parsed(key_text, query)
self.set_current_options(self.pages.search(parsed_query))
self.print_current_options()
def goto(self, index: int):
page: Type[Page]
music_object: DatabaseObject
try:
page, music_object = self.current_results.get_music_object_by_index(index)
except KeyError:
@ -302,23 +300,22 @@ class Downloader:
print(f"The option {index} doesn't exist.")
print()
return
self.pages.fetch_details(music_object)
print(music_object)
print(music_object.options)
self.set_current_options(PageResults(page, music_object.options))
self.print_current_options()
def download(self, download_str: str, download_all: bool = False) -> bool:
to_download: List[DatabaseObject] = []
if re.match(URL_PATTERN, download_str) is not None:
_, music_objects = self.pages.fetch_url(download_str)
to_download.append(music_objects)
else:
index: str
for index in download_str.split(", "):
@ -327,66 +324,68 @@ class Downloader:
print(f"Every download thingie has to be an index, not {index}.")
print()
return False
for index in download_str.split(", "):
to_download.append(self.current_results.get_music_object_by_index(int(index))[1])
print()
print("Downloading:")
for download_object in to_download:
print(download_object.option_string)
print()
_result_map: Dict[DatabaseObject, DownloadResult] = dict()
for database_object in to_download:
r = self.pages.download(music_object=database_object, genre=self.genre, download_all=download_all, process_metadata_anyway=self.process_metadata_anyway)
r = self.pages.download(music_object=database_object, genre=self.genre, download_all=download_all,
process_metadata_anyway=self.process_metadata_anyway)
_result_map[database_object] = r
for music_object, result in _result_map.items():
print()
print(music_object.option_string)
print(result)
return True
def process_input(self, input_str: str) -> bool:
input_str = input_str.strip()
processed_input: str = input_str.lower()
if processed_input in EXIT_COMMANDS:
return True
if processed_input == ".":
self.print_current_options()
return False
if processed_input == "..":
if self.previous_option():
self.print_current_options()
return False
if processed_input.startswith("s: "):
self.search(input_str[3:])
return False
if processed_input.startswith("d: "):
return self.download(input_str[3:])
if processed_input.isdigit():
self.goto(int(processed_input))
return False
if processed_input != "help":
print("Invalid input.")
help_message()
return False
def mainloop(self):
while True:
if self.process_input(input("> ")):
return
@cli_function
def download(
genre: str = None,
@ -403,9 +402,9 @@ def download(
print("Restart the programm to use it.")
else:
print("Something went wrong configuring.")
shell = Downloader(genre=genre, process_metadata_anyway=process_metadata_anyway)
if command_list is not None:
for command in command_list:
shell.process_input(command)
@ -414,5 +413,5 @@ def download(
if direct_download_url is not None:
if shell.download(direct_download_url, download_all=download_all):
return
shell.mainloop()

View File

@ -98,8 +98,10 @@ class Pages:
def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False, process_metadata_anyway: bool = False) -> DownloadResult:
if not isinstance(music_object, INDEPENDENT_DB_OBJECTS):
return DownloadResult(error_message=f"{type(music_object).__name__} can't be downloaded.")
_page_types = set()
self.fetch_details(music_object)
_page_types = set(self._source_to_page)
for src in music_object.source_collection.source_pages:
if src in self._source_to_page:
_page_types.add(self._source_to_page[src])

View File

@ -315,7 +315,7 @@ class Collection(Generic[T]):
yield element
def __merge__(self, __other: Collection, override: bool = False):
self.extend(__other.shallow_list, from_map=True)
self.extend(__other._data, from_map=True)
def __getitem__(self, item: int):
if item < len(self._data):

View File

@ -1,6 +1,7 @@
from __future__ import annotations
import random
from collections import defaultdict
from functools import lru_cache
from typing import Optional, Dict, Tuple, List, Type, Generic, Any, TypeVar, Set
@ -131,6 +132,18 @@ class OuterProxy:
return super().__setattr__(__name, __value)
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
pass
def add_list_of_other_objects(self, object_list: List[OuterProxy]):
d: Dict[Type[OuterProxy], List[OuterProxy]] = defaultdict(list)
for db_object in object_list:
d[type(db_object)].append(db_object)
for key, value in d.items():
self._add_other_db_objects(key, value)
def __hash__(self):
"""
:raise: IsDynamicException

View File

@ -95,6 +95,22 @@ class Song(Base):
"feature_song_collection": self
}
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song:
return
if isinstance(object_list, Lyrics):
self.lyrics_collection.extend(object_list)
return
if isinstance(object_list, Artist):
self.main_artist_collection.extend(object_list)
return
if isinstance(object_list, Album):
self.album_collection.extend(object_list)
return
@property
def indexing_values(self) -> List[Tuple[str, object]]:
return [
@ -229,6 +245,22 @@ class Album(Base):
"main_artist_collection": self.artist_collection
}
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song:
self.song_collection.extend(object_list)
return
if object_type is Artist:
self.artist_collection.extend(object_list)
return
if object_type is Album:
return
if object_type is Label:
self.label_collection.extend(object_list)
return
@property
def indexing_values(self) -> List[Tuple[str, object]]:
return [
@ -436,6 +468,23 @@ class Artist(Base):
"current_artist_collection": self
}
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song:
# this doesn't really make sense
# self.feature_song_collection.extend(object_list)
return
if object_type is Artist:
return
if object_type is Album:
self.main_album_collection.extend(object_list)
return
if object_type is Label:
self.label_collection.extend(object_list)
return
@property
def options(self) -> List[P]:
options = [self, *self.main_album_collection.shallow_list, *self.feature_album]
@ -618,6 +667,18 @@ class Label(Base):
*[('url', source.url) for source in self.source_collection]
]
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song:
return
if object_type is Artist:
self.current_artist_collection.extend(object_list)
return
if object_type is Album:
self.album_collection.extend(object_list)
return
@property
def options(self) -> List[P]:
options = [self]

View File

@ -28,7 +28,6 @@ from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import fit_to_file_system
INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song]
INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]]
@ -42,22 +41,22 @@ class NamingDict(dict):
"album": "album.title",
"album_type": "album.album_type_string"
}
def __init__(self, values: dict, object_mappings: Dict[str, DatabaseObject] = None):
self.object_mappings: Dict[str, DatabaseObject] = object_mappings or dict()
super().__init__(values)
self["audio_format"] = main_settings["audio_format"]
def add_object(self, music_object: DatabaseObject):
self.object_mappings[type(music_object).__name__.lower()] = music_object
def copy(self) -> dict:
return type(self)(super().copy(), self.object_mappings.copy())
def __getitem__(self, key: str) -> str:
return fit_to_file_system(super().__getitem__(key))
def default_value_for_name(self, name: str) -> str:
return f'Various {name.replace("_", " ").title()}'
@ -67,23 +66,23 @@ class NamingDict(dict):
return self.default_value_for_name(key)
key = self.CUSTOM_KEYS[key]
frag_list = key.split(".")
object_name = frag_list[0].strip().lower()
attribute_name = frag_list[-1].strip().lower()
if object_name not in self.object_mappings:
return self.default_value_for_name(attribute_name)
music_object = self.object_mappings[object_name]
try:
value = getattr(music_object, attribute_name)
if value is None:
return self.default_value_for_name(attribute_name)
return str(value)
except AttributeError:
return self.default_value_for_name(attribute_name)
@ -133,6 +132,7 @@ def _clean_song(song: Song, collections: Dict[INDEPENDENT_DB_TYPES, Collection])
_clean_collection(song.feature_artist_collection, collections)
_clean_collection(song.main_artist_collection, collections)
def clean_object(dirty_object: DatabaseObject) -> DatabaseObject:
if isinstance(dirty_object, INDEPENDENT_DB_OBJECTS):
collections = {
@ -147,20 +147,22 @@ def clean_object(dirty_object: DatabaseObject) -> DatabaseObject:
_clean_music_object(dirty_object, collections)
return dirty_object
def build_new_object(new_object: DatabaseObject) -> DatabaseObject:
new_object = clean_object(new_object)
new_object.compile(merge_into=False)
return new_object
def merge_together(old_object: DatabaseObject, new_object: DatabaseObject, do_compile: bool = True) -> DatabaseObject:
new_object = clean_object(new_object)
old_object.merge(new_object)
if do_compile and False:
old_object.compile(merge_into=False)
return old_object
@ -169,60 +171,59 @@ class Page:
This is an abstract class, laying out the
functionality for every other class fetching something
"""
SOURCE_TYPE: SourcePages
LOGGER = logging.getLogger("this shouldn't be used")
# set this to true, if all song details can also be fetched by fetching album details
NO_ADDITIONAL_DATA_FROM_SONG = False
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
return None
def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup:
return BeautifulSoup(r.content, "html.parser")
# to search stuff
def search(self, query: Query) -> List[DatabaseObject]:
music_object = query.music_object
search_functions = {
Song: self.song_search,
Album: self.album_search,
Artist: self.artist_search,
Label: self.label_search
}
if type(music_object) in search_functions:
r = search_functions[type(music_object)](music_object)
if r is not None and len(r) > 0:
return r
r = []
for default_query in query.default_search:
for single_option in self.general_search(default_query):
r.append(single_option)
return r
def general_search(self, search_query: str) -> List[DatabaseObject]:
return []
def label_search(self, label: Label) -> List[Label]:
return []
def artist_search(self, artist: Artist) -> List[Artist]:
return []
def album_search(self, album: Album) -> List[Album]:
return []
def song_search(self, song: Song) -> List[Song]:
return []
def fetch_details(self, music_object: DatabaseObject, stop_at_level: int = 1, post_process: bool = True) -> DatabaseObject:
def fetch_details(self, music_object: DatabaseObject, stop_at_level: int = 1,
post_process: bool = True) -> DatabaseObject:
"""
when a music object with lacking data is passed in, it returns
the SAME object **(no copy)** with more detailed data.
@ -263,7 +264,9 @@ class Page:
return music_object
def fetch_object_from_source(self, source: Source, stop_at_level: int = 2, enforce_type: Type[DatabaseObject] = None, post_process: bool = True) -> Optional[DatabaseObject]:
def fetch_object_from_source(self, source: Source, stop_at_level: int = 2,
enforce_type: Type[DatabaseObject] = None, post_process: bool = True) -> Optional[
DatabaseObject]:
obj_type = self.get_source_type(source)
if obj_type is None:
@ -272,16 +275,16 @@ class Page:
if enforce_type != obj_type and enforce_type is not None:
self.LOGGER.warning(f"Object type isn't type to enforce: {enforce_type}, {obj_type}")
return None
music_object: DatabaseObject = None
fetch_map = {
Song: self.fetch_song,
Album: self.fetch_album,
Artist: self.fetch_artist,
Label: self.fetch_label
}
if obj_type in fetch_map:
music_object = fetch_map[obj_type](source, stop_at_level)
else:
@ -294,10 +297,11 @@ class Page:
collection = music_object.__getattribute__(collection_str)
for sub_element in collection:
sub_element.merge(self.fetch_details(sub_element, stop_at_level=stop_at_level-1, post_process=False))
sub_element.merge(
self.fetch_details(sub_element, stop_at_level=stop_at_level - 1, post_process=False))
return music_object
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song()
@ -310,41 +314,42 @@ class Page:
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False, process_metadata_anyway: bool = False) -> DownloadResult:
def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False,
process_metadata_anyway: bool = False) -> DownloadResult:
naming_dict: NamingDict = NamingDict({"genre": genre})
def fill_naming_objects(naming_music_object: DatabaseObject):
nonlocal naming_dict
for collection_name in naming_music_object.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
collection: Collection = getattr(naming_music_object, collection_name)
if collection.empty:
continue
dom_ordered_music_object: DatabaseObject = collection[0]
naming_dict.add_object(dom_ordered_music_object)
return fill_naming_objects(dom_ordered_music_object)
fill_naming_objects(music_object)
return self._download(music_object, naming_dict, download_all, process_metadata_anyway=process_metadata_anyway)
def _download(self, music_object: DatabaseObject, naming_dict: NamingDict, download_all: bool = False, skip_details: bool = False, process_metadata_anyway: bool = False) -> DownloadResult:
def _download(self, music_object: DatabaseObject, naming_dict: NamingDict, download_all: bool = False,
skip_details: bool = False, process_metadata_anyway: bool = False) -> DownloadResult:
skip_next_details = skip_details
# Skips all releases, that are defined in shared.ALBUM_TYPE_BLACKLIST, if download_all is False
if isinstance(music_object, Album):
if self.NO_ADDITIONAL_DATA_FROM_SONG:
skip_next_details = True
if not download_all and music_object.album_type.value in main_settings["album_type_blacklist"]:
return DownloadResult()
if not isinstance(music_object, Song) or not self.NO_ADDITIONAL_DATA_FROM_SONG:
self.fetch_details(music_object=music_object, stop_at_level=2)
naming_dict.add_object(music_object)
if isinstance(music_object, Song):
@ -357,7 +362,9 @@ class Page:
sub_ordered_music_object: DatabaseObject
for sub_ordered_music_object in collection:
download_result.merge(self._download(sub_ordered_music_object, naming_dict.copy(), download_all, skip_details=skip_next_details, process_metadata_anyway=process_metadata_anyway))
download_result.merge(self._download(sub_ordered_music_object, naming_dict.copy(), download_all,
skip_details=skip_next_details,
process_metadata_anyway=process_metadata_anyway))
return download_result
@ -378,7 +385,6 @@ class Page:
)
)
if song.target_collection.empty:
song.target_collection.append(new_target)
@ -393,7 +399,7 @@ class Page:
str(song.id)
)
)
r = DownloadResult(1)
found_on_disc = False
@ -403,10 +409,10 @@ class Page:
if process_metadata_anyway:
target.copy_content(temp_target)
found_on_disc = True
r.found_on_disk += 1
r.add_target(target)
if found_on_disc and not process_metadata_anyway:
self.LOGGER.info(f"{song.option_string} already exists, thus not downloading again.")
return r
@ -415,18 +421,18 @@ class Page:
if not found_on_disc:
r = self.download_song_to_target(source=source, target=temp_target, desc=song.title)
if not r.is_fatal_error:
r.merge(self._post_process_targets(song, temp_target, [] if found_on_disc else self.get_skip_intervals(song, source)))
r.merge(self._post_process_targets(song, temp_target,
[] if found_on_disc else self.get_skip_intervals(song, source)))
return r
def _post_process_targets(self, song: Song, temp_target: Target, interval_list: List) -> DownloadResult:
correct_codec(temp_target, interval_list=interval_list)
self.post_process_hook(song, temp_target)
write_metadata_to_target(song.metadata, temp_target)
r = DownloadResult()
@ -436,17 +442,17 @@ class Page:
if temp_target is not target:
temp_target.copy_content(target)
r.add_target(target)
temp_target.delete()
r.sponsor_segments += len(interval_list)
return r
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
return []
def post_process_hook(self, song: Song, temp_target: Target, **kwargs):
pass
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
return DownloadResult()