Compare commits

...

2 Commits

Author SHA1 Message Date
f6caee41a8 feat: finished searching genious
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 15:52:41 +02:00
068c749c38 feat: implemented artist search 2024-05-21 15:27:10 +02:00
4 changed files with 385 additions and 3 deletions

View File

@ -30,11 +30,12 @@ from ..utils.exception import MKMissingNameException
from ..utils.exception.download import UrlNotFoundException
from ..utils.shared import DEBUG_PAGES
from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, INDEPENDENT_DB_OBJECTS
from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, Genius, INDEPENDENT_DB_OBJECTS
ALL_PAGES: Set[Type[Page]] = {
# EncyclopaediaMetallum,
Genius,
Musify,
YoutubeMusic,
Bandcamp

View File

@ -3,5 +3,6 @@ from .musify import Musify
from .youtube import YouTube
from .youtube_music import YoutubeMusic
from .bandcamp import Bandcamp
from .genius import Genius
from .abstract import Page, INDEPENDENT_DB_OBJECTS

View File

@ -0,0 +1,380 @@
from typing import List, Optional, Type
from urllib.parse import urlparse, urlunparse, urlencode
import json
from enum import Enum
from bs4 import BeautifulSoup
import pycountry
from ..objects import Source, DatabaseObject
from .abstract import Page
from ..objects import (
Artist,
Source,
SourceType,
Song,
Album,
Label,
Target,
Contact,
ID3Timestamp,
Lyrics,
FormattedText,
Artwork,
)
from ..connection import Connection
from ..utils import dump_to_file, traverse_json_path
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import clean_song_title
from ..utils.config import main_settings, logging_settings
from ..utils.shared import DEBUG
if DEBUG:
from ..utils import dump_to_file
class Genius(Page):
SOURCE_TYPE = ALL_SOURCE_TYPES.GENIUS
HOST = "genius.com"
def __init__(self, *args, **kwargs):
self.connection: Connection = Connection(
host="https://genius.com/",
logger=self.LOGGER,
module="genius",
)
super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
path = source.parsed_url.path.replace("/", "")
return super().get_source_type(source)
def add_to_artwork(self, artwork: Artwork, url: str):
if url is None:
return
url_frags = url.split(".")
if len(url_frags) < 2:
artwork.append(url=url)
return
dimensions = url_frags[-2].split("x")
if len(dimensions) < 2:
artwork.append(url=url)
return
if len(dimensions) == 3:
dimensions = dimensions[:-1]
try:
artwork.append(url=url, width=int(dimensions[0]), height=int(dimensions[1]))
except ValueError:
artwork.append(url=url)
def parse_api_object(self, data: dict) -> Optional[DatabaseObject]:
object_type = data.get("_type")
artwork = Artwork()
self.add_to_artwork(artwork, data.get("header_image_url"))
self.add_to_artwork(artwork, data.get("image_url"))
source: Source = Source(self.SOURCE_TYPE, data.get("url"), additional_data={
"id": data.get("id"),
"slug": data.get("slug"),
"api_path": data.get("api_path"),
})
if source.url is None:
return None
if object_type == "artist":
return Artist(
name=data.get("name"),
source_list=[source],
artwork=artwork,
)
if object_type == "album":
self.add_to_artwork(artwork, data.get("cover_art_thumbnail_url"))
self.add_to_artwork(artwork, data.get("cover_art_url"))
return Album(
title=data.get("name"),
source_list=[source],
artist_list=[self.parse_api_object(data.get("artist"))],
artwork=artwork,
date=ID3Timestamp(**data.get("release_date_components", {})),
)
if object_type == "song":
self.add_to_artwork(artwork, data.get("song_art_image_thumbnail_url"))
self.add_to_artwork(artwork, data.get("song_art_image_url"))
main_artist_list = []
featured_artist_list = []
_artist_name = None
primary_artist = self.parse_api_object(data.get("primary_artist"))
if primary_artist is not None:
_artist_name = primary_artist.name
main_artist_list.append(primary_artist)
for feature_artist in data.get("featured_artists", []):
artist = self.parse_api_object(feature_artist)
if artist is not None:
featured_artist_list.append(artist)
return Song(
title=clean_song_title(data.get("title"), artist_name=_artist_name),
source_list=[source],
artwork=artwork,
feature_artist_list=featured_artist_list,
artist_list=main_artist_list,
)
return None
def general_search(self, search_query: str, **kwargs) -> List[DatabaseObject]:
results = []
search_params = {
"q": search_query,
}
r = self.connection.get("https://genius.com/api/search/multi?" + urlencode(search_params), name=f"search_{search_query}")
if r is None:
return results
dump_to_file("search_genius.json", r.text, is_json=True, exit_after_dump=False)
data = r.json()
for elements in traverse_json_path(data, "response.sections", default=[]):
hits = elements.get("hits", [])
for hit in hits:
parsed = self.parse_api_object(hit.get("result"))
if parsed is not None:
results.append(parsed)
return results
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
def _parse_artist_details(self, soup: BeautifulSoup) -> Artist:
name: str = None
source_list: List[Source] = []
contact_list: List[Contact] = []
band_name_location: BeautifulSoup = soup.find("p", {"id": "band-name-location"})
if band_name_location is not None:
title_span = band_name_location.find("span", {"class": "title"})
if title_span is not None:
name = title_span.text.strip()
link_container: BeautifulSoup = soup.find("ol", {"id": "band-links"})
if link_container is not None:
li: BeautifulSoup
for li in link_container.find_all("a"):
if li is None and li['href'] is not None:
continue
source_list.append(Source.match_url(_parse_artist_url(li['href']), referrer_page=self.SOURCE_TYPE))
return Artist(
name=name,
source_list=source_list
)
def _parse_album(self, soup: BeautifulSoup, initial_source: Source) -> List[Album]:
title = None
source_list: List[Source] = []
a = soup.find("a")
if a is not None and a["href"] is not None:
source_list.append(Source(self.SOURCE_TYPE, _get_host(initial_source) + a["href"]))
title_p = soup.find("p", {"class": "title"})
if title_p is not None:
title = title_p.text.strip()
return Album(title=title, source_list=source_list)
def _parse_artist_data_blob(self, data_blob: dict, artist_url: str):
parsed_artist_url = urlparse(artist_url)
album_list: List[Album] = []
for album_json in data_blob.get("buyfulldisco", {}).get("tralbums", []):
album_list.append(Album(
title=album_json["title"].strip(),
source_list=[Source(
self.SOURCE_TYPE,
urlunparse((parsed_artist_url.scheme, parsed_artist_url.netloc, album_json["page_url"], "", "", ""))
)]
))
return album_list
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
artist = Artist()
r = self.connection.get(_parse_artist_url(source.url), name=f"artist_{urlparse(source.url).scheme}_{urlparse(source.url).netloc}")
if r is None:
return artist
soup = self.get_soup_from_response(r)
if DEBUG:
dump_to_file("artist_page.html", r.text, exit_after_dump=False)
artist = self._parse_artist_details(soup=soup.find("div", {"id": "bio-container"}))
html_music_grid = soup.find("ol", {"id": "music-grid"})
if html_music_grid is not None:
for subsoup in html_music_grid.find_all("li"):
artist.album_collection.append(self._parse_album(soup=subsoup, initial_source=source))
for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})):
data_blob = data_blob_soup["data-blob"]
if DEBUG:
dump_to_file(f"bandcamp_artist_data_blob_{i}.json", data_blob, is_json=True, exit_after_dump=False)
if data_blob is not None:
artist.album_collection.extend(
self._parse_artist_data_blob(json.loads(data_blob), source.url)
)
artist.source_collection.append(source)
return artist
def _parse_track_element(self, track: dict, artwork: Artwork) -> Optional[Song]:
lyrics_list: List[Lyrics] = []
_lyrics: Optional[str] = track.get("item", {}).get("recordingOf", {}).get("lyrics", {}).get("text")
if _lyrics is not None:
lyrics_list.append(Lyrics(text=FormattedText(plain=_lyrics)))
return Song(
title=clean_song_title(track["item"]["name"]),
source_list=[Source(self.SOURCE_TYPE, track["item"]["mainEntityOfPage"])],
tracksort=int(track["position"]),
artwork=artwork,
)
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
album = Album()
r = self.connection.get(source.url, name=f"album_{urlparse(source.url).netloc.split('.')[0]}_{urlparse(source.url).path.replace('/', '').replace('album', '')}")
if r is None:
return album
soup = self.get_soup_from_response(r)
data_container = soup.find("script", {"type": "application/ld+json"})
if DEBUG:
dump_to_file("album_data.json", data_container.text, is_json=True, exit_after_dump=False)
data = json.loads(data_container.text)
artist_data = data["byArtist"]
artist_source_list = []
if "@id" in artist_data:
artist_source_list = [Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
album = Album(
title=data["name"].strip(),
source_list=[Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]))],
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
artist_list=[Artist(
name=artist_data["name"].strip(),
source_list=artist_source_list
)]
)
artwork: Artwork = Artwork()
def _get_artwork_url(_data: dict) -> Optional[str]:
if "image" in _data:
return _data["image"]
for _property in _data.get("additionalProperty", []):
if _property.get("name") == "art_id":
return f"https://f4.bcbits.com/img/a{_property.get('value')}_2.jpg"
_artwork_url = _get_artwork_url(data)
if _artwork_url is not None:
artwork.append(url=_artwork_url, width=350, height=350)
else:
for album_release in data.get("albumRelease", []):
_artwork_url = _get_artwork_url(album_release)
if _artwork_url is not None:
artwork.append(url=_artwork_url, width=350, height=350)
break
for i, track_json in enumerate(data.get("track", {}).get("itemListElement", [])):
if DEBUG:
dump_to_file(f"album_track_{i}.json", json.dumps(track_json), is_json=True, exit_after_dump=False)
try:
album.song_collection.append(self._parse_track_element(track_json, artwork=artwork))
except KeyError:
continue
album.source_collection.append(source)
return album
def _fetch_lyrics(self, soup: BeautifulSoup) -> List[Lyrics]:
track_lyrics = soup.find("div", {"class": "lyricsText"})
if track_lyrics:
return [Lyrics(text=FormattedText(html=track_lyrics.prettify()))]
return []
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
r = self.connection.get(source.url, name=f"song_{urlparse(source.url).netloc.split('.')[0]}_{urlparse(source.url).path.replace('/', '').replace('track', '')}")
if r is None:
return Song()
soup = self.get_soup_from_response(r)
data_container = soup.find("script", {"type": "application/ld+json"})
other_data = {}
other_data_list = soup.select("script[data-tralbum]")
if len(other_data_list) > 0:
other_data = json.loads(other_data_list[0]["data-tralbum"])
dump_to_file("bandcamp_song_data.json", data_container.text, is_json=True, exit_after_dump=False)
dump_to_file("bandcamp_song_data_other.json", json.dumps(other_data), is_json=True, exit_after_dump=False)
dump_to_file("bandcamp_song_page.html", r.text, exit_after_dump=False)
data = json.loads(data_container.text)
album_data = data["inAlbum"]
artist_data = data["byArtist"]
mp3_url = None
for key, value in other_data.get("trackinfo", [{}])[0].get("file", {"": None}).items():
mp3_url = value
song = Song(
title=clean_song_title(data["name"], artist_name=artist_data["name"]),
source_list=[source, Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]), audio_url=mp3_url)],
album_list=[Album(
title=album_data["name"].strip(),
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
source_list=[Source(self.SOURCE_TYPE, album_data["@id"])]
)],
artist_list=[Artist(
name=artist_data["name"].strip(),
source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
)],
lyrics_list=self._fetch_lyrics(soup=soup)
)
return song
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
if source.audio_url is None:
return DownloadResult(error_message="Couldn't find download link.")
return self.connection.stream_into(url=source.audio_url, target=target, description=desc)

View File

@ -15,11 +15,11 @@ __stage__ = os.getenv("STAGE", "prod")
DEBUG = (__stage__ == "dev") and True
DEBUG_LOGGING = DEBUG and False
DEBUG_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and False
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and False
DEBUG_DUMP = DEBUG and True
DEBUG_PRINT_ID = DEBUG and True
if DEBUG: