feat: implemented decryption
This commit is contained in:
parent
f9b126001c
commit
311faabeab
@ -1,5 +1,6 @@
|
||||
import logging
|
||||
import random
|
||||
import re
|
||||
from copy import copy
|
||||
from pathlib import Path
|
||||
from typing import Optional, Union, Type, Dict, Set, List, Tuple
|
||||
@ -145,6 +146,33 @@ class Page:
|
||||
# set this to true, if all song details can also be fetched by fetching album details
|
||||
NO_ADDITIONAL_DATA_FROM_SONG = False
|
||||
|
||||
def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None):
|
||||
"""
|
||||
Perform a regex search on the given string, using a single or a list of
|
||||
patterns returning the first matching group.
|
||||
In case of failure return a default value or raise a WARNING or a
|
||||
RegexNotFoundError, depending on fatal, specifying the field name.
|
||||
"""
|
||||
|
||||
if isinstance(pattern, str):
|
||||
mobj = re.search(pattern, string, flags)
|
||||
else:
|
||||
for p in pattern:
|
||||
mobj = re.search(p, string, flags)
|
||||
if mobj:
|
||||
break
|
||||
|
||||
if mobj:
|
||||
if group is None:
|
||||
# return the first matching group
|
||||
return next(g for g in mobj.groups() if g is not None)
|
||||
elif isinstance(group, (list, tuple)):
|
||||
return tuple(mobj.group(g) for g in group)
|
||||
else:
|
||||
return mobj.group(group)
|
||||
|
||||
return default
|
||||
|
||||
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
|
||||
return None
|
||||
|
||||
|
@ -5,12 +5,16 @@ import random
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
import re
|
||||
from functools import lru_cache
|
||||
|
||||
from ...utils.exception.config import SettingValueError
|
||||
from ...utils.config import main_settings, youtube_settings, logging_settings
|
||||
from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING
|
||||
from ...utils.functions import get_current_millis
|
||||
|
||||
from .yt_utils.jsinterp import JSInterpreter
|
||||
|
||||
|
||||
if DEBUG:
|
||||
from ...utils.debug_utils import dump_to_file
|
||||
|
||||
@ -94,6 +98,32 @@ class YouTubeMusicCredentials:
|
||||
# the context in requests
|
||||
context: dict
|
||||
|
||||
player_url: str
|
||||
|
||||
|
||||
|
||||
@property
|
||||
def player_id(self):
|
||||
@lru_cache(128)
|
||||
def _extract_player_info(player_url):
|
||||
_PLAYER_INFO_RE = (
|
||||
r'/s/player/(?P<id>[a-zA-Z0-9_-]{8,})/player',
|
||||
r'/(?P<id>[a-zA-Z0-9_-]{8,})/player(?:_ias\.vflset(?:/[a-zA-Z]{2,3}_[a-zA-Z]{2,3})?|-plasma-ias-(?:phone|tablet)-[a-z]{2}_[A-Z]{2}\.vflset)/base\.js$',
|
||||
r'\b(?P<id>vfl[a-zA-Z0-9_-]+)\b.*?\.js$',
|
||||
)
|
||||
|
||||
for player_re in _PLAYER_INFO_RE:
|
||||
id_m = re.search(player_re, player_url)
|
||||
if id_m:
|
||||
break
|
||||
else:
|
||||
return
|
||||
|
||||
return id_m.group('id')
|
||||
|
||||
return _extract_player_info(self.player_url)
|
||||
|
||||
|
||||
|
||||
class YoutubeMusic(SuperYouTube):
|
||||
# CHANGE
|
||||
@ -106,7 +136,8 @@ class YoutubeMusic(SuperYouTube):
|
||||
self.credentials: YouTubeMusicCredentials = YouTubeMusicCredentials(
|
||||
api_key=youtube_settings["youtube_music_api_key"],
|
||||
ctoken="",
|
||||
context=youtube_settings["youtube_music_innertube_context"]
|
||||
context=youtube_settings["youtube_music_innertube_context"],
|
||||
player_url=youtube_settings["player_url"],
|
||||
)
|
||||
|
||||
self.start_millis = get_current_millis()
|
||||
@ -114,7 +145,7 @@ class YoutubeMusic(SuperYouTube):
|
||||
if self.credentials.api_key == "" or DEBUG_YOUTUBE_INITIALIZING:
|
||||
self._fetch_from_main_page()
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
SuperYouTube.__init__(self,*args, **kwargs)
|
||||
|
||||
def _fetch_from_main_page(self):
|
||||
"""
|
||||
@ -212,6 +243,41 @@ class YoutubeMusic(SuperYouTube):
|
||||
if not found_context:
|
||||
self.LOGGER.warning(f"Couldn't find a context for {type(self).__name__}.")
|
||||
|
||||
# player url
|
||||
"""
|
||||
Thanks to youtube-dl <33
|
||||
"""
|
||||
player_pattern = [
|
||||
r'(?<="jsUrl":")(.*?)(?=")',
|
||||
r'(?<="PLAYER_JS_URL":")(.*?)(?=")'
|
||||
]
|
||||
found_player_url = False
|
||||
|
||||
for pattern in player_pattern:
|
||||
for player_string in re.findall(pattern, content, re.M):
|
||||
try:
|
||||
youtube_settings["player_url"] = "https://music.youtube.com" + player_string
|
||||
found_player_url = True
|
||||
except json.decoder.JSONDecodeError:
|
||||
continue
|
||||
|
||||
self.credentials.player_url = youtube_settings["player_url"]
|
||||
break
|
||||
|
||||
if found_player_url:
|
||||
break
|
||||
|
||||
if not found_player_url:
|
||||
self.LOGGER.warning(f"Couldn't find an url for the video player.")
|
||||
|
||||
# ytcfg
|
||||
youtube_settings["ytcfg"] = json.loads(self._search_regex(
|
||||
r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;',
|
||||
content,
|
||||
default='{}'
|
||||
)) or {}
|
||||
|
||||
|
||||
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
|
||||
return super().get_source_type(source)
|
||||
|
||||
@ -359,66 +425,85 @@ class YoutubeMusic(SuperYouTube):
|
||||
|
||||
return album
|
||||
|
||||
@staticmethod
|
||||
def _parse_adaptive_formats(data: list) -> str:
|
||||
best_url = None
|
||||
audio_format = None
|
||||
@lru_cache()
|
||||
def _extract_signature_function(self, player_url):
|
||||
r = self.connection.get(player_url)
|
||||
if r is None:
|
||||
return lambda x: None
|
||||
|
||||
code = r.text
|
||||
|
||||
funcname = self._search_regex((
|
||||
r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(',
|
||||
r'\b[a-zA-Z0-9]+\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(',
|
||||
r'\bm=(?P<sig>[a-zA-Z0-9$]{2,})\(decodeURIComponent\(h\.s\)\)',
|
||||
r'\bc&&\(c=(?P<sig>[a-zA-Z0-9$]{2,})\(decodeURIComponent\(c\)\)',
|
||||
r'(?:\b|[^a-zA-Z0-9$])(?P<sig>[a-zA-Z0-9$]{2,})\s*=\s*function\(\s*a\s*\)\s*{\s*a\s*=\s*a\.split\(\s*""\s*\)(?:;[a-zA-Z0-9$]{2}\.[a-zA-Z0-9$]{2}\(a,\d+\))?',
|
||||
r'(?P<sig>[a-zA-Z0-9$]+)\s*=\s*function\(\s*a\s*\)\s*{\s*a\s*=\s*a\.split\(\s*""\s*\)',
|
||||
# Obsolete patterns
|
||||
r'("|\')signature\1\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(',
|
||||
r'\.sig\|\|(?P<sig>[a-zA-Z0-9$]+)\(',
|
||||
r'yt\.akamaized\.net/\)\s*\|\|\s*.*?\s*[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?:encodeURIComponent\s*\()?\s*(?P<sig>[a-zA-Z0-9$]+)\(',
|
||||
r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(',
|
||||
r'\b[a-zA-Z0-9]+\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(',
|
||||
r'\bc\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*\([^)]*\)\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\('
|
||||
),
|
||||
code, group='sig')
|
||||
|
||||
jsi = JSInterpreter(code)
|
||||
initial_function = jsi.extract_function(funcname)
|
||||
return lambda s: initial_function([s])
|
||||
|
||||
def _decrypt_signature(self, s):
|
||||
signing_func = self._extract_signature_function(player_url=youtube_settings["player_url"])
|
||||
print(signing_func)
|
||||
return signing_func(s)
|
||||
|
||||
def _parse_adaptive_formats(self, data: list, video_id) -> dict:
|
||||
best_format = None
|
||||
best_bitrate = 0
|
||||
|
||||
def decode_url(_format: dict):
|
||||
"""
|
||||
s=0%3Dw9hlenzIQkU5qD55flWqkO-wn8G6CJxI%3Dn9_OSUAUh3AiACkI5TNetQixuV6PJAxH-NFqGWGQCivQMkqprwyv8z2NAhIQRwsSdQfJAfJA
|
||||
sp=sig
|
||||
url=https://rr1---sn-cxaf0x-nugl.googlevideo.com/videoplayback%3Fexpire%3D1705426390%26ei%3DdmmmZZK1OYf41gL26KGwDg%26ip%3D129.143.170.58%26id%3Do-APgHuP61UnxvMxskECWmga1BRWYDFv91DMB7E6R_b_CG%26itag%3D18%26source%3Dyoutube%26requiressl%3Dyes%26xpc%3DEgVo2aDSNQ%253D%253D%26mh%3D_V%26mm%3D31%252C29%26mn%3Dsn-cxaf0x-nugl%252Csn-4g5edndd%26ms%3Dau%252Crdu%26mv%3Dm%26mvi%3D1%26pl%3D16%26pcm2%3Dyes%26gcr%3Dde%26initcwndbps%3D1737500%26spc%3DUWF9f6gk6WFPUFJZkjGIeb9q8NjPmmcsXzCp%26vprv%3D1%26svpuc%3D1%26mime%3Dvideo%252Fmp4%26ns%3DJnQgwQe-JazkZpURVB2rmlUQ%26cnr%3D14%26ratebypass%3Dyes%26dur%3D170.643%26lmt%3D1697280536047282%26mt%3D1705404526%26fvip%3D4%26fexp%3D24007246%26c%3DWEB_REMIX%26txp%3D2318224%26n%3DEq7jcRmeC89oLlbr%26sparams%3Dexpire%252Cei%252Cip%252Cid%252Citag%252Csource%252Crequiressl%252Cxpc%252Cpcm2%252Cgcr%252Cspc%252Cvprv%252Csvpuc%252Cmime%252Cns%252Ccnr%252Cratebypass%252Cdur%252Clmt%26lsparams%3Dmh%252Cmm%252Cmn%252Cms%252Cmv%252Cmvi%252Cpl%252Cinitcwndbps%26lsig%3DAAO5W4owRQIhAOJSldsMn2QA8b-rMr8mJoPr-9-8piIMe6J-800YB0DiAiBKLBHGfr-a6d87K0-WbsJzVf9f2DhYgv0vcntWvHmvGA%253D%253D"
|
||||
:param _format:
|
||||
:return:
|
||||
"""
|
||||
sc = parse_qs(_format["signatureCipher"])
|
||||
def parse_format(fmt: dict):
|
||||
fmt_url = fmt.get('url')
|
||||
|
||||
fmt_url = sc["url"][0]
|
||||
encrypted_sig = sc['s'][0]
|
||||
if not fmt_url:
|
||||
sc = parse_qs(possible_format["signatureCipher"])
|
||||
print(sc["s"][0])
|
||||
signature = self._decrypt_signature(sc['s'][0])
|
||||
print(signature)
|
||||
|
||||
if not (sc and fmt_url and encrypted_sig):
|
||||
return
|
||||
sp = sc.get("sp", ["sig"])[0]
|
||||
fmt_url = sc.get("url", [None])[0]
|
||||
|
||||
"""
|
||||
if not player_url:
|
||||
player_url = self._extract_player_url(webpage)
|
||||
if not player_url:
|
||||
return
|
||||
|
||||
signature = self._decrypt_signature(sc['s'][0], video_id, player_url)
|
||||
sp = try_get(sc, lambda x: x['sp'][0]) or 'signature'
|
||||
fmt_url += '&' + sp + '=' + signature
|
||||
"""
|
||||
fmt_url += '&' + sp + '=' + signature
|
||||
|
||||
for possible_format in data:
|
||||
_url_list = parse_qs(possible_format["signatureCipher"])["url"]
|
||||
if len(_url_list) <= 0:
|
||||
return {
|
||||
"bitrate": fmt.get("bitrate"),
|
||||
"url": fmt_url
|
||||
}
|
||||
|
||||
for possible_format in sorted(data, key=lambda x: x.get("bitrate", 0)):
|
||||
if best_bitrate <= 0:
|
||||
# no format has been found yet
|
||||
best_format = possible_format
|
||||
|
||||
if possible_format.get('targetDurationSec') or possible_format.get('drmFamilies'):
|
||||
continue
|
||||
|
||||
url = _url_list[0]
|
||||
if best_url is None:
|
||||
best_url = url
|
||||
|
||||
mime_type: str = possible_format["mimeType"]
|
||||
if not mime_type.startswith("audio"):
|
||||
continue
|
||||
|
||||
bitrate = int(possible_format.get("bitrate", 0))
|
||||
|
||||
if bitrate >= main_settings["bitrate"]:
|
||||
best_bitrate = bitrate
|
||||
audio_format = possible_format
|
||||
best_url = url
|
||||
break
|
||||
|
||||
if bitrate > best_bitrate:
|
||||
best_bitrate = bitrate
|
||||
audio_format = possible_format
|
||||
best_url = url
|
||||
best_format = possible_format
|
||||
|
||||
return best_url
|
||||
if bitrate >= main_settings["bitrate"]:
|
||||
break
|
||||
|
||||
return parse_format(best_format)
|
||||
|
||||
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
|
||||
"""
|
||||
@ -471,7 +556,7 @@ class YoutubeMusic(SuperYouTube):
|
||||
available_formats = data.get("streamingData", {}).get("adaptiveFormats", [])
|
||||
|
||||
if len(available_formats) > 0:
|
||||
source.audio_url = self._parse_adaptive_formats(available_formats)
|
||||
source.audio_url = self._parse_adaptive_formats(available_formats, video_id=browse_id).get("url")
|
||||
|
||||
return song
|
||||
|
||||
|
3308
src/music_kraken/pages/youtube_music/yt_utils/compat.py
Normal file
3308
src/music_kraken/pages/youtube_music/yt_utils/compat.py
Normal file
File diff suppressed because it is too large
Load Diff
1054
src/music_kraken/pages/youtube_music/yt_utils/jsinterp.py
Normal file
1054
src/music_kraken/pages/youtube_music/yt_utils/jsinterp.py
Normal file
File diff suppressed because it is too large
Load Diff
273
src/music_kraken/pages/youtube_music/yt_utils/socks.py
Normal file
273
src/music_kraken/pages/youtube_music/yt_utils/socks.py
Normal file
@ -0,0 +1,273 @@
|
||||
# Public Domain SOCKS proxy protocol implementation
|
||||
# Adapted from https://gist.github.com/bluec0re/cafd3764412967417fd3
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
# References:
|
||||
# SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
|
||||
# SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol
|
||||
# SOCKS5 protocol https://tools.ietf.org/html/rfc1928
|
||||
# SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929
|
||||
|
||||
import collections
|
||||
import socket
|
||||
|
||||
from .compat import (
|
||||
compat_ord,
|
||||
compat_struct_pack,
|
||||
compat_struct_unpack,
|
||||
)
|
||||
|
||||
__author__ = 'Timo Schmid <coding@timoschmid.de>'
|
||||
|
||||
SOCKS4_VERSION = 4
|
||||
SOCKS4_REPLY_VERSION = 0x00
|
||||
# Excerpt from SOCKS4A protocol:
|
||||
# if the client cannot resolve the destination host's domain name to find its
|
||||
# IP address, it should set the first three bytes of DSTIP to NULL and the last
|
||||
# byte to a non-zero value.
|
||||
SOCKS4_DEFAULT_DSTIP = compat_struct_pack('!BBBB', 0, 0, 0, 0xFF)
|
||||
|
||||
SOCKS5_VERSION = 5
|
||||
SOCKS5_USER_AUTH_VERSION = 0x01
|
||||
SOCKS5_USER_AUTH_SUCCESS = 0x00
|
||||
|
||||
|
||||
class Socks4Command(object):
|
||||
CMD_CONNECT = 0x01
|
||||
CMD_BIND = 0x02
|
||||
|
||||
|
||||
class Socks5Command(Socks4Command):
|
||||
CMD_UDP_ASSOCIATE = 0x03
|
||||
|
||||
|
||||
class Socks5Auth(object):
|
||||
AUTH_NONE = 0x00
|
||||
AUTH_GSSAPI = 0x01
|
||||
AUTH_USER_PASS = 0x02
|
||||
AUTH_NO_ACCEPTABLE = 0xFF # For server response
|
||||
|
||||
|
||||
class Socks5AddressType(object):
|
||||
ATYP_IPV4 = 0x01
|
||||
ATYP_DOMAINNAME = 0x03
|
||||
ATYP_IPV6 = 0x04
|
||||
|
||||
|
||||
class ProxyError(socket.error):
|
||||
ERR_SUCCESS = 0x00
|
||||
|
||||
def __init__(self, code=None, msg=None):
|
||||
if code is not None and msg is None:
|
||||
msg = self.CODES.get(code) or 'unknown error'
|
||||
super(ProxyError, self).__init__(code, msg)
|
||||
|
||||
|
||||
class InvalidVersionError(ProxyError):
|
||||
def __init__(self, expected_version, got_version):
|
||||
msg = ('Invalid response version from server. Expected {0:02x} got '
|
||||
'{1:02x}'.format(expected_version, got_version))
|
||||
super(InvalidVersionError, self).__init__(0, msg)
|
||||
|
||||
|
||||
class Socks4Error(ProxyError):
|
||||
ERR_SUCCESS = 90
|
||||
|
||||
CODES = {
|
||||
91: 'request rejected or failed',
|
||||
92: 'request rejected because SOCKS server cannot connect to identd on the client',
|
||||
93: 'request rejected because the client program and identd report different user-ids'
|
||||
}
|
||||
|
||||
|
||||
class Socks5Error(ProxyError):
|
||||
ERR_GENERAL_FAILURE = 0x01
|
||||
|
||||
CODES = {
|
||||
0x01: 'general SOCKS server failure',
|
||||
0x02: 'connection not allowed by ruleset',
|
||||
0x03: 'Network unreachable',
|
||||
0x04: 'Host unreachable',
|
||||
0x05: 'Connection refused',
|
||||
0x06: 'TTL expired',
|
||||
0x07: 'Command not supported',
|
||||
0x08: 'Address type not supported',
|
||||
0xFE: 'unknown username or invalid password',
|
||||
0xFF: 'all offered authentication methods were rejected'
|
||||
}
|
||||
|
||||
|
||||
class ProxyType(object):
|
||||
SOCKS4 = 0
|
||||
SOCKS4A = 1
|
||||
SOCKS5 = 2
|
||||
|
||||
|
||||
Proxy = collections.namedtuple('Proxy', (
|
||||
'type', 'host', 'port', 'username', 'password', 'remote_dns'))
|
||||
|
||||
|
||||
class sockssocket(socket.socket):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._proxy = None
|
||||
super(sockssocket, self).__init__(*args, **kwargs)
|
||||
|
||||
def setproxy(self, proxytype, addr, port, rdns=True, username=None, password=None):
|
||||
assert proxytype in (ProxyType.SOCKS4, ProxyType.SOCKS4A, ProxyType.SOCKS5)
|
||||
|
||||
self._proxy = Proxy(proxytype, addr, port, username, password, rdns)
|
||||
|
||||
def recvall(self, cnt):
|
||||
data = b''
|
||||
while len(data) < cnt:
|
||||
cur = self.recv(cnt - len(data))
|
||||
if not cur:
|
||||
raise EOFError('{0} bytes missing'.format(cnt - len(data)))
|
||||
data += cur
|
||||
return data
|
||||
|
||||
def _recv_bytes(self, cnt):
|
||||
data = self.recvall(cnt)
|
||||
return compat_struct_unpack('!{0}B'.format(cnt), data)
|
||||
|
||||
@staticmethod
|
||||
def _len_and_data(data):
|
||||
return compat_struct_pack('!B', len(data)) + data
|
||||
|
||||
def _check_response_version(self, expected_version, got_version):
|
||||
if got_version != expected_version:
|
||||
self.close()
|
||||
raise InvalidVersionError(expected_version, got_version)
|
||||
|
||||
def _resolve_address(self, destaddr, default, use_remote_dns):
|
||||
try:
|
||||
return socket.inet_aton(destaddr)
|
||||
except socket.error:
|
||||
if use_remote_dns and self._proxy.remote_dns:
|
||||
return default
|
||||
else:
|
||||
return socket.inet_aton(socket.gethostbyname(destaddr))
|
||||
|
||||
def _setup_socks4(self, address, is_4a=False):
|
||||
destaddr, port = address
|
||||
|
||||
ipaddr = self._resolve_address(destaddr, SOCKS4_DEFAULT_DSTIP, use_remote_dns=is_4a)
|
||||
|
||||
packet = compat_struct_pack('!BBH', SOCKS4_VERSION, Socks4Command.CMD_CONNECT, port) + ipaddr
|
||||
|
||||
username = (self._proxy.username or '').encode('utf-8')
|
||||
packet += username + b'\x00'
|
||||
|
||||
if is_4a and self._proxy.remote_dns:
|
||||
packet += destaddr.encode('utf-8') + b'\x00'
|
||||
|
||||
self.sendall(packet)
|
||||
|
||||
version, resp_code, dstport, dsthost = compat_struct_unpack('!BBHI', self.recvall(8))
|
||||
|
||||
self._check_response_version(SOCKS4_REPLY_VERSION, version)
|
||||
|
||||
if resp_code != Socks4Error.ERR_SUCCESS:
|
||||
self.close()
|
||||
raise Socks4Error(resp_code)
|
||||
|
||||
return (dsthost, dstport)
|
||||
|
||||
def _setup_socks4a(self, address):
|
||||
self._setup_socks4(address, is_4a=True)
|
||||
|
||||
def _socks5_auth(self):
|
||||
packet = compat_struct_pack('!B', SOCKS5_VERSION)
|
||||
|
||||
auth_methods = [Socks5Auth.AUTH_NONE]
|
||||
if self._proxy.username and self._proxy.password:
|
||||
auth_methods.append(Socks5Auth.AUTH_USER_PASS)
|
||||
|
||||
packet += compat_struct_pack('!B', len(auth_methods))
|
||||
packet += compat_struct_pack('!{0}B'.format(len(auth_methods)), *auth_methods)
|
||||
|
||||
self.sendall(packet)
|
||||
|
||||
version, method = self._recv_bytes(2)
|
||||
|
||||
self._check_response_version(SOCKS5_VERSION, version)
|
||||
|
||||
if method == Socks5Auth.AUTH_NO_ACCEPTABLE or (
|
||||
method == Socks5Auth.AUTH_USER_PASS and (not self._proxy.username or not self._proxy.password)):
|
||||
self.close()
|
||||
raise Socks5Error(Socks5Auth.AUTH_NO_ACCEPTABLE)
|
||||
|
||||
if method == Socks5Auth.AUTH_USER_PASS:
|
||||
username = self._proxy.username.encode('utf-8')
|
||||
password = self._proxy.password.encode('utf-8')
|
||||
packet = compat_struct_pack('!B', SOCKS5_USER_AUTH_VERSION)
|
||||
packet += self._len_and_data(username) + self._len_and_data(password)
|
||||
self.sendall(packet)
|
||||
|
||||
version, status = self._recv_bytes(2)
|
||||
|
||||
self._check_response_version(SOCKS5_USER_AUTH_VERSION, version)
|
||||
|
||||
if status != SOCKS5_USER_AUTH_SUCCESS:
|
||||
self.close()
|
||||
raise Socks5Error(Socks5Error.ERR_GENERAL_FAILURE)
|
||||
|
||||
def _setup_socks5(self, address):
|
||||
destaddr, port = address
|
||||
|
||||
ipaddr = self._resolve_address(destaddr, None, use_remote_dns=True)
|
||||
|
||||
self._socks5_auth()
|
||||
|
||||
reserved = 0
|
||||
packet = compat_struct_pack('!BBB', SOCKS5_VERSION, Socks5Command.CMD_CONNECT, reserved)
|
||||
if ipaddr is None:
|
||||
destaddr = destaddr.encode('utf-8')
|
||||
packet += compat_struct_pack('!B', Socks5AddressType.ATYP_DOMAINNAME)
|
||||
packet += self._len_and_data(destaddr)
|
||||
else:
|
||||
packet += compat_struct_pack('!B', Socks5AddressType.ATYP_IPV4) + ipaddr
|
||||
packet += compat_struct_pack('!H', port)
|
||||
|
||||
self.sendall(packet)
|
||||
|
||||
version, status, reserved, atype = self._recv_bytes(4)
|
||||
|
||||
self._check_response_version(SOCKS5_VERSION, version)
|
||||
|
||||
if status != Socks5Error.ERR_SUCCESS:
|
||||
self.close()
|
||||
raise Socks5Error(status)
|
||||
|
||||
if atype == Socks5AddressType.ATYP_IPV4:
|
||||
destaddr = self.recvall(4)
|
||||
elif atype == Socks5AddressType.ATYP_DOMAINNAME:
|
||||
alen = compat_ord(self.recv(1))
|
||||
destaddr = self.recvall(alen)
|
||||
elif atype == Socks5AddressType.ATYP_IPV6:
|
||||
destaddr = self.recvall(16)
|
||||
destport = compat_struct_unpack('!H', self.recvall(2))[0]
|
||||
|
||||
return (destaddr, destport)
|
||||
|
||||
def _make_proxy(self, connect_func, address):
|
||||
if not self._proxy:
|
||||
return connect_func(self, address)
|
||||
|
||||
result = connect_func(self, (self._proxy.host, self._proxy.port))
|
||||
if result != 0 and result is not None:
|
||||
return result
|
||||
setup_funcs = {
|
||||
ProxyType.SOCKS4: self._setup_socks4,
|
||||
ProxyType.SOCKS4A: self._setup_socks4a,
|
||||
ProxyType.SOCKS5: self._setup_socks5,
|
||||
}
|
||||
setup_funcs[self._proxy.type](address)
|
||||
return result
|
||||
|
||||
def connect(self, address):
|
||||
self._make_proxy(socket.socket.connect, address)
|
||||
|
||||
def connect_ex(self, address):
|
||||
return self._make_proxy(socket.socket.connect_ex, address)
|
6513
src/music_kraken/pages/youtube_music/yt_utils/utils.py
Normal file
6513
src/music_kraken/pages/youtube_music/yt_utils/utils.py
Normal file
File diff suppressed because it is too large
Load Diff
@ -32,7 +32,7 @@ class ConfigDict(dict):
|
||||
|
||||
|
||||
class Config:
|
||||
def __init__(self, component_list: Tuple[Union[Attribute, Description, EmptyLine]], config_file: Path) -> None:
|
||||
def __init__(self, component_list: Tuple[Union[Attribute, Description, EmptyLine], ...], config_file: Path) -> None:
|
||||
self.config_file: Path = config_file
|
||||
|
||||
self.component_list: List[Union[Attribute, Description, EmptyLine]] = [
|
||||
|
@ -9,7 +9,7 @@ from ..attributes.attribute import Attribute
|
||||
from ..attributes.special_attributes import SelectAttribute, PathAttribute, UrlAttribute
|
||||
|
||||
|
||||
config = Config([
|
||||
config = Config((
|
||||
Attribute(name="use_youtube_alongside_youtube_music", default_value=False, description="""If set to true, it will search youtube through invidious and piped,
|
||||
despite a direct wrapper for the youtube music INNERTUBE api being implemented.
|
||||
I my INNERTUBE api wrapper doesn't work, set this to true."""),
|
||||
@ -35,6 +35,9 @@ Dw. if it is empty, Rachel will fetch it automatically for you <333
|
||||
If any instance seems to be missing, run music kraken with the -f flag."""),
|
||||
Attribute(name="use_sponsor_block", default_value=True, description="Use sponsor block to remove adds or simmilar from the youtube videos."),
|
||||
|
||||
Attribute(name="player_url", default_value="/s/player/80b90bfd/player_ias.vflset/en_US/base.js", description="""
|
||||
This is needed to fetch videos without invidious
|
||||
"""),
|
||||
Attribute(name="youtube_music_consent_cookies", default_value={
|
||||
"CONSENT": "PENDING+258"
|
||||
}, description="The cookie with the key CONSENT says to what stuff you agree. Per default you decline all cookies, but it honestly doesn't matter."),
|
||||
@ -89,8 +92,9 @@ If any instance seems to be missing, run music kraken with the -f flag."""),
|
||||
"adSignalsInfo": {
|
||||
"params": []
|
||||
}
|
||||
}, description="Don't bother about this. It is something technical, but if you wanna change the innertube requests... go on.")
|
||||
], LOCATIONS.get_config_file("youtube"))
|
||||
}, description="Don't bother about this. It is something technical, but if you wanna change the innertube requests... go on."),
|
||||
Attribute(name="ytcfg", description="Please... ignore it.", default_value={})
|
||||
), LOCATIONS.get_config_file("youtube"))
|
||||
|
||||
|
||||
class SettingsStructure(TypedDict):
|
||||
@ -102,5 +106,7 @@ class SettingsStructure(TypedDict):
|
||||
youtube_music_clean_data: bool
|
||||
youtube_url: List[ParseResult]
|
||||
use_sponsor_block: bool
|
||||
player_url: str
|
||||
youtube_music_innertube_context: dict
|
||||
youtube_music_consent_cookies: dict
|
||||
ytcfg: dict
|
||||
|
Loading…
Reference in New Issue
Block a user