Merge remote-tracking branch 'origin/youtube' into youtube

This commit is contained in:
Hellow 2023-06-12 17:46:38 +02:00
commit 308e34a91c
16 changed files with 549 additions and 227 deletions

57
documentation/shell.md Normal file
View File

@ -0,0 +1,57 @@
# Shell
## Searching
```mkshell
> s: {querry or url}
# examples
> s: https://musify.club/release/some-random-release-183028492
> s: r: #a an Artist #r some random Release
```
Searches for an url, or an query
### Query Syntax
```
#a {artist} #r {release} #t {track}
```
You can escape stuff like `#` doing this: `\#`
## Downloading
To download something, you either need a direct link, or you need to have already searched for options
```mkshell
> d: {option ids or direct url}
# examples
> d: 0, 3, 4
> d: 1
> d: https://musify.club/release/some-random-release-183028492
```
## Misc
### Exit
```mkshell
> q
> quit
> exit
> abort
```
### Current Options
```mkshell
> .
```
### Previous Options
```
> ..
```

View File

@ -6,6 +6,7 @@ from typing import List
import gc
import musicbrainzngs
from .cli import Shell
from . import objects, pages, download
from .utils import exception, shared, path_manager
from .utils.config import config, read, write, PATHS_SECTION
@ -132,138 +133,18 @@ def cli(
direct_download_url: str = None,
command_list: List[str] = None
):
def get_existing_genre() -> List[str]:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any patern
from shared.NOT_A_GENRE_REGEX.
"""
existing_genres: List[str] = []
# get all subdirectories of MUSIC_DIR, not the files in the dir.
existing_subdirectories: List[Path] = [f for f in MUSIC_DIR.iterdir() if f.is_dir()]
for subdirectory in existing_subdirectories:
name: str = subdirectory.name
if not any(re.match(regex_pattern, name) for regex_pattern in NOT_A_GENRE_REGEX):
existing_genres.append(name)
existing_genres.sort()
return existing_genres
def get_genre():
existing_genres = get_existing_genre()
for i, genre_option in enumerate(existing_genres):
print(f"{i + 1:0>2}: {genre_option}")
while True:
genre = input("Id or new genre: ")
if genre.isdigit():
genre_id = int(genre) - 1
if genre_id >= len(existing_genres):
print(f"No genre under the id {genre_id + 1}.")
continue
return existing_genres[genre_id]
new_genre = fit_to_file_system(genre)
agree_inputs = {"y", "yes", "ok"}
verification = input(f"create new genre \"{new_genre}\"? (Y/N): ").lower()
if verification in agree_inputs:
return new_genre
def next_search(_search: download.Search, query: str) -> bool:
"""
:param _search:
:param query:
:return exit in the next step:
"""
nonlocal genre
nonlocal download_all
query: str = query.strip()
parsed: str = query.lower()
if parsed in EXIT_COMMANDS:
return True
if parsed == ".":
return False
if parsed == "..":
_search.goto_previous()
return False
if parsed.isdigit():
_search.choose_index(int(parsed))
return False
if parsed in DOWNLOAD_COMMANDS:
r = _search.download_chosen(genre=genre, download_all=download_all)
print()
print(r)
print()
return not r.is_mild_failure
url = re.match(URL_REGEX, query)
if url is not None:
if not _search.search_url(url.string):
print("The given url couldn't be found.")
return False
page = _search.get_page_from_query(parsed)
if page is not None:
_search.choose_page(page)
return False
# if everything else is not valid search
_search.search(query)
return False
if genre is None:
genre = get_genre()
print()
print_cute_message()
print()
print(f"Downloading to: \"{genre}\"")
print()
search = download.Search()
# directly download url
if direct_download_url is not None:
if search.search_url(direct_download_url):
r = search.download_chosen(genre=genre, download_all=download_all)
print()
print(r)
print()
else:
print(f"Sorry, could not download the url: {direct_download_url}")
exit_message()
return
# run one command after another from the command list
shell = Shell(genre=genre)
if command_list is not None:
for command in command_list:
print(f">> {command}")
if next_search(search, command):
break
print(search)
exit_message()
shell.process_input(command)
return
# the actual cli
while True:
if next_search(search, input(">> ")):
break
print(search)
if direct_download_url is not None:
if shell.download(direct_download_url, download_all=download_all):
exit_message()
return
shell.mainloop()
exit_message()

View File

@ -0,0 +1 @@
from .download.shell import Shell

View File

@ -0,0 +1,358 @@
from typing import Set, Type, Dict, List
from pathlib import Path
import re
from ...utils.shared import MUSIC_DIR, NOT_A_GENRE_REGEX
from ...utils.regex import URL_PATTERN
from ...utils.string_processing import fit_to_file_system
from ...utils.support_classes import Query, DownloadResult
from ...download.results import Results, SearchResults, Option, PageResults
from ...download.page_attributes import Pages
from ...pages import Page
from ...objects import Song, Album, Artist, DatabaseObject
"""
This is the implementation of the Shell
# Behaviour
## Searching
```mkshell
> s: {querry or url}
# examples
> s: https://musify.club/release/some-random-release-183028492
> s: r: #a an Artist #r some random Release
```
Searches for an url, or an query
### Query Syntax
```
#a {artist} #r {release} #t {track}
```
You can escape stuff like `#` doing this: `\#`
## Downloading
To download something, you either need a direct link, or you need to have already searched for options
```mkshell
> d: {option ids or direct url}
# examples
> d: 0, 3, 4
> d: 1
> d: https://musify.club/release/some-random-release-183028492
```
## Misc
### Exit
```mkshell
> q
> quit
> exit
> abort
```
### Current Options
```mkshell
> .
```
### Previous Options
```
> ..
```
"""
EXIT_COMMANDS = {"q", "quit", "exit", "abort"}
ALPHABET = "abcdefghijklmnopqrstuvwxyz"
PAGE_NAME_FILL = "-"
MAX_PAGE_LEN = 21
def get_existing_genre() -> List[str]:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any patern
from shared.NOT_A_GENRE_REGEX.
"""
existing_genres: List[str] = []
# get all subdirectories of MUSIC_DIR, not the files in the dir.
existing_subdirectories: List[Path] = [f for f in MUSIC_DIR.iterdir() if f.is_dir()]
for subdirectory in existing_subdirectories:
name: str = subdirectory.name
if not any(re.match(regex_pattern, name) for regex_pattern in NOT_A_GENRE_REGEX):
existing_genres.append(name)
existing_genres.sort()
return existing_genres
def get_genre():
existing_genres = get_existing_genre()
for i, genre_option in enumerate(existing_genres):
print(f"{i + 1:0>2}: {genre_option}")
while True:
genre = input("Id or new genre: ")
if genre.isdigit():
genre_id = int(genre) - 1
if genre_id >= len(existing_genres):
print(f"No genre under the id {genre_id + 1}.")
continue
return existing_genres[genre_id]
new_genre = fit_to_file_system(genre)
agree_inputs = {"y", "yes", "ok"}
verification = input(f"create new genre \"{new_genre}\"? (Y/N): ").lower()
if verification in agree_inputs:
return new_genre
def help_message():
print()
print("""
to search:
> s: {query or url}
> s: https://musify.club/release/some-random-release-183028492
> s: #a {artist} #r {release} #t {track}
to download:
> d: {option ids or direct url}
> d: 0, 3, 4
> d: 1
> d: https://musify.club/release/some-random-release-183028492
have fun :3
""".strip())
print()
class Shell:
"""
TODO:
- Implement search and download for direct urls
"""
def __init__(
self,
exclude_pages: Set[Type[Page]] = None,
exclude_shady: bool = False,
max_displayed_options: int = 10,
option_digits: int = 3,
genre: str = None
) -> None:
self.pages: Pages = Pages(exclude_pages=exclude_pages, exclude_shady=exclude_shady)
self.page_dict: Dict[str, Type[Page]] = dict()
self.max_displayed_options = max_displayed_options
self.option_digits: int = option_digits
self.current_results: Results = SearchResults
self.genre = genre or get_genre()
print()
print(f"Downloading to: \"{self.genre}\"")
print()
def print_current_options(self):
self.page_dict = dict()
page_count = 0
for option in self.current_results.formated_generator(max_items_per_page=self.max_displayed_options):
if isinstance(option, Option):
print(f"{option.index:0{self.option_digits}} {option.music_object.option_string}")
else:
prefix = ALPHABET[page_count%len(ALPHABET)]
print(f"({prefix}) ------------------------{option.__name__:{PAGE_NAME_FILL}<{MAX_PAGE_LEN}}------------")
self.page_dict[prefix] = option
self.page_dict[option.__name__] = option
page_count += 1
def set_current_options(self, current_options: Results):
self.current_results = current_options
def _process_parsed(self, key_text: Dict[str, str], query: str) -> Query:
song = None if not "t" in key_text else Song(title=key_text["t"], dynamic=True)
album = None if not "r" in key_text else Album(title=key_text["r"], dynamic=True)
artist = None if not "a" in key_text else Artist(name=key_text["a"], dynamic=True)
if song is not None:
song.album_collection.append(album)
song.main_artist_collection.append(artist)
return Query(raw_query=query, music_object=song)
if album is not None:
album.artist_collection.append(artist)
return Query(raw_query=query, music_object=album)
if artist is not None:
return Query(raw_query=query, music_object=artist)
return Query(raw_query=query)
def search(self, query: str):
if re.match(URL_PATTERN, query) is not None:
self.set_current_options(*self.pages.fetch_url(re.match(URL_PATTERN, query)))
self.print_current_options()
return
special_characters = "#\\"
query = query + " "
key_text = {}
skip_next = False
escape_next = False
new_text = ""
latest_key: str = None
for i in range(len(query) - 1):
current_char = query[i]
next_char = query[i+1]
if skip_next:
skip_next = False
continue
if escape_next:
new_text += current_char
escape_next = False
# escaping
if current_char == "\\":
if next_char in special_characters:
escape_next = True
continue
if current_char == "#":
if latest_key is not None:
key_text[latest_key] = new_text
new_text = ""
latest_key = next_char
skip_next = True
continue
new_text += current_char
if latest_key is not None:
key_text[latest_key] = new_text
parsed_query: Query = self._process_parsed(key_text, query)
self.set_current_options(self.pages.search(parsed_query))
self.print_current_options()
def goto(self, index: int):
page: Type[Page]
music_object: DatabaseObject
try:
page, music_object = self.current_results.get_music_object_by_index(index)
except KeyError:
print()
print(f"The option {index} doesn't exist.")
print()
return
self.pages.fetch_details(music_object)
self.set_current_options(PageResults(page, music_object.options))
self.print_current_options()
def download(self, download_str: str, download_all: bool = False) -> bool:
to_download: List[DatabaseObject] = []
if re.match(URL_PATTERN, download_str) is not None:
_, music_objects = self.pages.fetch_url(re.match(URL_PATTERN, download_str))
to_download.append(music_objects)
else:
index: str
for index in download_str.split(", "):
if not index.strip().isdigit():
print()
print(f"Every download thingie has to be an index, not {index}.")
print()
return False
for index in download_str.split(", "):
to_download.append(self.current_results.get_music_object_by_index(int(index))[1])
print()
print("Downloading:")
for download_object in to_download:
print(download_object.option_string)
print()
_result_map: Dict[DatabaseObject, DownloadResult] = dict()
for database_object in to_download:
r = self.pages.download(music_object=database_object, genre=self.genre, download_all=download_all)
_result_map[database_object] = r
for music_object, result in _result_map.items():
print()
print(music_object.option_string)
print(result)
return False
def process_input(self, input_str: str) -> bool:
input_str = input_str.strip()
processed_input: str = input_str.lower()
if processed_input in EXIT_COMMANDS:
return True
if processed_input == ".":
self.print_current_options()
return False
if processed_input.startswith("s: "):
self.search(input_str[3:])
return False
if processed_input.startswith("d: "):
return self.download(input_str[3:])
if processed_input.isdigit():
self.goto(int(processed_input))
return False
if processed_input != "help":
print("Invalid input.")
help_message()
return False
def mainloop(self):
while True:
if self.process_input(input("> ")):
return

View File

View File

@ -131,6 +131,7 @@ class Connection:
accepted_response_code=accepted_response_code,
url=url,
timeout=timeout,
headers=headers,
**kwargs
)

View File

@ -1,2 +1 @@
from .search import Search
from .download import Download

View File

@ -1,48 +0,0 @@
from typing import Optional, Tuple, Type, Set, Union, List
from . import page_attributes
from ..pages import Page
from ..objects import Song, Album, Artist, Label, Source
MusicObject = Union[Song, Album, Artist, Label]
class Download:
def __init__(
self,
pages: Tuple[Page] = page_attributes.ALL_PAGES,
exclude_pages=None,
exclude_shady: bool = False,
) -> None:
if exclude_pages is None:
exclude_pages = set()
_page_list: List[Page] = []
_audio_page_list: List[Page] = []
for page in pages:
if exclude_shady and page in page_attributes.SHADY_PAGES:
continue
if page in exclude_pages:
continue
_page_list.append(page)
if page in page_attributes.AUDIO_PAGES:
_audio_page_list.append(page)
self.pages: Tuple[Page] = tuple(_page_list)
self.audio_pages: Tuple[Page] = tuple(_audio_page_list)
def fetch_details(self, music_object: MusicObject) -> MusicObject:
for page in self.pages:
page.fetch_details(music_object=music_object)
return music_object
def fetch_source(self, source: Source) -> Optional[MusicObject]:
source_page = page_attributes.SOURCE_PAGE_MAP[source.page_enum]
if source_page not in self.pages:
return
return source_page.fetch_object_from_source(source)

View File

@ -1,9 +1,10 @@
from typing import Tuple, Type, Dict, List, Set
from .results import SearchResults
from ..objects import DatabaseObject
from ..objects import DatabaseObject, Source
from ..utils.enums.source import SourcePages
from ..utils.support_classes import Query, DownloadResult
from ..utils.exception.download import UrlNotFoundException
from ..pages import Page, EncyclopaediaMetallum, Musify, INDEPENDENT_DB_OBJECTS
ALL_PAGES: Set[Type[Page]] = {
@ -39,7 +40,7 @@ class Pages:
return tuple(sorted(page_set, key=lambda page: page.__name__))
self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages)
self.pages: Tuple[Type[Page], ...] = _set_to_tuple(ALL_PAGES.difference(self.pages))
self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set)
self._audio_pages_set: Set[Type[Page]] = self._pages_set.intersection(AUDIO_PAGES)
self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(self._audio_pages_set)
@ -79,19 +80,16 @@ class Pages:
audio_pages = self._audio_pages_set.intersection(_page_types)
for download_page in audio_pages:
return self._page_instances[download_page].download(genre=genre, download_all=download_all)
return self._page_instances[download_page].download(music_object=music_object, genre=genre, download_all=download_all)
return DownloadResult(error_message=f"No audio source has been found for {music_object}.")
"""
# this needs to be case-insensitive
SHORTHANDS = ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z')
for i, page in enumerate(ALL_PAGES):
NAME_PAGE_MAP[type(page).__name__.lower()] = page
NAME_PAGE_MAP[SHORTHANDS[i].lower()] = page
PAGE_NAME_MAP[type(page)] = SHORTHANDS[i]
SOURCE_PAGE_MAP[page.SOURCE_TYPE] = page
"""
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DatabaseObject]:
source = Source.match_url(url, SourcePages.MANUAL)
if source is None:
raise UrlNotFoundException(url=url)
_actual_page = self._source_to_page[source.page_enum]
return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level)

View File

@ -1,25 +1,94 @@
from typing import Tuple, Type, Dict, List
from typing import Tuple, Type, Dict, List, Generator, Union
from dataclasses import dataclass
from ..objects import DatabaseObject
from ..utils.enums.source import SourcePages
from ..pages import Page, EncyclopaediaMetallum, Musify
class SearchResults:
@dataclass
class Option:
index: int
music_object: DatabaseObject
class Results:
def __init__(self) -> None:
self._by_index: Dict[int, DatabaseObject] = dict()
self._page_by_index: Dict[int: Type[Page]] = dict()
def __iter__(self) -> Generator[DatabaseObject, None, None]:
for option in self.formated_generator():
if isinstance(option, Option):
yield option.music_object
def formated_generator(self, max_items_per_page: int = 10) -> Generator[Union[Type[Page], Option], None, None]:
self._by_index = dict()
self._page_by_index = dict()
def get_music_object_by_index(self, index: int) -> Tuple[Type[Page], DatabaseObject]:
# if this throws a key error, either the formated generator needs to be iterated, or the option doesn't exist.
return self._page_by_index[index], self._by_index[index]
class SearchResults(Results):
def __init__(
self,
pages: Tuple[Type[Page], ...]
pages: Tuple[Type[Page], ...] = None
) -> None:
self.pages = pages
super().__init__()
self.pages = pages or []
# this would initialize a list for every page, which I don't think I want
# self.results = Dict[Type[Page], List[DatabaseObject]] = {page: [] for page in self.pages}
self.results = Dict[Type[Page], List[DatabaseObject]] = {}
self.results: Dict[Type[Page], List[DatabaseObject]] = {}
def add(self, page: Type[Page], search_result: List[DatabaseObject]):
self.results[page] = search_result
"""
adds a list of found music objects to the according page
WARNING: if a page already has search results, they are just gonna be overwritten
"""
def __str__(self) -> str:
for page in self.pages:
if page not in self.results:
continue
self.results[page] = search_result
def get_page_results(self, page: Type[Page]) -> "PageResults":
return PageResults(page, self.results.get(page, []))
def formated_generator(self, max_items_per_page: int = 10):
super().formated_generator()
i = 0
for page in self.results:
yield page
j = 0
for option in self.results[page]:
yield Option(i, option)
self._by_index[i] = option
self._page_by_index[i] = page
i += 1
j += 1
if j >= max_items_per_page:
break
class PageResults(Results):
def __init__(self, page: Type[Page], results: List[DatabaseObject]) -> None:
super().__init__()
self.page: Type[Page] = page
self.results: List[DatabaseObject] = results
def formated_generator(self, max_items_per_page: int = 10):
super().formated_generator()
i = 0
yield self.page
for option in self.results:
yield Option(i, option)
self._by_index[i] = option
self._page_by_index[i] = self.page
i += 1

View File

@ -1,7 +1,6 @@
from typing import Tuple, List, Set, Type, Optional, Dict
from . import page_attributes
from .download import Download
from .page_attributes import Pages
from .multiple_options import MultiPageOptions
from ..pages.abstract import Page
from ..utils.support_classes import DownloadResult, Query
@ -9,20 +8,15 @@ from ..objects import DatabaseObject, Source, Artist, Song, Album
from ..utils.enums.source import SourcePages
class Search(Download):
class Search:
def __init__(
self,
pages: Tuple[Type[Page]] = page_attributes.ALL_PAGES,
exclude_pages: Set[Type[Page]] = set(),
exclude_pages: Set[Type[Page]] = None,
exclude_shady: bool = False,
max_displayed_options: int = 10,
option_digits: int = 3,
) -> None:
super().__init__(
pages=pages,
exclude_pages=exclude_pages,
exclude_shady=exclude_shady
)
self.pages: Pages = Pages(exclude_pages=exclude_pages, exclude_shady=exclude_shady)
self.max_displayed_options = max_displayed_options
self.option_digits: int = option_digits

View File

@ -95,8 +95,8 @@ class DatabaseObject:
return Metadata()
@property
def options(self) -> Options:
return Options([self])
def options(self) -> List["DatabaseObject"]:
return [self]
@property
def option_string(self) -> str:

View File

@ -14,7 +14,7 @@ from .metadata import (
Metadata
)
from .option import Options
from .parents import MainObject
from .parents import MainObject, DatabaseObject
from .source import Source, SourceCollection
from .target import Target
from ..utils.string_processing import unify
@ -162,7 +162,7 @@ class Song(MainObject):
f"feat. Artist({OPTION_STRING_DELIMITER.join(artist.name for artist in self.feature_artist_collection)})"
@property
def options(self) -> Options:
def options(self) -> List[DatabaseObject]:
"""
Return a list of related objects including the song object, album object, main artist objects, and
feature artist objects.
@ -173,7 +173,7 @@ class Song(MainObject):
options.extend(self.feature_artist_collection)
options.extend(self.album_collection)
options.append(self)
return Options(options)
return options
@property
def tracksort_str(self) -> str:
@ -309,12 +309,12 @@ class Album(MainObject):
f"under Label({OPTION_STRING_DELIMITER.join([label.name for label in self.label_collection])})"
@property
def options(self) -> Options:
def options(self) -> List[DatabaseObject]:
options = self.artist_collection.shallow_list
options.append(self)
options.extend(self.song_collection)
return Options(options)
return options
def update_tracksort(self):
"""
@ -600,11 +600,11 @@ class Artist(MainObject):
f"under Label({OPTION_STRING_DELIMITER.join([label.name for label in self.label_collection])})"
@property
def options(self) -> Options:
def options(self) -> List[DatabaseObject]:
options = [self]
options.extend(self.main_album_collection)
options.extend(self.feature_song_collection)
return Options(options)
return options
@property
def country_string(self):
@ -704,7 +704,9 @@ class Label(MainObject):
]
@property
def options(self) -> Options:
def options(self) -> List[DatabaseObject]:
options = [self]
options.extend(self.current_artist_collection.shallow_list)
options.extend(self.album_collection.shallow_list)
return options

View File

@ -166,7 +166,6 @@ class Page():
for default_query in query.default_search:
for single_option in self.general_search(default_query):
r.append(single_option)
self.search_result_queue.put(single_option)
return r
@ -269,7 +268,7 @@ class Page():
nonlocal naming_objects
for collection_name in naming_music_object.UPWARDS_COLLECTION_ATTRIBUTES:
collection: Collection = getattr(self, collection_name)
collection: Collection = getattr(naming_music_object, collection_name)
if collection.empty():
continue

View File

@ -0,0 +1,11 @@
class DownloadException(Exception):
pass
class UrlNotFoundException(DownloadException):
def __init__(self, url: str, *args: object) -> None:
self.url = url
super().__init__(*args)
def __str__(self) -> str:
return f"Couldn't find the page of {self.url}"