feat: build

2024-04-09 10:32:17 +02:00
parent 5ba38916d6
commit fc2414dc68
93 changed files with 42 additions and 26 deletions


@@ -0,0 +1,7 @@
from .encyclopaedia_metallum import EncyclopaediaMetallum
from .musify import Musify
from .youtube import YouTube
from .youtube_music import YoutubeMusic
from .bandcamp import Bandcamp
from .abstract import Page, INDEPENDENT_DB_OBJECTS


@@ -0,0 +1,453 @@
import logging
import random
import re
from copy import copy
from pathlib import Path
from typing import Optional, Union, Type, Dict, Set, List, Tuple
from string import Formatter
import requests
from bs4 import BeautifulSoup
from ..connection import Connection
from ..objects import (
Song,
Source,
Album,
Artist,
Target,
DatabaseObject,
Options,
Collection,
Label,
)
from ..utils.enums.source import SourcePages
from ..utils.enums.album import AlbumType
from ..audio import write_metadata_to_target, correct_codec
from ..utils.config import main_settings
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import fit_to_file_system
INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song]
INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]]
class NamingDict(dict):
CUSTOM_KEYS: Dict[str, str] = {
"label": "label.name",
"artist": "artist.name",
"song": "song.title",
"isrc": "song.isrc",
"album": "album.title",
"album_type": "album.album_type_string"
}
def __init__(self, values: dict, object_mappings: Dict[str, DatabaseObject] = None):
self.object_mappings: Dict[str, DatabaseObject] = object_mappings or dict()
super().__init__(values)
self["audio_format"] = main_settings["audio_format"]
def add_object(self, music_object: DatabaseObject):
self.object_mappings[type(music_object).__name__.lower()] = music_object
def copy(self) -> dict:
return type(self)(super().copy(), self.object_mappings.copy())
def __getitem__(self, key: str) -> str:
return fit_to_file_system(super().__getitem__(key))
def default_value_for_name(self, name: str) -> str:
return f'Various {name.replace("_", " ").title()}'
def __missing__(self, key: str) -> str:
if "." not in key:
if key not in self.CUSTOM_KEYS:
return self.default_value_for_name(key)
key = self.CUSTOM_KEYS[key]
frag_list = key.split(".")
object_name = frag_list[0].strip().lower()
attribute_name = frag_list[-1].strip().lower()
if object_name not in self.object_mappings:
return self.default_value_for_name(attribute_name)
music_object = self.object_mappings[object_name]
try:
value = getattr(music_object, attribute_name)
if value is None:
return self.default_value_for_name(attribute_name)
return str(value)
except AttributeError:
return self.default_value_for_name(attribute_name)
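# Illustrative sketch, not part of the original file: how NamingDict resolves
# template keys. The objects and values below are made up.
#
#   naming = NamingDict({"genre": "Metal"})
#   naming.add_object(Album(title="Self Loather"))
#   Formatter().vformat("{genre}/{album}", (), naming)  # -> 'Metal/Self Loather'
#   Formatter().vformat("{label}", (), naming)          # -> 'Various Name'
#
# "album" is translated to "album.title" via CUSTOM_KEYS; a missing object
# falls back to default_value_for_name, and every looked-up value additionally
# passes through fit_to_file_system in __getitem__.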
def _clean_music_object(music_object: INDEPENDENT_DB_OBJECTS, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
if type(music_object) == Label:
return _clean_label(label=music_object, collections=collections)
if type(music_object) == Artist:
return _clean_artist(artist=music_object, collections=collections)
if type(music_object) == Album:
return _clean_album(album=music_object, collections=collections)
if type(music_object) == Song:
return _clean_song(song=music_object, collections=collections)
def _clean_collection(collection: Collection, collection_dict: Dict[INDEPENDENT_DB_TYPES, Collection]):
if collection.element_type not in collection_dict:
return
for i, element in enumerate(collection):
r = collection_dict[collection.element_type].append(element, merge_into_existing=True)
collection[i] = r.current_element
if not r.was_the_same:
_clean_music_object(r.current_element, collection_dict)
def _clean_label(label: Label, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
_clean_collection(label.current_artist_collection, collections)
_clean_collection(label.album_collection, collections)
def _clean_artist(artist: Artist, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
_clean_collection(artist.main_album_collection, collections)
_clean_collection(artist.feature_song_collection, collections)
_clean_collection(artist.label_collection, collections)
def _clean_album(album: Album, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
_clean_collection(album.label_collection, collections)
_clean_collection(album.song_collection, collections)
_clean_collection(album.artist_collection, collections)
def _clean_song(song: Song, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
_clean_collection(song.album_collection, collections)
_clean_collection(song.feature_artist_collection, collections)
_clean_collection(song.main_artist_collection, collections)
class Page:
"""
This is an abstract class, laying out the
functionality for every other class fetching something
"""
SOURCE_TYPE: SourcePages
LOGGER = logging.getLogger("this shouldn't be used")
# set this to True if all song details can also be fetched by fetching the album details
NO_ADDITIONAL_DATA_FROM_SONG = False
def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None):
"""
Perform a regex search on the given string, using a single pattern or a list of
patterns, and return the first matching group.
In case of failure, return a default value or raise a WARNING or a
RegexNotFoundError, depending on fatal, specifying the field name.
"""
if isinstance(pattern, str):
mobj = re.search(pattern, string, flags)
else:
for p in pattern:
mobj = re.search(p, string, flags)
if mobj:
break
if mobj:
if group is None:
# return the first matching group
return next(g for g in mobj.groups() if g is not None)
elif isinstance(group, (list, tuple)):
return tuple(mobj.group(g) for g in group)
else:
return mobj.group(group)
return default
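# Usage sketch (hypothetical pattern and input): grab an id with a fallback.
#
#   self._search_regex(r'id="lyricsLink_(\d+)"', html, default=None)
#   -> the first captured group if the pattern matches, otherwise None
#
# With a list of patterns, the first pattern that matches wins; group=None
# returns the first non-None group, group=(1, 2) returns a tuple of groups.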
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
return None
def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup:
return BeautifulSoup(r.content, "html.parser")
# to search stuff
def search(self, query: Query) -> List[DatabaseObject]:
music_object = query.music_object
search_functions = {
Song: self.song_search,
Album: self.album_search,
Artist: self.artist_search,
Label: self.label_search
}
if type(music_object) in search_functions:
r = search_functions[type(music_object)](music_object)
if r is not None and len(r) > 0:
return r
r = []
for default_query in query.default_search:
for single_option in self.general_search(default_query):
r.append(single_option)
return r
def general_search(self, search_query: str) -> List[DatabaseObject]:
return []
def label_search(self, label: Label) -> List[Label]:
return []
def artist_search(self, artist: Artist) -> List[Artist]:
return []
def album_search(self, album: Album) -> List[Album]:
return []
def song_search(self, song: Song) -> List[Song]:
return []
def fetch_details(self, music_object: DatabaseObject, stop_at_level: int = 1,
post_process: bool = True) -> DatabaseObject:
"""
when a music object with lacking data is passed in, it returns
the SAME object **(no copy)** with more detailed data.
If you, for example, pass in an album, it fetches the tracklist.
:param music_object:
:param stop_at_level:
The depth to which the scraper will recurse.
If this is set to 2, for example, the levels could be:
1. Level: the album
2. Level: every song of the album + every artist of the album
If no additional requests are needed to get the data one level below the supposed stop level,
this is ignored.
:return detailed_music_object: IT MODIFIES THE INPUT OBJECT
"""
# creating a new object of the same type
new_music_object: Optional[DatabaseObject] = None
# only certain database objects have a source list
if isinstance(music_object, INDEPENDENT_DB_OBJECTS):
source: Source
for source in music_object.source_collection.get_sources_from_page(self.SOURCE_TYPE):
tmp = self.fetch_object_from_source(
source=source,
enforce_type=type(music_object),
stop_at_level=stop_at_level,
post_process=False
)
if new_music_object is None:
new_music_object = tmp
else:
new_music_object.merge(tmp)
if new_music_object is not None:
music_object.merge(new_music_object)
return music_object
def fetch_object_from_source(self, source: Source, stop_at_level: int = 2,
enforce_type: Type[DatabaseObject] = None, post_process: bool = True) -> Optional[
DatabaseObject]:
obj_type = self.get_source_type(source)
if obj_type is None:
return None
if enforce_type != obj_type and enforce_type is not None:
self.LOGGER.warning(f"Object type isn't type to enforce: {enforce_type}, {obj_type}")
return None
music_object: DatabaseObject = None
fetch_map = {
Song: self.fetch_song,
Album: self.fetch_album,
Artist: self.fetch_artist,
Label: self.fetch_label
}
if obj_type in fetch_map:
music_object = fetch_map[obj_type](source, stop_at_level)
else:
self.LOGGER.warning(f"Can't fetch details of type: {obj_type}")
return None
if stop_at_level > 1:
collection: Collection
for collection_str in music_object.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
collection = music_object.__getattribute__(collection_str)
for sub_element in collection:
sub_element.merge(
self.fetch_details(sub_element, stop_at_level=stop_at_level - 1, post_process=False))
return music_object
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song()
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
return Album()
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
return Artist()
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False,
process_metadata_anyway: bool = False) -> DownloadResult:
naming_dict: NamingDict = NamingDict({"genre": genre})
def fill_naming_objects(naming_music_object: DatabaseObject):
nonlocal naming_dict
for collection_name in naming_music_object.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
collection: Collection = getattr(naming_music_object, collection_name)
if collection.empty:
continue
dom_ordered_music_object: DatabaseObject = collection[0]
naming_dict.add_object(dom_ordered_music_object)
return fill_naming_objects(dom_ordered_music_object)
fill_naming_objects(music_object)
return self._download(music_object, naming_dict, download_all, process_metadata_anyway=process_metadata_anyway)
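# Sketch of the traversal above (template names assumed): for a Song, the
# upward walk visits song -> album -> artist -> label, registering the first
# element of each non-empty collection. A download_path template like
#
#   "{genre}/{artist}/{album}"
#
# can then be resolved from naming_dict alone in _download_song below.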
def _download(self, music_object: DatabaseObject, naming_dict: NamingDict, download_all: bool = False,
skip_details: bool = False, process_metadata_anyway: bool = False) -> DownloadResult:
skip_next_details = skip_details
# Skips all releases that are defined in main_settings["album_type_blacklist"], if download_all is False
if isinstance(music_object, Album):
if self.NO_ADDITIONAL_DATA_FROM_SONG:
skip_next_details = True
if not download_all and music_object.album_type.value in main_settings["album_type_blacklist"]:
return DownloadResult()
if not isinstance(music_object, Song) or not self.NO_ADDITIONAL_DATA_FROM_SONG:
self.fetch_details(music_object=music_object, stop_at_level=2)
naming_dict.add_object(music_object)
if isinstance(music_object, Song):
return self._download_song(music_object, naming_dict, process_metadata_anyway=process_metadata_anyway)
download_result: DownloadResult = DownloadResult()
for collection_name in music_object.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
collection: Collection = getattr(music_object, collection_name)
sub_ordered_music_object: DatabaseObject
for sub_ordered_music_object in collection:
download_result.merge(self._download(sub_ordered_music_object, naming_dict.copy(), download_all,
skip_details=skip_next_details,
process_metadata_anyway=process_metadata_anyway))
return download_result
def _download_song(self, song: Song, naming_dict: NamingDict, process_metadata_anyway: bool = False):
if "genre" not in naming_dict and song.genre is not None:
naming_dict["genre"] = song.genre
if song.genre is None:
song.genre = naming_dict["genre"]
path_parts = Formatter().parse(main_settings["download_path"])
file_parts = Formatter().parse(main_settings["download_file"])
new_target = Target(
relative_to_music_dir=True,
file_path=Path(
main_settings["download_path"].format(**{part[1]: naming_dict[part[1]] for part in path_parts}),
main_settings["download_file"].format(**{part[1]: naming_dict[part[1]] for part in file_parts})
)
)
if song.target_collection.empty:
song.target_collection.append(new_target)
sources = song.source_collection.get_sources_from_page(self.SOURCE_TYPE)
if len(sources) == 0:
return DownloadResult(error_message=f"No source found for {song.title} as {self.__class__.__name__}.")
temp_target: Target = Target(
relative_to_music_dir=False,
file_path=Path(
main_settings["temp_directory"],
str(song.id)
)
)
r = DownloadResult(1)
found_on_disc = False
target: Target
for target in song.target_collection:
if target.exists:
if process_metadata_anyway:
target.copy_content(temp_target)
found_on_disc = True
r.found_on_disk += 1
r.add_target(target)
if found_on_disc and not process_metadata_anyway:
self.LOGGER.info(f"{song.option_string} already exists, thus not downloading again.")
return r
source = sources[0]
if not found_on_disc:
r = self.download_song_to_target(source=source, target=temp_target, desc=song.title)
if not r.is_fatal_error:
r.merge(self._post_process_targets(song, temp_target,
[] if found_on_disc else self.get_skip_intervals(song, source)))
return r
def _post_process_targets(self, song: Song, temp_target: Target, interval_list: List) -> DownloadResult:
correct_codec(temp_target, interval_list=interval_list)
self.post_process_hook(song, temp_target)
write_metadata_to_target(song.metadata, temp_target)
r = DownloadResult()
target: Target
for target in song.target_collection:
if temp_target is not target:
temp_target.copy_content(target)
r.add_target(target)
temp_target.delete()
r.sponsor_segments += len(interval_list)
return r
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
return []
def post_process_hook(self, song: Song, temp_target: Target, **kwargs):
pass
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
return DownloadResult()


@@ -0,0 +1,361 @@
from typing import List, Optional, Type
from urllib.parse import urlparse, urlunparse
import json
from enum import Enum
from bs4 import BeautifulSoup
import pycountry
from ..objects import Source, DatabaseObject
from .abstract import Page
from ..objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target,
Contact,
ID3Timestamp,
Lyrics,
FormattedText
)
from ..connection import Connection
from ..utils.support_classes.download_result import DownloadResult
from ..utils.config import main_settings, logging_settings
from ..utils.shared import DEBUG
if DEBUG:
from ..utils.debug_utils import dump_to_file
def _parse_artist_url(url: str) -> str:
parsed = urlparse(url)
return urlunparse((parsed.scheme, parsed.netloc, "/music/", "", "", ""))
def _get_host(source: Source) -> str:
parsed = urlparse(source.url)
return urlunparse((parsed.scheme, parsed.netloc, "", "", "", ""))
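# Examples (hypothetical URLs):
#   _parse_artist_url("https://artist.bandcamp.com/album/some-album")
#   -> "https://artist.bandcamp.com/music/"
#   _get_host(Source(SourcePages.BANDCAMP, "https://artist.bandcamp.com/track/x"))
#   -> "https://artist.bandcamp.com"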
class BandcampTypes(Enum):
ARTIST = "b"
ALBUM = "a"
SONG = "t"
class Bandcamp(Page):
# CHANGE
SOURCE_TYPE = SourcePages.BANDCAMP
LOGGER = logging_settings["bandcamp_logger"]
def __init__(self, *args, **kwargs):
self.connection: Connection = Connection(
host="https://bandcamp.com/",
logger=self.LOGGER,
module="bandcamp",
)
super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
parsed_url = urlparse(source.url)
path = parsed_url.path.replace("/", "")
if path == "" or path.startswith("music"):
return Artist
if path.startswith("album"):
return Album
if path.startswith("track"):
return Song
return super().get_source_type(source)
def _parse_autocomplete_api_result(self, data: dict) -> DatabaseObject:
try:
object_type = BandcampTypes(data["type"])
except ValueError:
return
url = data["item_url_root"]
if "item_url_path" in data:
url = data["item_url_path"]
source_list = [Source(self.SOURCE_TYPE, url)]
name = data["name"]
if data.get("is_label", False):
return Label(
name=name,
source_list=source_list
)
if object_type is BandcampTypes.ARTIST:
source_list = [Source(self.SOURCE_TYPE, _parse_artist_url(url))]
return Artist(
name=name,
source_list=source_list
)
if object_type is BandcampTypes.ALBUM:
return Album(
title=name,
source_list=source_list,
artist_list=[
Artist(
name=data["band_name"].strip(),
source_list=[
Source(self.SOURCE_TYPE, data["item_url_root"])
]
)
]
)
if object_type is BandcampTypes.SONG:
return Song(
title=name.strip(),
source_list=source_list,
main_artist_list=[
Artist(
name=data["band_name"],
source_list=[
Source(self.SOURCE_TYPE, data["item_url_root"])
]
)
]
)
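# Sketch of an autocomplete result this method parses (keys as used above,
# values made up):
#
#   {
#       "type": "t",                           # -> BandcampTypes.SONG
#       "name": "Some Track",
#       "band_name": "Some Band",
#       "item_url_root": "https://someband.bandcamp.com",
#       "item_url_path": "https://someband.bandcamp.com/track/some-track"
#   }
#
# yields Song(title="Some Track") with the track url as its source and the
# artist built from band_name + item_url_root.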
def general_search(self, search_query: str, filter_string: str = "") -> List[DatabaseObject]:
results = []
r = self.connection.post("https://bandcamp.com/api/bcsearch_public_api/1/autocomplete_elastic", json={
"fan_id": None,
"full_page": True,
"search_filter": filter_string,
"search_text": search_query,
})
if r is None:
return results
if DEBUG:
dump_to_file("bandcamp_search_response.json", r.text, is_json=True, exit_after_dump=False)
data = r.json()
for element in data.get("auto", {}).get("results", []):
r = self._parse_autocomplete_api_result(element)
if r is not None:
results.append(r)
return results
def label_search(self, label: Label) -> List[Label]:
return self.general_search(label.name, filter_string="b")
def artist_search(self, artist: Artist) -> List[Artist]:
return self.general_search(artist.name, filter_string="b")
def album_search(self, album: Album) -> List[Album]:
return self.general_search(album.title, filter_string="a")
def song_search(self, song: Song) -> List[Song]:
return self.general_search(song.title, filter_string="t")
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
def _parse_artist_details(self, soup: BeautifulSoup) -> Artist:
name: str = None
source_list: List[Source] = []
contact_list: List[Contact] = []
band_name_location: BeautifulSoup = soup.find("p", {"id": "band-name-location"})
if band_name_location is not None:
title_span = band_name_location.find("span", {"class": "title"})
if title_span is not None:
name = title_span.text.strip()
link_container: BeautifulSoup = soup.find("ol", {"id": "band-links"})
if link_container is not None:
li: BeautifulSoup
for li in link_container.find_all("a"):
if li is None or li.get('href') is None:
continue
source_list.append(Source.match_url(_parse_artist_url(li['href']), referer_page=self.SOURCE_TYPE))
return Artist(
name=name,
source_list=source_list
)
def _parse_album(self, soup: BeautifulSoup, initial_source: Source) -> List[Album]:
title = None
source_list: List[Source] = []
a = soup.find("a")
if a is not None and a["href"] is not None:
source_list.append(Source(self.SOURCE_TYPE, _get_host(initial_source) + a["href"]))
title_p = soup.find("p", {"class": "title"})
if title_p is not None:
title = title_p.text.strip()
return Album(title=title, source_list=source_list)
def _parse_artist_data_blob(self, data_blob: dict, artist_url: str):
parsed_artist_url = urlparse(artist_url)
album_list: List[Album] = []
for album_json in data_blob.get("buyfulldisco", {}).get("tralbums", []):
album_list.append(Album(
title=album_json["title"].strip(),
source_list=[Source(
self.SOURCE_TYPE,
urlunparse((parsed_artist_url.scheme, parsed_artist_url.netloc, album_json["page_url"], "", "", ""))
)]
))
return album_list
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
artist = Artist()
r = self.connection.get(_parse_artist_url(source.url))
if r is None:
return artist
soup = self.get_soup_from_response(r)
if DEBUG:
dump_to_file("artist_page.html", r.text, exit_after_dump=False)
artist = self._parse_artist_details(soup=soup.find("div", {"id": "bio-container"}))
html_music_grid = soup.find("ol", {"id": "music-grid"})
if html_music_grid is not None:
for subsoup in html_music_grid.find_all("li"):
artist.main_album_collection.append(self._parse_album(soup=subsoup, initial_source=source))
for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})):
data_blob = data_blob_soup["data-blob"]
if DEBUG:
dump_to_file(f"bandcamp_artist_data_blob_{i}.json", data_blob, is_json=True, exit_after_dump=False)
if data_blob is not None:
artist.main_album_collection.extend(
self._parse_artist_data_blob(json.loads(data_blob), source.url)
)
artist.source_collection.append(source)
return artist
def _parse_track_element(self, track: dict) -> Optional[Song]:
return Song(
title=track["item"]["name"].strip(),
source_list=[Source(self.SOURCE_TYPE, track["item"]["mainEntityOfPage"])],
tracksort=int(track["position"])
)
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
album = Album()
r = self.connection.get(source.url)
if r is None:
return album
soup = self.get_soup_from_response(r)
data_container = soup.find("script", {"type": "application/ld+json"})
if DEBUG:
dump_to_file("album_data.json", data_container.text, is_json=True, exit_after_dump=False)
data = json.loads(data_container.text)
artist_data = data["byArtist"]
artist_source_list = []
if "@id" in artist_data:
artist_source_list = [Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
album = Album(
title=data["name"].strip(),
source_list=[Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]))],
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
artist_list=[Artist(
name=artist_data["name"].strip(),
source_list=artist_source_list
)]
)
for i, track_json in enumerate(data.get("track", {}).get("itemListElement", [])):
if DEBUG:
dump_to_file(f"album_track_{i}.json", json.dumps(track_json), is_json=True, exit_after_dump=False)
try:
album.song_collection.append(self._parse_track_element(track_json))
except KeyError:
continue
album.source_collection.append(source)
return album
def _fetch_lyrics(self, soup: BeautifulSoup) -> List[Lyrics]:
track_lyrics = soup.find("div", {"class": "lyricsText"})
if track_lyrics:
self.LOGGER.debug(" Lyrics retrieved..")
return [Lyrics(text=FormattedText(html=track_lyrics.prettify()))]
return []
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
r = self.connection.get(source.url)
if r is None:
return Song()
soup = self.get_soup_from_response(r)
data_container = soup.find("script", {"type": "application/ld+json"})
other_data = {}
other_data_list = soup.select("script[data-tralbum]")
if len(other_data_list) > 0:
other_data = json.loads(other_data_list[0]["data-tralbum"])
if DEBUG:
dump_to_file("bandcamp_song_data.json", data_container.text, is_json=True, exit_after_dump=False)
dump_to_file("bandcamp_song_data_other.json", json.dumps(other_data), is_json=True, exit_after_dump=False)
dump_to_file("bandcamp_song_page.html", r.text, exit_after_dump=False)
data = json.loads(data_container.text)
album_data = data["inAlbum"]
artist_data = data["byArtist"]
mp3_url = None
for key, value in other_data.get("trackinfo", [{}])[0].get("file", {"": None}).items():
mp3_url = value
song = Song(
title=data["name"].strip(),
source_list=[Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]), audio_url=mp3_url)],
album_list=[Album(
title=album_data["name"].strip(),
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
source_list=[Source(self.SOURCE_TYPE, album_data["@id"])]
)],
main_artist_list=[Artist(
name=artist_data["name"].strip(),
source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
)],
lyrics_list=self._fetch_lyrics(soup=soup)
)
song.source_collection.append(source)
return song
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
if source.audio_url is None:
return DownloadResult(error_message="Couldn't find download link.")
return self.connection.stream_into(url=source.audio_url, target=target, description=desc)


@@ -0,0 +1,857 @@
from collections import defaultdict
from typing import List, Optional, Dict, Type, Union
from bs4 import BeautifulSoup
import pycountry
from urllib.parse import urlparse, urlencode
from ..connection import Connection
from ..utils.config import logging_settings
from .abstract import Page
from ..utils.enums.source import SourcePages
from ..utils.enums.album import AlbumType
from ..utils.support_classes.query import Query
from ..objects import (
Lyrics,
Artist,
Source,
Song,
Album,
ID3Timestamp,
FormattedText,
Label,
Options,
DatabaseObject
)
from ..utils.shared import DEBUG
if DEBUG:
from ..utils.debug_utils import dump_to_file
ALBUM_TYPE_MAP: Dict[str, AlbumType] = defaultdict(lambda: AlbumType.OTHER, {
"Full-length": AlbumType.STUDIO_ALBUM,
"Single": AlbumType.SINGLE,
"EP": AlbumType.EP,
"Demo": AlbumType.DEMO,
"Video": AlbumType.OTHER,
"Live album": AlbumType.LIVE_ALBUM,
"Compilation": AlbumType.COMPILATION_ALBUM
})
URL_SITE = 'https://www.metal-archives.com/'
URL_IMAGES = 'https://www.metal-archives.com/images/'
URL_CSS = 'https://www.metal-archives.com/css/'
def _song_from_json(artist_html=None, album_html=None, release_type=None, title=None, lyrics_html=None) -> Song:
song_id = None
if lyrics_html is not None:
soup = BeautifulSoup(lyrics_html, 'html.parser')
anchor = soup.find('a')
raw_song_id = anchor.get('id')
song_id = raw_song_id.replace("lyricsLink_", "")
return Song(
title=title,
main_artist_list=[
_artist_from_json(artist_html=artist_html)
],
album_list=[
_album_from_json(album_html=album_html, release_type=release_type, artist_html=artist_html)
],
source_list=[
Source(SourcePages.ENCYCLOPAEDIA_METALLUM, song_id)
]
)
def _artist_from_json(artist_html=None, genre=None, country=None) -> Artist:
"""
TODO parse the country to a standard
"""
# parse the html
# parse the html for the band name and link on metal-archives
soup = BeautifulSoup(artist_html, 'html.parser')
anchor = soup.find('a')
artist_name = anchor.text
artist_url = anchor.get('href')
artist_id = artist_url.split("/")[-1]
anchor.decompose()
strong = soup.find('strong')
if strong is not None:
strong.decompose()
akronyms_ = soup.text[2:-2].split(', ')
return Artist(
name=artist_name,
source_list=[
Source(SourcePages.ENCYCLOPAEDIA_METALLUM, artist_url)
]
)
def _album_from_json(album_html=None, release_type=None, artist_html=None) -> Album:
# parse the html
# <a href="https://www.metal-archives.com/albums/Ghost_Bath/Self_Loather/970834">Self Loather</a>'
soup = BeautifulSoup(album_html, 'html.parser')
anchor = soup.find('a')
album_name = anchor.text.strip()
album_url = anchor.get('href')
album_id = album_url.split("/")[-1]
album_type = ALBUM_TYPE_MAP[release_type.strip()]
return Album(
title=album_name,
album_type=album_type,
source_list=[
Source(SourcePages.ENCYCLOPAEDIA_METALLUM, album_url)
],
artist_list=[
_artist_from_json(artist_html=artist_html)
]
)
def create_grid(
tableOrId: str = "#searchResultsSong",
nbrPerPage: int = 200,
ajaxUrl: str = "search/ajax-advanced/searching/songs/?songTitle=high&bandName=&releaseTitle=&lyrics=&genre=",
extraOptions: dict = None
):
"""
function createGrid(tableOrId, nbrPerPage, ajaxUrl, extraOptions) {
var table = null;
if (typeof tableOrId == "string") {
table = $(tableOrId);
} else {
table = tableOrId;
}
if (ajaxUrl == undefined) {
ajaxUrl = null;
}
var options = {
bAutoWidth: false,
bFilter: false,
bLengthChange: false,
bProcessing: true,
bServerSide: ajaxUrl != null,
iDisplayLength: nbrPerPage,
sAjaxSource: URL_SITE + ajaxUrl,
sPaginationType: 'full_numbers',
sDom: 'ipl<"block_spacer_5"><"clear"r>f<t>rip',
oLanguage: {
sProcessing: 'Loading...',
sEmptyTable: 'No records to display.',
sZeroRecords: 'No records found.'
},
"fnDrawCallback": autoScrollUp
};
if (typeof extraOptions == "object") {
for (var key in extraOptions) {
options[key] = extraOptions[key];
if (key == 'fnDrawCallback') {
var callback = options[key];
options[key] = function(o) {
autoScrollUp(o);
callback(o);
}
}
}
}
return table.dataTable(options);
}
:return:
"""
def onDrawCallback(o):
"""
this gets executed once the ajax request is done
:param o:
:return:
"""
extraOptions = extraOptions or {
"bSort": False,
"oLanguage": {
"sProcessing": 'Searching, please wait...',
"sEmptyTable": 'No matches found. Please try with different search terms.'
}
}
options = {
"bAutoWidth": False,
"bFilter": False,
"bLengthChange": False,
"bProcessing": True,
"bServerSide": ajaxUrl is not None,
"iDisplayLength": nbrPerPage,
"sAjaxSource": URL_SITE + ajaxUrl,
"sPaginationType": 'full_numbers',
"sDom": 'ipl<"block_spacer_5"><"clear"r>f<t>rip',
"oLanguage": {
"sProcessing": 'Loading...',
"sEmptyTable": 'No records to display.',
"sZeroRecords": 'No records found.'
},
"fnDrawCallback": onDrawCallback
}
for key, value in extraOptions.items():
options[key] = value
if key == 'fnDrawCallback':
callback = options[key]
options[key] = lambda o, _callback=callback: (onDrawCallback(o), _callback(o))  # run both callbacks; "and" would stop after onDrawCallback returns None
# implement jquery datatable
return options  # presumably handed to the datatable, mirroring table.dataTable(options) in the JS above
class EncyclopaediaMetallum(Page):
SOURCE_TYPE = SourcePages.ENCYCLOPAEDIA_METALLUM
LOGGER = logging_settings["metal_archives_logger"]
def __init__(self, **kwargs):
self.connection: Connection = Connection(
host="https://www.metal-archives.com/",
logger=self.LOGGER,
module=type(self).__name__
)
super().__init__(**kwargs)
def song_search(self, song: Song) -> List[Song]:
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/songs/?"
"""
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/songs/?songTitle={song}&bandName={" \
"artist}&releaseTitle={album}&lyrics=&genre=&sEcho=1&iColumns=5&sColumns=&iDisplayStart=0" \
"&iDisplayLength=200&mDataProp_0=0&mDataProp_1=1&mDataProp_2=2&mDataProp_3=3&mDataProp_4=4&_" \
"=1674550595663"
"""
"""
The difficult question I am facing is that if I try every artist, with every song, with every album,
I end up with a quadratic runtime complexity O(n^2), where every step means one web request.
This.
Is not good.
"""
search_params = {
"songTitle": song.title,
"bandName": "*",
"releaseTitle": "*",
"lyrics": "",
"genre": "",
"sEcho": 1,
"iColumns": 5,
"sColumns": "",
"iDisplayStart": 0,
"iDisplayLength": 200,
"mDataProp_0": 0,
"mDataProp_1": 1,
"mDataProp_2": 2,
"mDataProp_3": 3,
"mDataProp_4": 4,
"_": 1705946986092
}
referer_params = {
"songTitle": song.title,
"bandName": "*",
"releaseTitle": "*",
"lyrics": "",
"genre": "",
}
urlencode(search_params)
song_title = song.title.strip()
album_titles = ["*"] if song.album_collection.empty else [album.title.strip() for album in song.album_collection]
artist_titles = ["*"] if song.main_artist_collection.empty else [artist.name.strip() for artist in song.main_artist_collection]
search_results = []
for artist in artist_titles:
for album in album_titles:
_search = search_params.copy()
_referer_params = referer_params.copy()
_search["bandName"] = _referer_params["bandName"] = artist
_search["releaseTitle"] = _referer_params["releaseTitle"] = album
r = self.connection.get(endpoint + urlencode(_search), headers={
"Referer": "https://www.metal-archives.com/search/advanced/searching/songs?" + urlencode(_referer_params),
"Cache-Control": "no-cache",
"Pragma": "no-cache",
"X-Requested-With": "XMLHttpRequest",
}, name="song_search")
if r is None:
return []
search_results.extend(_song_from_json(
artist_html=raw_song[0],
album_html=raw_song[1],
release_type=raw_song[2],
title=raw_song[3],
lyrics_html=raw_song[4]
) for raw_song in r.json()['aaData'])
return search_results
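# The resulting request (shape only, parameter values made up) looks like:
#
#   GET https://www.metal-archives.com/search/ajax-advanced/searching/songs/?songTitle=high&bandName=*&releaseTitle=*&...&iDisplayLength=200
#
# with a Referer header pointing at the human-facing advanced search page,
# which the ajax endpoint appears to expect.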
def album_search(self, album: Album) -> List[Album]:
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/albums/?"
search_params = {
"bandName": "*",
"releaseTitle": album.title.strip(),
"releaseYearFrom": "",
"releaseMonthFrom": "",
"releaseYearTo": "",
"releaseMonthTo": "",
"country": "",
"location": "",
"releaseLabelName": "",
"releaseCatalogNumber": "",
"releaseIdentifiers": "",
"releaseRecordingInfo": "",
"releaseDescription": "",
"releaseNotes": "",
"genre": "",
"sEcho": 1,
"iColumns": 3,
"sColumns": "",
"iDisplayStart": 0,
"iDisplayLength": 200,
"mDataProp_0": 0,
"mDataProp_1": 1,
"mDataProp_2": 2,
"_": 1705946986092
}
referer_params = {
"bandName": "*",
"releaseTitle": album.title.strip(),
}
album_title = album.title
artist_titles = ["*"] if album.artist_collection.empty else [artist.name.strip() for artist in album.artist_collection]
search_results = []
for artist in artist_titles:
_search = search_params.copy()
_referer_params = referer_params.copy()
_search["bandName"] = _referer_params["bandName"] = artist
r = self.connection.get(endpoint + urlencode(_search), headers={
"Referer": "https://www.metal-archives.com/search/advanced/searching/albums?" + urlencode(_referer_params),
"Cache-Control": "no-cache",
"Pragma": "no-cache",
"X-Requested-With": "XMLHttpRequest",
"Accept": "application/json, text/javascript, */*; q=0.01",
})
#r = self.connection.get(endpoint.format(artist=artist, album=album_title))
if r is None:
return []
search_results.extend(_album_from_json(
artist_html=raw_album[0],
album_html=raw_album[1],
release_type=raw_album[2]
) for raw_album in r.json()['aaData'])
return search_results
def artist_search(self, artist: Artist) -> List[Artist]:
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/bands/?"
search_params = {
"bandName": artist.name.strip(),
"genre": "",
"country": "",
"yearCreationFrom": "",
"yearCreationTo": "",
"bandNotes": "",
"status": "",
"themes": "",
"location": "",
"bandLabelName": "",
"sEcho": 1,
"iColumns": 3,
"sColumns": "",
"iDisplayStart": 0,
"iDisplayLength": 200,
"mDataProp_0": 0,
"mDataProp_1": 1,
"mDataProp_2": 2,
"_": 1705946986092
}
r = self.connection.get(endpoint + urlencode(search_params), headers={
"Referer": "https://www.metal-archives.com/search/advanced/searching/bands?" + urlencode({"bandName": artist.name.strip()}),
"Cache-Control": "no-cache",
"Pragma": "no-cache",
"X-Requested-With": "XMLHttpRequest",
"Accept": "application/json, text/javascript, */*; q=0.01",
}, name="artist_search.json")
if r is None:
return []
data_key = 'aaData'
parsed_data = r.json()
if data_key not in parsed_data:
return []
return [
_artist_from_json(artist_html=raw_artist[0], genre=raw_artist[1], country=raw_artist[2])
for raw_artist in parsed_data[data_key]
]
def general_search(self, query: str) -> List[DatabaseObject]:
"""
Searches the default endpoint of Metal Archives, which in turn searches only
for bands, but it is the default, thus I am rolling with it
"""
endpoint = "https://www.metal-archives.com/search/ajax-band-search/?field=name&query={query}&sEcho=1&iColumns=3&sColumns=&iDisplayStart=0&iDisplayLength=200&mDataProp_0=0&mDataProp_1=1&mDataProp_2=2"
r = self.connection.get(endpoint.format(query=query))
if r is None:
return []
return [
_artist_from_json(artist_html=raw_artist[0], genre=raw_artist[1], country=raw_artist[2])
for raw_artist in r.json()['aaData']
]
def _fetch_artist_discography(self, ma_artist_id: str) -> List[Album]:
discography_url = "https://www.metal-archives.com/band/discography/id/{}/tab/all"
# make the request
r = self.connection.get(discography_url.format(ma_artist_id))
if r is None:
return []
soup = self.get_soup_from_response(r)
discography = []
tbody_soup = soup.find('tbody')
for tr_soup in tbody_soup.find_all('tr'):
td_list = tr_soup.findChildren(recursive=False)
album_soup = td_list[0]
album_name = album_soup.text
album_url = album_soup.find('a').get('href')
album_id = album_url.split('/')[-1]
raw_album_type = td_list[1].text
album_year = td_list[2].text
date_obj = None
try:
date_obj = ID3Timestamp(year=int(album_year))
except ValueError:
pass
discography.append(
Album(
title=album_name,
date=date_obj,
album_type=ALBUM_TYPE_MAP[raw_album_type],
source_list=[Source(self.SOURCE_TYPE, album_url)]
)
)
return discography
def _fetch_artist_sources(self, ma_artist_id: str) -> List[Source]:
sources_url = "https://www.metal-archives.com/link/ajax-list/type/band/id/{}"
r = self.connection.get(sources_url.format(ma_artist_id))
if r is None:
return []
soup = self.get_soup_from_response(r)
if DEBUG:
dump_to_file(f"ma_artist_sources_{ma_artist_id}.html", soup.prettify(), exit_after_dump=False)
if soup.find("span", {"id": "noLinks"}) is not None:
return []
source_list = []
link_table: BeautifulSoup = soup.find("table", {"id": "linksTablemain"})
if link_table is not None:
for tr in link_table.find_all("tr"):
anchor: BeautifulSoup = tr.find("a")
if anchor is None:
continue
href = anchor["href"]
if href is not None:
source_list.append(Source.match_url(href, referer_page=self.SOURCE_TYPE))
# The following code is only legacy code, which I just kept because it doesn't do any harm.
# The way ma returns sources changed.
artist_source = soup.find("div", {"id": "band_links"})
merchandice_source = soup.find("div", {"id": "band_links_Official_merchandise"})
label_source = soup.find("div", {"id": "band_links_Labels"})
if artist_source is not None:
for tr in artist_source.find_all("td"):
a = tr.find("a")
url = a.get("href")
if url is None:
continue
source_list.append(Source.match_url(url, referer_page=self.SOURCE_TYPE))
return source_list
def _parse_artist_attributes(self, artist_soup: BeautifulSoup) -> Artist:
name: str = None
country: Optional[pycountry.db.Country] = None
formed_in_year: int = None
genre: str = None
lyrical_themes: List[str] = []
label_name: str = None
label_url: str = None
source_list: List[Source] = []
title_soup: BeautifulSoup = artist_soup.find("title")
if title_soup is not None:
bad_name_substring = " - Encyclopaedia Metallum: The Metal Archives"
title_text = title_soup.get_text()
if title_text.count(bad_name_substring) == 1:
name = title_text.replace(bad_name_substring, "")
else:
self.LOGGER.debug(f"the title of the page is \"{title_text}\"")
"""
TODO
Implement the bandpictures and logos that can be gotten with the elements
<a class="image" id="photo" title="Ghost Bath"...
<a class="image" id="logo" title="Ghost Bath"...
where the titles are the band name
"""
image_container_soup: BeautifulSoup = artist_soup.find(id="band_sidebar")
if image_container_soup is not None:
logo_soup = image_container_soup.find(id="logo")
if logo_soup is not None:
logo_title = logo_soup.get("title")
if logo_title is not None:
name = logo_title.strip()
band_pictures = image_container_soup.find(id="photo")
if band_pictures is not None:
band_picture_title = band_pictures.get("title")
if band_picture_title is not None:
name = band_picture_title.strip()
for h1_band_name_soup in artist_soup.find_all("h1", {"class": "band_name"}):
anchor: BeautifulSoup = h1_band_name_soup.find("a")
if anchor is None:
continue
href = anchor.get("href")
if href is not None:
source_list.append(Source(self.SOURCE_TYPE, href))
name = anchor.get_text(strip=True)
band_stat_soup = artist_soup.find("div", {"id": "band_stats"})
for dl_soup in band_stat_soup.find_all("dl"):
for title, data in zip(dl_soup.find_all("dt"), dl_soup.find_all("dd")):
title_text = title.text
if "Country of origin:" == title_text:
href = data.find('a').get('href')
country = pycountry.countries.get(alpha_2=href.split("/")[-1])
continue
# not needed: Location: Minot, North Dakota
"""
TODO
status: active
need to do enums for that and add it to object
"""
if "Formed in:" == title_text:
if not data.text.isnumeric():
continue
formed_in_year = int(data.text)
continue
if "Genre:" == title_text:
genre = data.text
continue
if "Lyrical themes:" == title_text:
lyrical_themes = data.text.split(", ")
continue
if "Current label:" == title_text:
label_name = data.text
label_anchor = data.find("a")
label_url = None
if label_anchor is not None:
label_url = label_anchor.get("href")
label_id = None
if type(label_url) is str and "/" in label_url:
label_id = label_url.split("/")[-1]
"""
TODO
years active: 2012-present
process this and add field to class
"""
return Artist(
name=name,
country=country,
formed_in=ID3Timestamp(year=formed_in_year),
general_genre=genre,
lyrical_themes=lyrical_themes,
label_list=[
Label(
name=label_name,
source_list=[
Source(self.SOURCE_TYPE, label_url)
]
)
],
source_list=source_list
)
def _fetch_artist_attributes(self, url: str) -> Artist:
r = self.connection.get(url)
if r is None:
return Artist()
soup: BeautifulSoup = self.get_soup_from_response(r)
return self._parse_artist_attributes(artist_soup=soup)
def _fetch_band_notes(self, ma_artist_id: str) -> Optional[FormattedText]:
endpoint = "https://www.metal-archives.com/band/read-more/id/{}"
# make the request
r = self.connection.get(endpoint.format(ma_artist_id))
if r is None:
return FormattedText()
return FormattedText(html=r.text)
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
"""
What it could fetch, and what is implemented:
[x] https://www.metal-archives.com/bands/Ghost_Bath/3540372489
[x] https://www.metal-archives.com/band/discography/id/3540372489/tab/all
[] reviews: https://www.metal-archives.com/review/ajax-list-band/id/3540372489/json/1?sEcho=1&iColumns=4&sColumns=&iDisplayStart=0&iDisplayLength=200&mDataProp_0=0&mDataProp_1=1&mDataProp_2=2&mDataProp_3=3&iSortCol_0=3&sSortDir_0=desc&iSortingCols=1&bSortable_0=true&bSortable_1=true&bSortable_2=true&bSortable_3=true&_=1675155257133
[] similar: https://www.metal-archives.com/band/ajax-recommendations/id/3540372489
[x] sources: https://www.metal-archives.com/link/ajax-list/type/band/id/3540372489
[x] band notes: https://www.metal-archives.com/band/read-more/id/3540372489
"""
artist = self._fetch_artist_attributes(source.url)
artist_id = source.url.split("/")[-1]
artist_sources = self._fetch_artist_sources(artist_id)
artist.source_collection.extend(artist_sources)
band_notes = self._fetch_band_notes(artist_id)
if band_notes is not None:
artist.notes = band_notes
discography: List[Album] = self._fetch_artist_discography(artist_id)
artist.main_album_collection.extend(discography)
return artist
def _parse_album_track_row(self, track_row: BeautifulSoup) -> Song:
"""
<tr class="even">
<td width="20"><a class="anchor" name="5948442"> </a>1.</td> # id and tracksort
<td class="wrapWords">Convince Me to Bleed</td> # name
<td align="right">03:40</td> # length
<td nowrap="nowrap"> 
<a href="#5948442" id="lyricsButton5948442" onclick="toggleLyrics('5948442'); return false;">Show lyrics</a>
</td>
</tr>
"""
row_list = track_row.find_all(recursive=False)
source_list: List[Source] = []
track_sort_soup = row_list[0]
track_sort = int(track_sort_soup.text[:-1])
track_id = track_sort_soup.find("a").get("name").strip()
if track_row.find("a", {"href": f"#{track_id}"}) is not None:
source_list.append(Source(self.SOURCE_TYPE, track_id))
title = row_list[1].text.strip()
length = None
duration_stamp = row_list[2].text
if ":" in duration_stamp:
minutes, seconds = duration_stamp.split(":")
length = (int(minutes) * 60 + int(seconds)) * 1000 # in milliseconds
return Song(
title=title,
length=length,
tracksort=track_sort,
source_list=source_list
)
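# e.g. a duration cell of "03:40" becomes (3 * 60 + 40) * 1000 = 220000 ms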
def _parse_album_attributes(self, album_soup: BeautifulSoup, stop_at_level: int = 1) -> Album:
tracklist: List[Song] = []
artist_list = []
album_name: str = None
source_list: List[Source] = []
def _parse_album_info(album_info_soup: BeautifulSoup):
nonlocal artist_list
nonlocal album_name
nonlocal source_list
if album_info_soup is None:
return
album_soup_list = album_info_soup.find_all("h1", {"class": "album_name"})
if len(album_soup_list) == 1:
anchor: BeautifulSoup = album_soup_list[0].find("a")
href = anchor.get("href")
if href is not None:
source_list.append(Source(self.SOURCE_TYPE, href.strip()))
album_name = anchor.get_text(strip=True)
elif len(album_soup_list) > 1:
self.LOGGER.debug("there are more than 1 album soups")
artist_soup_list = album_info_soup.find_all("h2", {"class": "band_name"})
if len(artist_soup_list) == 1:
for anchor in artist_soup_list[0].find_all("a"):
artist_sources: List[Source] = []
href = anchor.get("href")
if href is not None:
artist_sources.append(Source(self.SOURCE_TYPE, href.strip()))
artist_name = anchor.get_text(strip=True)
artist_list.append(Artist(
name=artist_name,
source_list=artist_sources
))
elif len(artist_soup_list) > 1:
self.LOGGER.debug("there are more than 1 artist soups")
_parse_album_info(album_info_soup=album_soup.find(id="album_info"))
tracklist_soup = album_soup.find("table", {"class": "table_lyrics"}).find("tbody")
for track_soup in tracklist_soup.find_all("tr", {"class": ["even", "odd"]}):
tracklist.append(self._parse_album_track_row(track_row=track_soup))
return Album(
title=album_name,
source_list=source_list,
artist_list=artist_list,
song_list=tracklist
)
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
"""
I am preeeety sure I can get way more data than... nothing from there
:param source:
:param stop_at_level:
:return:
"""
# <table class="display table_lyrics
r = self.connection.get(source.url)
if r is None:
return Album()
soup = self.get_soup_from_response(r)
album = self._parse_album_attributes(soup, stop_at_level=stop_at_level)
return album
def _fetch_lyrics(self, song_id: str) -> Optional[Lyrics]:
"""
function toggleLyrics(songId) {
var lyricsRow = $('#song' + songId);
lyricsRow.toggle();
var lyrics = $('#lyrics_' + songId);
if (lyrics.html() == '(loading lyrics...)') {
var realId = songId;
if(!$.isNumeric(songId.substring(songId.length -1, songId.length))) {
realId = songId.substring(0, songId.length -1);
}
lyrics.load(URL_SITE + "release/ajax-view-lyrics/id/" + realId);
}
// toggle link
var linkLabel = "lyrics";
$("#lyricsButton" + songId).text(lyricsRow.css("display") == "none" ? "Show " + linkLabel : "Hide " + linkLabel);
return false;
}
"""
if song_id is None:
return None
endpoint = "https://www.metal-archives.com/release/ajax-view-lyrics/id/{id}".format(id=song_id)
r = self.connection.get(endpoint)
if r is None:
return None
return Lyrics(
text=FormattedText(html=r.text),
language=pycountry.languages.get(alpha_2="en"),
source_list=[
Source(self.SOURCE_TYPE, endpoint)
]
)
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
song_id = source.url
return Song(
lyrics_list=[
self._fetch_lyrics(song_id=song_id)
]
)
def get_source_type(self, source: Source):
if self.SOURCE_TYPE != source.page_enum:
return None
url = source.url
if url is None:
return None
parsed_url = urlparse(url)
path: List[str] = parsed_url.path.split("/")
if "band" in path:
return Artist
if "bands" in path:
return Artist
if "albums" in path:
return Album
if "labels" in path:
return Label
return None

music_kraken/pages/musify.py: new file, 1124 lines (diff suppressed because it is too large)


@@ -0,0 +1,65 @@
from typing import List, Optional, Type
from urllib.parse import urlparse
import logging
from ..objects import Source, DatabaseObject
from .abstract import Page
from ..objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target
)
from ..connection import Connection
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
class Preset(Page):
# CHANGE
SOURCE_TYPE = SourcePages.PRESET
LOGGER = logging.getLogger("preset")
def __init__(self, *args, **kwargs):
self.connection: Connection = Connection(
host="https://www.preset.cum/",
logger=self.LOGGER
)
super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
return super().get_source_type(source)
def general_search(self, search_query: str) -> List[DatabaseObject]:
return []
def label_search(self, label: Label) -> List[Label]:
return []
def artist_search(self, artist: Artist) -> List[Artist]:
return []
def album_search(self, album: Album) -> List[Album]:
return []
def song_search(self, song: Song) -> List[Song]:
return []
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song()
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
return Album()
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
return Artist()
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
return DownloadResult()


@@ -0,0 +1,353 @@
from typing import List, Optional, Type, Tuple
from urllib.parse import urlparse, urlunparse, parse_qs
from enum import Enum
import sponsorblock
from sponsorblock.errors import HTTPException, NotFoundException
from ..objects import Source, DatabaseObject, Song, Target
from .abstract import Page
from ..objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target,
FormattedText,
ID3Timestamp
)
from ..connection import Connection
from ..utils.string_processing import clean_song_title
from ..utils.support_classes.download_result import DownloadResult
from ..utils.config import youtube_settings, main_settings, logging_settings
from .youtube_music.super_youtube import SuperYouTube, YouTubeUrl, get_invidious_url, YouTubeUrlType
"""
- https://yt.artemislena.eu/api/v1/search?q=Zombiez+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance
- https://yt.artemislena.eu/api/v1/channels/playlists/UCV0Ntl3lVR7xDXKoCU6uUXA
- https://yt.artemislena.eu/api/v1/playlists/OLAK5uy_kcUBiDv5ATbl-R20OjNaZ5G28XFanQOmM
- https://yt.artemislena.eu/api/v1/videos/SULFl39UjgY
"""
def get_piped_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
return urlunparse((youtube_settings["piped_instance"].scheme, youtube_settings["piped_instance"].netloc, path, params, query, fragment))
class YouTube(SuperYouTube):
# CHANGE
SOURCE_TYPE = SourcePages.YOUTUBE
LOGGER = logging_settings["youtube_logger"]
NO_ADDITIONAL_DATA_FROM_SONG = True
def __init__(self, *args, **kwargs):
self.connection: Connection = Connection(
host=get_invidious_url(),
logger=self.LOGGER
)
self.piped_connection: Connection = Connection(
host=get_piped_url(),
logger=self.LOGGER
)
self.download_connection: Connection = Connection(
host="https://www.youtube.com/",
logger=self.LOGGER,
sleep_after_404=youtube_settings["sleep_after_youtube_403"]
)
# the connection is set up like this to ensure sponsorblock uses the same proxies my program does
_sponsorblock_connection: Connection = Connection(host="https://sponsor.ajay.app/")
self.sponsorblock_client = sponsorblock.Client(session=_sponsorblock_connection.session)
super().__init__(*args, **kwargs)
def general_search(self, search_query: str) -> List[DatabaseObject]:
return self.artist_search(Artist(name=search_query, dynamic=True))
def _json_to_artist(self, artist_json: dict) -> Artist:
return Artist(
name=artist_json["author"].replace(" - Topic", ""),
source_list=[
Source(self.SOURCE_TYPE, get_invidious_url(path=artist_json["authorUrl"]))
]
)
def artist_search(self, artist: Artist) -> List[Artist]:
# https://yt.artemislena.eu/api/v1/search?q=Zombiez+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance
endpoint = get_invidious_url(path="/api/v1/search", query=f"q={artist.name.replace(' ', '+')}+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance")
artist_list = []
r = self.connection.get(endpoint)
if r is None:
return []
for search_result in r.json():
if search_result["type"] != "channel":
continue
author: str = search_result["author"]
if not author.endswith(" - Topic"):
continue
artist_list.append(self._json_to_artist(search_result))
return artist_list
def _fetch_song_from_id(self, youtube_id: str) -> Tuple[Song, Optional[int]]:
# https://yt.artemislena.eu/api/v1/videos/SULFl39UjgY
r = self.connection.get(get_invidious_url(path=f"/api/v1/videos/{youtube_id}"))
if r is None:
return Song(), None
data = r.json()
if data["genre"] != "Music":
self.LOGGER.warning(f"Genre has to be music, trying anyways")
title = data["title"]
license_str = None
artist_list: List[Artist] = []
_author: str = data["author"]
if _author.endswith(" - Topic"):
artist_list.append(Artist(
name=_author.replace(" - Topic", ""),
source_list=[Source(
self.SOURCE_TYPE, get_invidious_url(path=f"/channel/{data['authorId']}")
)]
))
else:
# If the song is not from a topic channel, clean the title; topic uploads are clean anyway.
# If cleaned data is returned by the API, it will be overridden in the next step anyway.
title = clean_song_title(title, _author)
for music_track in data.get("musicTracks", []):
title = music_track["song"]
license_str = music_track["license"]
for artist_name in music_track["artist"].split(" x "):
artist_list.append(Artist(name=artist_name))
# if all attempts to get a clean artist name (mainly stripping " - Topic" or reading the api data) fail, just add an artist with the name of the uploader channel
if len(artist_list) == 0:
artist_list.append(Artist(name=_author))
return Song(
title=title,
source_list=[Source(
self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={data['videoId']}")
)],
notes=FormattedText(html=data["descriptionHtml"] + f"\n<p>{license_str}</p>"),
main_artist_list=artist_list
), int(data["published"])
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
parsed = YouTubeUrl(source.url)
if parsed.url_type != YouTubeUrlType.VIDEO:
return Song()
song, _ = self._fetch_song_from_id(parsed.id)
return song
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
self.LOGGER.info(f"Getting the metadata of an album may take slightly longer, only panic in a couple minutes <333")
parsed = YouTubeUrl(source.url)
if parsed.url_type != YouTubeUrlType.PLAYLIST:
return Album()
title = None
source_list = [source]
notes = None
song_list = []
# https://yt.artemislena.eu/api/v1/playlists/OLAK5uy_kcUBiDv5ATbl-R20OjNaZ5G28XFanQOmM
r = self.connection.get(get_invidious_url(path=f"/api/v1/playlists/{parsed.id}"))
if r is None:
return Album()
data = r.json()
if data["type"] != "playlist":
return Album()
title = data["title"]
notes = FormattedText(html=data["descriptionHtml"])
timestamps: List[int] = []
"""
TODO
fetch the song and don't get it from there
"""
for video in data["videos"]:
other_song = Song(
source_list=[
Source(
self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={video['videoId']}")
)
],
tracksort=video["index"]+1
)
song, utc_timestamp = self._fetch_song_from_id(video["videoId"])
song.merge(other_song)
if utc_timestamp is not None:
timestamps.append(utc_timestamp)
song_list.append(song)
return Album(
title=title,
source_list=source_list,
notes=notes,
song_list=song_list,
date=ID3Timestamp.fromtimestamp(round(sum(timestamps) / len(timestamps))) if len(timestamps) > 0 else None  # avoid ZeroDivisionError when no timestamps were found
)
def fetch_invidious_album_list(self, yt_id: str):
artist_name = None
album_list = []
# playlist
# https://yt.artemislena.eu/api/v1/channels/playlists/UCV0Ntl3lVR7xDXKoCU6uUXA
r = self.connection.get(get_invidious_url(f"/api/v1/channels/playlists/{yt_id}"))
if r is None:
return [], None  # match the (album_list, artist_name) tuple returned below
for playlist_json in r.json()["playlists"]:
if playlist_json["type"] != "playlist":
continue
artist_name = playlist_json["author"].replace(" - Topic", "")
# /playlist?list=OLAK5uy_nbvQeskr8nbIuzeLxoceNLuCL_KjAmzVw
album_list.append(Album(
title=playlist_json["title"],
source_list=[Source(
self.SOURCE_TYPE, get_invidious_url(path="/playlist", query=f"list={playlist_json['playlistId']}")
)],
artist_list=[Artist(
name=artist_name,
source_list=[
Source(self.SOURCE_TYPE, get_invidious_url(path=playlist_json["authorUrl"]))
]
)]
))
return album_list, artist_name
def fetch_piped_album_list(self, yt_id: str):
endpoint = get_piped_url(path=f"/channels/tabs", query='data={"originalUrl":"https://www.youtube.com/' + yt_id + '/playlists","url":"https://www.youtube.com/' + yt_id + '/playlists","id":"' + yt_id + '","contentFilters":["playlists"],"sortFilter":"","baseUrl":"https://www.youtube.com"}')
r = self.piped_connection.get(endpoint)
if r is None:
return [], None
content = r.json()["content"]
artist_name = None
album_list = []
for playlist in content:
if playlist["type"] != "playlist":
continue
artist_name = playlist["uploaderName"].replace(" - Topic", "")
album_list.append(Album(
title=playlist["name"],
source_list=[Source(
self.SOURCE_TYPE, get_invidious_url() + playlist["url"]
)],
artist_list=[Artist(
name=artist_name,
source_list=[
Source(self.SOURCE_TYPE, get_invidious_url(path=playlist["uploaderUrl"]))
]
)]
))
return album_list, artist_name
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
parsed = YouTubeUrl(source.url)
if parsed.url_type != YouTubeUrlType.CHANNEL:
return Artist(source_list=[source])
album_list, artist_name = self.fetch_piped_album_list(parsed.id)
if len(album_list) <= 0:
self.LOGGER.warning(f"didn't found any playlists with piped, falling back to invidious. (it is unusual)")
album_list, artist_name = self.fetch_invidious_album_list(parsed.id)
return Artist(name=artist_name, main_album_list=album_list, source_list=[source])
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
"""
1. getting the optimal source
Only audio sources are allowed,
with a bitrate no smaller than the selected bitrate, but not one that is way larger
2. download it
:param source:
:param target:
:param desc:
:return:
"""
r = self.connection.get(YouTubeUrl(source.url).api)
if r is None:
            return DownloadResult(error_message="The API didn't respond; maybe try another Invidious instance.")
audio_format = None
best_bitrate = 0
for possible_format in r.json()["adaptiveFormats"]:
format_type: str = possible_format["type"]
if not format_type.startswith("audio"):
continue
bitrate = int(possible_format.get("bitrate", 0))
if bitrate >= main_settings["bitrate"]:
best_bitrate = bitrate
audio_format = possible_format
break
if bitrate > best_bitrate:
best_bitrate = bitrate
audio_format = possible_format
if audio_format is None:
return DownloadResult(error_message="Couldn't find the download link.")
endpoint = audio_format["url"]
        return self.download_connection.stream_into(endpoint, target, name=desc, raw_url=True)
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
if not youtube_settings["use_sponsor_block"]:
return []
parsed = YouTubeUrl(source.url)
if parsed.url_type != YouTubeUrlType.VIDEO:
self.LOGGER.warning(f"{source.url} is no video url.")
return []
segments = []
try:
segments = self.sponsorblock_client.get_skip_segments(parsed.id)
except NotFoundException:
self.LOGGER.debug(f"No sponsor found for the video {parsed.id}.")
except HTTPException as e:
self.LOGGER.warning(f"{e}")
return [(segment.start, segment.end) for segment in segments]

View File

@@ -0,0 +1 @@
from .youtube_music import YoutubeMusic

View File

@@ -0,0 +1,112 @@
from typing import List, Optional, Dict, Type
from enum import Enum
from ...utils.config import logging_settings
from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target
)
from ._music_object_render import parse_run_list, parse_run_element
LOGGER = logging_settings["youtube_music_logger"]
def music_card_shelf_renderer(renderer: dict) -> List[DatabaseObject]:
results = parse_run_list(renderer.get("title", {}).get("runs", []))
for sub_renderer in renderer.get("contents", []):
results.extend(parse_renderer(sub_renderer))
return results
def music_responsive_list_item_flex_column_renderer(renderer: dict) -> List[DatabaseObject]:
return parse_run_list(renderer.get("text", {}).get("runs", []))
def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
results = []
for i, column in enumerate(renderer.get("flexColumns", [])):
_r = parse_renderer(column)
if i == 0 and len(_r) == 0:
renderer["text"] = \
column.get("musicResponsiveListItemFlexColumnRenderer", {}).get("text", {}).get("runs", [{}])[0].get(
"text")
results.extend(_r)
_r = parse_run_element(renderer)
if _r is not None:
results.append(_r)
song_list: List[Song] = []
album_list: List[Album] = []
artist_list: List[Artist] = []
_map: Dict[Type[DatabaseObject], List[DatabaseObject]] = {Song: song_list, Album: album_list, Artist: artist_list}
for result in results:
_map[type(result)].append(result)
for song in song_list:
song.album_collection.extend(album_list)
song.main_artist_collection.extend(artist_list)
for album in album_list:
album.artist_collection.extend(artist_list)
if len(song_list) > 0:
return song_list
if len(album_list) > 0:
return album_list
if len(artist_list) > 0:
return artist_list
return results
def music_shelf_renderer(renderer: dict) -> List[DatabaseObject]:
result = []
for subrenderer in renderer.get("contents"):
result.extend(parse_renderer(subrenderer))
return result
def music_carousel_shelf_renderer(renderer: dict):
return music_shelf_renderer(renderer=renderer)
def music_two_row_item_renderer(renderer: dict):
return parse_run_list(renderer.get("title", {}).get("runs", []))
RENDERER_PARSERS = {
"musicCardShelfRenderer": music_card_shelf_renderer,
"musicResponsiveListItemRenderer": music_responsive_list_item_renderer,
"musicResponsiveListItemFlexColumnRenderer": music_responsive_list_item_flex_column_renderer,
"musicShelfRenderer": music_card_shelf_renderer,
"musicCarouselShelfRenderer": music_carousel_shelf_renderer,
"musicTwoRowItemRenderer": music_two_row_item_renderer,
"itemSectionRenderer": lambda _: [],
}
def parse_renderer(renderer: dict) -> List[DatabaseObject]:
result: List[DatabaseObject] = []
    for renderer_name, sub_renderer in renderer.items():
        if renderer_name not in RENDERER_PARSERS:
            LOGGER.warning(f"Can't parse the renderer {renderer_name}.")
            continue
        result.extend(RENDERER_PARSERS[renderer_name](sub_renderer))
return result

View File

@@ -0,0 +1,85 @@
from typing import List, Optional
from enum import Enum
from ...utils.config import youtube_settings, logging_settings
from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target
)
LOGGER = logging_settings["youtube_music_logger"]
SOURCE_PAGE = SourcePages.YOUTUBE_MUSIC
class PageType(Enum):
ARTIST = "MUSIC_PAGE_TYPE_ARTIST"
ALBUM = "MUSIC_PAGE_TYPE_ALBUM"
CHANNEL = "MUSIC_PAGE_TYPE_USER_CHANNEL"
PLAYLIST = "MUSIC_PAGE_TYPE_PLAYLIST"
SONG = "MUSIC_VIDEO_TYPE_ATV"
VIDEO = "MUSIC_VIDEO_TYPE_UGC"
OFFICIAL_MUSIC_VIDEO = "MUSIC_VIDEO_TYPE_OMV"
# returns this type if you search for the band Queen
# S = "MUSIC_VIDEO_TYPE_OFFICIAL_SOURCE_MUSIC"
def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
if "navigationEndpoint" not in run_element:
return
_temp_nav = run_element.get("navigationEndpoint", {})
is_video = "watchEndpoint" in _temp_nav
navigation_endpoint = _temp_nav.get("watchEndpoint" if is_video else "browseEndpoint", {})
element_type = PageType.SONG
page_type_string = navigation_endpoint.get("watchEndpointMusicSupportedConfigs", {}).get("watchEndpointMusicConfig", {}).get("musicVideoType", "")
if not is_video:
page_type_string = navigation_endpoint.get("browseEndpointContextSupportedConfigs", {}).get("browseEndpointContextMusicConfig", {}).get("pageType", "")
try:
element_type = PageType(page_type_string)
except ValueError:
return
element_id = navigation_endpoint.get("videoId" if is_video else "browseId")
element_text = run_element.get("text")
if element_id is None or element_text is None:
LOGGER.warning("Couldn't find either the id or text of a Youtube music element.")
return
if element_type == PageType.SONG or (element_type == PageType.VIDEO and not youtube_settings["youtube_music_clean_data"]) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}")
return Song(title=element_text, source_list=[source])
if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}")
return Artist(name=element_text, source_list=[source])
if element_type == PageType.ALBUM or (element_type == PageType.PLAYLIST and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/playlist?list={element_id}")
return Album(title=element_text, source_list=[source])
LOGGER.debug(f"Type {page_type_string} wasn't implemented.")
def parse_run_list(run_list: List[dict]) -> List[DatabaseObject]:
music_object_list: List[DatabaseObject] = []
for run_renderer in run_list:
music_object = parse_run_element(run_renderer)
if music_object is None:
continue
music_object_list.append(music_object)
return music_object_list

View File

@@ -0,0 +1,222 @@
from typing import List, Optional, Type, Tuple
from urllib.parse import urlparse, urlunparse, parse_qs
from enum import Enum
import requests
import sponsorblock
from sponsorblock.errors import HTTPException, NotFoundException
from ...objects import Source, DatabaseObject, Song, Target
from ..abstract import Page
from ...objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target,
FormattedText,
ID3Timestamp
)
from ...connection import Connection
from ...utils.support_classes.download_result import DownloadResult
from ...utils.config import youtube_settings, logging_settings, main_settings
def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
return urlunparse((youtube_settings["invidious_instance"].scheme, youtube_settings["invidious_instance"].netloc, path, params, query, fragment))
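# for illustration (assuming the instance from the examples below is configured):
# get_invidious_url(path="/watch", query="v=SULFl39UjgY")
# -> "https://yt.artemislena.eu/watch?v=SULFl39UjgY"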
class YouTubeUrlType(Enum):
CHANNEL = "channel"
PLAYLIST = "playlist"
VIDEO = "watch"
NONE = ""
class YouTubeUrl:
"""
Artist
https://yt.artemislena.eu/channel/UCV0Ntl3lVR7xDXKoCU6uUXA
https://www.youtube.com/channel/UCV0Ntl3lVR7xDXKoCU6uUXA
Release
https://yt.artemislena.eu/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw
https://www.youtube.com/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw
Track
https://yt.artemislena.eu/watch?v=SULFl39UjgY&list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw&index=1
https://www.youtube.com/watch?v=SULFl39UjgY
"""
    def __init__(self, url: str) -> None:
        """
        Malformed urls don't raise; the url_type falls back to YouTubeUrlType.NONE instead.
        """
        self.SOURCE_TYPE = SourcePages.YOUTUBE
        self.id = ""
parsed = urlparse(url=url)
if parsed.netloc == "music.youtube.com":
self.SOURCE_TYPE = SourcePages.YOUTUBE_MUSIC
self.url_type: YouTubeUrlType
type_frag_list = parsed.path.split("/")
if len(type_frag_list) < 2:
self.url_type = YouTubeUrlType.NONE
else:
try:
self.url_type = YouTubeUrlType(type_frag_list[1].strip())
except ValueError:
self.url_type = YouTubeUrlType.NONE
if self.url_type == YouTubeUrlType.CHANNEL:
if len(type_frag_list) < 3:
self.couldnt_find_id(url)
else:
self.id = type_frag_list[2]
elif self.url_type == YouTubeUrlType.PLAYLIST:
query_stuff = parse_qs(parsed.query)
if "list" not in query_stuff:
self.couldnt_find_id(url)
else:
self.id = query_stuff["list"][0]
elif self.url_type == YouTubeUrlType.VIDEO:
query_stuff = parse_qs(parsed.query)
if "v" not in query_stuff:
self.couldnt_find_id(url)
else:
self.id = query_stuff["v"][0]
def couldnt_find_id(self, url: str):
logging_settings["youtube_logger"].warning(f"The id is missing: {url}")
self.url_type = YouTubeUrlType.NONE
@property
def api(self) -> str:
if self.url_type == YouTubeUrlType.CHANNEL:
return get_invidious_url(path=f"/api/v1/channels/playlists/{self.id}")
if self.url_type == YouTubeUrlType.PLAYLIST:
return get_invidious_url(path=f"/api/v1/playlists/{id}")
if self.url_type == YouTubeUrlType.VIDEO:
return get_invidious_url(path=f"/api/v1/videos/{self.id}")
return get_invidious_url()
    @property
    def normal(self) -> str:
        if self.url_type == YouTubeUrlType.CHANNEL:
            return get_invidious_url(path=f"/channel/{self.id}")
        if self.url_type == YouTubeUrlType.PLAYLIST:
            return get_invidious_url(path="/playlist", query=f"list={self.id}")
        if self.url_type == YouTubeUrlType.VIDEO:
            return get_invidious_url(path="/watch", query=f"v={self.id}")
        return get_invidious_url()
class SuperYouTube(Page):
# CHANGE
SOURCE_TYPE = SourcePages.YOUTUBE
LOGGER = logging_settings["youtube_logger"]
NO_ADDITIONAL_DATA_FROM_SONG = True
def __init__(self, *args, **kwargs):
self.download_connection: Connection = Connection(
host="https://www.youtube.com/",
logger=self.LOGGER,
sleep_after_404=youtube_settings["sleep_after_youtube_403"]
)
self.connection: Connection = Connection(
host=get_invidious_url(),
logger=self.LOGGER
)
        # the connection is constructed here so that sponsorblock uses the same proxies the rest of the program does
_sponsorblock_connection: Connection = Connection(host="https://sponsor.ajay.app/")
self.sponsorblock_client = sponsorblock.Client(session=_sponsorblock_connection.session)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
_url_type = {
YouTubeUrlType.CHANNEL: Artist,
YouTubeUrlType.PLAYLIST: Album,
YouTubeUrlType.VIDEO: Song,
}
parsed = YouTubeUrl(source.url)
if parsed.url_type in _url_type:
return _url_type[parsed.url_type]
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
"""
1. getting the optimal source
Only audio sources allowed
not a bitrate that is smaller than the selected bitrate, but not one that is wayyy huger
2. download it
:param source:
:param target:
:param desc:
:return:
"""
r: requests.Response = self.connection.get(YouTubeUrl(source.url).api)
if r is None:
            return DownloadResult(error_message="The API didn't respond; maybe try another Invidious instance.")
audio_format = None
best_bitrate = 0
for possible_format in r.json()["adaptiveFormats"]:
format_type: str = possible_format["type"]
if not format_type.startswith("audio"):
continue
bitrate = int(possible_format.get("bitrate", 0))
if bitrate >= main_settings["bitrate"]:
best_bitrate = bitrate
audio_format = possible_format
break
if bitrate > best_bitrate:
best_bitrate = bitrate
audio_format = possible_format
if audio_format is None:
return DownloadResult(error_message="Couldn't find the download link.")
endpoint = audio_format["url"]
return self.download_connection.stream_into(endpoint, target, name=desc, raw_url=True)
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
if not youtube_settings["use_sponsor_block"]:
return []
parsed = YouTubeUrl(source.url)
if parsed.url_type != YouTubeUrlType.VIDEO:
self.LOGGER.warning(f"{source.url} is no video url.")
return []
segments = []
try:
segments = self.sponsorblock_client.get_skip_segments(parsed.id)
except NotFoundException:
self.LOGGER.debug(f"No sponsor found for the video {parsed.id}.")
except HTTPException as e:
self.LOGGER.warning(f"{e}")
return [(segment.start, segment.end) for segment in segments]

View File

@@ -0,0 +1,542 @@
from __future__ import unicode_literals, annotations
from typing import Dict, List, Optional, Set, Type
from urllib.parse import urlparse, urlunparse, quote, parse_qs, urlencode
import logging
import random
import json
from dataclasses import dataclass
import re
from functools import lru_cache
import youtube_dl
from youtube_dl.extractor.youtube import YoutubeIE
from ...utils.exception.config import SettingValueError
from ...utils.config import main_settings, youtube_settings, logging_settings
from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING
from ...utils.functions import get_current_millis
if DEBUG:
from ...utils.debug_utils import dump_to_file
from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target
)
from ...connection import Connection
from ...utils.support_classes.download_result import DownloadResult
from ._list_render import parse_renderer
from .super_youtube import SuperYouTube
def get_youtube_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
return urlunparse(("https", "music.youtube.com", path, params, query, fragment))
class YoutubeMusicConnection(Connection):
"""
===heartbeat=timings=for=YOUTUBEMUSIC===
96.27
98.16
100.04
101.93
103.82
--> average delay in between: 1.8875 min
"""
def __init__(self, logger: logging.Logger, accept_language: str):
# https://stackoverflow.com/questions/30561260/python-change-accept-language-using-requests
super().__init__(
host="https://music.youtube.com/",
logger=logger,
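            # 113.25 s is the measured 1.8875 min average delay (see docstring) in seconds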
heartbeat_interval=113.25,
header_values={
"Accept-Language": accept_language
},
module="youtube_music",
)
# cookie consent for youtube
# https://stackoverflow.com/a/66940841/16804841 doesn't work
for cookie_key, cookie_value in youtube_settings["youtube_music_consent_cookies"].items():
self.session.cookies.set(
name=cookie_key,
value=cookie_value,
path='/', domain='.youtube.com'
)
def heartbeat(self):
r = self.get("https://music.youtube.com/verify_session")
if r is None:
self.heartbeat_failed()
return
string = r.text
data = json.loads(string[string.index("{"):])
success: bool = data["success"]
if not success:
self.heartbeat_failed()
@dataclass
class YouTubeMusicCredentials:
api_key: str
# ctoken is probably short for continue-token
# It is probably not strictly necessary, but hey :))
ctoken: str
# the context in requests
context: dict
player_url: str
@property
def player_id(self):
@lru_cache(128)
def _extract_player_info(player_url):
_PLAYER_INFO_RE = (
r'/s/player/(?P<id>[a-zA-Z0-9_-]{8,})/player',
r'/(?P<id>[a-zA-Z0-9_-]{8,})/player(?:_ias\.vflset(?:/[a-zA-Z]{2,3}_[a-zA-Z]{2,3})?|-plasma-ias-(?:phone|tablet)-[a-z]{2}_[A-Z]{2}\.vflset)/base\.js$',
r'\b(?P<id>vfl[a-zA-Z0-9_-]+)\b.*?\.js$',
)
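            # e.g. a player_url like ".../s/player/dc35d2c9/player_ias.vflset/en_US/base.js"
            # would yield the id "dc35d2c9" (hypothetical value, for illustration)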
for player_re in _PLAYER_INFO_RE:
id_m = re.search(player_re, player_url)
if id_m:
break
else:
return
return id_m.group('id')
return _extract_player_info(self.player_url)
class YTDLLogger:
def __init__(self, logger: logging.Logger):
self.logger = logger
def debug(self, msg):
self.logger.debug(msg)
def warning(self, msg):
self.logger.warning(msg)
def error(self, msg):
self.logger.error(msg)
class MusicKrakenYoutubeDL(youtube_dl.YoutubeDL):
def __init__(self, main_instance: YoutubeMusic, ydl_opts: dict, **kwargs):
self.main_instance = main_instance
ydl_opts = ydl_opts or {}
ydl_opts.update({
"logger": YTDLLogger(self.main_instance.LOGGER),
})
super().__init__(ydl_opts, **kwargs)
super().__enter__()
def __del__(self):
super().__exit__(None, None, None)
class MusicKrakenYoutubeIE(YoutubeIE):
def __init__(self, *args, main_instance: YoutubeMusic, **kwargs):
self.main_instance = main_instance
super().__init__(*args, **kwargs)
class YoutubeMusic(SuperYouTube):
# CHANGE
SOURCE_TYPE = SourcePages.YOUTUBE_MUSIC
LOGGER = logging_settings["youtube_music_logger"]
def __init__(self, *args, ydl_opts: dict = None, **kwargs):
self.yt_music_connection: YoutubeMusicConnection = YoutubeMusicConnection(
logger=self.LOGGER,
accept_language="en-US,en;q=0.5"
)
self.credentials: YouTubeMusicCredentials = YouTubeMusicCredentials(
api_key=youtube_settings["youtube_music_api_key"],
ctoken="",
context=youtube_settings["youtube_music_innertube_context"],
player_url=youtube_settings["player_url"],
)
self.start_millis = get_current_millis()
if self.credentials.api_key == "" or DEBUG_YOUTUBE_INITIALIZING:
self._fetch_from_main_page()
SuperYouTube.__init__(self, *args, **kwargs)
self.download_connection: Connection = Connection(
host="https://rr2---sn-cxaf0x-nugl.googlevideo.com/",
logger=self.LOGGER,
sleep_after_404=youtube_settings["sleep_after_youtube_403"],
header_values={
"Referer": "https://music.youtube.com/",
'Origin': 'https://music.youtube.com',
}
)
# https://github.com/ytdl-org/youtube-dl/blob/master/README.md#embedding-youtube-dl
self.ydl = MusicKrakenYoutubeDL(self, ydl_opts)
self.yt_ie = MusicKrakenYoutubeIE(downloader=self.ydl, main_instance=self)
def _fetch_from_main_page(self):
"""
===API=KEY===
AIzaSyC9XL3ZjWddXya6X74dJoCTL-WEYFDNX30
can be found at `view-source:https://music.youtube.com/`
search for: "innertubeApiKey"
"""
r = self.yt_music_connection.get("https://music.youtube.com/")
if r is None:
return
if urlparse(r.url).netloc == "consent.youtube.com":
self.LOGGER.info(f"Making cookie consent request for {type(self).__name__}.")
r = self.yt_music_connection.post("https://consent.youtube.com/save", data={
'gl': 'DE',
'm': '0',
'app': '0',
'pc': 'ytm',
'continue': 'https://music.youtube.com/?cbrd=1',
'x': '6',
'bl': 'boq_identityfrontenduiserver_20230905.04_p0',
'hl': 'en',
'src': '1',
'cm': '2',
'set_ytc': 'true',
'set_apyt': 'true',
'set_eom': 'false'
})
if r is None:
return
# load cookie dict from settings
cookie_dict = youtube_settings["youtube_music_consent_cookies"]
for cookie in r.cookies:
cookie_dict[cookie.name] = cookie.value
for cookie in self.yt_music_connection.session.cookies:
cookie_dict[cookie.name] = cookie.value
# save cookies in settings
youtube_settings["youtube_music_consent_cookies"] = cookie_dict
else:
self.yt_music_connection.save(r, "index.html")
r = self.yt_music_connection.get("https://music.youtube.com/", name="index.html")
if r is None:
return
content = r.text
if DEBUG:
dump_to_file(f"youtube_music_index.html", r.text, exit_after_dump=False)
# api key
        api_key_patterns = (
            r"(?<=\"innertubeApiKey\":\")(.*?)(?=\")",
            r"(?<=\"INNERTUBE_API_KEY\":\")(.*?)(?=\")",
        )
        api_keys = []
        for api_key_pattern in api_key_patterns:
            api_keys.extend(re.findall(api_key_pattern, content))
        found_a_good_api_key = False
        for api_key in api_keys:
            # try each candidate until one passes the settings validation
            try:
                youtube_settings["youtube_music_api_key"] = api_key
            except SettingValueError:
                continue
            found_a_good_api_key = True
            break
if found_a_good_api_key:
self.LOGGER.info(f"Found a valid API-KEY for {type(self).__name__}: \"{api_key}\"")
else:
self.LOGGER.error(f"Couldn't find an API-KEY for {type(self).__name__}. :((")
# context
context_pattern = r"(?<=\"INNERTUBE_CONTEXT\":{)(.*?)(?=},\"INNERTUBE_CONTEXT_CLIENT_NAME\":)"
found_context = False
for context_string in re.findall(context_pattern, content, re.M):
try:
youtube_settings["youtube_music_innertube_context"] = json.loads("{" + context_string + "}")
found_context = True
except json.decoder.JSONDecodeError:
continue
self.credentials.context = youtube_settings["youtube_music_innertube_context"]
break
if not found_context:
self.LOGGER.warning(f"Couldn't find a context for {type(self).__name__}.")
# player url
"""
Thanks to youtube-dl <33
"""
player_pattern = [
r'(?<="jsUrl":")(.*?)(?=")',
r'(?<="PLAYER_JS_URL":")(.*?)(?=")'
]
found_player_url = False
for pattern in player_pattern:
for player_string in re.findall(pattern, content, re.M):
try:
youtube_settings["player_url"] = "https://music.youtube.com" + player_string
found_player_url = True
                except SettingValueError:
continue
self.credentials.player_url = youtube_settings["player_url"]
break
if found_player_url:
break
if not found_player_url:
self.LOGGER.warning(f"Couldn't find an url for the video player.")
# ytcfg
youtube_settings["ytcfg"] = json.loads(self._search_regex(
r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;',
content,
default='{}'
)) or {}
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
return super().get_source_type(source)
def general_search(self, search_query: str) -> List[DatabaseObject]:
search_query = search_query.strip()
urlescaped_query: str = quote(search_query.strip().replace(" ", "+"))
        # approximate the amount of time it would have taken to type the search, because google for some reason tracks that
        LAST_EDITED_TIME = get_current_millis() - random.randint(0, 20)
        # assume 50-100 ms per typed character
        _estimated_time = sum(random.randint(50, 100) for _ in search_query)
        FIRST_EDITED_TIME = LAST_EDITED_TIME - _estimated_time if LAST_EDITED_TIME - self.start_millis > _estimated_time else random.randint(
            50, 100)
query_continue = "" if self.credentials.ctoken == "" else f"&ctoken={self.credentials.ctoken}&continuation={self.credentials.ctoken}"
# construct the request
r = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/search",
query=f"key={self.credentials.api_key}&prettyPrint=false" + query_continue),
json={
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
"query": search_query,
"suggestStats": {
"clientName": "youtube-music",
"firstEditTimeMsec": FIRST_EDITED_TIME,
"inputMethod": "KEYBOARD",
"lastEditTimeMsec": LAST_EDITED_TIME,
"originalQuery": search_query,
"parameterValidationStatus": "VALID_PARAMETERS",
"searchMethod": "ENTER_KEY",
"validationStatus": "VALID",
"zeroPrefixEnabled": True,
"availableSuggestions": []
}
},
headers={
"Referer": get_youtube_url(path=f"/search", query=f"q={urlescaped_query}")
}
)
if r is None:
return []
renderer_list = r.json().get("contents", {}).get("tabbedSearchResultsRenderer", {}).get("tabs", [{}])[0].get(
"tabRenderer").get("content", {}).get("sectionListRenderer", {}).get("contents", [])
if DEBUG:
for i, content in enumerate(renderer_list):
dump_to_file(f"{i}-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
results = []
"""
cant use fixed indices, because if something has no entries, the list disappears
instead I have to try parse everything, and just reject community playlists and profiles.
"""
for renderer in renderer_list:
results.extend(parse_renderer(renderer))
return results
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
artist = Artist()
# construct the request
url = urlparse(source.url)
browse_id = url.path.replace("/channel/", "")
r = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/browse", query=f"key={self.credentials.api_key}&prettyPrint=false"),
json={
"browseId": browse_id,
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}}
}
)
if r is None:
return artist
if DEBUG:
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
if DEBUG:
for i, content in enumerate(renderer_list):
dump_to_file(f"{i}-artists-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
results = []
"""
cant use fixed indices, because if something has no entries, the list dissappears
instead I have to try parse everything, and just reject community playlists and profiles.
"""
for renderer in renderer_list:
results.extend(parse_renderer(renderer))
artist.add_list_of_other_objects(results)
return artist
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
album = Album()
parsed_url = urlparse(source.url)
        list_id_list = parse_qs(parsed_url.query).get('list', [])
if len(list_id_list) <= 0:
return album
browse_id = list_id_list[0]
r = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/browse", query=f"key={self.credentials.api_key}&prettyPrint=false"),
json={
"browseId": browse_id,
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}}
}
)
if r is None:
return album
if DEBUG:
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
if DEBUG:
for i, content in enumerate(renderer_list):
dump_to_file(f"{i}-album-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
results = []
"""
cant use fixed indices, because if something has no entries, the list dissappears
instead I have to try parse everything, and just reject community playlists and profiles.
"""
for renderer in renderer_list:
results.extend(parse_renderer(renderer))
album.add_list_of_other_objects(results)
return album
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
song = Song()
return song
def fetch_media_url(self, source: Source) -> dict:
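        # scoring heuristic: a format whose url advertises an audio mime type wins;
        # among equal scores, the later entry in the list is kept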
def _get_best_format(format_list: List[Dict]) -> dict:
def _calc_score(_f: dict):
s = 0
_url = _f.get("url", "")
if "mime=audio" in _url:
s += 100
return s
highest_score = 0
best_format = {}
for _format in format_list:
_s = _calc_score(_format)
if _s >= highest_score:
highest_score = _s
best_format = _format
return best_format
ydl_res = self.ydl.extract_info(url=source.url, download=False)
_best_format = _get_best_format(ydl_res.get("formats", [{}]))
        self.LOGGER.debug(_best_format)
return {
"url": _best_format.get("url"),
"chunk_size": _best_format.get("downloader_options", {}).get("http_chunk_size", main_settings["chunk_size"]),
"headers": _best_format.get("http_headers", {}),
}
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
media = self.fetch_media_url(source)
result = self.download_connection.stream_into(
media["url"],
target,
name=desc,
raw_url=True,
raw_headers=True,
disable_cache=True,
headers=media.get("headers", {}),
# chunk_size=media.get("chunk_size", main_settings["chunk_size"]),
method="GET",
)
if result.is_fatal_error:
result.merge(super().download_song_to_target(source=source, target=target, desc=desc))
return result
def __del__(self):
self.ydl.__exit__()