diff --git a/src/music_kraken/pages/youtube_music/youtube_music.py b/src/music_kraken/pages/youtube_music/youtube_music.py index f0b84c8..fb3f812 100644 --- a/src/music_kraken/pages/youtube_music/youtube_music.py +++ b/src/music_kraken/pages/youtube_music/youtube_music.py @@ -128,22 +128,43 @@ class YouTubeMusicCredentials: return _extract_player_info(self.player_url) -class MusicKrakenYoutubeIE(YoutubeIE): - def __init__(self, *args, main_instance: YoutubeMusic, **kwargs): - self.main_instance = main_instance - super().__init__(*args, **kwargs) +class YTDLLogger: + def __init__(self, logger: logging.Logger): + self.logger = logger + + def debug(self, msg): + self.logger.debug(msg) + + def warning(self, msg): + self.logger.warning(msg) + + def error(self, msg): + self.logger.error(msg) class MusicKrakenYoutubeDL(youtube_dl.YoutubeDL): def __init__(self, main_instance: YoutubeMusic, ydl_opts: dict, **kwargs): self.main_instance = main_instance - super().__init__(ydl_opts or {}, **kwargs) + ydl_opts = ydl_opts or {} + ydl_opts.update({ + "logger": YTDLLogger(self.main_instance.LOGGER), + }) + + super().__init__(ydl_opts, **kwargs) super().__enter__() def __del__(self): super().__exit__(None, None, None) +class MusicKrakenYoutubeIE(YoutubeIE): + def __init__(self, *args, main_instance: YoutubeMusic, **kwargs): + self.main_instance = main_instance + super().__init__(*args, **kwargs) + + def _extract_player_url(self, *ytcfgs, **kw_webpage): + return youtube_settings["player_url"] + class YoutubeMusic(SuperYouTube): @@ -459,157 +480,39 @@ class YoutubeMusic(SuperYouTube): return album - @lru_cache() - def _extract_signature_function(self, player_url): - r = self.connection.get(player_url) - if r is None: - return lambda x: None + def _get_best_format(self, format_list: List[Dict]) -> str: + def _calc_score(_f: dict): + s = 0 - code = r.text + _url = _f.get("url", "") + if "mime=audio" in _url: + s += 100 - funcname = self._search_regex(( - r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P[a-zA-Z0-9$]+)\(', - r'\b[a-zA-Z0-9]+\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P[a-zA-Z0-9$]+)\(', - r'\bm=(?P[a-zA-Z0-9$]{2,})\(decodeURIComponent\(h\.s\)\)', - r'\bc&&\(c=(?P[a-zA-Z0-9$]{2,})\(decodeURIComponent\(c\)\)', - r'(?:\b|[^a-zA-Z0-9$])(?P[a-zA-Z0-9$]{2,})\s*=\s*function\(\s*a\s*\)\s*{\s*a\s*=\s*a\.split\(\s*""\s*\)(?:;[a-zA-Z0-9$]{2}\.[a-zA-Z0-9$]{2}\(a,\d+\))?', - r'(?P[a-zA-Z0-9$]+)\s*=\s*function\(\s*a\s*\)\s*{\s*a\s*=\s*a\.split\(\s*""\s*\)', - # Obsolete patterns - r'("|\')signature\1\s*,\s*(?P[a-zA-Z0-9$]+)\(', - r'\.sig\|\|(?P[a-zA-Z0-9$]+)\(', - r'yt\.akamaized\.net/\)\s*\|\|\s*.*?\s*[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?:encodeURIComponent\s*\()?\s*(?P[a-zA-Z0-9$]+)\(', - r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?P[a-zA-Z0-9$]+)\(', - r'\b[a-zA-Z0-9]+\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*(?P[a-zA-Z0-9$]+)\(', - r'\bc\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*\([^)]*\)\s*\(\s*(?P[a-zA-Z0-9$]+)\(' - ), - code, group='sig') + return s - jsi = JSInterpreter(code) - initial_function = jsi.extract_function(funcname) + highest_score = 0 + best_format = {} + for _format in format_list: + print(_format) - return lambda s: initial_function([s]) + _s = _calc_score(_format) + if _s >= highest_score: + highest_score = _s + best_format = _format - def _decrypt_signature(self, video_id, s): - print(youtube_settings["player_url"]) - res = self._extract_signature_function(player_url=youtube_settings["player_url"]) - test_string = ''.join(map(str, range(len(s)))) - cache_spec = [ord(c) for c in res(test_string)] - signing_func = lambda _s: ''.join(_s[i] for i in cache_spec) - return signing_func(s) - - def _parse_adaptive_formats(self, data: list, video_id) -> dict: - best_format = None - best_bitrate = 0 - - def parse_format(fmt: dict): - fmt_url = fmt.get('url') - - if not fmt_url: - sc = parse_qs(possible_format["signatureCipher"]) - print(sc["s"][0]) - signature = self._decrypt_signature(video_id, sc['s'][0]) - print(signature) - - tmp = sc.get("sp", ["sig"]) - sig_key = "signature" if len(tmp) <= 0 else tmp[-1] - - fmt_url = sc.get("url", [None])[0] - - ftm_parsed = urlparse(fmt_url) - q = parse_qs(ftm_parsed.query) - q[sig_key] = [signature] - print(json.dumps(q, indent=4)) - print(sig_key) - query_str = urlencode(q) - print(query_str) - - fmt_url = urlunparse(( - ftm_parsed.scheme, - ftm_parsed.netloc, - ftm_parsed.path, - ftm_parsed.params, - query_str, - ftm_parsed.fragment - )) - - """ - if not isinstance(url, tuple): - url = compat_urllib_parse.urlparse(url) - query = kwargs.pop('query_update', None) - if query: - qs = compat_parse_qs(url.query) - qs.update(query) - kwargs['query'] = compat_urllib_parse_urlencode(qs, True) - kwargs = compat_kwargs(kwargs) - return compat_urllib_parse.urlunparse(url._replace(**kwargs)) - """ - - return { - "bitrate": fmt.get("bitrate"), - "url": fmt_url - } - - for possible_format in sorted(data, key=lambda x: x.get("bitrate", 0)): - if best_bitrate <= 0: - # no format has been found yet - best_format = possible_format - - if possible_format.get('targetDurationSec') or possible_format.get('drmFamilies'): - continue - - mime_type: str = possible_format["mimeType"] - if not mime_type.startswith("audio"): - continue - - bitrate = int(possible_format.get("bitrate", 0)) - - if bitrate > best_bitrate: - best_bitrate = bitrate - best_format = possible_format - - if bitrate >= main_settings["bitrate"]: - break - - return parse_format(best_format) + return best_format.get("url") def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: # implement the functionality yt_dl provides ydl_res = self.yt_ie._real_extract(source.url) - print(ydl_res) - source.audio_url = ydl_res.get("formats")[0].get("url") + source.audio_url = self._get_best_format(ydl_res.get("formats", [{}])) song = Song( title=ydl_res.get("title"), source_list=[source], ) return song - parsed_url = urlparse(source.url) - video_id = parse_qs(parsed_url.query)['v'] - if len(video_id) <= 0: - return song - browse_id = video_id[0] - - r = self.connection.post( - url=get_youtube_url(path="/youtubei/v1/player", query=f"key={self.credentials.api_key}&prettyPrint=false"), - json={ - "videoId": browse_id, - "context": {**self.credentials.context, "adSignalsInfo": {"params": []}} - } - ) - if r is None: - return song - - data = r.json() - - dump_to_file("yt_video_overview.json", data, exit_after_dump=False) - - available_formats = data.get("streamingData", {}).get("adaptiveFormats", []) - - if len(available_formats) > 0: - source.audio_url = self._parse_adaptive_formats(available_formats, video_id=browse_id).get("url") - - return song def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: if source.audio_url is None: