diff --git a/.vscode/launch.json b/.vscode/launch.json index 24c2088..b245b1d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -17,6 +17,12 @@ "request": "launch", "program": "development/actual_donwload.py", "console": "integratedTerminal" + }, + { + "name": "Python Debugger: Music Kraken", + "type": "debugpy", + "request": "launch", // run the module + "program": "${workspaceFolder}/.vscode/run_script.py", } ] } \ No newline at end of file diff --git a/.vscode/run_script.py b/.vscode/run_script.py new file mode 100644 index 0000000..1f7a1b9 --- /dev/null +++ b/.vscode/run_script.py @@ -0,0 +1,3 @@ +from music_kraken.__main__ import cli + +cli() diff --git a/music_kraken/__init__.py b/music_kraken/__init__.py index 5176b38..e9dbc41 100644 --- a/music_kraken/__init__.py +++ b/music_kraken/__init__.py @@ -1,13 +1,13 @@ -import logging import gc +import logging import sys from pathlib import Path -from rich.logging import RichHandler from rich.console import Console +from rich.logging import RichHandler -from .utils.shared import DEBUG, DEBUG_LOGGING from .utils.config import logging_settings, main_settings, read_config +from .utils.shared import DEBUG, DEBUG_LOGGING read_config() diff --git a/music_kraken/__meta__.py b/music_kraken/__meta__.py new file mode 100644 index 0000000..9a47f73 --- /dev/null +++ b/music_kraken/__meta__.py @@ -0,0 +1,3 @@ +PROGRAMM: str = "music-kraken" +DESCRIPTION: str = """This program will first get the metadata of various songs from metadata providers like musicbrainz, and then search for download links on pages like bandcamp. 
+Then it will download the song and edit the metadata accordingly.""" diff --git a/music_kraken/audio/metadata.py b/music_kraken/audio/metadata.py index bceb775..1ec09f5 100644 --- a/music_kraken/audio/metadata.py +++ b/music_kraken/audio/metadata.py @@ -1,14 +1,15 @@ -import mutagen -from mutagen.id3 import ID3, Frame, APIC, USLT +import logging from pathlib import Path from typing import List -import logging + +import mutagen +from mutagen.id3 import APIC, ID3, USLT, Frame from PIL import Image -from ..utils.config import logging_settings, main_settings -from ..objects import Song, Target, Metadata -from ..objects.metadata import Mapping from ..connection import Connection +from ..objects import Metadata, Song, Target +from ..objects.metadata import Mapping +from ..utils.config import logging_settings, main_settings LOGGER = logging_settings["tagging_logger"] @@ -105,7 +106,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song): APIC( encoding=0, mime="image/jpeg", - type=3, + type=mutagen.id3.PictureType.COVER_FRONT, desc=u"Cover", data=converted_target.read_bytes(), ) diff --git a/music_kraken/cli/__init__.py b/music_kraken/cli/__init__.py deleted file mode 100644 index 1dbf213..0000000 --- a/music_kraken/cli/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .informations import print_paths -from .main_downloader import download -from .options.settings import settings -from .options.frontend import set_frontend - diff --git a/music_kraken/cli/utils.py b/music_kraken/cli/utils.py deleted file mode 100644 index e2f9bab..0000000 --- a/music_kraken/cli/utils.py +++ /dev/null @@ -1,42 +0,0 @@ -from ..utils.shared import get_random_message - - -def cli_function(function): - def wrapper(*args, **kwargs): - silent = kwargs.get("no_cli", False) - if "no_cli" in kwargs: - del kwargs["no_cli"] - - if silent: - return function(*args, **kwargs) - return - - code = 0 - - print_cute_message() - print() - try: - code = function(*args, **kwargs) - 
except KeyboardInterrupt: - print("\n\nRaise an issue if I fucked up:\nhttps://github.com/HeIIow2/music-downloader/issues") - - finally: - print() - print_cute_message() - print("See you soon! :3") - - exit() - - return wrapper - - -def print_cute_message(): - message = get_random_message() - try: - print(message) - except UnicodeEncodeError: - message = str(c for c in message if 0 < ord(c) < 127) - print(message) - - - \ No newline at end of file diff --git a/music_kraken/development_cli/__init__.py b/music_kraken/development_cli/__init__.py new file mode 100644 index 0000000..7c86dcd --- /dev/null +++ b/music_kraken/development_cli/__init__.py @@ -0,0 +1,82 @@ +import argparse +from functools import cached_property + +from ..__meta__ import DESCRIPTION, PROGRAMM +from ..download import Downloader +from ..utils import BColors +from ..utils.string_processing import unify +from .utils import HELP_MESSAGE, ask_for_bool, ask_for_create + + +class DevelopmentCli: + def __init__(self, args: argparse.Namespace): + self.args = args + + if args.genre: + self.genre = args.genre + + self.downloader: Downloader = Downloader() + + @cached_property + def genre(self) -> str: + """This is a cached property, which means if it isn't set in the constructor or before it is accessed, + the program will be thrown in a shell + + Returns: + str: the genre that should be used + """ + option_string = f"{BColors.HEADER}Genres{BColors.ENDC}" + genre_map = {} + + _string_list = [] + for i, genre in enumerate(self.downloader.get_existing_genres()): + option_string += f"\n{BColors.BOLD}{i}{BColors.ENDC}: {genre}" + + genre_map[str(i)] = genre + genre_map[unify(genre)] = genre + + genre = None + while genre is None: + print(option_string) + print() + + i = input("> ") + u = unify(i) + if u in genre_map: + genre = genre_map[u] + break + + if ask_for_create("genre", i): + genre = i + break + + return genre + + + def help_screen(self) -> None: + print(HELP_MESSAGE) + + def shell(self) -> None: + 
print(f"Welcome to the {PROGRAMM} shell!") +        print(f"Type '{BColors.OKBLUE}help{BColors.ENDC}' for a list of commands.") +        print("") + +        while True: +            i = input("> ") +            if i == "help": +                self.help_screen() +            elif i == "genre": +                print(self.genre) +            elif i == "exit": +                break +            else: +                print("Unknown command. Type 'help' for a list of commands.") + +def main(): +    parser = argparse.ArgumentParser( +        prog=PROGRAMM, +        description=DESCRIPTION, +        epilog='This is just a development cli. The real frontend is coming soon.' +    ) +    parser.add_argument('--genre', '-g', required=False, help="choose a genre to download from") +    args = parser.parse_args() diff --git a/music_kraken/cli/informations/__init__.py b/music_kraken/development_cli/informations/__init__.py similarity index 100% rename from music_kraken/cli/informations/__init__.py rename to music_kraken/development_cli/informations/__init__.py diff --git a/music_kraken/cli/informations/paths.py b/music_kraken/development_cli/informations/paths.py similarity index 100% rename from music_kraken/cli/informations/paths.py rename to music_kraken/development_cli/informations/paths.py diff --git a/music_kraken/cli/main_downloader.py b/music_kraken/development_cli/main_downloader.py similarity index 71% rename from music_kraken/cli/main_downloader.py rename to music_kraken/development_cli/main_downloader.py index c5ba8a9..eb01c1a 100644 --- a/music_kraken/cli/main_downloader.py +++ b/music_kraken/development_cli/main_downloader.py @@ -1,89 +1,26 @@ import random -from typing import Set, Type, Dict, List -from pathlib import Path import re +from pathlib import Path +from typing import Dict, Generator, List, Set, Type, Union -from .utils import cli_function -from .options.first_config import initial_config - -from ..utils import output, BColors -from ..utils.config import write_config, main_settings -from ..utils.shared import URL_PATTERN -from ..utils.string_processing import fit_to_file_system -from 
..utils.support_classes.query import Query -from ..utils.support_classes.download_result import DownloadResult +from .. import console +from ..download import Downloader, Page, components +from ..download.results import GoToResults +from ..download.results import Option as ResultOption +from ..download.results import PageResults, Results +from ..objects import Album, Artist, DatabaseObject, Song +from ..utils import BColors, output +from ..utils.config import main_settings, write_config +from ..utils.enums.colors import BColors from ..utils.exception import MKInvalidInputException from ..utils.exception.download import UrlNotFoundException -from ..utils.enums.colors import BColors -from .. import console - -from ..download.results import Results, Option, PageResults, GoToResults -from ..download.page_attributes import Pages -from ..pages import Page -from ..objects import Song, Album, Artist, DatabaseObject - -""" -This is the implementation of the Shell - -# Behaviour - -## Searching - -```mkshell -> s: {querry or url} - -# examples -> s: https://musify.club/release/some-random-release-183028492 -> s: r: #a an Artist #r some random Release -``` - -Searches for an url, or an query - -### Query Syntax - -``` -#a {artist} #r {release} #t {track} -``` - -You can escape stuff like `#` doing this: `\#` - -## Downloading - -To download something, you either need a direct link, or you need to have already searched for options - -```mkshell -> d: {option ids or direct url} - -# examples -> d: 0, 3, 4 -> d: 1 -> d: https://musify.club/release/some-random-release-183028492 -``` - -## Misc - -### Exit - -```mkshell -> q -> quit -> exit -> abort -``` - -### Current Options - -```mkshell -> . -``` - -### Previous Options - -``` -> .. 
-``` - -""" +from ..utils.shared import HELP_MESSAGE, URL_PATTERN +from ..utils.string_processing import fit_to_file_system +from ..utils.support_classes.download_result import DownloadResult +from ..utils.support_classes.query import Query +from .genre import get_genre +from .options.first_config import initial_config +from .utils import ask_for_bool, cli_function EXIT_COMMANDS = {"q", "quit", "exit", "abort"} ALPHABET = "abcdefghijklmnopqrstuvwxyz" @@ -91,59 +28,17 @@ PAGE_NAME_FILL = "-" MAX_PAGE_LEN = 21 -def get_existing_genre() -> List[str]: - """ - gets the name of all subdirectories of shared.MUSIC_DIR, - but filters out all directories, where the name matches with any patern - from shared.NOT_A_GENRE_REGEX. - """ - existing_genres: List[str] = [] - # get all subdirectories of MUSIC_DIR, not the files in the dir. - existing_subdirectories: List[Path] = [f for f in main_settings["music_directory"].iterdir() if f.is_dir()] - - for subdirectory in existing_subdirectories: - name: str = subdirectory.name - - if not any(re.match(regex_pattern, name) for regex_pattern in main_settings["not_a_genre_regex"]): - existing_genres.append(name) - - existing_genres.sort() - - return existing_genres - - -def get_genre(): - existing_genres = get_existing_genre() - for i, genre_option in enumerate(existing_genres): - print(f"{i + 1:0>2}: {genre_option}") - - while True: - genre = input("Id or new genre: ") - - if genre.isdigit(): - genre_id = int(genre) - 1 - if genre_id >= len(existing_genres): - print(f"No genre under the id {genre_id + 1}.") - continue - - return existing_genres[genre_id] - - new_genre = fit_to_file_system(genre) - - agree_inputs = {"y", "yes", "ok"} - verification = input(f"create new genre \"{new_genre}\"? 
(Y/N): ").lower() - if verification in agree_inputs: - return new_genre def help_message(): + print(HELP_MESSAGE) print() print(random.choice(main_settings["happy_messages"])) print() -class Downloader: +class CliDownloader: def __init__( self, exclude_pages: Set[Type[Page]] = None, @@ -153,7 +48,7 @@ class Downloader: genre: str = None, process_metadata_anyway: bool = False, ) -> None: - self.pages: Pages = Pages(exclude_pages=exclude_pages, exclude_shady=exclude_shady) + self.downloader: Downloader = Downloader(exclude_pages=exclude_pages, exclude_shady=exclude_shady) self.page_dict: Dict[str, Type[Page]] = dict() @@ -171,13 +66,16 @@ class Downloader: output() def print_current_options(self): - self.page_dict = dict() print() + print(self.current_results.pprint()) + """ + self.page_dict = dict() + page_count = 0 for option in self.current_results.formatted_generator(): - if isinstance(option, Option): + if isinstance(option, ResultOption): r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}" print(r) else: @@ -189,10 +87,13 @@ class Downloader: self.page_dict[option.__name__] = option page_count += 1 + """ print() - def set_current_options(self, current_options: Results): + def set_current_options(self, current_options: Union[Generator[DatabaseObject, None, None], components.Select]): + current_options = current_options if isinstance(current_options, components.Select) else components.DataObjectSelect(current_options) + if main_settings["result_history"]: self._result_history.append(current_options) @@ -242,7 +143,7 @@ class Downloader: def search(self, query: str): if re.match(URL_PATTERN, query) is not None: try: - page, data_object = self.pages.fetch_url(query) + data_object = self.downloader.fetch_url(query) except UrlNotFoundException as e: print(f"{e.url} could not be attributed/parsed to any yet implemented site.\n" f"PR appreciated if the site isn't implemented.\n" @@ -296,15 +197,17 @@ 
class Downloader: parsed_query: Query = self._process_parsed(key_text, query) - self.set_current_options(self.pages.search(parsed_query)) + self.set_current_options(self.downloader.search(parsed_query)) self.print_current_options() - def goto(self, data_object: DatabaseObject): + def goto(self, data_object: Union[DatabaseObject, components.Select]): page: Type[Page] - self.pages.fetch_details(data_object, stop_at_level=1) - - self.set_current_options(GoToResults(data_object.options, max_items_per_page=self.max_displayed_options)) + if isinstance(data_object, components.Select): + self.set_current_options(data_object) + else: + self.downloader.fetch_details(data_object, stop_at_level=1) + self.set_current_options(data_object.options) self.print_current_options() @@ -316,7 +219,7 @@ class Downloader: _result_map: Dict[DatabaseObject, DownloadResult] = dict() for database_object in data_objects: - r = self.pages.download( + r = self.downloader.download( data_object=database_object, genre=self.genre, **kwargs @@ -371,24 +274,15 @@ class Downloader: indices = [] for possible_index in q.split(","): - possible_index = possible_index.strip() if possible_index == "": continue + + if possible_index not in self.current_results: + raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not in the current options.") - i = 0 - try: - i = int(possible_index) - except ValueError: - raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.") + yield self.current_results[possible_index] - if i < 0 or i >= len(self.current_results): - raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results) - 1}.") - - indices.append(i) - - return [self.current_results[i] for i in indices] - - selected_objects = get_selected_objects(query) + selected_objects = list(get_selected_objects(query)) if do_merge: old_selected_objects = selected_objects @@ -403,19 +297,19 @@ class Downloader: if 
do_fetch: for data_object in selected_objects: - self.pages.fetch_details(data_object) + self.downloader.fetch_details(data_object) self.print_current_options() return False if do_download: - self.download(selected_objects) + self.download(list(o.value for o in selected_objects)) return False if len(selected_objects) != 1: raise MKInvalidInputException(message="You can only go to one object at a time without merging.") - self.goto(selected_objects[0]) + self.goto(selected_objects[0].value) return False except MKInvalidInputException as e: output("\n" + e.message + "\n", color=BColors.FAIL) @@ -446,7 +340,7 @@ def download( else: print(f"{BColors.FAIL.value}Something went wrong configuring.{BColors.ENDC.value}") - shell = Downloader(genre=genre, process_metadata_anyway=process_metadata_anyway) + shell = CliDownloader(genre=genre, process_metadata_anyway=process_metadata_anyway) if command_list is not None: for command in command_list: diff --git a/music_kraken/cli/options/__init__.py b/music_kraken/development_cli/options/__init__.py similarity index 100% rename from music_kraken/cli/options/__init__.py rename to music_kraken/development_cli/options/__init__.py diff --git a/music_kraken/cli/options/cache.py b/music_kraken/development_cli/options/cache.py similarity index 100% rename from music_kraken/cli/options/cache.py rename to music_kraken/development_cli/options/cache.py diff --git a/music_kraken/cli/options/first_config.py b/music_kraken/development_cli/options/first_config.py similarity index 100% rename from music_kraken/cli/options/first_config.py rename to music_kraken/development_cli/options/first_config.py diff --git a/music_kraken/cli/options/frontend.py b/music_kraken/development_cli/options/frontend.py similarity index 100% rename from music_kraken/cli/options/frontend.py rename to music_kraken/development_cli/options/frontend.py diff --git a/music_kraken/cli/options/settings.py b/music_kraken/development_cli/options/settings.py similarity index 100% 
rename from music_kraken/cli/options/settings.py rename to music_kraken/development_cli/options/settings.py diff --git a/music_kraken/development_cli/utils.py b/music_kraken/development_cli/utils.py new file mode 100644 index 0000000..830bec8 --- /dev/null +++ b/music_kraken/development_cli/utils.py @@ -0,0 +1,85 @@ +from ..utils import BColors +from ..utils.shared import get_random_message + + +def cli_function(function): +    def wrapper(*args, **kwargs): +        silent = kwargs.get("no_cli", False) +        if "no_cli" in kwargs: +            del kwargs["no_cli"] + +        if silent: +            return function(*args, **kwargs) +            return + +        code = 0 + +        print_cute_message() +        print() +        try: +            code = function(*args, **kwargs) +        except KeyboardInterrupt: +            print("\n\nRaise an issue if I fucked up:\nhttps://github.com/HeIIow2/music-downloader/issues") + +        finally: +            print() +            print_cute_message() +            print("See you soon! :3") + +            exit() + +    return wrapper + + +def print_cute_message(): +    message = get_random_message() +    try: +        print(message) +    except UnicodeEncodeError: +        message = "".join(c for c in message if 0 < ord(c) < 127) +        print(message) + + +def highlight_placeholder(text: str) -> str: +    return text.replace("<", f"{BColors.BOLD}<").replace(">", f">{BColors.ENDC}") + + +HELP_MESSAGE = highlight_placeholder(f"""{BColors.HEADER}To search:{BColors.ENDC} +> s: +> s: https://musify.club/release/some-random-release-183028492 +> s: #a #r #t + +If you found the same object twice from different sources you can merge those objects. +Then it will use those sources. To do so, use the {BColors.BOLD}m{BColors.ENDC} command. + +{BColors.HEADER}To download:{BColors.ENDC} +> d: +> dm: 0, 3, 4 # merge all objects into one and download this object +> d: 1 +> d: https://musify.club/release/some-random-release-183028492 + +{BColors.HEADER}To inspect an object:{BColors.ENDC} +If you inspect an object, you see its adjacent object. This means for example the releases of an artist, or the tracks of a release. 
+You can also merge objects with the {BColors.BOLD}m{BColors.ENDC} command here. + +> g: +> gm: 0, 3, 4 # merge all objects into one and inspect this object +> g: 1 +> g: https://musify.club/release/some-random-release-183028492""") + + +class COMMANDS: + AGREE = {"y", "yes", "ok"} + DISAGREE = {"n", "no"} + EXIT = {"exit"} + HELP = {"help", "h"} + + +def ask_for_bool(msg: str) -> bool: + i = input(f"{msg} ({BColors.OKGREEN.value}Y{BColors.ENDC.value}/{BColors.FAIL.value}N{BColors.ENDC.value})? ").lower() + return i in COMMANDS.AGREE + + +def ask_for_create(name: str, value: str) -> bool: + return ask_for_bool(f"Do you want to create the {name} {BColors.OKBLUE}{value}{BColors.ENDC}?") + \ No newline at end of file diff --git a/music_kraken/download/__init__.py b/music_kraken/download/__init__.py index 7ca0086..21567f8 100644 --- a/music_kraken/download/__init__.py +++ b/music_kraken/download/__init__.py @@ -1,8 +1,36 @@ -from dataclasses import dataclass, field -from typing import Set +from __future__ import annotations -from ..utils.config import main_settings +import logging +import random +import re +from collections import defaultdict +from copy import copy +from dataclasses import dataclass, field +from pathlib import Path +from string import Formatter +from typing import (TYPE_CHECKING, Any, Callable, Dict, Generator, List, + Optional, Set, Tuple, Type, TypedDict, Union) + +import requests +from bs4 import BeautifulSoup + +from ..audio import correct_codec, write_metadata_to_target +from ..connection import Connection +from ..objects import Album, Artist, Collection +from ..objects import DatabaseObject as DataObject +from ..objects import Label, Options, Song, Source, Target +from ..utils import BColors, limit_generator, output, trace +from ..utils.config import main_settings, youtube_settings +from ..utils.enums import ALL_SOURCE_TYPES, SourceType from ..utils.enums.album import AlbumType +from ..utils.exception import MKComposeException, 
MKMissingNameException +from ..utils.exception.download import UrlNotFoundException +from ..utils.path_manager import LOCATIONS +from ..utils.shared import DEBUG_PAGES +from ..utils.string_processing import fit_to_file_system +from ..utils.support_classes.download_result import DownloadResult +from ..utils.support_classes.query import Query +from .results import SearchResults @dataclass @@ -19,3 +47,512 @@ class DownloadOptions: download_again_if_found: bool = False process_audio_if_found: bool = False process_metadata_if_found: bool = True + + +fetch_map = { +    Song: "fetch_song", +    Album: "fetch_album", +    Artist: "fetch_artist", +    Label: "fetch_label", +} + + +class Downloader: +    def __init__( +        self, +        auto_register_pages: bool = True, +        download_options: DownloadOptions = None, +        fetch_options: FetchOptions = None, +        **kwargs +    ): +        self.LOGGER = logging.getLogger("download") + +        self.download_options: DownloadOptions = download_options or DownloadOptions() +        self.fetch_options: FetchOptions = fetch_options or FetchOptions() + +        self._registered_pages: Dict[Type[Page], Set[Page]] = defaultdict(set) +        if auto_register_pages: +            self.scan_for_pages(**kwargs) + +    # manage which pages to use + +    def register_page(self, page_type: Type[Page], **kwargs): +        if page_type in self._registered_pages: +            return + +        self._registered_pages[page_type].add(page_type( +            download_options=self.download_options, +            fetch_options=self.fetch_options, +            **kwargs +        )) + +    def deregister_page(self, page_type: Type[Page]): +        if page_type not in self._registered_pages: +            return + +        for p in self._registered_pages[page_type]: +            p.__del__() +        del self._registered_pages[page_type] + +    def scan_for_pages(self, **kwargs): +        # assuming the wanted pages are the leaf classes of the interface +        from .. 
import pages + +        leaf_classes = [] + +        class_list = [Page] +        while len(class_list): +            _class = class_list.pop() +            class_subclasses = _class.__subclasses__() + +            if len(class_subclasses) == 0: +                if _class.REGISTER: +                    leaf_classes.append(_class) +            else: +                class_list.extend(class_subclasses) + +        if Page in leaf_classes: +            self.LOGGER.warning("couldn't find any data source") +            return +        for leaf_class in leaf_classes: +            self.register_page(leaf_class, **kwargs) + +    def get_pages(self, *page_types: List[Type[Page]]) -> Generator[Page, None, None]: +        if len(page_types) == 0: +            page_types = self._registered_pages.keys() + +        for page_type in page_types: +            yield from self._registered_pages[page_type] + +    # fetching/downloading data + +    def search(self, query: Query) -> Generator[DataObject, None, None]: +        """Yields all data objects that were found by the query. +        Other than `Downloader.search_yield_pages`, this function just yields all data objects. +        This loses the data about where the objects were searched originally, so this might not be the best choice. + +        Args: +            query (Query): The query to search for. + +        Yields: +            Generator[DataObject, None, None]: A generator that yields all found data objects. +        """ + +        for page in self.get_pages(): +            yield from page.search(query=query) + +    def search_yield_pages(self, query: Query, results_per_page: Optional[int] = None) -> Generator[Tuple[Page, Generator[DataObject, None, None]], None, None]: +        """Yields all data objects that were found by the query, grouped by the page they were found on. +        every yield is a tuple of the page and a generator that yields the data objects. +        So this could be how it is used: + +        ```python +        for page, data_objects in downloader.search_yield_pages(query): +            print(f"Found on {page}:") +            for data_object in data_objects: +                print(data_object) +        ``` + +        Args: +            query (Query): The query to search for. 
+ results_per_page (Optional[int], optional): If this is set, the generators only yield this amount of data objects per page. + + Yields: + Generator[Tuple[Page, Generator[DataObject, None, None]], None, None]: yields the page and a generator that yields the data objects. + """ + + for page in self.get_pages(): + yield page, limit_generator(page.search(query=query), limit=results_per_page) + + def fetch_details(self, data_object: DataObject, **kwargs) -> DataObject: + """Fetches more details for the given data object. + This uses every source contained in data_object.source_collection that has a page. + + Args: + data_object (DataObject): The data object to fetch details for. + + Returns: + DataObject: The same data object, but with more details. + """ + + source: Source + for source in data_object.source_collection.get_sources(source_type_sorting={ + "only_with_page": True, + }): + new_data_object = self.fetch_from_source(source=source, **kwargs) + if new_data_object is not None: + data_object.merge(new_data_object) + + return data_object + + def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]: + """Gets a data object from the given source. + + Args: + source (Source): The source to get the data object from. + + Returns: + Optional[DataObject]: If a data object can be retrieved, it is returned. Otherwise, None is returned. + """ + if not source.has_page: + return None + + source_type = source.page.get_source_type(source=source) + if source_type is None: + self.LOGGER.debug(f"Could not determine source type for {source}.") + return None + + func = getattr(source.page, fetch_map[source_type]) + + # fetching the data object and marking it as fetched + data_object: DataObject = func(source=source, **kwargs) + data_object.mark_as_fetched(source.hash_url) + return data_object + + def fetch_from_url(self, url: str) -> Optional[DataObject]: + """This function tries to detect the source of the given url and fetches the data object from it. 
+ + Args: + url (str): The url to fetch the data object from. + + Returns: + Optional[DataObject]: The fetched data object, or None if no source could be detected or if no object could be retrieved. + """ + + source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) + if source is None: + return None + + return self.fetch_from_source(source=source) + + def _skip_object(self, data_object: DataObject) -> bool: + """Determine if the given data object should be downloaded or not. + + Args: + data_object (DataObject): The data object in question. + + Returns: + bool: Returns True if the given data object should be skipped. + """ + if isinstance(data_object, Album): + if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist: + return True + + return False + + def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult: + """Downloads the given data object. + It will recursively fetch all available details for this and every related object. + Then it will create the folder structure and download the audio file. + In the end the metadata will be written to the file. + + Args: + data_object (DataObject): The data object to download. If it is a artist, it will download the whole discography, if it is an Album it will download the whole Tracklist. + + Returns: + DownloadResult: Relevant information about the download success. + """ + + # fetch the given object + self.fetch_details(data_object) + output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD) + + # fetching all parent objects (e.g. 
if you only download a song) +        if not kwargs.get("fetched_upwards", False): +            to_fetch: List[DataObject] = [data_object] + +            while len(to_fetch) > 0: +                new_to_fetch = [] +                for d in to_fetch: +                    if self._skip_object(d): +                        continue + +                    self.fetch_details(d) + +                    for c in d.get_parent_collections(): +                        new_to_fetch.extend(c) + +                to_fetch = new_to_fetch + +            kwargs["fetched_upwards"] = True + +        # download all children +        download_result: DownloadResult = DownloadResult() +        for c in data_object.get_child_collections(): +            for d in c: +                if self._skip_object(d): +                    continue + +                download_result.merge(self.download(d, genre, **kwargs)) + +        # actually download if the object is a song +        if isinstance(data_object, Song): +            """ +            TODO +            add the traced artist and album to the naming. +            I am able to do that, because duplicate values are removed later on. +            """ + +            download_result.merge(self._download_song(data_object, naming={ +                "genre": [genre], +                "audio_format": [main_settings["audio_format"]], +            })) + +        return download_result + +    def _extract_fields_from_template(self, path_template: str) -> Set[str]: +        return set(re.findall(r"{([^}]+)}", path_template)) + +    def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str: +        field_names: Set[str] = self._extract_fields_from_template(path_template) + +        for field in field_names: +            if len(naming[field]) == 0: +                raise MKMissingNameException(f"Missing field for {field}.") + +            path_template = path_template.replace(f"{{{field}}}", naming[field][0]) + +        return path_template + +    def _download_song(self, song: Song, naming: dict) -> DownloadResult: +        """ +        TODO +        Search the song in the file system. 
+ """ + r = DownloadResult(total=1) + + # pre process the data recursively + song.compile() + + # manage the naming + naming: Dict[str, List[str]] = defaultdict(list, naming) + naming["song"].append(song.title_value) + naming["isrc"].append(song.isrc) + naming["album"].extend(a.title_value for a in song.album_collection) + naming["album_type"].extend(a.album_type.value for a in song.album_collection) + naming["artist"].extend(a.name for a in song.artist_collection) + naming["artist"].extend(a.name for a in song.feature_artist_collection) + for a in song.album_collection: + naming["label"].extend([l.title_value for l in a.label_collection]) + # removing duplicates from the naming, and process the strings + for key, value in naming.items(): + # https://stackoverflow.com/a/17016257 + naming[key] = list(dict.fromkeys(value)) + song.genre = naming["genre"][0] + + # manage the targets + tmp: Target = Target.temp(file_extension=main_settings["audio_format"]) + + song.target_collection.append(Target( + relative_to_music_dir=True, + file_path=Path( + self._parse_path_template(main_settings["download_path"], naming=naming), + self._parse_path_template(main_settings["download_file"], naming=naming), + ) + )) + for target in song.target_collection: + if target.exists: + output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY) + r.found_on_disk += 1 + + if not self.download_options.download_again_if_found: + target.copy_content(tmp) + else: + target.create_path() + output(f'{target.file_path}', color=BColors.GREY) + + # this streams from every available source until something succeeds, setting the skip intervals to the values of the according source + used_source: Optional[Source] = None + skip_intervals: List[Tuple[float, float]] = [] + for source in song.source_collection.get_sources(source_type_sorting={ + "only_with_page": True, + "sort_key": lambda page: page.download_priority, + "reverse": True, + }): + if tmp.exists: + break + + 
used_source = source + streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download") + skip_intervals = source.page.get_skip_intervals(song=song, source=source) + + # if something has been downloaded but it somehow failed, delete the file + if streaming_results.is_fatal_error and tmp.exists: + tmp.delete() + + # if everything went right, the file should exist now + if not tmp.exists: + if used_source is None: + r.error_message = f"No source found for {song.option_string}." + else: + r.error_message = f"Something went wrong downloading {song.option_string}." + return r + + # post process the audio + found_on_disk = used_source is None + if not found_on_disk or self.download_options.process_audio_if_found: + correct_codec(target=tmp, skip_intervals=skip_intervals) + r.sponsor_segments = len(skip_intervals) + + if used_source is not None: + used_source.page.post_process_hook(song=song, temp_target=tmp) + + if not found_on_disk or self.download_options.process_metadata_if_found: + write_metadata_to_target(metadata=song.metadata, target=tmp, song=song) + + # copy the tmp target to the final locations + for target in song.target_collection: + tmp.copy_content(target) + + tmp.delete() + return r + + def fetch_url(self, url: str, **kwargs) -> DataObject: + source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) + + if source is None or source.page is None: + raise UrlNotFoundException(url=url) + + return source.page.fetch_object_from_source(source=source, **kwargs) + + # misc function + + def get_existing_genres(self) -> Generator[str, None, None]: + """Yields every existing genre, for the user to select from. + + Yields: + Generator[str, None, None]: a generator that yields every existing genre. + """ + + def is_valid_genre(genre: Path) -> bool: + """ + gets the name of all subdirectories of shared.MUSIC_DIR, + but filters out all directories, where the name matches with any Patern + from shared.NOT_A_GENRE_REGEX. 
+ """ + if not genre.is_dir(): + return False + + if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]): + return False + + return True + + for genre in filter(is_valid_genre, main_settings["music_directory"].iterdir()): + yield genre.name + +class Page: + REGISTER = True + SOURCE_TYPE: SourceType + LOGGER: logging.Logger + + def __new__(cls, *args, **kwargs): + cls.LOGGER = logging.getLogger(cls.__name__) + return super().__new__(cls) + + @classmethod + def is_leaf_page(cls) -> bool: + return len(cls.__subclasses__()) == 0 + + def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None, **kwargs): + self.SOURCE_TYPE.register_page(self) + + self.download_options: DownloadOptions = download_options or DownloadOptions() + self.fetch_options: FetchOptions = fetch_options or FetchOptions() + + def __del__(self): + self.SOURCE_TYPE.deregister_page() + + def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None): + """ + Perform a regex search on the given string, using a single or a list of + patterns returning the first matching group. + In case of failure return a default value or raise a WARNING or a + RegexNotFoundError, depending on fatal, specifying the field name. 
+ """ + + if isinstance(pattern, str): + mobj = re.search(pattern, string, flags) + else: + for p in pattern: + mobj = re.search(p, string, flags) + if mobj: + break + + if mobj: + if group is None: + # return the first matching group + return next(g for g in mobj.groups() if g is not None) + elif isinstance(group, (list, tuple)): + return tuple(mobj.group(g) for g in group) + else: + return mobj.group(group) + + return default + + def get_source_type(self, source: Source) -> Optional[Type[DataObject]]: + return None + + def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup: + return BeautifulSoup(r.content, "html.parser") + + # to search stuff + def search(self, query: Query) -> List[DataObject]: + music_object = query.music_object + + search_functions = { + Song: self.song_search, + Album: self.album_search, + Artist: self.artist_search, + Label: self.label_search + } + + if type(music_object) in search_functions: + r = search_functions[type(music_object)](music_object) + if r is not None and len(r) > 0: + return r + + r = [] + for default_query in query.default_search: + for single_option in self.general_search(default_query): + r.append(single_option) + + return r + + def general_search(self, search_query: str) -> List[DataObject]: + return [] + + def label_search(self, label: Label) -> List[Label]: + return [] + + def artist_search(self, artist: Artist) -> List[Artist]: + return [] + + def album_search(self, album: Album) -> List[Album]: + return [] + + def song_search(self, song: Song) -> List[Song]: + return [] + + # to fetch stuff + def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: + return Song() + + def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album: + return Album() + + def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist: + return Artist() + + def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: + return Label() + + # to download stuff + def 
get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]: + return [] + + def post_process_hook(self, song: Song, temp_target: Target, **kwargs): + pass + + def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: + return DownloadResult() diff --git a/music_kraken/download/components/__init__.py b/music_kraken/download/components/__init__.py new file mode 100644 index 0000000..effc09f --- /dev/null +++ b/music_kraken/download/components/__init__.py @@ -0,0 +1,214 @@ +from __future__ import annotations + +import copy +import re +from collections import defaultdict +from dataclasses import dataclass, field +from pathlib import Path +from typing import (Any, Callable, Dict, Generator, Generic, Hashable, List, + Optional, Tuple, TypeVar, Union) + +from ...objects import OuterProxy as DataObject +from ...utils import BColors +from ...utils.config import main_settings +from ...utils.enums import SourceType +from ...utils.exception import MKComposeException +from ...utils.shared import ALPHABET +from ...utils.string_processing import unify + +P = TypeVar('P') + + +class HumanIO: + @staticmethod + def ask_to_create(option: Option) -> bool: + return True + + @staticmethod + def not_found(key: Any) -> None: + return None + +class Option(Generic[P]): + """ + This could represent a data object, a string or a page. + """ + TEXT_TEMPLATE: str = f"{BColors.BOLD.value}{{index}}{BColors.ENDC.value}: {{value}}" + ATTRIBUTES_FORMATTING: Tuple[str, ...] = ("index", "value") + ATTRIBUTES_KEY: Tuple[str, ...] 
= ("index", ) + + def __init__( + self, + value: P, + hidden: bool = False, + additional_keys: List[Hashable] = None, + **kwargs + ): + self.value = value + self.hidden = hidden + self._additional_keys = set(self._to_hash(key) for key in additional_keys or []) + + for key, value in kwargs.items(): + setattr(self, key, value) + + super(Option, self).__init__() + + def _to_option_string(self, value: Any) -> str: + if hasattr(value, "option_string"): + return value.option_string + + return str(value) + + @property + def text(self) -> str: + text = self.TEXT_TEMPLATE + + for attribute_key in self.ATTRIBUTES_FORMATTING: + text = text.replace(f"{{{attribute_key}}}", self._to_option_string(getattr(self, attribute_key))) + + return text + + def _to_hash(self, key: Any) -> int: + try: + key = int(key) + except ValueError: + pass + + if isinstance(key, str): + return hash(unify(key)) + + return hash(key) + + @property + def keys(self) -> set: + keys = self._additional_keys.copy() + + for key in self.ATTRIBUTES_KEY: + keys.add(self._to_hash(getattr(self, key))) + + def __contains__(self, key: Any) -> bool: + return self._to_hash(key) in self.keys + + def __str__(self): + return self.text + + def __iter__(self) -> Generator[Option[P], None, None]: + yield self + + +class Select(Generic[P]): + OPTION: Type[Option[P]] = Option + HUMAN_IO: Type[HumanIO] = HumanIO + CAN_CREATE_OPTIONS: bool = False + + def __init__( + self, + data: Generator[P, None, None], + **kwargs + ): + self.option: Type[Option[P]] = kwargs.get("option", self.OPTION) + self.human_io: Type[HumanIO] = kwargs.get("human_io", self.HUMAN_IO) + self.can_create_options: bool = kwargs.get("can_create_options", self.CAN_CREATE_OPTIONS) + + self._options: List[Option[P]] = [] + self.extend(data) + + super(Select, self).__init__(**kwargs) + + def append(self, value: P) -> Option[P]: + option = self.option(value) + self._options.append(option) + return option + + def extend(self, values: Generator[P, None, None]): + for 
value in values: + self.append(value) + + @property + def _options_to_show(self) -> Generator[Option[P], None, None]: + for option in self._options: + if option.hidden: + continue + + yield option + + def __iter__(self) -> Generator[Option, None, None]: + _index = 0 + + for i, o in enumerate(self._options_to_show): + for option in o: + option.index = _index + yield option + _index += 1 + + def __contains__(self, key: Any) -> bool: + for option in self._options: + if key in option: + return True + + return False + + def __getitem__(self, key: Any) -> Option[P]: + for option in self._options: + if key in option: + return option + + raise KeyError(key) + + def choose(self, key: Any) -> Optional[Option[P]]: + try: + return self[key] + except KeyError: + if self.can_create_options: + return self.append(key) + + self.human_io.not_found(key) + + def pprint(self) -> str: + return "\n".join(str(option) for option in self) + + +class Node(Generator[P]): + def __init__( + self, + value: Optional[P] = None, + children: List[Node[P]] = None, + parent: Node[P] = None, + **kwargs + ): + self.value = value + self.depth = 0 + self.index: int = 0 + + + self.children: List[Node[P]] = kwargs.get("children", []) + self.parent: Optional[Node[P]] = kwargs.get("parent", None) + + super(Node, self).__init__(**kwargs) + + @property + def is_root(self) -> bool: + return self.parent is None + + @property + def is_leaf(self) -> bool: + return not self.children + + def __iter__(self, **kwargs) -> Generator[Node[P], None, None]: + _level_index_map: Dict[int, int] = kwargs.get("level_index_map", defaultdict(lambda: 0)) + + self.index = _level_index_map[self.depth] + yield self + _level_index_map[self.depth] += 1 + + for child in self.children: + child.depth = self.depth + 1 + + for node in child.__iter__(level_index_map=_level_index_map): + yield node + + def __getitem__(self, key: Any) -> Option[P]: + pass + + def __contains__(self, key: Any) -> bool: + if key in self.option: + return True diff 
--git a/music_kraken/download/components/generic.py b/music_kraken/download/components/generic.py new file mode 100644 index 0000000..be88017 --- /dev/null +++ b/music_kraken/download/components/generic.py @@ -0,0 +1 @@ +from . import Option, Select diff --git a/music_kraken/download/page_attributes.py b/music_kraken/download/page_attributes.py deleted file mode 100644 index 1db24be..0000000 --- a/music_kraken/download/page_attributes.py +++ /dev/null @@ -1,328 +0,0 @@ -from typing import Tuple, Type, Dict, Set, Optional, List -from collections import defaultdict -from pathlib import Path -import re -import logging - -from . import FetchOptions, DownloadOptions -from .results import SearchResults -from ..objects import ( - DatabaseObject as DataObject, - Collection, - Target, - Source, - Options, - Song, - Album, - Artist, - Label, -) -from ..audio import write_metadata_to_target, correct_codec -from ..utils import output, BColors -from ..utils.string_processing import fit_to_file_system -from ..utils.config import youtube_settings, main_settings -from ..utils.path_manager import LOCATIONS -from ..utils.enums import SourceType, ALL_SOURCE_TYPES -from ..utils.support_classes.download_result import DownloadResult -from ..utils.support_classes.query import Query -from ..utils.support_classes.download_result import DownloadResult -from ..utils.exception import MKMissingNameException -from ..utils.exception.download import UrlNotFoundException -from ..utils.shared import DEBUG_PAGES - -from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, Genius, INDEPENDENT_DB_OBJECTS - - -ALL_PAGES: Set[Type[Page]] = { - # EncyclopaediaMetallum, - Genius, - Musify, - YoutubeMusic, - Bandcamp -} - -if youtube_settings["use_youtube_alongside_youtube_music"]: - ALL_PAGES.add(YouTube) - -AUDIO_PAGES: Set[Type[Page]] = { - Musify, - YouTube, - YoutubeMusic, - Bandcamp -} - -SHADY_PAGES: Set[Type[Page]] = { - Musify, -} - -fetch_map = { - Song: 
"fetch_song", - Album: "fetch_album", - Artist: "fetch_artist", - Label: "fetch_label", -} - -if DEBUG_PAGES: - DEBUGGING_PAGE = Bandcamp - print(f"Only downloading from page {DEBUGGING_PAGE}.") - - ALL_PAGES = {DEBUGGING_PAGE} - AUDIO_PAGES = ALL_PAGES.union(AUDIO_PAGES) - - -class Pages: - def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, download_options: DownloadOptions = None, fetch_options: FetchOptions = None): - self.LOGGER = logging.getLogger("download") - - self.download_options: DownloadOptions = download_options or DownloadOptions() - self.fetch_options: FetchOptions = fetch_options or FetchOptions() - - # initialize all page instances - self._page_instances: Dict[Type[Page], Page] = dict() - self._source_to_page: Dict[SourceType, Type[Page]] = dict() - - exclude_pages = exclude_pages if exclude_pages is not None else set() - - if exclude_shady: - exclude_pages = exclude_pages.union(SHADY_PAGES) - - if not exclude_pages.issubset(ALL_PAGES): - raise ValueError(f"The excluded pages have to be a subset of all pages: {exclude_pages} | {ALL_PAGES}") - - def _set_to_tuple(page_set: Set[Type[Page]]) -> Tuple[Type[Page], ...]: - return tuple(sorted(page_set, key=lambda page: page.__name__)) - - self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages) - self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set) - - self._audio_pages_set: Set[Type[Page]] = self._pages_set.intersection(AUDIO_PAGES) - self.audio_pages: Tuple[Type[Page], ...] 
= _set_to_tuple(self._audio_pages_set) - - for page_type in self.pages: - self._page_instances[page_type] = page_type(fetch_options=self.fetch_options, download_options=self.download_options) - self._source_to_page[page_type.SOURCE_TYPE] = page_type - - def _get_page_from_enum(self, source_page: SourceType) -> Page: - if source_page not in self._source_to_page: - return None - return self._page_instances[self._source_to_page[source_page]] - - def search(self, query: Query) -> SearchResults: - result = SearchResults() - - for page_type in self.pages: - result.add( - page=page_type, - search_result=self._page_instances[page_type].search(query=query) - ) - - return result - - def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject: - if not isinstance(data_object, INDEPENDENT_DB_OBJECTS): - return data_object - - source: Source - for source in data_object.source_collection.get_sources(source_type_sorting={ - "only_with_page": True, - }): - new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level) - if new_data_object is not None: - data_object.merge(new_data_object) - - return data_object - - def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]: - if not source.has_page: - return None - - source_type = source.page.get_source_type(source=source) - if source_type is None: - self.LOGGER.debug(f"Could not determine source type for {source}.") - return None - - func = getattr(source.page, fetch_map[source_type]) - - # fetching the data object and marking it as fetched - data_object: DataObject = func(source=source, **kwargs) - data_object.mark_as_fetched(source.hash_url) - return data_object - - def fetch_from_url(self, url: str) -> Optional[DataObject]: - source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) - if source is None: - return None - - return self.fetch_from_source(source=source) - - def _skip_object(self, data_object: DataObject) -> bool: - if isinstance(data_object, 
Album): - if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist: - return True - - return False - - def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult: - # fetch the given object - self.fetch_details(data_object) - output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD) - - # fetching all parent objects (e.g. if you only download a song) - if not kwargs.get("fetched_upwards", False): - to_fetch: List[DataObject] = [data_object] - - while len(to_fetch) > 0: - new_to_fetch = [] - for d in to_fetch: - if self._skip_object(d): - continue - - self.fetch_details(d) - - for c in d.get_parent_collections(): - new_to_fetch.extend(c) - - to_fetch = new_to_fetch - - kwargs["fetched_upwards"] = True - - # download all children - download_result: DownloadResult = DownloadResult() - for c in data_object.get_child_collections(): - for d in c: - if self._skip_object(d): - continue - - download_result.merge(self.download(d, genre, **kwargs)) - - # actually download if the object is a song - if isinstance(data_object, Song): - """ - TODO - add the traced artist and album to the naming. - I am able to do that, because duplicate values are removed later on. 
- """ - - self._download_song(data_object, naming={ - "genre": [genre], - "audio_format": [main_settings["audio_format"]], - }) - - return download_result - - def _extract_fields_from_template(self, path_template: str) -> Set[str]: - return set(re.findall(r"{([^}]+)}", path_template)) - - def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str: - field_names: Set[str] = self._extract_fields_from_template(path_template) - - for field in field_names: - if len(naming[field]) == 0: - raise MKMissingNameException(f"Missing field for {field}.") - - path_template = path_template.replace(f"{{{field}}}", naming[field][0]) - - return path_template - - def _download_song(self, song: Song, naming: dict) -> DownloadOptions: - """ - TODO - Search the song in the file system. - """ - r = DownloadResult(total=1) - - # pre process the data recursively - song.compile() - - # manage the naming - naming: Dict[str, List[str]] = defaultdict(list, naming) - naming["song"].append(song.title_value) - naming["isrc"].append(song.isrc) - naming["album"].extend(a.title_value for a in song.album_collection) - naming["album_type"].extend(a.album_type.value for a in song.album_collection) - naming["artist"].extend(a.name for a in song.artist_collection) - naming["artist"].extend(a.name for a in song.feature_artist_collection) - for a in song.album_collection: - naming["label"].extend([l.title_value for l in a.label_collection]) - # removing duplicates from the naming, and process the strings - for key, value in naming.items(): - # https://stackoverflow.com/a/17016257 - naming[key] = list(dict.fromkeys(value)) - song.genre = naming["genre"][0] - - # manage the targets - tmp: Target = Target.temp(file_extension=main_settings["audio_format"]) - - song.target_collection.append(Target( - relative_to_music_dir=True, - file_path=Path( - self._parse_path_template(main_settings["download_path"], naming=naming), - self._parse_path_template(main_settings["download_file"], 
naming=naming), - ) - )) - for target in song.target_collection: - if target.exists: - output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY) - r.found_on_disk += 1 - - if not self.download_options.download_again_if_found: - target.copy_content(tmp) - else: - target.create_path() - output(f'{target.file_path}', color=BColors.GREY) - - # this streams from every available source until something succeeds, setting the skip intervals to the values of the according source - used_source: Optional[Source] = None - skip_intervals: List[Tuple[float, float]] = [] - for source in song.source_collection.get_sources(source_type_sorting={ - "only_with_page": True, - "sort_key": lambda page: page.download_priority, - "reverse": True, - }): - if tmp.exists: - break - - used_source = source - streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download") - skip_intervals = source.page.get_skip_intervals(song=song, source=source) - - # if something has been downloaded but it somehow failed, delete the file - if streaming_results.is_fatal_error and tmp.exists: - tmp.delete() - - # if everything went right, the file should exist now - if not tmp.exists: - if used_source is None: - r.error_message = f"No source found for {song.option_string}." - else: - r.error_message = f"Something went wrong downloading {song.option_string}." 
- return r - - # post process the audio - found_on_disk = used_source is None - if not found_on_disk or self.download_options.process_audio_if_found: - correct_codec(target=tmp, skip_intervals=skip_intervals) - r.sponsor_segments = len(skip_intervals) - - if used_source is not None: - used_source.page.post_process_hook(song=song, temp_target=tmp) - - if not found_on_disk or self.download_options.process_metadata_if_found: - write_metadata_to_target(metadata=song.metadata, target=tmp, song=song) - - # copy the tmp target to the final locations - for target in song.target_collection: - tmp.copy_content(target) - - tmp.delete() - return r - - def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]: - source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) - - if source is None: - raise UrlNotFoundException(url=url) - - _actual_page = self._source_to_page[source.source_type] - - return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level) \ No newline at end of file diff --git a/music_kraken/download/results.py b/music_kraken/download/results.py index 2486c26..3587da4 100644 --- a/music_kraken/download/results.py +++ b/music_kraken/download/results.py @@ -1,8 +1,12 @@ -from typing import Tuple, Type, Dict, List, Generator, Union +from __future__ import annotations + from dataclasses import dataclass +from typing import TYPE_CHECKING, Dict, Generator, List, Tuple, Type, Union from ..objects import DatabaseObject -from ..pages import Page, EncyclopaediaMetallum, Musify + +if TYPE_CHECKING: + from . 
import Page @dataclass diff --git a/music_kraken/objects/song.py b/music_kraken/objects/song.py index 980bc08..a25252e 100644 --- a/music_kraken/objects/song.py +++ b/music_kraken/objects/song.py @@ -1,35 +1,32 @@ from __future__ import annotations +import copy import random from collections import defaultdict -from typing import List, Optional, Dict, Tuple, Type, Union -import copy +from typing import Dict, List, Optional, Tuple, Type, Union import pycountry -from ..utils.enums.album import AlbumType, AlbumStatus -from .collection import Collection -from .formatted_text import FormattedText -from .lyrics import Lyrics -from .contact import Contact -from .artwork import Artwork -from .metadata import ( - Mapping as id3Mapping, - ID3Timestamp, - Metadata -) -from .option import Options -from .parents import OuterProxy, P -from .source import Source, SourceCollection -from .target import Target -from .country import Language, Country +from ..utils.config import main_settings +from ..utils.enums.album import AlbumStatus, AlbumType +from ..utils.enums.colors import BColors from ..utils.shared import DEBUG_PRINT_ID from ..utils.string_processing import unify - +from .artwork import Artwork +from .collection import Collection +from .contact import Contact +from .country import Country, Language +from .formatted_text import FormattedText +from .lyrics import Lyrics +from .metadata import ID3Timestamp +from .metadata import Mapping as id3Mapping +from .metadata import Metadata +from .option import Options +from .parents import OuterProxy from .parents import OuterProxy as Base - -from ..utils.config import main_settings -from ..utils.enums.colors import BColors +from .parents import P +from .source import Source, SourceCollection +from .target import Target """ All Objects dependent diff --git a/music_kraken/pages/__init__.py b/music_kraken/pages/__init__.py index ba24501..a9fe75b 100644 --- a/music_kraken/pages/__init__.py +++ b/music_kraken/pages/__init__.py @@ -1,8 
+1,52 @@ -from .encyclopaedia_metallum import EncyclopaediaMetallum -from .musify import Musify -from .youtube import YouTube -from .youtube_music import YoutubeMusic -from .bandcamp import Bandcamp -from .genius import Genius +import importlib +import inspect +import logging +import pkgutil +import sys +from collections import defaultdict +from copy import copy +from pathlib import Path +from typing import Dict, Generator, List, Set, Type -from .abstract import Page, INDEPENDENT_DB_OBJECTS +from ._bandcamp import Bandcamp +from ._encyclopaedia_metallum import EncyclopaediaMetallum +from ._genius import Genius +from ._musify import Musify +from ._youtube import YouTube +from ._youtube_music import YoutubeMusic + + +def import_children(): + _page_directory = Path(__file__).parent + _stem_blacklist = set(["__pycache__", "__init__"]) + + for _file in _page_directory.iterdir(): + if _file.stem in _stem_blacklist: + continue + + logging.debug(f"importing {_file.absolute()}") + exec(f"from . import {_file.stem}") + +# module_blacklist = set(sys.modules.keys()) +import_children() + +""" +classes = set() + +print(__name__) +for module_name, module in sys.modules.items(): + if module_name in module_blacklist or not module_name.startswith(__name__): + continue + + print("scanning module", module_name) + for name, obj in inspect.getmembers(module, predicate=inspect.isclass): + _module = obj.__module__ + if _module.startswith(__name__) and hasattr(obj, "SOURCE_TYPE"): + print("checking object", name, obj.__module__) + classes.add(obj) + print() + +print(*(c.__name__ for c in classes), sep=",\t") + +__all__ = [c.__name__ for c in classes] +""" \ No newline at end of file diff --git a/music_kraken/pages/bandcamp.py b/music_kraken/pages/_bandcamp.py similarity index 97% rename from music_kraken/pages/bandcamp.py rename to music_kraken/pages/_bandcamp.py index 1caf803..ab4977e 100644 --- a/music_kraken/pages/bandcamp.py +++ b/music_kraken/pages/_bandcamp.py @@ -1,33 +1,22 @@ -from 
typing import List, Optional, Type -from urllib.parse import urlparse, urlunparse import json from enum import Enum -from bs4 import BeautifulSoup -import pycountry +from typing import List, Optional, Type +from urllib.parse import urlparse, urlunparse + +import pycountry +from bs4 import BeautifulSoup -from ..objects import Source, DatabaseObject -from .abstract import Page -from ..objects import ( - Artist, - Source, - SourceType, - Song, - Album, - Label, - Target, - Contact, - ID3Timestamp, - Lyrics, - FormattedText, - Artwork, -) from ..connection import Connection +from ..download import Page +from ..objects import (Album, Artist, Artwork, Contact, DatabaseObject, + FormattedText, ID3Timestamp, Label, Lyrics, Song, + Source, SourceType, Target) from ..utils import dump_to_file -from ..utils.enums import SourceType, ALL_SOURCE_TYPES -from ..utils.support_classes.download_result import DownloadResult -from ..utils.string_processing import clean_song_title -from ..utils.config import main_settings, logging_settings +from ..utils.config import logging_settings, main_settings +from ..utils.enums import ALL_SOURCE_TYPES, SourceType from ..utils.shared import DEBUG +from ..utils.string_processing import clean_song_title +from ..utils.support_classes.download_result import DownloadResult if DEBUG: from ..utils import dump_to_file diff --git a/music_kraken/pages/encyclopaedia_metallum.py b/music_kraken/pages/_encyclopaedia_metallum.py similarity index 98% rename from music_kraken/pages/encyclopaedia_metallum.py rename to music_kraken/pages/_encyclopaedia_metallum.py index 52d9ea3..99ca57e 100644 --- a/music_kraken/pages/encyclopaedia_metallum.py +++ b/music_kraken/pages/_encyclopaedia_metallum.py @@ -1,31 +1,20 @@ from collections import defaultdict -from typing import List, Optional, Dict, Type, Union -from bs4 import BeautifulSoup +from typing import Dict, List, Optional, Type, Union +from urllib.parse import urlencode, urlparse + import pycountry -from urllib.parse 
import urlparse, urlencode +from bs4 import BeautifulSoup from ..connection import Connection -from ..utils.config import logging_settings -from .abstract import Page -from ..utils.enums import SourceType, ALL_SOURCE_TYPES -from ..utils.enums.album import AlbumType -from ..utils.support_classes.query import Query -from ..objects import ( - Lyrics, - Artist, - Source, - Song, - Album, - ID3Timestamp, - FormattedText, - Label, - Options, - DatabaseObject -) -from ..utils.shared import DEBUG +from ..download import Page +from ..objects import (Album, Artist, DatabaseObject, FormattedText, + ID3Timestamp, Label, Lyrics, Options, Song, Source) from ..utils import dump_to_file - - +from ..utils.config import logging_settings +from ..utils.enums import ALL_SOURCE_TYPES, SourceType +from ..utils.enums.album import AlbumType +from ..utils.shared import DEBUG +from ..utils.support_classes.query import Query ALBUM_TYPE_MAP: Dict[str, AlbumType] = defaultdict(lambda: AlbumType.OTHER, { "Full-length": AlbumType.STUDIO_ALBUM, @@ -207,6 +196,7 @@ def create_grid( class EncyclopaediaMetallum(Page): + REGISTER = False SOURCE_TYPE = ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM LOGGER = logging_settings["metal_archives_logger"] diff --git a/music_kraken/pages/genius.py b/music_kraken/pages/_genius.py similarity index 96% rename from music_kraken/pages/genius.py rename to music_kraken/pages/_genius.py index 1719025..f79a1db 100644 --- a/music_kraken/pages/genius.py +++ b/music_kraken/pages/_genius.py @@ -1,33 +1,22 @@ -from typing import List, Optional, Type -from urllib.parse import urlparse, urlunparse, urlencode import json from enum import Enum -from bs4 import BeautifulSoup -import pycountry +from typing import List, Optional, Type +from urllib.parse import urlencode, urlparse, urlunparse + +import pycountry +from bs4 import BeautifulSoup -from ..objects import Source, DatabaseObject -from .abstract import Page -from ..objects import ( - Artist, - Source, - SourceType, - Song, - 
Album, - Label, - Target, - Contact, - ID3Timestamp, - Lyrics, - FormattedText, - Artwork, -) from ..connection import Connection +from ..download import Page +from ..objects import (Album, Artist, Artwork, Contact, DatabaseObject, + FormattedText, ID3Timestamp, Label, Lyrics, Song, + Source, SourceType, Target) from ..utils import dump_to_file, traverse_json_path -from ..utils.enums import SourceType, ALL_SOURCE_TYPES -from ..utils.support_classes.download_result import DownloadResult -from ..utils.string_processing import clean_song_title -from ..utils.config import main_settings, logging_settings +from ..utils.config import logging_settings, main_settings +from ..utils.enums import ALL_SOURCE_TYPES, SourceType from ..utils.shared import DEBUG +from ..utils.string_processing import clean_song_title +from ..utils.support_classes.download_result import DownloadResult if DEBUG: from ..utils import dump_to_file diff --git a/music_kraken/pages/musify.py b/music_kraken/pages/_musify.py similarity index 99% rename from music_kraken/pages/musify.py rename to music_kraken/pages/_musify.py index e8078fb..9e4c6ae 100644 --- a/music_kraken/pages/musify.py +++ b/music_kraken/pages/_musify.py @@ -1,34 +1,23 @@ from collections import defaultdict from dataclasses import dataclass from enum import Enum -from typing import List, Optional, Type, Union, Generator, Dict, Any +from typing import Any, Dict, Generator, List, Optional, Type, Union from urllib.parse import urlparse import pycountry from bs4 import BeautifulSoup from ..connection import Connection -from .abstract import Page -from ..utils.enums import SourceType, ALL_SOURCE_TYPES -from ..utils.enums.album import AlbumType, AlbumStatus -from ..objects import ( - Artist, - Source, - Song, - Album, - ID3Timestamp, - FormattedText, - Label, - Target, - DatabaseObject, - Lyrics, - Artwork -) +from ..download import Page +from ..objects import (Album, Artist, Artwork, DatabaseObject, FormattedText, + ID3Timestamp, Label, 
Lyrics, Song, Source, Target) +from ..utils import shared, string_processing from ..utils.config import logging_settings, main_settings -from ..utils import string_processing, shared +from ..utils.enums import ALL_SOURCE_TYPES, SourceType +from ..utils.enums.album import AlbumStatus, AlbumType from ..utils.string_processing import clean_song_title -from ..utils.support_classes.query import Query from ..utils.support_classes.download_result import DownloadResult +from ..utils.support_classes.query import Query """ https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent diff --git a/music_kraken/pages/youtube.py b/music_kraken/pages/_youtube.py similarity index 95% rename from music_kraken/pages/youtube.py rename to music_kraken/pages/_youtube.py index 8f21c73..5bbee4e 100644 --- a/music_kraken/pages/youtube.py +++ b/music_kraken/pages/_youtube.py @@ -1,29 +1,19 @@ -from typing import List, Optional, Type, Tuple -from urllib.parse import urlparse, urlunparse, parse_qs from enum import Enum +from typing import List, Optional, Tuple, Type +from urllib.parse import parse_qs, urlparse, urlunparse import python_sponsorblock -from ..objects import Source, DatabaseObject, Song, Target -from .abstract import Page -from ..objects import ( - Artist, - Source, - Song, - Album, - Label, - Target, - FormattedText, - ID3Timestamp -) from ..connection import Connection +from ..download import Page +from ..objects import (Album, Artist, DatabaseObject, FormattedText, + ID3Timestamp, Label, Song, Source, Target) +from ..utils.config import logging_settings, main_settings, youtube_settings +from ..utils.enums import ALL_SOURCE_TYPES, SourceType from ..utils.string_processing import clean_song_title -from ..utils.enums import SourceType, ALL_SOURCE_TYPES from ..utils.support_classes.download_result import DownloadResult -from ..utils.config import youtube_settings, main_settings, logging_settings - -from .youtube_music.super_youtube import SuperYouTube, YouTubeUrl, 
get_invidious_url, YouTubeUrlType - +from ._youtube_music.super_youtube import (SuperYouTube, YouTubeUrl, + YouTubeUrlType, get_invidious_url) """ - https://yt.artemislena.eu/api/v1/search?q=Zombiez+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance @@ -38,7 +28,7 @@ def get_piped_url(path: str = "", params: str = "", query: str = "", fragment: s class YouTube(SuperYouTube): - # CHANGE + REGISTER = youtube_settings["use_youtube_alongside_youtube_music"] SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE def __init__(self, *args, **kwargs): diff --git a/music_kraken/pages/youtube_music/__init__.py b/music_kraken/pages/_youtube_music/__init__.py similarity index 100% rename from music_kraken/pages/youtube_music/__init__.py rename to music_kraken/pages/_youtube_music/__init__.py diff --git a/music_kraken/pages/youtube_music/_list_render.py b/music_kraken/pages/_youtube_music/_list_render.py similarity index 99% rename from music_kraken/pages/youtube_music/_list_render.py rename to music_kraken/pages/_youtube_music/_list_render.py index 7dcc8cf..29474cd 100644 --- a/music_kraken/pages/youtube_music/_list_render.py +++ b/music_kraken/pages/_youtube_music/_list_render.py @@ -3,7 +3,6 @@ from enum import Enum from ...utils.config import logging_settings from ...objects import Source, DatabaseObject -from ..abstract import Page from ...objects import ( Artist, Source, diff --git a/music_kraken/pages/youtube_music/_music_object_render.py b/music_kraken/pages/_youtube_music/_music_object_render.py similarity index 99% rename from music_kraken/pages/youtube_music/_music_object_render.py rename to music_kraken/pages/_youtube_music/_music_object_render.py index 43aee3e..474f395 100644 --- a/music_kraken/pages/youtube_music/_music_object_render.py +++ b/music_kraken/pages/_youtube_music/_music_object_render.py @@ -6,7 +6,6 @@ from ...utils.string_processing import clean_song_title from ...utils.enums import SourceType, ALL_SOURCE_TYPES from ...objects import Source, 
DatabaseObject -from ..abstract import Page from ...objects import ( Artist, Source, diff --git a/music_kraken/pages/youtube_music/super_youtube.py b/music_kraken/pages/_youtube_music/super_youtube.py similarity index 93% rename from music_kraken/pages/youtube_music/super_youtube.py rename to music_kraken/pages/_youtube_music/super_youtube.py index df900a1..3e3dced 100644 --- a/music_kraken/pages/youtube_music/super_youtube.py +++ b/music_kraken/pages/_youtube_music/super_youtube.py @@ -1,26 +1,17 @@ -from typing import List, Optional, Type, Tuple -from urllib.parse import urlparse, urlunparse, parse_qs from enum import Enum -import requests +from typing import List, Optional, Tuple, Type +from urllib.parse import parse_qs, urlparse, urlunparse import python_sponsorblock +import requests -from ...objects import Source, DatabaseObject, Song, Target -from ..abstract import Page -from ...objects import ( - Artist, - Source, - Song, - Album, - Label, - Target, - FormattedText, - ID3Timestamp -) from ...connection import Connection +from ...download import Page +from ...objects import (Album, Artist, DatabaseObject, FormattedText, + ID3Timestamp, Label, Song, Source, Target) +from ...utils.config import logging_settings, main_settings, youtube_settings +from ...utils.enums import ALL_SOURCE_TYPES, SourceType from ...utils.support_classes.download_result import DownloadResult -from ...utils.config import youtube_settings, logging_settings, main_settings -from ...utils.enums import SourceType, ALL_SOURCE_TYPES def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str: diff --git a/music_kraken/pages/youtube_music/youtube_music.py b/music_kraken/pages/_youtube_music/youtube_music.py similarity index 97% rename from music_kraken/pages/youtube_music/youtube_music.py rename to music_kraken/pages/_youtube_music/youtube_music.py index 08e2207..ad66844 100644 --- a/music_kraken/pages/youtube_music/youtube_music.py +++ 
b/music_kraken/pages/_youtube_music/youtube_music.py @@ -1,46 +1,33 @@ -from __future__ import unicode_literals, annotations +from __future__ import annotations, unicode_literals -from typing import Dict, List, Optional, Set, Type -from urllib.parse import urlparse, urlunparse, quote, parse_qs, urlencode +import json import logging import random -import json -from dataclasses import dataclass import re -from functools import lru_cache from collections import defaultdict +from dataclasses import dataclass +from functools import lru_cache +from typing import Dict, List, Optional, Set, Type +from urllib.parse import parse_qs, quote, urlencode, urlparse, urlunparse import youtube_dl from youtube_dl.extractor.youtube import YoutubeIE from youtube_dl.utils import DownloadError +from ...connection import Connection +from ...download import Page +from ...objects import Album, Artist, Artwork +from ...objects import DatabaseObject as DataObject +from ...objects import (FormattedText, ID3Timestamp, Label, Lyrics, Song, + Source, Target) +from ...utils import dump_to_file, get_current_millis, traverse_json_path +from ...utils.config import logging_settings, main_settings, youtube_settings +from ...utils.enums import ALL_SOURCE_TYPES, SourceType +from ...utils.enums.album import AlbumType from ...utils.exception.config import SettingValueError -from ...utils.config import main_settings, youtube_settings, logging_settings from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING from ...utils.string_processing import clean_song_title -from ...utils import get_current_millis, traverse_json_path - -from ...utils import dump_to_file - -from ..abstract import Page -from ...objects import ( - DatabaseObject as DataObject, - Source, - FormattedText, - ID3Timestamp, - Artwork, - Artist, - Song, - Album, - Label, - Target, - Lyrics, -) -from ...connection import Connection -from ...utils.enums import SourceType, ALL_SOURCE_TYPES -from ...utils.enums.album import AlbumType from 
...utils.support_classes.download_result import DownloadResult - from ._list_render import parse_renderer from ._music_object_render import parse_run_element from .super_youtube import SuperYouTube diff --git a/music_kraken/pages/abstract.py b/music_kraken/pages/abstract.py deleted file mode 100644 index 8783dbb..0000000 --- a/music_kraken/pages/abstract.py +++ /dev/null @@ -1,157 +0,0 @@ -import logging -import random -import re -from copy import copy -from pathlib import Path -from typing import Optional, Union, Type, Dict, Set, List, Tuple, TypedDict -from string import Formatter -from dataclasses import dataclass, field - -import requests -from bs4 import BeautifulSoup - -from ..connection import Connection -from ..objects import ( - Song, - Source, - Album, - Artist, - Target, - DatabaseObject, - Options, - Collection, - Label, -) -from ..utils.enums import SourceType -from ..utils.enums.album import AlbumType -from ..audio import write_metadata_to_target, correct_codec -from ..utils.config import main_settings -from ..utils.support_classes.query import Query -from ..utils.support_classes.download_result import DownloadResult -from ..utils.string_processing import fit_to_file_system -from ..utils import trace, output, BColors - -INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song] -INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]] - -@dataclass -class FetchOptions: - download_all: bool = False - album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"])) - -@dataclass -class DownloadOptions: - download_all: bool = False - album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"])) - - process_audio_if_found: bool = False - process_metadata_if_found: bool = True - -class Page: - SOURCE_TYPE: SourceType - LOGGER: logging.Logger - - def __new__(cls, *args, **kwargs): - cls.LOGGER 
= logging.getLogger(cls.__name__) - - return super().__new__(cls) - - def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None): - self.SOURCE_TYPE.register_page(self) - - self.download_options: DownloadOptions = download_options or DownloadOptions() - self.fetch_options: FetchOptions = fetch_options or FetchOptions() - - def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None): - """ - Perform a regex search on the given string, using a single or a list of - patterns returning the first matching group. - In case of failure return a default value or raise a WARNING or a - RegexNotFoundError, depending on fatal, specifying the field name. - """ - - if isinstance(pattern, str): - mobj = re.search(pattern, string, flags) - else: - for p in pattern: - mobj = re.search(p, string, flags) - if mobj: - break - - if mobj: - if group is None: - # return the first matching group - return next(g for g in mobj.groups() if g is not None) - elif isinstance(group, (list, tuple)): - return tuple(mobj.group(g) for g in group) - else: - return mobj.group(group) - - return default - - def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]: - return None - - def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup: - return BeautifulSoup(r.content, "html.parser") - - # to search stuff - def search(self, query: Query) -> List[DatabaseObject]: - music_object = query.music_object - - search_functions = { - Song: self.song_search, - Album: self.album_search, - Artist: self.artist_search, - Label: self.label_search - } - - if type(music_object) in search_functions: - r = search_functions[type(music_object)](music_object) - if r is not None and len(r) > 0: - return r - - r = [] - for default_query in query.default_search: - for single_option in self.general_search(default_query): - r.append(single_option) - - return r - - def general_search(self, search_query: str) -> 
List[DatabaseObject]: - return [] - - def label_search(self, label: Label) -> List[Label]: - return [] - - def artist_search(self, artist: Artist) -> List[Artist]: - return [] - - def album_search(self, album: Album) -> List[Album]: - return [] - - def song_search(self, song: Song) -> List[Song]: - return [] - - # to fetch stuff - def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: - return Song() - - def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album: - return Album() - - def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist: - return Artist() - - def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: - return Label() - - # to download stuff - def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]: - return [] - - def post_process_hook(self, song: Song, temp_target: Target, **kwargs): - pass - - def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: - return DownloadResult() diff --git a/music_kraken/utils/__init__.py b/music_kraken/utils/__init__.py index a8d658b..bc386a9 100644 --- a/music_kraken/utils/__init__.py +++ b/music_kraken/utils/__init__.py @@ -1,15 +1,17 @@ -from datetime import datetime -from pathlib import Path +import inspect import json import logging -import inspect +from datetime import datetime +from itertools import takewhile +from pathlib import Path from typing import List, Union -from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK from .config import config, read_config, write_config from .enums.colors import BColors -from .path_manager import LOCATIONS from .hacking import merge_args +from .path_manager import LOCATIONS +from .shared import (DEBUG, DEBUG_DUMP, DEBUG_LOGGING, DEBUG_OBJECT_TRACE, + DEBUG_OBJECT_TRACE_CALLSTACK, DEBUG_TRACE) """ IO functions @@ -125,4 +127,9 @@ def get_current_millis() -> int: def get_unix_time() 
-> int: - return int(datetime.now().timestamp()) \ No newline at end of file + return int(datetime.now().timestamp()) + + +def limit_generator(generator, limit: Union[int, None] = None): + return (v for _, v in takewhile(lambda iv: iv[0] < limit, enumerate(generator))) if limit is not None else generator + \ No newline at end of file diff --git a/music_kraken/utils/enums/__init__.py b/music_kraken/utils/enums/__init__.py index 28f0b9f..38fc03f 100644 --- a/music_kraken/utils/enums/__init__.py +++ b/music_kraken/utils/enums/__init__.py @@ -17,6 +17,9 @@ class SourceType: def register_page(self, page: Page): self.page = page + def deregister_page(self): + self.page = None + def __hash__(self): return hash(self.name) diff --git a/music_kraken/utils/enums/colors.py b/music_kraken/utils/enums/colors.py index a9fac51..44f79da 100644 --- a/music_kraken/utils/enums/colors.py +++ b/music_kraken/utils/enums/colors.py @@ -1,7 +1,7 @@ from enum import Enum -class BColors(Enum): +class BColors: # https://stackoverflow.com/a/287944 HEADER = "\033[95m" OKBLUE = "\033[94m" diff --git a/music_kraken/utils/exception/__init__.py b/music_kraken/utils/exception/__init__.py index 8f139fb..42b26d7 100644 --- a/music_kraken/utils/exception/__init__.py +++ b/music_kraken/utils/exception/__init__.py @@ -3,6 +3,9 @@ class MKBaseException(Exception): self.message = message super().__init__(message, **kwargs) +# Compose exceptions. Those usually mean a bug on my side. 
+class MKComposeException(MKBaseException): + pass # Downloading class MKDownloadException(MKBaseException): diff --git a/music_kraken/utils/shared.py b/music_kraken/utils/shared.py index b75cf7f..52613af 100644 --- a/music_kraken/utils/shared.py +++ b/music_kraken/utils/shared.py @@ -1,11 +1,11 @@ -import random -from dotenv import load_dotenv -from pathlib import Path import os +import random +from pathlib import Path +from dotenv import load_dotenv -from .path_manager import LOCATIONS from .config import main_settings +from .path_manager import LOCATIONS if not load_dotenv(Path(__file__).parent.parent.parent / ".env"): load_dotenv(Path(__file__).parent.parent.parent / ".env.example") @@ -51,3 +51,6 @@ have fun :3""".strip() URL_PATTERN = r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+" INT_PATTERN = r"^\d*$" FLOAT_PATTERN = r"^[\d|\,|\.]*$" + + +ALPHABET = "abcdefghijklmnopqrstuvwxyz" diff --git a/music_kraken/utils/string_processing.py b/music_kraken/utils/string_processing.py index b76e3fc..2a7ed23 100644 --- a/music_kraken/utils/string_processing.py +++ b/music_kraken/utils/string_processing.py @@ -1,13 +1,12 @@ -from typing import Tuple, Union, Optional -from pathlib import Path import string from functools import lru_cache +from pathlib import Path +from typing import Optional, Tuple, Union +from urllib.parse import ParseResult, parse_qs, urlparse -from transliterate.exceptions import LanguageDetectionError -from transliterate import translit from pathvalidate import sanitize_filename -from urllib.parse import urlparse, ParseResult, parse_qs - +from transliterate import translit +from transliterate.exceptions import LanguageDetectionError COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] 
= ( "(official video)", @@ -180,6 +179,17 @@ def hash_url(url: Union[str, ParseResult]) -> str: r = r.lower().strip() return r +def hash(self, key: Union[str, int]) -> int: + try: + key = int(key) + except ValueError: + pass + + if isinstance(key, str): + return unify(key).__hash__() + + return key.__hash__() + def remove_feature_part_from_track(title: str) -> str: if ")" != title[-1]: