Merge branch 'youtube_music' into experimental

This commit is contained in:
2023-09-12 11:06:26 +02:00
55 changed files with 11775 additions and 688 deletions

View File

@@ -1,5 +1,6 @@
from .encyclopaedia_metallum import EncyclopaediaMetallum
from .musify import Musify
from .youtube import YouTube
from .youtube_music import YoutubeMusic
from .abstract import Page, INDEPENDENT_DB_OBJECTS

View File

@@ -22,10 +22,10 @@ from ..objects import (
from ..utils.enums.source import SourcePages
from ..utils.enums.album import AlbumType
from ..audio import write_metadata_to_target, correct_codec
from ..utils import shared
from ..utils.shared import DOWNLOAD_PATH, DOWNLOAD_FILE, AUDIO_FORMAT
from ..utils.config import main_settings
from ..utils.support_classes import Query, DownloadResult
INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song]
INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]]
@@ -44,7 +44,7 @@ class NamingDict(dict):
self.object_mappings: Dict[str, DatabaseObject] = object_mappings or dict()
super().__init__(values)
self["audio_format"] = AUDIO_FORMAT
self["audio_format"] = main_settings["audio_format"]
def add_object(self, music_object: DatabaseObject):
self.object_mappings[type(music_object).__name__.lower()] = music_object
@@ -351,7 +351,7 @@ class Page:
if self.NO_ADDITIONAL_DATA_FROM_SONG:
skip_next_details = True
if not download_all and music_object.album_type in shared.ALBUM_TYPE_BLACKLIST:
if not download_all and music_object.album_type.value in main_settings["album_type_blacklist"]:
return DownloadResult()
if not isinstance(music_object, Song) or not self.NO_ADDITIONAL_DATA_FROM_SONG:
@@ -380,12 +380,12 @@ class Page:
if song.genre is None:
song.genre = naming_dict["genre"]
path_parts = Formatter().parse(DOWNLOAD_PATH)
file_parts = Formatter().parse(DOWNLOAD_FILE)
path_parts = Formatter().parse(main_settings["download_path"])
file_parts = Formatter().parse(main_settings["download_file"])
new_target = Target(
relative_to_music_dir=True,
path=DOWNLOAD_PATH.format(**{part[1]: naming_dict[part[1]] for part in path_parts}),
file=DOWNLOAD_FILE.format(**{part[1]: naming_dict[part[1]] for part in file_parts})
path=main_settings["download_path"].format(**{part[1]: naming_dict[part[1]] for part in path_parts}),
file=main_settings["download_file"].format(**{part[1]: naming_dict[part[1]] for part in file_parts})
)
@@ -397,7 +397,7 @@ class Page:
return DownloadResult(error_message=f"No source found for {song.title} as {self.__class__.__name__}.")
temp_target: Target = Target(
path=shared.TEMP_DIR,
path=main_settings["temp_directory"],
file=str(random.randint(0, 999999))
)

View File

@@ -5,7 +5,7 @@ import pycountry
from urllib.parse import urlparse
from ..connection import Connection
from ..utils.shared import ENCYCLOPAEDIA_METALLUM_LOGGER
from ..utils.config import logging_settings
from .abstract import Page
from ..utils.enums.source import SourcePages
from ..utils.enums.album import AlbumType
@@ -108,12 +108,12 @@ def _album_from_json(album_html=None, release_type=None, artist_html=None) -> Al
class EncyclopaediaMetallum(Page):
SOURCE_TYPE = SourcePages.ENCYCLOPAEDIA_METALLUM
LOGGER = ENCYCLOPAEDIA_METALLUM_LOGGER
LOGGER = logging_settings["metal_archives_logger"]
def __init__(self, **kwargs):
self.connection: Connection = Connection(
host="https://www.metal-archives.com/",
logger=ENCYCLOPAEDIA_METALLUM_LOGGER
logger=self.LOGGER
)
super().__init__(**kwargs)

View File

@@ -23,7 +23,7 @@ from ..objects import (
DatabaseObject,
Lyrics
)
from ..utils.shared import MUSIFY_LOGGER
from ..utils.config import logging_settings
from ..utils import string_processing, shared
from ..utils.support_classes import DownloadResult, Query
@@ -95,7 +95,7 @@ def parse_url(url: str) -> MusifyUrl:
try:
type_enum = MusifyTypes(path[1])
except ValueError as e:
MUSIFY_LOGGER.warning(f"{path[1]} is not yet implemented, add it to MusifyTypes")
logging_settings["musify_logger"].warning(f"{path[1]} is not yet implemented, add it to MusifyTypes")
raise e
return MusifyUrl(
@@ -110,7 +110,7 @@ def parse_url(url: str) -> MusifyUrl:
class Musify(Page):
# CHANGE
SOURCE_TYPE = SourcePages.MUSIFY
LOGGER = MUSIFY_LOGGER
LOGGER = logging_settings["musify_logger"]
HOST = "https://musify.club"

View File

@@ -21,7 +21,9 @@ from ..objects import (
from ..connection import Connection
from ..utils.string_processing import clean_song_title
from ..utils.support_classes import DownloadResult
from ..utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE, BITRATE, ENABLE_SPONSOR_BLOCK, PIPED_INSTANCE, SLEEP_AFTER_YOUTUBE_403
from ..utils.config import youtube_settings, main_settings, logging_settings
from .youtube_music.super_youtube import SuperYouTube, YouTubeUrl, get_invidious_url, YouTubeUrlType
"""
@@ -32,107 +34,14 @@ from ..utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE, BITRATE, ENABLE_S
"""
def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
return urlunparse((INVIDIOUS_INSTANCE.scheme, INVIDIOUS_INSTANCE.netloc, path, params, query, fragment))
def get_piped_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
return urlunparse((PIPED_INSTANCE.scheme, PIPED_INSTANCE.netloc, path, params, query, fragment))
return urlunparse((youtube_settings["piped_instance"].scheme, youtube_settings["piped_instance"].netloc, path, params, query, fragment))
class YouTubeUrlType(Enum):
CHANNEL = "channel"
PLAYLIST = "playlist"
VIDEO = "watch"
NONE = ""
class YouTubeUrl:
"""
Artist
https://yt.artemislena.eu/channel/UCV0Ntl3lVR7xDXKoCU6uUXA
https://www.youtube.com/channel/UCV0Ntl3lVR7xDXKoCU6uUXA
Release
https://yt.artemislena.eu/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw
https://www.youtube.com/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw
Track
https://yt.artemislena.eu/watch?v=SULFl39UjgY&list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw&index=1
https://www.youtube.com/watch?v=SULFl39UjgY
"""
def __init__(self, url: str) -> None:
"""
Raises Index exception for wrong url, and value error for not found enum type
"""
self.id = ""
parsed = urlparse(url=url)
self.url_type: YouTubeUrlType
type_frag_list = parsed.path.split("/")
if len(type_frag_list) < 2:
self.url_type = YouTubeUrlType.NONE
else:
try:
self.url_type = YouTubeUrlType(type_frag_list[1].strip())
except ValueError:
self.url_type = YouTubeUrlType.NONE
if self.url_type == YouTubeUrlType.CHANNEL:
if len(type_frag_list) < 3:
self.couldnt_find_id(url)
else:
self.id = type_frag_list[2]
elif self.url_type == YouTubeUrlType.PLAYLIST:
query_stuff = parse_qs(parsed.query)
if "list" not in query_stuff:
self.couldnt_find_id(url)
else:
self.id = query_stuff["list"][0]
elif self.url_type == YouTubeUrlType.VIDEO:
query_stuff = parse_qs(parsed.query)
if "v" not in query_stuff:
self.couldnt_find_id(url)
else:
self.id = query_stuff["v"][0]
def couldnt_find_id(self, url: str):
YOUTUBE_LOGGER.warning(f"The id is missing: {url}")
self.url_type = YouTubeUrlType.NONE
@property
def api(self) -> str:
if self.url_type == YouTubeUrlType.CHANNEL:
return get_invidious_url(path=f"/api/v1/channels/playlists/{self.id}")
if self.url_type == YouTubeUrlType.PLAYLIST:
return get_invidious_url(path=f"/api/v1/playlists/{id}")
if self.url_type == YouTubeUrlType.VIDEO:
return get_invidious_url(path=f"/api/v1/videos/{self.id}")
return get_invidious_url()
@property
def normal(self) -> str:
if self.url_type.CHANNEL:
return get_invidious_url(path=f"/channel/{self.id}")
if self.url_type.PLAYLIST:
return get_invidious_url(path="/playlist", query=f"list={self.id}")
if self.url_type.VIDEO:
return get_invidious_url(path="/watch", query=f"v={self.id}")
class YouTube(Page):
class YouTube(SuperYouTube):
# CHANGE
SOURCE_TYPE = SourcePages.YOUTUBE
LOGGER = YOUTUBE_LOGGER
LOGGER = logging_settings["youtube_logger"]
NO_ADDITIONAL_DATA_FROM_SONG = True
@@ -150,7 +59,7 @@ class YouTube(Page):
self.download_connection: Connection = Connection(
host="https://www.youtube.com/",
logger=self.LOGGER,
sleep_after_404=SLEEP_AFTER_YOUTUBE_403
sleep_after_404=youtube_settings["sleep_after_youtube_403"]
)
# the stuff with the connection is, to ensure sponsorblock uses the proxies, my programm does
@@ -159,17 +68,6 @@ class YouTube(Page):
super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
_url_type = {
YouTubeUrlType.CHANNEL: Artist,
YouTubeUrlType.PLAYLIST: Album,
YouTubeUrlType.VIDEO: Song,
}
parsed = YouTubeUrl(source.url)
if parsed.url_type in _url_type:
return _url_type[parsed.url_type]
def general_search(self, search_query: str) -> List[DatabaseObject]:
return self.artist_search(Artist(name=search_query, dynamic=True))
@@ -418,7 +316,7 @@ class YouTube(Page):
bitrate = int(possible_format.get("bitrate", 0))
if bitrate >= BITRATE:
if bitrate >= main_settings["bitrate"]:
best_bitrate = bitrate
audio_format = possible_format
break
@@ -436,7 +334,7 @@ class YouTube(Page):
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
if not ENABLE_SPONSOR_BLOCK:
if not youtube_settings["use_sponsor_block"]:
return []
parsed = YouTubeUrl(source.url)

View File

@@ -0,0 +1 @@
from .youtube_music import YoutubeMusic

View File

@@ -0,0 +1,105 @@
from typing import List, Optional, Dict, Type
from enum import Enum
from ...utils.config import logging_settings
from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target
)
from ._music_object_render import parse_run_list, parse_run_element
LOGGER = logging_settings["youtube_music_logger"]
def music_card_shelf_renderer(renderer: dict) -> List[DatabaseObject]:
results = parse_run_list(renderer.get("title", {}).get("runs", []))
for sub_renderer in renderer.get("contents", []):
results.extend(parse_renderer(sub_renderer))
return results
def music_responsive_list_item_flex_column_renderer(renderer: dict) -> List[DatabaseObject]:
return parse_run_list(renderer.get("text", {}).get("runs", []))
def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
results = []
for i, collumn in enumerate(renderer.get("flexColumns", [])):
_r = parse_renderer(collumn)
if i == 0 and len(_r) == 0:
renderer["text"] = collumn.get("musicResponsiveListItemFlexColumnRenderer", {}).get("text", {}).get("runs", [{}])[0].get("text")
results.extend(_r)
_r = parse_run_element(renderer)
if _r is not None:
results.append(_r)
song_list: List[Song] = []
album_list: List[Album] = []
artist_list: List[Artist] = []
_map: Dict[Type[DatabaseObject], List[DatabaseObject]] = {Song: song_list, Album: album_list, Artist: artist_list}
for result in results:
_map[type(result)].append(result)
for song in song_list:
song.album_collection.extend(album_list)
song.main_artist_collection.extend(artist_list)
for album in album_list:
album.artist_collection.extend(artist_list)
if len(song_list) > 0:
return song_list
if len(album_list) > 0:
return album_list
if len(artist_list) > 0:
return artist_list
return results
def music_shelf_renderer(renderer: dict) -> List[DatabaseObject]:
result = []
for subrenderer in renderer.get("contents"):
result.extend(parse_renderer(subrenderer))
return result
def music_carousel_shelf_renderer(renderer: dict):
return music_shelf_renderer(renderer=renderer)
def music_two_row_item_renderer(renderer: dict):
return parse_run_list(renderer.get("title", {}).get("runs", []))
RENDERER_PARSERS = {
"musicCardShelfRenderer": music_card_shelf_renderer,
"musicResponsiveListItemRenderer": music_responsive_list_item_renderer,
"musicResponsiveListItemFlexColumnRenderer": music_responsive_list_item_flex_column_renderer,
"musicShelfRenderer": music_card_shelf_renderer,
"musicCarouselShelfRenderer": music_carousel_shelf_renderer,
"musicTwoRowItemRenderer": music_two_row_item_renderer,
"itemSectionRenderer": lambda _: [],
}
def parse_renderer(renderer: dict) -> List[DatabaseObject]:
result: List[DatabaseObject] = []
for renderer_name, renderer in renderer.items():
if renderer_name not in RENDERER_PARSERS:
LOGGER.warning(f"Can't parse the renderer {renderer_name}.")
continue
result.extend(RENDERER_PARSERS[renderer_name](renderer))
return result

View File

@@ -0,0 +1,80 @@
from typing import List, Optional
from enum import Enum
from ...utils.config import youtube_settings, logging_settings
from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target
)
LOGGER = logging_settings["youtube_music_logger"]
SOURCE_PAGE = SourcePages.YOUTUBE_MUSIC
class PageType(Enum):
ARTIST = "MUSIC_PAGE_TYPE_ARTIST"
ALBUM = "MUSIC_PAGE_TYPE_ALBUM"
CHANNEL = "MUSIC_PAGE_TYPE_USER_CHANNEL"
PLAYLIST = "MUSIC_PAGE_TYPE_PLAYLIST"
SONG = "MUSIC_VIDEO_TYPE_ATV"
VIDEO = "MUSIC_VIDEO_TYPE_UGC"
OFFICIAL_MUSIC_VIDEO = "MUSIC_VIDEO_TYPE_OMV"
def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
if "navigationEndpoint" not in run_element:
return
_temp_nav = run_element.get("navigationEndpoint", {})
is_video = "watchEndpoint" in _temp_nav
navigation_endpoint = _temp_nav.get("watchEndpoint" if is_video else "browseEndpoint", {})
element_type = PageType.SONG
page_type_string = navigation_endpoint.get("watchEndpointMusicSupportedConfigs", {}).get("watchEndpointMusicConfig", {}).get("musicVideoType", "")
if not is_video:
page_type_string = navigation_endpoint.get("browseEndpointContextSupportedConfigs", {}).get("browseEndpointContextMusicConfig", {}).get("pageType", "")
element_type = PageType(page_type_string)
element_id = navigation_endpoint.get("videoId" if is_video else "browseId")
element_text = run_element.get("text")
if element_id is None or element_text is None:
LOGGER.warning("Couldn't find either the id or text of a Youtube music element.")
return
if element_type == PageType.SONG or (element_type == PageType.VIDEO and not youtube_settings["youtube_music_clean_data"]) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}")
return Song(title=element_text, source_list=[source])
if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}")
return Artist(name=element_text, source_list=[source])
if element_type == PageType.ALBUM or (element_type == PageType.PLAYLIST and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/playlist?list={element_id}")
return Album(title=element_text, source_list=[source])
LOGGER.debug(f"Type {page_type_string} wasn't implemented.")
def parse_run_list(run_list: List[dict]) -> List[DatabaseObject]:
music_object_list: List[DatabaseObject] = []
for run_renderer in run_list:
music_object = parse_run_element(run_renderer)
if music_object is None:
continue
music_object_list.append(music_object)
return music_object_list

View File

@@ -0,0 +1,217 @@
from typing import List, Optional, Type, Tuple
from urllib.parse import urlparse, urlunparse, parse_qs
from enum import Enum
import sponsorblock
from sponsorblock.errors import HTTPException, NotFoundException
from ...objects import Source, DatabaseObject, Song, Target
from ..abstract import Page
from ...objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target,
FormattedText,
ID3Timestamp
)
from ...connection import Connection
from ...utils.support_classes import DownloadResult
from ...utils.config import youtube_settings, logging_settings, main_settings
def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
return urlunparse((youtube_settings["invidious_instance"].scheme, youtube_settings["invidious_instance"].netloc, path, params, query, fragment))
class YouTubeUrlType(Enum):
CHANNEL = "channel"
PLAYLIST = "playlist"
VIDEO = "watch"
NONE = ""
class YouTubeUrl:
"""
Artist
https://yt.artemislena.eu/channel/UCV0Ntl3lVR7xDXKoCU6uUXA
https://www.youtube.com/channel/UCV0Ntl3lVR7xDXKoCU6uUXA
Release
https://yt.artemislena.eu/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw
https://www.youtube.com/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw
Track
https://yt.artemislena.eu/watch?v=SULFl39UjgY&list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw&index=1
https://www.youtube.com/watch?v=SULFl39UjgY
"""
def __init__(self, url: str) -> None:
self.SOURCE_TYPE = SourcePages.YOUTUBE
"""
Raises Index exception for wrong url, and value error for not found enum type
"""
self.id = ""
parsed = urlparse(url=url)
if parsed.netloc == "music.youtube.com":
self.SOURCE_TYPE = SourcePages.YOUTUBE_MUSIC
self.url_type: YouTubeUrlType
type_frag_list = parsed.path.split("/")
if len(type_frag_list) < 2:
self.url_type = YouTubeUrlType.NONE
else:
try:
self.url_type = YouTubeUrlType(type_frag_list[1].strip())
except ValueError:
self.url_type = YouTubeUrlType.NONE
if self.url_type == YouTubeUrlType.CHANNEL:
if len(type_frag_list) < 3:
self.couldnt_find_id(url)
else:
self.id = type_frag_list[2]
elif self.url_type == YouTubeUrlType.PLAYLIST:
query_stuff = parse_qs(parsed.query)
if "list" not in query_stuff:
self.couldnt_find_id(url)
else:
self.id = query_stuff["list"][0]
elif self.url_type == YouTubeUrlType.VIDEO:
query_stuff = parse_qs(parsed.query)
if "v" not in query_stuff:
self.couldnt_find_id(url)
else:
self.id = query_stuff["v"][0]
def couldnt_find_id(self, url: str):
logging_settings["youtube_logger"].warning(f"The id is missing: {url}")
self.url_type = YouTubeUrlType.NONE
@property
def api(self) -> str:
if self.url_type == YouTubeUrlType.CHANNEL:
return get_invidious_url(path=f"/api/v1/channels/playlists/{self.id}")
if self.url_type == YouTubeUrlType.PLAYLIST:
return get_invidious_url(path=f"/api/v1/playlists/{id}")
if self.url_type == YouTubeUrlType.VIDEO:
return get_invidious_url(path=f"/api/v1/videos/{self.id}")
return get_invidious_url()
@property
def normal(self) -> str:
if self.url_type.CHANNEL:
return get_invidious_url(path=f"/channel/{self.id}")
if self.url_type.PLAYLIST:
return get_invidious_url(path="/playlist", query=f"list={self.id}")
if self.url_type.VIDEO:
return get_invidious_url(path="/watch", query=f"v={self.id}")
class SuperYouTube(Page):
# CHANGE
SOURCE_TYPE = SourcePages.YOUTUBE
LOGGER = logging_settings["youtube_logger"]
NO_ADDITIONAL_DATA_FROM_SONG = True
def __init__(self, *args, **kwargs):
self.download_connection: Connection = Connection(
host="https://www.youtube.com/",
logger=self.LOGGER,
sleep_after_404=youtube_settings["sleep_after_youtube_403"]
)
# the stuff with the connection is, to ensure sponsorblock uses the proxies, my programm does
_sponsorblock_connection: Connection = Connection(host="https://sponsor.ajay.app/")
self.sponsorblock_client = sponsorblock.Client(session=_sponsorblock_connection.session)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
_url_type = {
YouTubeUrlType.CHANNEL: Artist,
YouTubeUrlType.PLAYLIST: Album,
YouTubeUrlType.VIDEO: Song,
}
parsed = YouTubeUrl(source.url)
if parsed.url_type in _url_type:
return _url_type[parsed.url_type]
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
"""
1. getting the optimal source
Only audio sources allowed
not a bitrate that is smaller than the selected bitrate, but not one that is wayyy huger
2. download it
:param source:
:param target:
:param desc:
:return:
"""
r = self.connection.get(YouTubeUrl(source.url).api)
if r is None:
return DownloadResult(error_message="Api didn't even respond, maybe try another invidious Instance")
audio_format = None
best_bitrate = 0
for possible_format in r.json()["adaptiveFormats"]:
format_type: str = possible_format["type"]
if not format_type.startswith("audio"):
continue
bitrate = int(possible_format.get("bitrate", 0))
if bitrate >= main_settings["bitrate"]:
best_bitrate = bitrate
audio_format = possible_format
break
if bitrate > best_bitrate:
best_bitrate = bitrate
audio_format = possible_format
if audio_format is None:
return DownloadResult(error_message="Couldn't find the download link.")
endpoint = audio_format["url"]
return self.download_connection.stream_into(endpoint, target, description=desc, raw_url=True)
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
if not youtube_settings["use_sponsor_block"]:
return []
parsed = YouTubeUrl(source.url)
if parsed.url_type != YouTubeUrlType.VIDEO:
self.LOGGER.warning(f"{source.url} is no video url.")
return []
segments = []
try:
segments = self.sponsorblock_client.get_skip_segments(parsed.id)
except NotFoundException:
self.LOGGER.debug(f"No sponsor found for the video {parsed.id}.")
except HTTPException as e:
self.LOGGER.warning(f"{e}")
return [(segment.start, segment.end) for segment in segments]

View File

@@ -0,0 +1,353 @@
from typing import Dict, List, Optional, Set, Type
from urllib.parse import urlparse, urlunparse, quote, parse_qs
import logging
import random
import json
from dataclasses import dataclass
import re
from ...utils.exception.config import SettingValueError
from ...utils.config import main_settings, youtube_settings, logging_settings
from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING
from ...utils.functions import get_current_millis
if DEBUG:
from ...utils.debug_utils import dump_to_file
from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target
)
from ...connection import Connection
from ...utils.support_classes import DownloadResult
from ._list_render import parse_renderer
from .super_youtube import SuperYouTube
def get_youtube_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
return urlunparse(("https", "music.youtube.com", path, params, query, fragment))
class YoutubeMusicConnection(Connection):
"""
===heartbeat=timings=for=YOUTUBEMUSIC===
96.27
98.16
100.04
101.93
103.82
--> average delay in between: 1.8875 min
"""
def __init__(self, logger: logging.Logger, accept_language: str):
# https://stackoverflow.com/questions/30561260/python-change-accept-language-using-requests
super().__init__(
host="https://music.youtube.com/",
logger=logger,
heartbeat_interval=113.25,
header_values={
"Accept-Language": accept_language
}
)
# cookie consent for youtube
# https://stackoverflow.com/a/66940841/16804841 doesn't work
for cookie_key, cookie_value in youtube_settings["youtube_music_consent_cookies"].items():
self.session.cookies.set(
name=cookie_key,
value=cookie_value,
path='/', domain='.youtube.com'
)
def heartbeat(self):
r = self.get("https://music.youtube.com/verify_session", is_heartbeat=True)
if r is None:
self.heartbeat_failed()
string = r.content.decode("utf-8")
data = json.loads(string[string.index("{"):])
success: bool = data["success"]
if not success:
self.heartbeat_failed()
@dataclass
class YouTubeMusicCredentials:
api_key: str
# ctoken is probably short for continue-token
# It is probably not strictly necessary, but hey :))
ctoken: str
# the context in requests
context: dict
class YoutubeMusic(SuperYouTube):
# CHANGE
SOURCE_TYPE = SourcePages.YOUTUBE_MUSIC
LOGGER = logging_settings["youtube_music_logger"]
def __init__(self, *args, **kwargs):
self.connection: YoutubeMusicConnection = YoutubeMusicConnection(logger=self.LOGGER, accept_language="en-US,en;q=0.5")
self.credentials: YouTubeMusicCredentials = YouTubeMusicCredentials(
api_key=youtube_settings["youtube_music_api_key"],
ctoken="",
context=youtube_settings["youtube_music_innertube_context"]
)
self.start_millis = get_current_millis()
if self.credentials.api_key == "" or DEBUG_YOUTUBE_INITIALIZING:
self._fetch_from_main_page()
super().__init__(*args, **kwargs)
def _fetch_from_main_page(self):
"""
===API=KEY===
AIzaSyC9XL3ZjWddXya6X74dJoCTL-WEYFDNX30
can be found at `view-source:https://music.youtube.com/`
search for: "innertubeApiKey"
"""
r = self.connection.get("https://music.youtube.com/")
if r is None:
return
if urlparse(r.url).netloc == "consent.youtube.com":
self.LOGGER.info(f"Making cookie consent request for {type(self).__name__}.")
r = self.connection.post("https://consent.youtube.com/save", data={
'gl': 'DE',
'm': '0',
'app': '0',
'pc': 'ytm',
'continue': 'https://music.youtube.com/?cbrd=1',
'x': '6',
'bl': 'boq_identityfrontenduiserver_20230905.04_p0',
'hl': 'en',
'src': '1',
'cm': '2',
'set_ytc': 'true',
'set_apyt': 'true',
'set_eom': 'false'
})
if r is None:
return
# load cookie dict from settings
cookie_dict = youtube_settings["youtube_music_consent_cookies"]
for cookie in r.cookies:
cookie_dict[cookie.name] = cookie.value
for cookie in self.connection.session.cookies:
cookie_dict[cookie.name] = cookie.value
# save cookies in settings
youtube_settings["youtube_music_consent_cookies"] = cookie_dict
r = self.connection.get("https://music.youtube.com/")
if r is None:
return
content = r.text
if DEBUG:
dump_to_file(f"youtube_music_index.html", r.text, exit_after_dump=False)
# api key
api_key_pattern = (
r"(?<=\"innertubeApiKey\":\")(.*?)(?=\")",
r"(?<=\"INNERTUBE_API_KEY\":\")(.*?)(?=\")",
)
api_keys = []
for api_key_patter in api_key_pattern:
api_keys.extend(re.findall(api_key_patter, content))
found_a_good_api_key = False
for api_key in api_keys:
# save the first api key
api_key = api_keys[0]
try:
youtube_settings["youtube_music_api_key"] = api_key
except SettingValueError:
continue
found_a_good_api_key = True
break
if found_a_good_api_key:
self.LOGGER.info(f"Found a valid API-KEY for {type(self).__name__}: \"{api_key}\"")
else:
self.LOGGER.error(f"Couldn't find an API-KEY for {type(self).__name__}. :((")
# context
context_pattern = r"(?<=\"INNERTUBE_CONTEXT\":{)(.*?)(?=},\"INNERTUBE_CONTEXT_CLIENT_NAME\":)"
found_context = False
for context_string in re.findall(context_pattern, content, re.M):
try:
youtube_settings["youtube_music_innertube_context"] = json.loads("{" + context_string + "}")
found_context = True
except json.decoder.JSONDecodeError:
continue
self.credentials.context = youtube_settings["youtube_music_innertube_context"]
break
if not found_context:
self.LOGGER.warning(f"Couldn't find a context for {type(self).__name__}.")
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
return super().get_source_type(source)
def general_search(self, search_query: str) -> List[DatabaseObject]:
search_query = search_query.strip()
urlescaped_query: str = quote(search_query.strip().replace(" ", "+"))
# approximate the ammount of time it would take to type the search, because google for some reason tracks that
LAST_EDITED_TIME = get_current_millis() - random.randint(0, 20)
_estimated_time = sum(len(search_query) * random.randint(50, 100) for _ in search_query.strip())
FIRST_EDITED_TIME = LAST_EDITED_TIME - _estimated_time if LAST_EDITED_TIME - self.start_millis > _estimated_time else random.randint(50, 100)
query_continue = "" if self.credentials.ctoken == "" else f"&ctoken={self.credentials.ctoken}&continuation={self.credentials.ctoken}"
# construct the request
r = self.connection.post(
url=get_youtube_url(path="/youtubei/v1/search", query=f"key={self.credentials.api_key}&prettyPrint=false"+query_continue),
json={
"context": {**self.credentials.context, "adSignalsInfo":{"params":[]}},
"query": search_query,
"suggestStats": {
"clientName": "youtube-music",
"firstEditTimeMsec": FIRST_EDITED_TIME,
"inputMethod": "KEYBOARD",
"lastEditTimeMsec": LAST_EDITED_TIME,
"originalQuery": search_query,
"parameterValidationStatus": "VALID_PARAMETERS",
"searchMethod": "ENTER_KEY",
"validationStatus": "VALID",
"zeroPrefixEnabled": True,
"availableSuggestions": []
}
},
headers={
"Referer": get_youtube_url(path=f"/search", query=f"q={urlescaped_query}")
}
)
renderer_list = r.json().get("contents", {}).get("tabbedSearchResultsRenderer", {}).get("tabs", [{}])[0].get("tabRenderer").get("content", {}).get("sectionListRenderer", {}).get("contents", [])
if DEBUG:
for i, content in enumerate(renderer_list):
dump_to_file(f"{i}-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
results = []
"""
cant use fixed indices, because if something has no entries, the list dissappears
instead I have to try parse everything, and just reject community playlists and profiles.
"""
for renderer in renderer_list:
results.extend(parse_renderer(renderer))
return results
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
artist = Artist()
# construct the request
url = urlparse(source.url)
browse_id = url.path.replace("/channel/", "")
r = self.connection.post(
url=get_youtube_url(path="/youtubei/v1/browse", query=f"key={self.credentials.api_key}&prettyPrint=false"),
json={
"browseId": browse_id,
"context": {**self.credentials.context, "adSignalsInfo":{"params":[]}}
}
)
if r is None:
return artist
if DEBUG:
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
if DEBUG:
for i, content in enumerate(renderer_list):
dump_to_file(f"{i}-artists-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
results = []
"""
cant use fixed indices, because if something has no entries, the list dissappears
instead I have to try parse everything, and just reject community playlists and profiles.
"""
for renderer in renderer_list:
results.extend(parse_renderer(renderer))
artist.add_list_of_other_objects(results)
return artist
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
album = Album()
parsed_url = urlparse(source.url)
list_id_list = parse_qs(parsed_url.query)['list']
if len(list_id_list) <= 0:
return album
browse_id = list_id_list[0]
r = self.connection.post(
url=get_youtube_url(path="/youtubei/v1/browse", query=f"key={self.credentials.api_key}&prettyPrint=false"),
json={
"browseId": browse_id,
"context": {**self.credentials.context, "adSignalsInfo":{"params":[]}}
}
)
if r is None:
return album
if DEBUG:
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
if DEBUG:
for i, content in enumerate(renderer_list):
dump_to_file(f"{i}-album-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
results = []
"""
cant use fixed indices, because if something has no entries, the list dissappears
instead I have to try parse everything, and just reject community playlists and profiles.
"""
for renderer in renderer_list:
results.extend(parse_renderer(renderer))
album.add_list_of_other_objects(results)
return album
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
print(source)
return Song()