diff --git a/src/music_kraken/__init__.py b/src/music_kraken/__init__.py index f4debe2..147be6a 100644 --- a/src/music_kraken/__init__.py +++ b/src/music_kraken/__init__.py @@ -6,6 +6,7 @@ from typing import List import gc import musicbrainzngs +from .cli import Shell from . import objects, pages, download from .utils import exception, shared, path_manager from .utils.config import config, read, write, PATHS_SECTION @@ -132,138 +133,18 @@ def cli( direct_download_url: str = None, command_list: List[str] = None ): - def get_existing_genre() -> List[str]: - """ - gets the name of all subdirectories of shared.MUSIC_DIR, - but filters out all directories, where the name matches with any patern - from shared.NOT_A_GENRE_REGEX. - """ - existing_genres: List[str] = [] - - # get all subdirectories of MUSIC_DIR, not the files in the dir. - existing_subdirectories: List[Path] = [f for f in MUSIC_DIR.iterdir() if f.is_dir()] - - for subdirectory in existing_subdirectories: - name: str = subdirectory.name - - if not any(re.match(regex_pattern, name) for regex_pattern in NOT_A_GENRE_REGEX): - existing_genres.append(name) - - existing_genres.sort() - - return existing_genres - - def get_genre(): - existing_genres = get_existing_genre() - for i, genre_option in enumerate(existing_genres): - print(f"{i + 1:0>2}: {genre_option}") - - while True: - genre = input("Id or new genre: ") - - if genre.isdigit(): - genre_id = int(genre) - 1 - if genre_id >= len(existing_genres): - print(f"No genre under the id {genre_id + 1}.") - continue - - return existing_genres[genre_id] - - new_genre = fit_to_file_system(genre) - - agree_inputs = {"y", "yes", "ok"} - verification = input(f"create new genre \"{new_genre}\"? (Y/N): ").lower() - if verification in agree_inputs: - return new_genre - - def next_search(_search: download.Search, query: str) -> bool: - """ - :param _search: - :param query: - :return exit in the next step: - """ - nonlocal genre - nonlocal download_all - - query: str = query.strip() - parsed: str = query.lower() - - if parsed in EXIT_COMMANDS: - return True - - if parsed == ".": - return False - if parsed == "..": - _search.goto_previous() - return False - - if parsed.isdigit(): - _search.choose_index(int(parsed)) - return False - - if parsed in DOWNLOAD_COMMANDS: - r = _search.download_chosen(genre=genre, download_all=download_all) - - print() - print(r) - print() - - return not r.is_mild_failure - - url = re.match(URL_REGEX, query) - if url is not None: - if not _search.search_url(url.string): - print("The given url couldn't be found.") - return False - - page = _search.get_page_from_query(parsed) - if page is not None: - _search.choose_page(page) - return False - - # if everything else is not valid search - _search.search(query) - return False - - if genre is None: - genre = get_genre() - print() - - print_cute_message() - print() - print(f"Downloading to: \"{genre}\"") - print() - - search = download.Search() - - # directly download url - if direct_download_url is not None: - if search.search_url(direct_download_url): - r = search.download_chosen(genre=genre, download_all=download_all) - print() - print(r) - print() - else: - print(f"Sorry, could not download the url: {direct_download_url}") - - exit_message() - return - - # run one command after another from the command list + shell = Shell(genre=genre) + if command_list is not None: for command in command_list: - print(f">> {command}") - if next_search(search, command): - break - print(search) - - exit_message() + shell.process_input(command) return - # the actual cli - while True: - if next_search(search, input(">> ")): - break - print(search) - + if direct_download_url is not None: + if shell.download(direct_download_url, download_all=download_all): + exit_message() + return + + shell.mainloop() + exit_message() diff --git a/src/music_kraken/cli/__init__.py b/src/music_kraken/cli/__init__.py new file mode 100644 index 0000000..c21af81 --- /dev/null +++ b/src/music_kraken/cli/__init__.py @@ -0,0 +1 @@ +from .download.shell import Shell \ No newline at end of file diff --git a/src/music_kraken/cli/download/__init__.py b/src/music_kraken/cli/download/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/music_kraken/cli/download/shell.py b/src/music_kraken/cli/download/shell.py new file mode 100644 index 0000000..8cb627a --- /dev/null +++ b/src/music_kraken/cli/download/shell.py @@ -0,0 +1,316 @@ +from typing import Set, Type, Dict, List +from pathlib import Path +import re + +from ...utils.shared import MUSIC_DIR, NOT_A_GENRE_REGEX +from ...utils.regex import URL_PATTERN +from ...utils.string_processing import fit_to_file_system +from ...utils.support_classes import Query +from ...download.results import Results, SearchResults, Option, PageResults +from ...download.page_attributes import Pages +from ...pages import Page +from ...objects import Song, Album, Artist, DatabaseObject + + +""" +This is the implementation of the Shell + +# Behaviour + +## Searching + +```mkshell +> s: {querry or url} + +# examples +> s: https://musify.club/release/some-random-release-183028492 +> s: r: #a an Artist #r some random Release +``` + +Searches for an url, or an query + +### Query Syntax + +``` +#a {artist} #r {release} #t {track} +``` + +You can escape stuff like `#` doing this: `\#` + +## Downloading + +To download something, you either need a direct link, or you need to have already searched for options + +```mkshell +> d: {option ids or direct url} + +# examples +> d: 0, 3, 4 +> d: 1 +> d: https://musify.club/release/some-random-release-183028492 +``` + +## Misc + +### Exit + +```mkshell +> q +> quit +> exit +> abort +``` + +### Current Options + +```mkshell +> . +``` + +### Previous Options + +``` +> .. +``` + +""" + +EXIT_COMMANDS = {"q", "quit", "exit", "abort"} +ALPHABET = "abcdefghijklmnopqrstuvwxyz" +PAGE_NAME_FILL = "-" +MAX_PAGE_LEN = 21 + + +def get_existing_genre() -> List[str]: + """ + gets the name of all subdirectories of shared.MUSIC_DIR, + but filters out all directories, where the name matches with any patern + from shared.NOT_A_GENRE_REGEX. + """ + existing_genres: List[str] = [] + + # get all subdirectories of MUSIC_DIR, not the files in the dir. + existing_subdirectories: List[Path] = [f for f in MUSIC_DIR.iterdir() if f.is_dir()] + + for subdirectory in existing_subdirectories: + name: str = subdirectory.name + + if not any(re.match(regex_pattern, name) for regex_pattern in NOT_A_GENRE_REGEX): + existing_genres.append(name) + + existing_genres.sort() + + return existing_genres + +def get_genre(): + existing_genres = get_existing_genre() + for i, genre_option in enumerate(existing_genres): + print(f"{i + 1:0>2}: {genre_option}") + + while True: + genre = input("Id or new genre: ") + + if genre.isdigit(): + genre_id = int(genre) - 1 + if genre_id >= len(existing_genres): + print(f"No genre under the id {genre_id + 1}.") + continue + + return existing_genres[genre_id] + + new_genre = fit_to_file_system(genre) + + agree_inputs = {"y", "yes", "ok"} + verification = input(f"create new genre \"{new_genre}\"? (Y/N): ").lower() + if verification in agree_inputs: + return new_genre + + +def help_message(): + print() + print(""" +to search: +> s: {query or url} +> s: https://musify.club/release/some-random-release-183028492 +> s: #a {artist} #r {release} #t {track} + +to download: +> d: {option ids or direct url} +> d: 0, 3, 4 +> d: 1 +> d: https://musify.club/release/some-random-release-183028492 + +have fun :3 + """.strip()) + print() + + +class Shell: + """ + TODO: + + - Implement search and download for direct urls + """ + + def __init__( + self, + exclude_pages: Set[Type[Page]] = None, + exclude_shady: bool = False, + max_displayed_options: int = 10, + option_digits: int = 3, + genre: str = None + ) -> None: + self.pages: Pages = Pages(exclude_pages=exclude_pages, exclude_shady=exclude_shady) + + self.page_dict: Dict[str, Type[Page]] = dict() + + self.max_displayed_options = max_displayed_options + self.option_digits: int = option_digits + + self.current_results: Results = SearchResults + + self.genre = genre or get_genre() + + print() + print(f"Downloading to: \"{self.genre}\"") + print() + + + def print_current_options(self): + self.page_dict = dict() + + page_count = 0 + for option in self.current_results.formated_generator(max_items_per_page=self.max_displayed_options): + if isinstance(option, Option): + print(f"{option.index:0{self.option_digits}} {option.music_object.option_string}") + else: + prefix = ALPHABET[page_count%len(ALPHABET)] + print(f"({prefix}) ------------------------{option.__name__:{PAGE_NAME_FILL}<{MAX_PAGE_LEN}}------------") + + self.page_dict[prefix] = option + self.page_dict[option.__name__] = option + + page_count += 1 + + def set_current_options(self, current_options: Results): + self.current_results = current_options + + def _process_parsed(self, key_text: Dict[str, str], query: str) -> Query: + song = None if not "t" in key_text else Song(title=key_text["t"], dynamic=True) + album = None if not "r" in key_text else Album(title=key_text["r"], dynamic=True) + artist = None if not "a" in key_text else Artist(name=key_text["a"], dynamic=True) + + if song is not None: + song.album_collection.append(album) + song.main_artist_collection.append(artist) + return Query(raw_query=query, music_object=song) + + if album is not None: + album.artist_collection.append(artist) + return Query(raw_query=query, music_object=album) + + if artist is not None: + return Query(raw_query=query, music_object=artist) + + def search(self, query: str): + special_characters = "#\\" + query = query + " " + + key_text = {} + + skip_next = False + escape_next = False + new_text = "" + latest_key: str = None + for i in range(len(query) - 1): + current_char = query[i] + next_char = query[i+1] + + if skip_next: + skip_next = False + continue + + if escape_next: + new_text += current_char + escape_next = False + + # escaping + if current_char == "\\": + if next_char in special_characters: + escape_next = True + continue + + if current_char == "#": + if latest_key is not None: + key_text[latest_key] = new_text + new_text = "" + + latest_key = next_char + skip_next = True + continue + + new_text += current_char + + if latest_key is not None: + key_text[latest_key] = new_text + + + parsed_query: Query = self._process_parsed(key_text, query) + + self.set_current_options(self.pages.search(parsed_query)) + self.print_current_options() + + def goto(self, index: int): + page: Type[Page] + music_object: DatabaseObject + + try: + page, music_object = self.current_results.get_music_object_by_index(index) + except IndexError: + print() + print(f"The option {index} doesn't exist.") + print() + return + + self.pages.fetch_details(music_object) + + self.set_current_options(PageResults(page, music_object.options)) + + self.print_current_options() + + + def download(self, download_str: str, download_all: bool = False) -> bool: + return False + + def process_input(self, input_str: str) -> bool: + input_str = input_str.strip() + processed_input: str = input_str.lower() + + if processed_input in EXIT_COMMANDS: + return True + + if processed_input == ".": + self.print_current_options() + return False + + if processed_input.startswith("s: "): + self.search(input_str[3:]) + return False + + if processed_input.startswith("d: "): + return self.download(input_str[3:]) + + if processed_input.isdigit(): + self.goto(int(processed_input)) + return False + + if processed_input != "help": + print("Invalid input.") + help_message() + return False + + def mainloop(self): + while True: + if self.process_input(input("> ")): + return + \ No newline at end of file diff --git a/src/music_kraken/cli/options/__init__.py b/src/music_kraken/cli/options/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/music_kraken/download/__init__.py b/src/music_kraken/download/__init__.py index e670c65..327a571 100644 --- a/src/music_kraken/download/__init__.py +++ b/src/music_kraken/download/__init__.py @@ -1,2 +1 @@ from .search import Search -from .download import Download diff --git a/src/music_kraken/download/page_attributes.py b/src/music_kraken/download/page_attributes.py index c3ef381..bb392f8 100644 --- a/src/music_kraken/download/page_attributes.py +++ b/src/music_kraken/download/page_attributes.py @@ -40,7 +40,7 @@ class Pages: return tuple(sorted(page_set, key=lambda page: page.__name__)) self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages) - self.pages: Tuple[Type[Page], ...] = _set_to_tuple(ALL_PAGES.difference(self.pages)) + self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set) self._audio_pages_set: Set[Type[Page]] = self._pages_set.intersection(AUDIO_PAGES) self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(self._audio_pages_set) diff --git a/src/music_kraken/download/results.py b/src/music_kraken/download/results.py index 4d9d72e..fa41369 100644 --- a/src/music_kraken/download/results.py +++ b/src/music_kraken/download/results.py @@ -1,4 +1,4 @@ -from typing import Tuple, Type, Dict, List, Generator +from typing import Tuple, Type, Dict, List, Generator, Union from dataclasses import dataclass from ..objects import DatabaseObject @@ -14,33 +14,35 @@ class Option: class Results: def __init__(self) -> None: - self._by_index = Dict[int, DatabaseObject] = dict() + self._by_index: Dict[int, DatabaseObject] = dict() + self._page_by_index: Dict[int: Type[Page]] = dict() - def __iter__(self) -> Generator[DatabaseObject]: + def __iter__(self) -> Generator[DatabaseObject, None, None]: for option in self.formated_generator(): if isinstance(option, Option): yield option.music_object - def formated_generator(self, max_items_per_page: int = 10) -> Generator[Type[Page], Option]: + def formated_generator(self, max_items_per_page: int = 10) -> Generator[Union[Type[Page], Option], None, None]: self._by_index = dict() + self._page_by_index = dict() - def get_music_object_by_index(self, index: int) -> DatabaseObject: + def get_music_object_by_index(self, index: int) -> Tuple[Type[Page], DatabaseObject]: # if this throws a key error, either the formated generator needs to be iterated, or the option doesn't exist. - return self._by_index[index] + return self._page_by_index[index], self._by_index[index] class SearchResults(Results): def __init__( self, - pages: Tuple[Type[Page], ...] + pages: Tuple[Type[Page], ...] = None ) -> None: - super().__init() + super().__init__() - self.pages = pages + self.pages = pages or [] # this would initialize a list for every page, which I don't think I want # self.results = Dict[Type[Page], List[DatabaseObject]] = {page: [] for page in self.pages} - self.results = Dict[Type[Page], List[DatabaseObject]] = {} + self.results: Dict[Type[Page], List[DatabaseObject]] = {} def add(self, page: Type[Page], search_result: List[DatabaseObject]): """ @@ -64,6 +66,7 @@ class SearchResults(Results): for option in self.results[page]: yield Option(i, option) self._by_index[i] = option + self._page_by_index[i] = page i += 1 j += 1 @@ -73,7 +76,7 @@ class SearchResults(Results): class PageResults(Results): def __init__(self, page: Type[Page], results: List[DatabaseObject]) -> None: - super().__init() + super().__init__() self.page: Type[Page] = page self.results: List[DatabaseObject] = results @@ -87,4 +90,5 @@ class PageResults(Results): for option in self.results: yield Option(i, option) self._by_index[i] = option + self._page_by_index[i] = self.page i += 1 diff --git a/src/music_kraken/objects/parents.py b/src/music_kraken/objects/parents.py index d68463f..20c8303 100644 --- a/src/music_kraken/objects/parents.py +++ b/src/music_kraken/objects/parents.py @@ -95,8 +95,8 @@ class DatabaseObject: return Metadata() @property - def options(self) -> Options: - return Options([self]) + def options(self) -> List["DatabaseObject"]: + return [self] @property def option_string(self) -> str: diff --git a/src/music_kraken/objects/song.py b/src/music_kraken/objects/song.py index b253539..e984376 100644 --- a/src/music_kraken/objects/song.py +++ b/src/music_kraken/objects/song.py @@ -14,7 +14,7 @@ from .metadata import ( Metadata ) from .option import Options -from .parents import MainObject +from .parents import MainObject, DatabaseObject from .source import Source, SourceCollection from .target import Target from ..utils.string_processing import unify @@ -162,7 +162,7 @@ class Song(MainObject): f"feat. Artist({OPTION_STRING_DELIMITER.join(artist.name for artist in self.feature_artist_collection)})" @property - def options(self) -> Options: + def options(self) -> List[DatabaseObject]: """ Return a list of related objects including the song object, album object, main artist objects, and feature artist objects. @@ -173,7 +173,7 @@ class Song(MainObject): options.extend(self.feature_artist_collection) options.extend(self.album_collection) options.append(self) - return Options(options) + return options @property def tracksort_str(self) -> str: @@ -309,12 +309,12 @@ class Album(MainObject): f"under Label({OPTION_STRING_DELIMITER.join([label.name for label in self.label_collection])})" @property - def options(self) -> Options: + def options(self) -> List[DatabaseObject]: options = self.artist_collection.shallow_list options.append(self) options.extend(self.song_collection) - return Options(options) + return options def update_tracksort(self): """ @@ -600,11 +600,11 @@ class Artist(MainObject): f"under Label({OPTION_STRING_DELIMITER.join([label.name for label in self.label_collection])})" @property - def options(self) -> Options: + def options(self) -> List[DatabaseObject]: options = [self] options.extend(self.main_album_collection) options.extend(self.feature_song_collection) - return Options(options) + return options @property def country_string(self): @@ -704,7 +704,9 @@ class Label(MainObject): ] @property - def options(self) -> Options: + def options(self) -> List[DatabaseObject]: options = [self] options.extend(self.current_artist_collection.shallow_list) options.extend(self.album_collection.shallow_list) + + return options diff --git a/src/music_kraken/pages/abstract.py b/src/music_kraken/pages/abstract.py index 2562f5e..2895cf0 100644 --- a/src/music_kraken/pages/abstract.py +++ b/src/music_kraken/pages/abstract.py @@ -166,7 +166,6 @@ class Page(): for default_query in query.default_search: for single_option in self.general_search(default_query): r.append(single_option) - self.search_result_queue.put(single_option) return r