From 068c749c385b51479912bf4cb65c7b005efd5cca Mon Sep 17 00:00:00 2001
From: Lars Noack
Date: Tue, 21 May 2024 15:27:10 +0200
Subject: [PATCH] feat: implemented artist search

---
 music_kraken/download/page_attributes.py |   3 +-
 music_kraken/pages/__init__.py           |   1 +
 music_kraken/pages/genius.py             | 312 +++++++++++++++++++++++
 music_kraken/utils/shared.py             |   4 +-
 4 files changed, 317 insertions(+), 3 deletions(-)
 create mode 100644 music_kraken/pages/genius.py

diff --git a/music_kraken/download/page_attributes.py b/music_kraken/download/page_attributes.py
index 997960d..1db24be 100644
--- a/music_kraken/download/page_attributes.py
+++ b/music_kraken/download/page_attributes.py
@@ -30,11 +30,12 @@ from ..utils.exception import MKMissingNameException
 from ..utils.exception.download import UrlNotFoundException
 from ..utils.shared import DEBUG_PAGES
 
-from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, INDEPENDENT_DB_OBJECTS
+from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, Genius, INDEPENDENT_DB_OBJECTS
 
 ALL_PAGES: Set[Type[Page]] = {
     # EncyclopaediaMetallum,
+    Genius,
     Musify,
     YoutubeMusic,
     Bandcamp
 }
diff --git a/music_kraken/pages/__init__.py b/music_kraken/pages/__init__.py
index 5757a2c..ba24501 100644
--- a/music_kraken/pages/__init__.py
+++ b/music_kraken/pages/__init__.py
@@ -3,5 +3,6 @@ from .musify import Musify
 from .youtube import YouTube
 from .youtube_music import YoutubeMusic
 from .bandcamp import Bandcamp
+from .genius import Genius
 
 from .abstract import Page, INDEPENDENT_DB_OBJECTS
diff --git a/music_kraken/pages/genius.py b/music_kraken/pages/genius.py
new file mode 100644
index 0000000..7a609c8
--- /dev/null
+++ b/music_kraken/pages/genius.py
@@ -0,0 +1,312 @@
+from typing import List, Optional, Type
+from urllib.parse import urlparse, urlunparse, urlencode
+import json
+
+from bs4 import BeautifulSoup
+
+from .abstract import Page
+from ..objects import (
+    DatabaseObject,
+    Artist,
+    Source,
+    Song,
+    Album,
+    Label,
+    Target,
+    Contact,
+    ID3Timestamp,
+    Lyrics,
+    FormattedText,
+    Artwork,
+)
+from ..connection import Connection
+from ..utils import dump_to_file, traverse_json_path
+from ..utils.enums import SourceType, ALL_SOURCE_TYPES
+from ..utils.support_classes.download_result import DownloadResult
+from ..utils.string_processing import clean_song_title
+from ..utils.config import main_settings, logging_settings
+from ..utils.shared import DEBUG
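+
+
+# NOTE: the methods below call _parse_artist_url() and _get_host(), which the
+# patch never defines. The sketches here are assumptions, modeled on the
+# equivalent module-level helpers in the Bandcamp page this file was adapted
+# from; verify their semantics against that module before relying on them.
+def _get_host(source: Source) -> str:
+    # Reduce a source URL to "scheme://netloc".
+    parsed = urlparse(source.url)
+    return urlunparse((parsed.scheme, parsed.netloc, "", "", "", ""))
+
+
+def _parse_artist_url(url: str) -> str:
+    # Normalize an artist URL: keep scheme, host and path, drop query/fragment.
+    parsed = urlparse(url)
+    return urlunparse((parsed.scheme, parsed.netloc, parsed.path, "", "", ""))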
+
+
+class Genius(Page):
+    SOURCE_TYPE = ALL_SOURCE_TYPES.GENIUS
+    HOST = "genius.com"
+
+    def __init__(self, *args, **kwargs):
+        self.connection: Connection = Connection(
+            host="https://genius.com/",
+            logger=self.LOGGER,
+            module="genius",
+        )
+
+        super().__init__(*args, **kwargs)
+
+    def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
+        # The URL path is not evaluated yet; fall back to the generic resolution.
+        path = source.parsed_url.path.replace("/", "")
+
+        return super().get_source_type(source)
+
+    def general_search(self, search_query: str, **kwargs) -> List[DatabaseObject]:
+        results = []
+
+        search_params = {
+            "q": search_query,
+        }
+
+        r = self.connection.get("https://genius.com/api/search/multi?" + urlencode(search_params), name=f"search_{search_query}")
+        if r is None:
+            return results
+
+        dump_to_file("search_genius.json", r.text, is_json=True, exit_after_dump=False)
+        data = r.json()
+
+        for elements in traverse_json_path(data, "response.sections", default=[]):
+            hits = elements.get("hits", [])
+            for hit in hits:
+                result = hit.get("result", {})
+                hit_type = hit.get("type", result.get("_type"))
+
+                name = result.get("name")
+                if name is None:
+                    continue
+
+                source = Source(self.SOURCE_TYPE, result.get("url"), additional_data={
+                    "id": result.get("id"),
+                })
+                if source.url is None:
+                    continue
+
+                # not yet attached to the result objects
+                image_url = result.get("header_image_url")
+
+                if hit_type == "artist":
+                    results.append(Artist(
+                        name=name,
+                        source_list=[source]
+                    ))
+                    continue
+
+        return results
+
+    def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
+        return Label()
+
+    def _parse_artist_details(self, soup: BeautifulSoup) -> Artist:
+        name: Optional[str] = None
+        source_list: List[Source] = []
+        contact_list: List[Contact] = []
+
+        band_name_location: BeautifulSoup = soup.find("p", {"id": "band-name-location"})
+        if band_name_location is not None:
+            title_span = band_name_location.find("span", {"class": "title"})
+            if title_span is not None:
+                name = title_span.text.strip()
+
+        link_container: BeautifulSoup = soup.find("ol", {"id": "band-links"})
+        if link_container is not None:
+            li: BeautifulSoup
+            for li in link_container.find_all("a"):
+                if li is None or li.get("href") is None:
+                    continue
+
+                source_list.append(Source.match_url(_parse_artist_url(li["href"]), referrer_page=self.SOURCE_TYPE))
+
+        return Artist(
+            name=name,
+            source_list=source_list
+        )
+
+    def _parse_album(self, soup: BeautifulSoup, initial_source: Source) -> Album:
+        title = None
+        source_list: List[Source] = []
+
+        a = soup.find("a")
+        if a is not None and a.get("href") is not None:
+            source_list.append(Source(self.SOURCE_TYPE, _get_host(initial_source) + a["href"]))
+
+        title_p = soup.find("p", {"class": "title"})
+        if title_p is not None:
+            title = title_p.text.strip()
+
+        return Album(title=title, source_list=source_list)
+
+    def _parse_artist_data_blob(self, data_blob: dict, artist_url: str) -> List[Album]:
+        parsed_artist_url = urlparse(artist_url)
+        album_list: List[Album] = []
+
+        for album_json in data_blob.get("buyfulldisco", {}).get("tralbums", []):
+            album_list.append(Album(
+                title=album_json["title"].strip(),
+                source_list=[Source(
+                    self.SOURCE_TYPE,
+                    urlunparse((parsed_artist_url.scheme, parsed_artist_url.netloc, album_json["page_url"], "", "", ""))
+                )]
+            ))
+
+        return album_list
+
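+    # NOTE: fetch_artist, fetch_album and fetch_song below still scrape
+    # Bandcamp-style pages (band-name-location, data-blob, data-tralbum,
+    # bcbits artwork URLs); they have not been ported to Genius' markup yet.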
+    def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
+        artist = Artist()
+
+        r = self.connection.get(_parse_artist_url(source.url), name=f"artist_{urlparse(source.url).scheme}_{urlparse(source.url).netloc}")
+        if r is None:
+            return artist
+
+        soup = self.get_soup_from_response(r)
+
+        if DEBUG:
+            dump_to_file("artist_page.html", r.text, exit_after_dump=False)
+
+        bio_container = soup.find("div", {"id": "bio-container"})
+        if bio_container is not None:
+            artist = self._parse_artist_details(soup=bio_container)
+
+        html_music_grid = soup.find("ol", {"id": "music-grid"})
+        if html_music_grid is not None:
+            for subsoup in html_music_grid.find_all("li"):
+                artist.album_collection.append(self._parse_album(soup=subsoup, initial_source=source))
+
+        for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})):
+            data_blob = data_blob_soup.get("data-blob")
+
+            if DEBUG:
+                dump_to_file(f"bandcamp_artist_data_blob_{i}.json", data_blob, is_json=True, exit_after_dump=False)
+
+            if data_blob is not None:
+                artist.album_collection.extend(
+                    self._parse_artist_data_blob(json.loads(data_blob), source.url)
+                )
+
+        artist.source_collection.append(source)
+        return artist
+
+    def _parse_track_element(self, track: dict, artwork: Artwork) -> Optional[Song]:
+        lyrics_list: List[Lyrics] = []
+
+        _lyrics: Optional[str] = track.get("item", {}).get("recordingOf", {}).get("lyrics", {}).get("text")
+        if _lyrics is not None:
+            lyrics_list.append(Lyrics(text=FormattedText(plain=_lyrics)))
+
+        return Song(
+            title=clean_song_title(track["item"]["name"]),
+            source_list=[Source(self.SOURCE_TYPE, track["item"]["mainEntityOfPage"])],
+            tracksort=int(track["position"]),
+            artwork=artwork,
+            lyrics_list=lyrics_list,
+        )
+
+    def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
+        album = Album()
+
+        r = self.connection.get(source.url, name=f"album_{urlparse(source.url).netloc.split('.')[0]}_{urlparse(source.url).path.replace('/', '').replace('album', '')}")
+        if r is None:
+            return album
+
+        soup = self.get_soup_from_response(r)
+
+        data_container = soup.find("script", {"type": "application/ld+json"})
+
+        if DEBUG:
+            dump_to_file("album_data.json", data_container.text, is_json=True, exit_after_dump=False)
+
+        data = json.loads(data_container.text)
+        artist_data = data["byArtist"]
+
+        artist_source_list = []
+        if "@id" in artist_data:
+            artist_source_list = [Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
+
+        album = Album(
+            title=data["name"].strip(),
+            source_list=[Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]))],
+            date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
+            artist_list=[Artist(
+                name=artist_data["name"].strip(),
+                source_list=artist_source_list
+            )]
+        )
+
+        artwork: Artwork = Artwork()
+
+        def _get_artwork_url(_data: dict) -> Optional[str]:
+            if "image" in _data:
+                return _data["image"]
+            for _property in _data.get("additionalProperty", []):
+                if _property.get("name") == "art_id":
+                    return f"https://f4.bcbits.com/img/a{_property.get('value')}_2.jpg"
+
+        _artwork_url = _get_artwork_url(data)
+        if _artwork_url is not None:
+            artwork.append(url=_artwork_url, width=350, height=350)
+        else:
+            for album_release in data.get("albumRelease", []):
+                _artwork_url = _get_artwork_url(album_release)
+                if _artwork_url is not None:
+                    artwork.append(url=_artwork_url, width=350, height=350)
+                    break
+
+        for i, track_json in enumerate(data.get("track", {}).get("itemListElement", [])):
+            if DEBUG:
+                dump_to_file(f"album_track_{i}.json", json.dumps(track_json), is_json=True, exit_after_dump=False)
+
+            try:
+                album.song_collection.append(self._parse_track_element(track_json, artwork=artwork))
+            except KeyError:
+                continue
+
+        album.source_collection.append(source)
+        return album
+
+    def _fetch_lyrics(self, soup: BeautifulSoup) -> List[Lyrics]:
+        track_lyrics = soup.find("div", {"class": "lyricsText"})
+        if track_lyrics:
+            return [Lyrics(text=FormattedText(html=track_lyrics.prettify()))]
+
+        return []
+
+    def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
+        r = self.connection.get(source.url, name=f"song_{urlparse(source.url).netloc.split('.')[0]}_{urlparse(source.url).path.replace('/', '').replace('track', '')}")
+        if r is None:
+            return Song()
+
+        soup = self.get_soup_from_response(r)
+
+        data_container = soup.find("script", {"type": "application/ld+json"})
+        other_data = {}
+
+        other_data_list = soup.select("script[data-tralbum]")
+        if len(other_data_list) > 0:
+            other_data = json.loads(other_data_list[0]["data-tralbum"])
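+
+        # "data-tralbum" is Bandcamp's embedded-player payload; its "trackinfo"
+        # entries map stream format names (e.g. "mp3-128") to audio URLs, which
+        # is where mp3_url below comes from.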
+        dump_to_file("bandcamp_song_data.json", data_container.text, is_json=True, exit_after_dump=False)
+        dump_to_file("bandcamp_song_data_other.json", json.dumps(other_data), is_json=True, exit_after_dump=False)
+        dump_to_file("bandcamp_song_page.html", r.text, exit_after_dump=False)
+
+        data = json.loads(data_container.text)
+        album_data = data["inAlbum"]
+        artist_data = data["byArtist"]
+
+        mp3_url = None
+        # take whichever stream URL is present; the format key is ignored
+        for key, value in other_data.get("trackinfo", [{}])[0].get("file", {"": None}).items():
+            mp3_url = value
+
+        song = Song(
+            title=clean_song_title(data["name"], artist_name=artist_data["name"]),
+            source_list=[source, Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]), audio_url=mp3_url)],
+            album_list=[Album(
+                title=album_data["name"].strip(),
+                date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
+                source_list=[Source(self.SOURCE_TYPE, album_data["@id"])]
+            )],
+            artist_list=[Artist(
+                name=artist_data["name"].strip(),
+                source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
+            )],
+            lyrics_list=self._fetch_lyrics(soup=soup)
+        )
+
+        return song
+
+    def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
+        if source.audio_url is None:
+            return DownloadResult(error_message="Couldn't find download link.")
+
+        return self.connection.stream_into(url=source.audio_url, target=target, description=desc)
diff --git a/music_kraken/utils/shared.py b/music_kraken/utils/shared.py
index 2a5d4a4..b75cf7f 100644
--- a/music_kraken/utils/shared.py
+++ b/music_kraken/utils/shared.py
@@ -15,11 +15,11 @@ __stage__ = os.getenv("STAGE", "prod")
 DEBUG = (__stage__ == "dev") and True
 DEBUG_LOGGING = DEBUG and False
 DEBUG_TRACE = DEBUG and True
-DEBUG_OBJECT_TRACE = DEBUG and True
+DEBUG_OBJECT_TRACE = DEBUG and False
 DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
 DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
 DEBUG_PAGES = DEBUG and False
-DEBUG_DUMP = DEBUG and False
+DEBUG_DUMP = DEBUG and True
 DEBUG_PRINT_ID = DEBUG and True
 
 if DEBUG: