Compare commits
11 Commits
9c369b421d
...
fix/reinde
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1b22c80e5c | ||
|
|
6805d1cbe6 | ||
|
|
542d59562a | ||
|
|
131be537c8 | ||
| ed8cc914be | |||
| 5ed902489f | |||
| 90d685da81 | |||
| be7e91cb7b | |||
| 7e5a1f84ae | |||
| d9105fb55a | |||
| a7711761f9 |
4
.vscode/settings.json
vendored
4
.vscode/settings.json
vendored
@@ -29,9 +29,11 @@
|
||||
"pathvalidate",
|
||||
"Referer",
|
||||
"sponsorblock",
|
||||
"tracklist",
|
||||
"tracksort",
|
||||
"translit",
|
||||
"unmap",
|
||||
"youtube"
|
||||
"youtube",
|
||||
"youtubei"
|
||||
]
|
||||
}
|
||||
@@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
if __name__ == "__main__":
|
||||
commands = [
|
||||
"s: #a Crystal F",
|
||||
"d: 20"
|
||||
"s: #a Psychonaut 4",
|
||||
"d: 0"
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -4,9 +4,6 @@ from music_kraken.objects import Song, Album, Artist, Collection
|
||||
if __name__ == "__main__":
|
||||
song_1 = Song(
|
||||
title="song",
|
||||
main_artist_list=[Artist(
|
||||
name="main_artist"
|
||||
)],
|
||||
feature_artist_list=[Artist(
|
||||
name="main_artist"
|
||||
)]
|
||||
|
||||
@@ -79,7 +79,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
|
||||
with temp_target.open("wb") as f:
|
||||
f.write(r.content)
|
||||
|
||||
converted_target: Target = Target.temp(name=f"{song.title}.jpeg")
|
||||
converted_target: Target = Target.temp(name=f"{song.title.replace('/', '_')}")
|
||||
with Image.open(temp_target.file_path) as img:
|
||||
# crop the image if it isn't square in the middle with minimum data loss
|
||||
width, height = img.size
|
||||
|
||||
@@ -84,26 +84,18 @@ class Collection(Generic[T]):
|
||||
for e in self._data:
|
||||
self._map_element(e, no_unmap=True)
|
||||
|
||||
|
||||
def _find_object(self, __object: T, **kwargs) -> Optional[T]:
|
||||
self._remap()
|
||||
|
||||
if __object.id in self._indexed_from_id:
|
||||
return self._indexed_values["id"][__object.id]
|
||||
|
||||
for name, value in __object.indexing_values:
|
||||
if value in self._indexed_values[name]:
|
||||
return self._indexed_values[name][value]
|
||||
|
||||
return None
|
||||
|
||||
def _merge_into_contained_object(self, existing: T, other: T, **kwargs):
|
||||
"""
|
||||
This function merges the other object into the existing object, which is contained in the current collection.
|
||||
This also modifies the correct mapping.
|
||||
"""
|
||||
|
||||
if existing.id == other.id:
|
||||
return
|
||||
|
||||
self._map_element(existing)
|
||||
existing.merge(other, **kwargs)
|
||||
|
||||
def _append_new_object(self, other: T, **kwargs):
|
||||
"""
|
||||
@@ -112,7 +104,6 @@ class Collection(Generic[T]):
|
||||
"""
|
||||
|
||||
self._data.append(other)
|
||||
self._map_element(other)
|
||||
|
||||
# all of the existing hooks to get the defined datastructure
|
||||
for collection_attribute, generator in self.extend_object_to_attribute.items():
|
||||
@@ -127,17 +118,10 @@ class Collection(Generic[T]):
|
||||
if a is b:
|
||||
continue
|
||||
|
||||
"""
|
||||
no_sync_collection: Set[Collection] = kwargs.get("no_sync_collection", set())
|
||||
if id(b) in no_sync_collection:
|
||||
continue
|
||||
"""
|
||||
object_trace(f"Syncing [{a}] = [{b}]")
|
||||
|
||||
|
||||
b_data = b.data.copy()
|
||||
b_collection_for = b._collection_for.copy()
|
||||
# no_sync_collection.add(id(b))
|
||||
|
||||
del b
|
||||
|
||||
@@ -157,7 +141,6 @@ class Collection(Generic[T]):
|
||||
:return:
|
||||
"""
|
||||
|
||||
|
||||
if other is None:
|
||||
return
|
||||
if other.id in self._indexed_from_id:
|
||||
@@ -165,66 +148,57 @@ class Collection(Generic[T]):
|
||||
|
||||
object_trace(f"Appending {other.option_string} to {self}")
|
||||
|
||||
push_to: Optional[Tuple[Collection, T]] = None
|
||||
for c in self.push_to:
|
||||
r = c._find_object(other)
|
||||
if r is not None:
|
||||
push_to_collection = (c, r)
|
||||
output("found push to", found, other, self, color=BColors.RED, sep="\t")
|
||||
break
|
||||
|
||||
pull_from: Optional[Tuple[Collection, T]] = None
|
||||
for c in self.pull_from:
|
||||
r = c._find_object(other)
|
||||
if r is not None:
|
||||
pull_from_collection = (c, r)
|
||||
output("found pull from", found, other, self, color=BColors.RED, sep="\t")
|
||||
output("found pull from", r, other, self, color=BColors.RED, sep="\t")
|
||||
other.merge(r, **kwargs)
|
||||
c.remove(r, existing=r, **kwargs)
|
||||
break
|
||||
|
||||
if pull_from is not None:
|
||||
pull_from[0].remove(pull_from[1])
|
||||
existing_object = self._find_object(other)
|
||||
|
||||
existing_object = self._find_object(other, no_push_to=kwargs.get("no_push_to", False))
|
||||
# switching collection in the case of push to
|
||||
for c in self.push_to:
|
||||
r = c._find_object(other)
|
||||
if r is not None:
|
||||
output("found push to", r, other, self, color=BColors.RED, sep="\t")
|
||||
return c.append(other, **kwargs)
|
||||
|
||||
if existing_object is None:
|
||||
if push_to is None:
|
||||
self._append_new_object(other, **kwargs)
|
||||
else:
|
||||
push_to[0]._merge_into_contained_object(push_to[1], other, **kwargs)
|
||||
|
||||
if pull_from is not None:
|
||||
self._merge_into_contained_object(other if push_to is None else push_to[1], pull_from[1], **kwargs)
|
||||
self._append_new_object(other, **kwargs)
|
||||
else:
|
||||
self._merge_into_contained_object(existing_object, other, **kwargs)
|
||||
if pull_from is not None:
|
||||
self._merge_into_contained_object(existing_object, pull_from[1], **kwargs)
|
||||
existing_object.merge(other, **kwargs)
|
||||
|
||||
def remove(self, *other_list: List[T], silent: bool = False):
|
||||
def remove(self, *other_list: List[T], silent: bool = False, existing: Optional[T] = None, **kwargs):
|
||||
for other in other_list:
|
||||
existing: Optional[T] = self._indexed_values["id"].get(other.id, None)
|
||||
existing: Optional[T] = existing or self._indexed_values["id"].get(other.id, None)
|
||||
if existing is None:
|
||||
if not silent:
|
||||
raise ValueError(f"Object {other} not found in {self}")
|
||||
return other
|
||||
|
||||
"""
|
||||
for collection_attribute, generator in self.extend_object_to_attribute.items():
|
||||
other.__getattribute__(collection_attribute).remove(*generator, silent=silent, **kwargs)
|
||||
|
||||
for attribute, new_object in self.append_object_to_attribute.items():
|
||||
other.__getattribute__(attribute).remove(new_object, silent=silent, **kwargs)
|
||||
"""
|
||||
|
||||
self._data.remove(existing)
|
||||
self._unmap_element(existing)
|
||||
|
||||
def contains(self, other: T) -> bool:
|
||||
return self._find_object(other) is not None
|
||||
def contains(self, __object: T) -> bool:
|
||||
return self._find_object(__object) is not None
|
||||
|
||||
def extend(self, other_collections: Optional[Generator[T, None, None]], **kwargs):
|
||||
if other_collections is None:
|
||||
return
|
||||
|
||||
for __object in other_collections:
|
||||
self.append(__object, **kwargs)
|
||||
for other_object in other_collections:
|
||||
self.append(other_object, **kwargs)
|
||||
|
||||
@property
|
||||
def data(self) -> List[T]:
|
||||
@@ -241,6 +215,7 @@ class Collection(Generic[T]):
|
||||
yield from self._data
|
||||
|
||||
def __merge__(self, other: Collection, **kwargs):
|
||||
object_trace(f"merging {str(self)} | {str(other)}")
|
||||
self.extend(other, **kwargs)
|
||||
|
||||
def __getitem__(self, item: int):
|
||||
@@ -250,3 +225,9 @@ class Collection(Generic[T]):
|
||||
if item >= len(self._data):
|
||||
return default
|
||||
return self._data[item]
|
||||
|
||||
def __eq__(self, other: Collection) -> bool:
|
||||
if self.empty and other.empty:
|
||||
return True
|
||||
|
||||
return self._data == other._data
|
||||
|
||||
@@ -60,6 +60,13 @@ class InnerData:
|
||||
self._fetched_from.update(__other._fetched_from)
|
||||
|
||||
for key, value in __other.__dict__.copy().items():
|
||||
if key.startswith("_"):
|
||||
continue
|
||||
|
||||
if hasattr(value, "__is_collection__") and key in self.__dict__:
|
||||
self.__getattribute__(key).__merge__(value, **kwargs)
|
||||
continue
|
||||
|
||||
# just set the other value if self doesn't already have it
|
||||
if key not in self.__dict__ or (key in self.__dict__ and self.__dict__[key] == self._default_values.get(key)):
|
||||
self.__setattr__(key, value)
|
||||
@@ -67,9 +74,8 @@ class InnerData:
|
||||
|
||||
# if the object of value implemented __merge__, it merges
|
||||
existing = self.__getattribute__(key)
|
||||
if hasattr(type(existing), "__merge__"):
|
||||
if hasattr(existing, "__merge__"):
|
||||
existing.__merge__(value, **kwargs)
|
||||
continue
|
||||
|
||||
|
||||
class OuterProxy:
|
||||
|
||||
@@ -60,7 +60,7 @@ def get_collection_string(
|
||||
def get_element_str(element) -> str:
|
||||
nonlocal add_id
|
||||
r = element.title_string.strip()
|
||||
if add_id:
|
||||
if add_id and False:
|
||||
r += " " + str(element.id)
|
||||
return r
|
||||
|
||||
@@ -156,7 +156,7 @@ class Song(Base):
|
||||
return
|
||||
|
||||
if isinstance(object_list, Artist):
|
||||
self.main_artist_collection.extend(object_list)
|
||||
self.feature_artist_collection.extend(object_list)
|
||||
return
|
||||
|
||||
if isinstance(object_list, Album):
|
||||
|
||||
@@ -122,6 +122,9 @@ class SourceCollection:
|
||||
yield from self._page_to_source_list[page]
|
||||
|
||||
def append(self, source: Source):
|
||||
if source is None:
|
||||
return
|
||||
|
||||
existing_source = None
|
||||
for key in source.indexing_values:
|
||||
if key in self._indexed_sources:
|
||||
|
||||
@@ -690,13 +690,6 @@ class Musify(Page):
|
||||
new_song = self._parse_song_card(card_soup)
|
||||
album.song_collection.append(new_song)
|
||||
|
||||
if stop_at_level > 1:
|
||||
song: Song
|
||||
for song in album.song_collection:
|
||||
sources = song.source_collection.get_sources(self.SOURCE_TYPE)
|
||||
for source in sources:
|
||||
song.merge(self.fetch_song(source=source))
|
||||
|
||||
album.update_tracksort()
|
||||
|
||||
return album
|
||||
|
||||
@@ -25,7 +25,6 @@ def music_card_shelf_renderer(renderer: dict) -> List[DatabaseObject]:
|
||||
results.extend(parse_renderer(sub_renderer))
|
||||
return results
|
||||
|
||||
|
||||
def music_responsive_list_item_flex_column_renderer(renderer: dict) -> List[DatabaseObject]:
|
||||
return parse_run_list(renderer.get("text", {}).get("runs", []))
|
||||
|
||||
@@ -54,21 +53,11 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
|
||||
for result in results:
|
||||
_map[type(result)].append(result)
|
||||
|
||||
for song in song_list:
|
||||
song.album_collection.extend(album_list)
|
||||
if len(song_list) == 1:
|
||||
song = song_list[0]
|
||||
song.feature_artist_collection.extend(artist_list)
|
||||
|
||||
if len(song_list) > 0:
|
||||
return song_list
|
||||
|
||||
for album in album_list:
|
||||
album.artist_collection.extend(artist_list)
|
||||
|
||||
if len(album_list) > 0:
|
||||
return album_list
|
||||
|
||||
if len(artist_list) > 0:
|
||||
return artist_list
|
||||
song.album_collection.extend(album_list)
|
||||
return [song]
|
||||
|
||||
return results
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
|
||||
_temp_nav = run_element.get("navigationEndpoint", {})
|
||||
is_video = "watchEndpoint" in _temp_nav
|
||||
|
||||
navigation_endpoint = _temp_nav.get("watchEndpoint" if is_video else "browseEndpoint", {})
|
||||
navigation_endpoint = _temp_nav.get("watchEndpoint", _temp_nav.get("browseEndpoint", {}))
|
||||
|
||||
element_type = PageType.SONG
|
||||
page_type_string = navigation_endpoint.get("watchEndpointMusicSupportedConfigs", {}).get("watchEndpointMusicConfig", {}).get("musicVideoType", "")
|
||||
@@ -51,7 +51,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
|
||||
except ValueError:
|
||||
return
|
||||
|
||||
element_id = navigation_endpoint.get("videoId" if is_video else "browseId")
|
||||
element_id = navigation_endpoint.get("videoId", navigation_endpoint.get("browseId"))
|
||||
element_text = run_element.get("text")
|
||||
|
||||
if element_id is None or element_text is None:
|
||||
@@ -60,7 +60,11 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
|
||||
|
||||
if element_type == PageType.SONG or (element_type == PageType.VIDEO and not youtube_settings["youtube_music_clean_data"]) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not youtube_settings["youtube_music_clean_data"]):
|
||||
source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}")
|
||||
return Song(title=clean_song_title(element_text), source_list=[source])
|
||||
|
||||
return Song(
|
||||
title=clean_song_title(element_text),
|
||||
source_list=[source]
|
||||
)
|
||||
|
||||
if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not youtube_settings["youtube_music_clean_data"]):
|
||||
source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}")
|
||||
|
||||
@@ -8,6 +8,7 @@ import json
|
||||
from dataclasses import dataclass
|
||||
import re
|
||||
from functools import lru_cache
|
||||
from collections import defaultdict
|
||||
|
||||
import youtube_dl
|
||||
from youtube_dl.extractor.youtube import YoutubeIE
|
||||
@@ -17,7 +18,7 @@ from ...utils.exception.config import SettingValueError
|
||||
from ...utils.config import main_settings, youtube_settings, logging_settings
|
||||
from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING
|
||||
from ...utils.string_processing import clean_song_title
|
||||
from ...utils import get_current_millis
|
||||
from ...utils import get_current_millis, traverse_json_path
|
||||
|
||||
from ...utils import dump_to_file
|
||||
|
||||
@@ -30,12 +31,16 @@ from ...objects import (
|
||||
Song,
|
||||
Album,
|
||||
Label,
|
||||
Target
|
||||
Target,
|
||||
Lyrics,
|
||||
FormattedText
|
||||
)
|
||||
from ...connection import Connection
|
||||
from ...utils.enums.album import AlbumType
|
||||
from ...utils.support_classes.download_result import DownloadResult
|
||||
|
||||
from ._list_render import parse_renderer
|
||||
from ._music_object_render import parse_run_element
|
||||
from .super_youtube import SuperYouTube
|
||||
|
||||
|
||||
@@ -162,6 +167,12 @@ class MusicKrakenYoutubeIE(YoutubeIE):
|
||||
|
||||
|
||||
|
||||
ALBUM_TYPE_MAP = {
|
||||
"Single": AlbumType.SINGLE,
|
||||
"Album": AlbumType.STUDIO_ALBUM,
|
||||
"EP": AlbumType.EP,
|
||||
}
|
||||
|
||||
|
||||
class YoutubeMusic(SuperYouTube):
|
||||
# CHANGE
|
||||
@@ -401,7 +412,7 @@ class YoutubeMusic(SuperYouTube):
|
||||
return results
|
||||
|
||||
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
|
||||
artist = Artist()
|
||||
artist = Artist(source_list=[source])
|
||||
|
||||
# construct the request
|
||||
url = urlparse(source.url)
|
||||
@@ -421,6 +432,19 @@ class YoutubeMusic(SuperYouTube):
|
||||
if DEBUG:
|
||||
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
|
||||
|
||||
# artist details
|
||||
data: dict = r.json()
|
||||
header = data.get("header", {})
|
||||
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
|
||||
|
||||
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
|
||||
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
|
||||
|
||||
if len(title_runs) > 0:
|
||||
artist.name = title_runs[0].get("text", artist.name)
|
||||
|
||||
|
||||
# fetch discography
|
||||
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
|
||||
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
|
||||
|
||||
@@ -465,6 +489,46 @@ class YoutubeMusic(SuperYouTube):
|
||||
if DEBUG:
|
||||
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
|
||||
|
||||
data = r.json()
|
||||
|
||||
# album details
|
||||
header = data.get("header", {})
|
||||
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
|
||||
|
||||
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
|
||||
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
|
||||
|
||||
if len(title_runs) > 0:
|
||||
album.title = title_runs[0].get("text", album.title)
|
||||
|
||||
def other_parse_run(run: dict) -> str:
|
||||
nonlocal album
|
||||
|
||||
if "text" not in run:
|
||||
return
|
||||
text = run["text"]
|
||||
|
||||
is_text_field = len(run.keys()) == 1
|
||||
|
||||
# regex that text is a year
|
||||
if is_text_field and re.match(r"\d{4}", text):
|
||||
album.date = ID3Timestamp.strptime(text, "%Y")
|
||||
return
|
||||
|
||||
if text in ALBUM_TYPE_MAP:
|
||||
album.album_type = ALBUM_TYPE_MAP[text]
|
||||
return
|
||||
|
||||
if not is_text_field:
|
||||
r = parse_run_element(run)
|
||||
if r is not None:
|
||||
album.add_list_of_other_objects([r])
|
||||
return
|
||||
|
||||
for _run in subtitle_runs:
|
||||
other_parse_run(_run)
|
||||
|
||||
# tracklist
|
||||
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
|
||||
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
|
||||
|
||||
@@ -472,20 +536,67 @@ class YoutubeMusic(SuperYouTube):
|
||||
for i, content in enumerate(renderer_list):
|
||||
dump_to_file(f"{i}-album-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
|
||||
|
||||
results = []
|
||||
|
||||
"""
|
||||
cant use fixed indices, because if something has no entries, the list dissappears
|
||||
instead I have to try parse everything, and just reject community playlists and profiles.
|
||||
"""
|
||||
|
||||
for renderer in renderer_list:
|
||||
results.extend(parse_renderer(renderer))
|
||||
album.add_list_of_other_objects(parse_renderer(renderer))
|
||||
|
||||
album.add_list_of_other_objects(results)
|
||||
for song in album.song_collection:
|
||||
for song_source in song.source_collection:
|
||||
song_source.additional_data["playlist_id"] = browse_id
|
||||
|
||||
return album
|
||||
|
||||
def fetch_lyrics(self, video_id: str, playlist_id: str = None) -> str:
|
||||
request_data = {
|
||||
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
|
||||
"videoId": video_id,
|
||||
}
|
||||
if playlist_id is not None:
|
||||
request_data["playlistId"] = playlist_id
|
||||
|
||||
tab_request = self.yt_music_connection.post(
|
||||
url=get_youtube_url(path="/youtubei/v1/next", query=f"prettyPrint=false"),
|
||||
json=request_data,
|
||||
name=f"fetch_song_tabs_{video_id}.json",
|
||||
)
|
||||
|
||||
if tab_request is None:
|
||||
return None
|
||||
|
||||
dump_to_file(f"fetch_song_tabs_{video_id}.json", tab_request.text, is_json=True, exit_after_dump=False)
|
||||
|
||||
tab_data: dict = tab_request.json()
|
||||
|
||||
tabs = traverse_json_path(tab_data, "contents.singleColumnMusicWatchNextResultsRenderer.tabbedRenderer.watchNextTabbedResultsRenderer.tabs", default=[])
|
||||
browse_id = None
|
||||
for tab in tabs:
|
||||
pageType = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseEndpointContextSupportedConfigs.browseEndpointContextMusicConfig.pageType", default="")
|
||||
if pageType in ("MUSIC_TAB_TYPE_LYRICS", "MUSIC_PAGE_TYPE_TRACK_LYRICS") or "lyrics" in pageType.lower():
|
||||
browse_id = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseId", default=None)
|
||||
break
|
||||
|
||||
if browse_id is None:
|
||||
return None
|
||||
|
||||
|
||||
r = self.yt_music_connection.post(
|
||||
url=get_youtube_url(path="/youtubei/v1/browse", query=f"prettyPrint=false"),
|
||||
json={
|
||||
"browseId": browse_id,
|
||||
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}}
|
||||
},
|
||||
name=f"fetch_song_lyrics_{video_id}.json"
|
||||
)
|
||||
|
||||
dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False)
|
||||
|
||||
data = r.json()
|
||||
lyrics_text = traverse_json_path(data, "contents.sectionListRenderer.contents[0].musicDescriptionShelfRenderer.description.runs[0].text", default=None)
|
||||
if lyrics_text is None:
|
||||
return None
|
||||
|
||||
return Lyrics(FormattedText(plain=lyrics_text))
|
||||
|
||||
|
||||
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
|
||||
ydl_res: dict = {}
|
||||
@@ -502,10 +613,7 @@ class YoutubeMusic(SuperYouTube):
|
||||
uploader = ydl_res.get("uploader", "")
|
||||
if uploader.endswith(" - Topic"):
|
||||
artist_names = [uploader.rstrip(" - Topic")]
|
||||
"""
|
||||
elif "artist" in ydl_res:
|
||||
artist_names = ydl_res.get("artist").split(", ")
|
||||
"""
|
||||
|
||||
artist_list = [
|
||||
Artist(
|
||||
name=name,
|
||||
@@ -523,7 +631,7 @@ class YoutubeMusic(SuperYouTube):
|
||||
))
|
||||
|
||||
artist_name = artist_names[0] if len(artist_names) > 0 else None
|
||||
return Song(
|
||||
song = Song(
|
||||
title=ydl_res.get("track", clean_song_title(ydl_res.get("title"), artist_name=artist_name)),
|
||||
note=ydl_res.get("descriptions"),
|
||||
album_list=album_list,
|
||||
@@ -536,6 +644,43 @@ class YoutubeMusic(SuperYouTube):
|
||||
), source],
|
||||
)
|
||||
|
||||
# other song details
|
||||
parsed_url = urlparse(source.url)
|
||||
browse_id = parse_qs(parsed_url.query)['v'][0]
|
||||
request_data = {
|
||||
"captionParams": {},
|
||||
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
|
||||
"videoId": browse_id,
|
||||
}
|
||||
if "playlist_id" in source.additional_data:
|
||||
request_data["playlistId"] = source.additional_data["playlist_id"]
|
||||
|
||||
initial_details = self.yt_music_connection.post(
|
||||
url=get_youtube_url(path="/youtubei/v1/player", query=f"prettyPrint=false"),
|
||||
json=request_data,
|
||||
name=f"fetch_song_{browse_id}.json",
|
||||
)
|
||||
|
||||
if initial_details is None:
|
||||
return song
|
||||
|
||||
dump_to_file(f"fetch_song_{browse_id}.json", initial_details.text, is_json=True, exit_after_dump=False)
|
||||
|
||||
data = initial_details.json()
|
||||
video_details = data.get("videoDetails", {})
|
||||
|
||||
browse_id = video_details.get("videoId", browse_id)
|
||||
song.title = video_details.get("title", song.title)
|
||||
if video_details.get("isLiveContent", False):
|
||||
for album in song.album_list:
|
||||
album.album_type = AlbumType.LIVE_ALBUM
|
||||
for thumbnail in video_details.get("thumbnails", []):
|
||||
song.artwork.append(**thumbnail)
|
||||
|
||||
song.lyrics_collection.append(self.fetch_lyrics(browse_id, playlist_id=request_data.get("playlistId")))
|
||||
|
||||
return song
|
||||
|
||||
|
||||
def fetch_media_url(self, source: Source, ydl_res: dict = None) -> dict:
|
||||
def _get_best_format(format_list: List[Dict]) -> dict:
|
||||
|
||||
@@ -3,7 +3,7 @@ from pathlib import Path
|
||||
import json
|
||||
import logging
|
||||
import inspect
|
||||
from typing import List
|
||||
from typing import List, Union
|
||||
|
||||
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
|
||||
from .config import config, read_config, write_config
|
||||
@@ -77,6 +77,37 @@ def object_trace(obj):
|
||||
misc functions
|
||||
"""
|
||||
|
||||
def traverse_json_path(data, path: Union[str, List[str]], default=None):
|
||||
"""
|
||||
Path parts are concatenated with . or wrapped with [""] for object keys and wrapped in [] for array indices.
|
||||
"""
|
||||
|
||||
if isinstance(path, str):
|
||||
path = path.replace('["', '.').replace('"]', '.').replace("[", ".").replace("]", ".")
|
||||
path = [p for p in path.split(".") if len(p) > 0]
|
||||
|
||||
if len(path) <= 0:
|
||||
return data
|
||||
|
||||
current = path[0]
|
||||
path = path[1:]
|
||||
|
||||
new_data = None
|
||||
|
||||
if isinstance(data, dict):
|
||||
new_data = data.get(current)
|
||||
|
||||
elif isinstance(data, list):
|
||||
try:
|
||||
new_data = data[int(current)]
|
||||
except (IndexError, ValueError):
|
||||
pass
|
||||
|
||||
if new_data is None:
|
||||
return default
|
||||
|
||||
return traverse_json_path(data=new_data, path=path, default=default)
|
||||
|
||||
_auto_increment = 0
|
||||
def generate_id() -> int:
|
||||
global _auto_increment
|
||||
|
||||
@@ -15,7 +15,7 @@ __stage__ = os.getenv("STAGE", "prod")
|
||||
DEBUG = (__stage__ == "dev") and True
|
||||
DEBUG_LOGGING = DEBUG and False
|
||||
DEBUG_TRACE = DEBUG and True
|
||||
DEBUG_OBJECT_TRACE = DEBUG and True
|
||||
DEBUG_OBJECT_TRACE = DEBUG and False
|
||||
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
|
||||
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
|
||||
DEBUG_PAGES = DEBUG and False
|
||||
|
||||
@@ -52,7 +52,14 @@ def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Uni
|
||||
string = string[1:]
|
||||
|
||||
string = string.replace("/", "_").replace("\\", "_")
|
||||
|
||||
try:
|
||||
string = translit(string, reversed=True)
|
||||
except LanguageDetectionError:
|
||||
pass
|
||||
|
||||
string = sanitize_filename(string)
|
||||
|
||||
return string
|
||||
|
||||
if isinstance(string, Path):
|
||||
|
||||
Reference in New Issue
Block a user