Merge pull request 'feature/add_merge_command' (#23) from feature/add_merge_command into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

Reviewed-on: #23
This commit is contained in:
Hazel 2024-05-07 12:15:07 +00:00
commit 585e8c9671
6 changed files with 161 additions and 91 deletions

View File

@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__": if __name__ == "__main__":
commands = [ commands = [
"s: #a Psychonaut 4", "s: #a Crystal F",
"d: 0" "dm: 10, 20"
] ]

View File

@ -6,16 +6,18 @@ import re
from .utils import cli_function from .utils import cli_function
from .options.first_config import initial_config from .options.first_config import initial_config
from ..utils import output, BColors
from ..utils.config import write_config, main_settings from ..utils.config import write_config, main_settings
from ..utils.shared import URL_PATTERN from ..utils.shared import URL_PATTERN
from ..utils.string_processing import fit_to_file_system from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.query import Query from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from ..utils.exception import MKInvalidInputException
from ..utils.exception.download import UrlNotFoundException from ..utils.exception.download import UrlNotFoundException
from ..utils.enums.colors import BColors from ..utils.enums.colors import BColors
from .. import console from .. import console
from ..download.results import Results, Option, PageResults from ..download.results import Results, Option, PageResults, GoToResults
from ..download.page_attributes import Pages from ..download.page_attributes import Pages
from ..pages import Page from ..pages import Page
from ..objects import Song, Album, Artist, DatabaseObject from ..objects import Song, Album, Artist, DatabaseObject
@ -174,7 +176,7 @@ class Downloader:
print() print()
page_count = 0 page_count = 0
for option in self.current_results.formated_generator(max_items_per_page=self.max_displayed_options): for option in self.current_results.formatted_generator():
if isinstance(option, Option): if isinstance(option, Option):
_downloadable = self.pages.is_downloadable(option.music_object) _downloadable = self.pages.is_downloadable(option.music_object)
@ -249,7 +251,7 @@ class Downloader:
f"Recommendations and suggestions on sites to implement appreciated.\n" f"Recommendations and suggestions on sites to implement appreciated.\n"
f"But don't be a bitch if I don't end up implementing it.") f"But don't be a bitch if I don't end up implementing it.")
return return
self.set_current_options(PageResults(page, data_object.options)) self.set_current_options(PageResults(page, data_object.options, max_items_per_page=self.max_displayed_options))
self.print_current_options() self.print_current_options()
return return
@ -299,95 +301,121 @@ class Downloader:
self.set_current_options(self.pages.search(parsed_query)) self.set_current_options(self.pages.search(parsed_query))
self.print_current_options() self.print_current_options()
def goto(self, index: int): def goto(self, data_object: DatabaseObject):
page: Type[Page] page: Type[Page]
music_object: DatabaseObject
try: self.pages.fetch_details(data_object)
page, music_object = self.current_results.get_music_object_by_index(index)
except KeyError:
print()
print(f"The option {index} doesn't exist.")
print()
return
self.pages.fetch_details(music_object) print(data_object)
print(data_object.options)
print(music_object) self.set_current_options(GoToResults(data_object.options, max_items_per_page=self.max_displayed_options))
print(music_object.options)
self.set_current_options(PageResults(page, music_object.options))
self.print_current_options() self.print_current_options()
def download(self, download_str: str, download_all: bool = False) -> bool: def download(self, data_objects: List[DatabaseObject], **kwargs) -> bool:
to_download: List[DatabaseObject] = [] output()
if len(data_objects) == 1:
if re.match(URL_PATTERN, download_str) is not None: output(f"Downloading {data_objects[0].option_string}...", color=BColors.BOLD)
_, music_objects = self.pages.fetch_url(download_str)
to_download.append(music_objects)
else: else:
index: str output(f"Downloading {len(data_objects)} objects...", *("- " + o.option_string for o in data_objects), color=BColors.BOLD, sep="\n")
for index in download_str.split(", "):
if not index.strip().isdigit():
print()
print(f"Every download thingie has to be an index, not {index}.")
print()
return False
for index in download_str.split(", "):
to_download.append(self.current_results.get_music_object_by_index(int(index))[1])
print()
print("Downloading:")
for download_object in to_download:
print(download_object.option_string)
print()
_result_map: Dict[DatabaseObject, DownloadResult] = dict() _result_map: Dict[DatabaseObject, DownloadResult] = dict()
for database_object in to_download: for database_object in data_objects:
r = self.pages.download(music_object=database_object, genre=self.genre, download_all=download_all, r = self.pages.download(
process_metadata_anyway=self.process_metadata_anyway) music_object=database_object,
genre=self.genre,
**kwargs
)
_result_map[database_object] = r _result_map[database_object] = r
for music_object, result in _result_map.items(): for music_object, result in _result_map.items():
print() output()
print(music_object.option_string) output(music_object.option_string)
print(result) output(result)
return True return True
def process_input(self, input_str: str) -> bool: def process_input(self, input_str: str) -> bool:
input_str = input_str.strip() try:
processed_input: str = input_str.lower() input_str = input_str.strip()
processed_input: str = input_str.lower()
if processed_input in EXIT_COMMANDS: if processed_input in EXIT_COMMANDS:
return True return True
if processed_input == ".": if processed_input == ".":
self.print_current_options()
return False
if processed_input == "..":
if self.previous_option():
self.print_current_options() self.print_current_options()
return False
if processed_input == "..":
if self.previous_option():
self.print_current_options()
return False
command = ""
query = processed_input
if ":" in processed_input:
_ = processed_input.split(":")
command, query = _[0], ":".join(_[1:])
do_search = "s" in command
do_download = "d" in command
do_merge = "m" in command
if do_search and do_download:
raise MKInvalidInputException(message="You can't search and download at the same time.")
if do_search and do_merge:
raise MKInvalidInputException(message="You can't search and merge at the same time.")
if do_search:
self.search(":".join(input_str.split(":")[1:]))
return False
indices = []
for possible_index in query.split(","):
possible_index = possible_index.strip()
if possible_index == "":
continue
i = 0
if possible_index.isdigit():
i = int(possible_index)
else:
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.")
if i < 0 and i >= len(self.current_results):
raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results)}.")
indices.append(i)
selected_objects = [self.current_results[i] for i in indices]
if do_merge:
old_selected_objects = selected_objects
a = old_selected_objects[0]
for b in old_selected_objects[1:]:
if type(a) != type(b):
raise MKInvalidInputException(message="You can't merge different types of objects.")
a.merge(b)
selected_objects = [a]
if do_download:
self.download(selected_objects)
return False
if len(selected_objects) != 1:
raise MKInvalidInputException(message="You can only go to one object at a time without merging.")
self.goto(selected_objects[0])
return False return False
except MKInvalidInputException as e:
output("\n" + e.message + "\n", color=BColors.FAIL)
help_message()
if processed_input.startswith("s: "):
self.search(input_str[3:])
return False
if processed_input.startswith("d: "):
return self.download(input_str[3:])
if processed_input.isdigit():
self.goto(int(processed_input))
return False
if processed_input != "help":
print(f"{BColors.WARNING.value}Invalid input.{BColors.ENDC.value}")
help_message()
return False return False
def mainloop(self): def mainloop(self):

View File

@ -13,31 +13,32 @@ class Option:
class Results: class Results:
def __init__(self) -> None: def __init__(self, max_items_per_page: int = 10, **kwargs) -> None:
self._by_index: Dict[int, DatabaseObject] = dict() self._by_index: Dict[int, DatabaseObject] = dict()
self._page_by_index: Dict[int: Type[Page]] = dict() self._page_by_index: Dict[int: Type[Page]] = dict()
self.max_items_per_page = max_items_per_page
def __iter__(self) -> Generator[DatabaseObject, None, None]: def __iter__(self) -> Generator[DatabaseObject, None, None]:
for option in self.formated_generator(): for option in self.formatted_generator():
if isinstance(option, Option): if isinstance(option, Option):
yield option.music_object yield option.music_object
def formated_generator(self, max_items_per_page: int = 10) -> Generator[Union[Type[Page], Option], None, None]: def formatted_generator(self) -> Generator[Union[Type[Page], Option], None, None]:
self._by_index = dict() self._by_index = dict()
self._page_by_index = dict() self._page_by_index = dict()
def get_music_object_by_index(self, index: int) -> Tuple[Type[Page], DatabaseObject]: def __getitem__(self, index: int):
# if this throws a key error, either the formatted generator needs to be iterated, or the option doesn't exist. return self._by_index[index]
return self._page_by_index[index], self._by_index[index]
class SearchResults(Results): class SearchResults(Results):
def __init__( def __init__(
self, self,
pages: Tuple[Type[Page], ...] = None pages: Tuple[Type[Page], ...] = None,
**kwargs,
) -> None: ) -> None:
super().__init__() super().__init__(**kwargs)
self.pages = pages or [] self.pages = pages or []
# this would initialize a list for every page, which I don't think I want # this would initialize a list for every page, which I don't think I want
@ -54,9 +55,12 @@ class SearchResults(Results):
def get_page_results(self, page: Type[Page]) -> "PageResults": def get_page_results(self, page: Type[Page]) -> "PageResults":
return PageResults(page, self.results.get(page, [])) return PageResults(page, self.results.get(page, []))
def __len__(self) -> int:
return sum(min(self.max_items_per_page, len(results)) for results in self.results.values())
def formated_generator(self, max_items_per_page: int = 10): def formatted_generator(self):
super().formated_generator() super().formatted_generator()
i = 0 i = 0
for page in self.results: for page in self.results:
@ -70,19 +74,37 @@ class SearchResults(Results):
i += 1 i += 1
j += 1 j += 1
if j >= max_items_per_page: if j >= self.max_items_per_page:
break break
class GoToResults(Results):
def __init__(self, results: List[DatabaseObject], **kwargs):
self.results: List[DatabaseObject] = results
super().__init__(**kwargs)
def __getitem__(self, index: int):
return self.results[index]
def __len__(self) -> int:
return len(self.results)
def formatted_generator(self):
yield from (Option(i, o) for i, o in enumerate(self.results))
class PageResults(Results): class PageResults(Results):
def __init__(self, page: Type[Page], results: List[DatabaseObject]) -> None: def __init__(self, page: Type[Page], results: List[DatabaseObject], **kwargs) -> None:
super().__init__() super().__init__(**kwargs)
self.page: Type[Page] = page self.page: Type[Page] = page
self.results: List[DatabaseObject] = results self.results: List[DatabaseObject] = results
def formated_generator(self, max_items_per_page: int = 10): def formatted_generator(self, max_items_per_page: int = 10):
super().formated_generator() super().formatted_generator()
i = 0 i = 0
yield self.page yield self.page
@ -92,3 +114,6 @@ class PageResults(Results):
self._by_index[i] = option self._by_index[i] = option
self._page_by_index[i] = self.page self._page_by_index[i] = self.page
i += 1 i += 1
def __len__(self) -> int:
return len(self.results)

View File

@ -403,18 +403,20 @@ class Page:
self.LOGGER.info(f"{song.option_string} already exists, thus not downloading again.") self.LOGGER.info(f"{song.option_string} already exists, thus not downloading again.")
return r return r
skip_intervals = []
if not found_on_disc: if not found_on_disc:
for source in sources: for source in sources:
r = self.download_song_to_target(source=source, target=temp_target, desc=song.option_string) r = self.download_song_to_target(source=source, target=temp_target, desc=song.option_string)
if not r.is_fatal_error: if not r.is_fatal_error:
skip_intervals = self.get_skip_intervals(song, source)
break break
if temp_target.exists: if temp_target.exists:
r.merge(self._post_process_targets( r.merge(self._post_process_targets(
song=song, song=song,
temp_target=temp_target, temp_target=temp_target,
interval_list=[] if found_on_disc else self.get_skip_intervals(song, source) interval_list=skip_intervals,
)) ))
return r return r

View File

@ -19,8 +19,13 @@ def _apply_color(msg: str, color: BColors) -> str:
if not isinstance(msg, str): if not isinstance(msg, str):
msg = str(msg) msg = str(msg)
endc = BColors.ENDC.value
if color is BColors.ENDC: if color is BColors.ENDC:
return msg return msg
msg = msg.replace(BColors.ENDC.value, BColors.ENDC.value + color.value)
return color.value + msg + BColors.ENDC.value return color.value + msg + BColors.ENDC.value

View File

@ -1 +1,11 @@
__all__ = ["config"] class MKBaseException(Exception):
def __init__(self, message: str = None, **kwargs) -> None:
self.message = message
super().__init__(message, **kwargs)
class MKFrontendException(MKBaseException):
pass
class MKInvalidInputException(MKFrontendException):
pass