Merge pull request 'fix/caching_signatures' (#32) from fix/caching_signatures into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

Reviewed-on: #32
This commit is contained in:
Hazel 2024-05-13 15:15:37 +00:00
commit 9769cf4033
10 changed files with 68 additions and 15 deletions

View File

@ -24,6 +24,7 @@
"encyclopaedia", "encyclopaedia",
"ENDC", "ENDC",
"Gitea", "Gitea",
"isrc",
"levenshtein", "levenshtein",
"metallum", "metallum",
"musify", "musify",

View File

@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__": if __name__ == "__main__":
commands = [ commands = [
"s: #a Psychonaut 4", "s: #a Crystal F",
"d: 0", "d: 20",
] ]

View File

@ -112,8 +112,6 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
USLT(encoding=3, lang=u'eng', desc=u'desc', text=uslt_val) USLT(encoding=3, lang=u'eng', desc=u'desc', text=uslt_val)
) )
mutagen_file = mutagen.File(target.file_path)
id3_object.add_metadata(metadata) id3_object.add_metadata(metadata)
id3_object.save() id3_object.save()

View File

@ -301,6 +301,33 @@ class OuterProxy:
return r return r
@property
def root_collections(self) -> List[Collection]:
if len(self.UPWARDS_COLLECTION_STRING_ATTRIBUTES) == 0:
return [self]
r = []
for collection_string_attribute in self.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
r.extend(self.__getattribute__(collection_string_attribute))
return r
def _compile(self, **kwargs):
pass
def compile(self, from_root=False, **kwargs):
# compile from the root
if not from_root:
for c in self.root_collections:
c.compile(from_root=True, **kwargs)
return
self._compile(**kwargs)
for c_attribute in self.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
for c in self.__getattribute__(c_attribute):
c.compile(from_root=True, **kwargs)
TITEL = "id" TITEL = "id"
@property @property
def title_string(self) -> str: def title_string(self) -> str:

View File

@ -376,6 +376,25 @@ class Album(Base):
r += f" with {len(self.song_collection)} songs" r += f" with {len(self.song_collection)} songs"
return r return r
def _compile(self):
self.analyze_implied_album_type()
self.update_tracksort()
def analyze_implied_album_type(self):
# if the song collection has only one song, it is reasonable to assume that it is a single
if len(self.song_collection) == 1:
self.album_type = AlbumType.SINGLE
return
# if the album already has an album type, we don't need to do anything
if self.album_type is not AlbumType.OTHER:
return
# for information on EP's I looked at https://www.reddit.com/r/WeAreTheMusicMakers/comments/a354ql/whats_the_cutoff_length_between_ep_and_album/
if len(self.song_collection) < 9:
self.album_type = AlbumType.EP
return
def update_tracksort(self): def update_tracksort(self):
""" """
This updates the tracksort attributes, of the songs in This updates the tracksort attributes, of the songs in
@ -525,6 +544,9 @@ class Artist(Base):
self.label_collection.extend(object_list) self.label_collection.extend(object_list)
return return
def _compile(self):
self.update_albumsort()
def update_albumsort(self): def update_albumsort(self):
""" """
This updates the albumsort attributes, of the albums in This updates the albumsort attributes, of the albums in
@ -535,9 +557,6 @@ class Artist(Base):
:return: :return:
""" """
if len(self.main_album_collection) <= 0:
return
type_section: Dict[AlbumType, int] = defaultdict(lambda: 2, { type_section: Dict[AlbumType, int] = defaultdict(lambda: 2, {
AlbumType.OTHER: 0, # if I don't know it, I add it to the first section AlbumType.OTHER: 0, # if I don't know it, I add it to the first section
AlbumType.STUDIO_ALBUM: 0, AlbumType.STUDIO_ALBUM: 0,
@ -580,7 +599,7 @@ class Artist(Base):
album_list.extend(sections[section_index]) album_list.extend(sections[section_index])
# replace the old collection with the new one # replace the old collection with the new one
self.main_album_collection: Collection = Collection(data=album_list, element_type=Album) self.main_album_collection._data = album_list
INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection") INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection")
@property @property

View File

@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import List, Tuple, TextIO, Union from typing import List, Tuple, TextIO, Union, Optional
import logging import logging
import random import random
import requests import requests
@ -31,7 +31,10 @@ class Target(OuterProxy):
} }
@classmethod @classmethod
def temp(cls, name: str = str(random.randint(0, HIGHEST_ID))) -> P: def temp(cls, name: str = str(random.randint(0, HIGHEST_ID)), file_extension: Optional[str] = None) -> P:
if file_extension is not None:
name = f"{name}.{file_extension}"
return cls(main_settings["temp_directory"] / name) return cls(main_settings["temp_directory"] / name)
# This is automatically generated # This is automatically generated

View File

@ -361,6 +361,7 @@ class Page:
return download_result return download_result
def _download_song(self, song: Song, naming_dict: NamingDict): def _download_song(self, song: Song, naming_dict: NamingDict):
song.compile()
if "genre" not in naming_dict and song.genre is not None: if "genre" not in naming_dict and song.genre is not None:
naming_dict["genre"] = song.genre naming_dict["genre"] = song.genre
@ -381,7 +382,7 @@ class Page:
song.target_collection.append(new_target) song.target_collection.append(new_target)
r = DownloadResult(1) r = DownloadResult(1)
temp_target: Target = Target.temp() temp_target: Target = Target.temp(file_extension=main_settings["audio_format"])
found_on_disc = False found_on_disc = False
target: Target target: Target

View File

@ -145,6 +145,8 @@ class SuperYouTube(Page):
_sponsorblock_connection: Connection = Connection() _sponsorblock_connection: Connection = Connection()
self.sponsorblock = python_sponsorblock.SponsorBlock(silent=True, session=_sponsorblock_connection.session) self.sponsorblock = python_sponsorblock.SponsorBlock(silent=True, session=_sponsorblock_connection.session)
super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]: def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
_url_type = { _url_type = {
YouTubeUrlType.CHANNEL: Artist, YouTubeUrlType.CHANNEL: Artist,

View File

@ -193,7 +193,6 @@ class YoutubeMusic(SuperYouTube):
self.start_millis = get_current_millis() self.start_millis = get_current_millis()
if self.credentials.api_key == "" or DEBUG_YOUTUBE_INITIALIZING:
self._fetch_from_main_page() self._fetch_from_main_page()
SuperYouTube.__init__(self, *args, **kwargs) SuperYouTube.__init__(self, *args, **kwargs)
@ -215,6 +214,8 @@ class YoutubeMusic(SuperYouTube):
self.download_values_by_url: dict = {} self.download_values_by_url: dict = {}
self.not_download: Dict[str, DownloadError] = {} self.not_download: Dict[str, DownloadError] = {}
super().__init__(*args, **kwargs)
def _fetch_from_main_page(self): def _fetch_from_main_page(self):
""" """
===API=KEY=== ===API=KEY===
@ -736,8 +737,9 @@ class YoutubeMusic(SuperYouTube):
raw_headers=True, raw_headers=True,
disable_cache=True, disable_cache=True,
headers=media.get("headers", {}), headers=media.get("headers", {}),
# chunk_size=media.get("chunk_size", main_settings["chunk_size"]), chunk_size=main_settings["chunk_size"],
method="GET", method="GET",
timeout=5,
) )
else: else:
result = DownloadResult(error_message=str(media.get("error") or self.not_download[source.hash_url])) result = DownloadResult(error_message=str(media.get("error") or self.not_download[source.hash_url]))

View File

@ -69,7 +69,7 @@ dependencies = [
"toml~=0.10.2", "toml~=0.10.2",
"typing_extensions~=4.7.1", "typing_extensions~=4.7.1",
"python-sponsorblock~=0.1.dev1", "python-sponsorblock~=0.1",
"youtube_dl", "youtube_dl",
] ]
dynamic = [ dynamic = [