diff --git a/music_kraken/__meta__.py b/music_kraken/__meta__.py new file mode 100644 index 0000000..9a47f73 --- /dev/null +++ b/music_kraken/__meta__.py @@ -0,0 +1,3 @@ +PROGRAMM: str = "music-kraken" +DESCRIPTION: str = """This program will first get the metadata of various songs from metadata providers like musicbrainz, and then search for download links on pages like bandcamp. +Then it will download the song and edit the metadata accordingly.""" diff --git a/music_kraken/development_cli/__init__.py b/music_kraken/development_cli/__init__.py new file mode 100644 index 0000000..7cc3b15 --- /dev/null +++ b/music_kraken/development_cli/__init__.py @@ -0,0 +1,77 @@ +import argparse +from functools import cached_property + +from ..__meta__ import DESCRIPTION, PROGRAMM +from ..download import Downloader +from ..utils import BColors +from ..utils.string_processing import unify +from .utils import ask_for_bool, ask_for_create + + +class Selection: + def __init__(self, options: list): + self.options = options + + def pprint(self): + return "\n".join(f"{i}: {option}" for i, option in enumerate(self.options)) + + def choose(self, input_: str): + try: + return self.options[int(input_)] + except (ValueError, IndexError): + return None + + +class DevelopmentCli: + def __init__(self, args: argparse.Namespace): + self.args = args + + if args.genre: + self.genre = args.genre + + self.downloader: Downloader = Downloader() + + @cached_property + def genre(self) -> str: + """This is a cached property, which means if it isn't set in the constructor or before it is accessed, + the program will be thrown in a shell + + Returns: + str: the genre that should be used + """ + option_string = f"{BColors.HEADER}Genres{BColors.ENDC}" + genre_map = {} + + _string_list = [] + for i, genre in enumerate(self.downloader.get_existing_genres()): + option_string += f"\n{BColors.BOLD}{i}{BColors.ENDC}: {genre}" + + genre_map[str(i)] = genre + genre_map[unify(genre)] = genre + + genre = None + while genre is None: + print(option_string) + print() + + i = input("> ") + u = unify(i) + if u in genre_map: + genre = genre_map[u] + break + + if ask_for_create("genre", i): + genre = i + break + + return genre + + +def main(): + parser = argparse.ArgumentParser( + prog=PROGRAMM, + description=DESCRIPTION, + epilog='This is just a development cli. The real frontend is coming soon.' + ) + parser.add_argument('--genre', '-g', action='store_const') + args = parser.parse_args() diff --git a/music_kraken/development_cli/genre.py b/music_kraken/development_cli/genre.py new file mode 100644 index 0000000..3580495 --- /dev/null +++ b/music_kraken/development_cli/genre.py @@ -0,0 +1,53 @@ +import re +from pathlib import Path +from typing import Dict, Generator, List, Set, Type, Union + +from ..download import Downloader, Page, components +from ..utils.config import main_settings +from .utils import ask_for_bool, cli_function + + +class GenreIO(components.HumanIO): + @staticmethod + def ask_to_create(option: components.Option) -> bool: + output() + return ask_for_bool(f"create the genre {BColors.OKBLUE.value}{option.value}{BColors.ENDC.value}") + + @staticmethod + def not_found(key: str) -> None: + output(f"\ngenre {BColors.BOLD.value}{key}{BColors.ENDC.value} not found\n", color=BColors.FAIL) + + +def _genre_generator() -> Generator[str, None, None]: + def is_valid_genre(genre: Path) -> bool: + """ + gets the name of all subdirectories of shared.MUSIC_DIR, + but filters out all directories, where the name matches with any Patern + from shared.NOT_A_GENRE_REGEX. + """ + if not genre.is_dir(): + return False + + if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]): + return False + + return True + + for genre in filter(is_valid_genre, main_settings["music_directory"].iterdir()): + yield genre.name + +def get_genre() -> str: + select_genre = components.Select( + human_io=GenreIO, + can_create_options=True, + data=_genre_generator(), + ) + genre: Optional[components.Option[str]] = None + + while genre is None: + print(select_genre.pprint()) + print() + + genre = select_genre.choose(input("> ")) + + return genre.value diff --git a/music_kraken/development_cli/informations/__init__.py b/music_kraken/development_cli/informations/__init__.py new file mode 100644 index 0000000..be110bf --- /dev/null +++ b/music_kraken/development_cli/informations/__init__.py @@ -0,0 +1 @@ +from .paths import print_paths \ No newline at end of file diff --git a/music_kraken/development_cli/informations/paths.py b/music_kraken/development_cli/informations/paths.py new file mode 100644 index 0000000..327b351 --- /dev/null +++ b/music_kraken/development_cli/informations/paths.py @@ -0,0 +1,22 @@ +from ..utils import cli_function + +from ...utils.path_manager import LOCATIONS +from ...utils.config import main_settings + + +def all_paths(): + return { + "Temp dir": main_settings["temp_directory"], + "Music dir": main_settings["music_directory"], + "Conf dir": LOCATIONS.CONFIG_DIRECTORY, + "Conf file": LOCATIONS.CONFIG_FILE, + "logging file": main_settings["log_file"], + "FFMPEG bin": main_settings["ffmpeg_binary"], + "Cache Dir": main_settings["cache_directory"], + } + + +@cli_function +def print_paths(): + for name, path in all_paths().items(): + print(f"{name}:\t{path}") \ No newline at end of file diff --git a/music_kraken/development_cli/main_downloader.py b/music_kraken/development_cli/main_downloader.py new file mode 100644 index 0000000..eb01c1a --- /dev/null +++ b/music_kraken/development_cli/main_downloader.py @@ -0,0 +1,354 @@ +import random +import re +from pathlib import Path +from typing import Dict, Generator, List, Set, Type, Union + +from .. import console +from ..download import Downloader, Page, components +from ..download.results import GoToResults +from ..download.results import Option as ResultOption +from ..download.results import PageResults, Results +from ..objects import Album, Artist, DatabaseObject, Song +from ..utils import BColors, output +from ..utils.config import main_settings, write_config +from ..utils.enums.colors import BColors +from ..utils.exception import MKInvalidInputException +from ..utils.exception.download import UrlNotFoundException +from ..utils.shared import HELP_MESSAGE, URL_PATTERN +from ..utils.string_processing import fit_to_file_system +from ..utils.support_classes.download_result import DownloadResult +from ..utils.support_classes.query import Query +from .genre import get_genre +from .options.first_config import initial_config +from .utils import ask_for_bool, cli_function + +EXIT_COMMANDS = {"q", "quit", "exit", "abort"} +ALPHABET = "abcdefghijklmnopqrstuvwxyz" +PAGE_NAME_FILL = "-" +MAX_PAGE_LEN = 21 + + + + + +def help_message(): + print(HELP_MESSAGE) + print() + print(random.choice(main_settings["happy_messages"])) + print() + + +class CliDownloader: + def __init__( + self, + exclude_pages: Set[Type[Page]] = None, + exclude_shady: bool = False, + max_displayed_options: int = 10, + option_digits: int = 3, + genre: str = None, + process_metadata_anyway: bool = False, + ) -> None: + self.downloader: Downloader = Downloader(exclude_pages=exclude_pages, exclude_shady=exclude_shady) + + self.page_dict: Dict[str, Type[Page]] = dict() + + self.max_displayed_options = max_displayed_options + self.option_digits: int = option_digits + + self.current_results: Results = None + self._result_history: List[Results] = [] + + self.genre = genre or get_genre() + self.process_metadata_anyway = process_metadata_anyway + + output() + output(f"Downloading to: \"{self.genre}\"", color=BColors.HEADER) + output() + + def print_current_options(self): + + print() + print(self.current_results.pprint()) + + """ + self.page_dict = dict() + + page_count = 0 + for option in self.current_results.formatted_generator(): + if isinstance(option, ResultOption): + r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}" + print(r) + else: + prefix = ALPHABET[page_count % len(ALPHABET)] + print( + f"{BColors.HEADER.value}({prefix}) --------------------------------{option.__name__:{PAGE_NAME_FILL}<{MAX_PAGE_LEN}}--------------------{BColors.ENDC.value}") + + self.page_dict[prefix] = option + self.page_dict[option.__name__] = option + + page_count += 1 + """ + + print() + + def set_current_options(self, current_options: Union[Generator[DatabaseObject, None, None], components.Select]): + current_options = current_options if isinstance(current_options, components.Select) else components.DataObjectSelect(current_options) + + if main_settings["result_history"]: + self._result_history.append(current_options) + + if main_settings["history_length"] != -1: + if len(self._result_history) > main_settings["history_length"]: + self._result_history.pop(0) + + self.current_results = current_options + + def previous_option(self) -> bool: + if not main_settings["result_history"]: + print("History is turned of.\nGo to main_settings, and change the value at 'result_history' to 'true'.") + return False + + if len(self._result_history) <= 1: + print(f"No results in history.") + return False + self._result_history.pop() + self.current_results = self._result_history[-1] + return True + + def _process_parsed(self, key_text: Dict[str, str], query: str) -> Query: + # strip all the values in key_text + key_text = {key: value.strip() for key, value in key_text.items()} + + song = None if not "t" in key_text else Song(title=key_text["t"], dynamic=True) + album = None if not "r" in key_text else Album(title=key_text["r"], dynamic=True) + artist = None if not "a" in key_text else Artist(name=key_text["a"], dynamic=True) + + if song is not None: + if album is not None: + song.album_collection.append(album) + if artist is not None: + song.artist_collection.append(artist) + return Query(raw_query=query, music_object=song) + + if album is not None: + if artist is not None: + album.artist_collection.append(artist) + return Query(raw_query=query, music_object=album) + + if artist is not None: + return Query(raw_query=query, music_object=artist) + + return Query(raw_query=query) + + def search(self, query: str): + if re.match(URL_PATTERN, query) is not None: + try: + data_object = self.downloader.fetch_url(query) + except UrlNotFoundException as e: + print(f"{e.url} could not be attributed/parsed to any yet implemented site.\n" + f"PR appreciated if the site isn't implemented.\n" + f"Recommendations and suggestions on sites to implement appreciated.\n" + f"But don't be a bitch if I don't end up implementing it.") + return + self.set_current_options(PageResults(page, data_object.options, max_items_per_page=self.max_displayed_options)) + self.print_current_options() + return + + special_characters = "#\\" + query = query + " " + + key_text = {} + + skip_next = False + escape_next = False + new_text = "" + latest_key: str = None + for i in range(len(query) - 1): + current_char = query[i] + next_char = query[i + 1] + + if skip_next: + skip_next = False + continue + + if escape_next: + new_text += current_char + escape_next = False + + # escaping + if current_char == "\\": + if next_char in special_characters: + escape_next = True + continue + + if current_char == "#": + if latest_key is not None: + key_text[latest_key] = new_text + new_text = "" + + latest_key = next_char + skip_next = True + continue + + new_text += current_char + + if latest_key is not None: + key_text[latest_key] = new_text + + parsed_query: Query = self._process_parsed(key_text, query) + + self.set_current_options(self.downloader.search(parsed_query)) + self.print_current_options() + + def goto(self, data_object: Union[DatabaseObject, components.Select]): + page: Type[Page] + + if isinstance(data_object, components.Select): + self.set_current_options(data_object) + else: + self.downloader.fetch_details(data_object, stop_at_level=1) + self.set_current_options(data_object.options) + + self.print_current_options() + + def download(self, data_objects: List[DatabaseObject], **kwargs) -> bool: + output() + if len(data_objects) > 1: + output(f"Downloading {len(data_objects)} objects...", *("- " + o.option_string for o in data_objects), color=BColors.BOLD, sep="\n") + + _result_map: Dict[DatabaseObject, DownloadResult] = dict() + + for database_object in data_objects: + r = self.downloader.download( + data_object=database_object, + genre=self.genre, + **kwargs + ) + _result_map[database_object] = r + + for music_object, result in _result_map.items(): + output() + output(music_object.option_string) + output(result) + + return True + + def process_input(self, input_str: str) -> bool: + try: + input_str = input_str.strip() + processed_input: str = input_str.lower() + + if processed_input in EXIT_COMMANDS: + return True + + if processed_input == ".": + self.print_current_options() + return False + + if processed_input == "..": + if self.previous_option(): + self.print_current_options() + return False + + command = "" + query = processed_input + if ":" in processed_input: + _ = processed_input.split(":") + command, query = _[0], ":".join(_[1:]) + + do_search = "s" in command + do_fetch = "f" in command + do_download = "d" in command + do_merge = "m" in command + + if do_search and (do_download or do_fetch or do_merge): + raise MKInvalidInputException(message="You can't search and do another operation at the same time.") + + if do_search: + self.search(":".join(input_str.split(":")[1:])) + return False + + def get_selected_objects(q: str): + if q.strip().lower() == "all": + return list(self.current_results) + + indices = [] + for possible_index in q.split(","): + if possible_index == "": + continue + + if possible_index not in self.current_results: + raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not in the current options.") + + yield self.current_results[possible_index] + + selected_objects = list(get_selected_objects(query)) + + if do_merge: + old_selected_objects = selected_objects + + a = old_selected_objects[0] + for b in old_selected_objects[1:]: + if type(a) != type(b): + raise MKInvalidInputException(message="You can't merge different types of objects.") + a.merge(b) + + selected_objects = [a] + + if do_fetch: + for data_object in selected_objects: + self.downloader.fetch_details(data_object) + + self.print_current_options() + return False + + if do_download: + self.download(list(o.value for o in selected_objects)) + return False + + if len(selected_objects) != 1: + raise MKInvalidInputException(message="You can only go to one object at a time without merging.") + + self.goto(selected_objects[0].value) + return False + except MKInvalidInputException as e: + output("\n" + e.message + "\n", color=BColors.FAIL) + help_message() + + return False + + def mainloop(self): + while True: + if self.process_input(input("> ")): + return + + +@cli_function +def download( + genre: str = None, + download_all: bool = False, + direct_download_url: str = None, + command_list: List[str] = None, + process_metadata_anyway: bool = False, +): + if main_settings["hasnt_yet_started"]: + code = initial_config() + if code == 0: + main_settings["hasnt_yet_started"] = False + write_config() + print(f"{BColors.OKGREEN.value}Restart the programm to use it.{BColors.ENDC.value}") + else: + print(f"{BColors.FAIL.value}Something went wrong configuring.{BColors.ENDC.value}") + + shell = CliDownloader(genre=genre, process_metadata_anyway=process_metadata_anyway) + + if command_list is not None: + for command in command_list: + shell.process_input(command) + return + + if direct_download_url is not None: + if shell.download(direct_download_url, download_all=download_all): + return + + shell.mainloop() diff --git a/music_kraken/development_cli/options/__init__.py b/music_kraken/development_cli/options/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/music_kraken/development_cli/options/cache.py b/music_kraken/development_cli/options/cache.py new file mode 100644 index 0000000..42cb76b --- /dev/null +++ b/music_kraken/development_cli/options/cache.py @@ -0,0 +1,26 @@ +from logging import getLogger + +from ..utils import cli_function +from ...connection.cache import Cache + + +@cli_function +def clear_cache(): + """ + Deletes the cache. + :return: + """ + + Cache("main", getLogger("cache")).clear() + print("Cleared cache") + + +@cli_function +def clean_cache(): + """ + Deletes the outdated cache. (all expired cached files, and not indexed files) + :return: + """ + + Cache("main", getLogger("cache")).clean() + print("Cleaned cache") diff --git a/music_kraken/development_cli/options/first_config.py b/music_kraken/development_cli/options/first_config.py new file mode 100644 index 0000000..6160265 --- /dev/null +++ b/music_kraken/development_cli/options/first_config.py @@ -0,0 +1,6 @@ +from .frontend import set_frontend + + +def initial_config(): + code = set_frontend(no_cli=True) + return code diff --git a/music_kraken/development_cli/options/frontend.py b/music_kraken/development_cli/options/frontend.py new file mode 100644 index 0000000..64838c4 --- /dev/null +++ b/music_kraken/development_cli/options/frontend.py @@ -0,0 +1,196 @@ +from typing import Dict, List +from dataclasses import dataclass +from collections import defaultdict +from urllib.parse import urlparse + +from ..utils import cli_function + +from ...objects import Country +from ...utils import config, write_config +from ...utils.config import youtube_settings +from ...connection import Connection + + +@dataclass +class Instance: + """ + Attributes which influence the quality of an instance: + + - users + """ + name: str + uri: str + regions: List[Country] + users: int = 0 + + def __str__(self) -> str: + return f"{self.name} with {self.users} users." + + +class FrontendInstance: + SETTING_NAME = "placeholder" + + def __init__(self) -> None: + self.region_instances: Dict[Country, List[Instance]] = defaultdict(list) + self.all_instances: List[Instance] = [] + + def add_instance(self, instance: Instance): + self.all_instances.append(instance) + + youtube_lists = youtube_settings["youtube_url"] + existing_netlocs = set(tuple(url.netloc for url in youtube_lists)) + + parsed_instance = urlparse(instance.uri) + instance_netloc = parsed_instance.netloc + + if instance_netloc not in existing_netlocs: + youtube_lists.append(parsed_instance) + youtube_settings.__setitem__("youtube_url", youtube_lists, is_parsed=True) + + for region in instance.regions: + self.region_instances[region].append(instance) + + def fetch(self, silent: bool = False): + if not silent: + print(f"Downloading {type(self).__name__} instances...") + + def set_instance(self, instance: Instance): + youtube_settings.__setitem__(self.SETTING_NAME, instance.uri) + + def _choose_country(self) -> List[Instance]: + print("Input the country code, an example would be \"US\"") + print('\n'.join(f'{region.name} ({region.alpha_2})' for region in self.region_instances)) + print() + + + available_instances = set(i.alpha_2 for i in self.region_instances) + + chosen_region = "" + + while chosen_region not in available_instances: + chosen_region = input("nearest country: ").strip().upper() + + return self.region_instances[Country.by_alpha_2(chosen_region)] + + def choose(self, silent: bool = False): + instances = self.all_instances if silent else self._choose_country() + instances.sort(key=lambda x: x.users, reverse=True) + + if silent: + self.set_instance(instances[0]) + return + + # output the options + print("Choose your instance (input needs to be a digit):") + for i, instance in enumerate(instances): + print(f"{i}) {instance}") + + print() + + # ask for index + index = "" + while not index.isdigit() or int(index) >= len(instances): + index = input("> ").strip() + + instance = instances[int(index)] + print() + print(f"Setting the instance to {instance}") + + self.set_instance(instance) + + +class Invidious(FrontendInstance): + SETTING_NAME = "invidious_instance" + + def __init__(self) -> None: + self.connection = Connection(host="https://api.invidious.io/") + self.endpoint = "https://api.invidious.io/instances.json" + + super().__init__() + + + def _process_instance(self, all_instance_data: dict): + instance_data = all_instance_data[1] + stats = instance_data["stats"] + + if not instance_data["api"]: + return + if instance_data["type"] != "https": + return + + region = instance_data["region"] + + instance = Instance( + name=all_instance_data[0], + uri=instance_data["uri"], + regions=[Country.by_alpha_2(region)], + users=stats["usage"]["users"]["total"] + ) + + self.add_instance(instance) + + def fetch(self, silent: bool): + r = self.connection.get(self.endpoint) + if r is None: + return + + for instance in r.json(): + self._process_instance(all_instance_data=instance) + + +class Piped(FrontendInstance): + SETTING_NAME = "piped_instance" + + def __init__(self) -> None: + self.connection = Connection(host="https://raw.githubusercontent.com") + + super().__init__() + + def process_instance(self, instance_data: str): + cells = instance_data.split(" | ") + + instance = Instance( + name=cells[0].strip(), + uri=cells[1].strip(), + regions=[Country.by_emoji(flag) for flag in cells[2].split(", ")] + ) + + self.add_instance(instance) + + def fetch(self, silent: bool = False): + r = self.connection.get("https://raw.githubusercontent.com/wiki/TeamPiped/Piped-Frontend/Instances.md") + if r is None: + return + + process = False + + for line in r.content.decode("utf-8").split("\n"): + line = line.strip() + + if line != "" and process: + self.process_instance(line) + + if line.startswith("---"): + process = True + + +class FrontendSelection: + def __init__(self): + self.invidious = Invidious() + self.piped = Piped() + + def choose(self, silent: bool = False): + self.invidious.fetch(silent) + self.invidious.choose(silent) + + self.piped.fetch(silent) + self.piped.choose(silent) + + +@cli_function +def set_frontend(silent: bool = False): + shell = FrontendSelection() + shell.choose(silent=silent) + + return 0 + \ No newline at end of file diff --git a/music_kraken/development_cli/options/settings.py b/music_kraken/development_cli/options/settings.py new file mode 100644 index 0000000..3ba0ade --- /dev/null +++ b/music_kraken/development_cli/options/settings.py @@ -0,0 +1,71 @@ +from ..utils import cli_function + +from ...utils.config import config, write_config +from ...utils import exception + + +def modify_setting(_name: str, _value: str, invalid_ok: bool = True) -> bool: + try: + config.set_name_to_value(_name, _value) + except exception.config.SettingException as e: + if invalid_ok: + print(e) + return False + else: + raise e + + write_config() + return True + + +def print_settings(): + for i, attribute in enumerate(config): + print(f"{i:0>2}: {attribute.name}={attribute.value}") + + + def modify_setting_by_index(index: int) -> bool: + attribute = list(config)[index] + + print() + print(attribute) + + input__ = input(f"{attribute.name}=") + if not modify_setting(attribute.name, input__.strip()): + return modify_setting_by_index(index) + + return True + + +def modify_setting_by_index(index: int) -> bool: + attribute = list(config)[index] + + print() + print(attribute) + + input__ = input(f"{attribute.name}=") + if not modify_setting(attribute.name, input__.strip()): + return modify_setting_by_index(index) + + return True + + +@cli_function +def settings( + name: str = None, + value: str = None, +): + if name is not None and value is not None: + modify_setting(name, value, invalid_ok=True) + return + + while True: + print_settings() + + input_ = input("Id of setting to modify: ") + print() + if input_.isdigit() and int(input_) < len(config): + if modify_setting_by_index(int(input_)): + return + else: + print("Please input a valid ID.") + print() \ No newline at end of file diff --git a/music_kraken/development_cli/utils.py b/music_kraken/development_cli/utils.py new file mode 100644 index 0000000..b0fb1b8 --- /dev/null +++ b/music_kraken/development_cli/utils.py @@ -0,0 +1,51 @@ +from ..utils import BColors +from ..utils.shared import get_random_message + + +def cli_function(function): + def wrapper(*args, **kwargs): + silent = kwargs.get("no_cli", False) + if "no_cli" in kwargs: + del kwargs["no_cli"] + + if silent: + return function(*args, **kwargs) + return + + code = 0 + + print_cute_message() + print() + try: + code = function(*args, **kwargs) + except KeyboardInterrupt: + print("\n\nRaise an issue if I fucked up:\nhttps://github.com/HeIIow2/music-downloader/issues") + + finally: + print() + print_cute_message() + print("See you soon! :3") + + exit() + + return wrapper + + +def print_cute_message(): + message = get_random_message() + try: + print(message) + except UnicodeEncodeError: + message = str(c for c in message if 0 < ord(c) < 127) + print(message) + + +AGREE_INPUTS = {"y", "yes", "ok"} +def ask_for_bool(msg: str) -> bool: + i = input(f"{msg} ({BColors.OKGREEN.value}Y{BColors.ENDC.value}/{BColors.FAIL.value}N{BColors.ENDC.value})? ").lower() + return i in AGREE_INPUTS + + +def ask_for_create(name: str, value: str) -> bool: + return ask_for_bool(f"Do you want to create the {name} {BColors.OKBLUE}{value}{BColors.ENDC}?") + \ No newline at end of file