Compare commits
No commits in common. "90d685da816daa8e207491b03d3628ca94d9be6d" and "d9105fb55a6795291035138740e4e7766d79491f" have entirely different histories.
90d685da81
...
d9105fb55a
1
.vscode/settings.json
vendored
1
.vscode/settings.json
vendored
@ -29,7 +29,6 @@
|
|||||||
"pathvalidate",
|
"pathvalidate",
|
||||||
"Referer",
|
"Referer",
|
||||||
"sponsorblock",
|
"sponsorblock",
|
||||||
"tracklist",
|
|
||||||
"tracksort",
|
"tracksort",
|
||||||
"translit",
|
"translit",
|
||||||
"unmap",
|
"unmap",
|
||||||
|
@ -93,6 +93,18 @@ class Collection(Generic[T]):
|
|||||||
return self._indexed_values[name][value]
|
return self._indexed_values[name][value]
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def _merge_into_contained_object(self, existing: T, other: T, **kwargs):
|
||||||
|
"""
|
||||||
|
This function merges the other object into the existing object, which is contained in the current collection.
|
||||||
|
This also modifies the correct mapping.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if existing.id == other.id:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._map_element(existing)
|
||||||
|
existing.merge(other, **kwargs)
|
||||||
|
|
||||||
def _append_new_object(self, other: T, **kwargs):
|
def _append_new_object(self, other: T, **kwargs):
|
||||||
"""
|
"""
|
||||||
@ -101,6 +113,7 @@ class Collection(Generic[T]):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
self._data.append(other)
|
self._data.append(other)
|
||||||
|
self._map_element(other)
|
||||||
|
|
||||||
# all of the existing hooks to get the defined datastructure
|
# all of the existing hooks to get the defined datastructure
|
||||||
for collection_attribute, generator in self.extend_object_to_attribute.items():
|
for collection_attribute, generator in self.extend_object_to_attribute.items():
|
||||||
@ -115,10 +128,17 @@ class Collection(Generic[T]):
|
|||||||
if a is b:
|
if a is b:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
"""
|
||||||
|
no_sync_collection: Set[Collection] = kwargs.get("no_sync_collection", set())
|
||||||
|
if id(b) in no_sync_collection:
|
||||||
|
continue
|
||||||
|
"""
|
||||||
object_trace(f"Syncing [{a}] = [{b}]")
|
object_trace(f"Syncing [{a}] = [{b}]")
|
||||||
|
|
||||||
|
|
||||||
b_data = b.data.copy()
|
b_data = b.data.copy()
|
||||||
b_collection_for = b._collection_for.copy()
|
b_collection_for = b._collection_for.copy()
|
||||||
|
# no_sync_collection.add(id(b))
|
||||||
|
|
||||||
del b
|
del b
|
||||||
|
|
||||||
@ -146,7 +166,6 @@ class Collection(Generic[T]):
|
|||||||
|
|
||||||
object_trace(f"Appending {other.option_string} to {self}")
|
object_trace(f"Appending {other.option_string} to {self}")
|
||||||
|
|
||||||
# switching collection in the case of push to
|
|
||||||
for c in self.push_to:
|
for c in self.push_to:
|
||||||
r = c._find_object(other)
|
r = c._find_object(other)
|
||||||
if r is not None:
|
if r is not None:
|
||||||
@ -154,24 +173,25 @@ class Collection(Generic[T]):
|
|||||||
return c.append(other, **kwargs)
|
return c.append(other, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
pull_from: Optional[Tuple[Collection, T]] = None
|
||||||
for c in self.pull_from:
|
for c in self.pull_from:
|
||||||
r = c._find_object(other)
|
r = c._find_object(other)
|
||||||
if r is not None:
|
if r is not None:
|
||||||
output("found pull from", r, other, self, color=BColors.RED, sep="\t")
|
output("found pull from", r, other, self, color=BColors.RED, sep="\t")
|
||||||
other.merge(r, **kwargs)
|
other.merge(r, **kwargs)
|
||||||
c.remove(r, existing=r, **kwargs)
|
c.remove(r, **kwargs)
|
||||||
break
|
break
|
||||||
|
|
||||||
existing_object = self._find_object(other)
|
existing_object = self._find_object(other, no_push_to=kwargs.get("no_push_to", False))
|
||||||
|
|
||||||
if existing_object is None:
|
if existing_object is None:
|
||||||
self._append_new_object(other, **kwargs)
|
self._append_new_object(other, **kwargs)
|
||||||
else:
|
else:
|
||||||
existing_object.merge(other, **kwargs)
|
existing_object.merge(other, **kwargs)
|
||||||
|
|
||||||
def remove(self, *other_list: List[T], silent: bool = False, existing: Optional[T] = None, **kwargs):
|
def remove(self, *other_list: List[T], silent: bool = False, **kwargs):
|
||||||
for other in other_list:
|
for other in other_list:
|
||||||
existing: Optional[T] = existing or self._indexed_values["id"].get(other.id, None)
|
existing: Optional[T] = self._indexed_values["id"].get(other.id, None)
|
||||||
if existing is None:
|
if existing is None:
|
||||||
if not silent:
|
if not silent:
|
||||||
raise ValueError(f"Object {other} not found in {self}")
|
raise ValueError(f"Object {other} not found in {self}")
|
||||||
@ -213,7 +233,6 @@ class Collection(Generic[T]):
|
|||||||
yield from self._data
|
yield from self._data
|
||||||
|
|
||||||
def __merge__(self, other: Collection, **kwargs):
|
def __merge__(self, other: Collection, **kwargs):
|
||||||
object_trace(f"merging {str(self)} | {str(other)}")
|
|
||||||
self.extend(other, **kwargs)
|
self.extend(other, **kwargs)
|
||||||
|
|
||||||
def __getitem__(self, item: int):
|
def __getitem__(self, item: int):
|
||||||
@ -223,9 +242,3 @@ class Collection(Generic[T]):
|
|||||||
if item >= len(self._data):
|
if item >= len(self._data):
|
||||||
return default
|
return default
|
||||||
return self._data[item]
|
return self._data[item]
|
||||||
|
|
||||||
def __eq__(self, other: Collection) -> bool:
|
|
||||||
if self.empty and other.empty:
|
|
||||||
return True
|
|
||||||
|
|
||||||
return self._data == other._data
|
|
||||||
|
@ -156,7 +156,7 @@ class Song(Base):
|
|||||||
return
|
return
|
||||||
|
|
||||||
if isinstance(object_list, Artist):
|
if isinstance(object_list, Artist):
|
||||||
self.feature_artist_collection.extend(object_list)
|
self.main_artist_collection.extend(object_list)
|
||||||
return
|
return
|
||||||
|
|
||||||
if isinstance(object_list, Album):
|
if isinstance(object_list, Album):
|
||||||
|
@ -25,6 +25,7 @@ def music_card_shelf_renderer(renderer: dict) -> List[DatabaseObject]:
|
|||||||
results.extend(parse_renderer(sub_renderer))
|
results.extend(parse_renderer(sub_renderer))
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
|
||||||
def music_responsive_list_item_flex_column_renderer(renderer: dict) -> List[DatabaseObject]:
|
def music_responsive_list_item_flex_column_renderer(renderer: dict) -> List[DatabaseObject]:
|
||||||
return parse_run_list(renderer.get("text", {}).get("runs", []))
|
return parse_run_list(renderer.get("text", {}).get("runs", []))
|
||||||
|
|
||||||
@ -53,11 +54,21 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
|
|||||||
for result in results:
|
for result in results:
|
||||||
_map[type(result)].append(result)
|
_map[type(result)].append(result)
|
||||||
|
|
||||||
if len(song_list) == 1:
|
for song in song_list:
|
||||||
song = song_list[0]
|
|
||||||
song.feature_artist_collection.extend(artist_list)
|
|
||||||
song.album_collection.extend(album_list)
|
song.album_collection.extend(album_list)
|
||||||
return [song]
|
song.feature_artist_collection.extend(artist_list)
|
||||||
|
|
||||||
|
if len(song_list) > 0:
|
||||||
|
return song_list
|
||||||
|
|
||||||
|
for album in album_list:
|
||||||
|
album.artist_collection.extend(artist_list)
|
||||||
|
|
||||||
|
if len(album_list) > 0:
|
||||||
|
return album_list
|
||||||
|
|
||||||
|
if len(artist_list) > 0:
|
||||||
|
return artist_list
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
|
|||||||
_temp_nav = run_element.get("navigationEndpoint", {})
|
_temp_nav = run_element.get("navigationEndpoint", {})
|
||||||
is_video = "watchEndpoint" in _temp_nav
|
is_video = "watchEndpoint" in _temp_nav
|
||||||
|
|
||||||
navigation_endpoint = _temp_nav.get("watchEndpoint", _temp_nav.get("browseEndpoint", {}))
|
navigation_endpoint = _temp_nav.get("watchEndpoint" if is_video else "browseEndpoint", {})
|
||||||
|
|
||||||
element_type = PageType.SONG
|
element_type = PageType.SONG
|
||||||
page_type_string = navigation_endpoint.get("watchEndpointMusicSupportedConfigs", {}).get("watchEndpointMusicConfig", {}).get("musicVideoType", "")
|
page_type_string = navigation_endpoint.get("watchEndpointMusicSupportedConfigs", {}).get("watchEndpointMusicConfig", {}).get("musicVideoType", "")
|
||||||
@ -51,7 +51,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
|
|||||||
except ValueError:
|
except ValueError:
|
||||||
return
|
return
|
||||||
|
|
||||||
element_id = navigation_endpoint.get("videoId", navigation_endpoint.get("browseId"))
|
element_id = navigation_endpoint.get("videoId" if is_video else "browseId")
|
||||||
element_text = run_element.get("text")
|
element_text = run_element.get("text")
|
||||||
|
|
||||||
if element_id is None or element_text is None:
|
if element_id is None or element_text is None:
|
||||||
@ -60,11 +60,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
|
|||||||
|
|
||||||
if element_type == PageType.SONG or (element_type == PageType.VIDEO and not youtube_settings["youtube_music_clean_data"]) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not youtube_settings["youtube_music_clean_data"]):
|
if element_type == PageType.SONG or (element_type == PageType.VIDEO and not youtube_settings["youtube_music_clean_data"]) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not youtube_settings["youtube_music_clean_data"]):
|
||||||
source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}")
|
source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}")
|
||||||
|
return Song(title=clean_song_title(element_text), source_list=[source])
|
||||||
return Song(
|
|
||||||
title=clean_song_title(element_text),
|
|
||||||
source_list=[source]
|
|
||||||
)
|
|
||||||
|
|
||||||
if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not youtube_settings["youtube_music_clean_data"]):
|
if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not youtube_settings["youtube_music_clean_data"]):
|
||||||
source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}")
|
source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}")
|
||||||
|
@ -8,7 +8,6 @@ import json
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
import re
|
import re
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
from collections import defaultdict
|
|
||||||
|
|
||||||
import youtube_dl
|
import youtube_dl
|
||||||
from youtube_dl.extractor.youtube import YoutubeIE
|
from youtube_dl.extractor.youtube import YoutubeIE
|
||||||
@ -34,11 +33,9 @@ from ...objects import (
|
|||||||
Target
|
Target
|
||||||
)
|
)
|
||||||
from ...connection import Connection
|
from ...connection import Connection
|
||||||
from ...utils.enums.album import AlbumType
|
|
||||||
from ...utils.support_classes.download_result import DownloadResult
|
from ...utils.support_classes.download_result import DownloadResult
|
||||||
|
|
||||||
from ._list_render import parse_renderer
|
from ._list_render import parse_renderer
|
||||||
from ._music_object_render import parse_run_element
|
|
||||||
from .super_youtube import SuperYouTube
|
from .super_youtube import SuperYouTube
|
||||||
|
|
||||||
|
|
||||||
@ -165,12 +162,6 @@ class MusicKrakenYoutubeIE(YoutubeIE):
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
ALBUM_TYPE_MAP = {
|
|
||||||
"Single": AlbumType.SINGLE,
|
|
||||||
"Album": AlbumType.STUDIO_ALBUM,
|
|
||||||
"EP": AlbumType.EP,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class YoutubeMusic(SuperYouTube):
|
class YoutubeMusic(SuperYouTube):
|
||||||
# CHANGE
|
# CHANGE
|
||||||
@ -410,7 +401,7 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
return results
|
return results
|
||||||
|
|
||||||
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
|
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
|
||||||
artist = Artist(source_list=[source])
|
artist = Artist()
|
||||||
|
|
||||||
# construct the request
|
# construct the request
|
||||||
url = urlparse(source.url)
|
url = urlparse(source.url)
|
||||||
@ -430,19 +421,6 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
if DEBUG:
|
if DEBUG:
|
||||||
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
|
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
|
||||||
|
|
||||||
# artist details
|
|
||||||
data: dict = r.json()
|
|
||||||
header = data.get("header", {})
|
|
||||||
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
|
|
||||||
|
|
||||||
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
|
|
||||||
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
|
|
||||||
|
|
||||||
if len(title_runs) > 0:
|
|
||||||
artist.name = title_runs[0].get("text", artist.name)
|
|
||||||
|
|
||||||
|
|
||||||
# fetch discography
|
|
||||||
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
|
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
|
||||||
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
|
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
|
||||||
|
|
||||||
@ -487,46 +465,6 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
if DEBUG:
|
if DEBUG:
|
||||||
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
|
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
|
||||||
|
|
||||||
data = r.json()
|
|
||||||
|
|
||||||
# album details
|
|
||||||
header = data.get("header", {})
|
|
||||||
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
|
|
||||||
|
|
||||||
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
|
|
||||||
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
|
|
||||||
|
|
||||||
if len(title_runs) > 0:
|
|
||||||
album.title = title_runs[0].get("text", album.title)
|
|
||||||
|
|
||||||
def other_parse_run(run: dict) -> str:
|
|
||||||
nonlocal album
|
|
||||||
|
|
||||||
if "text" not in run:
|
|
||||||
return
|
|
||||||
text = run["text"]
|
|
||||||
|
|
||||||
is_text_field = len(run.keys()) == 1
|
|
||||||
|
|
||||||
# regex that text is a year
|
|
||||||
if is_text_field and re.match(r"\d{4}", text):
|
|
||||||
album.date = ID3Timestamp.strptime(text, "%Y")
|
|
||||||
return
|
|
||||||
|
|
||||||
if text in ALBUM_TYPE_MAP:
|
|
||||||
album.album_type = ALBUM_TYPE_MAP[text]
|
|
||||||
return
|
|
||||||
|
|
||||||
if not is_text_field:
|
|
||||||
r = parse_run_element(run)
|
|
||||||
if r is not None:
|
|
||||||
album.add_list_of_other_objects([r])
|
|
||||||
return
|
|
||||||
|
|
||||||
for _run in subtitle_runs:
|
|
||||||
other_parse_run(_run)
|
|
||||||
|
|
||||||
# tracklist
|
|
||||||
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
|
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
|
||||||
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
|
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
|
||||||
|
|
||||||
@ -534,9 +472,17 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
for i, content in enumerate(renderer_list):
|
for i, content in enumerate(renderer_list):
|
||||||
dump_to_file(f"{i}-album-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
|
dump_to_file(f"{i}-album-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
|
||||||
|
|
||||||
|
results = []
|
||||||
|
|
||||||
|
"""
|
||||||
|
cant use fixed indices, because if something has no entries, the list dissappears
|
||||||
|
instead I have to try parse everything, and just reject community playlists and profiles.
|
||||||
|
"""
|
||||||
|
|
||||||
for renderer in renderer_list:
|
for renderer in renderer_list:
|
||||||
album.add_list_of_other_objects(parse_renderer(renderer))
|
results.extend(parse_renderer(renderer))
|
||||||
|
|
||||||
|
album.add_list_of_other_objects(results)
|
||||||
|
|
||||||
return album
|
return album
|
||||||
|
|
||||||
|
@ -15,11 +15,11 @@ __stage__ = os.getenv("STAGE", "prod")
|
|||||||
DEBUG = (__stage__ == "dev") and True
|
DEBUG = (__stage__ == "dev") and True
|
||||||
DEBUG_LOGGING = DEBUG and False
|
DEBUG_LOGGING = DEBUG and False
|
||||||
DEBUG_TRACE = DEBUG and True
|
DEBUG_TRACE = DEBUG and True
|
||||||
DEBUG_OBJECT_TRACE = DEBUG and True
|
DEBUG_OBJECT_TRACE = DEBUG and False
|
||||||
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
|
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
|
||||||
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
|
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
|
||||||
DEBUG_PAGES = DEBUG and False
|
DEBUG_PAGES = DEBUG and False
|
||||||
DEBUG_DUMP = DEBUG and True
|
DEBUG_DUMP = DEBUG and False
|
||||||
DEBUG_PRINT_ID = DEBUG and True
|
DEBUG_PRINT_ID = DEBUG and True
|
||||||
|
|
||||||
if DEBUG:
|
if DEBUG:
|
||||||
|
Loading…
Reference in New Issue
Block a user