58 Commits

Author SHA1 Message Date
810aff4163 Merge pull request 'feature/artwork_gallery' (#41) from feature/artwork_gallery into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #41
2024-07-15 09:36:21 +00:00
5ce76c758e feat: genius fixes and duplicate detection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-07-02 17:20:25 +02:00
93c9a367a2 feat: image hash implemented
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-07-01 14:59:51 +02:00
17c28722fb feat: musify ArtworkCollection simple function
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-17 14:50:17 +02:00
dd99e60afd fix: circular input
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-11 14:58:04 +02:00
274f1bce90 feat: implemented fetching of artworks on compile
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-11 14:54:36 +02:00
b1a306f3f3 fix: implemented artwork.add_data 2024-06-11 14:34:58 +02:00
4ee6fd2137 feat:a lot of nonsences
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-10 12:23:12 +02:00
2da7a48b72 feat: added compile
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-07 11:27:55 +02:00
346d273201 feat: added extend
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-07 11:17:47 +02:00
eef3ea7f07 feat: removed distracting code
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-07 11:15:23 +02:00
01dffc2443 Merge branch 'feature/artwork_gallery' of ssh://gitea.elara.ws:2222/music-kraken/music-kraken-core into feature/artwork_gallery
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-06 17:53:44 +02:00
4e50bb1fba draft implemented add_data 2024-06-06 17:53:17 +02:00
8e3ec0f4ed Merge branch 'feature/artwork_gallery' of ssh://gitea.elara.ws:2222/music-kraken/music-kraken-core into feature/artwork_gallery
All checks were successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-05 13:45:25 +02:00
d447b10380 feat: youtube music album and artist artwork 2024-06-05 13:33:18 +02:00
df98a70717 feat: renamed artwork
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-05 12:05:38 +02:00
3118140f0f feat: fix saving img in tmp
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-05 09:47:02 +02:00
7d23ecac06 feat: bandcamp artist artwork
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-05 08:34:37 +02:00
d83e40ed83 feat: config changes
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-04 11:44:48 +02:00
d51e3a56fb feat: structure changes to artwork and collection objects
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-04 11:04:00 +02:00
05ee09e25f feat: musify completed
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-04 10:58:21 +02:00
1ef4b27f28 feat: added album.artwork to datastructure
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-04 10:31:23 +02:00
eb8fd5e580 feat: added artist.artwork to data structure 2024-06-04 10:13:34 +02:00
49c3734526 feat: added hooks for collection on append
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-04 10:11:46 +02:00
bc19a94e7f feat: added parent artwork options 2024-06-04 10:09:17 +02:00
5d26fdbf94 Artwork gallery Musify
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-04 07:58:18 +02:00
465af49057 hotfix
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-03 10:19:32 +02:00
2aa0f02fa5 Merge branch 'adding_genius' into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-23 13:36:10 +02:00
7b0b830d64 feat: removed legacy key
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2024-05-23 13:24:25 +02:00
1ba6c97f5a feat: more extensive browse id 2024-05-23 13:20:34 +02:00
c8cbfc7cb9 feat: improved output of clearing the cache 2024-05-23 13:17:14 +02:00
344da0a0bf fix: converting pictures to rgb before saving
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-22 15:20:26 +02:00
49dc7093c8 fix: genius fallback
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-22 15:18:43 +02:00
90f70638b4 feat: better lyrics support
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 17:55:08 +02:00
7b4eee858a feat: parsed script json
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 17:14:58 +02:00
f61b34dd40 feat: improved feature artists by also adding writer and producer to it
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 16:52:01 +02:00
688b4fd357 feat: getting the album tracklist
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 16:47:38 +02:00
769d27dc5c feat: album details
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 16:43:52 +02:00
f5d953d9ce feat: theoretically fetching feature songs
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 16:34:04 +02:00
46b64b8f8d feat: fetched the flat artist details 2024-05-21 16:23:05 +02:00
adfce16d2a feat: fetched the flat artist details 2024-05-21 16:21:58 +02:00
e4fd9faf12 feat: detecting url type 2024-05-21 15:57:09 +02:00
f6caee41a8 feat: finished searching genious
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 15:52:41 +02:00
068c749c38 feat: implemented artist search 2024-05-21 15:27:10 +02:00
c131924577 Merge pull request 'fix/clean_feature_artists' (#38) from fix/clean_feature_artists into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
Reviewed-on: #38
2024-05-21 12:08:04 +00:00
8cdb5c1f99 fix: tests were a mess and didn't properly test the functionality but random things that worked with implementation
All checks were successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-21 13:52:20 +02:00
356ba658ce fix: lyrics enpoint could crash the whole program 2024-05-21 13:37:26 +02:00
000a6c0dba feat: added type identifier to option strings 2024-05-17 18:24:56 +02:00
83a3334f1a changed default value
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-17 13:55:58 +02:00
ab61ff7e9b feat: added the option to select all at options at the same time
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-17 13:37:49 +02:00
3cb35909d1 feat: added option to just fetch the objects in select 2024-05-17 13:31:46 +02:00
e87075a809 feat: changed syncing from event based to reference
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:39:12 +02:00
86e985acec fix: files don't contain id anymore
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:30:53 +02:00
a70a24d93e feat: minor adjustments
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:14:18 +02:00
2c1ac0f12d fix: wrong collection 2024-05-16 17:09:36 +02:00
897897dba2 feat: added recursive structure 2024-05-16 14:29:50 +02:00
adcf26b518 feat: renamed main album collection to album collection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 14:10:00 +02:00
8ccc28daf8 draft: added feature artist collection attr 2024-05-16 14:09:34 +02:00
31 changed files with 1257 additions and 456 deletions

View File

@@ -20,6 +20,7 @@
"APIC", "APIC",
"Bandcamp", "Bandcamp",
"bitrate", "bitrate",
"CALLSTACK",
"DEEZER", "DEEZER",
"dotenv", "dotenv",
"encyclopaedia", "encyclopaedia",
@@ -27,6 +28,7 @@
"Gitea", "Gitea",
"iframe", "iframe",
"isrc", "isrc",
"itemprop",
"levenshtein", "levenshtein",
"metallum", "metallum",
"MUSICBRAINZ", "MUSICBRAINZ",

View File

@@ -1,14 +1,13 @@
import logging
import music_kraken import music_kraken
import logging
print("Setting logging-level to DEBUG") print("Setting logging-level to DEBUG")
logging.getLogger().setLevel(logging.DEBUG) logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__": if __name__ == "__main__":
commands = [ commands = [
"s: #a I'm in a coffin", "s: #a Ghost Bath",
"0",
"d: 0",
] ]

View File

@@ -13,7 +13,7 @@ if __name__ == "__main__":
song_2 = Song( song_2 = Song(
title = "song", title = "song",
main_artist_list=[other_artist] artist_list=[other_artist]
) )
other_artist.name = "main_artist" other_artist.name = "main_artist"

View File

@@ -1,21 +1,21 @@
import mutagen import logging
from mutagen.id3 import ID3, Frame, APIC, USLT
from pathlib import Path from pathlib import Path
from typing import List from typing import List
import logging
import mutagen
from mutagen.id3 import APIC, ID3, USLT, Frame
from PIL import Image from PIL import Image
from ..utils.config import logging_settings, main_settings
from ..objects import Song, Target, Metadata
from ..objects.metadata import Mapping
from ..connection import Connection from ..connection import Connection
from ..objects import Metadata, Song, Target
from ..objects.metadata import Mapping
from ..utils.config import logging_settings, main_settings
LOGGER = logging_settings["tagging_logger"] LOGGER = logging_settings["tagging_logger"]
artwork_connection: Connection = Connection() artwork_connection: Connection = Connection()
class AudioMetadata: class AudioMetadata:
def __init__(self, file_location: str = None) -> None: def __init__(self, file_location: str = None) -> None:
self._file_location = None self._file_location = None
@@ -67,13 +67,14 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
id3_object = AudioMetadata(file_location=target.file_path) id3_object = AudioMetadata(file_location=target.file_path)
LOGGER.info(str(metadata)) LOGGER.info(str(metadata))
## REWRITE COMPLETLY !!!!!!!!!!!!
if song.artwork.best_variant is not None: if len(song.artwork._data) != 0:
best_variant = song.artwork.best_variant variants = song.artwork._data.__getitem__(0)
best_variant = variants.variants.__getitem__(0)
r = artwork_connection.get( r = artwork_connection.get(
url=best_variant["url"], url=best_variant.url,
name=song.artwork.get_variant_name(best_variant), name=best_variant.url,
) )
temp_target: Target = Target.temp() temp_target: Target = Target.temp()
@@ -93,6 +94,10 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
# resize the image to the preferred resolution # resize the image to the preferred resolution
img.thumbnail((main_settings["preferred_artwork_resolution"], main_settings["preferred_artwork_resolution"])) img.thumbnail((main_settings["preferred_artwork_resolution"], main_settings["preferred_artwork_resolution"]))
# https://stackoverflow.com/a/59476938/16804841
if img.mode != 'RGB':
img = img.convert('RGB')
img.save(converted_target.file_path, "JPEG") img.save(converted_target.file_path, "JPEG")
# https://stackoverflow.com/questions/70228440/mutagen-how-can-i-correctly-embed-album-art-into-mp3-file-so-that-i-can-see-t # https://stackoverflow.com/questions/70228440/mutagen-how-can-i-correctly-embed-album-art-into-mp3-file-so-that-i-can-see-t
@@ -103,7 +108,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
mime="image/jpeg", mime="image/jpeg",
type=3, type=3,
desc=u"Cover", desc=u"Cover",
data=converted_target.read_bytes(), data=converted_target.raw_content,
) )
) )
id3_object.frames.delall("USLT") id3_object.frames.delall("USLT")

View File

@@ -354,37 +354,41 @@ class Downloader:
command, query = _[0], ":".join(_[1:]) command, query = _[0], ":".join(_[1:])
do_search = "s" in command do_search = "s" in command
do_fetch = "f" in command
do_download = "d" in command do_download = "d" in command
do_merge = "m" in command do_merge = "m" in command
if do_search and do_download: if do_search and (do_download or do_fetch or do_merge):
raise MKInvalidInputException(message="You can't search and download at the same time.") raise MKInvalidInputException(message="You can't search and do another operation at the same time.")
if do_search and do_merge:
raise MKInvalidInputException(message="You can't search and merge at the same time.")
if do_search: if do_search:
self.search(":".join(input_str.split(":")[1:])) self.search(":".join(input_str.split(":")[1:]))
return False return False
indices = [] def get_selected_objects(q: str):
for possible_index in query.split(","): if q.strip().lower() == "all":
possible_index = possible_index.strip() return list(self.current_results)
if possible_index == "":
continue
i = 0
try:
i = int(possible_index)
except ValueError:
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.")
if i < 0 or i >= len(self.current_results): indices = []
raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results) - 1}.") for possible_index in q.split(","):
possible_index = possible_index.strip()
indices.append(i) if possible_index == "":
continue
i = 0
try:
i = int(possible_index)
except ValueError:
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.")
selected_objects = [self.current_results[i] for i in indices] if i < 0 or i >= len(self.current_results):
raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results) - 1}.")
indices.append(i)
return [self.current_results[i] for i in indices]
selected_objects = get_selected_objects(query)
if do_merge: if do_merge:
old_selected_objects = selected_objects old_selected_objects = selected_objects
@@ -397,6 +401,13 @@ class Downloader:
selected_objects = [a] selected_objects = [a]
if do_fetch:
for data_object in selected_objects:
self.pages.fetch_details(data_object)
self.print_current_options()
return False
if do_download: if do_download:
self.download(selected_objects) self.download(selected_objects)
return False return False

View File

@@ -6,6 +6,7 @@ from typing import List, Optional
from functools import lru_cache from functools import lru_cache
import logging import logging
from ..utils import output, BColors
from ..utils.config import main_settings from ..utils.config import main_settings
from ..utils.string_processing import fit_to_file_system from ..utils.string_processing import fit_to_file_system
@@ -136,13 +137,13 @@ class Cache:
) )
self._write_attribute(cache_attribute) self._write_attribute(cache_attribute)
cache_path = fit_to_file_system(Path(module_path, name), hidden_ok=True) cache_path = fit_to_file_system(Path(module_path, name.replace("/", "_")), hidden_ok=True)
with cache_path.open("wb") as content_file: with cache_path.open("wb") as content_file:
self.logger.debug(f"writing cache to {cache_path}") self.logger.debug(f"writing cache to {cache_path}")
content_file.write(content) content_file.write(content)
def get(self, name: str) -> Optional[CacheResult]: def get(self, name: str) -> Optional[CacheResult]:
path = fit_to_file_system(Path(self._dir, self.module, name), hidden_ok=True) path = fit_to_file_system(Path(self._dir, self.module, name.replace("/", "_")), hidden_ok=True)
if not path.is_file(): if not path.is_file():
return None return None
@@ -165,7 +166,7 @@ class Cache:
if ca.name == "": if ca.name == "":
continue continue
file = fit_to_file_system(Path(self._dir, ca.module, ca.name), hidden_ok=True) file = fit_to_file_system(Path(self._dir, ca.module, ca.name.replace("/", "_")), hidden_ok=True)
if not ca.is_valid: if not ca.is_valid:
self.logger.debug(f"deleting cache {ca.id}") self.logger.debug(f"deleting cache {ca.id}")
@@ -204,9 +205,12 @@ class Cache:
for path in self._dir.iterdir(): for path in self._dir.iterdir():
if path.is_dir(): if path.is_dir():
for file in path.iterdir(): for file in path.iterdir():
output(f"Deleting file {file}", color=BColors.GREY)
file.unlink() file.unlink()
output(f"Deleting folder {path}", color=BColors.HEADER)
path.rmdir() path.rmdir()
else: else:
output(f"Deleting folder {path}", color=BColors.HEADER)
path.unlink() path.unlink()
self.cached_attributes.clear() self.cached_attributes.clear()

View File

@@ -1,12 +1,12 @@
from __future__ import annotations from __future__ import annotations
import copy
import inspect
import logging import logging
import threading import threading
import time import time
from typing import List, Dict, Optional, Set from typing import TYPE_CHECKING, Dict, List, Optional, Set
from urllib.parse import urlparse, urlunsplit, ParseResult from urllib.parse import ParseResult, urlparse, urlunsplit
import copy
import inspect
import requests import requests
import responses import responses
@@ -14,12 +14,15 @@ from tqdm import tqdm
from .cache import Cache from .cache import Cache
from .rotating import RotatingProxy from .rotating import RotatingProxy
from ..objects import Target
if TYPE_CHECKING:
from ..objects import Target
from ..utils import request_trace from ..utils import request_trace
from ..utils.string_processing import shorten_display_url
from ..utils.config import main_settings from ..utils.config import main_settings
from ..utils.support_classes.download_result import DownloadResult
from ..utils.hacking import merge_args from ..utils.hacking import merge_args
from ..utils.string_processing import shorten_display_url
from ..utils.support_classes.download_result import DownloadResult
class Connection: class Connection:

View File

@@ -3,6 +3,9 @@ from collections import defaultdict
from pathlib import Path from pathlib import Path
import re import re
import logging import logging
import subprocess
from PIL import Image
from . import FetchOptions, DownloadOptions from . import FetchOptions, DownloadOptions
from .results import SearchResults from .results import SearchResults
@@ -17,6 +20,7 @@ from ..objects import (
Artist, Artist,
Label, Label,
) )
from ..objects.artwork import ArtworkVariant
from ..audio import write_metadata_to_target, correct_codec from ..audio import write_metadata_to_target, correct_codec
from ..utils import output, BColors from ..utils import output, BColors
from ..utils.string_processing import fit_to_file_system from ..utils.string_processing import fit_to_file_system
@@ -29,12 +33,13 @@ from ..utils.support_classes.download_result import DownloadResult
from ..utils.exception import MKMissingNameException from ..utils.exception import MKMissingNameException
from ..utils.exception.download import UrlNotFoundException from ..utils.exception.download import UrlNotFoundException
from ..utils.shared import DEBUG_PAGES from ..utils.shared import DEBUG_PAGES
from ..connection import Connection
from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, INDEPENDENT_DB_OBJECTS from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, Genius, INDEPENDENT_DB_OBJECTS
ALL_PAGES: Set[Type[Page]] = { ALL_PAGES: Set[Type[Page]] = {
# EncyclopaediaMetallum, # EncyclopaediaMetallum,
Genius,
Musify, Musify,
YoutubeMusic, YoutubeMusic,
Bandcamp Bandcamp
@@ -72,33 +77,37 @@ if DEBUG_PAGES:
class Pages: class Pages:
def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, download_options: DownloadOptions = None, fetch_options: FetchOptions = None): def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
self.LOGGER = logging.getLogger("download") self.LOGGER = logging.getLogger("download")
self.download_options: DownloadOptions = download_options or DownloadOptions() self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions() self.fetch_options: FetchOptions = fetch_options or FetchOptions()
# initialize all page instances # initialize all page instances
self._page_instances: Dict[Type[Page], Page] = dict() self._page_instances: Dict[Type[Page], Page] = dict()
self._source_to_page: Dict[SourceType, Type[Page]] = dict() self._source_to_page: Dict[SourceType, Type[Page]] = dict()
exclude_pages = exclude_pages if exclude_pages is not None else set() exclude_pages = exclude_pages if exclude_pages is not None else set()
if exclude_shady: if exclude_shady:
exclude_pages = exclude_pages.union(SHADY_PAGES) exclude_pages = exclude_pages.union(SHADY_PAGES)
if not exclude_pages.issubset(ALL_PAGES): if not exclude_pages.issubset(ALL_PAGES):
raise ValueError(f"The excluded pages have to be a subset of all pages: {exclude_pages} | {ALL_PAGES}") raise ValueError(
f"The excluded pages have to be a subset of all pages: {exclude_pages} | {ALL_PAGES}")
def _set_to_tuple(page_set: Set[Type[Page]]) -> Tuple[Type[Page], ...]: def _set_to_tuple(page_set: Set[Type[Page]]) -> Tuple[Type[Page], ...]:
return tuple(sorted(page_set, key=lambda page: page.__name__)) return tuple(sorted(page_set, key=lambda page: page.__name__))
self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages) self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages)
self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set) self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set)
self._audio_pages_set: Set[Type[Page]] = self._pages_set.intersection(AUDIO_PAGES) self._audio_pages_set: Set[Type[Page]
self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(self._audio_pages_set) ] = self._pages_set.intersection(AUDIO_PAGES)
self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(
self._audio_pages_set)
for page_type in self.pages: for page_type in self.pages:
self._page_instances[page_type] = page_type(fetch_options=self.fetch_options, download_options=self.download_options) self._page_instances[page_type] = page_type(
fetch_options=self.fetch_options, download_options=self.download_options)
self._source_to_page[page_type.SOURCE_TYPE] = page_type self._source_to_page[page_type.SOURCE_TYPE] = page_type
def _get_page_from_enum(self, source_page: SourceType) -> Page: def _get_page_from_enum(self, source_page: SourceType) -> Page:
@@ -108,24 +117,26 @@ class Pages:
def search(self, query: Query) -> SearchResults: def search(self, query: Query) -> SearchResults:
result = SearchResults() result = SearchResults()
for page_type in self.pages: for page_type in self.pages:
result.add( result.add(
page=page_type, page=page_type,
search_result=self._page_instances[page_type].search(query=query) search_result=self._page_instances[page_type].search(
query=query)
) )
return result return result
def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject: def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject:
if not isinstance(data_object, INDEPENDENT_DB_OBJECTS): if not isinstance(data_object, INDEPENDENT_DB_OBJECTS):
return data_object return data_object
source: Source source: Source
for source in data_object.source_collection.get_sources(source_type_sorting={ for source in data_object.source_collection.get_sources(source_type_sorting={
"only_with_page": True, "only_with_page": True,
}): }):
new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level) new_data_object = self.fetch_from_source(
source=source, stop_at_level=stop_at_level)
if new_data_object is not None: if new_data_object is not None:
data_object.merge(new_data_object) data_object.merge(new_data_object)
@@ -134,14 +145,14 @@ class Pages:
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]: def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
if not source.has_page: if not source.has_page:
return None return None
source_type = source.page.get_source_type(source=source) source_type = source.page.get_source_type(source=source)
if source_type is None: if source_type is None:
self.LOGGER.debug(f"Could not determine source type for {source}.") self.LOGGER.debug(f"Could not determine source type for {source}.")
return None return None
func = getattr(source.page, fetch_map[source_type]) func = getattr(source.page, fetch_map[source_type])
# fetching the data object and marking it as fetched # fetching the data object and marking it as fetched
data_object: DataObject = func(source=source, **kwargs) data_object: DataObject = func(source=source, **kwargs)
data_object.mark_as_fetched(source.hash_url) data_object.mark_as_fetched(source.hash_url)
@@ -151,21 +162,49 @@ class Pages:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None: if source is None:
return None return None
return self.fetch_from_source(source=source) return self.fetch_from_source(source=source)
def _skip_object(self, data_object: DataObject) -> bool: def _skip_object(self, data_object: DataObject) -> bool:
if isinstance(data_object, Album): if isinstance(data_object, Album):
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist: if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
return True return True
return False return False
def _fetch_artist_artwork(self, artist: Artist, naming: dict):
naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["artist"].append(artist.name)
naming["label"].extend(
[l.title_value for l in artist.label_collection])
# removing duplicates from the naming, and process the strings
for key, value in naming.items():
# https://stackoverflow.com/a/17016257
naming[key] = list(dict.fromkeys(value))
artwork_collection: ArtworkCollection = artist.artwork
artwork_collection.compile()
for image_number, artwork in enumerate(artwork_collection):
for artwork_variant in artwork.variants:
naming["image_number"] = [str(image_number)]
target = Target(
relative_to_music_dir=True,
file_path=Path(self._parse_path_template(
main_settings["artist_artwork_path"], naming=naming))
)
if not target.file_path.parent.exists():
target.create_path()
subprocess.Popen(["gio", "set", target.file_path.parent, "metadata::custom-icon", "file://"+str(target.file_path)])
with Image.open(artwork_variant.target.file_path) as img:
img.save(target.file_path, main_settings["image_format"])
artwork_variant.target = Target
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult: def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
# fetch the given object # fetch the given object
self.fetch_details(data_object) self.fetch_details(data_object)
output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD) output(
f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
# fetching all parent objects (e.g. if you only download a song) # fetching all parent objects (e.g. if you only download a song)
if not kwargs.get("fetched_upwards", False): if not kwargs.get("fetched_upwards", False):
to_fetch: List[DataObject] = [data_object] to_fetch: List[DataObject] = [data_object]
@@ -182,9 +221,19 @@ class Pages:
new_to_fetch.extend(c) new_to_fetch.extend(c)
to_fetch = new_to_fetch to_fetch = new_to_fetch
kwargs["fetched_upwards"] = True kwargs["fetched_upwards"] = True
naming = kwargs.get("naming", {
"genre": [genre],
"audio_format": [main_settings["audio_format"]],
"image_format": [main_settings["image_format"]]
})
# download artist artwork
if isinstance(data_object, Artist):
self._fetch_artist_artwork(artist=data_object, naming=naming)
# download all children # download all children
download_result: DownloadResult = DownloadResult() download_result: DownloadResult = DownloadResult()
for c in data_object.get_child_collections(): for c in data_object.get_child_collections():
@@ -202,10 +251,7 @@ class Pages:
I am able to do that, because duplicate values are removed later on. I am able to do that, because duplicate values are removed later on.
""" """
self._download_song(data_object, naming={ self._download_song(data_object, naming=naming)
"genre": [genre],
"audio_format": [main_settings["audio_format"]],
})
return download_result return download_result
@@ -213,13 +259,15 @@ class Pages:
return set(re.findall(r"{([^}]+)}", path_template)) return set(re.findall(r"{([^}]+)}", path_template))
def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str: def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str:
field_names: Set[str] = self._extract_fields_from_template(path_template) field_names: Set[str] = self._extract_fields_from_template(
path_template)
for field in field_names: for field in field_names:
if len(naming[field]) == 0: if len(naming[field]) == 0:
raise MKMissingNameException(f"Missing field for {field}.") raise MKMissingNameException(f"Missing field for {field}.")
path_template = path_template.replace(f"{{{field}}}", naming[field][0]) path_template = path_template.replace(
f"{{{field}}}", naming[field][0])
return path_template return path_template
@@ -229,20 +277,21 @@ class Pages:
Search the song in the file system. Search the song in the file system.
""" """
r = DownloadResult(total=1) r = DownloadResult(total=1)
# pre process the data recursively # pre process the data recursively
song.compile() song.compile()
# manage the naming # manage the naming
naming: Dict[str, List[str]] = defaultdict(list, naming) naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["song"].append(song.title_string) naming["song"].append(song.title_value)
naming["isrc"].append(song.isrc) naming["isrc"].append(song.isrc)
naming["album"].extend(a.title_string for a in song.album_collection) naming["album"].extend(a.title_value for a in song.album_collection)
naming["album_type"].extend(a.album_type.value for a in song.album_collection) naming["album_type"].extend(
a.album_type.value for a in song.album_collection)
naming["artist"].extend(a.name for a in song.artist_collection) naming["artist"].extend(a.name for a in song.artist_collection)
naming["artist"].extend(a.name for a in song.feature_artist_collection) naming["artist"].extend(a.name for a in song.feature_artist_collection)
for a in song.album_collection: for a in song.album_collection:
naming["label"].extend([l.title_string for l in a.label_collection]) naming["label"].extend([l.title_value for l in a.label_collection])
# removing duplicates from the naming, and process the strings # removing duplicates from the naming, and process the strings
for key, value in naming.items(): for key, value in naming.items():
# https://stackoverflow.com/a/17016257 # https://stackoverflow.com/a/17016257
@@ -255,13 +304,16 @@ class Pages:
song.target_collection.append(Target( song.target_collection.append(Target(
relative_to_music_dir=True, relative_to_music_dir=True,
file_path=Path( file_path=Path(
self._parse_path_template(main_settings["download_path"], naming=naming), self._parse_path_template(
self._parse_path_template(main_settings["download_file"], naming=naming), main_settings["download_path"], naming=naming),
self._parse_path_template(
main_settings["download_file"], naming=naming),
) )
)) ))
for target in song.target_collection: for target in song.target_collection:
if target.exists: if target.exists:
output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY) output(
f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
r.found_on_disk += 1 r.found_on_disk += 1
if not self.download_options.download_again_if_found: if not self.download_options.download_again_if_found:
@@ -282,8 +334,10 @@ class Pages:
break break
used_source = source used_source = source
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download") streaming_results = source.page.download_song_to_target(
skip_intervals = source.page.get_skip_intervals(song=song, source=source) source=source, target=tmp, desc="download")
skip_intervals = source.page.get_skip_intervals(
song=song, source=source)
# if something has been downloaded but it somehow failed, delete the file # if something has been downloaded but it somehow failed, delete the file
if streaming_results.is_fatal_error and tmp.exists: if streaming_results.is_fatal_error and tmp.exists:
@@ -307,7 +361,8 @@ class Pages:
used_source.page.post_process_hook(song=song, temp_target=tmp) used_source.page.post_process_hook(song=song, temp_target=tmp)
if not found_on_disk or self.download_options.process_metadata_if_found: if not found_on_disk or self.download_options.process_metadata_if_found:
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song) write_metadata_to_target(
metadata=song.metadata, target=tmp, song=song)
# copy the tmp target to the final locations # copy the tmp target to the final locations
for target in song.target_collection: for target in song.target_collection:
@@ -318,10 +373,10 @@ class Pages:
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]: def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL) source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None: if source is None:
raise UrlNotFoundException(url=url) raise UrlNotFoundException(url=url)
_actual_page = self._source_to_page[source.source_type] _actual_page = self._source_to_page[source.source_type]
return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level) return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level)

View File

@@ -1,27 +1,16 @@
from typing_extensions import TypeVar from typing_extensions import TypeVar
from .artwork import ArtworkCollection
from .collection import Collection
from .contact import Contact
from .country import Country
from .formatted_text import FormattedText
from .metadata import ID3Timestamp
from .metadata import Mapping as ID3Mapping
from .metadata import Metadata
from .option import Options from .option import Options
from .parents import OuterProxy
from .metadata import Metadata, Mapping as ID3Mapping, ID3Timestamp from .song import Album, Artist, Label, Lyrics, Song, Target
from .source import Source, SourceType from .source import Source, SourceType
from .song import (
Song,
Album,
Artist,
Target,
Lyrics,
Label
)
from .formatted_text import FormattedText
from .collection import Collection
from .country import Country
from .contact import Contact
from .parents import OuterProxy
from .artwork import Artwork
DatabaseObject = OuterProxy DatabaseObject = OuterProxy

View File

@@ -1,62 +1,243 @@
from __future__ import annotations from __future__ import annotations
from typing import List, Optional, Dict, Tuple, Type, Union, TypedDict from copy import copy
from dataclasses import dataclass, field
from .collection import Collection from functools import cached_property
from .metadata import ( from typing import Dict, List, Optional, Set, Tuple, Type, TypedDict, Union
Mapping as id3Mapping,
ID3Timestamp,
Metadata
)
from ..utils.string_processing import unify, hash_url
from .parents import OuterProxy as Base
from ..connection import Connection
from ..utils import create_dataclass_instance, custom_hash
from ..utils.config import main_settings from ..utils.config import main_settings
from ..utils.enums import PictureType
from ..utils.string_processing import hash_url, unify
from .collection import Collection
from .metadata import ID3Timestamp
from .metadata import Mapping as id3Mapping
from .metadata import Metadata
from .parents import OuterProxy as Base
from .target import Target
from PIL import Image
import imagehash
artwork_connection: Connection = Connection(module="artwork")
class ArtworkVariant(TypedDict): @dataclass
class ArtworkVariant:
url: str url: str
width: int width: Optional[int] = None
height: int heigth: Optional[int] = None
deviation: float image_format: Optional[str] = None
def __hash__(self) -> int:
return custom_hash(self.url)
class Artwork: def __eq__(self, other: ArtworkVariant) -> bool:
def __init__(self, *variants: List[ArtworkVariant]) -> None: return hash(self) == hash(other)
self._variant_mapping: Dict[str, ArtworkVariant] = {}
for variant in variants: def __contains__(self, other: str) -> bool:
self.append(**variant) return custom_hash(other) == hash(self.url)
@staticmethod def __merge__(self, other: ArtworkVariant) -> None:
def _calculate_deviation(*dimensions: List[int]) -> float: for key, value in other.__dict__.items():
return sum(abs(d - main_settings["preferred_artwork_resolution"]) for d in dimensions) / len(dimensions) if value is None:
continue
def append(self, url: str, width: int = main_settings["preferred_artwork_resolution"], height: int = main_settings["preferred_artwork_resolution"], **kwargs) -> None: if getattr(self, key) is None:
if url is None: setattr(self, key, value)
@cached_property
def target(self) -> Target:
return Target.temp()
def fetch(self) -> None:
global artwork_connection
r = artwork_connection.get(self.url, name=hash_url(self.url))
if r is None:
return return
self._variant_mapping[hash_url(url=url)] = { self.target.raw_content = r.content
"url": url,
"width": width, @dataclass
"height": height, class Artwork:
"deviation": self._calculate_deviation(width, height), variants: List[ArtworkVariant] = field(default_factory=list)
}
artwork_type: PictureType = PictureType.OTHER
def search_variant(self, url: str) -> Optional[ArtworkVariant]:
if url is None:
return None
for variant in self.variants:
if url in variant:
return variant
return None
def __contains__(self, other: str) -> bool:
return self.search_variant(other) is not None
def add_data(self, **kwargs) -> None:
variant = self.search_variant(kwargs.get("url"))
if variant is None:
variant, kwargs = create_dataclass_instance(ArtworkVariant, kwargs)
self.variants.append(variant)
variant.__dict__.update(kwargs)
@property @property
def best_variant(self) -> ArtworkVariant: def url(self) -> Optional[str]:
if len(self._variant_mapping.keys()) <= 0: if len(self.variants) <= 0:
return None return None
return min(self._variant_mapping.values(), key=lambda x: x["deviation"]) return self.variants[0].url
def get_variant_name(self, variant: ArtworkVariant) -> str: def fetch(self) -> None:
return f"artwork_{variant['width']}x{variant['height']}_{hash_url(variant['url']).replace('/', '_')}" for variant in self.variants:
variant.fetch()
def __merge__(self, other: Artwork, **kwargs) -> None:
for key, value in other._variant_mapping.items():
if key not in self._variant_mapping:
self._variant_mapping[key] = value
def __eq__(self, other: Artwork) -> bool: class ArtworkCollection:
return any(a == b for a, b in zip(self._variant_mapping.keys(), other._variant_mapping.keys())) """
Stores all the images/artworks for one data object.
There could be duplicates before calling ArtworkCollection.compile()
_this is called before one object is downloaded automatically._
"""
artwork_type: PictureType = PictureType.OTHER
def __init__(
self,
*data: List[Artwork],
parent_artworks: Set[ArtworkCollection] = None,
crop_images: bool = True,
) -> None:
# this is used for the song artwork, to fall back to the song artwork
self.parent_artworks: Set[ArtworkCollection] = parent_artworks or set()
self.crop_images: bool = crop_images
self._data = []
self.extend(data)
def search_artwork(self, url: str) -> Optional[ArtworkVariant]:
for artwork in self._data:
if url in artwork:
return artwork
return None
def __contains__(self, other: str) -> bool:
return self.search_artwork(other) is not None
def _create_new_artwork(self, **kwargs) -> Tuple[Artwork, dict]:
kwargs["artwork_type"] = kwargs.get("artwork_type", self.artwork_type)
return create_dataclass_instance(Artwork, dict(**kwargs))
def add_data(self, url: str, **kwargs) -> Artwork:
kwargs["url"] = url
artwork = self.search_artwork(url)
if artwork is None:
artwork, kwargs = self._create_new_artwork(**kwargs)
self._data.append(artwork)
artwork.add_data(**kwargs)
return artwork
def append(self, value: Union[Artwork, ArtworkVariant, dict], **kwargs):
"""
You can append the types Artwork, ArtworkVariant or dict
the best option would be to use Artwork and avoid the other options.
"""
if isinstance(value, dict):
kwargs.update(value)
value, kwargs = create_dataclass_instance(ArtworkVariant, kwargs)
if isinstance(value, ArtworkVariant):
kwargs["variants"] = [value]
value, kwargs = create_dataclass_instance(Artwork, kwargs)
if isinstance(value, Artwork):
self._data.append(value)
return
def extend(self, values: List[Union[Artwork, ArtworkVariant, dict]], **kwargs):
for value in values:
self.append(value, **kwargs)
def compile(self, **kwargs) -> None:
"""
This will make the artworks ready for download and delete duplicates.
"""
artwork_hashes: list = list()
artwork_urls: list = list()
for artwork in self._data:
index = 0
for artwork_variant in artwork.variants:
r = artwork_connection.get(
url=artwork_variant.url,
name=artwork_variant.url,
)
if artwork_variant.url in artwork_urls:
artwork.variants.pop(index)
continue
artwork_urls.append(artwork_variant.url)
target: Target = artwork_variant.target
with target.open("wb") as f:
f.write(r.content)
with Image.open(target.file_path) as img:
# https://stackoverflow.com/a/59476938/16804841
if img.mode != 'RGB':
img = img.convert('RGB')
try:
image_hash = imagehash.crop_resistant_hash(img)
except Exception as e:
continue
if image_hash in artwork_hashes:
artwork.variants.pop(index)
target.delete()
continue
artwork_hashes.append(image_hash)
width, height = img.size
if width != height:
if width > height:
img = img.crop((width // 2 - height // 2, 0, width // 2 + height // 2, height))
else:
img = img.crop((0, height // 2 - width // 2, width, height // 2 + width // 2))
# resize the image to the preferred resolution
img.thumbnail((main_settings["preferred_artwork_resolution"], main_settings["preferred_artwork_resolution"]))
index =+ 1
def __merge__(self, other: ArtworkCollection, **kwargs) -> None:
self.parent_artworks.update(other.parent_artworks)
for other_artwork in other._data:
for other_variant in other_artwork.variants:
if self.__contains__(other_variant.url):
continue
self.append(ArtworkVariant(other_variant.url))
def __hash__(self) -> int:
return id(self)
def __iter__(self) -> Generator[Artwork, None, None]:
yield from self._data
def get_urls(self) -> Generator[str, None, None]:
yield from (artwork.url for artwork in self._data if artwork.url is not None)

View File

@@ -1,16 +1,43 @@
from __future__ import annotations from __future__ import annotations
from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set
import copy import copy
from collections import defaultdict
from dataclasses import dataclass
from typing import (Any, Callable, Dict, Generator, Generic, Iterable,
Iterator, List, Optional, Set, Tuple, TypeVar, Union)
from .parents import OuterProxy from ..utils import BColors, object_trace, output
from ..utils import object_trace from .parents import InnerData, OuterProxy
from ..utils import output, BColors
T = TypeVar('T', bound=OuterProxy) T = TypeVar('T', bound=OuterProxy)
@dataclass
class AppendHookArguments:
"""
This class is used to store the arguments for the append hook.
The best explanation is with an examples:
```
album = Album()
song = Song()
album.song_collection.append(song)
```
In this case, the append hook is triggered with the following arguments:
```
AppendHookArguments(
collection=album.song_collection,
new_object=song,
collection_root_objects=[album]
)
```
"""
collection: Collection
new_object: T
collection_root_objects: Set[InnerData]
class Collection(Generic[T]): class Collection(Generic[T]):
__is_collection__ = True __is_collection__ = True
@@ -27,6 +54,7 @@ class Collection(Generic[T]):
sync_on_append: Dict[str, Collection] = None, sync_on_append: Dict[str, Collection] = None,
append_object_to_attribute: Dict[str, T] = None, append_object_to_attribute: Dict[str, T] = None,
extend_object_to_attribute: Dict[str, Collection] = None, extend_object_to_attribute: Dict[str, Collection] = None,
append_callbacks: Set[Callable[[AppendHookArguments], None]] = None,
) -> None: ) -> None:
self._collection_for: dict = dict() self._collection_for: dict = dict()
@@ -41,6 +69,7 @@ class Collection(Generic[T]):
self.sync_on_append: Dict[str, Collection] = sync_on_append or {} self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self.pull_from: List[Collection] = [] self.pull_from: List[Collection] = []
self.push_to: List[Collection] = [] self.push_to: List[Collection] = []
self.append_callbacks: Set[Callable[[AppendHookArguments], None]] = append_callbacks or set()
# This is to cleanly unmap previously mapped items by their id # This is to cleanly unmap previously mapped items by their id
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict) self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
@@ -115,13 +144,6 @@ class Collection(Generic[T]):
self._data.append(other) self._data.append(other)
other._inner._is_in_collection.add(self) other._inner._is_in_collection.add(self)
# all of the existing hooks to get the defined datastructures
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
for attribute, a in self.sync_on_append.items(): for attribute, a in self.sync_on_append.items():
# syncing two collections by reference # syncing two collections by reference
b = other.__getattribute__(attribute) b = other.__getattribute__(attribute)
@@ -141,6 +163,21 @@ class Collection(Generic[T]):
a.extend(b_data, **kwargs) a.extend(b_data, **kwargs)
# all of the existing hooks to get the defined datastructures
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
append_hook_args = AppendHookArguments(
collection=self,
new_object=other,
collection_root_objects=self._collection_for.keys(),
)
for callback in self.append_callbacks:
callback(append_hook_args)
def append(self, other: Optional[T], **kwargs): def append(self, other: Optional[T], **kwargs):
""" """
If an object, that represents the same entity exists in a relevant collection, If an object, that represents the same entity exists in a relevant collection,
@@ -160,6 +197,7 @@ class Collection(Generic[T]):
object_trace(f"Appending {other.option_string} to {self}") object_trace(f"Appending {other.option_string} to {self}")
# switching collection in the case of push to # switching collection in the case of push to
for c in self.push_to: for c in self.push_to:
r = c._find_object(other) r = c._find_object(other)

View File

@@ -37,11 +37,19 @@ class FormattedText:
@property @property
def markdown(self) -> str: def markdown(self) -> str:
return md(self.html).strip() return md(self.html).strip()
@markdown.setter
def markdown(self, value: str) -> None:
self.html = mistune.markdown(value)
@property @property
def plain(self) -> str: def plain(self) -> str:
md = self.markdown md = self.markdown
return md.replace("\n\n", "\n") return md.replace("\n\n", "\n")
@plain.setter
def plain(self, value: str) -> None:
self.html = mistune.markdown(plain_to_markdown(value))
def __str__(self) -> str: def __str__(self) -> str:
return self.markdown return self.markdown

View File

@@ -339,6 +339,10 @@ class OuterProxy:
def title_string(self) -> str: def title_string(self) -> str:
return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "") return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "")
@property
def title_value(self) -> str:
return str(self.__getattribute__(self.TITEL))
def __repr__(self): def __repr__(self):
return f"{type(self).__name__}({self.title_string})" return f"{type(self).__name__}({self.title_string})"

View File

@@ -1,35 +1,32 @@
from __future__ import annotations from __future__ import annotations
import copy
import random import random
from collections import defaultdict from collections import defaultdict
from typing import List, Optional, Dict, Tuple, Type, Union from typing import Dict, List, Optional, Tuple, Type, Union
import copy
import pycountry import pycountry
from ..utils.enums.album import AlbumType, AlbumStatus from ..utils.config import main_settings
from .collection import Collection from ..utils.enums.album import AlbumStatus, AlbumType
from .formatted_text import FormattedText from ..utils.enums.colors import BColors
from .lyrics import Lyrics
from .contact import Contact
from .artwork import Artwork
from .metadata import (
Mapping as id3Mapping,
ID3Timestamp,
Metadata
)
from .option import Options
from .parents import OuterProxy, P
from .source import Source, SourceCollection
from .target import Target
from .country import Language, Country
from ..utils.shared import DEBUG_PRINT_ID from ..utils.shared import DEBUG_PRINT_ID
from ..utils.string_processing import unify from ..utils.string_processing import unify
from .artwork import ArtworkCollection
from .collection import AppendHookArguments, Collection
from .contact import Contact
from .country import Country, Language
from .formatted_text import FormattedText
from .lyrics import Lyrics
from .metadata import ID3Timestamp
from .metadata import Mapping as id3Mapping
from .metadata import Metadata
from .option import Options
from .parents import OuterProxy
from .parents import OuterProxy as Base from .parents import OuterProxy as Base
from .parents import P
from ..utils.config import main_settings from .source import Source, SourceCollection
from ..utils.enums.colors import BColors from .target import Target
""" """
All Objects dependent All Objects dependent
@@ -89,7 +86,7 @@ class Song(Base):
genre: str genre: str
note: FormattedText note: FormattedText
tracksort: int tracksort: int
artwork: Artwork artwork: ArtworkCollection
source_collection: SourceCollection source_collection: SourceCollection
target_collection: Collection[Target] target_collection: Collection[Target]
@@ -105,11 +102,11 @@ class Song(Base):
"source_collection": SourceCollection, "source_collection": SourceCollection,
"target_collection": Collection, "target_collection": Collection,
"lyrics_collection": Collection, "lyrics_collection": Collection,
"artwork": Artwork, "artwork": ArtworkCollection,
"album_collection": Collection,
"artist_collection": Collection, "artist_collection": Collection,
"feature_artist_collection": Collection, "feature_artist_collection": Collection,
"album_collection": Collection,
"title": lambda: None, "title": lambda: None,
"unified_title": lambda: None, "unified_title": lambda: None,
@@ -129,11 +126,11 @@ class Song(Base):
source_list: List[Source] = None, source_list: List[Source] = None,
target_list: List[Target] = None, target_list: List[Target] = None,
lyrics_list: List[Lyrics] = None, lyrics_list: List[Lyrics] = None,
main_artist_list: List[Artist] = None, artist_list: List[Artist] = None,
feature_artist_list: List[Artist] = None, feature_artist_list: List[Artist] = None,
album_list: List[Album] = None, album_list: List[Album] = None,
tracksort: int = 0, tracksort: int = 0,
artwork: Optional[Artwork] = None, artwork: Optional[ArtworkCollection] = None,
**kwargs **kwargs
) -> None: ) -> None:
real_kwargs = copy.copy(locals()) real_kwargs = copy.copy(locals())
@@ -144,7 +141,18 @@ class Song(Base):
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "feature_artist_collection", "album_collection") UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "feature_artist_collection", "album_collection")
TITEL = "title" TITEL = "title"
@staticmethod
def register_artwork_parent(append_hook_arguments: AppendHookArguments):
album: Album = append_hook_arguments.new_object
song: Song
for song in append_hook_arguments.collection_root_objects:
song.artwork.parent_artworks.add(album.artwork)
def __init_collections__(self) -> None: def __init_collections__(self) -> None:
self.feature_artist_collection.push_to = [self.artist_collection]
self.artist_collection.pull_from = [self.feature_artist_collection]
self.album_collection.sync_on_append = { self.album_collection.sync_on_append = {
"artist_collection": self.artist_collection, "artist_collection": self.artist_collection,
} }
@@ -153,11 +161,12 @@ class Song(Base):
"song_collection": self, "song_collection": self,
} }
self.artist_collection.extend_object_to_attribute = { self.artist_collection.extend_object_to_attribute = {
"main_album_collection": self.album_collection "album_collection": self.album_collection
} }
self.feature_artist_collection.extend_object_to_attribute = {
self.feature_artist_collection.push_to = [self.artist_collection] "album_collection": self.album_collection
self.artist_collection.pull_from = [self.feature_artist_collection] }
self.album_collection.append_callbacks = set((Song.register_artwork_parent, ))
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song: if object_type is Song:
@@ -175,6 +184,10 @@ class Song(Base):
self.album_collection.extend(object_list) self.album_collection.extend(object_list)
return return
def _compile(self):
self.artwork.compile()
INDEX_DEPENDS_ON = ("title", "isrc", "source_collection") INDEX_DEPENDS_ON = ("title", "isrc", "source_collection")
@property @property
@@ -216,10 +229,11 @@ class Song(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = "song "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title}) r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
r += get_collection_string(self.artist_collection, " by {}") r += get_collection_string(self.artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}") r += get_collection_string(self.feature_artist_collection, " feat. {}" if len(self.artist_collection) > 0 else " by {}")
return r return r
@property @property
@@ -234,11 +248,6 @@ class Song(Base):
return f"{self.tracksort}/{len(self.album_collection[0].song_collection) or 1}" return f"{self.tracksort}/{len(self.album_collection[0].song_collection) or 1}"
"""
All objects dependent on Album
"""
class Album(Base): class Album(Base):
title: str title: str
unified_title: str unified_title: str
@@ -250,10 +259,12 @@ class Album(Base):
albumsort: int albumsort: int
notes: FormattedText notes: FormattedText
artwork: ArtworkCollection
source_collection: SourceCollection source_collection: SourceCollection
artist_collection: Collection[Artist]
song_collection: Collection[Song] song_collection: Collection[Song]
artist_collection: Collection[Artist]
feature_artist_collection: Collection[Artist]
label_collection: Collection[Label] label_collection: Collection[Label]
_default_factories = { _default_factories = {
@@ -268,10 +279,12 @@ class Album(Base):
"date": ID3Timestamp, "date": ID3Timestamp,
"notes": FormattedText, "notes": FormattedText,
"artwork": lambda: ArtworkCollection(crop_images=False),
"source_collection": SourceCollection, "source_collection": SourceCollection,
"artist_collection": Collection,
"song_collection": Collection, "song_collection": Collection,
"artist_collection": Collection,
"feature_artist_collection": Collection,
"label_collection": Collection, "label_collection": Collection,
} }
@@ -289,6 +302,7 @@ class Album(Base):
barcode: str = None, barcode: str = None,
albumsort: int = None, albumsort: int = None,
notes: FormattedText = None, notes: FormattedText = None,
artwork: ArtworkCollection = None,
source_list: List[Source] = None, source_list: List[Source] = None,
artist_list: List[Artist] = None, artist_list: List[Artist] = None,
song_list: List[Song] = None, song_list: List[Song] = None,
@@ -303,7 +317,17 @@ class Album(Base):
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection",) DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection",)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection") UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection")
@staticmethod
def register_artwork_parent(append_hook_arguments: AppendHookArguments):
song: Song = append_hook_arguments.new_object
for root_object in append_hook_arguments.collection_root_objects:
song.artwork.parent_artworks.add(root_object.artwork)
def __init_collections__(self): def __init_collections__(self):
self.feature_artist_collection.push_to = [self.artist_collection]
self.artist_collection.pull_from = [self.feature_artist_collection]
self.song_collection.append_object_to_attribute = { self.song_collection.append_object_to_attribute = {
"album_collection": self "album_collection": self
} }
@@ -312,12 +336,14 @@ class Album(Base):
} }
self.artist_collection.append_object_to_attribute = { self.artist_collection.append_object_to_attribute = {
"main_album_collection": self "album_collection": self
} }
self.artist_collection.extend_object_to_attribute = { self.artist_collection.extend_object_to_attribute = {
"label_collection": self.label_collection "label_collection": self.label_collection
} }
self.song_collection.append_callbacks = set((Album.register_artwork_parent, ))
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song: if object_type is Song:
self.song_collection.extend(object_list) self.song_collection.extend(object_list)
@@ -366,8 +392,11 @@ class Album(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = "album "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.artist_collection, " by {}") r += get_collection_string(self.artist_collection, " by {}")
if len(self.artist_collection) <= 0:
r += get_collection_string(self.feature_artist_collection, " by {}")
r += get_collection_string(self.label_collection, " under {}") r += get_collection_string(self.label_collection, " under {}")
if len(self.song_collection) > 0: if len(self.song_collection) > 0:
@@ -470,10 +499,12 @@ class Artist(Base):
general_genre: str general_genre: str
unformatted_location: str unformatted_location: str
artwork: ArtworkCollection
source_collection: SourceCollection source_collection: SourceCollection
contact_collection: Collection[Contact] contact_collection: Collection[Contact]
main_album_collection: Collection[Album] album_collection: Collection[Album]
label_collection: Collection[Label] label_collection: Collection[Label]
_default_factories = { _default_factories = {
@@ -486,8 +517,10 @@ class Artist(Base):
"lyrical_themes": list, "lyrical_themes": list,
"general_genre": lambda: "", "general_genre": lambda: "",
"artwork": ArtworkCollection,
"source_collection": SourceCollection, "source_collection": SourceCollection,
"main_album_collection": Collection, "album_collection": Collection,
"contact_collection": Collection, "contact_collection": Collection,
"label_collection": Collection, "label_collection": Collection,
} }
@@ -504,11 +537,12 @@ class Artist(Base):
notes: FormattedText = None, notes: FormattedText = None,
lyrical_themes: List[str] = None, lyrical_themes: List[str] = None,
general_genre: str = None, general_genre: str = None,
artwork: ArtworkCollection = None,
unformatted_location: str = None, unformatted_location: str = None,
source_list: List[Source] = None, source_list: List[Source] = None,
contact_list: List[Contact] = None, contact_list: List[Contact] = None,
feature_song_list: List[Song] = None, feature_song_list: List[Song] = None,
main_album_list: List[Album] = None, album_list: List[Album] = None,
label_list: List[Label] = None, label_list: List[Label] = None,
**kwargs **kwargs
) -> None: ) -> None:
@@ -518,10 +552,14 @@ class Artist(Base):
Base.__init__(**real_kwargs) Base.__init__(**real_kwargs)
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_album_collection",) DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("album_collection",)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",) UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",)
def __init_collections__(self): def __init_collections__(self):
self.album_collection.append_object_to_attribute = {
"feature_artist_collection": self
}
self.label_collection.append_object_to_attribute = { self.label_collection.append_object_to_attribute = {
"current_artist_collection": self "current_artist_collection": self
} }
@@ -535,7 +573,7 @@ class Artist(Base):
return return
if object_type is Album: if object_type is Album:
self.main_album_collection.extend(object_list) self.album_collection.extend(object_list)
return return
if object_type is Label: if object_type is Label:
@@ -548,7 +586,7 @@ class Artist(Base):
def update_albumsort(self): def update_albumsort(self):
""" """
This updates the albumsort attributes, of the albums in This updates the albumsort attributes, of the albums in
`self.main_album_collection`, and sorts the albums, if possible. `self.album_collection`, and sorts the albums, if possible.
It is advised to only call this function, once all the albums are It is advised to only call this function, once all the albums are
added to the artist. added to the artist.
@@ -566,7 +604,7 @@ class Artist(Base):
# order albums in the previously defined section # order albums in the previously defined section
album: Album album: Album
for album in self.main_album_collection: for album in self.album_collection:
sections[type_section[album.album_type]].append(album) sections[type_section[album.album_type]].append(album)
def sort_section(_section: List[Album], last_albumsort: int) -> int: def sort_section(_section: List[Album], last_albumsort: int) -> int:
@@ -597,7 +635,7 @@ class Artist(Base):
album_list.extend(sections[section_index]) album_list.extend(sections[section_index])
# replace the old collection with the new one # replace the old collection with the new one
self.main_album_collection._data = album_list self.album_collection._data = album_list
INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection") INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection")
@property @property
@@ -619,12 +657,13 @@ class Artist(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = "artist "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.label_collection, " under {}") r += get_collection_string(self.label_collection, " under {}")
r += OPTION_BACKGROUND.value r += OPTION_BACKGROUND.value
if len(self.main_album_collection) > 0: if len(self.album_collection) > 0:
r += f" with {len(self.main_album_collection)} albums" r += f" with {len(self.album_collection)} albums"
r += BColors.ENDC.value r += BColors.ENDC.value
@@ -712,4 +751,4 @@ class Label(Base):
@property @property
def option_string(self): def option_string(self):
return OPTION_FOREGROUND.value + self.name + BColors.ENDC.value return "label " + OPTION_FOREGROUND.value + self.name + BColors.ENDC.value

View File

@@ -1,17 +1,17 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path
from typing import List, Tuple, TextIO, Union, Optional
import logging import logging
import random import random
from pathlib import Path
from typing import List, Optional, TextIO, Tuple, Union
import requests import requests
from tqdm import tqdm from tqdm import tqdm
from .parents import OuterProxy from ..utils.config import logging_settings, main_settings
from ..utils.shared import HIGHEST_ID from ..utils.shared import HIGHEST_ID
from ..utils.config import main_settings, logging_settings
from ..utils.string_processing import fit_to_file_system from ..utils.string_processing import fit_to_file_system
from .parents import OuterProxy
LOGGER = logging.getLogger("target") LOGGER = logging.getLogger("target")
@@ -31,7 +31,8 @@ class Target(OuterProxy):
} }
@classmethod @classmethod
def temp(cls, name: str = str(random.randint(0, HIGHEST_ID)), file_extension: Optional[str] = None) -> P: def temp(cls, name: str = None, file_extension: Optional[str] = None) -> P:
name = name or str(random.randint(0, HIGHEST_ID))
if file_extension is not None: if file_extension is not None:
name = f"{name}.{file_extension}" name = f"{name}.{file_extension}"
@@ -117,3 +118,11 @@ class Target(OuterProxy):
def read_bytes(self) -> bytes: def read_bytes(self) -> bytes:
return self.file_path.read_bytes() return self.file_path.read_bytes()
@property
def raw_content(self) -> bytes:
return self.file_path.read_bytes()
@raw_content.setter
def raw_content(self, content: bytes):
self.file_path.write_bytes(content)

View File

@@ -3,5 +3,6 @@ from .musify import Musify
from .youtube import YouTube from .youtube import YouTube
from .youtube_music import YoutubeMusic from .youtube_music import YoutubeMusic
from .bandcamp import Bandcamp from .bandcamp import Bandcamp
from .genius import Genius
from .abstract import Page, INDEPENDENT_DB_OBJECTS from .abstract import Page, INDEPENDENT_DB_OBJECTS

View File

@@ -1,33 +1,22 @@
from typing import List, Optional, Type
from urllib.parse import urlparse, urlunparse
import json import json
from enum import Enum from enum import Enum
from bs4 import BeautifulSoup from typing import List, Optional, Type
import pycountry from urllib.parse import urlparse, urlunparse
import pycountry
from bs4 import BeautifulSoup
from ..objects import Source, DatabaseObject
from .abstract import Page
from ..objects import (
Artist,
Source,
SourceType,
Song,
Album,
Label,
Target,
Contact,
ID3Timestamp,
Lyrics,
FormattedText,
Artwork,
)
from ..connection import Connection from ..connection import Connection
from ..objects import (Album, Artist, ArtworkCollection, Contact,
DatabaseObject, FormattedText, ID3Timestamp, Label,
Lyrics, Song, Source, SourceType, Target)
from ..utils import dump_to_file from ..utils import dump_to_file
from ..utils.enums import SourceType, ALL_SOURCE_TYPES from ..utils.config import logging_settings, main_settings
from ..utils.support_classes.download_result import DownloadResult from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.string_processing import clean_song_title
from ..utils.config import main_settings, logging_settings
from ..utils.shared import DEBUG from ..utils.shared import DEBUG
from ..utils.string_processing import clean_song_title
from ..utils.support_classes.download_result import DownloadResult
from .abstract import Page
if DEBUG: if DEBUG:
from ..utils import dump_to_file from ..utils import dump_to_file
@@ -117,7 +106,7 @@ class Bandcamp(Page):
return Song( return Song(
title=clean_song_title(name, artist_name=data["band_name"]), title=clean_song_title(name, artist_name=data["band_name"]),
source_list=source_list, source_list=source_list,
main_artist_list=[ artist_list=[
Artist( Artist(
name=data["band_name"], name=data["band_name"],
source_list=[ source_list=[
@@ -237,8 +226,13 @@ class Bandcamp(Page):
html_music_grid = soup.find("ol", {"id": "music-grid"}) html_music_grid = soup.find("ol", {"id": "music-grid"})
if html_music_grid is not None: if html_music_grid is not None:
for subsoup in html_music_grid.find_all("li"): for subsoup in html_music_grid.find_all("li"):
artist.main_album_collection.append(self._parse_album(soup=subsoup, initial_source=source)) artist.album_collection.append(self._parse_album(soup=subsoup, initial_source=source))
# artist artwork
artist_artwork: BeautifulSoup = soup.find("img", {"class":"band-photo"})
if artist_artwork is not None:
artist.artwork.add_data(artist_artwork.get("data-src", artist_artwork.get("src")))
for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})): for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})):
data_blob = data_blob_soup["data-blob"] data_blob = data_blob_soup["data-blob"]
@@ -246,14 +240,14 @@ class Bandcamp(Page):
dump_to_file(f"bandcamp_artist_data_blob_{i}.json", data_blob, is_json=True, exit_after_dump=False) dump_to_file(f"bandcamp_artist_data_blob_{i}.json", data_blob, is_json=True, exit_after_dump=False)
if data_blob is not None: if data_blob is not None:
artist.main_album_collection.extend( artist.album_collection.extend(
self._parse_artist_data_blob(json.loads(data_blob), source.url) self._parse_artist_data_blob(json.loads(data_blob), source.url)
) )
artist.source_collection.append(source) artist.source_collection.append(source)
return artist return artist
def _parse_track_element(self, track: dict, artwork: Artwork) -> Optional[Song]: def _parse_track_element(self, track: dict, artwork: ArtworkCollection) -> Optional[Song]:
lyrics_list: List[Lyrics] = [] lyrics_list: List[Lyrics] = []
_lyrics: Optional[str] = track.get("item", {}).get("recordingOf", {}).get("lyrics", {}).get("text") _lyrics: Optional[str] = track.get("item", {}).get("recordingOf", {}).get("lyrics", {}).get("text")
@@ -287,9 +281,15 @@ class Bandcamp(Page):
artist_source_list = [] artist_source_list = []
if "@id" in artist_data: if "@id" in artist_data:
artist_source_list = [Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))] artist_source_list = [Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
source_list: List[Source] = [source]
if "mainEntityOfPage" in data or "@id" in data:
source_list.append(Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"])))
album = Album( album = Album(
title=data["name"].strip(), title=data["name"].strip(),
source_list=[Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]))], source_list=source_list,
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"), date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
artist_list=[Artist( artist_list=[Artist(
name=artist_data["name"].strip(), name=artist_data["name"].strip(),
@@ -297,7 +297,7 @@ class Bandcamp(Page):
)] )]
) )
artwork: Artwork = Artwork() artwork: ArtworkCollection = ArtworkCollection()
def _get_artwork_url(_data: dict) -> Optional[str]: def _get_artwork_url(_data: dict) -> Optional[str]:
if "image" in _data: if "image" in _data:
@@ -308,15 +308,14 @@ class Bandcamp(Page):
_artwork_url = _get_artwork_url(data) _artwork_url = _get_artwork_url(data)
if _artwork_url is not None: if _artwork_url is not None:
artwork.append(url=_artwork_url, width=350, height=350) artwork.add_data(url=_artwork_url, width=350, height=350)
else: else:
for album_release in data.get("albumRelease", []): for album_release in data.get("albumRelease", []):
_artwork_url = _get_artwork_url(album_release) _artwork_url = _get_artwork_url(album_release)
if _artwork_url is not None: if _artwork_url is not None:
artwork.append(url=_artwork_url, width=350, height=350) artwork.add_data(url=_artwork_url, width=350, height=350)
break break
for i, track_json in enumerate(data.get("track", {}).get("itemListElement", [])): for i, track_json in enumerate(data.get("track", {}).get("itemListElement", [])):
if DEBUG: if DEBUG:
dump_to_file(f"album_track_{i}.json", json.dumps(track_json), is_json=True, exit_after_dump=False) dump_to_file(f"album_track_{i}.json", json.dumps(track_json), is_json=True, exit_after_dump=False)
@@ -362,17 +361,29 @@ class Bandcamp(Page):
for key, value in other_data.get("trackinfo", [{}])[0].get("file", {"": None}).items(): for key, value in other_data.get("trackinfo", [{}])[0].get("file", {"": None}).items():
mp3_url = value mp3_url = value
source_list: List[Source] = [source]
if "mainEntityOfPage" in data or "@id" in data:
source_list.append(Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]), audio_url=mp3_url))
source_list_album: List[Source] = [source]
if "@id" in album_data:
source_list_album.append(Source(self.SOURCE_TYPE, album_data["@id"]))
source_list_artist: List[Source] = [source]
if "@id" in artist_data:
source_list_artist.append(Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"])))
song = Song( song = Song(
title=clean_song_title(data["name"], artist_name=artist_data["name"]), title=clean_song_title(data["name"], artist_name=artist_data["name"]),
source_list=[source, Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]), audio_url=mp3_url)], source_list=source_list,
album_list=[Album( album_list=[Album(
title=album_data["name"].strip(), title=album_data["name"].strip(),
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"), date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
source_list=[Source(self.SOURCE_TYPE, album_data["@id"])] source_list=source_list_album
)], )],
main_artist_list=[Artist( artist_list=[Artist(
name=artist_data["name"].strip(), name=artist_data["name"].strip(),
source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))] source_list=source_list_artist
)], )],
lyrics_list=self._fetch_lyrics(soup=soup) lyrics_list=self._fetch_lyrics(soup=soup)
) )

View File

@@ -52,7 +52,7 @@ def _song_from_json(artist_html=None, album_html=None, release_type=None, title=
return Song( return Song(
title=title, title=title,
main_artist_list=[ artist_list=[
_artist_from_json(artist_html=artist_html) _artist_from_json(artist_html=artist_html)
], ],
album_list=[ album_list=[
@@ -663,7 +663,7 @@ class EncyclopaediaMetallum(Page):
artist.notes = band_notes artist.notes = band_notes
discography: List[Album] = self._fetch_artist_discography(artist_id) discography: List[Album] = self._fetch_artist_discography(artist_id)
artist.main_album_collection.extend(discography) artist.album_collection.extend(discography)
return artist return artist

View File

@@ -0,0 +1,288 @@
import simplejson as json
from json_unescape import escape_json, unescape_json
from enum import Enum
from typing import List, Optional, Type
from urllib.parse import urlencode, urlparse, urlunparse
import pycountry
from bs4 import BeautifulSoup
from ..connection import Connection
from ..objects import (Album, Artist, ArtworkCollection, Contact,
DatabaseObject, FormattedText, ID3Timestamp, Label,
Lyrics, Song, Source, SourceType, Target)
from ..utils import dump_to_file, traverse_json_path
from ..utils.config import logging_settings, main_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.shared import DEBUG
from ..utils.string_processing import clean_song_title
from ..utils.support_classes.download_result import DownloadResult
from .abstract import Page
if DEBUG:
from ..utils import dump_to_file
class Genius(Page):
SOURCE_TYPE = ALL_SOURCE_TYPES.GENIUS
HOST = "genius.com"
def __init__(self, *args, **kwargs):
self.connection: Connection = Connection(
host="https://genius.com/",
logger=self.LOGGER,
module="genius",
)
super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
path = source.parsed_url.path.replace("/", "")
if path.startswith("artists"):
return Artist
if path.startswith("albums"):
return Album
return Song
def add_to_artwork(self, artwork: ArtworkCollection, url: str):
if url is None:
return
url_frags = url.split(".")
if len(url_frags) < 2:
artwork.add_data(url=url)
return
dimensions = url_frags[-2].split("x")
if len(dimensions) < 2:
artwork.add_data(url=url)
return
if len(dimensions) == 3:
dimensions = dimensions[:-1]
try:
artwork.add_data(url=url, width=int(dimensions[0]), height=int(dimensions[1]))
except ValueError:
artwork.add_data(url=url)
def parse_api_object(self, data: dict) -> Optional[DatabaseObject]:
if data is None:
return None
object_type = data.get("_type")
artwork = ArtworkCollection()
self.add_to_artwork(artwork, data.get("header_image_url"))
self.add_to_artwork(artwork, data.get("image_url"))
additional_sources: List[Source] = []
source: Source = Source(self.SOURCE_TYPE, data.get("url"), additional_data={
"id": data.get("id"),
"slug": data.get("slug"),
"api_path": data.get("api_path"),
})
notes = FormattedText()
description = data.get("description") or {}
if "html" in description:
notes.html = description["html"]
elif "markdown" in description:
notes.markdown = description["markdown"]
elif "description_preview" in data:
notes.plaintext = data["description_preview"]
if source.url is None:
return None
if object_type == "artist":
if data.get("instagram_name") is not None:
additional_sources.append(Source(ALL_SOURCE_TYPES.INSTAGRAM, f"https://www.instagram.com/{data['instagram_name']}/"))
if data.get("facebook_name") is not None:
additional_sources.append(Source(ALL_SOURCE_TYPES.FACEBOOK, f"https://www.facebook.com/{data['facebook_name']}/"))
if data.get("twitter_name") is not None:
additional_sources.append(Source(ALL_SOURCE_TYPES.TWITTER, f"https://x.com/{data['twitter_name']}/"))
return Artist(
name=data["name"].strip() if data.get("name") is not None else None,
source_list=[source],
artwork=artwork,
notes=notes,
)
if object_type == "album":
self.add_to_artwork(artwork, data.get("cover_art_thumbnail_url"))
self.add_to_artwork(artwork, data.get("cover_art_url"))
for cover_art in data.get("cover_arts", []):
self.add_to_artwork(artwork, cover_art.get("image_url"))
self.add_to_artwork(artwork, cover_art.get("thumbnail_image_url"))
return Album(
title=data.get("name").strip(),
source_list=[source],
artist_list=[self.parse_api_object(data.get("artist"))],
artwork=artwork,
date=ID3Timestamp(**(data.get("release_date_components") or {})),
)
if object_type == "song":
self.add_to_artwork(artwork, data.get("song_art_image_thumbnail_url"))
self.add_to_artwork(artwork, data.get("song_art_image_url"))
main_artist_list = []
featured_artist_list = []
_artist_name = None
primary_artist = self.parse_api_object(data.get("primary_artist"))
if primary_artist is not None:
_artist_name = primary_artist.name
main_artist_list.append(primary_artist)
for feature_artist in (*(data.get("featured_artists") or []), *(data.get("producer_artists") or []), *(data.get("writer_artists") or [])):
artist = self.parse_api_object(feature_artist)
if artist is not None:
featured_artist_list.append(artist)
return Song(
title=clean_song_title(data.get("title"), artist_name=_artist_name),
source_list=[source],
artwork=artwork,
feature_artist_list=featured_artist_list,
artist_list=main_artist_list,
)
return None
def general_search(self, search_query: str, **kwargs) -> List[DatabaseObject]:
results = []
search_params = {
"q": search_query,
}
r = self.connection.get("https://genius.com/api/search/multi?" + urlencode(search_params), name=f"search_{search_query}")
if r is None:
return results
dump_to_file("search_genius.json", r.text, is_json=True, exit_after_dump=False)
data = r.json()
for elements in traverse_json_path(data, "response.sections", default=[]):
hits = elements.get("hits", [])
for hit in hits:
parsed = self.parse_api_object(hit.get("result"))
if parsed is not None:
results.append(parsed)
return results
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
artist: Artist = Artist()
# https://genius.com/api/artists/24527/albums?page=1
r = self.connection.get(source.url, name=source.url)
if r is None:
return artist
soup = self.get_soup_from_response(r)
# find the content attribute in the meta tag which is contained in the head
data_container = soup.find("meta", {"itemprop": "page_data"})
if data_container is not None:
content = data_container["content"]
dump_to_file("genius_itemprop_artist.json", content, is_json=True, exit_after_dump=False)
data = json.loads(content)
artist = self.parse_api_object(data.get("artist"))
for e in (data.get("artist_albums") or []):
r = self.parse_api_object(e)
if not isinstance(r, Album):
continue
artist.album_collection.append(r)
for e in (data.get("artist_songs") or []):
r = self.parse_api_object(e)
if not isinstance(r, Song):
continue
"""
TODO
fetch the album for these songs, because the api doesn't
return them
"""
artist.album_collection.extend(r.album_collection)
artist.source_collection.append(source)
return artist
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
album: Album = Album()
# https://genius.com/api/artists/24527/albums?page=1
r = self.connection.get(source.url, name=source.url)
if r is None:
return album
soup = self.get_soup_from_response(r)
# find the content attribute in the meta tag which is contained in the head
data_container = soup.find("meta", {"itemprop": "page_data"})
if data_container is not None:
content = data_container["content"]
dump_to_file("genius_itemprop_album.json", content, is_json=True, exit_after_dump=False)
data = json.loads(content)
album = self.parse_api_object(data.get("album"))
for e in data.get("album_appearances", []):
r = self.parse_api_object(e.get("song"))
if not isinstance(r, Song):
continue
album.song_collection.append(r)
album.source_collection.append(source)
return album
def get_json_content_from_response(self, response, start: str, end: str) -> Optional[str]:
content = response.text
start_index = content.find(start)
if start_index < 0:
return None
start_index += len(start)
end_index = content.find(end, start_index)
if end_index < 0:
return None
return content[start_index:end_index]
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
song: Song = Song()
r = self.connection.get(source.url, name=source.url)
if r is None:
return song
# get the contents that are between `JSON.parse('` and `');`
content = self.get_json_content_from_response(r, start="window.__PRELOADED_STATE__ = JSON.parse('", end="');\n window.__APP_CONFIG__ = ")
if content is not None:
#IMPLEMENT FIX FROM HAZEL
content = escape_json(content)
data = json.loads(content)
lyrics_html = traverse_json_path(data, "songPage.lyricsData.body.html", default=None)
if lyrics_html is not None:
song.lyrics_collection.append(Lyrics(FormattedText(html=lyrics_html)))
dump_to_file("genius_song_script_json.json", content, is_json=True, exit_after_dump=False)
soup = self.get_soup_from_response(r)
for lyrics in soup.find_all("div", {"data-lyrics-container": "true"}):
lyrics_object = Lyrics(FormattedText(html=lyrics.prettify()))
song.lyrics_collection.append(lyrics_object)
song.source_collection.append(source)
return song

View File

@@ -1,34 +1,25 @@
from collections import defaultdict from collections import defaultdict
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
from typing import List, Optional, Type, Union, Generator, Dict, Any from typing import Any, Dict, Generator, List, Optional, Type, Union
from urllib.parse import urlparse from urllib.parse import urlparse
import pycountry import pycountry
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from ..connection import Connection from ..connection import Connection
from .abstract import Page from ..objects import (Album, Artist, DatabaseObject,
from ..utils.enums import SourceType, ALL_SOURCE_TYPES FormattedText, ID3Timestamp, Label, Lyrics, Song,
from ..utils.enums.album import AlbumType, AlbumStatus Source, Target)
from ..objects import ( from ..objects.artwork import (Artwork, ArtworkVariant, ArtworkCollection)
Artist, from ..utils import shared, string_processing
Source,
Song,
Album,
ID3Timestamp,
FormattedText,
Label,
Target,
DatabaseObject,
Lyrics,
Artwork
)
from ..utils.config import logging_settings, main_settings from ..utils.config import logging_settings, main_settings
from ..utils import string_processing, shared from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.enums.album import AlbumStatus, AlbumType
from ..utils.string_processing import clean_song_title from ..utils.string_processing import clean_song_title
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from .abstract import Page
""" """
https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent
@@ -457,17 +448,17 @@ class Musify(Page):
for album_info in soup.find_all("ul", {"class": "album-info"}): for album_info in soup.find_all("ul", {"class": "album-info"}):
list_element: BeautifulSoup = album_info.find("li") list_element: BeautifulSoup = album_info.find("li")
if list_element is not None: if list_element is not None:
artist_soup: BeautifulSoup artist_soup: BeautifulSoup
for artist_soup in list_element.find_all("a"): for artist_soup in list_element.find_all("a"):
artist_source_list = [] artist_source_list = []
href = artist_soup["href"] href = artist_soup["href"]
if href is not None: if href is not None:
artist_source_list = [Source(self.SOURCE_TYPE, self.HOST + href)] artist_source_list = [Source(self.SOURCE_TYPE, self.HOST + href)]
artist_list.append(Artist( artist_list.append(Artist(
name=artist_soup.text.strip(), name=artist_soup.text.strip(),
source_list=artist_source_list source_list=artist_source_list
)) ))
# breadcrums # breadcrums
breadcrumb_list_element_list: List[BeautifulSoup] = soup.find_all("ol", {"class": "breadcrumb"}) breadcrumb_list_element_list: List[BeautifulSoup] = soup.find_all("ol", {"class": "breadcrumb"})
@@ -485,11 +476,11 @@ class Musify(Page):
track_name = list_points[4].text.strip() track_name = list_points[4].text.strip()
# artwork # album artwork
artwork: Artwork = Artwork() artwork: ArtworkCollection = ArtworkCollection()
album_image_element_list: List[BeautifulSoup] = soup.find_all("img", {"class": "album-img"}) album_image_element_list: List[BeautifulSoup] = soup.find_all("img", {"class": "album-img"})
for album_image_element in album_image_element_list: for album_image_element in album_image_element_list:
artwork.append(url=album_image_element.get("data-src", album_image_element.get("src"))) artwork.add_data(url=album_image_element.get("data-src", album_image_element.get("src")))
# lyrics # lyrics
lyrics_container: List[BeautifulSoup] = soup.find_all("div", {"id": "tabLyrics"}) lyrics_container: List[BeautifulSoup] = soup.find_all("div", {"id": "tabLyrics"})
@@ -754,11 +745,18 @@ class Musify(Page):
except ValueError: except ValueError:
self.LOGGER.debug(f"Raw datetime doesn't match time format %Y-%m-%d: {raw_datetime}") self.LOGGER.debug(f"Raw datetime doesn't match time format %Y-%m-%d: {raw_datetime}")
# album artwork
album_artwork: ArtworkCollection = ArtworkCollection()
album_artwork_list: List[BeautifulSoup] = soup.find_all("img", {"class":"artist-img"})
for album_artwork in album_artwork_list:
album_artwork.add_data(url=album_artwork.get("data-src", album_artwork.get("src")))
return Album( return Album(
title=name, title=name,
source_list=source_list, source_list=source_list,
artist_list=artist_list, artist_list=artist_list,
date=date date=date,
artwork=album_artwork
) )
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album: def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
@@ -795,6 +793,8 @@ class Musify(Page):
new_song = self._parse_song_card(card_soup) new_song = self._parse_song_card(card_soup)
album.song_collection.append(new_song) album.song_collection.append(new_song)
album.update_tracksort() album.update_tracksort()
return album return album
@@ -914,11 +914,18 @@ class Musify(Page):
if note_soup is not None: if note_soup is not None:
notes.html = note_soup.decode_contents() notes.html = note_soup.decode_contents()
# get artist profile artwork
main_artist_artwork: ArtworkCollection = ArtworkCollection()
artist_image_element_list: List[BeautifulSoup] = soup.find_all("img", {"class":"artist-img"})
for artist_image_element in artist_image_element_list:
main_artist_artwork.add_data(url=artist_image_element.get("data-src", artist_image_element.get("src")))
return Artist( return Artist(
name=name, name=name,
country=country, country=country,
source_list=source_list, source_list=source_list,
notes=notes notes=notes,
artwork=main_artist_artwork
) )
def _parse_album_card(self, album_card: BeautifulSoup, artist_name: str = None, **kwargs) -> Album: def _parse_album_card(self, album_card: BeautifulSoup, artist_name: str = None, **kwargs) -> Album:
@@ -1054,21 +1061,31 @@ class Musify(Page):
if not self.fetch_options.download_all and album.album_type in self.fetch_options.album_type_blacklist: if not self.fetch_options.download_all and album.album_type in self.fetch_options.album_type_blacklist:
continue continue
artist.main_album_collection.append(album) artist.album_collection.append(album)
def _fetch_artist_artwork(self, source: str, artist: Artist, **kwargs):
# artist artwork
artwork_gallery = self.get_soup_from_response(self.connection.get(source.strip().strip("/") + "/photos"))
if artwork_gallery is not None:
gallery_body_content: BeautifulSoup = artwork_gallery.find(id="bodyContent")
gallery_image_element_list: List[BeautifulSoup] = gallery_body_content.find_all("img")
for gallery_image_element in gallery_image_element_list:
artist.artwork.append(ArtworkVariant(url=gallery_image_element.get("data-src", gallery_image_element.get("src")), width=247, heigth=247))
def fetch_artist(self, source: Source, **kwargs) -> Artist: def fetch_artist(self, source: Source, **kwargs) -> Artist:
""" """
TODO TODO
[x] discography [x] discography
[x] attributes [x] attributes
[] picture gallery [x] picture gallery
""" """
url = parse_url(source.url) url = parse_url(source.url)
artist = self._fetch_initial_artist(url, source=source, **kwargs) artist = self._fetch_initial_artist(url, source=source, **kwargs)
self._fetch_artist_discography(artist, url, artist.name, **kwargs) self._fetch_artist_discography(artist, url, artist.name, **kwargs)
self._fetch_artist_artwork(url.url, artist, **kwargs)
return artist return artist
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:

View File

@@ -143,7 +143,7 @@ class YouTube(SuperYouTube):
self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={data['videoId']}") self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={data['videoId']}")
)], )],
notes=FormattedText(html=data["descriptionHtml"] + f"\n<p>{license_str}</ p>" ), notes=FormattedText(html=data["descriptionHtml"] + f"\n<p>{license_str}</ p>" ),
main_artist_list=artist_list artist_list=artist_list
), int(data["published"]) ), int(data["published"])
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
@@ -284,7 +284,7 @@ class YouTube(SuperYouTube):
self.LOGGER.warning(f"didn't found any playlists with piped, falling back to invidious. (it is unusual)") self.LOGGER.warning(f"didn't found any playlists with piped, falling back to invidious. (it is unusual)")
album_list, artist_name = self.fetch_invidious_album_list(parsed.id) album_list, artist_name = self.fetch_invidious_album_list(parsed.id)
return Artist(name=artist_name, main_album_list=album_list, source_list=[source]) return Artist(name=artist_name, album_list=album_list, source_list=[source])
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
""" """

View File

@@ -58,6 +58,19 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
song.album_collection.extend(album_list) song.album_collection.extend(album_list)
return [song] return [song]
if len(album_list) == 1:
album = album_list[0]
album.artist_collection.extend(artist_list)
album.song_collection.extend(song_list)
return [album]
"""
if len(artist_list) == 1:
artist = artist_list[0]
artist.main_album_collection.extend(album_list)
return [artist]
"""
return results return results

View File

@@ -1,46 +1,33 @@
from __future__ import unicode_literals, annotations from __future__ import annotations, unicode_literals
from typing import Dict, List, Optional, Set, Type import json
from urllib.parse import urlparse, urlunparse, quote, parse_qs, urlencode
import logging import logging
import random import random
import json
from dataclasses import dataclass
import re import re
from functools import lru_cache
from collections import defaultdict from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
from typing import Dict, List, Optional, Set, Type
from urllib.parse import parse_qs, quote, urlencode, urlparse, urlunparse
import youtube_dl import youtube_dl
from youtube_dl.extractor.youtube import YoutubeIE from youtube_dl.extractor.youtube import YoutubeIE
from youtube_dl.utils import DownloadError from youtube_dl.utils import DownloadError
from ...connection import Connection
from ...objects import Album, Artist, ArtworkCollection
from ...objects import DatabaseObject as DataObject
from ...objects import (FormattedText, ID3Timestamp, Label, Lyrics, Song,
Source, Target)
from ...utils import dump_to_file, get_current_millis, traverse_json_path
from ...utils.config import logging_settings, main_settings, youtube_settings
from ...utils.enums import ALL_SOURCE_TYPES, SourceType
from ...utils.enums.album import AlbumType
from ...utils.exception.config import SettingValueError from ...utils.exception.config import SettingValueError
from ...utils.config import main_settings, youtube_settings, logging_settings
from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING
from ...utils.string_processing import clean_song_title from ...utils.string_processing import clean_song_title
from ...utils import get_current_millis, traverse_json_path
from ...utils import dump_to_file
from ..abstract import Page
from ...objects import (
DatabaseObject as DataObject,
Source,
FormattedText,
ID3Timestamp,
Artwork,
Artist,
Song,
Album,
Label,
Target,
Lyrics,
)
from ...connection import Connection
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
from ...utils.enums.album import AlbumType
from ...utils.support_classes.download_result import DownloadResult from ...utils.support_classes.download_result import DownloadResult
from ..abstract import Page
from ._list_render import parse_renderer from ._list_render import parse_renderer
from ._music_object_render import parse_run_element from ._music_object_render import parse_run_element
from .super_youtube import SuperYouTube from .super_youtube import SuperYouTube
@@ -438,6 +425,7 @@ class YoutubeMusic(SuperYouTube):
data: dict = r.json() data: dict = r.json()
header = data.get("header", {}) header = data.get("header", {})
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {}) musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
musicImmersiveHeaderRenderer = header.get("musicImmersiveHeaderRenderer", {})
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", []) title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", []) subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
@@ -450,6 +438,11 @@ class YoutubeMusic(SuperYouTube):
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[ renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", []) 0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
# fetch artist artwork
artist_thumbnails = musicImmersiveHeaderRenderer.get("thumbnail", {}).get("musicThumbnailRenderer", {}).get("thumbnail", {}).get("thumbnails", {})
for artist_thumbnail in artist_thumbnails:
artist.artwork.append(artist_thumbnail)
if DEBUG: if DEBUG:
for i, content in enumerate(renderer_list): for i, content in enumerate(renderer_list):
dump_to_file(f"{i}-artists-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False) dump_to_file(f"{i}-artists-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
@@ -496,7 +489,12 @@ class YoutubeMusic(SuperYouTube):
# album details # album details
header = data.get("header", {}) header = data.get("header", {})
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {}) musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
# album artwork
album_thumbnails = musicDetailHeaderRenderer.get("thumbnail", {}).get("croppedSquareThumbnailRenderer", {}).get("thumbnail", {}).get("thumbnails", {})
for album_thumbnail in album_thumbnails:
album.artwork.append(value=album_thumbnail)
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", []) title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", []) subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
@@ -549,6 +547,11 @@ class YoutubeMusic(SuperYouTube):
return album return album
def fetch_lyrics(self, video_id: str, playlist_id: str = None) -> str: def fetch_lyrics(self, video_id: str, playlist_id: str = None) -> str:
"""
1. fetches the tabs of a song, to get the browse id
2. finds the browse id of the lyrics
3. fetches the lyrics with the browse id
"""
request_data = { request_data = {
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}}, "context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
"videoId": video_id, "videoId": video_id,
@@ -575,7 +578,8 @@ class YoutubeMusic(SuperYouTube):
pageType = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseEndpointContextSupportedConfigs.browseEndpointContextMusicConfig.pageType", default="") pageType = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseEndpointContextSupportedConfigs.browseEndpointContextMusicConfig.pageType", default="")
if pageType in ("MUSIC_TAB_TYPE_LYRICS", "MUSIC_PAGE_TYPE_TRACK_LYRICS") or "lyrics" in pageType.lower(): if pageType in ("MUSIC_TAB_TYPE_LYRICS", "MUSIC_PAGE_TYPE_TRACK_LYRICS") or "lyrics" in pageType.lower():
browse_id = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseId", default=None) browse_id = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseId", default=None)
break if browse_id is not None:
break
if browse_id is None: if browse_id is None:
return None return None
@@ -589,6 +593,8 @@ class YoutubeMusic(SuperYouTube):
}, },
name=f"fetch_song_lyrics_{video_id}.json" name=f"fetch_song_lyrics_{video_id}.json"
) )
if r is None:
return None
dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False) dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False)
@@ -638,8 +644,8 @@ class YoutubeMusic(SuperYouTube):
note=ydl_res.get("descriptions"), note=ydl_res.get("descriptions"),
album_list=album_list, album_list=album_list,
length=int(ydl_res.get("duration", 0)) * 1000, length=int(ydl_res.get("duration", 0)) * 1000,
artwork=Artwork(*ydl_res.get("thumbnails", [])), artwork=ArtworkCollection(*ydl_res.get("thumbnails", [])),
main_artist_list=artist_list, artist_list=artist_list,
source_list=[Source( source_list=[Source(
self.SOURCE_TYPE, self.SOURCE_TYPE,
f"https://music.youtube.com/watch?v={ydl_res.get('id')}" f"https://music.youtube.com/watch?v={ydl_res.get('id')}"
@@ -677,7 +683,7 @@ class YoutubeMusic(SuperYouTube):
for album in song.album_list: for album in song.album_list:
album.album_type = AlbumType.LIVE_ALBUM album.album_type = AlbumType.LIVE_ALBUM
for thumbnail in video_details.get("thumbnails", []): for thumbnail in video_details.get("thumbnails", []):
song.artwork.append(**thumbnail) song.artwork.add_data(**thumbnail)
song.lyrics_collection.append(self.fetch_lyrics(browse_id, playlist_id=request_data.get("playlistId"))) song.lyrics_collection.append(self.fetch_lyrics(browse_id, playlist_id=request_data.get("playlistId")))
@@ -719,7 +725,6 @@ class YoutubeMusic(SuperYouTube):
self.download_values_by_url[source.url] = { self.download_values_by_url[source.url] = {
"url": _best_format.get("url"), "url": _best_format.get("url"),
"chunk_size": _best_format.get("downloader_options", {}).get("http_chunk_size", main_settings["chunk_size"]),
"headers": _best_format.get("http_headers", {}), "headers": _best_format.get("http_headers", {}),
} }

View File

@@ -1,15 +1,18 @@
from datetime import datetime import inspect
from pathlib import Path
import json import json
import logging import logging
import inspect from datetime import datetime
from typing import List, Union from functools import lru_cache
from pathlib import Path
from typing import Any, List, Union
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
from .config import config, read_config, write_config from .config import config, read_config, write_config
from .enums.colors import BColors from .enums.colors import BColors
from .path_manager import LOCATIONS
from .hacking import merge_args from .hacking import merge_args
from .path_manager import LOCATIONS
from .shared import (DEBUG, DEBUG_DUMP, DEBUG_LOGGING, DEBUG_OBJECT_TRACE,
DEBUG_OBJECT_TRACE_CALLSTACK, DEBUG_TRACE, URL_PATTERN)
from .string_processing import hash_url, is_url, unify
""" """
IO functions IO functions
@@ -125,4 +128,35 @@ def get_current_millis() -> int:
def get_unix_time() -> int: def get_unix_time() -> int:
return int(datetime.now().timestamp()) return int(datetime.now().timestamp())
@lru_cache
def custom_hash(value: Any) -> int:
if is_url(value):
value = hash_url(value)
elif isinstance(value, str):
try:
value = int(value)
except ValueError:
value = unify(value)
return hash(value)
def create_dataclass_instance(t, data: dict):
"""Creates an instance of a dataclass with the given data.
It filters out all data key, which has no attribute in the dataclass.
Args:
t (Type): The dataclass type class
data (dict): the attribute to pass into the constructor
Returns:
Tuple[Type, dict]: The created instance and a dict, containing the data, which was not used in the creation
"""
needed_data = {k: v for k, v in data.items() if k in t.__dataclass_fields__}
removed_data = {k: v for k, v in data.items() if k not in t.__dataclass_fields__}
return t(**needed_data), removed_data

View File

@@ -1,11 +1,8 @@
from typing import Tuple from typing import Tuple
from .config import Config from .config import Config
from .config_files import ( from .config_files import main_config, logging_config, youtube_config
main_config,
logging_config,
youtube_config,
)
_sections: Tuple[Config, ...] = ( _sections: Tuple[Config, ...] = (
main_config.config, main_config.config,

View File

@@ -18,8 +18,9 @@ config = Config((
AudioFormatAttribute(name="audio_format", default_value="mp3", description="""Music Kraken will stream the audio into this format. AudioFormatAttribute(name="audio_format", default_value="mp3", description="""Music Kraken will stream the audio into this format.
You can use Audio formats which support ID3.2 and ID3.1, You can use Audio formats which support ID3.2 and ID3.1,
but you will have cleaner Metadata using ID3.2."""), but you will have cleaner Metadata using ID3.2."""),
Attribute(name="image_format", default_value="jpeg", description="This Changes the format in which images are getting downloaded"),
Attribute(name="result_history", default_value=False, description="""If enabled, you can go back to the previous results. Attribute(name="result_history", default_value=True, description="""If enabled, you can go back to the previous results.
The consequence is a higher meory consumption, because every result is saved."""), The consequence is a higher meory consumption, because every result is saved."""),
Attribute(name="history_length", default_value=8, description="""You can choose how far back you can go in the result history. Attribute(name="history_length", default_value=8, description="""You can choose how far back you can go in the result history.
The further you choose to be able to go back, the higher the memory usage. The further you choose to be able to go back, the higher the memory usage.
@@ -28,6 +29,7 @@ The further you choose to be able to go back, the higher the memory usage.
EmptyLine(), EmptyLine(),
Attribute(name="preferred_artwork_resolution", default_value=1000), Attribute(name="preferred_artwork_resolution", default_value=1000),
Attribute(name="download_artist_artworks", default_value=True, description="Enables the fetching of artist galleries."),
EmptyLine(), EmptyLine(),
@@ -44,6 +46,7 @@ This means for example, the Studio Albums and EP's are always in front of Single
- album_type - album_type
The folder music kraken should put the songs into."""), The folder music kraken should put the songs into."""),
Attribute(name="download_file", default_value="{song}.{audio_format}", description="The filename of the audio file."), Attribute(name="download_file", default_value="{song}.{audio_format}", description="The filename of the audio file."),
Attribute(name="artist_artwork_path", default_value="{genre}/{artist}/{artist}_{image_number}.{image_format}", description="The Path to download artist images to."),
SelectAttribute(name="album_type_blacklist", default_value=[ SelectAttribute(name="album_type_blacklist", default_value=[
"Compilation Album", "Compilation Album",
"Live Album", "Live Album",
@@ -152,10 +155,13 @@ class SettingsStructure(TypedDict):
# artwork # artwork
preferred_artwork_resolution: int preferred_artwork_resolution: int
image_format: str
download_artist_artworks: bool
# paths # paths
music_directory: Path music_directory: Path
temp_directory: Path temp_directory: Path
artist_artwork_path: Path
log_file: Path log_file: Path
not_a_genre_regex: List[str] not_a_genre_regex: List[str]
ffmpeg_binary: Path ffmpeg_binary: Path

View File

@@ -1,7 +1,11 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, TYPE_CHECKING, Type from enum import Enum
from typing import TYPE_CHECKING, Optional, Type
from mutagen.id3 import PictureType
if TYPE_CHECKING: if TYPE_CHECKING:
from ...pages.abstract import Page from ...pages.abstract import Page
@@ -52,3 +56,73 @@ class ALL_SOURCE_TYPES:
MANUAL = SourceType(name="manual") MANUAL = SourceType(name="manual")
PRESET = SourceType(name="preset") PRESET = SourceType(name="preset")
class PictureType(Enum):
"""Enumeration of image types defined by the ID3 standard for the APIC
frame, but also reused in WMA/FLAC/VorbisComment.
This is copied from mutagen.id3.PictureType
"""
OTHER = 0
FILE_ICON = 1
"""32x32 pixels 'file icon' (PNG only)"""
OTHER_FILE_ICON = 2
"""Other file icon"""
COVER_FRONT = 3
"""Cover (front)"""
COVER_BACK = 4
"""Cover (back)"""
LEAFLET_PAGE = 5
"""Leaflet page"""
MEDIA = 6
"""Media (e.g. label side of CD)"""
LEAD_ARTIST = 7
"""Lead artist/lead performer/soloist"""
ARTIST = 8
"""Artist/performer"""
CONDUCTOR = 9
"""Conductor"""
BAND = 10
"""Band/Orchestra"""
COMPOSER = 11
"""Composer"""
LYRICIST = 12
"""Lyricist/text writer"""
RECORDING_LOCATION = 13
"""Recording Location"""
DURING_RECORDING = 14
"""During recording"""
DURING_PERFORMANCE = 15
"""During performance"""
SCREEN_CAPTURE = 16
"""Movie/video screen capture"""
FISH = 17
"""A bright colored fish"""
ILLUSTRATION = 18
"""Illustration"""
BAND_LOGOTYPE = 19
"""Band/artist logotype"""
PUBLISHER_LOGOTYPE = 20
"""Publisher/Studio logotype"""

View File

@@ -12,14 +12,14 @@ if not load_dotenv(Path(__file__).parent.parent.parent / ".env"):
__stage__ = os.getenv("STAGE", "prod") __stage__ = os.getenv("STAGE", "prod")
DEBUG = (__stage__ == "dev") and False DEBUG = (__stage__ == "dev") and True
DEBUG_LOGGING = DEBUG and False DEBUG_LOGGING = DEBUG and False
DEBUG_TRACE = DEBUG and True DEBUG_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and False DEBUG_OBJECT_TRACE = DEBUG and False
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and False DEBUG_DUMP = DEBUG and True
DEBUG_PRINT_ID = DEBUG and True DEBUG_PRINT_ID = DEBUG and True
if DEBUG: if DEBUG:

View File

@@ -1,13 +1,15 @@
from typing import Tuple, Union, Optional import re
from pathlib import Path
import string import string
from functools import lru_cache from functools import lru_cache
from pathlib import Path
from typing import Any, Optional, Tuple, Union
from urllib.parse import ParseResult, parse_qs, urlparse
from transliterate.exceptions import LanguageDetectionError
from transliterate import translit
from pathvalidate import sanitize_filename from pathvalidate import sanitize_filename
from urllib.parse import urlparse, ParseResult, parse_qs from transliterate import translit
from transliterate.exceptions import LanguageDetectionError
from .shared import URL_PATTERN
COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = ( COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = (
"(official video)", "(official video)",
@@ -229,3 +231,13 @@ def shorten_display_url(url: str, max_length: int = 150, chars_at_end: int = 4,
return url return url
return url[:max_length] + shorten_string + url[-chars_at_end:] return url[:max_length] + shorten_string + url[-chars_at_end:]
def is_url(value: Any) -> bool:
if isinstance(value, ParseResult):
return True
if not isinstance(value, str):
return True
# value has to be a string
return re.match(URL_PATTERN, value) is not None

View File

@@ -1,9 +1,13 @@
from dataclasses import dataclass, field from __future__ import annotations
from typing import List, Tuple
from ...utils.config import main_settings, logging_settings from dataclasses import dataclass, field
from typing import TYPE_CHECKING, List, Tuple
if TYPE_CHECKING:
from ...objects import Target
from ...utils.config import logging_settings, main_settings
from ...utils.enums.colors import BColors from ...utils.enums.colors import BColors
from ...objects import Target
UNIT_PREFIXES: List[str] = ["", "k", "m", "g", "t"] UNIT_PREFIXES: List[str] = ["", "k", "m", "g", "t"]
UNIT_DIVISOR = 1024 UNIT_DIVISOR = 1024

View File

@@ -3,96 +3,98 @@ import unittest
from music_kraken.objects import Song, Album, Artist, Collection, Country from music_kraken.objects import Song, Album, Artist, Collection, Country
class TestCollection(unittest.TestCase): class TestCollection(unittest.TestCase):
@staticmethod def test_song_contains_album(self):
def complicated_object() -> Artist: """
return Artist( Tests that every song contains the album it is added to in its album_collection
name="artist", """
country=Country.by_alpha_2("DE"),
main_album_list=[ a_1 = Album(
Album( title="album",
title="album", song_list= [
song_list=[ Song(title="song"),
Song(
title="song",
album_list=[
Album(title="album", albumsort=123),
],
),
Song(
title="other_song",
album_list=[
Album(title="album", albumsort=423),
],
),
]
),
Album(title="album", barcode="1234567890123"),
] ]
) )
a_2 = a_1.song_collection[0].album_collection[0]
self.assertTrue(a_1.id == a_2.id)
def test_song_album_relation(self): def test_album_contains_song(self):
""" """
Tests that Tests that every album contains the song it is added to in its song_collection
album = album.any_song.one_album """
is the same object s_1 = Song(
title="song",
album_list=[
Album(title="album"),
]
)
s_2 = s_1.album_collection[0].song_collection[0]
self.assertTrue(s_1.id == s_2.id)
def test_auto_add_artist_to_album_feature_artist(self):
"""
Tests that every artist is added to the album's feature_artist_collection per default
""" """
a = self.complicated_object().main_album_collection[0] a_1 = Artist(
b = a.song_collection[0].album_collection[0]
c = a.song_collection[1].album_collection[0]
d = b.song_collection[0].album_collection[0]
e = d.song_collection[0].album_collection[0]
f = e.song_collection[0].album_collection[0]
g = f.song_collection[0].album_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id == e.id == f.id == g.id)
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "album")
self.assertTrue(a.barcode == b.barcode == c.barcode == d.barcode == e.barcode == f.barcode == g.barcode == "1234567890123")
self.assertTrue(a.albumsort == b.albumsort == c.albumsort == d.albumsort == e.albumsort == f.albumsort == g.albumsort == 123)
d.title = "new_title"
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "new_title")
def test_album_artist_relation(self):
"""
Tests that
artist = artist.any_album.any_song.one_artist
is the same object
"""
a = self.complicated_object()
b = a.main_album_collection[0].artist_collection[0]
c = b.main_album_collection[0].artist_collection[0]
d = c.main_album_collection[0].artist_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id)
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
def test_artist_artist_relation(self):
artist = Artist(
name="artist", name="artist",
main_album_list=[ album_list=[
Album(title="album")
]
)
a_2 = a_1.album_collection[0].feature_artist_collection[0]
self.assertTrue(a_1.id == a_2.id)
def test_auto_add_artist_to_album_feature_artist_push(self):
"""
Tests that every artist is added to the album's feature_artist_collection per default but pulled into the album's artist_collection if a merge exitst
"""
a_1 = Artist(
name="artist",
album_list=[
Album( Album(
title="album", title="album",
song_list=[
Song(title="song"),
],
artist_list=[ artist_list=[
Artist(name="artist"), Artist(name="artist"),
] ]
) )
] ]
) )
a_2 = a_1.album_collection[0].artist_collection[0]
self.assertTrue(artist.id == artist.main_album_collection[0].song_collection[0].artist_collection[0].id) self.assertTrue(a_1.id == a_2.id)
def test_artist_artist_relation(self):
"""
Tests the proper syncing between album.artist_collection and song.artist_collection
"""
album = Album(
title="album",
song_list=[
Song(title="song"),
],
artist_list=[
Artist(name="artist"),
]
)
a_1 = album.artist_collection[0]
a_2 = album.song_collection[0].artist_collection[0]
self.assertTrue(a_1.id == a_2.id)
def test_artist_collection_sync(self): def test_artist_collection_sync(self):
"""
tests the actual implementation of the test above
"""
album_1 = Album( album_1 = Album(
title="album", title="album",
song_list=[ song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]), Song(title="song", artist_list=[Artist(name="artist")]),
], ],
artist_list=[ artist_list=[
Artist(name="artist"), Artist(name="artist"),
@@ -102,7 +104,7 @@ class TestCollection(unittest.TestCase):
album_2 = Album( album_2 = Album(
title="album", title="album",
song_list=[ song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]), Song(title="song", artist_list=[Artist(name="artist")]),
], ],
artist_list=[ artist_list=[
Artist(name="artist"), Artist(name="artist"),
@@ -113,15 +115,5 @@ class TestCollection(unittest.TestCase):
self.assertTrue(id(album_1.artist_collection) == id(album_1.artist_collection) == id(album_1.song_collection[0].artist_collection) == id(album_1.song_collection[0].artist_collection)) self.assertTrue(id(album_1.artist_collection) == id(album_1.artist_collection) == id(album_1.song_collection[0].artist_collection) == id(album_1.song_collection[0].artist_collection))
def test_song_artist_relations(self):
a = self.complicated_object()
b = a.main_album_collection[0].song_collection[0].artist_collection[0]
c = b.main_album_collection[0].song_collection[0].artist_collection[0]
d = c.main_album_collection[0].song_collection[0].artist_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id)
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()