From 5ce76c758e9554185e4d09a9e3fe91194c80e2e9 Mon Sep 17 00:00:00 2001 From: Kur01234 Date: Tue, 2 Jul 2024 17:20:25 +0200 Subject: [PATCH] feat: genius fixes and duplicate detection --- music_kraken/download/page_attributes.py | 109 +++++++++++++---------- music_kraken/objects/artwork.py | 7 ++ music_kraken/objects/song.py | 2 +- music_kraken/pages/genius.py | 8 +- 4 files changed, 77 insertions(+), 49 deletions(-) diff --git a/music_kraken/download/page_attributes.py b/music_kraken/download/page_attributes.py index 248ce6c..788c13f 100644 --- a/music_kraken/download/page_attributes.py +++ b/music_kraken/download/page_attributes.py @@ -3,6 +3,7 @@ from collections import defaultdict from pathlib import Path import re import logging +import subprocess from PIL import Image @@ -76,33 +77,37 @@ if DEBUG_PAGES: class Pages: def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, download_options: DownloadOptions = None, fetch_options: FetchOptions = None): self.LOGGER = logging.getLogger("download") - + self.download_options: DownloadOptions = download_options or DownloadOptions() self.fetch_options: FetchOptions = fetch_options or FetchOptions() # initialize all page instances self._page_instances: Dict[Type[Page], Page] = dict() self._source_to_page: Dict[SourceType, Type[Page]] = dict() - + exclude_pages = exclude_pages if exclude_pages is not None else set() - + if exclude_shady: exclude_pages = exclude_pages.union(SHADY_PAGES) - + if not exclude_pages.issubset(ALL_PAGES): - raise ValueError(f"The excluded pages have to be a subset of all pages: {exclude_pages} | {ALL_PAGES}") - + raise ValueError( + f"The excluded pages have to be a subset of all pages: {exclude_pages} | {ALL_PAGES}") + def _set_to_tuple(page_set: Set[Type[Page]]) -> Tuple[Type[Page], ...]: return tuple(sorted(page_set, key=lambda page: page.__name__)) - + self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages) self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set) - self._audio_pages_set: Set[Type[Page]] = self._pages_set.intersection(AUDIO_PAGES) - self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(self._audio_pages_set) - + self._audio_pages_set: Set[Type[Page] + ] = self._pages_set.intersection(AUDIO_PAGES) + self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple( + self._audio_pages_set) + for page_type in self.pages: - self._page_instances[page_type] = page_type(fetch_options=self.fetch_options, download_options=self.download_options) + self._page_instances[page_type] = page_type( + fetch_options=self.fetch_options, download_options=self.download_options) self._source_to_page[page_type.SOURCE_TYPE] = page_type def _get_page_from_enum(self, source_page: SourceType) -> Page: @@ -112,24 +117,26 @@ class Pages: def search(self, query: Query) -> SearchResults: result = SearchResults() - + for page_type in self.pages: result.add( page=page_type, - search_result=self._page_instances[page_type].search(query=query) + search_result=self._page_instances[page_type].search( + query=query) ) - + return result - + def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject: if not isinstance(data_object, INDEPENDENT_DB_OBJECTS): return data_object - + source: Source for source in data_object.source_collection.get_sources(source_type_sorting={ "only_with_page": True, }): - new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level) + new_data_object = self.fetch_from_source( + source=source, stop_at_level=stop_at_level) if new_data_object is not None: data_object.merge(new_data_object) @@ -138,14 +145,14 @@ class Pages: def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]: if not source.has_page: return None - + source_type = source.page.get_source_type(source=source) if source_type is None: self.LOGGER.debug(f"Could not determine source type for {source}.") return None func = getattr(source.page, fetch_map[source_type]) - + # fetching the data object and marking it as fetched data_object: DataObject = func(source=source, **kwargs) data_object.mark_as_fetched(source.hash_url) @@ -155,21 +162,21 @@ class Pages: source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) if source is None: return None - + return self.fetch_from_source(source=source) - + def _skip_object(self, data_object: DataObject) -> bool: if isinstance(data_object, Album): if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist: return True - - return False + return False def _fetch_artist_artwork(self, artist: Artist, naming: dict): naming: Dict[str, List[str]] = defaultdict(list, naming) naming["artist"].append(artist.name) - naming["label"].extend([l.title_value for l in artist.label_collection]) + naming["label"].extend( + [l.title_value for l in artist.label_collection]) # removing duplicates from the naming, and process the strings for key, value in naming.items(): # https://stackoverflow.com/a/17016257 @@ -182,8 +189,12 @@ class Pages: naming["image_number"] = [str(image_number)] target = Target( relative_to_music_dir=True, - file_path=Path(self._parse_path_template(main_settings["artist_artwork_path"], naming=naming)) + file_path=Path(self._parse_path_template( + main_settings["artist_artwork_path"], naming=naming)) ) + if not target.file_path.parent.exists(): + target.create_path() + subprocess.Popen(["gio", "set", target.file_path.parent, "metadata::custom-icon", "file://"+str(target.file_path)]) with Image.open(artwork_variant.target.file_path) as img: img.save(target.file_path, main_settings["image_format"]) artwork_variant.target = Target @@ -191,7 +202,8 @@ class Pages: def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult: # fetch the given object self.fetch_details(data_object) - output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD) + output( + f"\nDownloading {data_object.option_string}...", color=BColors.BOLD) # fetching all parent objects (e.g. if you only download a song) if not kwargs.get("fetched_upwards", False): @@ -209,7 +221,7 @@ class Pages: new_to_fetch.extend(c) to_fetch = new_to_fetch - + kwargs["fetched_upwards"] = True naming = kwargs.get("naming", { @@ -247,13 +259,15 @@ class Pages: return set(re.findall(r"{([^}]+)}", path_template)) def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str: - field_names: Set[str] = self._extract_fields_from_template(path_template) - + field_names: Set[str] = self._extract_fields_from_template( + path_template) + for field in field_names: if len(naming[field]) == 0: raise MKMissingNameException(f"Missing field for {field}.") - path_template = path_template.replace(f"{{{field}}}", naming[field][0]) + path_template = path_template.replace( + f"{{{field}}}", naming[field][0]) return path_template @@ -263,16 +277,17 @@ class Pages: Search the song in the file system. """ r = DownloadResult(total=1) - + # pre process the data recursively song.compile() - + # manage the naming naming: Dict[str, List[str]] = defaultdict(list, naming) naming["song"].append(song.title_value) naming["isrc"].append(song.isrc) naming["album"].extend(a.title_value for a in song.album_collection) - naming["album_type"].extend(a.album_type.value for a in song.album_collection) + naming["album_type"].extend( + a.album_type.value for a in song.album_collection) naming["artist"].extend(a.name for a in song.artist_collection) naming["artist"].extend(a.name for a in song.feature_artist_collection) for a in song.album_collection: @@ -289,13 +304,16 @@ class Pages: song.target_collection.append(Target( relative_to_music_dir=True, file_path=Path( - self._parse_path_template(main_settings["download_path"], naming=naming), - self._parse_path_template(main_settings["download_file"], naming=naming), + self._parse_path_template( + main_settings["download_path"], naming=naming), + self._parse_path_template( + main_settings["download_file"], naming=naming), ) )) for target in song.target_collection: if target.exists: - output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY) + output( + f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY) r.found_on_disk += 1 if not self.download_options.download_again_if_found: @@ -316,8 +334,10 @@ class Pages: break used_source = source - streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download") - skip_intervals = source.page.get_skip_intervals(song=song, source=source) + streaming_results = source.page.download_song_to_target( + source=source, target=tmp, desc="download") + skip_intervals = source.page.get_skip_intervals( + song=song, source=source) # if something has been downloaded but it somehow failed, delete the file if streaming_results.is_fatal_error and tmp.exists: @@ -341,7 +361,8 @@ class Pages: used_source.page.post_process_hook(song=song, temp_target=tmp) if not found_on_disk or self.download_options.process_metadata_if_found: - write_metadata_to_target(metadata=song.metadata, target=tmp, song=song) + write_metadata_to_target( + metadata=song.metadata, target=tmp, song=song) # copy the tmp target to the final locations for target in song.target_collection: @@ -352,12 +373,10 @@ class Pages: def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]: source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) - + if source is None: raise UrlNotFoundException(url=url) - - _actual_page = self._source_to_page[source.source_type] - - return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level) - \ No newline at end of file + _actual_page = self._source_to_page[source.source_type] + + return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level) diff --git a/music_kraken/objects/artwork.py b/music_kraken/objects/artwork.py index fcd69cf..9effaba 100644 --- a/music_kraken/objects/artwork.py +++ b/music_kraken/objects/artwork.py @@ -175,6 +175,7 @@ class ArtworkCollection: This will make the artworks ready for download and delete duplicates. """ artwork_hashes: list = list() + artwork_urls: list = list() for artwork in self._data: index = 0 for artwork_variant in artwork.variants: @@ -182,6 +183,12 @@ class ArtworkCollection: url=artwork_variant.url, name=artwork_variant.url, ) + + if artwork_variant.url in artwork_urls: + artwork.variants.pop(index) + continue + artwork_urls.append(artwork_variant.url) + target: Target = artwork_variant.target with target.open("wb") as f: f.write(r.content) diff --git a/music_kraken/objects/song.py b/music_kraken/objects/song.py index d66bb12..f39aa96 100644 --- a/music_kraken/objects/song.py +++ b/music_kraken/objects/song.py @@ -185,7 +185,7 @@ class Song(Base): return def _compile(self): - self.artwork.compile(self.target_collection.get(0)) + self.artwork.compile() INDEX_DEPENDS_ON = ("title", "isrc", "source_collection") diff --git a/music_kraken/pages/genius.py b/music_kraken/pages/genius.py index 3b8f184..c6414ba 100644 --- a/music_kraken/pages/genius.py +++ b/music_kraken/pages/genius.py @@ -1,4 +1,5 @@ -import json +import simplejson as json +from json_unescape import escape_json, unescape_json from enum import Enum from typing import List, Optional, Type from urllib.parse import urlencode, urlparse, urlunparse @@ -268,8 +269,9 @@ class Genius(Page): # get the contents that are between `JSON.parse('` and `');` content = self.get_json_content_from_response(r, start="window.__PRELOADED_STATE__ = JSON.parse('", end="');\n window.__APP_CONFIG__ = ") if content is not None: - content = content.replace("\\\\", "\\").replace('\\"', '"').replace("\\'", "'") - data = json.loads(content) + #IMPLEMENT FIX FROM HAZEL + content = escape_json(content) + data = json.loads(content) lyrics_html = traverse_json_path(data, "songPage.lyricsData.body.html", default=None) if lyrics_html is not None: