from typing import List, Optional, Type
from urllib.parse import urlparse, urlunparse, urlencode
import json
from enum import Enum

from bs4 import BeautifulSoup
import pycountry

from ..objects import Source, DatabaseObject
from .abstract import Page
from ..objects import (
    Artist,
    Source,
    SourceType,
    Song,
    Album,
    Label,
    Target,
    Contact,
    ID3Timestamp,
    Lyrics,
    FormattedText,
    Artwork,
)
from ..connection import Connection
from ..utils import dump_to_file, traverse_json_path
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import clean_song_title
from ..utils.config import main_settings, logging_settings
from ..utils.shared import DEBUG

if DEBUG:
    from ..utils import dump_to_file


def _parse_artist_url(url: str) -> str:
    """Return *url* reduced to its scheme, host and path.

    NOTE(review): ``fetch_album`` and ``fetch_song`` call this name, but the
    module neither defined nor imported it, which raised ``NameError`` at
    call time. This implementation is deliberately conservative (it only
    drops params, query string and fragment) -- confirm the normalisation
    that is actually wanted for artist URLs.
    """
    parsed = urlparse(url)
    return urlunparse((parsed.scheme, parsed.netloc, parsed.path, "", "", ""))


class Genius(Page):
    """Page implementation backed by genius.com.

    ``general_search`` queries the ``/api/search/multi`` endpoint and
    ``fetch_artist`` reads the ``page_data`` meta tag of an artist page; both
    funnel the JSON objects through ``parse_api_object``.

    NOTE(review): ``fetch_album``, ``fetch_song`` and ``_parse_track_element``
    parse ``application/ld+json`` / ``data-tralbum`` payloads and reference
    ``bcbits.com`` and ``bandcamp_*`` dump files, so they look carried over
    from a Bandcamp scraper -- verify they match the markup of genius.com.
    """

    SOURCE_TYPE = ALL_SOURCE_TYPES.GENIUS
    HOST = "genius.com"

    def __init__(self, *args, **kwargs):
        # The connection is created before the base initialiser runs, exactly
        # as before, in case the base class already relies on it.
        self.connection: Connection = Connection(
            host="https://genius.com/",
            logger=self.LOGGER,
            module="genius",
        )

        super().__init__(*args, **kwargs)

    def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
        """Map a source URL to the object type it points to.

        Paths starting with ``artists`` yield ``Artist``, paths starting with
        ``albums`` yield ``Album``; everything else is treated as a ``Song``.
        """
        path = source.parsed_url.path.replace("/", "")

        if path.startswith("artists"):
            return Artist
        if path.startswith("albums"):
            return Album

        return Song

    def add_to_artwork(self, artwork: Artwork, url: Optional[str]):
        """Append *url* to *artwork*, with dimensions when the URL encodes them.

        The second-to-last dot-separated fragment of the URL is split on
        ``"x"``; when that yields at least two integer parts they are used as
        width and height (any further parts are ignored). Otherwise the URL is
        appended without dimensions. ``None`` is silently skipped.
        """
        if url is None:
            return

        url_frags = url.split(".")
        if len(url_frags) < 2:
            artwork.append(url=url)
            return

        dimensions = url_frags[-2].split("x")
        if len(dimensions) < 2:
            artwork.append(url=url)
            return

        try:
            width, height = int(dimensions[0]), int(dimensions[1])
        except ValueError:
            # The fragment merely contained an "x"; it was not a size marker.
            artwork.append(url=url)
            return

        artwork.append(url=url, width=width, height=height)

    def parse_api_object(self, data: Optional[dict]) -> Optional[DatabaseObject]:
        """Convert one Genius API object into an ``Artist``, ``Album`` or ``Song``.

        The kind of object is taken from ``data["_type"]``. Nested artists are
        parsed recursively.

        :param data: decoded JSON object; ``None`` or a non-dict is tolerated.
        :return: the parsed object, or ``None`` when *data* is missing, has no
            ``url``, or has an unsupported ``_type``.
        """
        # Nested lookups such as data.get("artist") or hit.get("result") can
        # legitimately yield None; treat that as "nothing to parse".
        if not isinstance(data, dict):
            return None

        object_type = data.get("_type")

        artwork = Artwork()
        self.add_to_artwork(artwork, data.get("header_image_url"))
        self.add_to_artwork(artwork, data.get("image_url"))

        additional_sources: List[Source] = []
        source: Source = Source(self.SOURCE_TYPE, data.get("url"), additional_data={
            "id": data.get("id"),
            "slug": data.get("slug"),
            "api_path": data.get("api_path"),
        })

        notes = FormattedText()
        description = data.get("description")
        if not isinstance(description, dict):
            # The key may be present with a null value.
            description = {}

        if "html" in description:
            notes.html = description["html"]
        elif "markdown" in description:
            notes.markdown = description["markdown"]
        elif "description_preview" in data:
            notes.plain = data["description_preview"]

        if source.url is None:
            return None

        if object_type == "artist":
            if data.get("instagram_name") is not None:
                additional_sources.append(Source(ALL_SOURCE_TYPES.INSTAGRAM, f"https://www.instagram.com/{data['instagram_name']}/"))
            if data.get("facebook_name") is not None:
                additional_sources.append(Source(ALL_SOURCE_TYPES.FACEBOOK, f"https://www.facebook.com/{data['facebook_name']}/"))
            if data.get("twitter_name") is not None:
                additional_sources.append(Source(ALL_SOURCE_TYPES.TWITTER, f"https://x.com/{data['twitter_name']}/"))

            return Artist(
                name=data.get("name"),
                # The social-media sources used to be collected and then
                # dropped; attach them to the artist.
                source_list=[source, *additional_sources],
                artwork=artwork,
                notes=notes,
            )

        if object_type == "album":
            self.add_to_artwork(artwork, data.get("cover_art_thumbnail_url"))
            self.add_to_artwork(artwork, data.get("cover_art_url"))

            album_artist = self.parse_api_object(data.get("artist"))

            return Album(
                title=data.get("name"),
                source_list=[source],
                artist_list=[album_artist] if album_artist is not None else [],
                artwork=artwork,
                date=ID3Timestamp(**(data.get("release_date_components") or {})),
            )

        if object_type == "song":
            self.add_to_artwork(artwork, data.get("song_art_image_thumbnail_url"))
            self.add_to_artwork(artwork, data.get("song_art_image_url"))

            main_artist_list = []
            artist_name = None

            primary_artist = self.parse_api_object(data.get("primary_artist"))
            if primary_artist is not None:
                artist_name = primary_artist.name
                main_artist_list.append(primary_artist)

            parsed_features = (
                self.parse_api_object(feature_artist)
                for feature_artist in data.get("featured_artists") or []
            )
            featured_artist_list = [a for a in parsed_features if a is not None]

            return Song(
                title=clean_song_title(data.get("title"), artist_name=artist_name),
                source_list=[source],
                artwork=artwork,
                feature_artist_list=featured_artist_list,
                artist_list=main_artist_list,
            )

        return None

    def general_search(self, search_query: str, **kwargs) -> List[DatabaseObject]:
        """Search genius.com for *search_query* and return every parsable hit.

        Returns an empty list when the request fails.
        """
        results: List[DatabaseObject] = []

        search_params = {
            "q": search_query,
        }

        r = self.connection.get("https://genius.com/api/search/multi?" + urlencode(search_params), name=f"search_{search_query}")
        if r is None:
            return results

        dump_to_file("search_genius.json", r.text, is_json=True, exit_after_dump=False)
        data = r.json()

        for section in traverse_json_path(data, "response.sections", default=[]):
            for hit in section.get("hits", []):
                parsed = self.parse_api_object(hit.get("result"))
                if parsed is not None:
                    results.append(parsed)

        return results

    def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
        """Fetch the artist page at ``source.url`` and parse its ``page_data``.

        :return: the parsed artist with its albums attached; an empty
            ``Artist`` when the request fails. *source* is appended to the
            artist's sources whenever the page could be requested.
        """
        artist: Artist = Artist()
        # https://genius.com/api/artists/24527/albums?page=1

        r = self.connection.get(source.url, name=source.url)
        if r is None:
            return artist
        soup = self.get_soup_from_response(r)

        # The page embeds its data as JSON in the content attribute of a
        # <meta itemprop="page_data"> tag inside the head.
        data_container = soup.find("meta", {"itemprop": "page_data"})
        if data_container is not None:
            content = data_container["content"]
            dump_to_file("genius_itemprop_artist.json", content, is_json=True, exit_after_dump=False)
            data = json.loads(content)

            # parse_api_object returns None when the payload carries no url;
            # keep the empty artist in that case instead of crashing below.
            parsed_artist = self.parse_api_object(data.get("artist", {}))
            if isinstance(parsed_artist, Artist):
                artist = parsed_artist

            for album_data in data.get("artist_albums", []):
                parsed_album = self.parse_api_object(album_data)
                if isinstance(parsed_album, Album):
                    artist.album_collection.append(parsed_album)

            for song_data in data.get("artist_songs", []):
                parsed_song = self.parse_api_object(song_data)
                if not isinstance(parsed_song, Song):
                    continue

                # TODO: fetch the album for these songs, because the api
                # doesn't return them.
                artist.album_collection.extend(parsed_song.album_collection)

        artist.source_collection.append(source)

        return artist

    def _parse_track_element(self, track: dict, artwork: Artwork) -> Optional[Song]:
        """Build a ``Song`` from one ``itemListElement`` entry of an album.

        :raises KeyError: when ``item.name``, ``item.mainEntityOfPage`` or
            ``position`` is missing; ``fetch_album`` skips such tracks.
        """
        lyrics_list: List[Lyrics] = []

        _lyrics: Optional[str] = track.get("item", {}).get("recordingOf", {}).get("lyrics", {}).get("text")
        if _lyrics is not None:
            lyrics_list.append(Lyrics(text=FormattedText(plain=_lyrics)))

        return Song(
            title=clean_song_title(track["item"]["name"]),
            source_list=[Source(self.SOURCE_TYPE, track["item"]["mainEntityOfPage"])],
            tracksort=int(track["position"]),
            artwork=artwork,
            # The lyrics used to be collected and then discarded.
            lyrics_list=lyrics_list,
        )

    def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
        """Fetch an album page and parse its ``application/ld+json`` payload.

        :return: the parsed album including its tracks; an empty ``Album``
            when the request fails or the page carries no ld+json script.
        """
        album = Album()

        parsed_url = urlparse(source.url)
        r = self.connection.get(source.url, name=f"album_{parsed_url.netloc.split('.')[0]}_{parsed_url.path.replace('/', '').replace('album', '')}")
        if r is None:
            return album

        soup = self.get_soup_from_response(r)

        data_container = soup.find("script", {"type": "application/ld+json"})
        if data_container is None:
            # Nothing to parse on this page; previously an AttributeError.
            return album

        if DEBUG:
            dump_to_file("album_data.json", data_container.text, is_json=True, exit_after_dump=False)

        data = json.loads(data_container.text)
        artist_data = data["byArtist"]

        artist_source_list = []
        if "@id" in artist_data:
            artist_source_list = [Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]

        album = Album(
            title=data["name"].strip(),
            source_list=[Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]))],
            date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
            artist_list=[Artist(
                name=artist_data["name"].strip(),
                source_list=artist_source_list
            )]
        )

        artwork: Artwork = Artwork()

        def _get_artwork_url(_data: dict) -> Optional[str]:
            """Return the image URL of *_data*, or one built from its art_id."""
            if "image" in _data:
                return _data["image"]

            for _property in _data.get("additionalProperty", []):
                if _property.get("name") == "art_id":
                    return f"https://f4.bcbits.com/img/a{_property.get('value')}_2.jpg"

            return None

        _artwork_url = _get_artwork_url(data)
        if _artwork_url is not None:
            artwork.append(url=_artwork_url, width=350, height=350)
        else:
            # Fall back to the first release that carries an artwork.
            for album_release in data.get("albumRelease", []):
                _artwork_url = _get_artwork_url(album_release)
                if _artwork_url is not None:
                    artwork.append(url=_artwork_url, width=350, height=350)
                    break

        for i, track_json in enumerate(data.get("track", {}).get("itemListElement", [])):
            if DEBUG:
                dump_to_file(f"album_track_{i}.json", json.dumps(track_json), is_json=True, exit_after_dump=False)

            try:
                album.song_collection.append(self._parse_track_element(track_json, artwork=artwork))
            except KeyError:
                # Tracks missing mandatory fields are skipped on purpose.
                continue

        album.source_collection.append(source)
        return album

    def _fetch_lyrics(self, soup: BeautifulSoup) -> List[Lyrics]:
        """Return the ``div.lyricsText`` element of *soup* as HTML lyrics.

        Returns an empty list when the page has no such element.
        """
        track_lyrics = soup.find("div", {"class": "lyricsText"})
        if track_lyrics:
            return [Lyrics(text=FormattedText(html=track_lyrics.prettify()))]

        return []

    def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
        """Fetch a song page and parse its ld+json and ``data-tralbum`` data.

        :return: the parsed song with album, artist, lyrics and (when found)
            an audio URL; an empty ``Song`` when the request fails or the
            page carries no ld+json script.
        """
        parsed_url = urlparse(source.url)
        r = self.connection.get(source.url, name=f"song_{parsed_url.netloc.split('.')[0]}_{parsed_url.path.replace('/', '').replace('track', '')}")
        if r is None:
            return Song()

        soup = self.get_soup_from_response(r)

        data_container = soup.find("script", {"type": "application/ld+json"})
        if data_container is None:
            # Nothing to parse on this page; previously an AttributeError.
            return Song()

        other_data = {}
        other_data_list = soup.select("script[data-tralbum]")
        if len(other_data_list) > 0:
            other_data = json.loads(other_data_list[0]["data-tralbum"])

        dump_to_file("bandcamp_song_data.json", data_container.text, is_json=True, exit_after_dump=False)
        dump_to_file("bandcamp_song_data_other.json", json.dumps(other_data), is_json=True, exit_after_dump=False)
        dump_to_file("bandcamp_song_page.html", r.text, exit_after_dump=False)

        data = json.loads(data_container.text)
        album_data = data["inAlbum"]
        artist_data = data["byArtist"]

        # Both "trackinfo" and its "file" mapping may be missing, empty or
        # null; in all of those cases there is simply no audio URL.
        track_info = other_data.get("trackinfo") or [{}]
        files = track_info[0].get("file") or {}

        mp3_url = None
        for value in files.values():
            # The last entry wins, as before.
            mp3_url = value

        song = Song(
            title=clean_song_title(data["name"], artist_name=artist_data["name"]),
            source_list=[source, Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]), audio_url=mp3_url)],
            album_list=[Album(
                title=album_data["name"].strip(),
                date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
                source_list=[Source(self.SOURCE_TYPE, album_data["@id"])]
            )],
            artist_list=[Artist(
                name=artist_data["name"].strip(),
                source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
            )],
            lyrics_list=self._fetch_lyrics(soup=soup)
        )

        return song

    def download_song_to_target(self, source: Source, target: Target, desc: Optional[str] = None) -> DownloadResult:
        """Stream ``source.audio_url`` into *target*.

        Returns a ``DownloadResult`` carrying an error message when the
        source has no audio URL.
        """
        if source.audio_url is None:
            return DownloadResult(error_message="Couldn't find download link.")
        return self.connection.stream_into(url=source.audio_url, target=target, description=desc)