feat: implemented downloader

This commit is contained in:
Hazel 2024-06-19 10:04:41 +02:00
parent 097211b3cd
commit b236291378
12 changed files with 25 additions and 781 deletions

View File

@ -1,5 +0,0 @@
from .informations import print_paths
from .main_downloader import download
from .options.settings import settings
from .options.frontend import set_frontend

View File

@ -1,53 +0,0 @@
import re
from pathlib import Path
from typing import Dict, Generator, List, Set, Type, Union
from ..download import Downloader, Page, components
from ..utils.config import main_settings
from .utils import ask_for_bool, cli_function
class GenreIO(components.HumanIO):
@staticmethod
def ask_to_create(option: components.Option) -> bool:
output()
return ask_for_bool(f"create the genre {BColors.OKBLUE.value}{option.value}{BColors.ENDC.value}")
@staticmethod
def not_found(key: str) -> None:
output(f"\ngenre {BColors.BOLD.value}{key}{BColors.ENDC.value} not found\n", color=BColors.FAIL)
def _genre_generator() -> Generator[str, None, None]:
def is_valid_genre(genre: Path) -> bool:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any Patern
from shared.NOT_A_GENRE_REGEX.
"""
if not genre.is_dir():
return False
if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]):
return False
return True
for genre in filter(is_valid_genre, main_settings["music_directory"].iterdir()):
yield genre.name
def get_genre() -> str:
select_genre = components.Select(
human_io=GenreIO,
can_create_options=True,
data=_genre_generator(),
)
genre: Optional[components.Option[str]] = None
while genre is None:
print(select_genre.pprint())
print()
genre = select_genre.choose(input("> "))
return genre.value

View File

@ -1 +0,0 @@
from .paths import print_paths

View File

@ -1,22 +0,0 @@
from ..utils import cli_function
from ...utils.path_manager import LOCATIONS
from ...utils.config import main_settings
def all_paths():
return {
"Temp dir": main_settings["temp_directory"],
"Music dir": main_settings["music_directory"],
"Conf dir": LOCATIONS.CONFIG_DIRECTORY,
"Conf file": LOCATIONS.CONFIG_FILE,
"logging file": main_settings["log_file"],
"FFMPEG bin": main_settings["ffmpeg_binary"],
"Cache Dir": main_settings["cache_directory"],
}
@cli_function
def print_paths():
for name, path in all_paths().items():
print(f"{name}:\t{path}")

View File

@ -1,354 +0,0 @@
import random
import re
from pathlib import Path
from typing import Dict, Generator, List, Set, Type, Union
from .. import console
from ..download import Downloader, Page, components
from ..download.results import GoToResults
from ..download.results import Option as ResultOption
from ..download.results import PageResults, Results
from ..objects import Album, Artist, DatabaseObject, Song
from ..utils import BColors, output
from ..utils.config import main_settings, write_config
from ..utils.enums.colors import BColors
from ..utils.exception import MKInvalidInputException
from ..utils.exception.download import UrlNotFoundException
from ..utils.shared import HELP_MESSAGE, URL_PATTERN
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from .genre import get_genre
from .options.first_config import initial_config
from .utils import ask_for_bool, cli_function
EXIT_COMMANDS = {"q", "quit", "exit", "abort"}
ALPHABET = "abcdefghijklmnopqrstuvwxyz"
PAGE_NAME_FILL = "-"
MAX_PAGE_LEN = 21
def help_message():
print(HELP_MESSAGE)
print()
print(random.choice(main_settings["happy_messages"]))
print()
class CliDownloader:
def __init__(
self,
exclude_pages: Set[Type[Page]] = None,
exclude_shady: bool = False,
max_displayed_options: int = 10,
option_digits: int = 3,
genre: str = None,
process_metadata_anyway: bool = False,
) -> None:
self.downloader: Downloader = Downloader(exclude_pages=exclude_pages, exclude_shady=exclude_shady)
self.page_dict: Dict[str, Type[Page]] = dict()
self.max_displayed_options = max_displayed_options
self.option_digits: int = option_digits
self.current_results: Results = None
self._result_history: List[Results] = []
self.genre = genre or get_genre()
self.process_metadata_anyway = process_metadata_anyway
output()
output(f"Downloading to: \"{self.genre}\"", color=BColors.HEADER)
output()
def print_current_options(self):
print()
print(self.current_results.pprint())
"""
self.page_dict = dict()
page_count = 0
for option in self.current_results.formatted_generator():
if isinstance(option, ResultOption):
r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}"
print(r)
else:
prefix = ALPHABET[page_count % len(ALPHABET)]
print(
f"{BColors.HEADER.value}({prefix}) --------------------------------{option.__name__:{PAGE_NAME_FILL}<{MAX_PAGE_LEN}}--------------------{BColors.ENDC.value}")
self.page_dict[prefix] = option
self.page_dict[option.__name__] = option
page_count += 1
"""
print()
def set_current_options(self, current_options: Union[Generator[DatabaseObject, None, None], components.Select]):
current_options = current_options if isinstance(current_options, components.Select) else components.DataObjectSelect(current_options)
if main_settings["result_history"]:
self._result_history.append(current_options)
if main_settings["history_length"] != -1:
if len(self._result_history) > main_settings["history_length"]:
self._result_history.pop(0)
self.current_results = current_options
def previous_option(self) -> bool:
if not main_settings["result_history"]:
print("History is turned of.\nGo to main_settings, and change the value at 'result_history' to 'true'.")
return False
if len(self._result_history) <= 1:
print(f"No results in history.")
return False
self._result_history.pop()
self.current_results = self._result_history[-1]
return True
def _process_parsed(self, key_text: Dict[str, str], query: str) -> Query:
# strip all the values in key_text
key_text = {key: value.strip() for key, value in key_text.items()}
song = None if not "t" in key_text else Song(title=key_text["t"], dynamic=True)
album = None if not "r" in key_text else Album(title=key_text["r"], dynamic=True)
artist = None if not "a" in key_text else Artist(name=key_text["a"], dynamic=True)
if song is not None:
if album is not None:
song.album_collection.append(album)
if artist is not None:
song.artist_collection.append(artist)
return Query(raw_query=query, music_object=song)
if album is not None:
if artist is not None:
album.artist_collection.append(artist)
return Query(raw_query=query, music_object=album)
if artist is not None:
return Query(raw_query=query, music_object=artist)
return Query(raw_query=query)
def search(self, query: str):
if re.match(URL_PATTERN, query) is not None:
try:
data_object = self.downloader.fetch_url(query)
except UrlNotFoundException as e:
print(f"{e.url} could not be attributed/parsed to any yet implemented site.\n"
f"PR appreciated if the site isn't implemented.\n"
f"Recommendations and suggestions on sites to implement appreciated.\n"
f"But don't be a bitch if I don't end up implementing it.")
return
self.set_current_options(PageResults(page, data_object.options, max_items_per_page=self.max_displayed_options))
self.print_current_options()
return
special_characters = "#\\"
query = query + " "
key_text = {}
skip_next = False
escape_next = False
new_text = ""
latest_key: str = None
for i in range(len(query) - 1):
current_char = query[i]
next_char = query[i + 1]
if skip_next:
skip_next = False
continue
if escape_next:
new_text += current_char
escape_next = False
# escaping
if current_char == "\\":
if next_char in special_characters:
escape_next = True
continue
if current_char == "#":
if latest_key is not None:
key_text[latest_key] = new_text
new_text = ""
latest_key = next_char
skip_next = True
continue
new_text += current_char
if latest_key is not None:
key_text[latest_key] = new_text
parsed_query: Query = self._process_parsed(key_text, query)
self.set_current_options(self.downloader.search(parsed_query))
self.print_current_options()
def goto(self, data_object: Union[DatabaseObject, components.Select]):
page: Type[Page]
if isinstance(data_object, components.Select):
self.set_current_options(data_object)
else:
self.downloader.fetch_details(data_object, stop_at_level=1)
self.set_current_options(data_object.options)
self.print_current_options()
def download(self, data_objects: List[DatabaseObject], **kwargs) -> bool:
output()
if len(data_objects) > 1:
output(f"Downloading {len(data_objects)} objects...", *("- " + o.option_string for o in data_objects), color=BColors.BOLD, sep="\n")
_result_map: Dict[DatabaseObject, DownloadResult] = dict()
for database_object in data_objects:
r = self.downloader.download(
data_object=database_object,
genre=self.genre,
**kwargs
)
_result_map[database_object] = r
for music_object, result in _result_map.items():
output()
output(music_object.option_string)
output(result)
return True
def process_input(self, input_str: str) -> bool:
try:
input_str = input_str.strip()
processed_input: str = input_str.lower()
if processed_input in EXIT_COMMANDS:
return True
if processed_input == ".":
self.print_current_options()
return False
if processed_input == "..":
if self.previous_option():
self.print_current_options()
return False
command = ""
query = processed_input
if ":" in processed_input:
_ = processed_input.split(":")
command, query = _[0], ":".join(_[1:])
do_search = "s" in command
do_fetch = "f" in command
do_download = "d" in command
do_merge = "m" in command
if do_search and (do_download or do_fetch or do_merge):
raise MKInvalidInputException(message="You can't search and do another operation at the same time.")
if do_search:
self.search(":".join(input_str.split(":")[1:]))
return False
def get_selected_objects(q: str):
if q.strip().lower() == "all":
return list(self.current_results)
indices = []
for possible_index in q.split(","):
if possible_index == "":
continue
if possible_index not in self.current_results:
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not in the current options.")
yield self.current_results[possible_index]
selected_objects = list(get_selected_objects(query))
if do_merge:
old_selected_objects = selected_objects
a = old_selected_objects[0]
for b in old_selected_objects[1:]:
if type(a) != type(b):
raise MKInvalidInputException(message="You can't merge different types of objects.")
a.merge(b)
selected_objects = [a]
if do_fetch:
for data_object in selected_objects:
self.downloader.fetch_details(data_object)
self.print_current_options()
return False
if do_download:
self.download(list(o.value for o in selected_objects))
return False
if len(selected_objects) != 1:
raise MKInvalidInputException(message="You can only go to one object at a time without merging.")
self.goto(selected_objects[0].value)
return False
except MKInvalidInputException as e:
output("\n" + e.message + "\n", color=BColors.FAIL)
help_message()
return False
def mainloop(self):
while True:
if self.process_input(input("> ")):
return
@cli_function
def download(
genre: str = None,
download_all: bool = False,
direct_download_url: str = None,
command_list: List[str] = None,
process_metadata_anyway: bool = False,
):
if main_settings["hasnt_yet_started"]:
code = initial_config()
if code == 0:
main_settings["hasnt_yet_started"] = False
write_config()
print(f"{BColors.OKGREEN.value}Restart the programm to use it.{BColors.ENDC.value}")
else:
print(f"{BColors.FAIL.value}Something went wrong configuring.{BColors.ENDC.value}")
shell = CliDownloader(genre=genre, process_metadata_anyway=process_metadata_anyway)
if command_list is not None:
for command in command_list:
shell.process_input(command)
return
if direct_download_url is not None:
if shell.download(direct_download_url, download_all=download_all):
return
shell.mainloop()

View File

@ -1,26 +0,0 @@
from logging import getLogger
from ..utils import cli_function
from ...connection.cache import Cache
@cli_function
def clear_cache():
"""
Deletes the cache.
:return:
"""
Cache("main", getLogger("cache")).clear()
print("Cleared cache")
@cli_function
def clean_cache():
"""
Deletes the outdated cache. (all expired cached files, and not indexed files)
:return:
"""
Cache("main", getLogger("cache")).clean()
print("Cleaned cache")

View File

@ -1,6 +0,0 @@
from .frontend import set_frontend
def initial_config():
code = set_frontend(no_cli=True)
return code

View File

@ -1,196 +0,0 @@
from typing import Dict, List
from dataclasses import dataclass
from collections import defaultdict
from urllib.parse import urlparse
from ..utils import cli_function
from ...objects import Country
from ...utils import config, write_config
from ...utils.config import youtube_settings
from ...connection import Connection
@dataclass
class Instance:
"""
Attributes which influence the quality of an instance:
- users
"""
name: str
uri: str
regions: List[Country]
users: int = 0
def __str__(self) -> str:
return f"{self.name} with {self.users} users."
class FrontendInstance:
SETTING_NAME = "placeholder"
def __init__(self) -> None:
self.region_instances: Dict[Country, List[Instance]] = defaultdict(list)
self.all_instances: List[Instance] = []
def add_instance(self, instance: Instance):
self.all_instances.append(instance)
youtube_lists = youtube_settings["youtube_url"]
existing_netlocs = set(tuple(url.netloc for url in youtube_lists))
parsed_instance = urlparse(instance.uri)
instance_netloc = parsed_instance.netloc
if instance_netloc not in existing_netlocs:
youtube_lists.append(parsed_instance)
youtube_settings.__setitem__("youtube_url", youtube_lists, is_parsed=True)
for region in instance.regions:
self.region_instances[region].append(instance)
def fetch(self, silent: bool = False):
if not silent:
print(f"Downloading {type(self).__name__} instances...")
def set_instance(self, instance: Instance):
youtube_settings.__setitem__(self.SETTING_NAME, instance.uri)
def _choose_country(self) -> List[Instance]:
print("Input the country code, an example would be \"US\"")
print('\n'.join(f'{region.name} ({region.alpha_2})' for region in self.region_instances))
print()
available_instances = set(i.alpha_2 for i in self.region_instances)
chosen_region = ""
while chosen_region not in available_instances:
chosen_region = input("nearest country: ").strip().upper()
return self.region_instances[Country.by_alpha_2(chosen_region)]
def choose(self, silent: bool = False):
instances = self.all_instances if silent else self._choose_country()
instances.sort(key=lambda x: x.users, reverse=True)
if silent:
self.set_instance(instances[0])
return
# output the options
print("Choose your instance (input needs to be a digit):")
for i, instance in enumerate(instances):
print(f"{i}) {instance}")
print()
# ask for index
index = ""
while not index.isdigit() or int(index) >= len(instances):
index = input("> ").strip()
instance = instances[int(index)]
print()
print(f"Setting the instance to {instance}")
self.set_instance(instance)
class Invidious(FrontendInstance):
SETTING_NAME = "invidious_instance"
def __init__(self) -> None:
self.connection = Connection(host="https://api.invidious.io/")
self.endpoint = "https://api.invidious.io/instances.json"
super().__init__()
def _process_instance(self, all_instance_data: dict):
instance_data = all_instance_data[1]
stats = instance_data["stats"]
if not instance_data["api"]:
return
if instance_data["type"] != "https":
return
region = instance_data["region"]
instance = Instance(
name=all_instance_data[0],
uri=instance_data["uri"],
regions=[Country.by_alpha_2(region)],
users=stats["usage"]["users"]["total"]
)
self.add_instance(instance)
def fetch(self, silent: bool):
r = self.connection.get(self.endpoint)
if r is None:
return
for instance in r.json():
self._process_instance(all_instance_data=instance)
class Piped(FrontendInstance):
SETTING_NAME = "piped_instance"
def __init__(self) -> None:
self.connection = Connection(host="https://raw.githubusercontent.com")
super().__init__()
def process_instance(self, instance_data: str):
cells = instance_data.split(" | ")
instance = Instance(
name=cells[0].strip(),
uri=cells[1].strip(),
regions=[Country.by_emoji(flag) for flag in cells[2].split(", ")]
)
self.add_instance(instance)
def fetch(self, silent: bool = False):
r = self.connection.get("https://raw.githubusercontent.com/wiki/TeamPiped/Piped-Frontend/Instances.md")
if r is None:
return
process = False
for line in r.content.decode("utf-8").split("\n"):
line = line.strip()
if line != "" and process:
self.process_instance(line)
if line.startswith("---"):
process = True
class FrontendSelection:
def __init__(self):
self.invidious = Invidious()
self.piped = Piped()
def choose(self, silent: bool = False):
self.invidious.fetch(silent)
self.invidious.choose(silent)
self.piped.fetch(silent)
self.piped.choose(silent)
@cli_function
def set_frontend(silent: bool = False):
shell = FrontendSelection()
shell.choose(silent=silent)
return 0

View File

@ -1,71 +0,0 @@
from ..utils import cli_function
from ...utils.config import config, write_config
from ...utils import exception
def modify_setting(_name: str, _value: str, invalid_ok: bool = True) -> bool:
try:
config.set_name_to_value(_name, _value)
except exception.config.SettingException as e:
if invalid_ok:
print(e)
return False
else:
raise e
write_config()
return True
def print_settings():
for i, attribute in enumerate(config):
print(f"{i:0>2}: {attribute.name}={attribute.value}")
def modify_setting_by_index(index: int) -> bool:
attribute = list(config)[index]
print()
print(attribute)
input__ = input(f"{attribute.name}=")
if not modify_setting(attribute.name, input__.strip()):
return modify_setting_by_index(index)
return True
def modify_setting_by_index(index: int) -> bool:
attribute = list(config)[index]
print()
print(attribute)
input__ = input(f"{attribute.name}=")
if not modify_setting(attribute.name, input__.strip()):
return modify_setting_by_index(index)
return True
@cli_function
def settings(
name: str = None,
value: str = None,
):
if name is not None and value is not None:
modify_setting(name, value, invalid_ok=True)
return
while True:
print_settings()
input_ = input("Id of setting to modify: ")
print()
if input_.isdigit() and int(input_) < len(config):
if modify_setting_by_index(int(input_)):
return
else:
print("Please input a valid ID.")
print()

View File

@ -1,47 +0,0 @@
from ..utils import BColors
from ..utils.shared import get_random_message
def cli_function(function):
def wrapper(*args, **kwargs):
silent = kwargs.get("no_cli", False)
if "no_cli" in kwargs:
del kwargs["no_cli"]
if silent:
return function(*args, **kwargs)
return
code = 0
print_cute_message()
print()
try:
code = function(*args, **kwargs)
except KeyboardInterrupt:
print("\n\nRaise an issue if I fucked up:\nhttps://github.com/HeIIow2/music-downloader/issues")
finally:
print()
print_cute_message()
print("See you soon! :3")
exit()
return wrapper
def print_cute_message():
message = get_random_message()
try:
print(message)
except UnicodeEncodeError:
message = str(c for c in message if 0 < ord(c) < 127)
print(message)
AGREE_INPUTS = {"y", "yes", "ok"}
def ask_for_bool(msg: str) -> bool:
i = input(f"{msg} ({BColors.OKGREEN.value}Y{BColors.ENDC.value}/{BColors.FAIL.value}N{BColors.ENDC.value})? ").lower()
return i in AGREE_INPUTS

View File

@ -415,6 +415,31 @@ class Downloader:
return source.page.fetch_object_from_source(source=source, **kwargs)
# misc function
def get_existing_genres(self) -> Generator[str, None, None]:
"""Yields every existing genre, for the user to select from.
Yields:
Generator[str, None, None]: a generator that yields every existing genre.
"""
def is_valid_genre(genre: Path) -> bool:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any Patern
from shared.NOT_A_GENRE_REGEX.
"""
if not genre.is_dir():
return False
if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]):
return False
return True
for genre in filter(is_valid_genre, main_settings["music_directory"].iterdir()):
yield genre.name
class Page:
REGISTER = True