Lars Noack
788103a68e
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
120 lines
3.6 KiB
Python
120 lines
3.6 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import List, Tuple, TextIO, Union, Optional
|
|
import logging
|
|
import random
|
|
import requests
|
|
from tqdm import tqdm
|
|
|
|
from .parents import OuterProxy
|
|
from ..utils.shared import HIGHEST_ID
|
|
from ..utils.config import main_settings, logging_settings
|
|
from ..utils.string_processing import fit_to_file_system
|
|
|
|
|
|
LOGGER = logging.getLogger("target")
|
|
|
|
|
|
class Target(OuterProxy):
|
|
"""
|
|
create somehow like that
|
|
```python
|
|
# I know path is pointless, and I will change that (don't worry about backwards compatibility there)
|
|
Target(file="song.mp3", path="~/Music/genre/artist/album")
|
|
```
|
|
"""
|
|
|
|
file_path: Path
|
|
|
|
_default_factories = {
|
|
}
|
|
|
|
@classmethod
|
|
def temp(cls, name: str = str(random.randint(0, HIGHEST_ID)), file_extension: Optional[str] = None) -> P:
|
|
if file_extension is not None:
|
|
name = f"{name}.{file_extension}"
|
|
|
|
return cls(main_settings["temp_directory"] / name)
|
|
|
|
# This is automatically generated
|
|
def __init__(self, file_path: Union[Path, str], relative_to_music_dir: bool = False, **kwargs) -> None:
|
|
if not isinstance(file_path, Path):
|
|
file_path = Path(file_path)
|
|
|
|
if relative_to_music_dir:
|
|
file_path = Path(main_settings["music_directory"], file_path)
|
|
|
|
super().__init__(file_path=fit_to_file_system(file_path), **kwargs)
|
|
|
|
self.is_relative_to_music_dir: bool = relative_to_music_dir
|
|
|
|
def __repr__(self) -> str:
|
|
return str(self.file_path)
|
|
|
|
@property
|
|
def indexing_values(self) -> List[Tuple[str, object]]:
|
|
return [('filepath', self.file_path)]
|
|
|
|
@property
|
|
def exists(self) -> bool:
|
|
return self.file_path.is_file()
|
|
|
|
@property
|
|
def size(self) -> int:
|
|
"""
|
|
returns the size the downloaded audio takes up in bytes
|
|
returns 0 if the file doesn't exist
|
|
"""
|
|
if not self.exists:
|
|
return 0
|
|
|
|
return self.file_path.stat().st_size
|
|
|
|
def create_path(self):
|
|
self.file_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
def copy_content(self, copy_to: Target):
|
|
if not self.exists:
|
|
LOGGER.warning(f"No file exists at: {self.file_path}")
|
|
return
|
|
|
|
with open(self.file_path, "rb") as read_from:
|
|
copy_to.create_path()
|
|
with open(copy_to.file_path, "wb") as write_to:
|
|
write_to.write(read_from.read())
|
|
|
|
def stream_into(self, r: requests.Response, desc: str = None) -> bool:
|
|
if r is None:
|
|
return False
|
|
|
|
self.create_path()
|
|
|
|
total_size = int(r.headers.get('content-length'))
|
|
|
|
with open(self.file_path, 'wb') as f:
|
|
try:
|
|
"""
|
|
https://en.wikipedia.org/wiki/Kilobyte
|
|
> The internationally recommended unit symbol for the kilobyte is kB.
|
|
"""
|
|
with tqdm(total=total_size, unit='B', unit_scale=True, unit_divisor=1024, desc=desc) as t:
|
|
|
|
for chunk in r.iter_content(chunk_size=main_settings["chunk_size"]):
|
|
size = f.write(chunk)
|
|
t.update(size)
|
|
return True
|
|
|
|
except requests.exceptions.Timeout:
|
|
logging_settings["download_logger"].error("Stream timed out.")
|
|
return False
|
|
|
|
def open(self, file_mode: str, **kwargs) -> TextIO:
|
|
return self.file_path.open(file_mode, **kwargs)
|
|
|
|
def delete(self):
|
|
self.file_path.unlink(missing_ok=True)
|
|
|
|
def read_bytes(self) -> bytes:
|
|
return self.file_path.read_bytes()
|