13 Commits

Author SHA1 Message Date
Hellow
1b22c80e5c fix: removing the possibility or file names containing /
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-06 18:48:13 +02:00
Hellow
6805d1cbe6 feat: allowed to append none to source collection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-06 18:40:21 +02:00
Hellow
542d59562a fix: removed redundand code
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-06 18:35:25 +02:00
Hellow
131be537c8 fix: actually merging 2024-05-06 17:39:53 +02:00
ed8cc914be feat: lyrics for youtube music
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-06 16:27:49 +02:00
5ed902489f feat: added additional data
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-06 14:33:03 +02:00
90d685da81 feat: implemented correct merging of artists
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-06 12:53:06 +02:00
be7e91cb7b feat: improved the youtube music album fetching 2024-05-06 12:44:15 +02:00
7e5a1f84ae feat: improved the youtube music album fetching 2024-05-06 12:40:06 +02:00
d9105fb55a fix: some bug
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-06 10:31:21 +02:00
a7711761f9 dfa
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-03 14:55:22 +02:00
9c369b421d feat: oh no
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-03 14:52:12 +02:00
be843f2c10 draft: improved debug even more 2024-04-30 17:43:00 +02:00
15 changed files with 364 additions and 157 deletions

View File

@@ -29,9 +29,11 @@
"pathvalidate",
"Referer",
"sponsorblock",
"tracklist",
"tracksort",
"translit",
"unmap",
"youtube"
"youtube",
"youtubei"
]
}

View File

@@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__":
commands = [
"s: #a Crystal F",
"d: 20",
"s: #a Psychonaut 4",
"d: 0"
]

View File

@@ -2,30 +2,24 @@ import music_kraken
from music_kraken.objects import Song, Album, Artist, Collection
if __name__ == "__main__":
album_1 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]),
],
artist_list=[
Artist(name="artist 3"),
]
song_1 = Song(
title="song",
feature_artist_list=[Artist(
name="main_artist"
)]
)
album_2 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist 2")]),
],
artist_list=[
Artist(name="artist"),
]
other_artist = Artist(name="other_artist")
song_2 = Song(
title = "song",
main_artist_list=[other_artist]
)
album_1.merge(album_2)
other_artist.name = "main_artist"
print()
print(*(f"{a.title_string} ; {a.id}" for a in album_1.artist_collection.data), sep=" | ")
song_1.merge(song_2)
print(id(album_1.artist_collection), id(album_2.artist_collection))
print(id(album_1.song_collection[0].main_artist_collection), id(album_2.song_collection[0].main_artist_collection))
print("#" * 120)
print("main", *song_1.main_artist_collection)
print("feat", *song_1.feature_artist_collection)

View File

@@ -79,7 +79,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
with temp_target.open("wb") as f:
f.write(r.content)
converted_target: Target = Target.temp(name=f"{song.title}.jpeg")
converted_target: Target = Target.temp(name=f"{song.title.replace('/', '_')}")
with Image.open(temp_target.file_path) as img:
# crop the image if it isn't square in the middle with minimum data loss
width, height = img.size

View File

@@ -40,8 +40,6 @@ class Collection(Generic[T]):
self.pull_from: List[Collection] = []
self.push_to: List[Collection] = []
self._id_to_index_values: Dict[int, set] = defaultdict(set)
# This is to cleanly unmap previously mapped items by their id
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
# this is to keep track and look up the actual objects
@@ -50,10 +48,11 @@ class Collection(Generic[T]):
self.extend(data)
def __repr__(self) -> str:
return f"Collection({id(self)})"
return f"Collection({' | '.join(self._collection_for.values())} {id(self)})"
def _map_element(self, __object: T, from_map: bool = False):
self._unmap_element(__object.id)
def _map_element(self, __object: T, no_unmap: bool = False, **kwargs):
if not no_unmap:
self._unmap_element(__object.id)
self._indexed_from_id[__object.id]["id"] = __object.id
self._indexed_values["id"][__object.id] = __object
@@ -78,107 +77,128 @@ class Collection(Generic[T]):
del self._indexed_from_id[obj_id]
def _remap(self):
for e in self:
self._map_element(e)
# reinitialize the mapping to clean it without time consuming operations
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
def _find_object(self, __object: T, no_push_to: bool = False) -> Optional[T]:
if not no_push_to or True:
for c in self.push_to:
found, found_in = c._find_object(__object, no_push_to=True)
if found is not None:
output("push to", found, __object, color=BColors.RED)
return found, found_in
for e in self._data:
self._map_element(e, no_unmap=True)
def _find_object(self, __object: T, **kwargs) -> Optional[T]:
self._remap()
if __object.id in self._indexed_from_id:
return self._indexed_values["id"][__object.id]
for name, value in __object.indexing_values:
if value in self._indexed_values[name]:
return self._indexed_values[name][value], self
return self._indexed_values[name][value]
return None, self
return None
def _append_new_object(self, other: T, **kwargs):
"""
This function appends the other object to the current collection.
This only works if not another object, which represents the same real life object exists in the collection.
"""
self._data.append(other)
def append(self, __object: Optional[T], **kwargs):
# all of the existing hooks to get the defined datastructure
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
for attribute, a in self.sync_on_append.items():
# syncing two collections by reference
b = other.__getattribute__(attribute)
if a is b:
continue
object_trace(f"Syncing [{a}] = [{b}]")
b_data = b.data.copy()
b_collection_for = b._collection_for.copy()
del b
for synced_with, key in b_collection_for.items():
synced_with.__setattr__(key, a)
a._collection_for[synced_with] = key
a.extend(b_data, **kwargs)
def append(self, other: Optional[T], **kwargs):
"""
If an object, that represents the same entity exists in a relevant collection,
merge into this object. (and remap)
Else append to this collection.
:param __object:
:param other:
:return:
"""
if __object is None:
if other is None:
return
if other.id in self._indexed_from_id:
return
existing_object, map_to = self._find_object(__object, no_push_to=kwargs.get("no_push_to", False))
object_trace(f"Appending {other.option_string} to {self}")
if map_to is self:
for other, contained in (c._find_object(__object, no_push_to=True) for c in self.pull_from):
if other is None:
continue
for c in self.pull_from:
r = c._find_object(other)
if r is not None:
output("found pull from", r, other, self, color=BColors.RED, sep="\t")
other.merge(r, **kwargs)
c.remove(r, existing=r, **kwargs)
break
output("pull from", other, __object, color=BColors.RED)
__object.__merge__(other, no_push_to=False, **kwargs)
contained.remove(other)
existing_object = self._find_object(other)
# switching collection in the case of push to
for c in self.push_to:
r = c._find_object(other)
if r is not None:
output("found push to", r, other, self, color=BColors.RED, sep="\t")
return c.append(other, **kwargs)
if existing_object is None:
# append
self._data.append(__object)
self._map_element(__object)
self._append_new_object(other, **kwargs)
else:
existing_object.merge(other, **kwargs)
for collection_attribute, child_collection in self.extend_object_to_attribute.items():
__object.__getattribute__(collection_attribute).extend(child_collection, **kwargs)
def remove(self, *other_list: List[T], silent: bool = False, existing: Optional[T] = None, **kwargs):
for other in other_list:
existing: Optional[T] = existing or self._indexed_values["id"].get(other.id, None)
if existing is None:
if not silent:
raise ValueError(f"Object {other} not found in {self}")
return other
"""
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).remove(*generator, silent=silent, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
__object.__getattribute__(attribute).append(new_object, **kwargs)
other.__getattribute__(attribute).remove(new_object, silent=silent, **kwargs)
"""
# only modify collections if the object actually has been appended
for attribute, a in self.sync_on_append.items():
b = __object.__getattribute__(attribute)
if a is b:
continue
no_sync_collection: Set[Collection] = kwargs.get("no_sync_collection", set())
object_trace(f"Syncing [{a}] = [{b}]; {no_sync_collection}")
if id(b) in no_sync_collection:
continue
b_data = b.data.copy()
b_collection_for = b._collection_for.copy()
no_sync_collection.add(id(b))
# kwargs["no_sync_collection"] = no_sync_collection
del b
a.extend(b_data, **kwargs)
for synced_with, key in b_collection_for.items():
synced_with.__setattr__(key, a)
a._collection_for[synced_with] = key
else:
# merge only if the two objects are not the same
if existing_object.id == __object.id:
return
existing_object.merge(__object, **kwargs)
map_to._map_element(existing_object)
def remove(self, __object: T) -> T:
self._data.remove(__object)
self._unmap_element(__object)
return __object
self._data.remove(existing)
self._unmap_element(existing)
def contains(self, __object: T) -> bool:
return self._find_object(__object) is not None
def extend(self, __iterable: Optional[Generator[T, None, None]], **kwargs):
if __iterable is None:
def extend(self, other_collections: Optional[Generator[T, None, None]], **kwargs):
if other_collections is None:
return
for __object in __iterable:
self.append(__object, **kwargs)
for other_object in other_collections:
self.append(other_object, **kwargs)
@property
def data(self) -> List[T]:
@@ -194,8 +214,9 @@ class Collection(Generic[T]):
def __iter__(self) -> Iterator[T]:
yield from self._data
def __merge__(self, __other: Collection, **kwargs):
self.extend(__other, **kwargs)
def __merge__(self, other: Collection, **kwargs):
object_trace(f"merging {str(self)} | {str(other)}")
self.extend(other, **kwargs)
def __getitem__(self, item: int):
return self._data[item]
@@ -204,3 +225,9 @@ class Collection(Generic[T]):
if item >= len(self._data):
return default
return self._data[item]
def __eq__(self, other: Collection) -> bool:
if self.empty and other.empty:
return True
return self._data == other._data

View File

@@ -11,7 +11,7 @@ import inspect
from .metadata import Metadata
from ..utils import get_unix_time, object_trace, generate_id
from ..utils.config import logging_settings, main_settings
from ..utils.shared import HIGHEST_ID
from ..utils.shared import HIGHEST_ID, DEBUG_PRINT_ID
from ..utils.hacking import MetaClass
LOGGER = logging_settings["object_logger"]
@@ -60,6 +60,13 @@ class InnerData:
self._fetched_from.update(__other._fetched_from)
for key, value in __other.__dict__.copy().items():
if key.startswith("_"):
continue
if hasattr(value, "__is_collection__") and key in self.__dict__:
self.__getattribute__(key).__merge__(value, **kwargs)
continue
# just set the other value if self doesn't already have it
if key not in self.__dict__ or (key in self.__dict__ and self.__dict__[key] == self._default_values.get(key)):
self.__setattr__(key, value)
@@ -67,9 +74,8 @@ class InnerData:
# if the object of value implemented __merge__, it merges
existing = self.__getattribute__(key)
if hasattr(type(existing), "__merge__"):
if hasattr(existing, "__merge__"):
existing.__merge__(value, **kwargs)
continue
class OuterProxy:
@@ -113,7 +119,7 @@ class OuterProxy:
self._inner: InnerData = InnerData(type(self), **kwargs)
self._inner._refers_to_instances.add(self)
object_trace(f"creating {type(self).__name__} [{self.title_string}]")
object_trace(f"creating {type(self).__name__} [{self.option_string}]")
self.__init_collections__()
@@ -192,7 +198,7 @@ class OuterProxy:
if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
a, b = b, a
object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]")
object_trace(f"merging {a.option_string} | {b.option_string}")
old_inner = b._inner
@@ -243,6 +249,10 @@ class OuterProxy:
return r
@property
def option_string(self) -> str:
return self.title_string
INDEX_DEPENDS_ON: List[str] = []
@property
@@ -278,7 +288,7 @@ class OuterProxy:
TITEL = "id"
@property
def title_string(self) -> str:
return str(self.__getattribute__(self.TITEL))
return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "")
def __repr__(self):
return f"{type(self).__name__}({self.title_string})"

View File

@@ -22,6 +22,7 @@ from .parents import OuterProxy, P
from .source import Source, SourceCollection
from .target import Target
from .country import Language, Country
from ..utils.shared import DEBUG_PRINT_ID
from ..utils.string_processing import unify
from .parents import OuterProxy as Base
@@ -44,7 +45,7 @@ def get_collection_string(
ignore_titles: Set[str] = None,
background: BColors = OPTION_BACKGROUND,
foreground: BColors = OPTION_FOREGROUND,
add_id: bool = True,
add_id: bool = DEBUG_PRINT_ID,
) -> str:
if collection.empty:
return ""
@@ -59,7 +60,7 @@ def get_collection_string(
def get_element_str(element) -> str:
nonlocal add_id
r = element.title_string.strip()
if add_id:
if add_id and False:
r += " " + str(element.id)
return r
@@ -155,7 +156,7 @@ class Song(Base):
return
if isinstance(object_list, Artist):
self.main_artist_collection.extend(object_list)
self.feature_artist_collection.extend(object_list)
return
if isinstance(object_list, Album):
@@ -203,7 +204,7 @@ class Song(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
r += get_collection_string(self.main_artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}")
@@ -348,7 +349,7 @@ class Album(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.artist_collection, " by {}")
r += get_collection_string(self.label_collection, " under {}")
@@ -578,7 +579,7 @@ class Artist(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.name + BColors.ENDC.value + OPTION_BACKGROUND.value
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.label_collection, " under {}")
r += OPTION_BACKGROUND.value

View File

@@ -30,7 +30,7 @@ class Source:
def __post_init__(self):
self.referrer_page = self.referrer_page or self.page_enum
@cached_property
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
@@ -122,6 +122,9 @@ class SourceCollection:
yield from self._page_to_source_list[page]
def append(self, source: Source):
if source is None:
return
existing_source = None
for key in source.indexing_values:
if key in self._indexed_sources:

View File

@@ -690,13 +690,6 @@ class Musify(Page):
new_song = self._parse_song_card(card_soup)
album.song_collection.append(new_song)
if stop_at_level > 1:
song: Song
for song in album.song_collection:
sources = song.source_collection.get_sources(self.SOURCE_TYPE)
for source in sources:
song.merge(self.fetch_song(source=source))
album.update_tracksort()
return album

View File

@@ -25,7 +25,6 @@ def music_card_shelf_renderer(renderer: dict) -> List[DatabaseObject]:
results.extend(parse_renderer(sub_renderer))
return results
def music_responsive_list_item_flex_column_renderer(renderer: dict) -> List[DatabaseObject]:
return parse_run_list(renderer.get("text", {}).get("runs", []))
@@ -54,21 +53,11 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
for result in results:
_map[type(result)].append(result)
for song in song_list:
song.album_collection.extend(album_list)
if len(song_list) == 1:
song = song_list[0]
song.feature_artist_collection.extend(artist_list)
if len(song_list) > 0:
return song_list
for album in album_list:
album.artist_collection.extend(artist_list)
if len(album_list) > 0:
return album_list
if len(artist_list) > 0:
return artist_list
song.album_collection.extend(album_list)
return [song]
return results

View File

@@ -40,7 +40,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
_temp_nav = run_element.get("navigationEndpoint", {})
is_video = "watchEndpoint" in _temp_nav
navigation_endpoint = _temp_nav.get("watchEndpoint" if is_video else "browseEndpoint", {})
navigation_endpoint = _temp_nav.get("watchEndpoint", _temp_nav.get("browseEndpoint", {}))
element_type = PageType.SONG
page_type_string = navigation_endpoint.get("watchEndpointMusicSupportedConfigs", {}).get("watchEndpointMusicConfig", {}).get("musicVideoType", "")
@@ -51,7 +51,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
except ValueError:
return
element_id = navigation_endpoint.get("videoId" if is_video else "browseId")
element_id = navigation_endpoint.get("videoId", navigation_endpoint.get("browseId"))
element_text = run_element.get("text")
if element_id is None or element_text is None:
@@ -60,7 +60,11 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
if element_type == PageType.SONG or (element_type == PageType.VIDEO and not youtube_settings["youtube_music_clean_data"]) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}")
return Song(title=clean_song_title(element_text), source_list=[source])
return Song(
title=clean_song_title(element_text),
source_list=[source]
)
if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}")

View File

@@ -8,6 +8,7 @@ import json
from dataclasses import dataclass
import re
from functools import lru_cache
from collections import defaultdict
import youtube_dl
from youtube_dl.extractor.youtube import YoutubeIE
@@ -17,7 +18,7 @@ from ...utils.exception.config import SettingValueError
from ...utils.config import main_settings, youtube_settings, logging_settings
from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING
from ...utils.string_processing import clean_song_title
from ...utils import get_current_millis
from ...utils import get_current_millis, traverse_json_path
from ...utils import dump_to_file
@@ -30,12 +31,16 @@ from ...objects import (
Song,
Album,
Label,
Target
Target,
Lyrics,
FormattedText
)
from ...connection import Connection
from ...utils.enums.album import AlbumType
from ...utils.support_classes.download_result import DownloadResult
from ._list_render import parse_renderer
from ._music_object_render import parse_run_element
from .super_youtube import SuperYouTube
@@ -162,6 +167,12 @@ class MusicKrakenYoutubeIE(YoutubeIE):
ALBUM_TYPE_MAP = {
"Single": AlbumType.SINGLE,
"Album": AlbumType.STUDIO_ALBUM,
"EP": AlbumType.EP,
}
class YoutubeMusic(SuperYouTube):
# CHANGE
@@ -401,7 +412,7 @@ class YoutubeMusic(SuperYouTube):
return results
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
artist = Artist()
artist = Artist(source_list=[source])
# construct the request
url = urlparse(source.url)
@@ -421,6 +432,19 @@ class YoutubeMusic(SuperYouTube):
if DEBUG:
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
# artist details
data: dict = r.json()
header = data.get("header", {})
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
if len(title_runs) > 0:
artist.name = title_runs[0].get("text", artist.name)
# fetch discography
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
@@ -465,6 +489,46 @@ class YoutubeMusic(SuperYouTube):
if DEBUG:
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
data = r.json()
# album details
header = data.get("header", {})
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
if len(title_runs) > 0:
album.title = title_runs[0].get("text", album.title)
def other_parse_run(run: dict) -> str:
nonlocal album
if "text" not in run:
return
text = run["text"]
is_text_field = len(run.keys()) == 1
# regex that text is a year
if is_text_field and re.match(r"\d{4}", text):
album.date = ID3Timestamp.strptime(text, "%Y")
return
if text in ALBUM_TYPE_MAP:
album.album_type = ALBUM_TYPE_MAP[text]
return
if not is_text_field:
r = parse_run_element(run)
if r is not None:
album.add_list_of_other_objects([r])
return
for _run in subtitle_runs:
other_parse_run(_run)
# tracklist
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
@@ -472,20 +536,67 @@ class YoutubeMusic(SuperYouTube):
for i, content in enumerate(renderer_list):
dump_to_file(f"{i}-album-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
results = []
"""
cant use fixed indices, because if something has no entries, the list dissappears
instead I have to try parse everything, and just reject community playlists and profiles.
"""
for renderer in renderer_list:
results.extend(parse_renderer(renderer))
album.add_list_of_other_objects(parse_renderer(renderer))
album.add_list_of_other_objects(results)
for song in album.song_collection:
for song_source in song.source_collection:
song_source.additional_data["playlist_id"] = browse_id
return album
def fetch_lyrics(self, video_id: str, playlist_id: str = None) -> str:
request_data = {
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
"videoId": video_id,
}
if playlist_id is not None:
request_data["playlistId"] = playlist_id
tab_request = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/next", query=f"prettyPrint=false"),
json=request_data,
name=f"fetch_song_tabs_{video_id}.json",
)
if tab_request is None:
return None
dump_to_file(f"fetch_song_tabs_{video_id}.json", tab_request.text, is_json=True, exit_after_dump=False)
tab_data: dict = tab_request.json()
tabs = traverse_json_path(tab_data, "contents.singleColumnMusicWatchNextResultsRenderer.tabbedRenderer.watchNextTabbedResultsRenderer.tabs", default=[])
browse_id = None
for tab in tabs:
pageType = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseEndpointContextSupportedConfigs.browseEndpointContextMusicConfig.pageType", default="")
if pageType in ("MUSIC_TAB_TYPE_LYRICS", "MUSIC_PAGE_TYPE_TRACK_LYRICS") or "lyrics" in pageType.lower():
browse_id = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseId", default=None)
break
if browse_id is None:
return None
r = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/browse", query=f"prettyPrint=false"),
json={
"browseId": browse_id,
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}}
},
name=f"fetch_song_lyrics_{video_id}.json"
)
dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False)
data = r.json()
lyrics_text = traverse_json_path(data, "contents.sectionListRenderer.contents[0].musicDescriptionShelfRenderer.description.runs[0].text", default=None)
if lyrics_text is None:
return None
return Lyrics(FormattedText(plain=lyrics_text))
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
ydl_res: dict = {}
@@ -502,10 +613,7 @@ class YoutubeMusic(SuperYouTube):
uploader = ydl_res.get("uploader", "")
if uploader.endswith(" - Topic"):
artist_names = [uploader.rstrip(" - Topic")]
"""
elif "artist" in ydl_res:
artist_names = ydl_res.get("artist").split(", ")
"""
artist_list = [
Artist(
name=name,
@@ -523,7 +631,7 @@ class YoutubeMusic(SuperYouTube):
))
artist_name = artist_names[0] if len(artist_names) > 0 else None
return Song(
song = Song(
title=ydl_res.get("track", clean_song_title(ydl_res.get("title"), artist_name=artist_name)),
note=ydl_res.get("descriptions"),
album_list=album_list,
@@ -536,6 +644,43 @@ class YoutubeMusic(SuperYouTube):
), source],
)
# other song details
parsed_url = urlparse(source.url)
browse_id = parse_qs(parsed_url.query)['v'][0]
request_data = {
"captionParams": {},
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
"videoId": browse_id,
}
if "playlist_id" in source.additional_data:
request_data["playlistId"] = source.additional_data["playlist_id"]
initial_details = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/player", query=f"prettyPrint=false"),
json=request_data,
name=f"fetch_song_{browse_id}.json",
)
if initial_details is None:
return song
dump_to_file(f"fetch_song_{browse_id}.json", initial_details.text, is_json=True, exit_after_dump=False)
data = initial_details.json()
video_details = data.get("videoDetails", {})
browse_id = video_details.get("videoId", browse_id)
song.title = video_details.get("title", song.title)
if video_details.get("isLiveContent", False):
for album in song.album_list:
album.album_type = AlbumType.LIVE_ALBUM
for thumbnail in video_details.get("thumbnails", []):
song.artwork.append(**thumbnail)
song.lyrics_collection.append(self.fetch_lyrics(browse_id, playlist_id=request_data.get("playlistId")))
return song
def fetch_media_url(self, source: Source, ydl_res: dict = None) -> dict:
def _get_best_format(format_list: List[Dict]) -> dict:

View File

@@ -3,7 +3,7 @@ from pathlib import Path
import json
import logging
import inspect
from typing import List
from typing import List, Union
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
from .config import config, read_config, write_config
@@ -77,6 +77,37 @@ def object_trace(obj):
misc functions
"""
def traverse_json_path(data, path: Union[str, List[str]], default=None):
"""
Path parts are concatenated with . or wrapped with [""] for object keys and wrapped in [] for array indices.
"""
if isinstance(path, str):
path = path.replace('["', '.').replace('"]', '.').replace("[", ".").replace("]", ".")
path = [p for p in path.split(".") if len(p) > 0]
if len(path) <= 0:
return data
current = path[0]
path = path[1:]
new_data = None
if isinstance(data, dict):
new_data = data.get(current)
elif isinstance(data, list):
try:
new_data = data[int(current)]
except (IndexError, ValueError):
pass
if new_data is None:
return default
return traverse_json_path(data=new_data, path=path, default=default)
_auto_increment = 0
def generate_id() -> int:
global _auto_increment

View File

@@ -15,11 +15,12 @@ __stage__ = os.getenv("STAGE", "prod")
DEBUG = (__stage__ == "dev") and True
DEBUG_LOGGING = DEBUG and False
DEBUG_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and False
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and False
DEBUG_PRINT_ID = DEBUG and True
if DEBUG:
print("DEBUG ACTIVE")

View File

@@ -52,7 +52,14 @@ def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Uni
string = string[1:]
string = string.replace("/", "_").replace("\\", "_")
try:
string = translit(string, reversed=True)
except LanguageDetectionError:
pass
string = sanitize_filename(string)
return string
if isinstance(string, Path):