music-kraken-core/music_kraken/cli/main_downloader.py

355 lines
12 KiB
Python

import random
import re
from pathlib import Path
from typing import Dict, Generator, List, Set, Type, Union
from .. import console
from ..download import Downloader, Page, components
from ..download.results import GoToResults
from ..download.results import Option as ResultOption
from ..download.results import PageResults, Results
from ..objects import Album, Artist, DatabaseObject, Song
from ..utils import BColors, output
from ..utils.config import main_settings, write_config
from ..utils.enums.colors import BColors
from ..utils.exception import MKInvalidInputException
from ..utils.exception.download import UrlNotFoundException
from ..utils.shared import HELP_MESSAGE, URL_PATTERN
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from .genre import get_genre
from .options.first_config import initial_config
from .utils import ask_for_bool, cli_function
EXIT_COMMANDS = {"q", "quit", "exit", "abort"}
ALPHABET = "abcdefghijklmnopqrstuvwxyz"
PAGE_NAME_FILL = "-"
MAX_PAGE_LEN = 21
def help_message():
print(HELP_MESSAGE)
print()
print(random.choice(main_settings["happy_messages"]))
print()
class CliDownloader:
def __init__(
self,
exclude_pages: Set[Type[Page]] = None,
exclude_shady: bool = False,
max_displayed_options: int = 10,
option_digits: int = 3,
genre: str = None,
process_metadata_anyway: bool = False,
) -> None:
self.downloader: Downloader = Downloader(exclude_pages=exclude_pages, exclude_shady=exclude_shady)
self.page_dict: Dict[str, Type[Page]] = dict()
self.max_displayed_options = max_displayed_options
self.option_digits: int = option_digits
self.current_results: Results = None
self._result_history: List[Results] = []
self.genre = genre or get_genre()
self.process_metadata_anyway = process_metadata_anyway
output()
output(f"Downloading to: \"{self.genre}\"", color=BColors.HEADER)
output()
def print_current_options(self):
print()
print(self.current_results.pprint())
"""
self.page_dict = dict()
page_count = 0
for option in self.current_results.formatted_generator():
if isinstance(option, ResultOption):
r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}"
print(r)
else:
prefix = ALPHABET[page_count % len(ALPHABET)]
print(
f"{BColors.HEADER.value}({prefix}) --------------------------------{option.__name__:{PAGE_NAME_FILL}<{MAX_PAGE_LEN}}--------------------{BColors.ENDC.value}")
self.page_dict[prefix] = option
self.page_dict[option.__name__] = option
page_count += 1
"""
print()
def set_current_options(self, current_options: Union[Generator[DatabaseObject, None, None], components.Select]):
current_options = current_options if isinstance(current_options, components.Select) else components.DataObjectSelect(current_options)
if main_settings["result_history"]:
self._result_history.append(current_options)
if main_settings["history_length"] != -1:
if len(self._result_history) > main_settings["history_length"]:
self._result_history.pop(0)
self.current_results = current_options
def previous_option(self) -> bool:
if not main_settings["result_history"]:
print("History is turned of.\nGo to main_settings, and change the value at 'result_history' to 'true'.")
return False
if len(self._result_history) <= 1:
print(f"No results in history.")
return False
self._result_history.pop()
self.current_results = self._result_history[-1]
return True
def _process_parsed(self, key_text: Dict[str, str], query: str) -> Query:
# strip all the values in key_text
key_text = {key: value.strip() for key, value in key_text.items()}
song = None if not "t" in key_text else Song(title=key_text["t"], dynamic=True)
album = None if not "r" in key_text else Album(title=key_text["r"], dynamic=True)
artist = None if not "a" in key_text else Artist(name=key_text["a"], dynamic=True)
if song is not None:
if album is not None:
song.album_collection.append(album)
if artist is not None:
song.artist_collection.append(artist)
return Query(raw_query=query, music_object=song)
if album is not None:
if artist is not None:
album.artist_collection.append(artist)
return Query(raw_query=query, music_object=album)
if artist is not None:
return Query(raw_query=query, music_object=artist)
return Query(raw_query=query)
def search(self, query: str):
if re.match(URL_PATTERN, query) is not None:
try:
data_object = self.downloader.fetch_url(query)
except UrlNotFoundException as e:
print(f"{e.url} could not be attributed/parsed to any yet implemented site.\n"
f"PR appreciated if the site isn't implemented.\n"
f"Recommendations and suggestions on sites to implement appreciated.\n"
f"But don't be a bitch if I don't end up implementing it.")
return
self.set_current_options(PageResults(page, data_object.options, max_items_per_page=self.max_displayed_options))
self.print_current_options()
return
special_characters = "#\\"
query = query + " "
key_text = {}
skip_next = False
escape_next = False
new_text = ""
latest_key: str = None
for i in range(len(query) - 1):
current_char = query[i]
next_char = query[i + 1]
if skip_next:
skip_next = False
continue
if escape_next:
new_text += current_char
escape_next = False
# escaping
if current_char == "\\":
if next_char in special_characters:
escape_next = True
continue
if current_char == "#":
if latest_key is not None:
key_text[latest_key] = new_text
new_text = ""
latest_key = next_char
skip_next = True
continue
new_text += current_char
if latest_key is not None:
key_text[latest_key] = new_text
parsed_query: Query = self._process_parsed(key_text, query)
self.set_current_options(self.downloader.search(parsed_query))
self.print_current_options()
def goto(self, data_object: Union[DatabaseObject, components.Select]):
page: Type[Page]
if isinstance(data_object, components.Select):
self.set_current_options(data_object)
else:
self.downloader.fetch_details(data_object, stop_at_level=1)
self.set_current_options(data_object.options)
self.print_current_options()
def download(self, data_objects: List[DatabaseObject], **kwargs) -> bool:
output()
if len(data_objects) > 1:
output(f"Downloading {len(data_objects)} objects...", *("- " + o.option_string for o in data_objects), color=BColors.BOLD, sep="\n")
_result_map: Dict[DatabaseObject, DownloadResult] = dict()
for database_object in data_objects:
r = self.downloader.download(
data_object=database_object,
genre=self.genre,
**kwargs
)
_result_map[database_object] = r
for music_object, result in _result_map.items():
output()
output(music_object.option_string)
output(result)
return True
def process_input(self, input_str: str) -> bool:
try:
input_str = input_str.strip()
processed_input: str = input_str.lower()
if processed_input in EXIT_COMMANDS:
return True
if processed_input == ".":
self.print_current_options()
return False
if processed_input == "..":
if self.previous_option():
self.print_current_options()
return False
command = ""
query = processed_input
if ":" in processed_input:
_ = processed_input.split(":")
command, query = _[0], ":".join(_[1:])
do_search = "s" in command
do_fetch = "f" in command
do_download = "d" in command
do_merge = "m" in command
if do_search and (do_download or do_fetch or do_merge):
raise MKInvalidInputException(message="You can't search and do another operation at the same time.")
if do_search:
self.search(":".join(input_str.split(":")[1:]))
return False
def get_selected_objects(q: str):
if q.strip().lower() == "all":
return list(self.current_results)
indices = []
for possible_index in q.split(","):
if possible_index == "":
continue
if possible_index not in self.current_results:
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not in the current options.")
yield self.current_results[possible_index]
selected_objects = list(get_selected_objects(query))
if do_merge:
old_selected_objects = selected_objects
a = old_selected_objects[0]
for b in old_selected_objects[1:]:
if type(a) != type(b):
raise MKInvalidInputException(message="You can't merge different types of objects.")
a.merge(b)
selected_objects = [a]
if do_fetch:
for data_object in selected_objects:
self.downloader.fetch_details(data_object)
self.print_current_options()
return False
if do_download:
self.download(list(o.value for o in selected_objects))
return False
if len(selected_objects) != 1:
raise MKInvalidInputException(message="You can only go to one object at a time without merging.")
self.goto(selected_objects[0].value)
return False
except MKInvalidInputException as e:
output("\n" + e.message + "\n", color=BColors.FAIL)
help_message()
return False
def mainloop(self):
while True:
if self.process_input(input("> ")):
return
@cli_function
def download(
genre: str = None,
download_all: bool = False,
direct_download_url: str = None,
command_list: List[str] = None,
process_metadata_anyway: bool = False,
):
if main_settings["hasnt_yet_started"]:
code = initial_config()
if code == 0:
main_settings["hasnt_yet_started"] = False
write_config()
print(f"{BColors.OKGREEN.value}Restart the programm to use it.{BColors.ENDC.value}")
else:
print(f"{BColors.FAIL.value}Something went wrong configuring.{BColors.ENDC.value}")
shell = CliDownloader(genre=genre, process_metadata_anyway=process_metadata_anyway)
if command_list is not None:
for command in command_list:
shell.process_input(command)
return
if direct_download_url is not None:
if shell.download(direct_download_url, download_all=download_all):
return
shell.mainloop()