6 Commits

Author SHA1 Message Date
Hellow
1b22c80e5c fix: removing the possibility or file names containing /
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-06 18:48:13 +02:00
Hellow
6805d1cbe6 feat: allowed to append none to source collection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-06 18:40:21 +02:00
Hellow
542d59562a fix: removed redundand code
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-06 18:35:25 +02:00
Hellow
131be537c8 fix: actually merging 2024-05-06 17:39:53 +02:00
ed8cc914be feat: lyrics for youtube music
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-06 16:27:49 +02:00
5ed902489f feat: added additional data
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-06 14:33:03 +02:00
12 changed files with 168 additions and 34 deletions

View File

@@ -33,6 +33,7 @@
"tracksort",
"translit",
"unmap",
"youtube"
"youtube",
"youtubei"
]
}

View File

@@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__":
commands = [
"s: #a Crystal F",
"d: 20"
"s: #a Psychonaut 4",
"d: 0"
]

View File

@@ -79,7 +79,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
with temp_target.open("wb") as f:
f.write(r.content)
converted_target: Target = Target.temp(name=f"{song.title}.jpeg")
converted_target: Target = Target.temp(name=f"{song.title.replace('/', '_')}")
with Image.open(temp_target.file_path) as img:
# crop the image if it isn't square in the middle with minimum data loss
width, height = img.size

View File

@@ -88,6 +88,9 @@ class Collection(Generic[T]):
def _find_object(self, __object: T, **kwargs) -> Optional[T]:
self._remap()
if __object.id in self._indexed_from_id:
return self._indexed_values["id"][__object.id]
for name, value in __object.indexing_values:
if value in self._indexed_values[name]:
return self._indexed_values[name][value]
@@ -138,7 +141,6 @@ class Collection(Generic[T]):
:return:
"""
if other is None:
return
if other.id in self._indexed_from_id:
@@ -146,13 +148,6 @@ class Collection(Generic[T]):
object_trace(f"Appending {other.option_string} to {self}")
# switching collection in the case of push to
for c in self.push_to:
r = c._find_object(other)
if r is not None:
output("found push to", r, other, self, color=BColors.RED, sep="\t")
return c.append(other, **kwargs)
for c in self.pull_from:
r = c._find_object(other)
@@ -163,6 +158,13 @@ class Collection(Generic[T]):
break
existing_object = self._find_object(other)
# switching collection in the case of push to
for c in self.push_to:
r = c._find_object(other)
if r is not None:
output("found push to", r, other, self, color=BColors.RED, sep="\t")
return c.append(other, **kwargs)
if existing_object is None:
self._append_new_object(other, **kwargs)
@@ -195,8 +197,8 @@ class Collection(Generic[T]):
if other_collections is None:
return
for __object in other_collections:
self.append(__object, **kwargs)
for other_object in other_collections:
self.append(other_object, **kwargs)
@property
def data(self) -> List[T]:

View File

@@ -60,6 +60,13 @@ class InnerData:
self._fetched_from.update(__other._fetched_from)
for key, value in __other.__dict__.copy().items():
if key.startswith("_"):
continue
if hasattr(value, "__is_collection__") and key in self.__dict__:
self.__getattribute__(key).__merge__(value, **kwargs)
continue
# just set the other value if self doesn't already have it
if key not in self.__dict__ or (key in self.__dict__ and self.__dict__[key] == self._default_values.get(key)):
self.__setattr__(key, value)
@@ -67,9 +74,8 @@ class InnerData:
# if the object of value implemented __merge__, it merges
existing = self.__getattribute__(key)
if hasattr(type(existing), "__merge__"):
if hasattr(existing, "__merge__"):
existing.__merge__(value, **kwargs)
continue
class OuterProxy:

View File

@@ -60,7 +60,7 @@ def get_collection_string(
def get_element_str(element) -> str:
nonlocal add_id
r = element.title_string.strip()
if add_id:
if add_id and False:
r += " " + str(element.id)
return r

View File

@@ -122,6 +122,9 @@ class SourceCollection:
yield from self._page_to_source_list[page]
def append(self, source: Source):
if source is None:
return
existing_source = None
for key in source.indexing_values:
if key in self._indexed_sources:

View File

@@ -690,13 +690,6 @@ class Musify(Page):
new_song = self._parse_song_card(card_soup)
album.song_collection.append(new_song)
if stop_at_level > 1:
song: Song
for song in album.song_collection:
sources = song.source_collection.get_sources(self.SOURCE_TYPE)
for source in sources:
song.merge(self.fetch_song(source=source))
album.update_tracksort()
return album

View File

@@ -18,7 +18,7 @@ from ...utils.exception.config import SettingValueError
from ...utils.config import main_settings, youtube_settings, logging_settings
from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING
from ...utils.string_processing import clean_song_title
from ...utils import get_current_millis
from ...utils import get_current_millis, traverse_json_path
from ...utils import dump_to_file
@@ -31,7 +31,9 @@ from ...objects import (
Song,
Album,
Label,
Target
Target,
Lyrics,
FormattedText
)
from ...connection import Connection
from ...utils.enums.album import AlbumType
@@ -538,8 +540,63 @@ class YoutubeMusic(SuperYouTube):
for renderer in renderer_list:
album.add_list_of_other_objects(parse_renderer(renderer))
for song in album.song_collection:
for song_source in song.source_collection:
song_source.additional_data["playlist_id"] = browse_id
return album
def fetch_lyrics(self, video_id: str, playlist_id: str = None) -> str:
request_data = {
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
"videoId": video_id,
}
if playlist_id is not None:
request_data["playlistId"] = playlist_id
tab_request = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/next", query=f"prettyPrint=false"),
json=request_data,
name=f"fetch_song_tabs_{video_id}.json",
)
if tab_request is None:
return None
dump_to_file(f"fetch_song_tabs_{video_id}.json", tab_request.text, is_json=True, exit_after_dump=False)
tab_data: dict = tab_request.json()
tabs = traverse_json_path(tab_data, "contents.singleColumnMusicWatchNextResultsRenderer.tabbedRenderer.watchNextTabbedResultsRenderer.tabs", default=[])
browse_id = None
for tab in tabs:
pageType = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseEndpointContextSupportedConfigs.browseEndpointContextMusicConfig.pageType", default="")
if pageType in ("MUSIC_TAB_TYPE_LYRICS", "MUSIC_PAGE_TYPE_TRACK_LYRICS") or "lyrics" in pageType.lower():
browse_id = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseId", default=None)
break
if browse_id is None:
return None
r = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/browse", query=f"prettyPrint=false"),
json={
"browseId": browse_id,
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}}
},
name=f"fetch_song_lyrics_{video_id}.json"
)
dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False)
data = r.json()
lyrics_text = traverse_json_path(data, "contents.sectionListRenderer.contents[0].musicDescriptionShelfRenderer.description.runs[0].text", default=None)
if lyrics_text is None:
return None
return Lyrics(FormattedText(plain=lyrics_text))
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
ydl_res: dict = {}
@@ -556,10 +613,7 @@ class YoutubeMusic(SuperYouTube):
uploader = ydl_res.get("uploader", "")
if uploader.endswith(" - Topic"):
artist_names = [uploader.rstrip(" - Topic")]
"""
elif "artist" in ydl_res:
artist_names = ydl_res.get("artist").split(", ")
"""
artist_list = [
Artist(
name=name,
@@ -577,7 +631,7 @@ class YoutubeMusic(SuperYouTube):
))
artist_name = artist_names[0] if len(artist_names) > 0 else None
return Song(
song = Song(
title=ydl_res.get("track", clean_song_title(ydl_res.get("title"), artist_name=artist_name)),
note=ydl_res.get("descriptions"),
album_list=album_list,
@@ -590,6 +644,43 @@ class YoutubeMusic(SuperYouTube):
), source],
)
# other song details
parsed_url = urlparse(source.url)
browse_id = parse_qs(parsed_url.query)['v'][0]
request_data = {
"captionParams": {},
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
"videoId": browse_id,
}
if "playlist_id" in source.additional_data:
request_data["playlistId"] = source.additional_data["playlist_id"]
initial_details = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/player", query=f"prettyPrint=false"),
json=request_data,
name=f"fetch_song_{browse_id}.json",
)
if initial_details is None:
return song
dump_to_file(f"fetch_song_{browse_id}.json", initial_details.text, is_json=True, exit_after_dump=False)
data = initial_details.json()
video_details = data.get("videoDetails", {})
browse_id = video_details.get("videoId", browse_id)
song.title = video_details.get("title", song.title)
if video_details.get("isLiveContent", False):
for album in song.album_list:
album.album_type = AlbumType.LIVE_ALBUM
for thumbnail in video_details.get("thumbnails", []):
song.artwork.append(**thumbnail)
song.lyrics_collection.append(self.fetch_lyrics(browse_id, playlist_id=request_data.get("playlistId")))
return song
def fetch_media_url(self, source: Source, ydl_res: dict = None) -> dict:
def _get_best_format(format_list: List[Dict]) -> dict:

View File

@@ -3,7 +3,7 @@ from pathlib import Path
import json
import logging
import inspect
from typing import List
from typing import List, Union
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
from .config import config, read_config, write_config
@@ -77,6 +77,37 @@ def object_trace(obj):
misc functions
"""
def traverse_json_path(data, path: Union[str, List[str]], default=None):
"""
Path parts are concatenated with . or wrapped with [""] for object keys and wrapped in [] for array indices.
"""
if isinstance(path, str):
path = path.replace('["', '.').replace('"]', '.').replace("[", ".").replace("]", ".")
path = [p for p in path.split(".") if len(p) > 0]
if len(path) <= 0:
return data
current = path[0]
path = path[1:]
new_data = None
if isinstance(data, dict):
new_data = data.get(current)
elif isinstance(data, list):
try:
new_data = data[int(current)]
except (IndexError, ValueError):
pass
if new_data is None:
return default
return traverse_json_path(data=new_data, path=path, default=default)
_auto_increment = 0
def generate_id() -> int:
global _auto_increment

View File

@@ -15,11 +15,11 @@ __stage__ = os.getenv("STAGE", "prod")
DEBUG = (__stage__ == "dev") and True
DEBUG_LOGGING = DEBUG and False
DEBUG_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and False
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and True
DEBUG_DUMP = DEBUG and False
DEBUG_PRINT_ID = DEBUG and True
if DEBUG:

View File

@@ -52,7 +52,14 @@ def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Uni
string = string[1:]
string = string.replace("/", "_").replace("\\", "_")
try:
string = translit(string, reversed=True)
except LanguageDetectionError:
pass
string = sanitize_filename(string)
return string
if isinstance(string, Path):