Compare commits
	
		
			3 Commits
		
	
	
		
			ead4f83456
			...
			feature/mu
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 265c9f462f | |||
| 780daac0ef | |||
| 465af49057 | 
							
								
								
									
										6
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										6
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
								
							| @@ -17,12 +17,6 @@ | |||||||
|             "request": "launch", |             "request": "launch", | ||||||
|             "program": "development/actual_donwload.py", |             "program": "development/actual_donwload.py", | ||||||
|             "console": "integratedTerminal" |             "console": "integratedTerminal" | ||||||
|         }, |  | ||||||
|         { |  | ||||||
|             "name": "Python Debugger: Music Kraken", |  | ||||||
|             "type": "debugpy", |  | ||||||
|             "request": "launch", // run the module |  | ||||||
|             "program": "${workspaceFolder}/.vscode/run_script.py", |  | ||||||
|         } |         } | ||||||
|     ] |     ] | ||||||
| } | } | ||||||
							
								
								
									
										3
									
								
								.vscode/run_script.py
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										3
									
								
								.vscode/run_script.py
									
									
									
									
										vendored
									
									
								
							| @@ -1,3 +0,0 @@ | |||||||
| from music_kraken.__main__ import cli |  | ||||||
|  |  | ||||||
| cli() |  | ||||||
| @@ -1,25 +1,89 @@ | |||||||
| import random | import random | ||||||
| import re | from typing import Set, Type, Dict, List | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
| from typing import Dict, Generator, List, Set, Type, Union | import re | ||||||
|  |  | ||||||
| from .. import console | from .utils import cli_function | ||||||
| from ..download import Downloader, Page, components | from .options.first_config import initial_config | ||||||
| from ..download.results import GoToResults |  | ||||||
| from ..download.results import Option as ResultOption | from ..utils import output, BColors | ||||||
| from ..download.results import PageResults, Results | from ..utils.config import write_config, main_settings | ||||||
| from ..objects import Album, Artist, DatabaseObject, Song | from ..utils.shared import URL_PATTERN | ||||||
| from ..utils import BColors, output | from ..utils.string_processing import fit_to_file_system | ||||||
| from ..utils.config import main_settings, write_config | from ..utils.support_classes.query import Query | ||||||
| from ..utils.enums.colors import BColors | from ..utils.support_classes.download_result import DownloadResult | ||||||
| from ..utils.exception import MKInvalidInputException | from ..utils.exception import MKInvalidInputException | ||||||
| from ..utils.exception.download import UrlNotFoundException | from ..utils.exception.download import UrlNotFoundException | ||||||
| from ..utils.shared import HELP_MESSAGE, URL_PATTERN | from ..utils.enums.colors import BColors | ||||||
| from ..utils.string_processing import fit_to_file_system | from .. import console | ||||||
| from ..utils.support_classes.download_result import DownloadResult |  | ||||||
| from ..utils.support_classes.query import Query | from ..download.results import Results, Option, PageResults, GoToResults | ||||||
| from .options.first_config import initial_config | from ..download.page_attributes import Pages | ||||||
| from .utils import ask_for_bool, cli_function | from ..pages import Page | ||||||
|  | from ..objects import Song, Album, Artist, DatabaseObject | ||||||
|  |  | ||||||
|  | """ | ||||||
|  | This is the implementation of the Shell | ||||||
|  |  | ||||||
|  | # Behaviour | ||||||
|  |  | ||||||
|  | ## Searching | ||||||
|  |  | ||||||
|  | ```mkshell | ||||||
|  | > s: {querry or url} | ||||||
|  |  | ||||||
|  | # examples | ||||||
|  | > s: https://musify.club/release/some-random-release-183028492 | ||||||
|  | > s: r: #a an Artist #r some random Release | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | Searches for an url, or an query | ||||||
|  |  | ||||||
|  | ### Query Syntax | ||||||
|  |  | ||||||
|  | ``` | ||||||
|  | #a {artist} #r {release} #t {track} | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | You can escape stuff like `#` doing this: `\#` | ||||||
|  |  | ||||||
|  | ## Downloading | ||||||
|  |  | ||||||
|  | To download something, you either need a direct link, or you need to have already searched for options | ||||||
|  |  | ||||||
|  | ```mkshell | ||||||
|  | > d: {option ids or direct url} | ||||||
|  |  | ||||||
|  | # examples | ||||||
|  | > d: 0, 3, 4 | ||||||
|  | > d: 1 | ||||||
|  | > d: https://musify.club/release/some-random-release-183028492 | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | ## Misc | ||||||
|  |  | ||||||
|  | ### Exit | ||||||
|  |  | ||||||
|  | ```mkshell | ||||||
|  | > q | ||||||
|  | > quit | ||||||
|  | > exit | ||||||
|  | > abort | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | ### Current Options | ||||||
|  |  | ||||||
|  | ```mkshell | ||||||
|  | > . | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | ### Previous Options | ||||||
|  |  | ||||||
|  | ``` | ||||||
|  | > .. | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | """ | ||||||
|  |  | ||||||
| EXIT_COMMANDS = {"q", "quit", "exit", "abort"} | EXIT_COMMANDS = {"q", "quit", "exit", "abort"} | ||||||
| ALPHABET = "abcdefghijklmnopqrstuvwxyz" | ALPHABET = "abcdefghijklmnopqrstuvwxyz" | ||||||
| @@ -27,40 +91,59 @@ PAGE_NAME_FILL = "-" | |||||||
| MAX_PAGE_LEN = 21 | MAX_PAGE_LEN = 21 | ||||||
|  |  | ||||||
|  |  | ||||||
| class GenreIO(components.HumanIO): | def get_existing_genre() -> List[str]: | ||||||
|     @staticmethod |     """ | ||||||
|     def ask_to_create(option: components.Option) -> bool: |     gets the name of all subdirectories of shared.MUSIC_DIR, | ||||||
|         output() |     but filters out all directories, where the name matches with any patern | ||||||
|         return ask_for_bool(f"create the genre {BColors.OKBLUE.value}{option.value}{BColors.ENDC.value}") |     from shared.NOT_A_GENRE_REGEX. | ||||||
|  |     """ | ||||||
|  |     existing_genres: List[str] = [] | ||||||
|  |  | ||||||
|     @staticmethod |     # get all subdirectories of MUSIC_DIR, not the files in the dir. | ||||||
|     def not_found(key: str) -> None: |     existing_subdirectories: List[Path] = [f for f in main_settings["music_directory"].iterdir() if f.is_dir()] | ||||||
|         output(f"\ngenre {BColors.BOLD.value}{key}{BColors.ENDC.value} not found\n", color=BColors.FAIL) |  | ||||||
|  |     for subdirectory in existing_subdirectories: | ||||||
|  |         name: str = subdirectory.name | ||||||
|  |  | ||||||
|  |         if not any(re.match(regex_pattern, name) for regex_pattern in main_settings["not_a_genre_regex"]): | ||||||
|  |             existing_genres.append(name) | ||||||
|  |  | ||||||
|  |     existing_genres.sort() | ||||||
|  |  | ||||||
|  |     return existing_genres | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_genre(): | def get_genre(): | ||||||
|     select_genre = components.GenreSelect() |     existing_genres = get_existing_genre() | ||||||
|     select_genre.human_io = GenreIO |     for i, genre_option in enumerate(existing_genres): | ||||||
|  |         print(f"{i + 1:0>2}: {genre_option}") | ||||||
|  |  | ||||||
|     genre: Optional[components.Option] = None |     while True: | ||||||
|  |         genre = input("Id or new genre: ") | ||||||
|  |  | ||||||
|     while genre is None: |         if genre.isdigit(): | ||||||
|         print(select_genre.pprint()) |             genre_id = int(genre) - 1 | ||||||
|         print() |             if genre_id >= len(existing_genres): | ||||||
|  |                 print(f"No genre under the id {genre_id + 1}.") | ||||||
|  |                 continue | ||||||
|  |  | ||||||
|         genre = select_genre.choose(input("> ")) |             return existing_genres[genre_id] | ||||||
|  |  | ||||||
|     return genre.value |         new_genre = fit_to_file_system(genre) | ||||||
|  |  | ||||||
|  |         agree_inputs = {"y", "yes", "ok"} | ||||||
|  |         verification = input(f"create new genre \"{new_genre}\"? (Y/N): ").lower() | ||||||
|  |         if verification in agree_inputs: | ||||||
|  |             return new_genre | ||||||
|  |  | ||||||
|  |  | ||||||
| def help_message(): | def help_message(): | ||||||
|     print(HELP_MESSAGE) |  | ||||||
|     print() |     print() | ||||||
|     print(random.choice(main_settings["happy_messages"])) |     print(random.choice(main_settings["happy_messages"])) | ||||||
|     print() |     print() | ||||||
|  |  | ||||||
|  |  | ||||||
| class CliDownloader: | class Downloader: | ||||||
|     def __init__( |     def __init__( | ||||||
|             self, |             self, | ||||||
|             exclude_pages: Set[Type[Page]] = None, |             exclude_pages: Set[Type[Page]] = None, | ||||||
| @@ -70,7 +153,7 @@ class CliDownloader: | |||||||
|             genre: str = None, |             genre: str = None, | ||||||
|             process_metadata_anyway: bool = False, |             process_metadata_anyway: bool = False, | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         self.downloader: Downloader = Downloader(exclude_pages=exclude_pages, exclude_shady=exclude_shady) |         self.pages: Pages = Pages(exclude_pages=exclude_pages, exclude_shady=exclude_shady) | ||||||
|  |  | ||||||
|         self.page_dict: Dict[str, Type[Page]] = dict() |         self.page_dict: Dict[str, Type[Page]] = dict() | ||||||
|  |  | ||||||
| @@ -88,16 +171,13 @@ class CliDownloader: | |||||||
|         output() |         output() | ||||||
|  |  | ||||||
|     def print_current_options(self): |     def print_current_options(self): | ||||||
|  |         self.page_dict = dict() | ||||||
|  |  | ||||||
|         print() |         print() | ||||||
|         print(self.current_results.pprint()) |  | ||||||
|  |  | ||||||
|         """ |  | ||||||
|         self.page_dict = dict() |  | ||||||
|          |  | ||||||
|         page_count = 0 |         page_count = 0 | ||||||
|         for option in self.current_results.formatted_generator(): |         for option in self.current_results.formatted_generator(): | ||||||
|             if isinstance(option, ResultOption): |             if isinstance(option, Option): | ||||||
|                 r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}" |                 r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}" | ||||||
|                 print(r) |                 print(r) | ||||||
|             else: |             else: | ||||||
| @@ -109,13 +189,10 @@ class CliDownloader: | |||||||
|                 self.page_dict[option.__name__] = option |                 self.page_dict[option.__name__] = option | ||||||
|  |  | ||||||
|                 page_count += 1 |                 page_count += 1 | ||||||
|         """ |  | ||||||
|  |  | ||||||
|         print() |         print() | ||||||
|  |  | ||||||
|     def set_current_options(self, current_options: Union[Generator[DatabaseObject, None, None], components.Select]): |     def set_current_options(self, current_options: Results): | ||||||
|         current_options = current_options if isinstance(current_options, components.Select) else components.DataObjectSelect(current_options) |  | ||||||
|  |  | ||||||
|         if main_settings["result_history"]: |         if main_settings["result_history"]: | ||||||
|             self._result_history.append(current_options) |             self._result_history.append(current_options) | ||||||
|  |  | ||||||
| @@ -165,7 +242,7 @@ class CliDownloader: | |||||||
|     def search(self, query: str): |     def search(self, query: str): | ||||||
|         if re.match(URL_PATTERN, query) is not None: |         if re.match(URL_PATTERN, query) is not None: | ||||||
|             try: |             try: | ||||||
|                 data_object = self.downloader.fetch_url(query) |                 page, data_object = self.pages.fetch_url(query) | ||||||
|             except UrlNotFoundException as e: |             except UrlNotFoundException as e: | ||||||
|                 print(f"{e.url} could not be attributed/parsed to any yet implemented site.\n" |                 print(f"{e.url} could not be attributed/parsed to any yet implemented site.\n" | ||||||
|                       f"PR appreciated if the site isn't implemented.\n" |                       f"PR appreciated if the site isn't implemented.\n" | ||||||
| @@ -219,17 +296,15 @@ class CliDownloader: | |||||||
|  |  | ||||||
|         parsed_query: Query = self._process_parsed(key_text, query) |         parsed_query: Query = self._process_parsed(key_text, query) | ||||||
|  |  | ||||||
|         self.set_current_options(self.downloader.search(parsed_query)) |         self.set_current_options(self.pages.search(parsed_query)) | ||||||
|         self.print_current_options() |         self.print_current_options() | ||||||
|  |  | ||||||
|     def goto(self, data_object: Union[DatabaseObject, components.Select]): |     def goto(self, data_object: DatabaseObject): | ||||||
|         page: Type[Page] |         page: Type[Page] | ||||||
|  |  | ||||||
|         if isinstance(data_object, components.Select): |         self.pages.fetch_details(data_object, stop_at_level=1) | ||||||
|             self.set_current_options(data_object) |  | ||||||
|         else: |         self.set_current_options(GoToResults(data_object.options, max_items_per_page=self.max_displayed_options)) | ||||||
|             self.downloader.fetch_details(data_object, stop_at_level=1) |  | ||||||
|             self.set_current_options(data_object.options) |  | ||||||
|  |  | ||||||
|         self.print_current_options() |         self.print_current_options() | ||||||
|  |  | ||||||
| @@ -241,7 +316,7 @@ class CliDownloader: | |||||||
|         _result_map: Dict[DatabaseObject, DownloadResult] = dict() |         _result_map: Dict[DatabaseObject, DownloadResult] = dict() | ||||||
|  |  | ||||||
|         for database_object in data_objects: |         for database_object in data_objects: | ||||||
|             r = self.downloader.download( |             r = self.pages.download( | ||||||
|                 data_object=database_object,  |                 data_object=database_object,  | ||||||
|                 genre=self.genre,  |                 genre=self.genre,  | ||||||
|                 **kwargs |                 **kwargs | ||||||
| @@ -296,15 +371,24 @@ class CliDownloader: | |||||||
|  |  | ||||||
|                 indices = [] |                 indices = [] | ||||||
|                 for possible_index in q.split(","): |                 for possible_index in q.split(","): | ||||||
|  |                     possible_index = possible_index.strip() | ||||||
|                     if possible_index == "": |                     if possible_index == "": | ||||||
|                         continue |                         continue | ||||||
|  |  | ||||||
|                     if possible_index not in self.current_results: |  | ||||||
|                         raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not in the current options.") |  | ||||||
|                      |                      | ||||||
|                     yield self.current_results[possible_index] |                     i = 0 | ||||||
|  |                     try: | ||||||
|  |                         i = int(possible_index) | ||||||
|  |                     except ValueError: | ||||||
|  |                         raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.") | ||||||
|  |  | ||||||
|             selected_objects = list(get_selected_objects(query)) |                     if i < 0 or i >= len(self.current_results): | ||||||
|  |                         raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results) - 1}.") | ||||||
|  |                      | ||||||
|  |                     indices.append(i) | ||||||
|  |  | ||||||
|  |                 return [self.current_results[i] for i in indices] | ||||||
|  |  | ||||||
|  |             selected_objects = get_selected_objects(query) | ||||||
|  |  | ||||||
|             if do_merge: |             if do_merge: | ||||||
|                 old_selected_objects = selected_objects |                 old_selected_objects = selected_objects | ||||||
| @@ -319,19 +403,19 @@ class CliDownloader: | |||||||
|  |  | ||||||
|             if do_fetch: |             if do_fetch: | ||||||
|                 for data_object in selected_objects: |                 for data_object in selected_objects: | ||||||
|                     self.downloader.fetch_details(data_object) |                     self.pages.fetch_details(data_object) | ||||||
|  |  | ||||||
|                 self.print_current_options() |                 self.print_current_options() | ||||||
|                 return False |                 return False | ||||||
|  |  | ||||||
|             if do_download: |             if do_download: | ||||||
|                 self.download(list(o.value for o in selected_objects)) |                 self.download(selected_objects) | ||||||
|                 return False |                 return False | ||||||
|  |  | ||||||
|             if len(selected_objects) != 1: |             if len(selected_objects) != 1: | ||||||
|                 raise MKInvalidInputException(message="You can only go to one object at a time without merging.") |                 raise MKInvalidInputException(message="You can only go to one object at a time without merging.") | ||||||
|  |  | ||||||
|             self.goto(selected_objects[0].value) |             self.goto(selected_objects[0]) | ||||||
|             return False |             return False | ||||||
|         except MKInvalidInputException as e: |         except MKInvalidInputException as e: | ||||||
|             output("\n" + e.message + "\n", color=BColors.FAIL) |             output("\n" + e.message + "\n", color=BColors.FAIL) | ||||||
| @@ -362,7 +446,7 @@ def download( | |||||||
|         else: |         else: | ||||||
|             print(f"{BColors.FAIL.value}Something went wrong configuring.{BColors.ENDC.value}") |             print(f"{BColors.FAIL.value}Something went wrong configuring.{BColors.ENDC.value}") | ||||||
|  |  | ||||||
|     shell = CliDownloader(genre=genre, process_metadata_anyway=process_metadata_anyway) |     shell = Downloader(genre=genre, process_metadata_anyway=process_metadata_anyway) | ||||||
|  |  | ||||||
|     if command_list is not None: |     if command_list is not None: | ||||||
|         for command in command_list: |         for command in command_list: | ||||||
|   | |||||||
| @@ -1,4 +1,3 @@ | |||||||
| from ..utils import BColors |  | ||||||
| from ..utils.shared import get_random_message | from ..utils.shared import get_random_message | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -40,8 +39,4 @@ def print_cute_message(): | |||||||
|         print(message) |         print(message) | ||||||
|  |  | ||||||
|  |  | ||||||
| AGREE_INPUTS = {"y", "yes", "ok"} |  | ||||||
| def ask_for_bool(msg: str) -> bool: |  | ||||||
|     i = input(f"{msg} ({BColors.OKGREEN.value}Y{BColors.ENDC.value}/{BColors.FAIL.value}N{BColors.ENDC.value})? ").lower() |  | ||||||
|     return i in AGREE_INPUTS |  | ||||||
|      |      | ||||||
| @@ -1,36 +1,8 @@ | |||||||
| from __future__ import annotations |  | ||||||
|  |  | ||||||
| import logging |  | ||||||
| import random |  | ||||||
| import re |  | ||||||
| from collections import defaultdict |  | ||||||
| from copy import copy |  | ||||||
| from dataclasses import dataclass, field | from dataclasses import dataclass, field | ||||||
| from pathlib import Path | from typing import Set | ||||||
| from string import Formatter |  | ||||||
| from typing import (TYPE_CHECKING, Any, Callable, Dict, Generator, List, |  | ||||||
|                     Optional, Set, Tuple, Type, TypedDict, Union) |  | ||||||
|  |  | ||||||
| import requests | from ..utils.config import main_settings | ||||||
| from bs4 import BeautifulSoup |  | ||||||
|  |  | ||||||
| from ..audio import correct_codec, write_metadata_to_target |  | ||||||
| from ..connection import Connection |  | ||||||
| from ..objects import Album, Artist, Collection |  | ||||||
| from ..objects import DatabaseObject as DataObject |  | ||||||
| from ..objects import Label, Options, Song, Source, Target |  | ||||||
| from ..utils import BColors, output, trace |  | ||||||
| from ..utils.config import main_settings, youtube_settings |  | ||||||
| from ..utils.enums import ALL_SOURCE_TYPES, SourceType |  | ||||||
| from ..utils.enums.album import AlbumType | from ..utils.enums.album import AlbumType | ||||||
| from ..utils.exception import MKComposeException, MKMissingNameException |  | ||||||
| from ..utils.exception.download import UrlNotFoundException |  | ||||||
| from ..utils.path_manager import LOCATIONS |  | ||||||
| from ..utils.shared import DEBUG_PAGES |  | ||||||
| from ..utils.string_processing import fit_to_file_system |  | ||||||
| from ..utils.support_classes.download_result import DownloadResult |  | ||||||
| from ..utils.support_classes.query import Query |  | ||||||
| from .results import SearchResults |  | ||||||
|  |  | ||||||
|  |  | ||||||
| @dataclass | @dataclass | ||||||
| @@ -47,402 +19,3 @@ class DownloadOptions: | |||||||
|     download_again_if_found: bool = False |     download_again_if_found: bool = False | ||||||
|     process_audio_if_found: bool = False |     process_audio_if_found: bool = False | ||||||
|     process_metadata_if_found: bool = True |     process_metadata_if_found: bool = True | ||||||
|  |  | ||||||
|  |  | ||||||
| fetch_map = { |  | ||||||
|     Song: "fetch_song", |  | ||||||
|     Album: "fetch_album", |  | ||||||
|     Artist: "fetch_artist", |  | ||||||
|     Label: "fetch_label", |  | ||||||
| } |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class Downloader: |  | ||||||
|     def __init__( |  | ||||||
|         self,  |  | ||||||
|         auto_register_pages: bool = True,  |  | ||||||
|         download_options: DownloadOptions = None,  |  | ||||||
|         fetch_options: FetchOptions = None,  |  | ||||||
|         **kwargs |  | ||||||
|     ): |  | ||||||
|         self.LOGGER = logging.getLogger("download") |  | ||||||
|          |  | ||||||
|         self.download_options: DownloadOptions = download_options or DownloadOptions() |  | ||||||
|         self.fetch_options: FetchOptions = fetch_options or FetchOptions() |  | ||||||
|  |  | ||||||
|         self._registered_pages: Dict[Type[Page], Set[Page]] = defaultdict(set) |  | ||||||
|         if auto_register_pages: |  | ||||||
|             self.scan_for_pages(**kwargs) |  | ||||||
|  |  | ||||||
|     def register_page(self, page_type: Type[Page], **kwargs): |  | ||||||
|         if page_type in self._registered_pages: |  | ||||||
|             return |  | ||||||
|  |  | ||||||
|         self._registered_pages[page_type].add(page_type( |  | ||||||
|             download_options=self.download_options,  |  | ||||||
|             fetch_options=self.fetch_options,  |  | ||||||
|             **kwargs |  | ||||||
|         )) |  | ||||||
|  |  | ||||||
|     def deregister_page(self, page_type: Type[Page]): |  | ||||||
|         if page_type not in _registered_pages: |  | ||||||
|             return |  | ||||||
|  |  | ||||||
|         for p in self._registered_pages[page_type]: |  | ||||||
|             p.__del__() |  | ||||||
|         del self._registered_pages[page_type] |  | ||||||
|  |  | ||||||
|     def scan_for_pages(self, **kwargs): |  | ||||||
|         # assuming the wanted pages are the leaf classes of the interface |  | ||||||
|         from .. import pages |  | ||||||
|          |  | ||||||
|         leaf_classes = [] |  | ||||||
|  |  | ||||||
|         class_list = [Page] |  | ||||||
|         while len(class_list): |  | ||||||
|             _class = class_list.pop() |  | ||||||
|             class_subclasses = _class.__subclasses__() |  | ||||||
|  |  | ||||||
|             if len(class_subclasses) == 0: |  | ||||||
|                 if _class.REGISTER: |  | ||||||
|                     leaf_classes.append(_class) |  | ||||||
|             else: |  | ||||||
|                 class_list.extend(class_subclasses) |  | ||||||
|  |  | ||||||
|         if Page in leaf_classes: |  | ||||||
|             self.LOGGER.warn("couldn't find any data source") |  | ||||||
|             return |  | ||||||
|         for leaf_class in leaf_classes: |  | ||||||
|             self.register_page(leaf_class, **kwargs) |  | ||||||
|  |  | ||||||
|     def get_pages(self, *page_types: List[Type[Page]]) -> Generator[Page, None, None]: |  | ||||||
|         if len(page_types) == 0: |  | ||||||
|             page_types = self._registered_pages.keys() |  | ||||||
|  |  | ||||||
|         for page_type in page_types: |  | ||||||
|             yield from self._registered_pages[page_type] |  | ||||||
|  |  | ||||||
|     def search(self, query: Query) -> Generator[DataObject, None, None]: |  | ||||||
|         for page in self.get_pages(): |  | ||||||
|             yield from page.search(query=query) |  | ||||||
|      |  | ||||||
|     def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject: |  | ||||||
|         source: Source |  | ||||||
|         for source in data_object.source_collection.get_sources(source_type_sorting={ |  | ||||||
|             "only_with_page": True, |  | ||||||
|         }): |  | ||||||
|             new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level) |  | ||||||
|             if new_data_object is not None: |  | ||||||
|                 data_object.merge(new_data_object) |  | ||||||
|  |  | ||||||
|         return data_object |  | ||||||
|  |  | ||||||
|     def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]: |  | ||||||
|         if not source.has_page: |  | ||||||
|             return None |  | ||||||
|          |  | ||||||
|         source_type = source.page.get_source_type(source=source) |  | ||||||
|         if source_type is None: |  | ||||||
|             self.LOGGER.debug(f"Could not determine source type for {source}.") |  | ||||||
|             return None |  | ||||||
|  |  | ||||||
|         func = getattr(source.page, fetch_map[source_type]) |  | ||||||
|          |  | ||||||
|         # fetching the data object and marking it as fetched |  | ||||||
|         data_object: DataObject = func(source=source, **kwargs) |  | ||||||
|         data_object.mark_as_fetched(source.hash_url) |  | ||||||
|         return data_object |  | ||||||
|  |  | ||||||
|     def fetch_from_url(self, url: str) -> Optional[DataObject]: |  | ||||||
|         source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) |  | ||||||
|         if source is None: |  | ||||||
|             return None |  | ||||||
|          |  | ||||||
|         return self.fetch_from_source(source=source) |  | ||||||
|      |  | ||||||
|     def _skip_object(self, data_object: DataObject) -> bool: |  | ||||||
|         if isinstance(data_object, Album): |  | ||||||
|             if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist: |  | ||||||
|                 return True |  | ||||||
|          |  | ||||||
|         return False |  | ||||||
|  |  | ||||||
|     def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult: |  | ||||||
|         # fetch the given object |  | ||||||
|         self.fetch_details(data_object) |  | ||||||
|         output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD) |  | ||||||
|          |  | ||||||
|         # fetching all parent objects (e.g. if you only download a song) |  | ||||||
|         if not kwargs.get("fetched_upwards", False): |  | ||||||
|             to_fetch: List[DataObject] = [data_object] |  | ||||||
|  |  | ||||||
|             while len(to_fetch) > 0: |  | ||||||
|                 new_to_fetch = [] |  | ||||||
|                 for d in to_fetch: |  | ||||||
|                     if self._skip_object(d): |  | ||||||
|                         continue |  | ||||||
|  |  | ||||||
|                     self.fetch_details(d) |  | ||||||
|  |  | ||||||
|                     for c in d.get_parent_collections(): |  | ||||||
|                         new_to_fetch.extend(c) |  | ||||||
|  |  | ||||||
|                 to_fetch = new_to_fetch |  | ||||||
|              |  | ||||||
|             kwargs["fetched_upwards"] = True |  | ||||||
|          |  | ||||||
|         # download all children |  | ||||||
|         download_result: DownloadResult = DownloadResult() |  | ||||||
|         for c in data_object.get_child_collections(): |  | ||||||
|             for d in c: |  | ||||||
|                 if self._skip_object(d): |  | ||||||
|                     continue |  | ||||||
|  |  | ||||||
|                 download_result.merge(self.download(d, genre, **kwargs)) |  | ||||||
|  |  | ||||||
|         # actually download if the object is a song |  | ||||||
|         if isinstance(data_object, Song): |  | ||||||
|             """ |  | ||||||
|             TODO |  | ||||||
|             add the traced artist and album to the naming. |  | ||||||
|             I am able to do that, because duplicate values are removed later on. |  | ||||||
|             """ |  | ||||||
|  |  | ||||||
|             self._download_song(data_object, naming={ |  | ||||||
|                 "genre": [genre], |  | ||||||
|                 "audio_format": [main_settings["audio_format"]], |  | ||||||
|             }) |  | ||||||
|  |  | ||||||
|         return download_result |  | ||||||
|  |  | ||||||
|     def _extract_fields_from_template(self, path_template: str) -> Set[str]: |  | ||||||
|         return set(re.findall(r"{([^}]+)}", path_template)) |  | ||||||
|  |  | ||||||
|     def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str: |  | ||||||
|         field_names: Set[str] = self._extract_fields_from_template(path_template) |  | ||||||
|          |  | ||||||
|         for field in field_names: |  | ||||||
|             if len(naming[field]) == 0: |  | ||||||
|                 raise MKMissingNameException(f"Missing field for {field}.") |  | ||||||
|  |  | ||||||
|             path_template = path_template.replace(f"{{{field}}}", naming[field][0]) |  | ||||||
|  |  | ||||||
|         return path_template |  | ||||||
|  |  | ||||||
|     def _download_song(self, song: Song, naming: dict) -> DownloadOptions: |  | ||||||
|         """ |  | ||||||
|         TODO |  | ||||||
|         Search the song in the file system. |  | ||||||
|         """ |  | ||||||
|         r = DownloadResult(total=1) |  | ||||||
|          |  | ||||||
|         # pre process the data recursively |  | ||||||
|         song.compile() |  | ||||||
|          |  | ||||||
|         # manage the naming |  | ||||||
|         naming: Dict[str, List[str]] = defaultdict(list, naming) |  | ||||||
|         naming["song"].append(song.title_value) |  | ||||||
|         naming["isrc"].append(song.isrc) |  | ||||||
|         naming["album"].extend(a.title_value for a in song.album_collection) |  | ||||||
|         naming["album_type"].extend(a.album_type.value for a in song.album_collection) |  | ||||||
|         naming["artist"].extend(a.name for a in song.artist_collection) |  | ||||||
|         naming["artist"].extend(a.name for a in song.feature_artist_collection) |  | ||||||
|         for a in song.album_collection: |  | ||||||
|             naming["label"].extend([l.title_value for l in a.label_collection]) |  | ||||||
|         # removing duplicates from the naming, and process the strings |  | ||||||
|         for key, value in naming.items(): |  | ||||||
|             # https://stackoverflow.com/a/17016257 |  | ||||||
|             naming[key] = list(dict.fromkeys(value)) |  | ||||||
|         song.genre = naming["genre"][0] |  | ||||||
|  |  | ||||||
|         # manage the targets |  | ||||||
|         tmp: Target = Target.temp(file_extension=main_settings["audio_format"]) |  | ||||||
|  |  | ||||||
|         song.target_collection.append(Target( |  | ||||||
|             relative_to_music_dir=True, |  | ||||||
|             file_path=Path( |  | ||||||
|                 self._parse_path_template(main_settings["download_path"], naming=naming),  |  | ||||||
|                 self._parse_path_template(main_settings["download_file"], naming=naming), |  | ||||||
|             ) |  | ||||||
|         )) |  | ||||||
|         for target in song.target_collection: |  | ||||||
|             if target.exists: |  | ||||||
|                 output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY) |  | ||||||
|                 r.found_on_disk += 1 |  | ||||||
|  |  | ||||||
|                 if not self.download_options.download_again_if_found: |  | ||||||
|                     target.copy_content(tmp) |  | ||||||
|             else: |  | ||||||
|                 target.create_path() |  | ||||||
|                 output(f'{target.file_path}', color=BColors.GREY) |  | ||||||
|  |  | ||||||
|         # this streams from every available source until something succeeds, setting the skip intervals to the values of the according source |  | ||||||
|         used_source: Optional[Source] = None |  | ||||||
|         skip_intervals: List[Tuple[float, float]] = [] |  | ||||||
|         for source in song.source_collection.get_sources(source_type_sorting={ |  | ||||||
|             "only_with_page": True, |  | ||||||
|             "sort_key": lambda page: page.download_priority, |  | ||||||
|             "reverse": True, |  | ||||||
|         }): |  | ||||||
|             if tmp.exists: |  | ||||||
|                 break |  | ||||||
|  |  | ||||||
|             used_source = source |  | ||||||
|             streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download") |  | ||||||
|             skip_intervals = source.page.get_skip_intervals(song=song, source=source) |  | ||||||
|  |  | ||||||
|             # if something has been downloaded but it somehow failed, delete the file |  | ||||||
|             if streaming_results.is_fatal_error and tmp.exists: |  | ||||||
|                 tmp.delete() |  | ||||||
|  |  | ||||||
|         # if everything went right, the file should exist now |  | ||||||
|         if not tmp.exists: |  | ||||||
|             if used_source is None: |  | ||||||
|                 r.error_message = f"No source found for {song.option_string}." |  | ||||||
|             else: |  | ||||||
|                 r.error_message = f"Something went wrong downloading {song.option_string}." |  | ||||||
|             return r |  | ||||||
|  |  | ||||||
|         # post process the audio |  | ||||||
|         found_on_disk = used_source is None |  | ||||||
|         if not found_on_disk or self.download_options.process_audio_if_found: |  | ||||||
|             correct_codec(target=tmp, skip_intervals=skip_intervals) |  | ||||||
|             r.sponsor_segments = len(skip_intervals) |  | ||||||
|  |  | ||||||
|         if used_source is not None: |  | ||||||
|             used_source.page.post_process_hook(song=song, temp_target=tmp) |  | ||||||
|  |  | ||||||
|         if not found_on_disk or self.download_options.process_metadata_if_found: |  | ||||||
|             write_metadata_to_target(metadata=song.metadata, target=tmp, song=song) |  | ||||||
|  |  | ||||||
|         # copy the tmp target to the final locations |  | ||||||
|         for target in song.target_collection: |  | ||||||
|             tmp.copy_content(target) |  | ||||||
|  |  | ||||||
|         tmp.delete() |  | ||||||
|         return r |  | ||||||
|  |  | ||||||
|     def fetch_url(self, url: str, **kwargs) -> DataObject: |  | ||||||
|         source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) |  | ||||||
|          |  | ||||||
|         if source is None or source.page is None: |  | ||||||
|             raise UrlNotFoundException(url=url) |  | ||||||
|          |  | ||||||
|         return source.page.fetch_object_from_source(source=source, **kwargs) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class Page: |  | ||||||
|     REGISTER = True |  | ||||||
|     SOURCE_TYPE: SourceType |  | ||||||
|     LOGGER: logging.Logger |  | ||||||
|  |  | ||||||
|     def __new__(cls, *args, **kwargs): |  | ||||||
|         cls.LOGGER = logging.getLogger(cls.__name__) |  | ||||||
|         return super().__new__(cls) |  | ||||||
|  |  | ||||||
|     @classmethod |  | ||||||
|     def is_leaf_page(cls) -> bool: |  | ||||||
|         return len(cls.__subclasses__()) == 0 |  | ||||||
|  |  | ||||||
|     def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None, **kwargs): |  | ||||||
|         self.SOURCE_TYPE.register_page(self) |  | ||||||
|          |  | ||||||
|         self.download_options: DownloadOptions = download_options or DownloadOptions() |  | ||||||
|         self.fetch_options: FetchOptions = fetch_options or FetchOptions() |  | ||||||
|  |  | ||||||
|     def __del__(self): |  | ||||||
|         self.SOURCE_TYPE.deregister_page() |  | ||||||
|  |  | ||||||
|     def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None): |  | ||||||
|         """ |  | ||||||
|         Perform a regex search on the given string, using a single or a list of |  | ||||||
|         patterns returning the first matching group. |  | ||||||
|         In case of failure return a default value or raise a WARNING or a |  | ||||||
|         RegexNotFoundError, depending on fatal, specifying the field name. |  | ||||||
|         """ |  | ||||||
|  |  | ||||||
|         if isinstance(pattern, str): |  | ||||||
|             mobj = re.search(pattern, string, flags) |  | ||||||
|         else: |  | ||||||
|             for p in pattern: |  | ||||||
|                 mobj = re.search(p, string, flags) |  | ||||||
|                 if mobj: |  | ||||||
|                     break |  | ||||||
|  |  | ||||||
|         if mobj: |  | ||||||
|             if group is None: |  | ||||||
|                 # return the first matching group |  | ||||||
|                 return next(g for g in mobj.groups() if g is not None) |  | ||||||
|             elif isinstance(group, (list, tuple)): |  | ||||||
|                 return tuple(mobj.group(g) for g in group) |  | ||||||
|             else: |  | ||||||
|                 return mobj.group(group) |  | ||||||
|  |  | ||||||
|         return default |  | ||||||
|  |  | ||||||
|     def get_source_type(self, source: Source) -> Optional[Type[DataObject]]: |  | ||||||
|         return None |  | ||||||
|  |  | ||||||
|     def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup: |  | ||||||
|         return BeautifulSoup(r.content, "html.parser") |  | ||||||
|  |  | ||||||
|     # to search stuff |  | ||||||
|     def search(self, query: Query) -> List[DataObject]: |  | ||||||
|         music_object = query.music_object |  | ||||||
|  |  | ||||||
|         search_functions = { |  | ||||||
|             Song: self.song_search, |  | ||||||
|             Album: self.album_search, |  | ||||||
|             Artist: self.artist_search, |  | ||||||
|             Label: self.label_search |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         if type(music_object) in search_functions: |  | ||||||
|             r = search_functions[type(music_object)](music_object) |  | ||||||
|             if r is not None and len(r) > 0: |  | ||||||
|                 return r |  | ||||||
|  |  | ||||||
|         r = [] |  | ||||||
|         for default_query in query.default_search: |  | ||||||
|             for single_option in self.general_search(default_query): |  | ||||||
|                 r.append(single_option) |  | ||||||
|  |  | ||||||
|         return r |  | ||||||
|  |  | ||||||
|     def general_search(self, search_query: str) -> List[DataObject]: |  | ||||||
|         return [] |  | ||||||
|  |  | ||||||
|     def label_search(self, label: Label) -> List[Label]: |  | ||||||
|         return [] |  | ||||||
|  |  | ||||||
|     def artist_search(self, artist: Artist) -> List[Artist]: |  | ||||||
|         return [] |  | ||||||
|  |  | ||||||
|     def album_search(self, album: Album) -> List[Album]: |  | ||||||
|         return [] |  | ||||||
|  |  | ||||||
|     def song_search(self, song: Song) -> List[Song]: |  | ||||||
|         return [] |  | ||||||
|  |  | ||||||
|     # to fetch stuff |  | ||||||
|     def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: |  | ||||||
|         return Song() |  | ||||||
|  |  | ||||||
|     def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album: |  | ||||||
|         return Album() |  | ||||||
|  |  | ||||||
|     def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist: |  | ||||||
|         return Artist() |  | ||||||
|  |  | ||||||
|     def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: |  | ||||||
|         return Label() |  | ||||||
|  |  | ||||||
|     # to download stuff |  | ||||||
|     def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]: |  | ||||||
|         return [] |  | ||||||
|  |  | ||||||
|     def post_process_hook(self, song: Song, temp_target: Target, **kwargs): |  | ||||||
|         pass |  | ||||||
|  |  | ||||||
|     def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: |  | ||||||
|         return DownloadResult() |  | ||||||
|   | |||||||
| @@ -1,303 +0,0 @@ | |||||||
| from __future__ import annotations |  | ||||||
|  |  | ||||||
| import re |  | ||||||
| from collections import defaultdict |  | ||||||
| from pathlib import Path |  | ||||||
| from typing import Any, Callable, Dict, Generator, List, Optional |  | ||||||
|  |  | ||||||
| from ..objects import OuterProxy as DataObject |  | ||||||
| from ..utils import BColors |  | ||||||
| from ..utils.config import main_settings |  | ||||||
| from ..utils.enums import SourceType |  | ||||||
| from ..utils.exception import MKComposeException |  | ||||||
| from ..utils.shared import ALPHABET |  | ||||||
| from ..utils.string_processing import unify |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class HumanIO: |  | ||||||
|     @staticmethod |  | ||||||
|     def ask_to_create(option: Option) -> bool: |  | ||||||
|         return True |  | ||||||
|  |  | ||||||
|     @staticmethod |  | ||||||
|     def not_found(key: Any) -> None: |  | ||||||
|         return None |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class Option: |  | ||||||
|     """ |  | ||||||
|     This could represent a data object, a string or a page. |  | ||||||
|     """ |  | ||||||
|  |  | ||||||
|     def __init__( |  | ||||||
|         self,  |  | ||||||
|         value: Any,  |  | ||||||
|         text: Optional[str] = None,  |  | ||||||
|         keys: List[Any] = None,  |  | ||||||
|         hidden: bool = False,  |  | ||||||
|         parse_key: Callable[[Any], Any] = lambda x: x, |  | ||||||
|         index: int = None, |  | ||||||
|     ): |  | ||||||
|         self._parse_key: Callable[[Any], Any] = parse_key |  | ||||||
|          |  | ||||||
|         self._index = index |  | ||||||
|         self.value = value |  | ||||||
|         self._text = text or str(value) |  | ||||||
|         self.hidden = hidden |  | ||||||
|  |  | ||||||
|         self._raw_keys = set(keys or []) |  | ||||||
|         self._raw_keys.add(self.text) |  | ||||||
|         try: |  | ||||||
|             self._raw_keys.add(self.value) |  | ||||||
|         except TypeError: |  | ||||||
|             pass |  | ||||||
|         self._raw_keys.add(str(self.value)) |  | ||||||
|         self._raw_keys.add(self._index) |  | ||||||
|         self.keys = set(self.parse_key(key) for key in self._raw_keys) |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def text(self) -> str: |  | ||||||
|         return self._text.replace("{index}", str(self.index)) |  | ||||||
|      |  | ||||||
|     @text.setter |  | ||||||
|     def text(self, value: str): |  | ||||||
|         self._text = value |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def index(self) -> int: |  | ||||||
|         return self._index |  | ||||||
|  |  | ||||||
|     @index.setter |  | ||||||
|     def index(self, value: int): |  | ||||||
|         p = self._parse_key(self._index) |  | ||||||
|         if p in self.keys: |  | ||||||
|             self.keys.remove(p) |  | ||||||
|         self._index = value |  | ||||||
|         self.keys.add(p) |  | ||||||
|      |  | ||||||
|     def register_key(self, key: Any): |  | ||||||
|         self._raw_keys.add(key) |  | ||||||
|         self.keys.add(self._parse_key(key)) |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def parse_key(self) -> Callable[[Any], Any]: |  | ||||||
|         return self._parse_key |  | ||||||
|  |  | ||||||
|     @parse_key.setter |  | ||||||
|     def parse_key(self, value: Callable[[Any], Any]): |  | ||||||
|         self._parse_key = value |  | ||||||
|         self.keys = set(self._parse_key(key) for key in self._raw_keys) |  | ||||||
|  |  | ||||||
|     def __str__(self): |  | ||||||
|         return self.text |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class Select: |  | ||||||
|     def __init__( |  | ||||||
|         self,  |  | ||||||
|         options: Generator[Option, None, None] = None,  |  | ||||||
|         option_factory: Callable[[Any], Option] = None, |  | ||||||
|         raw_options: List[Any] = None, |  | ||||||
|         parse_option_key: Callable[[Any], Any] = lambda x: x, |  | ||||||
|         human_io: HumanIO = HumanIO, |  | ||||||
|         sort: bool = False, |  | ||||||
|         **kwargs |  | ||||||
|     ): |  | ||||||
|         self._parse_option_key: Callable[[Any], Any] = parse_option_key |  | ||||||
|         self.human_io: HumanIO = human_io |  | ||||||
|  |  | ||||||
|         self._key_to_option: Dict[Any, Option] = dict() |  | ||||||
|         self._options: List[Option] = [] |  | ||||||
|  |  | ||||||
|         options = options or [] |  | ||||||
|         self.option_factory: Optional[Callable[[Any], Option]] = option_factory |  | ||||||
|         if self.can_create_options: |  | ||||||
|             _raw_options = raw_options or [] |  | ||||||
|             if sort: |  | ||||||
|                 _raw_options = sorted(_raw_options) |  | ||||||
|  |  | ||||||
|             for raw_option in _raw_options: |  | ||||||
|                 self.append(self.option_factory(raw_option)) |  | ||||||
|         elif raw_options is not None: |  | ||||||
|             raise MKComposeException("Cannot create options without a factory.") |  | ||||||
|  |  | ||||||
|         self.extend(options) |  | ||||||
|  |  | ||||||
|     @property |  | ||||||
|     def can_create_options(self) -> bool: |  | ||||||
|         return self.option_factory is not None |  | ||||||
|  |  | ||||||
|     def append(self, option: Option): |  | ||||||
|         option.parse_key = self._parse_option_key |  | ||||||
|         self._options.append(option) |  | ||||||
|         for key in option.keys: |  | ||||||
|             self._key_to_option[key] = option |  | ||||||
|  |  | ||||||
|     def _remap(self): |  | ||||||
|         self._key_to_option = dict() |  | ||||||
|         for option in self._options: |  | ||||||
|             for key in option.keys: |  | ||||||
|                 self._key_to_option[key] = option |  | ||||||
|  |  | ||||||
|     def extend(self, options: List[Option]): |  | ||||||
|         for option in options: |  | ||||||
|             self.append(option) |  | ||||||
|  |  | ||||||
|     def __iter__(self) -> Generator[Option, None, None]: |  | ||||||
|         for option in self._options: |  | ||||||
|             if option.hidden: |  | ||||||
|                 continue |  | ||||||
|              |  | ||||||
|             yield option |  | ||||||
|  |  | ||||||
|     def __contains__(self, key: Any) -> bool: |  | ||||||
|         return self._parse_option_key(key) in self._key_to_option |  | ||||||
|  |  | ||||||
|     def __getitem__(self, key: Any) -> Option: |  | ||||||
|         r = self._key_to_option[self._parse_option_key(key)] |  | ||||||
|         if callable(r): |  | ||||||
|             r = r() |  | ||||||
|         if callable(r.value): |  | ||||||
|             r.value = r.value() |  | ||||||
|         return r |  | ||||||
|  |  | ||||||
|     def create_option(self, key: Any, **kwargs) -> Option: |  | ||||||
|         if not self.can_create_options: |  | ||||||
|             raise MKComposeException("Cannot create options without a factory.") |  | ||||||
|  |  | ||||||
|         option = self.option_factory(key, **kwargs) |  | ||||||
|         self.append(option) |  | ||||||
|         return option |  | ||||||
|  |  | ||||||
|     def choose(self, key: Any) -> Optional[Option]: |  | ||||||
|         if key not in self: |  | ||||||
|             if self.can_create_options: |  | ||||||
|                 c = self.create_option(key) |  | ||||||
|                 if self.human_io.ask_to_create(c): |  | ||||||
|                     return c |  | ||||||
|              |  | ||||||
|             self.human_io.not_found(key) |  | ||||||
|             return None |  | ||||||
|  |  | ||||||
|         return self[key] |  | ||||||
|  |  | ||||||
|     def pprint(self) -> str: |  | ||||||
|         return "\n".join(str(option) for option in self) |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class StringSelect(Select): |  | ||||||
|     def __init__(self, **kwargs): |  | ||||||
|         self._current_index = 0 |  | ||||||
|         kwargs["option_factory"] = self.next_option |  | ||||||
|         kwargs["parse_option_key"] = lambda x: unify(str(x)) |  | ||||||
|  |  | ||||||
|         super().__init__(**kwargs) |  | ||||||
|  |  | ||||||
|     def next_option(self, value: Any) -> Optional[Option]: |  | ||||||
|         o = Option(value=value, keys=[self._current_index], text=f"{BColors.BOLD.value}{self._current_index: >2}{BColors.ENDC.value}: {value}") |  | ||||||
|         self._current_index += 1 |  | ||||||
|         return o |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class GenreSelect(StringSelect): |  | ||||||
|     @staticmethod |  | ||||||
|     def is_valid_genre(genre: Path) -> bool: |  | ||||||
|         """ |  | ||||||
|         gets the name of all subdirectories of shared.MUSIC_DIR, |  | ||||||
|         but filters out all directories, where the name matches with any Patern |  | ||||||
|         from shared.NOT_A_GENRE_REGEX. |  | ||||||
|         """ |  | ||||||
|         if not genre.is_dir(): |  | ||||||
|             return False |  | ||||||
|  |  | ||||||
|         if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]): |  | ||||||
|             return False |  | ||||||
|  |  | ||||||
|         return True |  | ||||||
|  |  | ||||||
|     def __init__(self): |  | ||||||
|         super().__init__(sort=True, raw_options=(genre.name for genre in filter(self.is_valid_genre, main_settings["music_directory"].iterdir()))) |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class SourceTypeToOption(dict): |  | ||||||
|     def __init__(self, callback): |  | ||||||
|         super().__init__() |  | ||||||
|          |  | ||||||
|         self.callback = callback |  | ||||||
|  |  | ||||||
|     def __missing__(self, key): |  | ||||||
|         self[key] = self.callback(key) |  | ||||||
|         return self[key] |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class DataObjectSelect(Select): |  | ||||||
|     def __init__(self, data_objects: Generator[DataObject]): |  | ||||||
|         self._source_type_to_data_objects: Dict[SourceType, List[Option]] = defaultdict(list) |  | ||||||
|         self._source_type_to_option: Dict[SourceType, Option] = SourceTypeToOption(self.option_from_source_type) |  | ||||||
|  |  | ||||||
|         self._data_object_index: int = 0 |  | ||||||
|         self._source_type_index: int = 0 |  | ||||||
|  |  | ||||||
|         super().__init__( |  | ||||||
|             parse_option_key=lambda x: unify(str(x)), |  | ||||||
|         ) |  | ||||||
|  |  | ||||||
|         self.extend(data_objects) |  | ||||||
|  |  | ||||||
|     def option_from_data_object(self, data_object: DataObject) -> Option: |  | ||||||
|         index = self._data_object_index |  | ||||||
|         self._data_object_index += 1 |  | ||||||
|  |  | ||||||
|         return Option( |  | ||||||
|             value=data_object, |  | ||||||
|             keys=[index, data_object.option_string, data_object.title_string], |  | ||||||
|             text=f"{BColors.BOLD.value}{{index}}{BColors.ENDC.value}: {data_object.option_string}", |  | ||||||
|             index=index, |  | ||||||
|         ) |  | ||||||
|  |  | ||||||
|     def option_from_source_type(self, source_type: SourceType) -> Option: |  | ||||||
|         index = ALPHABET[self._source_type_index % len(ALPHABET)] |  | ||||||
|         self._source_type_index += 1 |  | ||||||
|  |  | ||||||
|         o = Option( |  | ||||||
|             value=lambda: DataObjectSelect(self._source_type_to_data_objects[source_type]), |  | ||||||
|             keys=[index, source_type], |  | ||||||
|             text=f"{BColors.HEADER.value}({index}) --------------------------------{source_type.name:{'-'}<{21}}--------------------{BColors.ENDC.value}", |  | ||||||
|         ) |  | ||||||
|  |  | ||||||
|         super().append(o) |  | ||||||
|  |  | ||||||
|         return o |  | ||||||
|  |  | ||||||
|     def append(self, option: Union[Option, DataObject]): |  | ||||||
|         if isinstance(option, DataObject): |  | ||||||
|             data_object = option |  | ||||||
|             option = self.option_from_data_object(data_object) |  | ||||||
|         else: |  | ||||||
|             data_object = option.value |  | ||||||
|  |  | ||||||
|         for source_type in data_object.source_collection.source_types(only_with_page=True): |  | ||||||
|             self._source_type_to_data_objects[source_type].append(option) |  | ||||||
|  |  | ||||||
|         super().append(option) |  | ||||||
|  |  | ||||||
|     def __iter__(self): |  | ||||||
|         source_types = list(sorted(self._source_type_to_data_objects.keys(), key=lambda x: x.name)) |  | ||||||
|         single_source = len(source_types) > 1 |  | ||||||
|  |  | ||||||
|         j = 0 |  | ||||||
|         for st in source_types: |  | ||||||
|             if single_source: |  | ||||||
|                 yield self._source_type_to_option[st] |  | ||||||
|  |  | ||||||
|             limit = min(15, len(self._source_type_to_data_objects[st])) if single_source else len(self._source_type_to_data_objects[st]) |  | ||||||
|  |  | ||||||
|             for i in range(limit): |  | ||||||
|                 o = self._source_type_to_data_objects[st][i] |  | ||||||
|                 o.index = j |  | ||||||
|                 yield o |  | ||||||
|                 j += 1 |  | ||||||
|  |  | ||||||
|         self._remap() |  | ||||||
							
								
								
									
										329
									
								
								music_kraken/download/page_attributes.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										329
									
								
								music_kraken/download/page_attributes.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,329 @@ | |||||||
|  | from typing import Tuple, Type, Dict, Set, Optional, List | ||||||
|  | from collections import defaultdict | ||||||
|  | from pathlib import Path | ||||||
|  | import re | ||||||
|  | import logging | ||||||
|  |  | ||||||
|  | from . import FetchOptions, DownloadOptions | ||||||
|  | from .results import SearchResults | ||||||
|  | from ..objects import ( | ||||||
|  |     DatabaseObject as DataObject, | ||||||
|  |     Collection, | ||||||
|  |     Target, | ||||||
|  |     Source, | ||||||
|  |     Options, | ||||||
|  |     Song, | ||||||
|  |     Album, | ||||||
|  |     Artist, | ||||||
|  |     Label, | ||||||
|  | ) | ||||||
|  | from ..audio import write_metadata_to_target, correct_codec | ||||||
|  | from ..utils import output, BColors | ||||||
|  | from ..utils.string_processing import fit_to_file_system | ||||||
|  | from ..utils.config import youtube_settings, main_settings | ||||||
|  | from ..utils.path_manager import LOCATIONS | ||||||
|  | from ..utils.enums import SourceType, ALL_SOURCE_TYPES | ||||||
|  | from ..utils.support_classes.download_result import DownloadResult | ||||||
|  | from ..utils.support_classes.query import Query | ||||||
|  | from ..utils.support_classes.download_result import DownloadResult | ||||||
|  | from ..utils.exception import MKMissingNameException | ||||||
|  | from ..utils.exception.download import UrlNotFoundException | ||||||
|  | from ..utils.shared import DEBUG_PAGES | ||||||
|  |  | ||||||
|  | from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, Musicbrainz, Genius, INDEPENDENT_DB_OBJECTS | ||||||
|  |  | ||||||
|  |  | ||||||
|  | ALL_PAGES: Set[Type[Page]] = { | ||||||
|  |     # EncyclopaediaMetallum, | ||||||
|  |     Genius, | ||||||
|  |     Musify, | ||||||
|  |     YoutubeMusic, | ||||||
|  |     Bandcamp, | ||||||
|  |     Musicbrainz | ||||||
|  | } | ||||||
|  |  | ||||||
|  | if youtube_settings["use_youtube_alongside_youtube_music"]: | ||||||
|  |     ALL_PAGES.add(YouTube) | ||||||
|  |  | ||||||
|  | AUDIO_PAGES: Set[Type[Page]] = { | ||||||
|  |     Musify, | ||||||
|  |     YouTube, | ||||||
|  |     YoutubeMusic, | ||||||
|  |     Bandcamp | ||||||
|  | } | ||||||
|  |  | ||||||
|  | SHADY_PAGES: Set[Type[Page]] = { | ||||||
|  |     Musify, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fetch_map = { | ||||||
|  |     Song: "fetch_song", | ||||||
|  |     Album: "fetch_album", | ||||||
|  |     Artist: "fetch_artist", | ||||||
|  |     Label: "fetch_label", | ||||||
|  | } | ||||||
|  |  | ||||||
|  | if DEBUG_PAGES: | ||||||
|  |     DEBUGGING_PAGE = Bandcamp | ||||||
|  |     print(f"Only downloading from page {DEBUGGING_PAGE}.") | ||||||
|  |  | ||||||
|  |     ALL_PAGES = {DEBUGGING_PAGE} | ||||||
|  |     AUDIO_PAGES = ALL_PAGES.union(AUDIO_PAGES) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class Pages: | ||||||
|  |     def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, download_options: DownloadOptions = None, fetch_options: FetchOptions = None): | ||||||
|  |         self.LOGGER = logging.getLogger("download") | ||||||
|  |          | ||||||
|  |         self.download_options: DownloadOptions = download_options or DownloadOptions() | ||||||
|  |         self.fetch_options: FetchOptions = fetch_options or FetchOptions() | ||||||
|  |  | ||||||
|  |         # initialize all page instances | ||||||
|  |         self._page_instances: Dict[Type[Page], Page] = dict() | ||||||
|  |         self._source_to_page: Dict[SourceType, Type[Page]] = dict() | ||||||
|  |          | ||||||
|  |         exclude_pages = exclude_pages if exclude_pages is not None else set() | ||||||
|  |          | ||||||
|  |         if exclude_shady: | ||||||
|  |             exclude_pages = exclude_pages.union(SHADY_PAGES) | ||||||
|  |          | ||||||
|  |         if not exclude_pages.issubset(ALL_PAGES): | ||||||
|  |             raise ValueError(f"The excluded pages have to be a subset of all pages: {exclude_pages} | {ALL_PAGES}") | ||||||
|  |          | ||||||
|  |         def _set_to_tuple(page_set: Set[Type[Page]]) -> Tuple[Type[Page], ...]: | ||||||
|  |             return tuple(sorted(page_set, key=lambda page: page.__name__)) | ||||||
|  |          | ||||||
|  |         self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages) | ||||||
|  |         self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set) | ||||||
|  |  | ||||||
|  |         self._audio_pages_set: Set[Type[Page]] = self._pages_set.intersection(AUDIO_PAGES) | ||||||
|  |         self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(self._audio_pages_set) | ||||||
|  |          | ||||||
|  |         for page_type in self.pages: | ||||||
|  |             self._page_instances[page_type] = page_type(fetch_options=self.fetch_options, download_options=self.download_options) | ||||||
|  |             self._source_to_page[page_type.SOURCE_TYPE] = page_type | ||||||
|  |  | ||||||
|  |     def _get_page_from_enum(self, source_page: SourceType) -> Page: | ||||||
|  |         if source_page not in self._source_to_page: | ||||||
|  |             return None | ||||||
|  |         return self._page_instances[self._source_to_page[source_page]] | ||||||
|  |  | ||||||
|  |     def search(self, query: Query) -> SearchResults: | ||||||
|  |         result = SearchResults() | ||||||
|  |          | ||||||
|  |         for page_type in self.pages: | ||||||
|  |             result.add( | ||||||
|  |                 page=page_type, | ||||||
|  |                 search_result=self._page_instances[page_type].search(query=query) | ||||||
|  |             ) | ||||||
|  |              | ||||||
|  |         return result | ||||||
|  |      | ||||||
|  |     def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject: | ||||||
|  |         if not isinstance(data_object, INDEPENDENT_DB_OBJECTS): | ||||||
|  |             return data_object | ||||||
|  |          | ||||||
|  |         source: Source | ||||||
|  |         for source in data_object.source_collection.get_sources(source_type_sorting={ | ||||||
|  |             "only_with_page": True, | ||||||
|  |         }): | ||||||
|  |             new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level) | ||||||
|  |             if new_data_object is not None: | ||||||
|  |                 data_object.merge(new_data_object) | ||||||
|  |  | ||||||
|  |         return data_object | ||||||
|  |  | ||||||
|  |     def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]: | ||||||
|  |         if not source.has_page: | ||||||
|  |             return None | ||||||
|  |          | ||||||
|  |         source_type = source.page.get_source_type(source=source) | ||||||
|  |         if source_type is None: | ||||||
|  |             self.LOGGER.debug(f"Could not determine source type for {source}.") | ||||||
|  |             return None | ||||||
|  |  | ||||||
|  |         func = getattr(source.page, fetch_map[source_type]) | ||||||
|  |          | ||||||
|  |         # fetching the data object and marking it as fetched | ||||||
|  |         data_object: DataObject = func(source=source, **kwargs) | ||||||
|  |         data_object.mark_as_fetched(source.hash_url) | ||||||
|  |         return data_object | ||||||
|  |  | ||||||
|  |     def fetch_from_url(self, url: str) -> Optional[DataObject]: | ||||||
|  |         source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) | ||||||
|  |         if source is None: | ||||||
|  |             return None | ||||||
|  |          | ||||||
|  |         return self.fetch_from_source(source=source) | ||||||
|  |      | ||||||
|  |     def _skip_object(self, data_object: DataObject) -> bool: | ||||||
|  |         if isinstance(data_object, Album): | ||||||
|  |             if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist: | ||||||
|  |                 return True | ||||||
|  |          | ||||||
|  |         return False | ||||||
|  |  | ||||||
|  |     def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult: | ||||||
|  |         # fetch the given object | ||||||
|  |         self.fetch_details(data_object) | ||||||
|  |         output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD) | ||||||
|  |          | ||||||
|  |         # fetching all parent objects (e.g. if you only download a song) | ||||||
|  |         if not kwargs.get("fetched_upwards", False): | ||||||
|  |             to_fetch: List[DataObject] = [data_object] | ||||||
|  |  | ||||||
|  |             while len(to_fetch) > 0: | ||||||
|  |                 new_to_fetch = [] | ||||||
|  |                 for d in to_fetch: | ||||||
|  |                     if self._skip_object(d): | ||||||
|  |                         continue | ||||||
|  |  | ||||||
|  |                     self.fetch_details(d) | ||||||
|  |  | ||||||
|  |                     for c in d.get_parent_collections(): | ||||||
|  |                         new_to_fetch.extend(c) | ||||||
|  |  | ||||||
|  |                 to_fetch = new_to_fetch | ||||||
|  |              | ||||||
|  |             kwargs["fetched_upwards"] = True | ||||||
|  |          | ||||||
|  |         # download all children | ||||||
|  |         download_result: DownloadResult = DownloadResult() | ||||||
|  |         for c in data_object.get_child_collections(): | ||||||
|  |             for d in c: | ||||||
|  |                 if self._skip_object(d): | ||||||
|  |                     continue | ||||||
|  |  | ||||||
|  |                 download_result.merge(self.download(d, genre, **kwargs)) | ||||||
|  |  | ||||||
|  |         # actually download if the object is a song | ||||||
|  |         if isinstance(data_object, Song): | ||||||
|  |             """ | ||||||
|  |             TODO | ||||||
|  |             add the traced artist and album to the naming. | ||||||
|  |             I am able to do that, because duplicate values are removed later on. | ||||||
|  |             """ | ||||||
|  |  | ||||||
|  |             self._download_song(data_object, naming={ | ||||||
|  |                 "genre": [genre], | ||||||
|  |                 "audio_format": [main_settings["audio_format"]], | ||||||
|  |             }) | ||||||
|  |  | ||||||
|  |         return download_result | ||||||
|  |  | ||||||
|  |     def _extract_fields_from_template(self, path_template: str) -> Set[str]: | ||||||
|  |         return set(re.findall(r"{([^}]+)}", path_template)) | ||||||
|  |  | ||||||
|  |     def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str: | ||||||
|  |         field_names: Set[str] = self._extract_fields_from_template(path_template) | ||||||
|  |          | ||||||
|  |         for field in field_names: | ||||||
|  |             if len(naming[field]) == 0: | ||||||
|  |                 raise MKMissingNameException(f"Missing field for {field}.") | ||||||
|  |  | ||||||
|  |             path_template = path_template.replace(f"{{{field}}}", naming[field][0]) | ||||||
|  |  | ||||||
|  |         return path_template | ||||||
|  |  | ||||||
|  |     def _download_song(self, song: Song, naming: dict) -> DownloadOptions: | ||||||
|  |         """ | ||||||
|  |         TODO | ||||||
|  |         Search the song in the file system. | ||||||
|  |         """ | ||||||
|  |         r = DownloadResult(total=1) | ||||||
|  |          | ||||||
|  |         # pre process the data recursively | ||||||
|  |         song.compile() | ||||||
|  |          | ||||||
|  |         # manage the naming | ||||||
|  |         naming: Dict[str, List[str]] = defaultdict(list, naming) | ||||||
|  |         naming["song"].append(song.title_value) | ||||||
|  |         naming["isrc"].append(song.isrc) | ||||||
|  |         naming["album"].extend(a.title_value for a in song.album_collection) | ||||||
|  |         naming["album_type"].extend(a.album_type.value for a in song.album_collection) | ||||||
|  |         naming["artist"].extend(a.name for a in song.artist_collection) | ||||||
|  |         naming["artist"].extend(a.name for a in song.feature_artist_collection) | ||||||
|  |         for a in song.album_collection: | ||||||
|  |             naming["label"].extend([l.title_value for l in a.label_collection]) | ||||||
|  |         # removing duplicates from the naming, and process the strings | ||||||
|  |         for key, value in naming.items(): | ||||||
|  |             # https://stackoverflow.com/a/17016257 | ||||||
|  |             naming[key] = list(dict.fromkeys(value)) | ||||||
|  |         song.genre = naming["genre"][0] | ||||||
|  |  | ||||||
|  |         # manage the targets | ||||||
|  |         tmp: Target = Target.temp(file_extension=main_settings["audio_format"]) | ||||||
|  |  | ||||||
|  |         song.target_collection.append(Target( | ||||||
|  |             relative_to_music_dir=True, | ||||||
|  |             file_path=Path( | ||||||
|  |                 self._parse_path_template(main_settings["download_path"], naming=naming),  | ||||||
|  |                 self._parse_path_template(main_settings["download_file"], naming=naming), | ||||||
|  |             ) | ||||||
|  |         )) | ||||||
|  |         for target in song.target_collection: | ||||||
|  |             if target.exists: | ||||||
|  |                 output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY) | ||||||
|  |                 r.found_on_disk += 1 | ||||||
|  |  | ||||||
|  |                 if not self.download_options.download_again_if_found: | ||||||
|  |                     target.copy_content(tmp) | ||||||
|  |             else: | ||||||
|  |                 target.create_path() | ||||||
|  |                 output(f'{target.file_path}', color=BColors.GREY) | ||||||
|  |  | ||||||
|  |         # this streams from every available source until something succeeds, setting the skip intervals to the values of the according source | ||||||
|  |         used_source: Optional[Source] = None | ||||||
|  |         skip_intervals: List[Tuple[float, float]] = [] | ||||||
|  |         for source in song.source_collection.get_sources(source_type_sorting={ | ||||||
|  |             "only_with_page": True, | ||||||
|  |             "sort_key": lambda page: page.download_priority, | ||||||
|  |             "reverse": True, | ||||||
|  |         }): | ||||||
|  |             if tmp.exists: | ||||||
|  |                 break | ||||||
|  |  | ||||||
|  |             used_source = source | ||||||
|  |             streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download") | ||||||
|  |             skip_intervals = source.page.get_skip_intervals(song=song, source=source) | ||||||
|  |  | ||||||
|  |             # if something has been downloaded but it somehow failed, delete the file | ||||||
|  |             if streaming_results.is_fatal_error and tmp.exists: | ||||||
|  |                 tmp.delete() | ||||||
|  |  | ||||||
|  |         # if everything went right, the file should exist now | ||||||
|  |         if not tmp.exists: | ||||||
|  |             if used_source is None: | ||||||
|  |                 r.error_message = f"No source found for {song.option_string}." | ||||||
|  |             else: | ||||||
|  |                 r.error_message = f"Something went wrong downloading {song.option_string}." | ||||||
|  |             return r | ||||||
|  |  | ||||||
|  |         # post process the audio | ||||||
|  |         found_on_disk = used_source is None | ||||||
|  |         if not found_on_disk or self.download_options.process_audio_if_found: | ||||||
|  |             correct_codec(target=tmp, skip_intervals=skip_intervals) | ||||||
|  |             r.sponsor_segments = len(skip_intervals) | ||||||
|  |  | ||||||
|  |         if used_source is not None: | ||||||
|  |             used_source.page.post_process_hook(song=song, temp_target=tmp) | ||||||
|  |  | ||||||
|  |         if not found_on_disk or self.download_options.process_metadata_if_found: | ||||||
|  |             write_metadata_to_target(metadata=song.metadata, target=tmp, song=song) | ||||||
|  |  | ||||||
|  |         # copy the tmp target to the final locations | ||||||
|  |         for target in song.target_collection: | ||||||
|  |             tmp.copy_content(target) | ||||||
|  |  | ||||||
|  |         tmp.delete() | ||||||
|  |         return r | ||||||
|  |  | ||||||
|  |     def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]: | ||||||
|  |         source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) | ||||||
|  |          | ||||||
|  |         if source is None: | ||||||
|  |             raise UrlNotFoundException(url=url) | ||||||
|  |          | ||||||
|  |         _actual_page = self._source_to_page[source.source_type] | ||||||
|  |          | ||||||
|  |         return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level) | ||||||
| @@ -1,12 +1,8 @@ | |||||||
| from __future__ import annotations | from typing import Tuple, Type, Dict, List, Generator, Union | ||||||
|  |  | ||||||
| from dataclasses import dataclass | from dataclasses import dataclass | ||||||
| from typing import TYPE_CHECKING, Dict, Generator, List, Tuple, Type, Union |  | ||||||
|  |  | ||||||
| from ..objects import DatabaseObject | from ..objects import DatabaseObject | ||||||
|  | from ..pages import Page, EncyclopaediaMetallum, Musify | ||||||
| if TYPE_CHECKING: |  | ||||||
|     from . import Page |  | ||||||
|  |  | ||||||
|  |  | ||||||
| @dataclass | @dataclass | ||||||
|   | |||||||
| @@ -1,52 +1,9 @@ | |||||||
| import importlib | from .encyclopaedia_metallum import EncyclopaediaMetallum | ||||||
| import inspect | from .musify import Musify | ||||||
| import logging | from .musicbrainz import Musicbrainz | ||||||
| import pkgutil | from .youtube import YouTube | ||||||
| import sys | from .youtube_music import YoutubeMusic | ||||||
| from collections import defaultdict | from .bandcamp import Bandcamp | ||||||
| from copy import copy | from .genius import Genius | ||||||
| from pathlib import Path |  | ||||||
| from typing import Dict, Generator, List, Set, Type |  | ||||||
|  |  | ||||||
| from ._bandcamp import Bandcamp | from .abstract import Page, INDEPENDENT_DB_OBJECTS | ||||||
| from ._encyclopaedia_metallum import EncyclopaediaMetallum |  | ||||||
| from ._genius import Genius |  | ||||||
| from ._musify import Musify |  | ||||||
| from ._youtube import YouTube |  | ||||||
| from ._youtube_music import YoutubeMusic |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def import_children(): |  | ||||||
|     _page_directory = Path(__file__).parent |  | ||||||
|     _stem_blacklist = set(["__pycache__", "__init__"]) |  | ||||||
|  |  | ||||||
|     for _file in _page_directory.iterdir(): |  | ||||||
|         if _file.stem in _stem_blacklist: |  | ||||||
|             continue |  | ||||||
|          |  | ||||||
|         logging.debug(f"importing {_file.absolute()}") |  | ||||||
|         exec(f"from . import {_file.stem}") |  | ||||||
|  |  | ||||||
| # module_blacklist = set(sys.modules.keys()) |  | ||||||
| import_children() |  | ||||||
|  |  | ||||||
| """ |  | ||||||
| classes = set() |  | ||||||
|  |  | ||||||
| print(__name__) |  | ||||||
| for module_name, module in sys.modules.items(): |  | ||||||
|     if module_name in module_blacklist or not module_name.startswith(__name__): |  | ||||||
|         continue |  | ||||||
|  |  | ||||||
|     print("scanning module", module_name) |  | ||||||
|     for name, obj in inspect.getmembers(module, predicate=inspect.isclass): |  | ||||||
|         _module = obj.__module__ |  | ||||||
|         if _module.startswith(__name__) and hasattr(obj, "SOURCE_TYPE"): |  | ||||||
|             print("checking object", name, obj.__module__) |  | ||||||
|             classes.add(obj) |  | ||||||
|     print() |  | ||||||
|  |  | ||||||
| print(*(c.__name__ for c in classes), sep=",\t") |  | ||||||
|  |  | ||||||
| __all__ = [c.__name__ for c in classes] |  | ||||||
| """ |  | ||||||
|   | |||||||
							
								
								
									
										157
									
								
								music_kraken/pages/abstract.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										157
									
								
								music_kraken/pages/abstract.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,157 @@ | |||||||
|  | import logging | ||||||
|  | import random | ||||||
|  | import re | ||||||
|  | from copy import copy | ||||||
|  | from pathlib import Path | ||||||
|  | from typing import Optional, Union, Type, Dict, Set, List, Tuple, TypedDict | ||||||
|  | from string import Formatter | ||||||
|  | from dataclasses import dataclass, field | ||||||
|  |  | ||||||
|  | import requests | ||||||
|  | from bs4 import BeautifulSoup | ||||||
|  |  | ||||||
|  | from ..connection import Connection | ||||||
|  | from ..objects import ( | ||||||
|  |     Song, | ||||||
|  |     Source, | ||||||
|  |     Album, | ||||||
|  |     Artist, | ||||||
|  |     Target, | ||||||
|  |     DatabaseObject, | ||||||
|  |     Options, | ||||||
|  |     Collection, | ||||||
|  |     Label, | ||||||
|  | ) | ||||||
|  | from ..utils.enums import SourceType | ||||||
|  | from ..utils.enums.album import AlbumType | ||||||
|  | from ..audio import write_metadata_to_target, correct_codec | ||||||
|  | from ..utils.config import main_settings | ||||||
|  | from ..utils.support_classes.query import Query | ||||||
|  | from ..utils.support_classes.download_result import DownloadResult | ||||||
|  | from ..utils.string_processing import fit_to_file_system | ||||||
|  | from ..utils import trace, output, BColors | ||||||
|  |  | ||||||
|  | INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song] | ||||||
|  | INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]] | ||||||
|  |  | ||||||
|  | @dataclass | ||||||
|  | class FetchOptions: | ||||||
|  |     download_all: bool = False | ||||||
|  |     album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"])) | ||||||
|  |  | ||||||
|  | @dataclass | ||||||
|  | class DownloadOptions: | ||||||
|  |     download_all: bool = False | ||||||
|  |     album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"])) | ||||||
|  |  | ||||||
|  |     process_audio_if_found: bool = False | ||||||
|  |     process_metadata_if_found: bool = True | ||||||
|  |  | ||||||
|  | class Page: | ||||||
|  |     SOURCE_TYPE: SourceType | ||||||
|  |     LOGGER: logging.Logger | ||||||
|  |  | ||||||
|  |     def __new__(cls, *args, **kwargs): | ||||||
|  |         cls.LOGGER = logging.getLogger(cls.__name__) | ||||||
|  |  | ||||||
|  |         return super().__new__(cls) | ||||||
|  |  | ||||||
|  |     def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None): | ||||||
|  |         self.SOURCE_TYPE.register_page(self) | ||||||
|  |          | ||||||
|  |         self.download_options: DownloadOptions = download_options or DownloadOptions() | ||||||
|  |         self.fetch_options: FetchOptions = fetch_options or FetchOptions() | ||||||
|  |  | ||||||
|  |     def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None): | ||||||
|  |         """ | ||||||
|  |         Perform a regex search on the given string, using a single or a list of | ||||||
|  |         patterns returning the first matching group. | ||||||
|  |         In case of failure return a default value or raise a WARNING or a | ||||||
|  |         RegexNotFoundError, depending on fatal, specifying the field name. | ||||||
|  |         """ | ||||||
|  |  | ||||||
|  |         if isinstance(pattern, str): | ||||||
|  |             mobj = re.search(pattern, string, flags) | ||||||
|  |         else: | ||||||
|  |             for p in pattern: | ||||||
|  |                 mobj = re.search(p, string, flags) | ||||||
|  |                 if mobj: | ||||||
|  |                     break | ||||||
|  |  | ||||||
|  |         if mobj: | ||||||
|  |             if group is None: | ||||||
|  |                 # return the first matching group | ||||||
|  |                 return next(g for g in mobj.groups() if g is not None) | ||||||
|  |             elif isinstance(group, (list, tuple)): | ||||||
|  |                 return tuple(mobj.group(g) for g in group) | ||||||
|  |             else: | ||||||
|  |                 return mobj.group(group) | ||||||
|  |  | ||||||
|  |         return default | ||||||
|  |  | ||||||
|  |     def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]: | ||||||
|  |         return None | ||||||
|  |  | ||||||
|  |     def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup: | ||||||
|  |         return BeautifulSoup(r.content, "html.parser") | ||||||
|  |  | ||||||
|  |     # to search stuff | ||||||
|  |     def search(self, query: Query) -> List[DatabaseObject]: | ||||||
|  |         music_object = query.music_object | ||||||
|  |  | ||||||
|  |         search_functions = { | ||||||
|  |             Song: self.song_search, | ||||||
|  |             Album: self.album_search, | ||||||
|  |             Artist: self.artist_search, | ||||||
|  |             Label: self.label_search | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         if type(music_object) in search_functions: | ||||||
|  |             r = search_functions[type(music_object)](music_object) | ||||||
|  |             if r is not None and len(r) > 0: | ||||||
|  |                 return r | ||||||
|  |  | ||||||
|  |         r = [] | ||||||
|  |         for default_query in query.default_search: | ||||||
|  |             for single_option in self.general_search(default_query): | ||||||
|  |                 r.append(single_option) | ||||||
|  |  | ||||||
|  |         return r | ||||||
|  |  | ||||||
|  |     def general_search(self, search_query: str) -> List[DatabaseObject]: | ||||||
|  |         return [] | ||||||
|  |  | ||||||
|  |     def label_search(self, label: Label) -> List[Label]: | ||||||
|  |         return [] | ||||||
|  |  | ||||||
|  |     def artist_search(self, artist: Artist) -> List[Artist]: | ||||||
|  |         return [] | ||||||
|  |  | ||||||
|  |     def album_search(self, album: Album) -> List[Album]: | ||||||
|  |         return [] | ||||||
|  |  | ||||||
|  |     def song_search(self, song: Song) -> List[Song]: | ||||||
|  |         return [] | ||||||
|  |  | ||||||
|  |     # to fetch stuff | ||||||
|  |     def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: | ||||||
|  |         return Song() | ||||||
|  |  | ||||||
|  |     def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album: | ||||||
|  |         return Album() | ||||||
|  |  | ||||||
|  |     def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist: | ||||||
|  |         return Artist() | ||||||
|  |  | ||||||
|  |     def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: | ||||||
|  |         return Label() | ||||||
|  |  | ||||||
|  |     # to download stuff | ||||||
|  |     def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]: | ||||||
|  |         return [] | ||||||
|  |  | ||||||
|  |     def post_process_hook(self, song: Song, temp_target: Target, **kwargs): | ||||||
|  |         pass | ||||||
|  |  | ||||||
|  |     def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: | ||||||
|  |         return DownloadResult() | ||||||
| @@ -1,22 +1,33 @@ | |||||||
| import json |  | ||||||
| from enum import Enum |  | ||||||
| from typing import List, Optional, Type | from typing import List, Optional, Type | ||||||
| from urllib.parse import urlparse, urlunparse | from urllib.parse import urlparse, urlunparse | ||||||
| 
 | import json | ||||||
| import pycountry | from enum import Enum | ||||||
| from bs4 import BeautifulSoup | from bs4 import BeautifulSoup | ||||||
|  | import pycountry | ||||||
| 
 | 
 | ||||||
|  | from ..objects import Source, DatabaseObject | ||||||
|  | from .abstract import Page | ||||||
|  | from ..objects import ( | ||||||
|  |     Artist, | ||||||
|  |     Source, | ||||||
|  |     SourceType, | ||||||
|  |     Song, | ||||||
|  |     Album, | ||||||
|  |     Label, | ||||||
|  |     Target, | ||||||
|  |     Contact, | ||||||
|  |     ID3Timestamp, | ||||||
|  |     Lyrics, | ||||||
|  |     FormattedText, | ||||||
|  |     Artwork, | ||||||
|  | ) | ||||||
| from ..connection import Connection | from ..connection import Connection | ||||||
| from ..download import Page |  | ||||||
| from ..objects import (Album, Artist, Artwork, Contact, DatabaseObject, |  | ||||||
|                        FormattedText, ID3Timestamp, Label, Lyrics, Song, |  | ||||||
|                        Source, SourceType, Target) |  | ||||||
| from ..utils import dump_to_file | from ..utils import dump_to_file | ||||||
| from ..utils.config import logging_settings, main_settings | from ..utils.enums import SourceType, ALL_SOURCE_TYPES | ||||||
| from ..utils.enums import ALL_SOURCE_TYPES, SourceType |  | ||||||
| from ..utils.shared import DEBUG |  | ||||||
| from ..utils.string_processing import clean_song_title |  | ||||||
| from ..utils.support_classes.download_result import DownloadResult | from ..utils.support_classes.download_result import DownloadResult | ||||||
|  | from ..utils.string_processing import clean_song_title | ||||||
|  | from ..utils.config import main_settings, logging_settings | ||||||
|  | from ..utils.shared import DEBUG | ||||||
| 
 | 
 | ||||||
| if DEBUG: | if DEBUG: | ||||||
|     from ..utils import dump_to_file |     from ..utils import dump_to_file | ||||||
| @@ -1,20 +1,31 @@ | |||||||
| from collections import defaultdict | from collections import defaultdict | ||||||
| from typing import Dict, List, Optional, Type, Union | from typing import List, Optional, Dict, Type, Union | ||||||
| from urllib.parse import urlencode, urlparse |  | ||||||
| 
 |  | ||||||
| import pycountry |  | ||||||
| from bs4 import BeautifulSoup | from bs4 import BeautifulSoup | ||||||
|  | import pycountry | ||||||
|  | from urllib.parse import urlparse, urlencode | ||||||
| 
 | 
 | ||||||
| from ..connection import Connection | from ..connection import Connection | ||||||
| from ..download import Page |  | ||||||
| from ..objects import (Album, Artist, DatabaseObject, FormattedText, |  | ||||||
|                        ID3Timestamp, Label, Lyrics, Options, Song, Source) |  | ||||||
| from ..utils import dump_to_file |  | ||||||
| from ..utils.config import logging_settings | from ..utils.config import logging_settings | ||||||
| from ..utils.enums import ALL_SOURCE_TYPES, SourceType | from .abstract import Page | ||||||
|  | from ..utils.enums import SourceType, ALL_SOURCE_TYPES | ||||||
| from ..utils.enums.album import AlbumType | from ..utils.enums.album import AlbumType | ||||||
| from ..utils.shared import DEBUG |  | ||||||
| from ..utils.support_classes.query import Query | from ..utils.support_classes.query import Query | ||||||
|  | from ..objects import ( | ||||||
|  |     Lyrics, | ||||||
|  |     Artist, | ||||||
|  |     Source, | ||||||
|  |     Song, | ||||||
|  |     Album, | ||||||
|  |     ID3Timestamp, | ||||||
|  |     FormattedText, | ||||||
|  |     Label, | ||||||
|  |     Options, | ||||||
|  |     DatabaseObject | ||||||
|  | ) | ||||||
|  | from ..utils.shared import DEBUG | ||||||
|  | from ..utils import dump_to_file | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| ALBUM_TYPE_MAP: Dict[str, AlbumType] = defaultdict(lambda: AlbumType.OTHER, { | ALBUM_TYPE_MAP: Dict[str, AlbumType] = defaultdict(lambda: AlbumType.OTHER, { | ||||||
|     "Full-length": AlbumType.STUDIO_ALBUM, |     "Full-length": AlbumType.STUDIO_ALBUM, | ||||||
| @@ -196,7 +207,6 @@ def create_grid( | |||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class EncyclopaediaMetallum(Page): | class EncyclopaediaMetallum(Page): | ||||||
|     REGISTER = False |  | ||||||
|     SOURCE_TYPE = ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM |     SOURCE_TYPE = ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM | ||||||
|     LOGGER = logging_settings["metal_archives_logger"] |     LOGGER = logging_settings["metal_archives_logger"] | ||||||
|      |      | ||||||
| @@ -1,22 +1,33 @@ | |||||||
|  | from typing import List, Optional, Type | ||||||
|  | from urllib.parse import urlparse, urlunparse, urlencode | ||||||
| import json | import json | ||||||
| from enum import Enum | from enum import Enum | ||||||
| from typing import List, Optional, Type |  | ||||||
| from urllib.parse import urlencode, urlparse, urlunparse |  | ||||||
| 
 |  | ||||||
| import pycountry |  | ||||||
| from bs4 import BeautifulSoup | from bs4 import BeautifulSoup | ||||||
|  | import pycountry | ||||||
| 
 | 
 | ||||||
|  | from ..objects import Source, DatabaseObject | ||||||
|  | from .abstract import Page | ||||||
|  | from ..objects import ( | ||||||
|  |     Artist, | ||||||
|  |     Source, | ||||||
|  |     SourceType, | ||||||
|  |     Song, | ||||||
|  |     Album, | ||||||
|  |     Label, | ||||||
|  |     Target, | ||||||
|  |     Contact, | ||||||
|  |     ID3Timestamp, | ||||||
|  |     Lyrics, | ||||||
|  |     FormattedText, | ||||||
|  |     Artwork, | ||||||
|  | ) | ||||||
| from ..connection import Connection | from ..connection import Connection | ||||||
| from ..download import Page |  | ||||||
| from ..objects import (Album, Artist, Artwork, Contact, DatabaseObject, |  | ||||||
|                        FormattedText, ID3Timestamp, Label, Lyrics, Song, |  | ||||||
|                        Source, SourceType, Target) |  | ||||||
| from ..utils import dump_to_file, traverse_json_path | from ..utils import dump_to_file, traverse_json_path | ||||||
| from ..utils.config import logging_settings, main_settings | from ..utils.enums import SourceType, ALL_SOURCE_TYPES | ||||||
| from ..utils.enums import ALL_SOURCE_TYPES, SourceType |  | ||||||
| from ..utils.shared import DEBUG |  | ||||||
| from ..utils.string_processing import clean_song_title |  | ||||||
| from ..utils.support_classes.download_result import DownloadResult | from ..utils.support_classes.download_result import DownloadResult | ||||||
|  | from ..utils.string_processing import clean_song_title | ||||||
|  | from ..utils.config import main_settings, logging_settings | ||||||
|  | from ..utils.shared import DEBUG | ||||||
| 
 | 
 | ||||||
| if DEBUG: | if DEBUG: | ||||||
|     from ..utils import dump_to_file |     from ..utils import dump_to_file | ||||||
| @@ -123,7 +134,7 @@ class Genius(Page): | |||||||
|                 source_list=[source], |                 source_list=[source], | ||||||
|                 artist_list=[self.parse_api_object(data.get("artist"))], |                 artist_list=[self.parse_api_object(data.get("artist"))], | ||||||
|                 artwork=artwork, |                 artwork=artwork, | ||||||
|                 date=ID3Timestamp(**data.get("release_date_components", {})), |                 date=ID3Timestamp(**(data.get("release_date_components") or {})), | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|         if object_type == "song": |         if object_type == "song": | ||||||
							
								
								
									
										145
									
								
								music_kraken/pages/musicbrainz.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										145
									
								
								music_kraken/pages/musicbrainz.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,145 @@ | |||||||
|  | from collections import defaultdict | ||||||
|  | from dataclasses import dataclass | ||||||
|  | from enum import Enum | ||||||
|  | from typing import List, Optional, Type, Union, Generator, Dict, Any | ||||||
|  | from urllib.parse import urlparse | ||||||
|  |  | ||||||
|  | import pycountry | ||||||
|  | import musicbrainzngs | ||||||
|  | from bs4 import BeautifulSoup | ||||||
|  |  | ||||||
|  | from ..connection import Connection | ||||||
|  | from .abstract import Page | ||||||
|  | from ..utils.enums import SourceType, ALL_SOURCE_TYPES | ||||||
|  | from ..utils.enums.album import AlbumType, AlbumStatus | ||||||
|  | from ..objects import ( | ||||||
|  |     Artist, | ||||||
|  |     Source, | ||||||
|  |     Song, | ||||||
|  |     Album, | ||||||
|  |     ID3Timestamp, | ||||||
|  |     FormattedText, | ||||||
|  |     Label, | ||||||
|  |     Target, | ||||||
|  |     DatabaseObject, | ||||||
|  |     Lyrics, | ||||||
|  |     Artwork | ||||||
|  | ) | ||||||
|  | from ..utils.config import logging_settings, main_settings | ||||||
|  | from ..utils import string_processing, shared | ||||||
|  | from ..utils.string_processing import clean_song_title | ||||||
|  | from ..utils.support_classes.query import Query | ||||||
|  | from ..utils.support_classes.download_result import DownloadResult | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class Musicbrainz(Page): | ||||||
|  |     SOURCE_TYPE = ALL_SOURCE_TYPES.MUSICBRAINZ | ||||||
|  |  | ||||||
|  |     HOST = "https://musicbrainz.org" | ||||||
|  |  | ||||||
|  |     def __init__(self, *args, **kwargs): | ||||||
|  |         musicbrainzngs.set_useragent("mk", "1") | ||||||
|  |  | ||||||
|  |         super().__init__(*args, **kwargs) | ||||||
|  |      | ||||||
|  |     def general_search(self, search_query: str) -> List[DatabaseObject]: | ||||||
|  |         search_results = [] | ||||||
|  |  | ||||||
|  |         #Artist | ||||||
|  |         search_results += self.artist_search(search_query).copy() | ||||||
|  |  | ||||||
|  |         #Album | ||||||
|  |         search_results += self.album_search(search_query).copy() | ||||||
|  |  | ||||||
|  |         #Song | ||||||
|  |         search_results += self.song_search(search_query).copy() | ||||||
|  |  | ||||||
|  |         return search_results | ||||||
|  |  | ||||||
|  |     def artist_search(self, search_query: str) -> List[Artist]: | ||||||
|  |         artist_list = [] | ||||||
|  |          | ||||||
|  |         #Artist | ||||||
|  |         artist_dict_list: list = musicbrainzngs.search_artists(search_query)['artist-list'] | ||||||
|  |         artist_source_list: List[Source] = [] | ||||||
|  |         for artist_dict in artist_dict_list: | ||||||
|  |             artist_source_list.append(Source(self.SOURCE_TYPE, self.HOST + "/artist/" + artist_dict['id'])) | ||||||
|  |             artist_list.append(Artist( | ||||||
|  |                 name=artist_dict['name'], | ||||||
|  |                 source_list=artist_source_list | ||||||
|  |             )) | ||||||
|  |    | ||||||
|  |         return artist_list | ||||||
|  |  | ||||||
|  |     def song_search(self, search_query: str) -> List[Song]: | ||||||
|  |         song_list = [] | ||||||
|  |  | ||||||
|  |         #Song | ||||||
|  |         song_dict_list: list = musicbrainzngs.search_recordings(search_query)['recording-list'] | ||||||
|  |         song_source_list: List[Source] = []  | ||||||
|  |         for song_dict in song_dict_list: | ||||||
|  |             song_source_list.append(Source(self.SOURCE_TYPE, self.HOST + "/recording/" + song_dict['id']))  | ||||||
|  |             song_list.append(Song( | ||||||
|  |                 title=song_dict['title'], | ||||||
|  |                 source_list=song_source_list | ||||||
|  |             ))  | ||||||
|  |  | ||||||
|  |         return song_list | ||||||
|  |      | ||||||
|  |     def album_search(self, search_query: str) -> List[Album]: | ||||||
|  |         album_list = [] | ||||||
|  |  | ||||||
|  |         #Album | ||||||
|  |         album_dict_list: list = musicbrainzngs.search_release_groups(search_query)['release-group-list'] | ||||||
|  |         album_source_list: List[Source] = [] | ||||||
|  |         for album_dict in album_dict_list: | ||||||
|  |             album_source_list.append(Source(self.SOURCE_TYPE, self.HOST + "/release-group/" + album_dict['id'])) | ||||||
|  |             album_list.append(Album( | ||||||
|  |                 title=album_dict['title'], | ||||||
|  |                 source_list=album_source_list | ||||||
|  |             )) | ||||||
|  |  | ||||||
|  |         return album_list | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album: | ||||||
|  |         album_list = [] | ||||||
|  |  | ||||||
|  |         #Album | ||||||
|  |         album_dict_list: list = musicbrainzngs.search_release_groups(search_query)['release-group-list'] | ||||||
|  |         album_source_list: List[Source] = [] | ||||||
|  |         for album_dict in album_dict_list: | ||||||
|  |             album_source_list.append(Source(self.SOURCE_TYPE, self.HOST + "/release-group/" + album_dict['id'])) | ||||||
|  |             album_list.append(Album( | ||||||
|  |                 title=album_dict['title'], | ||||||
|  |                 source_list=album_source_list | ||||||
|  |             )) | ||||||
|  |  | ||||||
|  |     def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist: | ||||||
|  |         artist_list = [] | ||||||
|  |          | ||||||
|  |         #Artist | ||||||
|  |         artist_dict_list: list = musicbrainzngs.search_artists(search_query)['artist-list'] | ||||||
|  |         artist_source_list: List[Source] = [] | ||||||
|  |         for artist_dict in artist_dict_list: | ||||||
|  |             artist_source_list.append(Source(self.SOURCE_TYPE, self.HOST + "/artist/" + artist_dict['id'])) | ||||||
|  |             artist_list.append(Artist( | ||||||
|  |                 name=artist_dict['name'], | ||||||
|  |                 source_list=artist_source_list, | ||||||
|  |             )) | ||||||
|  |  | ||||||
|  |     def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: | ||||||
|  |         song_list = [] | ||||||
|  |  | ||||||
|  |         #Song | ||||||
|  |         song_dict_list: list = musicbrainzngs.search_recordings(search_query)['recording-list'] | ||||||
|  |         song_source_list: List[Source] = []  | ||||||
|  |         for song_dict in song_dict_list: | ||||||
|  |             song_source_list.append(Source(self.SOURCE_TYPE, self.HOST + "/recording/" + song_dict['id']))  | ||||||
|  |             song_list.append(Song( | ||||||
|  |                 title=song_dict['title'], | ||||||
|  |                 source_list=song_source_list | ||||||
|  |             ))  | ||||||
|  |      | ||||||
|  |          | ||||||
| @@ -1,23 +1,34 @@ | |||||||
| from collections import defaultdict | from collections import defaultdict | ||||||
| from dataclasses import dataclass | from dataclasses import dataclass | ||||||
| from enum import Enum | from enum import Enum | ||||||
| from typing import Any, Dict, Generator, List, Optional, Type, Union | from typing import List, Optional, Type, Union, Generator, Dict, Any | ||||||
| from urllib.parse import urlparse | from urllib.parse import urlparse | ||||||
| 
 | 
 | ||||||
| import pycountry | import pycountry | ||||||
| from bs4 import BeautifulSoup | from bs4 import BeautifulSoup | ||||||
| 
 | 
 | ||||||
| from ..connection import Connection | from ..connection import Connection | ||||||
| from ..download import Page | from .abstract import Page | ||||||
| from ..objects import (Album, Artist, Artwork, DatabaseObject, FormattedText, | from ..utils.enums import SourceType, ALL_SOURCE_TYPES | ||||||
|                        ID3Timestamp, Label, Lyrics, Song, Source, Target) | from ..utils.enums.album import AlbumType, AlbumStatus | ||||||
| from ..utils import shared, string_processing | from ..objects import ( | ||||||
|  |     Artist, | ||||||
|  |     Source, | ||||||
|  |     Song, | ||||||
|  |     Album, | ||||||
|  |     ID3Timestamp, | ||||||
|  |     FormattedText, | ||||||
|  |     Label, | ||||||
|  |     Target, | ||||||
|  |     DatabaseObject, | ||||||
|  |     Lyrics, | ||||||
|  |     Artwork | ||||||
|  | ) | ||||||
| from ..utils.config import logging_settings, main_settings | from ..utils.config import logging_settings, main_settings | ||||||
| from ..utils.enums import ALL_SOURCE_TYPES, SourceType | from ..utils import string_processing, shared | ||||||
| from ..utils.enums.album import AlbumStatus, AlbumType |  | ||||||
| from ..utils.string_processing import clean_song_title | from ..utils.string_processing import clean_song_title | ||||||
| from ..utils.support_classes.download_result import DownloadResult |  | ||||||
| from ..utils.support_classes.query import Query | from ..utils.support_classes.query import Query | ||||||
|  | from ..utils.support_classes.download_result import DownloadResult | ||||||
| 
 | 
 | ||||||
| """ | """ | ||||||
| https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent | https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent | ||||||
| @@ -1,19 +1,29 @@ | |||||||
|  | from typing import List, Optional, Type, Tuple | ||||||
|  | from urllib.parse import urlparse, urlunparse, parse_qs | ||||||
| from enum import Enum | from enum import Enum | ||||||
| from typing import List, Optional, Tuple, Type |  | ||||||
| from urllib.parse import parse_qs, urlparse, urlunparse |  | ||||||
| 
 | 
 | ||||||
| import python_sponsorblock | import python_sponsorblock | ||||||
| 
 | 
 | ||||||
|  | from ..objects import Source, DatabaseObject, Song, Target | ||||||
|  | from .abstract import Page | ||||||
|  | from ..objects import ( | ||||||
|  |     Artist, | ||||||
|  |     Source, | ||||||
|  |     Song, | ||||||
|  |     Album, | ||||||
|  |     Label, | ||||||
|  |     Target, | ||||||
|  |     FormattedText, | ||||||
|  |     ID3Timestamp | ||||||
|  | ) | ||||||
| from ..connection import Connection | from ..connection import Connection | ||||||
| from ..download import Page |  | ||||||
| from ..objects import (Album, Artist, DatabaseObject, FormattedText, |  | ||||||
|                        ID3Timestamp, Label, Song, Source, Target) |  | ||||||
| from ..utils.config import logging_settings, main_settings, youtube_settings |  | ||||||
| from ..utils.enums import ALL_SOURCE_TYPES, SourceType |  | ||||||
| from ..utils.string_processing import clean_song_title | from ..utils.string_processing import clean_song_title | ||||||
|  | from ..utils.enums import SourceType, ALL_SOURCE_TYPES | ||||||
| from ..utils.support_classes.download_result import DownloadResult | from ..utils.support_classes.download_result import DownloadResult | ||||||
| from ._youtube_music.super_youtube import (SuperYouTube, YouTubeUrl, | from ..utils.config import youtube_settings, main_settings, logging_settings | ||||||
|                                            YouTubeUrlType, get_invidious_url) | 
 | ||||||
|  | from .youtube_music.super_youtube import SuperYouTube, YouTubeUrl, get_invidious_url, YouTubeUrlType | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| """ | """ | ||||||
| - https://yt.artemislena.eu/api/v1/search?q=Zombiez+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance | - https://yt.artemislena.eu/api/v1/search?q=Zombiez+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance | ||||||
| @@ -28,7 +38,7 @@ def get_piped_url(path: str = "", params: str = "", query: str = "", fragment: s | |||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class YouTube(SuperYouTube): | class YouTube(SuperYouTube): | ||||||
|     REGISTER = youtube_settings["use_youtube_alongside_youtube_music"] |     # CHANGE | ||||||
|     SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE |     SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE | ||||||
| 
 | 
 | ||||||
|     def __init__(self, *args, **kwargs): |     def __init__(self, *args, **kwargs): | ||||||
| @@ -3,6 +3,7 @@ from enum import Enum | |||||||
| 
 | 
 | ||||||
| from ...utils.config import logging_settings | from ...utils.config import logging_settings | ||||||
| from ...objects import Source, DatabaseObject | from ...objects import Source, DatabaseObject | ||||||
|  | from ..abstract import Page | ||||||
| from ...objects import ( | from ...objects import ( | ||||||
|     Artist, |     Artist, | ||||||
|     Source, |     Source, | ||||||
| @@ -6,6 +6,7 @@ from ...utils.string_processing import clean_song_title | |||||||
| from ...utils.enums import SourceType, ALL_SOURCE_TYPES | from ...utils.enums import SourceType, ALL_SOURCE_TYPES | ||||||
| 
 | 
 | ||||||
| from ...objects import Source, DatabaseObject | from ...objects import Source, DatabaseObject | ||||||
|  | from ..abstract import Page | ||||||
| from ...objects import ( | from ...objects import ( | ||||||
|     Artist, |     Artist, | ||||||
|     Source, |     Source, | ||||||
| @@ -1,17 +1,26 @@ | |||||||
|  | from typing import List, Optional, Type, Tuple | ||||||
|  | from urllib.parse import urlparse, urlunparse, parse_qs | ||||||
| from enum import Enum | from enum import Enum | ||||||
| from typing import List, Optional, Tuple, Type |  | ||||||
| from urllib.parse import parse_qs, urlparse, urlunparse |  | ||||||
| 
 |  | ||||||
| import python_sponsorblock |  | ||||||
| import requests | import requests | ||||||
| 
 | 
 | ||||||
|  | import python_sponsorblock | ||||||
|  | 
 | ||||||
|  | from ...objects import Source, DatabaseObject, Song, Target | ||||||
|  | from ..abstract import Page | ||||||
|  | from ...objects import ( | ||||||
|  |     Artist, | ||||||
|  |     Source, | ||||||
|  |     Song, | ||||||
|  |     Album, | ||||||
|  |     Label, | ||||||
|  |     Target, | ||||||
|  |     FormattedText, | ||||||
|  |     ID3Timestamp | ||||||
|  | ) | ||||||
| from ...connection import Connection | from ...connection import Connection | ||||||
| from ...download import Page |  | ||||||
| from ...objects import (Album, Artist, DatabaseObject, FormattedText, |  | ||||||
|                         ID3Timestamp, Label, Song, Source, Target) |  | ||||||
| from ...utils.config import logging_settings, main_settings, youtube_settings |  | ||||||
| from ...utils.enums import ALL_SOURCE_TYPES, SourceType |  | ||||||
| from ...utils.support_classes.download_result import DownloadResult | from ...utils.support_classes.download_result import DownloadResult | ||||||
|  | from ...utils.config import youtube_settings, logging_settings, main_settings | ||||||
|  | from ...utils.enums import SourceType, ALL_SOURCE_TYPES | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str: | def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str: | ||||||
| @@ -1,33 +1,46 @@ | |||||||
| from __future__ import annotations, unicode_literals | from __future__ import unicode_literals, annotations | ||||||
| 
 | 
 | ||||||
| import json | from typing import Dict, List, Optional, Set, Type | ||||||
|  | from urllib.parse import urlparse, urlunparse, quote, parse_qs, urlencode | ||||||
| import logging | import logging | ||||||
| import random | import random | ||||||
| import re | import json | ||||||
| from collections import defaultdict |  | ||||||
| from dataclasses import dataclass | from dataclasses import dataclass | ||||||
|  | import re | ||||||
| from functools import lru_cache | from functools import lru_cache | ||||||
| from typing import Dict, List, Optional, Set, Type | from collections import defaultdict | ||||||
| from urllib.parse import parse_qs, quote, urlencode, urlparse, urlunparse |  | ||||||
| 
 | 
 | ||||||
| import youtube_dl | import youtube_dl | ||||||
| from youtube_dl.extractor.youtube import YoutubeIE | from youtube_dl.extractor.youtube import YoutubeIE | ||||||
| from youtube_dl.utils import DownloadError | from youtube_dl.utils import DownloadError | ||||||
| 
 | 
 | ||||||
| from ...connection import Connection |  | ||||||
| from ...download import Page |  | ||||||
| from ...objects import Album, Artist, Artwork |  | ||||||
| from ...objects import DatabaseObject as DataObject |  | ||||||
| from ...objects import (FormattedText, ID3Timestamp, Label, Lyrics, Song, |  | ||||||
|                         Source, Target) |  | ||||||
| from ...utils import dump_to_file, get_current_millis, traverse_json_path |  | ||||||
| from ...utils.config import logging_settings, main_settings, youtube_settings |  | ||||||
| from ...utils.enums import ALL_SOURCE_TYPES, SourceType |  | ||||||
| from ...utils.enums.album import AlbumType |  | ||||||
| from ...utils.exception.config import SettingValueError | from ...utils.exception.config import SettingValueError | ||||||
|  | from ...utils.config import main_settings, youtube_settings, logging_settings | ||||||
| from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING | from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING | ||||||
| from ...utils.string_processing import clean_song_title | from ...utils.string_processing import clean_song_title | ||||||
|  | from ...utils import get_current_millis, traverse_json_path | ||||||
|  | 
 | ||||||
|  | from ...utils import dump_to_file | ||||||
|  | 
 | ||||||
|  | from ..abstract import Page | ||||||
|  | from ...objects import ( | ||||||
|  |     DatabaseObject as DataObject, | ||||||
|  |     Source, | ||||||
|  |     FormattedText, | ||||||
|  |     ID3Timestamp, | ||||||
|  |     Artwork, | ||||||
|  |     Artist, | ||||||
|  |     Song, | ||||||
|  |     Album, | ||||||
|  |     Label, | ||||||
|  |     Target, | ||||||
|  |     Lyrics, | ||||||
|  | ) | ||||||
|  | from ...connection import Connection | ||||||
|  | from ...utils.enums import SourceType, ALL_SOURCE_TYPES | ||||||
|  | from ...utils.enums.album import AlbumType | ||||||
| from ...utils.support_classes.download_result import DownloadResult | from ...utils.support_classes.download_result import DownloadResult | ||||||
|  | 
 | ||||||
| from ._list_render import parse_renderer | from ._list_render import parse_renderer | ||||||
| from ._music_object_render import parse_run_element | from ._music_object_render import parse_run_element | ||||||
| from .super_youtube import SuperYouTube | from .super_youtube import SuperYouTube | ||||||
| @@ -59,6 +59,11 @@ Reference for the logging formats: https://docs.python.org/3/library/logging.htm | |||||||
|         description="The logger for the musify scraper.", |         description="The logger for the musify scraper.", | ||||||
|         default_value="musify" |         default_value="musify" | ||||||
|     ), |     ), | ||||||
|  |     LoggerAttribute( | ||||||
|  |         name="musicbrainz_logger", | ||||||
|  |         description="The logger for the musicbrainz scraper.", | ||||||
|  |         default_value="musicbrainz" | ||||||
|  |     ), | ||||||
|     LoggerAttribute( |     LoggerAttribute( | ||||||
|         name="youtube_logger", |         name="youtube_logger", | ||||||
|         description="The logger for the youtube scraper.", |         description="The logger for the youtube scraper.", | ||||||
|   | |||||||
| @@ -17,9 +17,6 @@ class SourceType: | |||||||
|     def register_page(self, page: Page): |     def register_page(self, page: Page): | ||||||
|         self.page = page |         self.page = page | ||||||
|  |  | ||||||
|     def deregister_page(self): |  | ||||||
|         self.page = None |  | ||||||
|  |  | ||||||
|     def __hash__(self): |     def __hash__(self): | ||||||
|         return hash(self.name) |         return hash(self.name) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -3,9 +3,6 @@ class MKBaseException(Exception): | |||||||
|         self.message = message |         self.message = message | ||||||
|         super().__init__(message, **kwargs) |         super().__init__(message, **kwargs) | ||||||
|  |  | ||||||
| # Compose exceptions. Those usually mean a bug on my side. |  | ||||||
| class MKComposeException(MKBaseException): |  | ||||||
|     pass |  | ||||||
|  |  | ||||||
| # Downloading | # Downloading | ||||||
| class MKDownloadException(MKBaseException): | class MKDownloadException(MKBaseException): | ||||||
|   | |||||||
| @@ -1,11 +1,11 @@ | |||||||
| import os |  | ||||||
| import random | import random | ||||||
| from pathlib import Path |  | ||||||
|  |  | ||||||
| from dotenv import load_dotenv | from dotenv import load_dotenv | ||||||
|  | from pathlib import Path | ||||||
|  | import os | ||||||
|  |  | ||||||
|  |  | ||||||
| from .config import main_settings |  | ||||||
| from .path_manager import LOCATIONS | from .path_manager import LOCATIONS | ||||||
|  | from .config import main_settings | ||||||
|  |  | ||||||
| if not load_dotenv(Path(__file__).parent.parent.parent / ".env"): | if not load_dotenv(Path(__file__).parent.parent.parent / ".env"): | ||||||
|     load_dotenv(Path(__file__).parent.parent.parent / ".env.example") |     load_dotenv(Path(__file__).parent.parent.parent / ".env.example") | ||||||
| @@ -51,6 +51,3 @@ have fun :3""".strip() | |||||||
| URL_PATTERN = r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+" | URL_PATTERN = r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+" | ||||||
| INT_PATTERN = r"^\d*$" | INT_PATTERN = r"^\d*$" | ||||||
| FLOAT_PATTERN = r"^[\d|\,|\.]*$" | FLOAT_PATTERN = r"^[\d|\,|\.]*$" | ||||||
|  |  | ||||||
|  |  | ||||||
| ALPHABET = "abcdefghijklmnopqrstuvwxyz" |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user