from __future__ import annotations import logging import random import re from collections import defaultdict from copy import copy from dataclasses import dataclass, field from pathlib import Path from string import Formatter from typing import (TYPE_CHECKING, Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Type, TypedDict, Union) import requests from bs4 import BeautifulSoup from ..audio import correct_codec, write_metadata_to_target from ..connection import Connection from ..objects import Album, Artist, Collection from ..objects import DatabaseObject as DataObject from ..objects import Label, Options, Song, Source, Target from ..utils import BColors, limit_generator, output, trace from ..utils.config import main_settings, youtube_settings from ..utils.enums import ALL_SOURCE_TYPES, SourceType from ..utils.enums.album import AlbumType from ..utils.exception import MKComposeException, MKMissingNameException from ..utils.exception.download import UrlNotFoundException from ..utils.path_manager import LOCATIONS from ..utils.shared import DEBUG_PAGES from ..utils.string_processing import fit_to_file_system from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.query import Query from .results import SearchResults @dataclass class FetchOptions: download_all: bool = False album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"])) @dataclass class DownloadOptions: download_all: bool = False album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"])) download_again_if_found: bool = False process_audio_if_found: bool = False process_metadata_if_found: bool = True fetch_map = { Song: "fetch_song", Album: "fetch_album", Artist: "fetch_artist", Label: "fetch_label", } class Downloader: def __init__( self, auto_register_pages: bool = True, download_options: DownloadOptions = None, fetch_options: FetchOptions = None, **kwargs ): self.LOGGER = logging.getLogger("download") self.download_options: DownloadOptions = download_options or DownloadOptions() self.fetch_options: FetchOptions = fetch_options or FetchOptions() self._registered_pages: Dict[Type[Page], Set[Page]] = defaultdict(set) if auto_register_pages: self.scan_for_pages(**kwargs) # manage which pages to use def register_page(self, page_type: Type[Page], **kwargs): if page_type in self._registered_pages: return self._registered_pages[page_type].add(page_type( download_options=self.download_options, fetch_options=self.fetch_options, **kwargs )) def deregister_page(self, page_type: Type[Page]): if page_type not in _registered_pages: return for p in self._registered_pages[page_type]: p.__del__() del self._registered_pages[page_type] def scan_for_pages(self, **kwargs): # assuming the wanted pages are the leaf classes of the interface from .. import pages leaf_classes = [] class_list = [Page] while len(class_list): _class = class_list.pop() class_subclasses = _class.__subclasses__() if len(class_subclasses) == 0: if _class.REGISTER: leaf_classes.append(_class) else: class_list.extend(class_subclasses) if Page in leaf_classes: self.LOGGER.warn("couldn't find any data source") return for leaf_class in leaf_classes: self.register_page(leaf_class, **kwargs) def get_pages(self, *page_types: List[Type[Page]]) -> Generator[Page, None, None]: if len(page_types) == 0: page_types = self._registered_pages.keys() for page_type in page_types: yield from self._registered_pages[page_type] # fetching/downloading data def search(self, query: Query) -> Generator[DataObject, None, None]: """Yields all data objects that were found by the query. Other than `Downloader.search_yield_pages`, this function just yields all data objects. This looses the data, where th objects were searched originally, so this might not be the best choice. Args: query (Query): The query to search for. Yields: Generator[DataObject, None, None]: A generator that yields all found data objects. """ for page in self.get_pages(): yield from page.search(query=query) def search_yield_pages(self, query: Query, results_per_page: Optional[int] = None) -> Generator[Tuple[Page, Generator[DataObject, None, None]], None, None]: """Yields all data objects that were found by the query, grouped by the page they were found on. every yield is a tuple of the page and a generator that yields the data objects. So this could be how it is used: ```python for page, data_objects in downloader.search_yield_pages(query): print(f"Found on {page}:") for data_object in data_objects: print(data_object) ``` Args: query (Query): The query to search for. results_per_page (Optional[int], optional): If this is set, the generators only yield this amount of data objects per page. Yields: Generator[Tuple[Page, Generator[DataObject, None, None]], None, None]: yields the page and a generator that yields the data objects. """ for page in self.get_pages(): yield page, limit_generator(page.search(query=query), limit=results_per_page) def fetch_details(self, data_object: DataObject, **kwargs) -> DataObject: """Fetches more details for the given data object. This uses every source contained in data_object.source_collection that has a page. Args: data_object (DataObject): The data object to fetch details for. Returns: DataObject: The same data object, but with more details. """ source: Source for source in data_object.source_collection.get_sources(source_type_sorting={ "only_with_page": True, }): new_data_object = self.fetch_from_source(source=source, **kwargs) if new_data_object is not None: data_object.merge(new_data_object) return data_object def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]: """Gets a data object from the given source. Args: source (Source): The source to get the data object from. Returns: Optional[DataObject]: If a data object can be retrieved, it is returned. Otherwise, None is returned. """ if not source.has_page: return None source_type = source.page.get_source_type(source=source) if source_type is None: self.LOGGER.debug(f"Could not determine source type for {source}.") return None func = getattr(source.page, fetch_map[source_type]) # fetching the data object and marking it as fetched data_object: DataObject = func(source=source, **kwargs) data_object.mark_as_fetched(source.hash_url) return data_object def fetch_from_url(self, url: str) -> Optional[DataObject]: """This function tries to detect the source of the given url and fetches the data object from it. Args: url (str): The url to fetch the data object from. Returns: Optional[DataObject]: The fetched data object, or None if no source could be detected or if no object could be retrieved. """ source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) if source is None: return None return self.fetch_from_source(source=source) def _skip_object(self, data_object: DataObject) -> bool: """Determine if the given data object should be downloaded or not. Args: data_object (DataObject): The data object in question. Returns: bool: Returns True if the given data object should be skipped. """ if isinstance(data_object, Album): if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist: return True return False def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult: """Downloads the given data object. It will recursively fetch all available details for this and every related object. Then it will create the folder structure and download the audio file. In the end the metadata will be written to the file. Args: data_object (DataObject): The data object to download. If it is a artist, it will download the whole discography, if it is an Album it will download the whole Tracklist. Returns: DownloadResult: Relevant information about the download success. """ # fetch the given object self.fetch_details(data_object) output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD) # fetching all parent objects (e.g. if you only download a song) if not kwargs.get("fetched_upwards", False): to_fetch: List[DataObject] = [data_object] while len(to_fetch) > 0: new_to_fetch = [] for d in to_fetch: if self._skip_object(d): continue self.fetch_details(d) for c in d.get_parent_collections(): new_to_fetch.extend(c) to_fetch = new_to_fetch kwargs["fetched_upwards"] = True # download all children download_result: DownloadResult = DownloadResult() for c in data_object.get_child_collections(): for d in c: if self._skip_object(d): continue download_result.merge(self.download(d, genre, **kwargs)) # actually download if the object is a song if isinstance(data_object, Song): """ TODO add the traced artist and album to the naming. I am able to do that, because duplicate values are removed later on. """ self._download_song(data_object, naming={ "genre": [genre], "audio_format": [main_settings["audio_format"]], }) return download_result def _extract_fields_from_template(self, path_template: str) -> Set[str]: return set(re.findall(r"{([^}]+)}", path_template)) def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str: field_names: Set[str] = self._extract_fields_from_template(path_template) for field in field_names: if len(naming[field]) == 0: raise MKMissingNameException(f"Missing field for {field}.") path_template = path_template.replace(f"{{{field}}}", naming[field][0]) return path_template def _download_song(self, song: Song, naming: dict) -> DownloadOptions: """ TODO Search the song in the file system. """ r = DownloadResult(total=1) # pre process the data recursively song.compile() # manage the naming naming: Dict[str, List[str]] = defaultdict(list, naming) naming["song"].append(song.title_value) naming["isrc"].append(song.isrc) naming["album"].extend(a.title_value for a in song.album_collection) naming["album_type"].extend(a.album_type.value for a in song.album_collection) naming["artist"].extend(a.name for a in song.artist_collection) naming["artist"].extend(a.name for a in song.feature_artist_collection) for a in song.album_collection: naming["label"].extend([l.title_value for l in a.label_collection]) # removing duplicates from the naming, and process the strings for key, value in naming.items(): # https://stackoverflow.com/a/17016257 naming[key] = list(dict.fromkeys(value)) song.genre = naming["genre"][0] # manage the targets tmp: Target = Target.temp(file_extension=main_settings["audio_format"]) song.target_collection.append(Target( relative_to_music_dir=True, file_path=Path( self._parse_path_template(main_settings["download_path"], naming=naming), self._parse_path_template(main_settings["download_file"], naming=naming), ) )) for target in song.target_collection: if target.exists: output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY) r.found_on_disk += 1 if not self.download_options.download_again_if_found: target.copy_content(tmp) else: target.create_path() output(f'{target.file_path}', color=BColors.GREY) # this streams from every available source until something succeeds, setting the skip intervals to the values of the according source used_source: Optional[Source] = None skip_intervals: List[Tuple[float, float]] = [] for source in song.source_collection.get_sources(source_type_sorting={ "only_with_page": True, "sort_key": lambda page: page.download_priority, "reverse": True, }): if tmp.exists: break used_source = source streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download") skip_intervals = source.page.get_skip_intervals(song=song, source=source) # if something has been downloaded but it somehow failed, delete the file if streaming_results.is_fatal_error and tmp.exists: tmp.delete() # if everything went right, the file should exist now if not tmp.exists: if used_source is None: r.error_message = f"No source found for {song.option_string}." else: r.error_message = f"Something went wrong downloading {song.option_string}." return r # post process the audio found_on_disk = used_source is None if not found_on_disk or self.download_options.process_audio_if_found: correct_codec(target=tmp, skip_intervals=skip_intervals) r.sponsor_segments = len(skip_intervals) if used_source is not None: used_source.page.post_process_hook(song=song, temp_target=tmp) if not found_on_disk or self.download_options.process_metadata_if_found: write_metadata_to_target(metadata=song.metadata, target=tmp, song=song) # copy the tmp target to the final locations for target in song.target_collection: tmp.copy_content(target) tmp.delete() return r def fetch_url(self, url: str, **kwargs) -> DataObject: source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) if source is None or source.page is None: raise UrlNotFoundException(url=url) return source.page.fetch_object_from_source(source=source, **kwargs) # misc function def get_existing_genres(self) -> Generator[str, None, None]: """Yields every existing genre, for the user to select from. Yields: Generator[str, None, None]: a generator that yields every existing genre. """ def is_valid_genre(genre: Path) -> bool: """ gets the name of all subdirectories of shared.MUSIC_DIR, but filters out all directories, where the name matches with any Patern from shared.NOT_A_GENRE_REGEX. """ if not genre.is_dir(): return False if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]): return False return True for genre in filter(is_valid_genre, main_settings["music_directory"].iterdir()): yield genre.name class Page: REGISTER = True SOURCE_TYPE: SourceType LOGGER: logging.Logger def __new__(cls, *args, **kwargs): cls.LOGGER = logging.getLogger(cls.__name__) return super().__new__(cls) @classmethod def is_leaf_page(cls) -> bool: return len(cls.__subclasses__()) == 0 def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None, **kwargs): self.SOURCE_TYPE.register_page(self) self.download_options: DownloadOptions = download_options or DownloadOptions() self.fetch_options: FetchOptions = fetch_options or FetchOptions() def __del__(self): self.SOURCE_TYPE.deregister_page() def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None): """ Perform a regex search on the given string, using a single or a list of patterns returning the first matching group. In case of failure return a default value or raise a WARNING or a RegexNotFoundError, depending on fatal, specifying the field name. """ if isinstance(pattern, str): mobj = re.search(pattern, string, flags) else: for p in pattern: mobj = re.search(p, string, flags) if mobj: break if mobj: if group is None: # return the first matching group return next(g for g in mobj.groups() if g is not None) elif isinstance(group, (list, tuple)): return tuple(mobj.group(g) for g in group) else: return mobj.group(group) return default def get_source_type(self, source: Source) -> Optional[Type[DataObject]]: return None def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup: return BeautifulSoup(r.content, "html.parser") # to search stuff def search(self, query: Query) -> List[DataObject]: music_object = query.music_object search_functions = { Song: self.song_search, Album: self.album_search, Artist: self.artist_search, Label: self.label_search } if type(music_object) in search_functions: r = search_functions[type(music_object)](music_object) if r is not None and len(r) > 0: return r r = [] for default_query in query.default_search: for single_option in self.general_search(default_query): r.append(single_option) return r def general_search(self, search_query: str) -> List[DataObject]: return [] def label_search(self, label: Label) -> List[Label]: return [] def artist_search(self, artist: Artist) -> List[Artist]: return [] def album_search(self, album: Album) -> List[Album]: return [] def song_search(self, song: Song) -> List[Song]: return [] # to fetch stuff def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: return Song() def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album: return Album() def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist: return Artist() def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: return Label() # to download stuff def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]: return [] def post_process_hook(self, song: Song, temp_target: Target, **kwargs): pass def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: return DownloadResult()