From b2362913786f1ced20eb0ae83ad13dfcb007c5b9 Mon Sep 17 00:00:00 2001 From: Lars Noack Date: Wed, 19 Jun 2024 10:04:41 +0200 Subject: [PATCH] feat: implemented downloader --- music_kraken/cli/__init__.py | 5 - music_kraken/cli/genre.py | 53 ---- music_kraken/cli/informations/__init__.py | 1 - music_kraken/cli/informations/paths.py | 22 -- music_kraken/cli/main_downloader.py | 354 ---------------------- music_kraken/cli/options/__init__.py | 0 music_kraken/cli/options/cache.py | 26 -- music_kraken/cli/options/first_config.py | 6 - music_kraken/cli/options/frontend.py | 196 ------------ music_kraken/cli/options/settings.py | 71 ----- music_kraken/cli/utils.py | 47 --- music_kraken/download/__init__.py | 25 ++ 12 files changed, 25 insertions(+), 781 deletions(-) delete mode 100644 music_kraken/cli/__init__.py delete mode 100644 music_kraken/cli/genre.py delete mode 100644 music_kraken/cli/informations/__init__.py delete mode 100644 music_kraken/cli/informations/paths.py delete mode 100644 music_kraken/cli/main_downloader.py delete mode 100644 music_kraken/cli/options/__init__.py delete mode 100644 music_kraken/cli/options/cache.py delete mode 100644 music_kraken/cli/options/first_config.py delete mode 100644 music_kraken/cli/options/frontend.py delete mode 100644 music_kraken/cli/options/settings.py delete mode 100644 music_kraken/cli/utils.py diff --git a/music_kraken/cli/__init__.py b/music_kraken/cli/__init__.py deleted file mode 100644 index 1dbf213..0000000 --- a/music_kraken/cli/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .informations import print_paths -from .main_downloader import download -from .options.settings import settings -from .options.frontend import set_frontend - diff --git a/music_kraken/cli/genre.py b/music_kraken/cli/genre.py deleted file mode 100644 index 3580495..0000000 --- a/music_kraken/cli/genre.py +++ /dev/null @@ -1,53 +0,0 @@ -import re -from pathlib import Path -from typing import Dict, Generator, List, Set, Type, Union - -from ..download import Downloader, Page, components -from ..utils.config import main_settings -from .utils import ask_for_bool, cli_function - - -class GenreIO(components.HumanIO): - @staticmethod - def ask_to_create(option: components.Option) -> bool: - output() - return ask_for_bool(f"create the genre {BColors.OKBLUE.value}{option.value}{BColors.ENDC.value}") - - @staticmethod - def not_found(key: str) -> None: - output(f"\ngenre {BColors.BOLD.value}{key}{BColors.ENDC.value} not found\n", color=BColors.FAIL) - - -def _genre_generator() -> Generator[str, None, None]: - def is_valid_genre(genre: Path) -> bool: - """ - gets the name of all subdirectories of shared.MUSIC_DIR, - but filters out all directories, where the name matches with any Patern - from shared.NOT_A_GENRE_REGEX. - """ - if not genre.is_dir(): - return False - - if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]): - return False - - return True - - for genre in filter(is_valid_genre, main_settings["music_directory"].iterdir()): - yield genre.name - -def get_genre() -> str: - select_genre = components.Select( - human_io=GenreIO, - can_create_options=True, - data=_genre_generator(), - ) - genre: Optional[components.Option[str]] = None - - while genre is None: - print(select_genre.pprint()) - print() - - genre = select_genre.choose(input("> ")) - - return genre.value diff --git a/music_kraken/cli/informations/__init__.py b/music_kraken/cli/informations/__init__.py deleted file mode 100644 index be110bf..0000000 --- a/music_kraken/cli/informations/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .paths import print_paths \ No newline at end of file diff --git a/music_kraken/cli/informations/paths.py b/music_kraken/cli/informations/paths.py deleted file mode 100644 index 327b351..0000000 --- a/music_kraken/cli/informations/paths.py +++ /dev/null @@ -1,22 +0,0 @@ -from ..utils import cli_function - -from ...utils.path_manager import LOCATIONS -from ...utils.config import main_settings - - -def all_paths(): - return { - "Temp dir": main_settings["temp_directory"], - "Music dir": main_settings["music_directory"], - "Conf dir": LOCATIONS.CONFIG_DIRECTORY, - "Conf file": LOCATIONS.CONFIG_FILE, - "logging file": main_settings["log_file"], - "FFMPEG bin": main_settings["ffmpeg_binary"], - "Cache Dir": main_settings["cache_directory"], - } - - -@cli_function -def print_paths(): - for name, path in all_paths().items(): - print(f"{name}:\t{path}") \ No newline at end of file diff --git a/music_kraken/cli/main_downloader.py b/music_kraken/cli/main_downloader.py deleted file mode 100644 index eb01c1a..0000000 --- a/music_kraken/cli/main_downloader.py +++ /dev/null @@ -1,354 +0,0 @@ -import random -import re -from pathlib import Path -from typing import Dict, Generator, List, Set, Type, Union - -from .. import console -from ..download import Downloader, Page, components -from ..download.results import GoToResults -from ..download.results import Option as ResultOption -from ..download.results import PageResults, Results -from ..objects import Album, Artist, DatabaseObject, Song -from ..utils import BColors, output -from ..utils.config import main_settings, write_config -from ..utils.enums.colors import BColors -from ..utils.exception import MKInvalidInputException -from ..utils.exception.download import UrlNotFoundException -from ..utils.shared import HELP_MESSAGE, URL_PATTERN -from ..utils.string_processing import fit_to_file_system -from ..utils.support_classes.download_result import DownloadResult -from ..utils.support_classes.query import Query -from .genre import get_genre -from .options.first_config import initial_config -from .utils import ask_for_bool, cli_function - -EXIT_COMMANDS = {"q", "quit", "exit", "abort"} -ALPHABET = "abcdefghijklmnopqrstuvwxyz" -PAGE_NAME_FILL = "-" -MAX_PAGE_LEN = 21 - - - - - -def help_message(): - print(HELP_MESSAGE) - print() - print(random.choice(main_settings["happy_messages"])) - print() - - -class CliDownloader: - def __init__( - self, - exclude_pages: Set[Type[Page]] = None, - exclude_shady: bool = False, - max_displayed_options: int = 10, - option_digits: int = 3, - genre: str = None, - process_metadata_anyway: bool = False, - ) -> None: - self.downloader: Downloader = Downloader(exclude_pages=exclude_pages, exclude_shady=exclude_shady) - - self.page_dict: Dict[str, Type[Page]] = dict() - - self.max_displayed_options = max_displayed_options - self.option_digits: int = option_digits - - self.current_results: Results = None - self._result_history: List[Results] = [] - - self.genre = genre or get_genre() - self.process_metadata_anyway = process_metadata_anyway - - output() - output(f"Downloading to: \"{self.genre}\"", color=BColors.HEADER) - output() - - def print_current_options(self): - - print() - print(self.current_results.pprint()) - - """ - self.page_dict = dict() - - page_count = 0 - for option in self.current_results.formatted_generator(): - if isinstance(option, ResultOption): - r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}" - print(r) - else: - prefix = ALPHABET[page_count % len(ALPHABET)] - print( - f"{BColors.HEADER.value}({prefix}) --------------------------------{option.__name__:{PAGE_NAME_FILL}<{MAX_PAGE_LEN}}--------------------{BColors.ENDC.value}") - - self.page_dict[prefix] = option - self.page_dict[option.__name__] = option - - page_count += 1 - """ - - print() - - def set_current_options(self, current_options: Union[Generator[DatabaseObject, None, None], components.Select]): - current_options = current_options if isinstance(current_options, components.Select) else components.DataObjectSelect(current_options) - - if main_settings["result_history"]: - self._result_history.append(current_options) - - if main_settings["history_length"] != -1: - if len(self._result_history) > main_settings["history_length"]: - self._result_history.pop(0) - - self.current_results = current_options - - def previous_option(self) -> bool: - if not main_settings["result_history"]: - print("History is turned of.\nGo to main_settings, and change the value at 'result_history' to 'true'.") - return False - - if len(self._result_history) <= 1: - print(f"No results in history.") - return False - self._result_history.pop() - self.current_results = self._result_history[-1] - return True - - def _process_parsed(self, key_text: Dict[str, str], query: str) -> Query: - # strip all the values in key_text - key_text = {key: value.strip() for key, value in key_text.items()} - - song = None if not "t" in key_text else Song(title=key_text["t"], dynamic=True) - album = None if not "r" in key_text else Album(title=key_text["r"], dynamic=True) - artist = None if not "a" in key_text else Artist(name=key_text["a"], dynamic=True) - - if song is not None: - if album is not None: - song.album_collection.append(album) - if artist is not None: - song.artist_collection.append(artist) - return Query(raw_query=query, music_object=song) - - if album is not None: - if artist is not None: - album.artist_collection.append(artist) - return Query(raw_query=query, music_object=album) - - if artist is not None: - return Query(raw_query=query, music_object=artist) - - return Query(raw_query=query) - - def search(self, query: str): - if re.match(URL_PATTERN, query) is not None: - try: - data_object = self.downloader.fetch_url(query) - except UrlNotFoundException as e: - print(f"{e.url} could not be attributed/parsed to any yet implemented site.\n" - f"PR appreciated if the site isn't implemented.\n" - f"Recommendations and suggestions on sites to implement appreciated.\n" - f"But don't be a bitch if I don't end up implementing it.") - return - self.set_current_options(PageResults(page, data_object.options, max_items_per_page=self.max_displayed_options)) - self.print_current_options() - return - - special_characters = "#\\" - query = query + " " - - key_text = {} - - skip_next = False - escape_next = False - new_text = "" - latest_key: str = None - for i in range(len(query) - 1): - current_char = query[i] - next_char = query[i + 1] - - if skip_next: - skip_next = False - continue - - if escape_next: - new_text += current_char - escape_next = False - - # escaping - if current_char == "\\": - if next_char in special_characters: - escape_next = True - continue - - if current_char == "#": - if latest_key is not None: - key_text[latest_key] = new_text - new_text = "" - - latest_key = next_char - skip_next = True - continue - - new_text += current_char - - if latest_key is not None: - key_text[latest_key] = new_text - - parsed_query: Query = self._process_parsed(key_text, query) - - self.set_current_options(self.downloader.search(parsed_query)) - self.print_current_options() - - def goto(self, data_object: Union[DatabaseObject, components.Select]): - page: Type[Page] - - if isinstance(data_object, components.Select): - self.set_current_options(data_object) - else: - self.downloader.fetch_details(data_object, stop_at_level=1) - self.set_current_options(data_object.options) - - self.print_current_options() - - def download(self, data_objects: List[DatabaseObject], **kwargs) -> bool: - output() - if len(data_objects) > 1: - output(f"Downloading {len(data_objects)} objects...", *("- " + o.option_string for o in data_objects), color=BColors.BOLD, sep="\n") - - _result_map: Dict[DatabaseObject, DownloadResult] = dict() - - for database_object in data_objects: - r = self.downloader.download( - data_object=database_object, - genre=self.genre, - **kwargs - ) - _result_map[database_object] = r - - for music_object, result in _result_map.items(): - output() - output(music_object.option_string) - output(result) - - return True - - def process_input(self, input_str: str) -> bool: - try: - input_str = input_str.strip() - processed_input: str = input_str.lower() - - if processed_input in EXIT_COMMANDS: - return True - - if processed_input == ".": - self.print_current_options() - return False - - if processed_input == "..": - if self.previous_option(): - self.print_current_options() - return False - - command = "" - query = processed_input - if ":" in processed_input: - _ = processed_input.split(":") - command, query = _[0], ":".join(_[1:]) - - do_search = "s" in command - do_fetch = "f" in command - do_download = "d" in command - do_merge = "m" in command - - if do_search and (do_download or do_fetch or do_merge): - raise MKInvalidInputException(message="You can't search and do another operation at the same time.") - - if do_search: - self.search(":".join(input_str.split(":")[1:])) - return False - - def get_selected_objects(q: str): - if q.strip().lower() == "all": - return list(self.current_results) - - indices = [] - for possible_index in q.split(","): - if possible_index == "": - continue - - if possible_index not in self.current_results: - raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not in the current options.") - - yield self.current_results[possible_index] - - selected_objects = list(get_selected_objects(query)) - - if do_merge: - old_selected_objects = selected_objects - - a = old_selected_objects[0] - for b in old_selected_objects[1:]: - if type(a) != type(b): - raise MKInvalidInputException(message="You can't merge different types of objects.") - a.merge(b) - - selected_objects = [a] - - if do_fetch: - for data_object in selected_objects: - self.downloader.fetch_details(data_object) - - self.print_current_options() - return False - - if do_download: - self.download(list(o.value for o in selected_objects)) - return False - - if len(selected_objects) != 1: - raise MKInvalidInputException(message="You can only go to one object at a time without merging.") - - self.goto(selected_objects[0].value) - return False - except MKInvalidInputException as e: - output("\n" + e.message + "\n", color=BColors.FAIL) - help_message() - - return False - - def mainloop(self): - while True: - if self.process_input(input("> ")): - return - - -@cli_function -def download( - genre: str = None, - download_all: bool = False, - direct_download_url: str = None, - command_list: List[str] = None, - process_metadata_anyway: bool = False, -): - if main_settings["hasnt_yet_started"]: - code = initial_config() - if code == 0: - main_settings["hasnt_yet_started"] = False - write_config() - print(f"{BColors.OKGREEN.value}Restart the programm to use it.{BColors.ENDC.value}") - else: - print(f"{BColors.FAIL.value}Something went wrong configuring.{BColors.ENDC.value}") - - shell = CliDownloader(genre=genre, process_metadata_anyway=process_metadata_anyway) - - if command_list is not None: - for command in command_list: - shell.process_input(command) - return - - if direct_download_url is not None: - if shell.download(direct_download_url, download_all=download_all): - return - - shell.mainloop() diff --git a/music_kraken/cli/options/__init__.py b/music_kraken/cli/options/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/music_kraken/cli/options/cache.py b/music_kraken/cli/options/cache.py deleted file mode 100644 index 42cb76b..0000000 --- a/music_kraken/cli/options/cache.py +++ /dev/null @@ -1,26 +0,0 @@ -from logging import getLogger - -from ..utils import cli_function -from ...connection.cache import Cache - - -@cli_function -def clear_cache(): - """ - Deletes the cache. - :return: - """ - - Cache("main", getLogger("cache")).clear() - print("Cleared cache") - - -@cli_function -def clean_cache(): - """ - Deletes the outdated cache. (all expired cached files, and not indexed files) - :return: - """ - - Cache("main", getLogger("cache")).clean() - print("Cleaned cache") diff --git a/music_kraken/cli/options/first_config.py b/music_kraken/cli/options/first_config.py deleted file mode 100644 index 6160265..0000000 --- a/music_kraken/cli/options/first_config.py +++ /dev/null @@ -1,6 +0,0 @@ -from .frontend import set_frontend - - -def initial_config(): - code = set_frontend(no_cli=True) - return code diff --git a/music_kraken/cli/options/frontend.py b/music_kraken/cli/options/frontend.py deleted file mode 100644 index 64838c4..0000000 --- a/music_kraken/cli/options/frontend.py +++ /dev/null @@ -1,196 +0,0 @@ -from typing import Dict, List -from dataclasses import dataclass -from collections import defaultdict -from urllib.parse import urlparse - -from ..utils import cli_function - -from ...objects import Country -from ...utils import config, write_config -from ...utils.config import youtube_settings -from ...connection import Connection - - -@dataclass -class Instance: - """ - Attributes which influence the quality of an instance: - - - users - """ - name: str - uri: str - regions: List[Country] - users: int = 0 - - def __str__(self) -> str: - return f"{self.name} with {self.users} users." - - -class FrontendInstance: - SETTING_NAME = "placeholder" - - def __init__(self) -> None: - self.region_instances: Dict[Country, List[Instance]] = defaultdict(list) - self.all_instances: List[Instance] = [] - - def add_instance(self, instance: Instance): - self.all_instances.append(instance) - - youtube_lists = youtube_settings["youtube_url"] - existing_netlocs = set(tuple(url.netloc for url in youtube_lists)) - - parsed_instance = urlparse(instance.uri) - instance_netloc = parsed_instance.netloc - - if instance_netloc not in existing_netlocs: - youtube_lists.append(parsed_instance) - youtube_settings.__setitem__("youtube_url", youtube_lists, is_parsed=True) - - for region in instance.regions: - self.region_instances[region].append(instance) - - def fetch(self, silent: bool = False): - if not silent: - print(f"Downloading {type(self).__name__} instances...") - - def set_instance(self, instance: Instance): - youtube_settings.__setitem__(self.SETTING_NAME, instance.uri) - - def _choose_country(self) -> List[Instance]: - print("Input the country code, an example would be \"US\"") - print('\n'.join(f'{region.name} ({region.alpha_2})' for region in self.region_instances)) - print() - - - available_instances = set(i.alpha_2 for i in self.region_instances) - - chosen_region = "" - - while chosen_region not in available_instances: - chosen_region = input("nearest country: ").strip().upper() - - return self.region_instances[Country.by_alpha_2(chosen_region)] - - def choose(self, silent: bool = False): - instances = self.all_instances if silent else self._choose_country() - instances.sort(key=lambda x: x.users, reverse=True) - - if silent: - self.set_instance(instances[0]) - return - - # output the options - print("Choose your instance (input needs to be a digit):") - for i, instance in enumerate(instances): - print(f"{i}) {instance}") - - print() - - # ask for index - index = "" - while not index.isdigit() or int(index) >= len(instances): - index = input("> ").strip() - - instance = instances[int(index)] - print() - print(f"Setting the instance to {instance}") - - self.set_instance(instance) - - -class Invidious(FrontendInstance): - SETTING_NAME = "invidious_instance" - - def __init__(self) -> None: - self.connection = Connection(host="https://api.invidious.io/") - self.endpoint = "https://api.invidious.io/instances.json" - - super().__init__() - - - def _process_instance(self, all_instance_data: dict): - instance_data = all_instance_data[1] - stats = instance_data["stats"] - - if not instance_data["api"]: - return - if instance_data["type"] != "https": - return - - region = instance_data["region"] - - instance = Instance( - name=all_instance_data[0], - uri=instance_data["uri"], - regions=[Country.by_alpha_2(region)], - users=stats["usage"]["users"]["total"] - ) - - self.add_instance(instance) - - def fetch(self, silent: bool): - r = self.connection.get(self.endpoint) - if r is None: - return - - for instance in r.json(): - self._process_instance(all_instance_data=instance) - - -class Piped(FrontendInstance): - SETTING_NAME = "piped_instance" - - def __init__(self) -> None: - self.connection = Connection(host="https://raw.githubusercontent.com") - - super().__init__() - - def process_instance(self, instance_data: str): - cells = instance_data.split(" | ") - - instance = Instance( - name=cells[0].strip(), - uri=cells[1].strip(), - regions=[Country.by_emoji(flag) for flag in cells[2].split(", ")] - ) - - self.add_instance(instance) - - def fetch(self, silent: bool = False): - r = self.connection.get("https://raw.githubusercontent.com/wiki/TeamPiped/Piped-Frontend/Instances.md") - if r is None: - return - - process = False - - for line in r.content.decode("utf-8").split("\n"): - line = line.strip() - - if line != "" and process: - self.process_instance(line) - - if line.startswith("---"): - process = True - - -class FrontendSelection: - def __init__(self): - self.invidious = Invidious() - self.piped = Piped() - - def choose(self, silent: bool = False): - self.invidious.fetch(silent) - self.invidious.choose(silent) - - self.piped.fetch(silent) - self.piped.choose(silent) - - -@cli_function -def set_frontend(silent: bool = False): - shell = FrontendSelection() - shell.choose(silent=silent) - - return 0 - \ No newline at end of file diff --git a/music_kraken/cli/options/settings.py b/music_kraken/cli/options/settings.py deleted file mode 100644 index 3ba0ade..0000000 --- a/music_kraken/cli/options/settings.py +++ /dev/null @@ -1,71 +0,0 @@ -from ..utils import cli_function - -from ...utils.config import config, write_config -from ...utils import exception - - -def modify_setting(_name: str, _value: str, invalid_ok: bool = True) -> bool: - try: - config.set_name_to_value(_name, _value) - except exception.config.SettingException as e: - if invalid_ok: - print(e) - return False - else: - raise e - - write_config() - return True - - -def print_settings(): - for i, attribute in enumerate(config): - print(f"{i:0>2}: {attribute.name}={attribute.value}") - - - def modify_setting_by_index(index: int) -> bool: - attribute = list(config)[index] - - print() - print(attribute) - - input__ = input(f"{attribute.name}=") - if not modify_setting(attribute.name, input__.strip()): - return modify_setting_by_index(index) - - return True - - -def modify_setting_by_index(index: int) -> bool: - attribute = list(config)[index] - - print() - print(attribute) - - input__ = input(f"{attribute.name}=") - if not modify_setting(attribute.name, input__.strip()): - return modify_setting_by_index(index) - - return True - - -@cli_function -def settings( - name: str = None, - value: str = None, -): - if name is not None and value is not None: - modify_setting(name, value, invalid_ok=True) - return - - while True: - print_settings() - - input_ = input("Id of setting to modify: ") - print() - if input_.isdigit() and int(input_) < len(config): - if modify_setting_by_index(int(input_)): - return - else: - print("Please input a valid ID.") - print() \ No newline at end of file diff --git a/music_kraken/cli/utils.py b/music_kraken/cli/utils.py deleted file mode 100644 index 1acfe33..0000000 --- a/music_kraken/cli/utils.py +++ /dev/null @@ -1,47 +0,0 @@ -from ..utils import BColors -from ..utils.shared import get_random_message - - -def cli_function(function): - def wrapper(*args, **kwargs): - silent = kwargs.get("no_cli", False) - if "no_cli" in kwargs: - del kwargs["no_cli"] - - if silent: - return function(*args, **kwargs) - return - - code = 0 - - print_cute_message() - print() - try: - code = function(*args, **kwargs) - except KeyboardInterrupt: - print("\n\nRaise an issue if I fucked up:\nhttps://github.com/HeIIow2/music-downloader/issues") - - finally: - print() - print_cute_message() - print("See you soon! :3") - - exit() - - return wrapper - - -def print_cute_message(): - message = get_random_message() - try: - print(message) - except UnicodeEncodeError: - message = str(c for c in message if 0 < ord(c) < 127) - print(message) - - -AGREE_INPUTS = {"y", "yes", "ok"} -def ask_for_bool(msg: str) -> bool: - i = input(f"{msg} ({BColors.OKGREEN.value}Y{BColors.ENDC.value}/{BColors.FAIL.value}N{BColors.ENDC.value})? ").lower() - return i in AGREE_INPUTS - \ No newline at end of file diff --git a/music_kraken/download/__init__.py b/music_kraken/download/__init__.py index 99ff41d..21567f8 100644 --- a/music_kraken/download/__init__.py +++ b/music_kraken/download/__init__.py @@ -415,6 +415,31 @@ class Downloader: return source.page.fetch_object_from_source(source=source, **kwargs) + # misc function + + def get_existing_genres(self) -> Generator[str, None, None]: + """Yields every existing genre, for the user to select from. + + Yields: + Generator[str, None, None]: a generator that yields every existing genre. + """ + + def is_valid_genre(genre: Path) -> bool: + """ + gets the name of all subdirectories of shared.MUSIC_DIR, + but filters out all directories, where the name matches with any Patern + from shared.NOT_A_GENRE_REGEX. + """ + if not genre.is_dir(): + return False + + if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]): + return False + + return True + + for genre in filter(is_valid_genre, main_settings["music_directory"].iterdir()): + yield genre.name class Page: REGISTER = True