Compare commits

..

4 Commits

Author SHA1 Message Date
ae921c3626 feat: cleaned song title from youtube music
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-04-26 14:29:56 +02:00
f52b5e6325 fix: properly stored encoding now 2024-04-26 14:24:14 +02:00
25eceb727b fix: encoding of cache 2024-04-26 14:04:44 +02:00
e77afa584b feat: added caching to youtube 2024-04-26 13:50:17 +02:00
7 changed files with 56 additions and 27 deletions

View File

@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__":
commands = [
"s: #a Ruffiction",
"d: 8",
"s: #a Crystal F",
"d: 20",
]

View File

@ -1,6 +1,6 @@
import json
from pathlib import Path
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Optional
from functools import lru_cache
@ -18,6 +18,8 @@ class CacheAttribute:
created: datetime
expires: datetime
additional_info: dict = field(default_factory=dict)
@property
def id(self):
return f"{self.module}_{self.name}"
@ -32,6 +34,12 @@ class CacheAttribute:
return self.__dict__ == other.__dict__
@dataclass
class CacheResult:
content: bytes
attribute: CacheAttribute
class Cache:
def __init__(self, module: str, logger: logging.Logger):
self.module = module
@ -49,13 +57,16 @@ class Cache:
self._time_fields = {"created", "expires"}
with self.index.open("r") as i:
for c in json.loads(i.read()):
for key in self._time_fields:
c[key] = datetime.fromisoformat(c[key])
try:
for c in json.loads(i.read()):
for key in self._time_fields:
c[key] = datetime.fromisoformat(c[key])
ca = CacheAttribute(**c)
self.cached_attributes.append(ca)
self._id_to_attribute[ca.id] = ca
ca = CacheAttribute(**c)
self.cached_attributes.append(ca)
self._id_to_attribute[ca.id] = ca
except json.JSONDecodeError:
pass
@lru_cache()
def _init_module(self, module: str) -> Path:
@ -100,7 +111,7 @@ class Cache:
return True
def set(self, content: bytes, name: str, expires_in: float = 10, module: str = ""):
def set(self, content: bytes, name: str, expires_in: float = 10, module: str = "", additional_info: dict = None):
"""
:param content:
:param module:
@ -111,6 +122,7 @@ class Cache:
if name == "":
return
additional_info = additional_info or {}
module = self.module if module == "" else module
module_path = self._init_module(module)
@ -120,6 +132,7 @@ class Cache:
name=name,
created=datetime.now(),
expires=datetime.now() + timedelta(days=expires_in),
additional_info=additional_info,
)
self._write_attribute(cache_attribute)
@ -128,7 +141,7 @@ class Cache:
self.logger.debug(f"writing cache to {cache_path}")
content_file.write(content)
def get(self, name: str) -> Optional[bytes]:
def get(self, name: str) -> Optional[CacheResult]:
path = fit_to_file_system(Path(self._dir, self.module, name), hidden_ok=True)
if not path.is_file():
@ -140,7 +153,7 @@ class Cache:
return
with path.open("rb") as f:
return f.read()
return CacheResult(content=f.read(), attribute=existing_attribute)
def clean(self):
keep = set()

View File

@ -125,12 +125,17 @@ class Connection:
return headers
def save(self, r: requests.Response, name: str, error: bool = False, **kwargs):
def save(self, r: requests.Response, name: str, error: bool = False, no_update_if_valid_exists: bool = False, **kwargs):
n_kwargs = {}
if error:
n_kwargs["module"] = "failed_requests"
self.cache.set(r.content, name, expires_in=kwargs.get("expires_in", self.cache_expiring_duration), **n_kwargs)
if self.cache.get(name) is not None and no_update_if_valid_exists:
return
self.cache.set(r.content, name, expires_in=kwargs.get("expires_in", self.cache_expiring_duration), additional_info={
"encoding": r.encoding,
}, **n_kwargs)
def request(
self,
@ -145,6 +150,7 @@ class Connection:
sleep_after_404: float = None,
is_heartbeat: bool = False,
disable_cache: bool = None,
enable_cache_readonly: bool = False,
method: str = None,
name: str = "",
exclude_headers: List[str] = None,
@ -178,17 +184,23 @@ class Connection:
request_url = parsed_url.geturl() if not raw_url else url
if name != "" and not disable_cache:
if name != "" and (not disable_cache or enable_cache_readonly):
cached = self.cache.get(name)
if cached is not None:
request_trace(f"{trace_string}\t[cached]")
with responses.RequestsMock() as resp:
additional_info = cached.attribute.additional_info
body = cached.content
if "encoding" in additional_info:
body = body.decode(additional_info["encoding"])
resp.add(
method=method,
url=request_url,
body=cached,
body=body,
)
return requests.request(method=method, url=url, timeout=timeout, headers=headers, **kwargs)

View File

@ -451,7 +451,7 @@ class Page:
source = sources[0]
if not found_on_disc:
r = self.download_song_to_target(source=source, target=temp_target, desc=song.title)
r = self.download_song_to_target(source=source, target=temp_target, desc=song.option_string)
if not r.is_fatal_error:
r.merge(self._post_process_targets(song, temp_target,

View File

@ -1128,4 +1128,4 @@ class Musify(Page):
self.LOGGER.warning(f"The source has no audio link. Falling back to {endpoint}.")
return self.stream_connection.stream_into(endpoint, target, raw_url=True, exclude_headers=["Host"])
return self.stream_connection.stream_into(endpoint, target, raw_url=True, exclude_headers=["Host"], name=desc)

View File

@ -2,6 +2,7 @@ from typing import List, Optional
from enum import Enum
from ...utils.config import youtube_settings, logging_settings
from ...utils.string_processing import clean_song_title
from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import (
@ -59,7 +60,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
if element_type == PageType.SONG or (element_type == PageType.VIDEO and not youtube_settings["youtube_music_clean_data"]) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}")
return Song(title=element_text, source_list=[source])
return Song(title=clean_song_title(element_text), source_list=[source])
if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}")

View File

@ -171,7 +171,7 @@ class YoutubeMusic(SuperYouTube):
def __init__(self, *args, ydl_opts: dict = None, **kwargs):
self.yt_music_connection: YoutubeMusicConnection = YoutubeMusicConnection(
logger=self.LOGGER,
accept_language="en-US,en;q=0.5"
accept_language="en-US,en;q=0.5",
)
self.credentials: YouTubeMusicCredentials = YouTubeMusicCredentials(
api_key=youtube_settings["youtube_music_api_key"],
@ -212,7 +212,7 @@ class YoutubeMusic(SuperYouTube):
search for: "innertubeApiKey"
"""
r = self.yt_music_connection.get("https://music.youtube.com/")
r = self.yt_music_connection.get("https://music.youtube.com/", name="youtube_music_index.html", disable_cache=True, enable_cache_readonly=True)
if r is None:
return
@ -232,7 +232,7 @@ class YoutubeMusic(SuperYouTube):
'set_ytc': 'true',
'set_apyt': 'true',
'set_eom': 'false'
})
}, disable_cache=True)
if r is None:
return
@ -247,9 +247,9 @@ class YoutubeMusic(SuperYouTube):
# save cookies in settings
youtube_settings["youtube_music_consent_cookies"] = cookie_dict
else:
self.yt_music_connection.save(r, "index.html")
self.yt_music_connection.save(r, "youtube_music_index.html", no_update_if_valid_exists=True)
r = self.yt_music_connection.get("https://music.youtube.com/", name="index.html")
r = self.yt_music_connection.get("https://music.youtube.com/", name="youtube_music_index.html")
if r is None:
return
@ -374,7 +374,8 @@ class YoutubeMusic(SuperYouTube):
},
headers={
"Referer": get_youtube_url(path=f"/search", query=f"q={urlescaped_query}")
}
},
name=f"search_{search_query}.json"
)
if r is None:
@ -411,7 +412,8 @@ class YoutubeMusic(SuperYouTube):
json={
"browseId": browse_id,
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}}
}
},
name=f"fetch_artist_{browse_id}.json"
)
if r is None:
return artist
@ -454,7 +456,8 @@ class YoutubeMusic(SuperYouTube):
json={
"browseId": browse_id,
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}}
}
},
name=f"fetch_album_{browse_id}.json"
)
if r is None:
return album