fix: metal archives artist sources

Hazel 2023-09-13 16:01:01 +02:00
parent 8091a9ffb0
commit 13b9c0b35e
11 changed files with 129 additions and 47 deletions

View File

@@ -35,4 +35,10 @@ if __name__ == "__main__":
         "d: 2"
     ]
 
-    music_kraken.cli.download(genre="test", command_list=youtube_music_test, process_metadata_anyway=True)
+    cross_download = [
+        "s: #a Psychonaut 4",
+        "2",
+        "d: 0"
+    ]
+
+    music_kraken.cli.download(genre="test", command_list=cross_download, process_metadata_anyway=True)

View File

@@ -15,7 +15,7 @@ from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic,
 ALL_PAGES: Set[Type[Page]] = {
     EncyclopaediaMetallum,
     Musify,
-    YoutubeMusic,
+    # YoutubeMusic,
     Bandcamp
 }
@@ -97,7 +97,11 @@ class Pages:
         if not isinstance(music_object, INDEPENDENT_DB_OBJECTS):
            return DownloadResult(error_message=f"{type(music_object).__name__} can't be downloaded.")
 
-        _page_types = set(self._source_to_page[src] for src in music_object.source_collection.source_pages)
+        _page_types = set()
+        for src in music_object.source_collection.source_pages:
+            if src in self._source_to_page:
+                _page_types.add(self._source_to_page[src])
+
         audio_pages = self._audio_pages_set.intersection(_page_types)
 
         for download_page in audio_pages:
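With YoutubeMusic commented out of ALL_PAGES, a music object can still carry YouTube Music sources, and the old set comprehension raised a KeyError on the first source without a registered page. A self-contained sketch of the new defensive lookup, with made-up keys standing in for the real source and page types:

# Made-up stand-ins for the real SourcePages -> Page mapping.
_source_to_page = {"BANDCAMP": "Bandcamp", "MUSIFY": "Musify"}
source_pages = ["BANDCAMP", "YOUTUBE_MUSIC", "MUSIFY"]  # YOUTUBE_MUSIC has no page registered

# Old: set(_source_to_page[src] for src in source_pages)  -> KeyError("YOUTUBE_MUSIC")
_page_types = set()
for src in source_pages:
    if src in _source_to_page:  # unknown sources are simply skipped now
        _page_types.add(_source_to_page[src])

print(_page_types)  # {'Bandcamp', 'Musify'}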

View File

@@ -27,7 +27,7 @@ class Results:
         self._page_by_index = dict()
 
     def get_music_object_by_index(self, index: int) -> Tuple[Type[Page], DatabaseObject]:
-        # if this throws a key error, either the formated generator needs to be iterated, or the option doesn't exist.
+        # if this throws a key error, either the formatted generator needs to be iterated, or the option doesn't exist.
         return self._page_by_index[index], self._by_index[index]
 
     def delete_details(self, exclude_index: int):

View File

@@ -81,6 +81,8 @@ class Collection(Generic[T]):
         :param merge_into_existing:
         :return did_not_exist:
         """
+        if element is None:
+            return AppendResult(False, None, False)
 
         # if the element type has been defined in the initializer it checks if the type matches
         if self.element_type is not None and not isinstance(element, self.element_type):
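The guard clause means appending None now reports "nothing appended" instead of failing in the isinstance() check that follows. A minimal sketch of the pattern; the AppendResult field names are assumptions, not the project's actual definition:

from collections import namedtuple

# Hypothetical stand-in for the real AppendResult.
AppendResult = namedtuple("AppendResult", ("was_in_collection", "current_element", "was_the_same"))

def append(data: list, element, element_type=str):
    # Guard clause from the diff: None is rejected up front.
    if element is None:
        return AppendResult(False, None, False)
    if element_type is not None and not isinstance(element, element_type):
        raise TypeError(f"expected {element_type.__name__}, got {type(element).__name__}")
    data.append(element)
    return AppendResult(False, element, False)

songs = []
print(append(songs, None))                           # AppendResult(False, None, False), no crash
print(append(songs, "Tired, Numb and Breathless"))   # appended normally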

View File

@@ -31,7 +31,7 @@ class DatabaseObject:
             """
             _id = random.randint(0, HIGHEST_ID)
             self.automatic_id = True
-            LOGGER.debug(f"Id for {type(self).__name__} isn't set. Setting to {_id}")
+            # LOGGER.debug(f"Id for {type(self).__name__} isn't set. Setting to {_id}")
 
         # The id can only be None, if the object is dynamic (self.dynamic = True)
         self.id: Optional[int] = _id

View File

@@ -649,11 +649,12 @@ class Artist(MainObject):
         return metadata
 
-    def __str__(self):
+    def __str__(self, include_notes: bool = False):
         string = self.name or ""
-        plaintext_notes = self.notes.get_plaintext()
-        if plaintext_notes is not None:
-            string += "\n" + plaintext_notes
+        if include_notes:
+            plaintext_notes = self.notes.get_plaintext()
+            if plaintext_notes is not None:
+                string += "\n" + plaintext_notes
         return string
 
     def __repr__(self):
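Notes are now opt-in when stringifying an artist. A tiny self-contained demo of the new signature (the real Artist pulls the text from self.notes.get_plaintext(); the class here is invented for illustration):

class DemoArtist:
    def __init__(self, name, plaintext_notes=None):
        self.name = name
        self.plaintext_notes = plaintext_notes

    def __str__(self, include_notes: bool = False):
        string = self.name or ""
        if include_notes:
            if self.plaintext_notes is not None:
                string += "\n" + self.plaintext_notes
        return string

artist = DemoArtist("Psychonaut 4", "some plaintext notes")
print(str(artist))                         # name only, notes no longer leak into plain str()
print(artist.__str__(include_notes=True))  # name plus the plaintext notes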

View File

@@ -153,11 +153,12 @@ def build_new_object(new_object: DatabaseObject) -> DatabaseObject:
     return new_object
 
-def merge_together(old_object: DatabaseObject, new_object: DatabaseObject) -> DatabaseObject:
+def merge_together(old_object: DatabaseObject, new_object: DatabaseObject, do_compile: bool = True) -> DatabaseObject:
     new_object = clean_object(new_object)
 
     old_object.merge(new_object)
-    old_object.compile(merge_into=False)
+    if do_compile:
+        old_object.compile(merge_into=False)
 
     return old_object
@@ -246,7 +247,7 @@ class Page:
         return []
 
-    def fetch_details(self, music_object: DatabaseObject, stop_at_level: int = 1) -> DatabaseObject:
+    def fetch_details(self, music_object: DatabaseObject, stop_at_level: int = 1, post_process: bool = True) -> DatabaseObject:
         """
         when a music object with lacking data is passed in, it returns
         the SAME object **(no copy)** with more detailed data.
@@ -270,22 +271,22 @@ class Page:
         if isinstance(music_object, INDEPENDENT_DB_OBJECTS):
             source: Source
             for source in music_object.source_collection.get_sources_from_page(self.SOURCE_TYPE):
-                new_music_object.merge(
-                    self.fetch_object_from_source(
-                        source=source,
-                        enforce_type=type(music_object),
-                        stop_at_level=stop_at_level,
-                        post_process=False
-                    )
-                )
+                new_music_object.merge(self.fetch_object_from_source(
+                    source=source,
+                    enforce_type=type(music_object),
+                    stop_at_level=stop_at_level,
+                    post_process=False
+                ))
 
-        return merge_together(music_object, new_music_object)
+        return merge_together(music_object, new_music_object, do_compile=post_process)
 
     def fetch_object_from_source(self, source: Source, stop_at_level: int = 2, enforce_type: Type[DatabaseObject] = None, post_process: bool = True) -> Optional[DatabaseObject]:
-        obj_type = self.get_source_type(source)
+        obj_type = self.get_source_type(
+            source)
+
+        print("obj type", obj_type, self)
+
         if obj_type is None:
             return None
         if enforce_type != obj_type and enforce_type is not None:
             self.LOGGER.warning(f"Object type isn't type to enforce: {enforce_type}, {obj_type}")
             return None
@@ -298,13 +299,21 @@ class Page:
             Artist: self.fetch_artist,
             Label: self.fetch_label
         }
 
         if obj_type in fetch_map:
             music_object = fetch_map[obj_type](source, stop_at_level)
         else:
             self.LOGGER.warning(f"Can't fetch details of type: {obj_type}")
             return None
 
+        if stop_at_level > 1:
+            collection: Collection
+            for collection_str in music_object.DOWNWARDS_COLLECTION_ATTRIBUTES:
+                collection = music_object.__getattribute__(collection_str)
+
+                for sub_element in collection:
+                    sub_element.merge(self.fetch_details(sub_element, stop_at_level=stop_at_level-1, post_process=False))
+
         if post_process and music_object:
             return build_new_object(music_object)
@@ -323,6 +332,10 @@ class Page:
         return Label()
 
     def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False, process_metadata_anyway: bool = False) -> DownloadResult:
+        # print("downloading")
+
+        self.fetch_details(music_object, stop_at_level=2)
+
         naming_dict: NamingDict = NamingDict({"genre": genre})
 
         def fill_naming_objects(naming_music_object: DatabaseObject):
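Taken together, these hunks change how details are fetched: fetch_object_from_source now recurses one level down the object's DOWNWARDS_COLLECTION_ATTRIBUTES, and the new post_process/do_compile flags defer compiling and merging to the outermost call. A self-contained sketch of that pattern, using an invented Node type in place of DatabaseObject:

from dataclasses import dataclass, field
from typing import List

@dataclass
class Node:
    name: str
    children: List["Node"] = field(default_factory=list)
    fetched: bool = False
    compiled: bool = False

DOWNWARDS_COLLECTION_ATTRIBUTES = ("children",)

def fetch_details(node: Node, stop_at_level: int = 1, post_process: bool = True) -> Node:
    node.fetched = True

    # Walk the downward collections and fetch one level deeper, but defer
    # compiling/merging (post_process=False) until the top-level call.
    if stop_at_level > 1:
        for attr in DOWNWARDS_COLLECTION_ATTRIBUTES:
            for sub in getattr(node, attr):
                fetch_details(sub, stop_at_level=stop_at_level - 1, post_process=False)

    if post_process:
        node.compiled = True  # stands in for merge_together(..., do_compile=True)
    return node

artist = Node("artist", children=[Node("album", children=[Node("song")])])
fetch_details(artist, stop_at_level=2)
print(artist.compiled, artist.children[0].fetched, artist.children[0].compiled)
# True True False: only the root gets compiled, sub-objects are fetched lazily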

View File

@@ -1,10 +1,9 @@
 from typing import List, Optional, Type
-from urllib.parse import urlparse
+from urllib.parse import urlparse, urlunparse
 import json
 from enum import Enum
 from bs4 import BeautifulSoup
 import pycountry
-import demjson3
 
 from ..objects import Source, DatabaseObject
 from .abstract import Page
@@ -51,12 +50,13 @@ class Bandcamp(Page):
     def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
         parsed_url = urlparse(source.url)
+        path = parsed_url.path.replace("/", "")
 
-        if parsed_url.path == "":
+        if path == "":
             return Artist
-        if parsed_url.path.startswith("/album/"):
+        if path.startswith("album"):
             return Album
-        if parsed_url.path.startswith("/track/"):
+        if path.startswith("track"):
             return Song
 
         return super().get_source_type(source)
@@ -65,7 +65,6 @@ class Bandcamp(Page):
         try:
             object_type = BandcampTypes(data["type"])
         except ValueError:
-            print(data["type"])
             return
 
         url = data["item_url_root"]
@@ -180,7 +179,7 @@ class Bandcamp(Page):
             source_list=source_list
         )
 
-    def _parse_song_list(self, soup: BeautifulSoup) -> List[Album]:
+    def _parse_album(self, soup: BeautifulSoup) -> List[Album]:
         title = None
         source_list: List[Source] = []
@@ -194,6 +193,25 @@ class Bandcamp(Page):
 
         return Album(title=title, source_list=source_list)
 
+    def _parse_artist_data_blob(self, data_blob: dict, artist_url: str):
+        if DEBUG:
+            dump_to_file("bandcamp_data_blob.json", json.dumps(data_blob), is_json=True, exit_after_dump=False)
+
+        parsed_artist_url = urlparse(artist_url)
+
+        album_list: List[Album] = []
+
+        for album_json in data_blob.get("buyfulldisco", {}).get("tralbums", []):
+            album_list.append(Album(
+                title=album_json["title"],
+                source_list=[Source(
+                    self.SOURCE_TYPE,
+                    urlunparse((parsed_artist_url.scheme, parsed_artist_url.netloc, album_json["page_url"], "", "", ""))
+                )]
+            ))
+
+        return album_list
+
     def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
         artist = Artist()
@@ -207,9 +225,22 @@ class Bandcamp(Page):
             dump_to_file("artist_page.html", r.text, exit_after_dump=False)
 
         artist = self._parse_artist_details(soup=soup.find("div", {"id": "bio-container"}))
-        for subsoup in soup.find("ol", {"id": "music-grid"}).find_all("li"):
-            artist.main_album_collection.append(self._parse_song_list(soup=subsoup))
+
+        html_music_grid = soup.find("ol", {"id": "music-grid"})
+        if html_music_grid is not None:
+            for subsoup in html_music_grid.find_all("li"):
+                artist.main_album_collection.append(self._parse_album(soup=subsoup))
+
+        data_blob_soup = soup.find("div", {"id": "pagedata"})
+        if data_blob_soup is not None:
+            data_blob = data_blob_soup["data-blob"]
+            if data_blob is not None:
+                artist.main_album_collection.extend(
+                    self._parse_artist_data_blob(json.loads(data_blob), source.url)
+                )
+
+        artist.source_collection.append(source)
         return artist
 
     def _parse_track_element(self, track: dict) -> Optional[Song]:
@@ -220,7 +251,6 @@ class Bandcamp(Page):
         )
 
     def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
-        print(source)
         album = Album()
 
         r = self.connection.get(source.url)
@@ -237,13 +267,16 @@ class Bandcamp(Page):
         data = json.loads(data_container.text)
         artist_data = data["byArtist"]
 
+        artist_source_list = []
+        if "@id" in artist_data:
+            artist_source_list=[Source(self.SOURCE_TYPE, artist_data["@id"])]
+
         album = Album(
             title=data["name"],
             source_list=[Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]))],
             date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
             artist_list=[Artist(
                 name=artist_data["name"],
-                source_list=[Source(self.SOURCE_TYPE, artist_data["@id"])]
+                source_list=artist_source_list
             )]
         )
@@ -256,6 +289,7 @@ class Bandcamp(Page):
             except KeyError:
                 continue
 
+        album.source_collection.append(source)
         return album
 
     def _fetch_lyrics(self, soup: BeautifulSoup) -> List[Lyrics]:
@@ -270,8 +304,6 @@ class Bandcamp(Page):
 
     def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
-        print(source)
-
         r = self.connection.get(source.url)
         if r is None:
             return Song()
@@ -313,6 +345,7 @@ class Bandcamp(Page):
             lyrics_list=self._fetch_lyrics(soup=soup)
         )
 
+        song.source_collection.append(source)
         return song
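Two URL tricks carry most of the Bandcamp changes: get_source_type() strips slashes so trailing-slash variants still match, and _parse_artist_data_blob() rebuilds absolute album URLs with urlunparse from the relative page_url in the artist page's data-blob. A rough sketch of both; the example URLs and album data are made up:

from urllib.parse import urlparse, urlunparse

# 1) get_source_type(): strip all slashes so "/album/x" and "/album/x/" both match.
path = urlparse("https://psychonaut4.bandcamp.com/album/neurasthenia/").path.replace("/", "")
print(path.startswith("album"))  # True -> treated as an Album source

# 2) _parse_artist_data_blob(): rebuild an absolute album URL from the artist page URL
#    and the relative page_url found in the data-blob.
parsed_artist_url = urlparse("https://psychonaut4.bandcamp.com/music")
album_json = {"title": "Neurasthenia", "page_url": "/album/neurasthenia"}
album_url = urlunparse((parsed_artist_url.scheme, parsed_artist_url.netloc,
                        album_json["page_url"], "", "", ""))
print(album_url)  # https://psychonaut4.bandcamp.com/album/neurasthenia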

View File

@@ -22,6 +22,10 @@ from ..objects import (
     Options,
     DatabaseObject
 )
+from ..utils.shared import DEBUG
+if DEBUG:
+    from ..utils.debug_utils import dump_to_file
+
 
 ALBUM_TYPE_MAP: Dict[str, AlbumType] = defaultdict(lambda: AlbumType.OTHER, {
@@ -264,20 +268,33 @@ class EncyclopaediaMetallum(Page):
 
         soup = self.get_soup_from_response(r)
 
+        if DEBUG:
+            dump_to_file(f"ma_artist_sources_{ma_artist_id}.html", soup.prettify(), exit_after_dump=False)
+
         if soup.find("span", {"id": "noLinks"}) is not None:
             return []
 
-        artist_source = soup.find("div", {"id": "band_links_Official"})
-        """
-        TODO
-        add a Label object to add the label sources from
-        TODO
-        maybe do merchandice stuff
-        """
+        source_list = []
+
+        link_table: BeautifulSoup = soup.find("table", {"id": "linksTablemain"})
+        if link_table is not None:
+            for tr in link_table.find_all("tr"):
+                anchor: BeautifulSoup = tr.find("a")
+                if anchor is None:
+                    continue
+
+                href = anchor["href"]
+                if href is not None:
+                    source_list.append(Source.match_url(href, referer_page=self.SOURCE_TYPE))
+
+        # The following code is only legacy code, which I just kep because it doesn't harm.
+        # The way ma returns sources changed.
+        artist_source = soup.find("div", {"id": "band_links"})
         merchandice_source = soup.find("div", {"id": "band_links_Official_merchandise"})
         label_source = soup.find("div", {"id": "band_links_Labels"})
 
-        source_list = []
-
         if artist_source is not None:
             for tr in artist_source.find_all("td"):
@@ -288,6 +305,8 @@ class EncyclopaediaMetallum(Page):
                 source_list.append(Source.match_url(url, referer_page=self.SOURCE_TYPE))
 
+        print(source_list)
+
         return source_list
 
     def _parse_artist_attributes(self, artist_soup: BeautifulSoup) -> Artist:
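This is the actual "metal archives artist sources" fix: the related-links tab is now rendered as a table with id linksTablemain, so sources are scraped from the anchors in its rows rather than from the old band_links_Official divs. A rough sketch of the new parsing on a hand-written HTML snippet (the markup shape is an assumption based on the selectors above):

from bs4 import BeautifulSoup

html = """
<table id="linksTablemain">
  <tr><td><a href="https://psychonaut4.bandcamp.com">Bandcamp</a></td></tr>
  <tr><td><a href="https://www.facebook.com/psychonaut4">Facebook</a></td></tr>
  <tr><td>no anchor here</td></tr>
</table>
"""
soup = BeautifulSoup(html, "html.parser")

source_list = []
link_table = soup.find("table", {"id": "linksTablemain"})
if link_table is not None:
    for tr in link_table.find_all("tr"):
        anchor = tr.find("a")
        if anchor is None:
            continue  # rows without a link are skipped
        href = anchor.get("href")
        if href is not None:
            source_list.append(href)  # the real code wraps this in Source.match_url(...)

print(source_list)  # the two hrefs; the anchor-less row is ignored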

View File

@@ -71,8 +71,9 @@ class YoutubeMusicConnection(Connection):
         r = self.get("https://music.youtube.com/verify_session", is_heartbeat=True)
         if r is None:
             self.heartbeat_failed()
+            return
 
-        string = r.content.decode("utf-8")
+        string = r.text
 
         data = json.loads(string[string.index("{"):])
         success: bool = data["success"]
@@ -248,6 +249,9 @@ class YoutubeMusic(SuperYouTube):
             }
         )
 
+        if r is None:
+            return []
+
         renderer_list = r.json().get("contents", {}).get("tabbedSearchResultsRenderer", {}).get("tabs", [{}])[0].get("tabRenderer").get("content", {}).get("sectionListRenderer", {}).get("contents", [])
 
         if DEBUG:
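Both hunks guard against the connection returning None on a failed request, so the code no longer crashes on r.text or r.json(). A minimal sketch of the pattern, assuming a get() helper that returns None on network errors (as the diff implies the project's Connection wrapper does):

from typing import Optional
import requests

def get(url: str) -> Optional[requests.Response]:
    try:
        r = requests.get(url, timeout=10)
        r.raise_for_status()
        return r
    except requests.RequestException:
        return None

r = get("https://music.youtube.com/verify_session")
if r is None:
    print("heartbeat failed")  # bail out instead of calling r.text / r.json() on None
else:
    print(r.text[:80])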

View File

@@ -4,7 +4,7 @@ from .config import main_settings
 
 DEBUG = True
 DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
-DEBUG_PAGES = DEBUG and True
+DEBUG_PAGES = DEBUG and False
 
 if DEBUG:
     print("DEBUG ACTIVE")