made streaming more stable

This commit is contained in:
Hellow
2023-06-16 23:27:08 +02:00
parent 7277b7e512
commit 3cba909c05
8 changed files with 738 additions and 53 deletions

View File

@@ -17,7 +17,7 @@ class Connection:
self,
host: str,
proxies: List[dict] = None,
tries: int = (len(PROXIES_LIST) + 1) * 2,
tries: int = (len(PROXIES_LIST) + 1) * 4,
timeout: int = 7,
logger: logging.Logger = logging.getLogger("connection"),
header_values: Dict[str, str] = None,
@@ -80,7 +80,7 @@ class Connection:
self,
request: Callable,
try_count: int,
accepted_response_code: set,
accepted_response_codes: set,
url: str,
timeout: float,
headers: dict,
@@ -109,7 +109,7 @@ class Connection:
try:
r: requests.Response = request(request_url, timeout=timeout, headers=headers, **kwargs)
if r.status_code in accepted_response_code:
if r.status_code in accepted_response_codes:
return r
if self.SEMANTIC_NOT_FOUND and r.status_code == 404:
@@ -136,7 +136,7 @@ class Connection:
return self._request(
request=request,
try_count=try_count+1,
accepted_response_code=accepted_response_code,
accepted_response_codes=accepted_response_codes,
url=url,
timeout=timeout,
headers=headers,
@@ -154,10 +154,13 @@ class Connection:
raw_url: bool = False,
**kwargs
) -> Optional[requests.Response]:
if accepted_response_codes is None:
accepted_response_codes = self.ACCEPTED_RESPONSE_CODES
r = self._request(
request=self.session.get,
try_count=0,
accepted_response_code=accepted_response_codes or self.ACCEPTED_RESPONSE_CODES,
accepted_response_codes=accepted_response_codes,
url=url,
timeout=timeout,
headers=headers,
@@ -185,7 +188,7 @@ class Connection:
r = self._request(
request=self.session.post,
try_count=0,
accepted_response_code=accepted_response_codes or self.ACCEPTED_RESPONSE_CODES,
accepted_response_codes=accepted_response_codes or self.ACCEPTED_RESPONSE_CODES,
url=url,
timeout=timeout,
headers=headers,
@@ -215,11 +218,19 @@ class Connection:
progress: int = 0,
**kwargs
) -> DownloadResult:
if progress > 0:
if headers is None:
headers = dict()
headers["Range"] = f"bytes={target.size}-"
if accepted_response_codes is None:
accepted_response_codes = self.ACCEPTED_RESPONSE_CODES
r = self._request(
request=self.session.get,
try_count=0,
accepted_response_code=accepted_response_codes or self.ACCEPTED_RESPONSE_CODES,
accepted_response_codes=accepted_response_codes,
url=url,
timeout=timeout,
headers=headers,
@@ -238,45 +249,47 @@ class Connection:
retry = False
with target.open("wb") as f:
try:
"""
https://en.wikipedia.org/wiki/Kilobyte
> The internationally recommended unit symbol for the kilobyte is kB.
"""
with target.open("ab") as f:
"""
https://en.wikipedia.org/wiki/Kilobyte
> The internationally recommended unit symbol for the kilobyte is kB.
"""
with tqdm(total=total_size, unit='B', unit_scale=True, unit_divisor=1024, desc=description) as t:
with tqdm(total=total_size-target.size, unit='B', unit_scale=True, unit_divisor=1024, desc=description) as t:
try:
for chunk in r.iter_content(chunk_size=chunk_size):
size = f.write(chunk)
progress += size
t.update(size)
return True
except requests.exceptions.ConnectionError:
if try_count >= self.TRIES:
self.LOGGER.warning(f"Stream timed out at \"{url}\": to many retries, aborting.")
return DownloadResult(error_message=f"Stream timed out from {url}, reducing the chunksize might help.")
self.LOGGER.warning(f"Stream timed out at \"{url}\": ({try_count}-{self.TRIES})")
except requests.exceptions.ConnectionError:
if try_count >= self.TRIES:
self.LOGGER.warning(f"Stream timed out at \"{url}\": to many retries, aborting.")
return DownloadResult(error_message=f"Stream timed out from {url}, reducing the chunksize might help.")
self.LOGGER.warning(f"Stream timed out at \"{url}\": ({try_count}-{self.TRIES})")
retry = True
if total_size > progress:
retry = True
finally:
if total_size > progress or retry:
return self.stream_into(
url = url,
target = target,
description = description,
try_count=try_count+1,
progress=progress,
accepted_response_code=accepted_response_codes,
timeout=timeout,
headers=headers,
raw_url=raw_url,
refer_from_origin=refer_from_origin,
chunk_size=chunk_size,
**kwargs
)
return DownloadResult()
if retry:
self.LOGGER.warning(f"Retrying stream...")
accepted_response_codes.add(206)
return self.stream_into(
url = url,
target = target,
description = description,
try_count=try_count+1,
progress=progress,
accepted_response_codes=accepted_response_codes,
timeout=timeout,
headers=headers,
raw_url=raw_url,
refer_from_origin=refer_from_origin,
chunk_size=chunk_size,
**kwargs
)
return DownloadResult()