started reimplementing the shell

This commit is contained in:
Hellow2 2023-06-12 14:56:14 +02:00
parent 58dbb5a9c7
commit 4cf902b05b
11 changed files with 356 additions and 154 deletions

View File

@ -6,6 +6,7 @@ from typing import List
import gc
import musicbrainzngs
from .cli import Shell
from . import objects, pages, download
from .utils import exception, shared, path_manager
from .utils.config import config, read, write, PATHS_SECTION
@ -132,138 +133,18 @@ def cli(
direct_download_url: str = None,
command_list: List[str] = None
):
def get_existing_genre() -> List[str]:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any patern
from shared.NOT_A_GENRE_REGEX.
"""
existing_genres: List[str] = []
# get all subdirectories of MUSIC_DIR, not the files in the dir.
existing_subdirectories: List[Path] = [f for f in MUSIC_DIR.iterdir() if f.is_dir()]
for subdirectory in existing_subdirectories:
name: str = subdirectory.name
if not any(re.match(regex_pattern, name) for regex_pattern in NOT_A_GENRE_REGEX):
existing_genres.append(name)
existing_genres.sort()
return existing_genres
def get_genre():
existing_genres = get_existing_genre()
for i, genre_option in enumerate(existing_genres):
print(f"{i + 1:0>2}: {genre_option}")
while True:
genre = input("Id or new genre: ")
if genre.isdigit():
genre_id = int(genre) - 1
if genre_id >= len(existing_genres):
print(f"No genre under the id {genre_id + 1}.")
continue
return existing_genres[genre_id]
new_genre = fit_to_file_system(genre)
agree_inputs = {"y", "yes", "ok"}
verification = input(f"create new genre \"{new_genre}\"? (Y/N): ").lower()
if verification in agree_inputs:
return new_genre
def next_search(_search: download.Search, query: str) -> bool:
"""
:param _search:
:param query:
:return exit in the next step:
"""
nonlocal genre
nonlocal download_all
query: str = query.strip()
parsed: str = query.lower()
if parsed in EXIT_COMMANDS:
return True
if parsed == ".":
return False
if parsed == "..":
_search.goto_previous()
return False
if parsed.isdigit():
_search.choose_index(int(parsed))
return False
if parsed in DOWNLOAD_COMMANDS:
r = _search.download_chosen(genre=genre, download_all=download_all)
print()
print(r)
print()
return not r.is_mild_failure
url = re.match(URL_REGEX, query)
if url is not None:
if not _search.search_url(url.string):
print("The given url couldn't be found.")
return False
page = _search.get_page_from_query(parsed)
if page is not None:
_search.choose_page(page)
return False
# if everything else is not valid search
_search.search(query)
return False
if genre is None:
genre = get_genre()
print()
print_cute_message()
print()
print(f"Downloading to: \"{genre}\"")
print()
search = download.Search()
# directly download url
if direct_download_url is not None:
if search.search_url(direct_download_url):
r = search.download_chosen(genre=genre, download_all=download_all)
print()
print(r)
print()
else:
print(f"Sorry, could not download the url: {direct_download_url}")
exit_message()
return
# run one command after another from the command list
shell = Shell(genre=genre)
if command_list is not None:
for command in command_list:
print(f">> {command}")
if next_search(search, command):
break
print(search)
exit_message()
shell.process_input(command)
return
# the actual cli
while True:
if next_search(search, input(">> ")):
break
print(search)
if direct_download_url is not None:
if shell.download(direct_download_url, download_all=download_all):
exit_message()
return
shell.mainloop()
exit_message()

View File

@ -0,0 +1 @@
from .download.shell import Shell

View File

@ -0,0 +1,316 @@
from typing import Set, Type, Dict, List
from pathlib import Path
import re
from ...utils.shared import MUSIC_DIR, NOT_A_GENRE_REGEX
from ...utils.regex import URL_PATTERN
from ...utils.string_processing import fit_to_file_system
from ...utils.support_classes import Query
from ...download.results import Results, SearchResults, Option, PageResults
from ...download.page_attributes import Pages
from ...pages import Page
from ...objects import Song, Album, Artist, DatabaseObject
"""
This is the implementation of the Shell
# Behaviour
## Searching
```mkshell
> s: {querry or url}
# examples
> s: https://musify.club/release/some-random-release-183028492
> s: r: #a an Artist #r some random Release
```
Searches for an url, or an query
### Query Syntax
```
#a {artist} #r {release} #t {track}
```
You can escape stuff like `#` doing this: `\#`
## Downloading
To download something, you either need a direct link, or you need to have already searched for options
```mkshell
> d: {option ids or direct url}
# examples
> d: 0, 3, 4
> d: 1
> d: https://musify.club/release/some-random-release-183028492
```
## Misc
### Exit
```mkshell
> q
> quit
> exit
> abort
```
### Current Options
```mkshell
> .
```
### Previous Options
```
> ..
```
"""
EXIT_COMMANDS = {"q", "quit", "exit", "abort"}
ALPHABET = "abcdefghijklmnopqrstuvwxyz"
PAGE_NAME_FILL = "-"
MAX_PAGE_LEN = 21
def get_existing_genre() -> List[str]:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any patern
from shared.NOT_A_GENRE_REGEX.
"""
existing_genres: List[str] = []
# get all subdirectories of MUSIC_DIR, not the files in the dir.
existing_subdirectories: List[Path] = [f for f in MUSIC_DIR.iterdir() if f.is_dir()]
for subdirectory in existing_subdirectories:
name: str = subdirectory.name
if not any(re.match(regex_pattern, name) for regex_pattern in NOT_A_GENRE_REGEX):
existing_genres.append(name)
existing_genres.sort()
return existing_genres
def get_genre():
existing_genres = get_existing_genre()
for i, genre_option in enumerate(existing_genres):
print(f"{i + 1:0>2}: {genre_option}")
while True:
genre = input("Id or new genre: ")
if genre.isdigit():
genre_id = int(genre) - 1
if genre_id >= len(existing_genres):
print(f"No genre under the id {genre_id + 1}.")
continue
return existing_genres[genre_id]
new_genre = fit_to_file_system(genre)
agree_inputs = {"y", "yes", "ok"}
verification = input(f"create new genre \"{new_genre}\"? (Y/N): ").lower()
if verification in agree_inputs:
return new_genre
def help_message():
print()
print("""
to search:
> s: {query or url}
> s: https://musify.club/release/some-random-release-183028492
> s: #a {artist} #r {release} #t {track}
to download:
> d: {option ids or direct url}
> d: 0, 3, 4
> d: 1
> d: https://musify.club/release/some-random-release-183028492
have fun :3
""".strip())
print()
class Shell:
"""
TODO:
- Implement search and download for direct urls
"""
def __init__(
self,
exclude_pages: Set[Type[Page]] = None,
exclude_shady: bool = False,
max_displayed_options: int = 10,
option_digits: int = 3,
genre: str = None
) -> None:
self.pages: Pages = Pages(exclude_pages=exclude_pages, exclude_shady=exclude_shady)
self.page_dict: Dict[str, Type[Page]] = dict()
self.max_displayed_options = max_displayed_options
self.option_digits: int = option_digits
self.current_results: Results = SearchResults
self.genre = genre or get_genre()
print()
print(f"Downloading to: \"{self.genre}\"")
print()
def print_current_options(self):
self.page_dict = dict()
page_count = 0
for option in self.current_results.formated_generator(max_items_per_page=self.max_displayed_options):
if isinstance(option, Option):
print(f"{option.index:0{self.option_digits}} {option.music_object.option_string}")
else:
prefix = ALPHABET[page_count%len(ALPHABET)]
print(f"({prefix}) ------------------------{option.__name__:{PAGE_NAME_FILL}<{MAX_PAGE_LEN}}------------")
self.page_dict[prefix] = option
self.page_dict[option.__name__] = option
page_count += 1
def set_current_options(self, current_options: Results):
self.current_results = current_options
def _process_parsed(self, key_text: Dict[str, str], query: str) -> Query:
song = None if not "t" in key_text else Song(title=key_text["t"], dynamic=True)
album = None if not "r" in key_text else Album(title=key_text["r"], dynamic=True)
artist = None if not "a" in key_text else Artist(name=key_text["a"], dynamic=True)
if song is not None:
song.album_collection.append(album)
song.main_artist_collection.append(artist)
return Query(raw_query=query, music_object=song)
if album is not None:
album.artist_collection.append(artist)
return Query(raw_query=query, music_object=album)
if artist is not None:
return Query(raw_query=query, music_object=artist)
def search(self, query: str):
special_characters = "#\\"
query = query + " "
key_text = {}
skip_next = False
escape_next = False
new_text = ""
latest_key: str = None
for i in range(len(query) - 1):
current_char = query[i]
next_char = query[i+1]
if skip_next:
skip_next = False
continue
if escape_next:
new_text += current_char
escape_next = False
# escaping
if current_char == "\\":
if next_char in special_characters:
escape_next = True
continue
if current_char == "#":
if latest_key is not None:
key_text[latest_key] = new_text
new_text = ""
latest_key = next_char
skip_next = True
continue
new_text += current_char
if latest_key is not None:
key_text[latest_key] = new_text
parsed_query: Query = self._process_parsed(key_text, query)
self.set_current_options(self.pages.search(parsed_query))
self.print_current_options()
def goto(self, index: int):
page: Type[Page]
music_object: DatabaseObject
try:
page, music_object = self.current_results.get_music_object_by_index(index)
except IndexError:
print()
print(f"The option {index} doesn't exist.")
print()
return
self.pages.fetch_details(music_object)
self.set_current_options(PageResults(page, music_object.options))
self.print_current_options()
def download(self, download_str: str, download_all: bool = False) -> bool:
return False
def process_input(self, input_str: str) -> bool:
input_str = input_str.strip()
processed_input: str = input_str.lower()
if processed_input in EXIT_COMMANDS:
return True
if processed_input == ".":
self.print_current_options()
return False
if processed_input.startswith("s: "):
self.search(input_str[3:])
return False
if processed_input.startswith("d: "):
return self.download(input_str[3:])
if processed_input.isdigit():
self.goto(int(processed_input))
return False
if processed_input != "help":
print("Invalid input.")
help_message()
return False
def mainloop(self):
while True:
if self.process_input(input("> ")):
return

View File

View File

@ -1,2 +1 @@
from .search import Search
from .download import Download

View File

@ -40,7 +40,7 @@ class Pages:
return tuple(sorted(page_set, key=lambda page: page.__name__))
self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages)
self.pages: Tuple[Type[Page], ...] = _set_to_tuple(ALL_PAGES.difference(self.pages))
self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set)
self._audio_pages_set: Set[Type[Page]] = self._pages_set.intersection(AUDIO_PAGES)
self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(self._audio_pages_set)

View File

@ -1,4 +1,4 @@
from typing import Tuple, Type, Dict, List, Generator
from typing import Tuple, Type, Dict, List, Generator, Union
from dataclasses import dataclass
from ..objects import DatabaseObject
@ -14,33 +14,35 @@ class Option:
class Results:
def __init__(self) -> None:
self._by_index = Dict[int, DatabaseObject] = dict()
self._by_index: Dict[int, DatabaseObject] = dict()
self._page_by_index: Dict[int: Type[Page]] = dict()
def __iter__(self) -> Generator[DatabaseObject]:
def __iter__(self) -> Generator[DatabaseObject, None, None]:
for option in self.formated_generator():
if isinstance(option, Option):
yield option.music_object
def formated_generator(self, max_items_per_page: int = 10) -> Generator[Type[Page], Option]:
def formated_generator(self, max_items_per_page: int = 10) -> Generator[Union[Type[Page], Option], None, None]:
self._by_index = dict()
self._page_by_index = dict()
def get_music_object_by_index(self, index: int) -> DatabaseObject:
def get_music_object_by_index(self, index: int) -> Tuple[Type[Page], DatabaseObject]:
# if this throws a key error, either the formated generator needs to be iterated, or the option doesn't exist.
return self._by_index[index]
return self._page_by_index[index], self._by_index[index]
class SearchResults(Results):
def __init__(
self,
pages: Tuple[Type[Page], ...]
pages: Tuple[Type[Page], ...] = None
) -> None:
super().__init()
super().__init__()
self.pages = pages
self.pages = pages or []
# this would initialize a list for every page, which I don't think I want
# self.results = Dict[Type[Page], List[DatabaseObject]] = {page: [] for page in self.pages}
self.results = Dict[Type[Page], List[DatabaseObject]] = {}
self.results: Dict[Type[Page], List[DatabaseObject]] = {}
def add(self, page: Type[Page], search_result: List[DatabaseObject]):
"""
@ -64,6 +66,7 @@ class SearchResults(Results):
for option in self.results[page]:
yield Option(i, option)
self._by_index[i] = option
self._page_by_index[i] = page
i += 1
j += 1
@ -73,7 +76,7 @@ class SearchResults(Results):
class PageResults(Results):
def __init__(self, page: Type[Page], results: List[DatabaseObject]) -> None:
super().__init()
super().__init__()
self.page: Type[Page] = page
self.results: List[DatabaseObject] = results
@ -87,4 +90,5 @@ class PageResults(Results):
for option in self.results:
yield Option(i, option)
self._by_index[i] = option
self._page_by_index[i] = self.page
i += 1

View File

@ -95,8 +95,8 @@ class DatabaseObject:
return Metadata()
@property
def options(self) -> Options:
return Options([self])
def options(self) -> List["DatabaseObject"]:
return [self]
@property
def option_string(self) -> str:

View File

@ -14,7 +14,7 @@ from .metadata import (
Metadata
)
from .option import Options
from .parents import MainObject
from .parents import MainObject, DatabaseObject
from .source import Source, SourceCollection
from .target import Target
from ..utils.string_processing import unify
@ -162,7 +162,7 @@ class Song(MainObject):
f"feat. Artist({OPTION_STRING_DELIMITER.join(artist.name for artist in self.feature_artist_collection)})"
@property
def options(self) -> Options:
def options(self) -> List[DatabaseObject]:
"""
Return a list of related objects including the song object, album object, main artist objects, and
feature artist objects.
@ -173,7 +173,7 @@ class Song(MainObject):
options.extend(self.feature_artist_collection)
options.extend(self.album_collection)
options.append(self)
return Options(options)
return options
@property
def tracksort_str(self) -> str:
@ -309,12 +309,12 @@ class Album(MainObject):
f"under Label({OPTION_STRING_DELIMITER.join([label.name for label in self.label_collection])})"
@property
def options(self) -> Options:
def options(self) -> List[DatabaseObject]:
options = self.artist_collection.shallow_list
options.append(self)
options.extend(self.song_collection)
return Options(options)
return options
def update_tracksort(self):
"""
@ -600,11 +600,11 @@ class Artist(MainObject):
f"under Label({OPTION_STRING_DELIMITER.join([label.name for label in self.label_collection])})"
@property
def options(self) -> Options:
def options(self) -> List[DatabaseObject]:
options = [self]
options.extend(self.main_album_collection)
options.extend(self.feature_song_collection)
return Options(options)
return options
@property
def country_string(self):
@ -704,7 +704,9 @@ class Label(MainObject):
]
@property
def options(self) -> Options:
def options(self) -> List[DatabaseObject]:
options = [self]
options.extend(self.current_artist_collection.shallow_list)
options.extend(self.album_collection.shallow_list)
return options

View File

@ -166,7 +166,6 @@ class Page():
for default_query in query.default_search:
for single_option in self.general_search(default_query):
r.append(single_option)
self.search_result_queue.put(single_option)
return r