feat: implemented cover artwork taggin

This commit is contained in:
Hazel 2024-04-10 18:18:52 +02:00
parent 28ad5311f2
commit 0c367884e3
11 changed files with 73 additions and 28 deletions

View File

@ -16,9 +16,11 @@
}, },
"python.formatting.provider": "none", "python.formatting.provider": "none",
"cSpell.words": [ "cSpell.words": [
"APIC",
"Bandcamp", "Bandcamp",
"dotenv", "dotenv",
"levenshtein", "levenshtein",
"OKBLUE" "OKBLUE",
"Referer"
] ]
} }

View File

@ -1,16 +1,20 @@
import mutagen import mutagen
from mutagen.id3 import ID3, Frame from mutagen.id3 import ID3, Frame, APIC
from pathlib import Path from pathlib import Path
from typing import List from typing import List
import logging import logging
from PIL import Image
from ..utils.config import logging_settings from ..utils.config import logging_settings
from ..objects import Song, Target, Metadata from ..objects import Song, Target, Metadata
from ..connection import Connection
LOGGER = logging_settings["tagging_logger"] LOGGER = logging_settings["tagging_logger"]
artwork_connection: Connection = Connection()
class AudioMetadata: class AudioMetadata:
def __init__(self, file_location: str = None) -> None: def __init__(self, file_location: str = None) -> None:
self._file_location = None self._file_location = None
@ -52,11 +56,39 @@ class AudioMetadata:
file_location = property(fget=lambda self: self._file_location, fset=set_file_location) file_location = property(fget=lambda self: self._file_location, fset=set_file_location)
def write_metadata_to_target(metadata: Metadata, target: Target): def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
if not target.exists: if not target.exists:
LOGGER.warning(f"file {target.file_path} not found")
return return
id3_object = AudioMetadata(file_location=target.file_path) id3_object = AudioMetadata(file_location=target.file_path)
if song.artwork.best_variant is not None:
r = artwork_connection.get(
url=song.artwork.best_variant["url"],
disable_cache=False,
)
temp_target: Target = Target.temp()
with temp_target.open("wb") as f:
f.write(r.content)
converted_target: Target = Target.temp(name=f"{song.title}.jpeg")
with Image.open(temp_target.file_path) as img:
img.save(converted_target.file_path, "JPEG")
id3_object.frames.add(
APIC(
encoding=3,
mime="image/jpeg",
type=3,
desc="Cover",
data=converted_target.read_bytes(),
)
)
mutagen_file = mutagen.File(target.file_path)
id3_object.add_metadata(metadata) id3_object.add_metadata(metadata)
id3_object.save() id3_object.save()
@ -70,19 +102,9 @@ def write_metadata(song: Song, ignore_file_not_found: bool = True):
else: else:
raise ValueError(f"{song.target.file} not found") raise ValueError(f"{song.target.file} not found")
id3_object = AudioMetadata(file_location=target.file_path) write_metadata_to_target(metadata=song.metadata, target=target, song=song)
id3_object.add_song_metadata(song=song)
id3_object.save()
def write_many_metadata(song_list: List[Song]): def write_many_metadata(song_list: List[Song]):
for song in song_list: for song in song_list:
write_metadata(song=song, ignore_file_not_found=True) write_metadata(song=song, ignore_file_not_found=True)
if __name__ == "__main__":
print("called directly")
filepath = "/home/lars/Music/deathcore/Archspire/Bleed the Future/Bleed the Future.mp3"
audio_metadata = AudioMetadata(file_location=filepath)
print(audio_metadata.frames.pprint())

View File

@ -23,7 +23,7 @@ from ..utils.hacking import merge_args
class Connection: class Connection:
def __init__( def __init__(
self, self,
host: str, host: str = None,
proxies: List[dict] = None, proxies: List[dict] = None,
tries: int = (len(main_settings["proxies"]) + 1) * main_settings["tries_per_proxy"], tries: int = (len(main_settings["proxies"]) + 1) * main_settings["tries_per_proxy"],
timeout: int = 7, timeout: int = 7,
@ -45,7 +45,7 @@ class Connection:
self.HEADER_VALUES = dict() if header_values is None else header_values self.HEADER_VALUES = dict() if header_values is None else header_values
self.LOGGER = logger self.LOGGER = logger
self.HOST = urlparse(host) self.HOST = host if host is None else urlparse(host)
self.TRIES = tries self.TRIES = tries
self.TIMEOUT = timeout self.TIMEOUT = timeout
self.rotating_proxy = RotatingProxy(proxy_list=proxies) self.rotating_proxy = RotatingProxy(proxy_list=proxies)
@ -87,22 +87,27 @@ class Connection:
time.sleep(interval) time.sleep(interval)
def base_url(self, url: ParseResult = None): def base_url(self, url: ParseResult = None):
if url is None: if url is None and self.HOST is not None:
url = self.HOST url = self.HOST
return urlunsplit((url.scheme, url.netloc, "", "", "")) return urlunsplit((url.scheme, url.netloc, "", "", ""))
def get_header(self, **header_values) -> Dict[str, str]: def get_header(self, **header_values) -> Dict[str, str]:
return { headers = {
"user-agent": main_settings["user_agent"], "user-agent": main_settings["user_agent"],
"User-Agent": main_settings["user_agent"], "User-Agent": main_settings["user_agent"],
"Connection": "keep-alive", "Connection": "keep-alive",
"Host": self.HOST.netloc,
"Referer": self.base_url(),
"Accept-Language": main_settings["language"], "Accept-Language": main_settings["language"],
**header_values
} }
if self.HOST is not None:
headers["Host"] = self.HOST.netloc
headers["Referer"] = self.base_url(url=self.HOST)
headers.update(header_values)
return headers
def rotate(self): def rotate(self):
self.session.proxies = self.rotating_proxy.rotate() self.session.proxies = self.rotating_proxy.rotate()

View File

@ -22,4 +22,6 @@ from .contact import Contact
from .parents import OuterProxy from .parents import OuterProxy
from .artwork import Artwork
DatabaseObject = TypeVar('T', bound=OuterProxy) DatabaseObject = TypeVar('T', bound=OuterProxy)

View File

@ -33,7 +33,7 @@ class Artwork:
def _calculate_deviation(*dimensions: List[int]) -> float: def _calculate_deviation(*dimensions: List[int]) -> float:
return sum(abs(d - main_settings["preferred_artwork_resolution"]) for d in dimensions) / len(dimensions) return sum(abs(d - main_settings["preferred_artwork_resolution"]) for d in dimensions) / len(dimensions)
def append(self, url: str, width: int, height: int) -> None: def append(self, url: str, width: int, height: int, **kwargs) -> None:
self._variant_mapping[hash_url(url=url)] = { self._variant_mapping[hash_url(url=url)] = {
"url": url, "url": url,
"width": width, "width": width,
@ -43,6 +43,8 @@ class Artwork:
@property @property
def best_variant(self) -> ArtworkVariant: def best_variant(self) -> ArtworkVariant:
if len(self._variant_mapping) == 0:
return None
return min(self._variant_mapping.values(), key=lambda x: x["deviation"]) return min(self._variant_mapping.values(), key=lambda x: x["deviation"])
def __merge__(self, other: Artwork, override: bool = False) -> None: def __merge__(self, other: Artwork, override: bool = False) -> None:

View File

@ -16,7 +16,7 @@ class FormattedText:
@property @property
def is_empty(self) -> bool: def is_empty(self) -> bool:
return self.doc is None return self.html == ""
def __eq__(self, other) -> False: def __eq__(self, other) -> False:
if type(other) != type(self): if type(other) != type(self):

View File

@ -3,11 +3,12 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import List, Tuple, TextIO, Union from typing import List, Tuple, TextIO, Union
import logging import logging
import random
import requests import requests
from tqdm import tqdm from tqdm import tqdm
from .parents import OuterProxy from .parents import OuterProxy
from ..utils.shared import HIGHEST_ID
from ..utils.config import main_settings, logging_settings from ..utils.config import main_settings, logging_settings
from ..utils.string_processing import fit_to_file_system from ..utils.string_processing import fit_to_file_system
@ -29,6 +30,10 @@ class Target(OuterProxy):
_default_factories = { _default_factories = {
} }
@classmethod
def temp(cls, name: str = str(random.randint(0, HIGHEST_ID))) -> P:
return cls(main_settings["temp_directory"] / name)
# This is automatically generated # This is automatically generated
def __init__(self, file_path: Union[Path, str], relative_to_music_dir: bool = False, **kwargs) -> None: def __init__(self, file_path: Union[Path, str], relative_to_music_dir: bool = False, **kwargs) -> None:
if not isinstance(file_path, Path): if not isinstance(file_path, Path):
@ -106,3 +111,6 @@ class Target(OuterProxy):
def delete(self): def delete(self):
self.file_path.unlink(missing_ok=True) self.file_path.unlink(missing_ok=True)
def read_bytes(self) -> bytes:
return self.file_path.read_bytes()

View File

@ -464,7 +464,7 @@ class Page:
self.post_process_hook(song, temp_target) self.post_process_hook(song, temp_target)
write_metadata_to_target(song.metadata, temp_target) write_metadata_to_target(song.metadata, temp_target, song)
r = DownloadResult() r = DownloadResult()

View File

@ -20,7 +20,7 @@ from ...utils import get_current_millis
from ...utils import dump_to_file from ...utils import dump_to_file
from ...objects import Source, DatabaseObject, ID3Timestamp from ...objects import Source, DatabaseObject, ID3Timestamp, Artwork
from ..abstract import Page from ..abstract import Page
from ...objects import ( from ...objects import (
Artist, Artist,
@ -501,6 +501,7 @@ class YoutubeMusic(SuperYouTube):
note=ydl_res.get("descriptions"), note=ydl_res.get("descriptions"),
album_list=album_list, album_list=album_list,
length=int(ydl_res.get("duration", 0)) * 1000, length=int(ydl_res.get("duration", 0)) * 1000,
artwork=Artwork(*ydl_res.get("thumbnails", [])),
main_artist_list=[Artist( main_artist_list=[Artist(
name=artist_name, name=artist_name,
source_list=[Source( source_list=[Source(
@ -551,6 +552,7 @@ class YoutubeMusic(SuperYouTube):
return self.download_values_by_url[source.url] return self.download_values_by_url[source.url]
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
media = self.fetch_media_url(source) media = self.fetch_media_url(source)
@ -571,5 +573,6 @@ class YoutubeMusic(SuperYouTube):
return result return result
def __del__(self): def __del__(self):
self.ydl.__exit__() self.ydl.__exit__()

View File

@ -27,7 +27,7 @@ The further you choose to be able to go back, the higher the memory usage.
EmptyLine(), EmptyLine(),
Attribute(name="preferred_artwork_resolution", default_value=100), Attribute(name="preferred_artwork_resolution", default_value=1000),
EmptyLine(), EmptyLine(),

View File

@ -44,6 +44,7 @@ dependencies = [
"pyffmpeg~=2.4.2.18.1", "pyffmpeg~=2.4.2.18.1",
"ffmpeg-progress-yield~=0.7.8", "ffmpeg-progress-yield~=0.7.8",
"mutagen~=1.46.0", "mutagen~=1.46.0",
"pillow~=10.3.0",
"rich~=13.7.1", "rich~=13.7.1",
"mistune~=3.0.2", "mistune~=3.0.2",