feat: layed out the calculation of prefered format.

This commit is contained in:
Hazel 2024-01-31 15:24:19 +01:00
parent d93e536ffd
commit c7279eb424

View File

@ -128,22 +128,43 @@ class YouTubeMusicCredentials:
return _extract_player_info(self.player_url)
class MusicKrakenYoutubeIE(YoutubeIE):
def __init__(self, *args, main_instance: YoutubeMusic, **kwargs):
self.main_instance = main_instance
super().__init__(*args, **kwargs)
class YTDLLogger:
def __init__(self, logger: logging.Logger):
self.logger = logger
def debug(self, msg):
self.logger.debug(msg)
def warning(self, msg):
self.logger.warning(msg)
def error(self, msg):
self.logger.error(msg)
class MusicKrakenYoutubeDL(youtube_dl.YoutubeDL):
def __init__(self, main_instance: YoutubeMusic, ydl_opts: dict, **kwargs):
self.main_instance = main_instance
super().__init__(ydl_opts or {}, **kwargs)
ydl_opts = ydl_opts or {}
ydl_opts.update({
"logger": YTDLLogger(self.main_instance.LOGGER),
})
super().__init__(ydl_opts, **kwargs)
super().__enter__()
def __del__(self):
super().__exit__(None, None, None)
class MusicKrakenYoutubeIE(YoutubeIE):
def __init__(self, *args, main_instance: YoutubeMusic, **kwargs):
self.main_instance = main_instance
super().__init__(*args, **kwargs)
def _extract_player_url(self, *ytcfgs, **kw_webpage):
return youtube_settings["player_url"]
class YoutubeMusic(SuperYouTube):
@ -459,157 +480,39 @@ class YoutubeMusic(SuperYouTube):
return album
@lru_cache()
def _extract_signature_function(self, player_url):
r = self.connection.get(player_url)
if r is None:
return lambda x: None
def _get_best_format(self, format_list: List[Dict]) -> str:
def _calc_score(_f: dict):
s = 0
code = r.text
_url = _f.get("url", "")
if "mime=audio" in _url:
s += 100
funcname = self._search_regex((
r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(',
r'\b[a-zA-Z0-9]+\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(',
r'\bm=(?P<sig>[a-zA-Z0-9$]{2,})\(decodeURIComponent\(h\.s\)\)',
r'\bc&&\(c=(?P<sig>[a-zA-Z0-9$]{2,})\(decodeURIComponent\(c\)\)',
r'(?:\b|[^a-zA-Z0-9$])(?P<sig>[a-zA-Z0-9$]{2,})\s*=\s*function\(\s*a\s*\)\s*{\s*a\s*=\s*a\.split\(\s*""\s*\)(?:;[a-zA-Z0-9$]{2}\.[a-zA-Z0-9$]{2}\(a,\d+\))?',
r'(?P<sig>[a-zA-Z0-9$]+)\s*=\s*function\(\s*a\s*\)\s*{\s*a\s*=\s*a\.split\(\s*""\s*\)',
# Obsolete patterns
r'("|\')signature\1\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(',
r'\.sig\|\|(?P<sig>[a-zA-Z0-9$]+)\(',
r'yt\.akamaized\.net/\)\s*\|\|\s*.*?\s*[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?:encodeURIComponent\s*\()?\s*(?P<sig>[a-zA-Z0-9$]+)\(',
r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(',
r'\b[a-zA-Z0-9]+\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(',
r'\bc\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*\([^)]*\)\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\('
),
code, group='sig')
return s
jsi = JSInterpreter(code)
initial_function = jsi.extract_function(funcname)
highest_score = 0
best_format = {}
for _format in format_list:
print(_format)
return lambda s: initial_function([s])
_s = _calc_score(_format)
if _s >= highest_score:
highest_score = _s
best_format = _format
def _decrypt_signature(self, video_id, s):
print(youtube_settings["player_url"])
res = self._extract_signature_function(player_url=youtube_settings["player_url"])
test_string = ''.join(map(str, range(len(s))))
cache_spec = [ord(c) for c in res(test_string)]
signing_func = lambda _s: ''.join(_s[i] for i in cache_spec)
return signing_func(s)
def _parse_adaptive_formats(self, data: list, video_id) -> dict:
best_format = None
best_bitrate = 0
def parse_format(fmt: dict):
fmt_url = fmt.get('url')
if not fmt_url:
sc = parse_qs(possible_format["signatureCipher"])
print(sc["s"][0])
signature = self._decrypt_signature(video_id, sc['s'][0])
print(signature)
tmp = sc.get("sp", ["sig"])
sig_key = "signature" if len(tmp) <= 0 else tmp[-1]
fmt_url = sc.get("url", [None])[0]
ftm_parsed = urlparse(fmt_url)
q = parse_qs(ftm_parsed.query)
q[sig_key] = [signature]
print(json.dumps(q, indent=4))
print(sig_key)
query_str = urlencode(q)
print(query_str)
fmt_url = urlunparse((
ftm_parsed.scheme,
ftm_parsed.netloc,
ftm_parsed.path,
ftm_parsed.params,
query_str,
ftm_parsed.fragment
))
"""
if not isinstance(url, tuple):
url = compat_urllib_parse.urlparse(url)
query = kwargs.pop('query_update', None)
if query:
qs = compat_parse_qs(url.query)
qs.update(query)
kwargs['query'] = compat_urllib_parse_urlencode(qs, True)
kwargs = compat_kwargs(kwargs)
return compat_urllib_parse.urlunparse(url._replace(**kwargs))
"""
return {
"bitrate": fmt.get("bitrate"),
"url": fmt_url
}
for possible_format in sorted(data, key=lambda x: x.get("bitrate", 0)):
if best_bitrate <= 0:
# no format has been found yet
best_format = possible_format
if possible_format.get('targetDurationSec') or possible_format.get('drmFamilies'):
continue
mime_type: str = possible_format["mimeType"]
if not mime_type.startswith("audio"):
continue
bitrate = int(possible_format.get("bitrate", 0))
if bitrate > best_bitrate:
best_bitrate = bitrate
best_format = possible_format
if bitrate >= main_settings["bitrate"]:
break
return parse_format(best_format)
return best_format.get("url")
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
# implement the functionality yt_dl provides
ydl_res = self.yt_ie._real_extract(source.url)
print(ydl_res)
source.audio_url = ydl_res.get("formats")[0].get("url")
source.audio_url = self._get_best_format(ydl_res.get("formats", [{}]))
song = Song(
title=ydl_res.get("title"),
source_list=[source],
)
return song
parsed_url = urlparse(source.url)
video_id = parse_qs(parsed_url.query)['v']
if len(video_id) <= 0:
return song
browse_id = video_id[0]
r = self.connection.post(
url=get_youtube_url(path="/youtubei/v1/player", query=f"key={self.credentials.api_key}&prettyPrint=false"),
json={
"videoId": browse_id,
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}}
}
)
if r is None:
return song
data = r.json()
dump_to_file("yt_video_overview.json", data, exit_after_dump=False)
available_formats = data.get("streamingData", {}).get("adaptiveFormats", [])
if len(available_formats) > 0:
source.audio_url = self._parse_adaptive_formats(available_formats, video_id=browse_id).get("url")
return song
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
if source.audio_url is None: