import random
import re
from pathlib import Path
from typing import Dict, Generator, List, Set, Type, Union

from .. import console
from ..download import Downloader, Page, components
from ..download.results import GoToResults
from ..download.results import Option as ResultOption
from ..download.results import PageResults, Results
from ..objects import Album, Artist, DatabaseObject, Song
from ..utils import BColors, output
from ..utils.config import main_settings, write_config
from ..utils.enums.colors import BColors
from ..utils.exception import MKInvalidInputException
from ..utils.exception.download import UrlNotFoundException
from ..utils.shared import HELP_MESSAGE, URL_PATTERN
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from .genre import get_genre
from .options.first_config import initial_config
from .utils import ask_for_bool, cli_function

EXIT_COMMANDS = {"q", "quit", "exit", "abort"}
ALPHABET = "abcdefghijklmnopqrstuvwxyz"
PAGE_NAME_FILL = "-"
MAX_PAGE_LEN = 21


def help_message():
    """Print the help text followed by a random entry of ``happy_messages``."""
    print(HELP_MESSAGE)
    print()
    print(random.choice(main_settings["happy_messages"]))
    print()


class CliDownloader:
    """Interactive shell that searches, browses and downloads music objects.

    The shell keeps one "current" result set (``current_results``) that the
    user addresses by index, plus an optional history of earlier result sets
    that ``..`` steps back through.
    """

    def __init__(
        self,
        exclude_pages: Set[Type[Page]] = None,
        exclude_shady: bool = False,
        max_displayed_options: int = 10,
        option_digits: int = 3,
        genre: str = None,
        process_metadata_anyway: bool = False,
    ) -> None:
        """Create the shell and announce the download target.

        Args:
            exclude_pages: Forwarded to ``Downloader``.
            exclude_shady: Forwarded to ``Downloader``.
            max_displayed_options: Stored on the instance.
            option_digits: Stored on the instance.
            genre: Target genre; when falsy, ``get_genre()`` is called to
                obtain one.
            process_metadata_anyway: Stored on the instance.
        """
        self.downloader: Downloader = Downloader(exclude_pages=exclude_pages, exclude_shady=exclude_shady)

        self.page_dict: Dict[str, Type[Page]] = dict()

        self.max_displayed_options = max_displayed_options
        self.option_digits: int = option_digits

        self.current_results: Results = None
        self._result_history: List[Results] = []

        self.genre = genre or get_genre()
        self.process_metadata_anyway = process_metadata_anyway

        output()
        output(f"Downloading to: \"{self.genre}\"", color=BColors.HEADER)
        output()

    def print_current_options(self):
        """Print the current result set, surrounded by blank lines."""
        print()
        print(self.current_results.pprint())
        print()

    def set_current_options(self, current_options: Union[Generator[DatabaseObject, None, None], components.Select]):
        """Make ``current_options`` the current result set.

        Anything that is not already a ``components.Select`` is wrapped in a
        ``components.DataObjectSelect``. When the ``result_history`` setting
        is enabled the new set is appended to the history, which is trimmed
        by one entry if it exceeds ``history_length`` (``-1`` disables the
        limit).
        """
        if not isinstance(current_options, components.Select):
            current_options = components.DataObjectSelect(current_options)

        if main_settings["result_history"]:
            self._result_history.append(current_options)

        if main_settings["history_length"] != -1:
            if len(self._result_history) > main_settings["history_length"]:
                self._result_history.pop(0)

        self.current_results = current_options

    def previous_option(self) -> bool:
        """Step back to the previous result set in the history.

        Returns:
            ``True`` if the current results were replaced by the previous
            entry, ``False`` if history is disabled or there is nothing to go
            back to (a message is printed in both cases).
        """
        if not main_settings["result_history"]:
            print("History is turned of.\nGo to main_settings, and change the value at 'result_history' to 'true'.")
            return False

        if len(self._result_history) <= 1:
            print("No results in history.")
            return False

        # The last entry is the set currently shown; drop it and expose the
        # one before it.
        self._result_history.pop()
        self.current_results = self._result_history[-1]
        return True

    def _process_parsed(self, key_text: Dict[str, str], query: str) -> Query:
        """Build a ``Query`` from the parsed ``#key value`` pairs.

        Recognised keys are ``t`` (song title), ``r`` (album title) and ``a``
        (artist name). The most specific object present becomes the query's
        ``music_object`` and the less specific ones are attached to it; with
        none of the keys present, only the raw query is used.
        """
        # strip all the values in key_text
        key_text = {key: value.strip() for key, value in key_text.items()}

        song = Song(title=key_text["t"], dynamic=True) if "t" in key_text else None
        album = Album(title=key_text["r"], dynamic=True) if "r" in key_text else None
        artist = Artist(name=key_text["a"], dynamic=True) if "a" in key_text else None

        if song is not None:
            if album is not None:
                song.album_collection.append(album)
            if artist is not None:
                song.artist_collection.append(artist)
            return Query(raw_query=query, music_object=song)

        if album is not None:
            if artist is not None:
                album.artist_collection.append(artist)
            return Query(raw_query=query, music_object=album)

        if artist is not None:
            return Query(raw_query=query, music_object=artist)

        return Query(raw_query=query)

    @staticmethod
    def _parse_key_text(padded_query: str) -> Dict[str, str]:
        """Split a query into ``{key: text}`` pairs.

        A ``#`` introduces a key, which is the single character following it.
        A backslash escapes a following ``#`` or backslash so it is taken
        literally. ``padded_query`` must end with one sentinel character so
        that every real character has a successor to look ahead at.
        """
        special_characters = "#\\"

        key_text: Dict[str, str] = {}
        skip_next = False
        escape_next = False
        new_text = ""
        latest_key = None

        for current_char, next_char in zip(padded_query, padded_query[1:]):
            if skip_next:
                skip_next = False
                continue

            if escape_next:
                # Take the escaped character literally; without the
                # ``continue`` an escaped "#" would still start a new key.
                new_text += current_char
                escape_next = False
                continue

            # escaping
            if current_char == "\\" and next_char in special_characters:
                escape_next = True
                continue

            if current_char == "#":
                if latest_key is not None:
                    key_text[latest_key] = new_text
                    new_text = ""

                latest_key = next_char
                skip_next = True
                continue

            new_text += current_char

        if latest_key is not None:
            key_text[latest_key] = new_text

        return key_text

    def search(self, query: str):
        """Search for ``query`` and show the results.

        If ``query`` matches ``URL_PATTERN`` it is fetched directly and the
        options of the fetched object become the current results. Otherwise
        the query is parsed for ``#key value`` pairs and handed to the
        downloader's search.
        """
        if re.match(URL_PATTERN, query) is not None:
            try:
                data_object = self.downloader.fetch_url(query)
            except UrlNotFoundException as e:
                print(f"{e.url} could not be attributed/parsed to any yet implemented site.\n"
                      f"PR appreciated if the site isn't implemented.\n"
                      f"Recommendations and suggestions on sites to implement appreciated.\n"
                      f"But don't be a bitch if I don't end up implementing it.")
                return

            # Previously this built a PageResults from an undefined ``page``
            # name (NameError). Show the fetched object's options the same
            # way ``goto`` does.
            self.set_current_options(data_object.options)
            self.print_current_options()
            return

        # The trailing space is the look-ahead sentinel for the parser. The
        # padded string is also what ends up as the raw query, as before.
        query = query + " "
        key_text = self._parse_key_text(query)

        parsed_query: Query = self._process_parsed(key_text, query)

        self.set_current_options(self.downloader.search(parsed_query))
        self.print_current_options()

    def goto(self, data_object: Union[DatabaseObject, components.Select]):
        """Make ``data_object`` the current view and print it.

        A ``components.Select`` is shown as is; any other object first has
        its details fetched (``stop_at_level=1``) and then its ``options``
        are shown.
        """
        if isinstance(data_object, components.Select):
            self.set_current_options(data_object)
        else:
            self.downloader.fetch_details(data_object, stop_at_level=1)
            self.set_current_options(data_object.options)

        self.print_current_options()

    def download(self, data_objects: List[DatabaseObject], **kwargs) -> bool:
        """Download every object in ``data_objects`` and print each result.

        Args:
            data_objects: Objects to download.
            **kwargs: Forwarded to ``Downloader.download``.

        Returns:
            Always ``True``.
        """
        output()
        if len(data_objects) > 1:
            output(
                f"Downloading {len(data_objects)} objects...",
                *("- " + o.option_string for o in data_objects),
                color=BColors.BOLD,
                sep="\n",
            )

        _result_map: Dict[DatabaseObject, DownloadResult] = dict()

        for database_object in data_objects:
            r = self.downloader.download(
                data_object=database_object,
                genre=self.genre,
                **kwargs
            )
            _result_map[database_object] = r

        for music_object, result in _result_map.items():
            output()
            output(music_object.option_string)
            output(result)

        return True

    def _get_selected_objects(self, query: str) -> list:
        """Resolve a selection string to entries of the current results.

        ``"all"`` selects every entry; otherwise ``query`` is a
        comma-separated list of indices (empty items are ignored).

        Raises:
            MKInvalidInputException: If there are no current results yet or
                an index is not part of them.
        """
        if self.current_results is None:
            raise MKInvalidInputException(message="There are no current options to select from. Search for something first.")

        if query.strip().lower() == "all":
            return list(self.current_results)

        selected = []
        for possible_index in query.split(","):
            possible_index = possible_index.strip()
            if possible_index == "":
                continue

            if possible_index not in self.current_results:
                raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not in the current options.")

            selected.append(self.current_results[possible_index])

        return selected

    def process_input(self, input_str: str) -> bool:
        """Interpret one line of user input.

        Supported forms: an exit command, ``.`` (reprint), ``..`` (go back),
        ``s: <query>`` (search), or ``[f][d][m]: <indices>`` to fetch,
        download and/or merge the selected entries. A bare selection of a
        single entry navigates to it.

        Returns:
            ``True`` if the shell should exit, ``False`` otherwise. Invalid
            input is reported together with the help message and does not
            exit.
        """
        try:
            input_str = input_str.strip()
            processed_input: str = input_str.lower()

            if processed_input in EXIT_COMMANDS:
                return True

            if processed_input == ".":
                self.print_current_options()
                return False

            if processed_input == "..":
                if self.previous_option():
                    self.print_current_options()
                return False

            command = ""
            query = processed_input
            if ":" in processed_input:
                command, _, query = processed_input.partition(":")

            do_search = "s" in command
            do_fetch = "f" in command
            do_download = "d" in command
            do_merge = "m" in command

            if do_search and (do_download or do_fetch or do_merge):
                raise MKInvalidInputException(message="You can't search and do another operation at the same time.")

            if do_search:
                # Search with the original casing, not the lowered copy.
                self.search(input_str.partition(":")[2])
                return False

            selected_objects = self._get_selected_objects(query)

            if do_merge:
                if not selected_objects:
                    raise MKInvalidInputException(message="You have to select at least one object to merge.")

                a = selected_objects[0]
                for b in selected_objects[1:]:
                    if type(a) is not type(b):
                        raise MKInvalidInputException(message="You can't merge different types of objects.")
                    a.merge(b)

                selected_objects = [a]

            if do_fetch:
                for data_object in selected_objects:
                    self.downloader.fetch_details(data_object)

                self.print_current_options()
                return False

            if do_download:
                self.download([o.value for o in selected_objects])
                return False

            if len(selected_objects) != 1:
                raise MKInvalidInputException(message="You can only go to one object at a time without merging.")

            self.goto(selected_objects[0].value)
            return False
        except MKInvalidInputException as e:
            output("\n" + e.message + "\n", color=BColors.FAIL)
            help_message()

        return False

    def mainloop(self):
        """Read and process input lines until ``process_input`` asks to exit."""
        while True:
            if self.process_input(input("> ")):
                return


@cli_function
def download(
    genre: str = None,
    download_all: bool = False,
    direct_download_url: str = None,
    command_list: List[str] = None,
    process_metadata_anyway: bool = False,
):
    """Entry point of the download CLI.

    Runs the initial configuration when the ``hasnt_yet_started`` setting is
    set, then either replays ``command_list``, downloads
    ``direct_download_url``, or starts the interactive loop.

    Args:
        genre: Forwarded to ``CliDownloader``.
        download_all: Forwarded to ``CliDownloader.download`` for a direct
            URL download.
        direct_download_url: URL to resolve and download without prompting.
        command_list: Commands to feed to the shell instead of reading input.
        process_metadata_anyway: Forwarded to ``CliDownloader``.
    """
    if main_settings["hasnt_yet_started"]:
        code = initial_config()
        if code == 0:
            main_settings["hasnt_yet_started"] = False
            write_config()
            print(f"{BColors.OKGREEN.value}Restart the programm to use it.{BColors.ENDC.value}")
        else:
            print(f"{BColors.FAIL.value}Something went wrong configuring.{BColors.ENDC.value}")

    shell = CliDownloader(genre=genre, process_metadata_anyway=process_metadata_anyway)

    if command_list is not None:
        for command in command_list:
            shell.process_input(command)
        return

    if direct_download_url is not None:
        # CliDownloader.download expects a list of objects, not a URL string,
        # so resolve the URL first.
        # NOTE(review): assumes fetch_url returns a downloadable object, as
        # its use in CliDownloader.search suggests - confirm.
        try:
            data_object = shell.downloader.fetch_url(direct_download_url)
        except UrlNotFoundException as e:
            output(f"{e.url} could not be attributed/parsed to any yet implemented site.", color=BColors.FAIL)
        else:
            if shell.download([data_object], download_all=download_all):
                return

    shell.mainloop()