from __future__ import annotations

import logging
import random
import re
from collections import defaultdict
from copy import copy
from dataclasses import dataclass, field
from pathlib import Path
from string import Formatter
from typing import (TYPE_CHECKING, Any, Dict, Generator, List, Optional, Set,
                    Tuple, Type, TypedDict, Union)

import requests
from bs4 import BeautifulSoup

from ..audio import correct_codec, write_metadata_to_target
from ..connection import Connection
from ..objects import Album, Artist, Collection
from ..objects import DatabaseObject as DataObject
from ..objects import Label, Options, Song, Source, Target
from ..utils import BColors, output, trace
from ..utils.config import main_settings, youtube_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.enums.album import AlbumType
from ..utils.exception import MKMissingNameException
from ..utils.exception.download import UrlNotFoundException
from ..utils.path_manager import LOCATIONS
from ..utils.shared import DEBUG_PAGES
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query

from .results import SearchResults


@dataclass
class FetchOptions:
    download_all: bool = False
    album_type_blacklist: Set[AlbumType] = field(
        default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))


@dataclass
class DownloadOptions:
    download_all: bool = False
    album_type_blacklist: Set[AlbumType] = field(
        default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))

    download_again_if_found: bool = False
    process_audio_if_found: bool = False
    process_metadata_if_found: bool = True


# maps a data object type to the name of the Page method that fetches it
fetch_map = {
    Song: "fetch_song",
    Album: "fetch_album",
    Artist: "fetch_artist",
    Label: "fetch_label",
}


class Downloader:
    def __init__(
        self,
        auto_register_pages: bool = True,
        download_options: DownloadOptions = None,
        fetch_options: FetchOptions = None,
        **kwargs
    ):
        self.LOGGER = logging.getLogger("download")

        self.download_options: DownloadOptions = download_options or DownloadOptions()
        self.fetch_options: FetchOptions = fetch_options or FetchOptions()

        self._registered_pages: Dict[Type[Page], Set[Page]] = defaultdict(set)
        if auto_register_pages:
            self.scan_for_pages(**kwargs)

    def register_page(self, page_type: Type[Page], **kwargs):
        if page_type in self._registered_pages:
            return

        self._registered_pages[page_type].add(page_type(
            download_options=self.download_options,
            fetch_options=self.fetch_options,
            **kwargs
        ))

    def deregister_page(self, page_type: Type[Page]):
        if page_type not in self._registered_pages:
            return

        for p in self._registered_pages[page_type]:
            p.__del__()
        del self._registered_pages[page_type]

    def scan_for_pages(self, **kwargs):
        # assuming the wanted pages are the leaf classes of the interface
        leaf_classes = []

        class_list = [Page]
        while len(class_list):
            _class = class_list.pop()
            class_subclasses = _class.__subclasses__()

            if len(class_subclasses) == 0:
                if _class.REGISTER:
                    leaf_classes.append(_class)
            else:
                class_list.extend(class_subclasses)

        for leaf_class in leaf_classes:
            self.register_page(leaf_class, **kwargs)

    def get_pages(self, *page_types: Type[Page]) -> Generator[Page, None, None]:
        if len(page_types) == 0:
            page_types = self._registered_pages.keys()

        for page_type in page_types:
            yield from self._registered_pages[page_type]

    def search(self, query: Query) -> SearchResults:
        result = SearchResults()

        for page in self.get_pages():
            result.add(
                page=type(page),
                search_result=page.search(query=query)
            )

        return result

    def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject:
        source: Source
        for source in data_object.source_collection.get_sources(source_type_sorting={
            "only_with_page": True,
        }):
            new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level)
            if new_data_object is not None:
                data_object.merge(new_data_object)

        return data_object

    def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
        if not source.has_page:
            return None

        source_type = source.page.get_source_type(source=source)
        if source_type is None:
            self.LOGGER.debug(f"Could not determine source type for {source}.")
            return None

        func = getattr(source.page, fetch_map[source_type])

        # fetching the data object and marking it as fetched
        data_object: DataObject = func(source=source, **kwargs)
        data_object.mark_as_fetched(source.hash_url)
        return data_object

    def fetch_from_url(self, url: str) -> Optional[DataObject]:
        source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
        if source is None:
            return None

        return self.fetch_from_source(source=source)

    def _skip_object(self, data_object: DataObject) -> bool:
        if isinstance(data_object, Album):
            if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
                return True

        return False

    def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
        # fetch the given object
        self.fetch_details(data_object)
        output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)

        # fetching all parent objects (e.g. if you only download a song)
        if not kwargs.get("fetched_upwards", False):
            to_fetch: List[DataObject] = [data_object]

            while len(to_fetch) > 0:
                new_to_fetch = []
                for d in to_fetch:
                    if self._skip_object(d):
                        continue

                    self.fetch_details(d)

                    for c in d.get_parent_collections():
                        new_to_fetch.extend(c)

                to_fetch = new_to_fetch

            kwargs["fetched_upwards"] = True

        # download all children
        download_result: DownloadResult = DownloadResult()
        for c in data_object.get_child_collections():
            for d in c:
                if self._skip_object(d):
                    continue

                download_result.merge(self.download(d, genre, **kwargs))

        # actually download if the object is a song
        if isinstance(data_object, Song):
            """
            TODO
            add the traced artist and album to the naming.
            I am able to do that, because duplicate values are removed later on.
            """

            self._download_song(data_object, naming={
                "genre": [genre],
                "audio_format": [main_settings["audio_format"]],
            })

        return download_result

    def _extract_fields_from_template(self, path_template: str) -> Set[str]:
        return set(re.findall(r"{([^}]+)}", path_template))

    def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str:
        field_names: Set[str] = self._extract_fields_from_template(path_template)

        for field in field_names:
            if len(naming[field]) == 0:
                raise MKMissingNameException(f"Missing field for {field}.")

            path_template = path_template.replace(f"{{{field}}}", naming[field][0])

        return path_template

    def _download_song(self, song: Song, naming: dict) -> DownloadResult:
        """
        TODO
        Search the song in the file system.
""" r = DownloadResult(total=1) # pre process the data recursively song.compile() # manage the naming naming: Dict[str, List[str]] = defaultdict(list, naming) naming["song"].append(song.title_value) naming["isrc"].append(song.isrc) naming["album"].extend(a.title_value for a in song.album_collection) naming["album_type"].extend(a.album_type.value for a in song.album_collection) naming["artist"].extend(a.name for a in song.artist_collection) naming["artist"].extend(a.name for a in song.feature_artist_collection) for a in song.album_collection: naming["label"].extend([l.title_value for l in a.label_collection]) # removing duplicates from the naming, and process the strings for key, value in naming.items(): # https://stackoverflow.com/a/17016257 naming[key] = list(dict.fromkeys(value)) song.genre = naming["genre"][0] # manage the targets tmp: Target = Target.temp(file_extension=main_settings["audio_format"]) song.target_collection.append(Target( relative_to_music_dir=True, file_path=Path( self._parse_path_template(main_settings["download_path"], naming=naming), self._parse_path_template(main_settings["download_file"], naming=naming), ) )) for target in song.target_collection: if target.exists: output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY) r.found_on_disk += 1 if not self.download_options.download_again_if_found: target.copy_content(tmp) else: target.create_path() output(f'{target.file_path}', color=BColors.GREY) # this streams from every available source until something succeeds, setting the skip intervals to the values of the according source used_source: Optional[Source] = None skip_intervals: List[Tuple[float, float]] = [] for source in song.source_collection.get_sources(source_type_sorting={ "only_with_page": True, "sort_key": lambda page: page.download_priority, "reverse": True, }): if tmp.exists: break used_source = source streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download") skip_intervals = source.page.get_skip_intervals(song=song, source=source) # if something has been downloaded but it somehow failed, delete the file if streaming_results.is_fatal_error and tmp.exists: tmp.delete() # if everything went right, the file should exist now if not tmp.exists: if used_source is None: r.error_message = f"No source found for {song.option_string}." else: r.error_message = f"Something went wrong downloading {song.option_string}." 
            return r

        # post process the audio
        found_on_disk = used_source is None
        if not found_on_disk or self.download_options.process_audio_if_found:
            correct_codec(target=tmp, skip_intervals=skip_intervals)
            r.sponsor_segments = len(skip_intervals)

        if used_source is not None:
            used_source.page.post_process_hook(song=song, temp_target=tmp)

        if not found_on_disk or self.download_options.process_metadata_if_found:
            write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)

        # copy the tmp target to the final locations
        for target in song.target_collection:
            tmp.copy_content(target)

        tmp.delete()
        return r

    def fetch_url(self, url: str, **kwargs) -> DataObject:
        source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)

        if source is None or source.page is None:
            raise UrlNotFoundException(url=url)

        return source.page.fetch_object_from_source(source=source, **kwargs)


class Page:
    REGISTER = True
    SOURCE_TYPE: SourceType
    LOGGER: logging.Logger

    def __new__(cls, *args, **kwargs):
        cls.LOGGER = logging.getLogger(cls.__name__)
        return super().__new__(cls)

    def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None, **kwargs):
        self.SOURCE_TYPE.register_page(self)

        self.download_options: DownloadOptions = download_options or DownloadOptions()
        self.fetch_options: FetchOptions = fetch_options or FetchOptions()

    def __del__(self):
        self.SOURCE_TYPE.deregister_page()

    def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None):
        """
        Perform a regex search on the given string, using a single pattern or a list of
        patterns, and return the first matching group. In case of failure, return a default
        value or raise a WARNING or a RegexNotFoundError, depending on fatal, specifying
        the field name.
        """
        if isinstance(pattern, str):
            mobj = re.search(pattern, string, flags)
        else:
            for p in pattern:
                mobj = re.search(p, string, flags)
                if mobj:
                    break

        if mobj:
            if group is None:
                # return the first matching group
                return next(g for g in mobj.groups() if g is not None)
            elif isinstance(group, (list, tuple)):
                return tuple(mobj.group(g) for g in group)
            else:
                return mobj.group(group)

        return default

    def get_source_type(self, source: Source) -> Optional[Type[DataObject]]:
        return None

    def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup:
        return BeautifulSoup(r.content, "html.parser")

    # to search stuff
    def search(self, query: Query) -> List[DataObject]:
        music_object = query.music_object

        search_functions = {
            Song: self.song_search,
            Album: self.album_search,
            Artist: self.artist_search,
            Label: self.label_search
        }

        if type(music_object) in search_functions:
            r = search_functions[type(music_object)](music_object)
            if r is not None and len(r) > 0:
                return r

        r = []
        for default_query in query.default_search:
            for single_option in self.general_search(default_query):
                r.append(single_option)

        return r

    def general_search(self, search_query: str) -> List[DataObject]:
        return []

    def label_search(self, label: Label) -> List[Label]:
        return []

    def artist_search(self, artist: Artist) -> List[Artist]:
        return []

    def album_search(self, album: Album) -> List[Album]:
        return []

    def song_search(self, song: Song) -> List[Song]:
        return []

    # to fetch stuff
    def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
        return Song()

    def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
        return Album()

    def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
        return Artist()

    def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
        return Label()

    # to download stuff
    def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
        return []

    def post_process_hook(self, song: Song, temp_target: Target, **kwargs):
        pass

    def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
        return DownloadResult()


class Option:
    """
    This could represent a data object, a string or a page.
    """

    def __init__(self, value: Any, text: Optional[str] = None, keys: Set[str] = None):
        self.value = value
        self.text = text or str(value)

        self.keys = keys or set()
        self.keys.add(self.text)
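

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, kept commented out). It assumes at
# least one concrete Page subclass is imported elsewhere so scan_for_pages()
# has something to register, and the URL below is a hypothetical placeholder
# for something one of those pages can resolve.
#
# if __name__ == "__main__":
#     downloader = Downloader()                      # auto-registers all leaf Page subclasses
#     artist = downloader.fetch_url("https://example.com/artist/123")
#     result = downloader.download(artist, genre="metal")
#     print(result)
# ---------------------------------------------------------------------------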