LOGGER = logging.getLogger("connection")


class Connection:
    """
    HTTP client bound to a single host that retries failed requests and
    rotates through the configured proxies between attempts.

    A request counts as failed when it times out, when the connection cannot
    be established, or when the response status is not one of the accepted
    codes. After every failed attempt the proxy is rotated and the request is
    repeated, at most ``tries`` times in total.
    """

    def __init__(
            self,
            host: str,
            proxies: Optional[List[dict]] = None,
            tries: int = (len(PROXIES_LIST) + 1) * 2,
            timeout: int = 7,
            header_values: Optional[Dict[str, str]] = None,
            session: Optional[requests.Session] = None,
            accepted_response_codes: Optional[Set[int]] = None,
            semantic_not_found: bool = True
    ):
        """
        :param host: url of the host; only scheme and netloc are used for the
            ``Host`` and ``Referer`` headers.
        :param proxies: proxy mappings to rotate through, defaults to ``PROXIES_LIST``.
        :param tries: maximum number of attempts per request.
        :param timeout: default timeout in seconds applied to every request.
        :param header_values: additional headers, overriding the defaults of ``get_header``.
        :param session: an existing session to reuse; a new one is created if omitted.
        :param accepted_response_codes: status codes treated as success, defaults to ``{200}``.
        :param semantic_not_found: if true, a 404 is final and is not retried.
        """
        if proxies is None:
            proxies = PROXIES_LIST
        if header_values is None:
            header_values = dict()

        self.LOGGER = LOGGER
        self.HOST = urlparse(host)
        self.TRIES = tries
        self.TIMEOUT = timeout
        self.rotating_proxy = RotatingProxy(proxy_list=proxies)

        self.ACCEPTED_RESPONSE_CODES = accepted_response_codes or {200}
        self.SEMANTIC_NOT_FOUND = semantic_not_found

        self.session: requests.Session = session
        if self.session is None:
            self.new_session(**header_values)
        else:
            self.rotating_proxy.register_session(session)
            # Pass the custom headers on, so they aren't lost for a supplied session.
            self.set_header(**header_values)

    @property
    def base_url(self):
        """Scheme and netloc of the host, without path, query or fragment."""
        return urlunsplit((self.HOST.scheme, self.HOST.netloc, "", "", ""))

    def new_session(self, **header_values):
        """Create a fresh session, register it for proxy rotation and make it the active one."""
        session = requests.Session()
        session.headers = self.get_header(**header_values)
        self.rotating_proxy.register_session(session)
        self.session = session

    def get_header(self, **header_values) -> Dict[str, str]:
        """Return the default headers for this host; ``header_values`` override them."""
        return {
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64; rv:106.0) Gecko/20100101 Firefox/106.0",
            "Connection": "keep-alive",
            "Host": self.HOST.netloc,
            "Referer": self.base_url,
            **header_values
        }

    def set_header(self, **header_values):
        """Replace the headers of the active session with ``get_header(**header_values)``."""
        self.session.headers = self.get_header(**header_values)

    def rotate(self):
        """Switch every registered session to the next proxy."""
        self.rotating_proxy.rotate()

    def _request(
            self,
            request: Callable,
            try_count: int,
            accepted_response_code: set,
            url: str,
            **kwargs
    ) -> Optional[requests.Response]:
        """
        Call ``request`` until it succeeds or the attempts are used up.

        :param request: bound session method, e.g. ``self.session.get``.
        :param try_count: number of attempts already made.
        :param accepted_response_code: status codes treated as success.
        :param url: the url to request.
        :param kwargs: passed through to ``request``; ``timeout`` defaults to ``self.TIMEOUT``.
        :return: the response, or ``None`` if all attempts failed or the
            server answered 404 while ``SEMANTIC_NOT_FOUND`` is set.
        """
        # Without an explicit timeout a request could block forever and the
        # Timeout handler below would never fire.
        kwargs.setdefault("timeout", self.TIMEOUT)

        # A bounded loop guarantees termination after TRIES attempts.
        for attempt in range(try_count, self.TRIES):
            try:
                r = request(url=url, **kwargs)
            except requests.exceptions.Timeout:
                self.LOGGER.warning(f"Request timed out at \"{url}\": ({attempt}-{self.TRIES})")
            except requests.exceptions.ConnectionError:
                self.LOGGER.warning(f"Couldn't connect to \"{url}\": ({attempt}-{self.TRIES})")
            else:
                if self.SEMANTIC_NOT_FOUND and r.status_code == 404:
                    # The resource doesn't exist, retrying wouldn't change that.
                    self.LOGGER.warning(f"Couldn't find url (404): {url}")
                    return None
                if r.status_code in accepted_response_code:
                    return r

                self.LOGGER.warning(f"{self.HOST.netloc} responded with {r.status_code} "
                                    f"at {url}. ({attempt}-{self.TRIES})")
                self.LOGGER.debug(r.content)

            # The attempt failed: try again through the next proxy.
            self.rotate()

        return None

    def get(
            self,
            url: str,
            stream: bool = False,
            accepted_response_codes: set = None,
            **kwargs
    ) -> Optional[requests.Response]:
        """
        Send a GET request with retries and proxy rotation.

        :return: the response, or ``None`` if no acceptable response was received.
        """
        r = self._request(
            request=self.session.get,
            try_count=0,
            accepted_response_code=accepted_response_codes or self.ACCEPTED_RESPONSE_CODES,
            url=url,
            stream=stream,
            **kwargs
        )
        if r is None:
            self.LOGGER.warning(f"Max attempts ({self.TRIES}) exceeded for: GET:{url}")
        return r

    def post(
            self,
            url: str,
            json: dict,
            stream: bool = False,
            accepted_response_codes: set = None,
            **kwargs
    ) -> Optional[requests.Response]:
        """
        Send a POST request with a json payload, with retries and proxy rotation.

        :return: the response, or ``None`` if no acceptable response was received.
        """
        r = self._request(
            request=self.session.post,
            try_count=0,
            accepted_response_code=accepted_response_codes or self.ACCEPTED_RESPONSE_CODES,
            url=url,
            json=json,
            stream=stream,
            **kwargs
        )
        if r is None:
            self.LOGGER.warning(f"Max attempts ({self.TRIES}) exceeded for: POST:{url}")
            self.LOGGER.warning(f"payload: {json}")
        return r
class RotatingObject:
    """
    Cycles through a fixed, non-empty list of objects.

    This will be used for RotatingProxies and invidious instances.
    """

    def __init__(self, object_list: list):
        """
        :param object_list: the objects to cycle through.
        :raises ValueError: if ``object_list`` is empty.
        """
        if not object_list:
            raise ValueError("There needs to be at least one item in a Rotating structure.")

        self._object_list: list = object_list
        self._current_index = 0

    @property
    def object(self):
        """The currently selected object."""
        return self._object_list[self._current_index]

    def __len__(self):
        return len(self._object_list)

    @property
    def next(self):
        """
        Advance to the following object, wrapping around at the end, and return it.

        NOTE: reading this property changes the current selection.
        """
        self._current_index = (self._current_index + 1) % len(self._object_list)

        return self._object_list[self._current_index]


class RotatingProxy(RotatingObject):
    """
    Rotates through proxy mappings and keeps the ``proxies`` attribute of all
    registered sessions in sync with the current one.
    """

    def __init__(self, proxy_list: List[Dict[str, str]], session_list: List["requests.Session"] = None):
        """
        :param proxy_list: proxy mappings to rotate through; if empty, a single
            empty mapping (no proxy) is used.
        :param session_list: sessions that get updated on every rotation.
        """
        self._session_list: List["requests.Session"] = [] if session_list is None else session_list

        super().__init__(proxy_list or [{}])

    def register_session(self, session: "requests.Session"):
        """Track ``session`` and immediately assign it the current proxy."""
        self._session_list.append(session)
        session.proxies = self.current_proxy

    def rotate(self):
        """Advance to the next proxy and assign it to every registered session."""
        new_proxy = self.next

        for session in self._session_list:
            session.proxies = new_proxy

    @property
    def current_proxy(self) -> Dict[str, str]:
        """The proxy mapping currently in use."""
        return super().object

    @property
    def next(self) -> Dict[str, str]:
        """Advance to the next proxy mapping and return it."""
        # Delegating to `object` here would return the current proxy without
        # ever advancing, so rotating would have no effect.
        return super().next
INSTANCES LIST self.INVIDIOUS_INSTANCE_LIST = UrlListAttribute( name="invidious_instances", @@ -80,6 +80,8 @@ class ConnectionSection(Section): # INVIDIOUS PROXY self.INVIDIOUS_PROXY_VIDEOS = BoolAttribute( name="invidious_proxy_video", + value="false", + description="Downloads the videos using the given instances." ) self.attribute_list = [ diff --git a/src/music_kraken/utils/shared.py b/src/music_kraken/utils/shared.py index 782a1e8..ef396e2 100644 --- a/src/music_kraken/utils/shared.py +++ b/src/music_kraken/utils/shared.py @@ -1,7 +1,7 @@ import logging import random from pathlib import Path -from typing import List, Tuple, Set +from typing import List, Tuple, Set, Dict from .path_manager import LOCATIONS from .config import LOGGING_SECTION, AUDIO_SECTION, CONNECTION_SECTION, MISC_SECTION, PATHS_SECTION @@ -77,6 +77,7 @@ DEFAULT_VALUES = { } TOR: bool = CONNECTION_SECTION.USE_TOR.object_from_value +PROXIES_LIST: List[Dict[str, str]] = CONNECTION_SECTION.PROXIES.object_from_value proxies = {} if len(CONNECTION_SECTION.PROXIES) > 0: """