finished top results

This commit is contained in:
Hellow 2023-07-28 02:04:29 +02:00
parent c682342303
commit a52c441322
3 changed files with 69 additions and 12 deletions

View File

@ -29,7 +29,7 @@ if __name__ == "__main__":
] ]
youtube_music_test = [ youtube_music_test = [
"s: billie eilish" "s: psychonaut 4"
] ]
music_kraken.cli.download(genre="test", command_list=youtube_music_test, process_metadata_anyway=True) music_kraken.cli.download(genre="test", command_list=youtube_music_test, process_metadata_anyway=True)

View File

@ -1,4 +1,4 @@
from typing import List, Optional from typing import List, Optional, Dict, Type
from enum import Enum from enum import Enum
from ...utils.shared import YOUTUBE_MUSIC_LOGGER as LOGGER from ...utils.shared import YOUTUBE_MUSIC_LOGGER as LOGGER
@ -13,15 +13,62 @@ from ...objects import (
Label, Label,
Target Target
) )
from ._music_object_render import parse_run_list from ._music_object_render import parse_run_list, parse_run_element
def music_card_shelf_renderer(renderer: dict) -> List[DatabaseObject]: def music_card_shelf_renderer(renderer: dict) -> List[DatabaseObject]:
return parse_run_list(renderer.get("title", {}).get("runs", [])) results = parse_run_list(renderer.get("title", {}).get("runs", []))
for sub_renderer in renderer.get("contents", []):
results.extend(parse_renderer(sub_renderer))
return results
def music_responsive_list_item_flex_column_renderer(renderer: dict) -> List[DatabaseObject]:
return parse_run_list(renderer.get("text", {}).get("runs", []))
def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
results = []
for i, collumn in enumerate(renderer.get("flexColumns", [])):
_r = parse_renderer(collumn)
if i == 0 and len(_r) == 0:
renderer["text"] = collumn.get("musicResponsiveListItemFlexColumnRenderer", {}).get("text", {}).get("runs", [{}])[0].get("text")
results.extend(_r)
_r = parse_run_element(renderer)
if _r is not None:
results.append(_r)
song_list: List[Song] = []
album_list: List[Album] = []
artist_list: List[Artist] = []
_map: Dict[Type[DatabaseObject], List[DatabaseObject]] = {Song: song_list, Album: album_list, Artist: artist_list}
for result in results:
_map[type(result)].append(result)
for song in song_list:
song.album_collection.extend(album_list)
song.main_artist_collection.extend(artist_list)
for album in album_list:
album.artist_collection.extend(artist_list)
if len(song_list) > 0:
return song_list
if len(album_list) > 0:
return album_list
if len(artist_list) > 0:
return artist_list
return results
RENDERER_PARSERS = { RENDERER_PARSERS = {
"musicCardShelfRenderer": music_card_shelf_renderer "musicCardShelfRenderer": music_card_shelf_renderer,
"musicResponsiveListItemRenderer": music_responsive_list_item_renderer,
"musicResponsiveListItemFlexColumnRenderer": music_responsive_list_item_flex_column_renderer
} }
def parse_renderer(renderer: dict) -> List[DatabaseObject]: def parse_renderer(renderer: dict) -> List[DatabaseObject]:

View File

@ -23,21 +23,34 @@ class PageType(Enum):
ALBUM = "MUSIC_PAGE_TYPE_ALBUM" ALBUM = "MUSIC_PAGE_TYPE_ALBUM"
CHANNEL = "MUSIC_PAGE_TYPE_USER_CHANNEL" CHANNEL = "MUSIC_PAGE_TYPE_USER_CHANNEL"
PLAYLIST = "MUSIC_PAGE_TYPE_PLAYLIST" PLAYLIST = "MUSIC_PAGE_TYPE_PLAYLIST"
SONG = "song"
def parse_run_element(run_element: dict) -> Optional[DatabaseObject]: def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
navigation_endpoint = run_element.get("navigationEndpoint", {}).get("browseEndpoint", {}) if "navigationEndpoint" not in run_element:
return
_temp_nav = run_element.get("navigationEndpoint", {})
is_video = "watchEndpoint" in _temp_nav
navigation_endpoint = _temp_nav.get("watchEndpoint" if is_video else "browseEndpoint", {})
element_type = PageType.SONG
if not is_video:
page_type_string = navigation_endpoint.get("browseEndpointContextSupportedConfigs", {}).get("browseEndpointContextMusicConfig", {}).get("pageType", "") page_type_string = navigation_endpoint.get("browseEndpointContextSupportedConfigs", {}).get("browseEndpointContextMusicConfig", {}).get("pageType", "")
element_type = PageType(page_type_string) element_type = PageType(page_type_string)
element_id = navigation_endpoint.get("browseId") element_id = navigation_endpoint.get("videoId" if is_video else "browseId")
element_text = run_element.get("text") element_text = run_element.get("text")
if element_id is None or element_text is None: if element_id is None or element_text is None:
LOGGER.warning("Couldn't find either the id or text of a Youtube music element.") LOGGER.warning("Couldn't find either the id or text of a Youtube music element.")
return return
if is_video:
source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}")
return Song(title=element_text, source_list=[source])
if element_type == PageType.ARTIST or element_type == PageType.CHANNEL: if element_type == PageType.ARTIST or element_type == PageType.CHANNEL:
source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}") source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}")
return Artist(name=element_text, source_list=[source]) return Artist(name=element_text, source_list=[source])
@ -53,9 +66,6 @@ def parse_run_list(run_list: List[dict]) -> List[DatabaseObject]:
music_object_list: List[DatabaseObject] = [] music_object_list: List[DatabaseObject] = []
for run_renderer in run_list: for run_renderer in run_list:
if "navigationEndpoint" not in run_renderer:
continue
music_object = parse_run_element(run_renderer) music_object = parse_run_element(run_renderer)
if music_object is None: if music_object is None:
continue continue