Compare commits
7 Commits
4ee6fd2137
...
experiment
| Author | SHA1 | Date | |
|---|---|---|---|
| 810aff4163 | |||
| 5ce76c758e | |||
| 93c9a367a2 | |||
| 17c28722fb | |||
| dd99e60afd | |||
| 274f1bce90 | |||
| b1a306f3f3 |
@@ -1,15 +1,13 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
import music_kraken
|
import music_kraken
|
||||||
|
|
||||||
import logging
|
|
||||||
print("Setting logging-level to DEBUG")
|
print("Setting logging-level to DEBUG")
|
||||||
logging.getLogger().setLevel(logging.DEBUG)
|
logging.getLogger().setLevel(logging.DEBUG)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
commands = [
|
commands = [
|
||||||
"s: #a Crystal F",
|
"s: #a Ghost Bath",
|
||||||
"10",
|
|
||||||
"1",
|
|
||||||
"3",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -67,13 +67,14 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
|
|||||||
id3_object = AudioMetadata(file_location=target.file_path)
|
id3_object = AudioMetadata(file_location=target.file_path)
|
||||||
|
|
||||||
LOGGER.info(str(metadata))
|
LOGGER.info(str(metadata))
|
||||||
|
## REWRITE COMPLETLY !!!!!!!!!!!!
|
||||||
if song.artwork.best_variant is not None:
|
if len(song.artwork._data) != 0:
|
||||||
best_variant = song.artwork.best_variant
|
variants = song.artwork._data.__getitem__(0)
|
||||||
|
best_variant = variants.variants.__getitem__(0)
|
||||||
|
|
||||||
r = artwork_connection.get(
|
r = artwork_connection.get(
|
||||||
url=best_variant["url"],
|
url=best_variant.url,
|
||||||
name=song.artwork.get_variant_name(best_variant),
|
name=best_variant.url,
|
||||||
)
|
)
|
||||||
|
|
||||||
temp_target: Target = Target.temp()
|
temp_target: Target = Target.temp()
|
||||||
@@ -107,7 +108,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
|
|||||||
mime="image/jpeg",
|
mime="image/jpeg",
|
||||||
type=3,
|
type=3,
|
||||||
desc=u"Cover",
|
desc=u"Cover",
|
||||||
data=converted_target.read_bytes(),
|
data=converted_target.raw_content,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
id3_object.frames.delall("USLT")
|
id3_object.frames.delall("USLT")
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import copy
|
||||||
|
import inspect
|
||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from typing import List, Dict, Optional, Set
|
from typing import TYPE_CHECKING, Dict, List, Optional, Set
|
||||||
from urllib.parse import urlparse, urlunsplit, ParseResult
|
from urllib.parse import ParseResult, urlparse, urlunsplit
|
||||||
import copy
|
|
||||||
import inspect
|
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
import responses
|
import responses
|
||||||
@@ -14,12 +14,15 @@ from tqdm import tqdm
|
|||||||
|
|
||||||
from .cache import Cache
|
from .cache import Cache
|
||||||
from .rotating import RotatingProxy
|
from .rotating import RotatingProxy
|
||||||
from ..objects import Target
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ..objects import Target
|
||||||
|
|
||||||
from ..utils import request_trace
|
from ..utils import request_trace
|
||||||
from ..utils.string_processing import shorten_display_url
|
|
||||||
from ..utils.config import main_settings
|
from ..utils.config import main_settings
|
||||||
from ..utils.support_classes.download_result import DownloadResult
|
|
||||||
from ..utils.hacking import merge_args
|
from ..utils.hacking import merge_args
|
||||||
|
from ..utils.string_processing import shorten_display_url
|
||||||
|
from ..utils.support_classes.download_result import DownloadResult
|
||||||
|
|
||||||
|
|
||||||
class Connection:
|
class Connection:
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ from collections import defaultdict
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import re
|
import re
|
||||||
import logging
|
import logging
|
||||||
|
import subprocess
|
||||||
|
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
|
|
||||||
@@ -36,8 +37,6 @@ from ..connection import Connection
|
|||||||
|
|
||||||
from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, Genius, INDEPENDENT_DB_OBJECTS
|
from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, Genius, INDEPENDENT_DB_OBJECTS
|
||||||
|
|
||||||
artwork_connection: Connection = Connection()
|
|
||||||
|
|
||||||
ALL_PAGES: Set[Type[Page]] = {
|
ALL_PAGES: Set[Type[Page]] = {
|
||||||
# EncyclopaediaMetallum,
|
# EncyclopaediaMetallum,
|
||||||
Genius,
|
Genius,
|
||||||
@@ -92,7 +91,8 @@ class Pages:
|
|||||||
exclude_pages = exclude_pages.union(SHADY_PAGES)
|
exclude_pages = exclude_pages.union(SHADY_PAGES)
|
||||||
|
|
||||||
if not exclude_pages.issubset(ALL_PAGES):
|
if not exclude_pages.issubset(ALL_PAGES):
|
||||||
raise ValueError(f"The excluded pages have to be a subset of all pages: {exclude_pages} | {ALL_PAGES}")
|
raise ValueError(
|
||||||
|
f"The excluded pages have to be a subset of all pages: {exclude_pages} | {ALL_PAGES}")
|
||||||
|
|
||||||
def _set_to_tuple(page_set: Set[Type[Page]]) -> Tuple[Type[Page], ...]:
|
def _set_to_tuple(page_set: Set[Type[Page]]) -> Tuple[Type[Page], ...]:
|
||||||
return tuple(sorted(page_set, key=lambda page: page.__name__))
|
return tuple(sorted(page_set, key=lambda page: page.__name__))
|
||||||
@@ -100,11 +100,14 @@ class Pages:
|
|||||||
self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages)
|
self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages)
|
||||||
self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set)
|
self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set)
|
||||||
|
|
||||||
self._audio_pages_set: Set[Type[Page]] = self._pages_set.intersection(AUDIO_PAGES)
|
self._audio_pages_set: Set[Type[Page]
|
||||||
self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(self._audio_pages_set)
|
] = self._pages_set.intersection(AUDIO_PAGES)
|
||||||
|
self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(
|
||||||
|
self._audio_pages_set)
|
||||||
|
|
||||||
for page_type in self.pages:
|
for page_type in self.pages:
|
||||||
self._page_instances[page_type] = page_type(fetch_options=self.fetch_options, download_options=self.download_options)
|
self._page_instances[page_type] = page_type(
|
||||||
|
fetch_options=self.fetch_options, download_options=self.download_options)
|
||||||
self._source_to_page[page_type.SOURCE_TYPE] = page_type
|
self._source_to_page[page_type.SOURCE_TYPE] = page_type
|
||||||
|
|
||||||
def _get_page_from_enum(self, source_page: SourceType) -> Page:
|
def _get_page_from_enum(self, source_page: SourceType) -> Page:
|
||||||
@@ -118,7 +121,8 @@ class Pages:
|
|||||||
for page_type in self.pages:
|
for page_type in self.pages:
|
||||||
result.add(
|
result.add(
|
||||||
page=page_type,
|
page=page_type,
|
||||||
search_result=self._page_instances[page_type].search(query=query)
|
search_result=self._page_instances[page_type].search(
|
||||||
|
query=query)
|
||||||
)
|
)
|
||||||
|
|
||||||
return result
|
return result
|
||||||
@@ -131,7 +135,8 @@ class Pages:
|
|||||||
for source in data_object.source_collection.get_sources(source_type_sorting={
|
for source in data_object.source_collection.get_sources(source_type_sorting={
|
||||||
"only_with_page": True,
|
"only_with_page": True,
|
||||||
}):
|
}):
|
||||||
new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level)
|
new_data_object = self.fetch_from_source(
|
||||||
|
source=source, stop_at_level=stop_at_level)
|
||||||
if new_data_object is not None:
|
if new_data_object is not None:
|
||||||
data_object.merge(new_data_object)
|
data_object.merge(new_data_object)
|
||||||
|
|
||||||
@@ -167,69 +172,38 @@ class Pages:
|
|||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def download_artwork_variant_to_target(self, artwork_variant: ArtworkVariant, target: Target):
|
|
||||||
|
|
||||||
r = artwork_connection.get(
|
|
||||||
url=artwork_variant["url"],
|
|
||||||
name=artwork_variant["url"],
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
temp_target: Target = Target.temp()
|
|
||||||
with temp_target.open("wb") as f:
|
|
||||||
f.write(r.content)
|
|
||||||
|
|
||||||
converted_target: Target = Target.temp(file_extension=main_settings["image_format"])
|
|
||||||
with Image.open(temp_target.file_path) as img:
|
|
||||||
# crop the image if it isn't square in the middle with minimum data loss
|
|
||||||
width, height = img.size
|
|
||||||
if width != height:
|
|
||||||
if width > height:
|
|
||||||
img = img.crop((width // 2 - height // 2, 0, width // 2 + height // 2, height))
|
|
||||||
else:
|
|
||||||
img = img.crop((0, height // 2 - width // 2, width, height // 2 + width // 2))
|
|
||||||
|
|
||||||
# resize the image to the preferred resolution
|
|
||||||
img.thumbnail((main_settings["preferred_artwork_resolution"], main_settings["preferred_artwork_resolution"]))
|
|
||||||
|
|
||||||
# https://stackoverflow.com/a/59476938/16804841
|
|
||||||
if img.mode != 'RGB':
|
|
||||||
img = img.convert('RGB')
|
|
||||||
|
|
||||||
img.save(target.file_path, main_settings["image_format"])
|
|
||||||
|
|
||||||
def remove_artwork_duplicates(self) -> None:
|
|
||||||
"""
|
|
||||||
This will eliminate duplicates within the given threshold
|
|
||||||
"""
|
|
||||||
|
|
||||||
pass
|
|
||||||
|
|
||||||
def _fetch_artist_artwork(self, artist: Artist, naming: dict):
|
def _fetch_artist_artwork(self, artist: Artist, naming: dict):
|
||||||
naming: Dict[str, List[str]] = defaultdict(list, naming)
|
naming: Dict[str, List[str]] = defaultdict(list, naming)
|
||||||
naming["artist"].append(artist.name)
|
naming["artist"].append(artist.name)
|
||||||
naming["label"].extend([l.title_value for l in artist.label_collection])
|
naming["label"].extend(
|
||||||
|
[l.title_value for l in artist.label_collection])
|
||||||
# removing duplicates from the naming, and process the strings
|
# removing duplicates from the naming, and process the strings
|
||||||
for key, value in naming.items():
|
for key, value in naming.items():
|
||||||
# https://stackoverflow.com/a/17016257
|
# https://stackoverflow.com/a/17016257
|
||||||
naming[key] = list(dict.fromkeys(value))
|
naming[key] = list(dict.fromkeys(value))
|
||||||
|
|
||||||
artwork: Artwork = artist.artwork
|
artwork_collection: ArtworkCollection = artist.artwork
|
||||||
for image_number, variant in enumerate(artwork):
|
artwork_collection.compile()
|
||||||
|
for image_number, artwork in enumerate(artwork_collection):
|
||||||
|
for artwork_variant in artwork.variants:
|
||||||
naming["image_number"] = [str(image_number)]
|
naming["image_number"] = [str(image_number)]
|
||||||
|
|
||||||
url: str = variant["url"]
|
|
||||||
|
|
||||||
target = Target(
|
target = Target(
|
||||||
relative_to_music_dir=True,
|
relative_to_music_dir=True,
|
||||||
file_path=Path(self._parse_path_template(main_settings["artist_artwork_path"], naming=naming))
|
file_path=Path(self._parse_path_template(
|
||||||
|
main_settings["artist_artwork_path"], naming=naming))
|
||||||
)
|
)
|
||||||
self.download_artwork_variant_to_target(variant, target)
|
if not target.file_path.parent.exists():
|
||||||
|
target.create_path()
|
||||||
|
subprocess.Popen(["gio", "set", target.file_path.parent, "metadata::custom-icon", "file://"+str(target.file_path)])
|
||||||
|
with Image.open(artwork_variant.target.file_path) as img:
|
||||||
|
img.save(target.file_path, main_settings["image_format"])
|
||||||
|
artwork_variant.target = Target
|
||||||
|
|
||||||
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
|
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
|
||||||
# fetch the given object
|
# fetch the given object
|
||||||
self.fetch_details(data_object)
|
self.fetch_details(data_object)
|
||||||
output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
|
output(
|
||||||
|
f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
|
||||||
|
|
||||||
# fetching all parent objects (e.g. if you only download a song)
|
# fetching all parent objects (e.g. if you only download a song)
|
||||||
if not kwargs.get("fetched_upwards", False):
|
if not kwargs.get("fetched_upwards", False):
|
||||||
@@ -285,13 +259,15 @@ class Pages:
|
|||||||
return set(re.findall(r"{([^}]+)}", path_template))
|
return set(re.findall(r"{([^}]+)}", path_template))
|
||||||
|
|
||||||
def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str:
|
def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str:
|
||||||
field_names: Set[str] = self._extract_fields_from_template(path_template)
|
field_names: Set[str] = self._extract_fields_from_template(
|
||||||
|
path_template)
|
||||||
|
|
||||||
for field in field_names:
|
for field in field_names:
|
||||||
if len(naming[field]) == 0:
|
if len(naming[field]) == 0:
|
||||||
raise MKMissingNameException(f"Missing field for {field}.")
|
raise MKMissingNameException(f"Missing field for {field}.")
|
||||||
|
|
||||||
path_template = path_template.replace(f"{{{field}}}", naming[field][0])
|
path_template = path_template.replace(
|
||||||
|
f"{{{field}}}", naming[field][0])
|
||||||
|
|
||||||
return path_template
|
return path_template
|
||||||
|
|
||||||
@@ -310,7 +286,8 @@ class Pages:
|
|||||||
naming["song"].append(song.title_value)
|
naming["song"].append(song.title_value)
|
||||||
naming["isrc"].append(song.isrc)
|
naming["isrc"].append(song.isrc)
|
||||||
naming["album"].extend(a.title_value for a in song.album_collection)
|
naming["album"].extend(a.title_value for a in song.album_collection)
|
||||||
naming["album_type"].extend(a.album_type.value for a in song.album_collection)
|
naming["album_type"].extend(
|
||||||
|
a.album_type.value for a in song.album_collection)
|
||||||
naming["artist"].extend(a.name for a in song.artist_collection)
|
naming["artist"].extend(a.name for a in song.artist_collection)
|
||||||
naming["artist"].extend(a.name for a in song.feature_artist_collection)
|
naming["artist"].extend(a.name for a in song.feature_artist_collection)
|
||||||
for a in song.album_collection:
|
for a in song.album_collection:
|
||||||
@@ -327,13 +304,16 @@ class Pages:
|
|||||||
song.target_collection.append(Target(
|
song.target_collection.append(Target(
|
||||||
relative_to_music_dir=True,
|
relative_to_music_dir=True,
|
||||||
file_path=Path(
|
file_path=Path(
|
||||||
self._parse_path_template(main_settings["download_path"], naming=naming),
|
self._parse_path_template(
|
||||||
self._parse_path_template(main_settings["download_file"], naming=naming),
|
main_settings["download_path"], naming=naming),
|
||||||
|
self._parse_path_template(
|
||||||
|
main_settings["download_file"], naming=naming),
|
||||||
)
|
)
|
||||||
))
|
))
|
||||||
for target in song.target_collection:
|
for target in song.target_collection:
|
||||||
if target.exists:
|
if target.exists:
|
||||||
output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
|
output(
|
||||||
|
f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
|
||||||
r.found_on_disk += 1
|
r.found_on_disk += 1
|
||||||
|
|
||||||
if not self.download_options.download_again_if_found:
|
if not self.download_options.download_again_if_found:
|
||||||
@@ -354,8 +334,10 @@ class Pages:
|
|||||||
break
|
break
|
||||||
|
|
||||||
used_source = source
|
used_source = source
|
||||||
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download")
|
streaming_results = source.page.download_song_to_target(
|
||||||
skip_intervals = source.page.get_skip_intervals(song=song, source=source)
|
source=source, target=tmp, desc="download")
|
||||||
|
skip_intervals = source.page.get_skip_intervals(
|
||||||
|
song=song, source=source)
|
||||||
|
|
||||||
# if something has been downloaded but it somehow failed, delete the file
|
# if something has been downloaded but it somehow failed, delete the file
|
||||||
if streaming_results.is_fatal_error and tmp.exists:
|
if streaming_results.is_fatal_error and tmp.exists:
|
||||||
@@ -379,7 +361,8 @@ class Pages:
|
|||||||
used_source.page.post_process_hook(song=song, temp_target=tmp)
|
used_source.page.post_process_hook(song=song, temp_target=tmp)
|
||||||
|
|
||||||
if not found_on_disk or self.download_options.process_metadata_if_found:
|
if not found_on_disk or self.download_options.process_metadata_if_found:
|
||||||
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)
|
write_metadata_to_target(
|
||||||
|
metadata=song.metadata, target=tmp, song=song)
|
||||||
|
|
||||||
# copy the tmp target to the final locations
|
# copy the tmp target to the final locations
|
||||||
for target in song.target_collection:
|
for target in song.target_collection:
|
||||||
@@ -397,5 +380,3 @@ class Pages:
|
|||||||
_actual_page = self._source_to_page[source.source_type]
|
_actual_page = self._source_to_page[source.source_type]
|
||||||
|
|
||||||
return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level)
|
return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level)
|
||||||
|
|
||||||
|
|
||||||
@@ -2,8 +2,10 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from copy import copy
|
from copy import copy
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
|
from functools import cached_property
|
||||||
from typing import Dict, List, Optional, Set, Tuple, Type, TypedDict, Union
|
from typing import Dict, List, Optional, Set, Tuple, Type, TypedDict, Union
|
||||||
|
|
||||||
|
from ..connection import Connection
|
||||||
from ..utils import create_dataclass_instance, custom_hash
|
from ..utils import create_dataclass_instance, custom_hash
|
||||||
from ..utils.config import main_settings
|
from ..utils.config import main_settings
|
||||||
from ..utils.enums import PictureType
|
from ..utils.enums import PictureType
|
||||||
@@ -13,14 +15,20 @@ from .metadata import ID3Timestamp
|
|||||||
from .metadata import Mapping as id3Mapping
|
from .metadata import Mapping as id3Mapping
|
||||||
from .metadata import Metadata
|
from .metadata import Metadata
|
||||||
from .parents import OuterProxy as Base
|
from .parents import OuterProxy as Base
|
||||||
|
from .target import Target
|
||||||
|
from PIL import Image
|
||||||
|
|
||||||
|
import imagehash
|
||||||
|
|
||||||
|
artwork_connection: Connection = Connection(module="artwork")
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ArtworkVariant:
|
class ArtworkVariant:
|
||||||
url: str
|
url: str
|
||||||
width: Optional[int] = None
|
width: Optional[int] = None
|
||||||
height: Optional[int] = None
|
heigth: Optional[int] = None
|
||||||
image_format: Optional[str] = ""
|
image_format: Optional[str] = None
|
||||||
|
|
||||||
def __hash__(self) -> int:
|
def __hash__(self) -> int:
|
||||||
return custom_hash(self.url)
|
return custom_hash(self.url)
|
||||||
@@ -31,6 +39,26 @@ class ArtworkVariant:
|
|||||||
def __contains__(self, other: str) -> bool:
|
def __contains__(self, other: str) -> bool:
|
||||||
return custom_hash(other) == hash(self.url)
|
return custom_hash(other) == hash(self.url)
|
||||||
|
|
||||||
|
def __merge__(self, other: ArtworkVariant) -> None:
|
||||||
|
for key, value in other.__dict__.items():
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if getattr(self, key) is None:
|
||||||
|
setattr(self, key, value)
|
||||||
|
|
||||||
|
@cached_property
|
||||||
|
def target(self) -> Target:
|
||||||
|
return Target.temp()
|
||||||
|
|
||||||
|
def fetch(self) -> None:
|
||||||
|
global artwork_connection
|
||||||
|
|
||||||
|
r = artwork_connection.get(self.url, name=hash_url(self.url))
|
||||||
|
if r is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.target.raw_content = r.content
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Artwork:
|
class Artwork:
|
||||||
@@ -55,10 +83,9 @@ class Artwork:
|
|||||||
variant = self.search_variant(kwargs.get("url"))
|
variant = self.search_variant(kwargs.get("url"))
|
||||||
|
|
||||||
if variant is None:
|
if variant is None:
|
||||||
variant, kwargs = create_dataclass_instance(ArtworkVariant, **kwargs)
|
variant, kwargs = create_dataclass_instance(ArtworkVariant, kwargs)
|
||||||
self.variants.append(variant)
|
self.variants.append(variant)
|
||||||
|
|
||||||
variant.url = url
|
|
||||||
variant.__dict__.update(kwargs)
|
variant.__dict__.update(kwargs)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -67,6 +94,10 @@ class Artwork:
|
|||||||
return None
|
return None
|
||||||
return self.variants[0].url
|
return self.variants[0].url
|
||||||
|
|
||||||
|
def fetch(self) -> None:
|
||||||
|
for variant in self.variants:
|
||||||
|
variant.fetch()
|
||||||
|
|
||||||
|
|
||||||
class ArtworkCollection:
|
class ArtworkCollection:
|
||||||
"""
|
"""
|
||||||
@@ -91,8 +122,6 @@ class ArtworkCollection:
|
|||||||
self._data = []
|
self._data = []
|
||||||
self.extend(data)
|
self.extend(data)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def search_artwork(self, url: str) -> Optional[ArtworkVariant]:
|
def search_artwork(self, url: str) -> Optional[ArtworkVariant]:
|
||||||
for artwork in self._data:
|
for artwork in self._data:
|
||||||
if url in artwork:
|
if url in artwork:
|
||||||
@@ -106,18 +135,19 @@ class ArtworkCollection:
|
|||||||
def _create_new_artwork(self, **kwargs) -> Tuple[Artwork, dict]:
|
def _create_new_artwork(self, **kwargs) -> Tuple[Artwork, dict]:
|
||||||
kwargs["artwork_type"] = kwargs.get("artwork_type", self.artwork_type)
|
kwargs["artwork_type"] = kwargs.get("artwork_type", self.artwork_type)
|
||||||
|
|
||||||
return create_dataclass_instance(ArtworkVariant, dict(**kwargs))
|
return create_dataclass_instance(Artwork, dict(**kwargs))
|
||||||
|
|
||||||
def add_data(self, url: str, **kwargs) -> None:
|
def add_data(self, url: str, **kwargs) -> Artwork:
|
||||||
kwargs["url"] = url
|
kwargs["url"] = url
|
||||||
|
|
||||||
artwork = self.search_artwork(url)
|
artwork = self.search_artwork(url)
|
||||||
|
|
||||||
if artwork is None:
|
if artwork is None:
|
||||||
artwork, kwargs = self._create_new_artwork(url=url)
|
artwork, kwargs = self._create_new_artwork(**kwargs)
|
||||||
self._data.append(artwork)
|
self._data.append(artwork)
|
||||||
|
|
||||||
artwork.add_data(url, **kwargs)
|
artwork.add_data(**kwargs)
|
||||||
|
return artwork
|
||||||
|
|
||||||
def append(self, value: Union[Artwork, ArtworkVariant, dict], **kwargs):
|
def append(self, value: Union[Artwork, ArtworkVariant, dict], **kwargs):
|
||||||
"""
|
"""
|
||||||
@@ -140,21 +170,65 @@ class ArtworkCollection:
|
|||||||
for value in values:
|
for value in values:
|
||||||
self.append(value, **kwargs)
|
self.append(value, **kwargs)
|
||||||
|
|
||||||
def compile(self) -> None:
|
def compile(self, **kwargs) -> None:
|
||||||
"""
|
"""
|
||||||
This will make the artworks ready for download
|
This will make the artworks ready for download and delete duplicates.
|
||||||
"""
|
"""
|
||||||
|
artwork_hashes: list = list()
|
||||||
|
artwork_urls: list = list()
|
||||||
for artwork in self._data:
|
for artwork in self._data:
|
||||||
for variants in artwork.variants:
|
index = 0
|
||||||
pass
|
for artwork_variant in artwork.variants:
|
||||||
pass
|
r = artwork_connection.get(
|
||||||
|
url=artwork_variant.url,
|
||||||
|
name=artwork_variant.url,
|
||||||
|
)
|
||||||
|
|
||||||
|
if artwork_variant.url in artwork_urls:
|
||||||
|
artwork.variants.pop(index)
|
||||||
|
continue
|
||||||
|
artwork_urls.append(artwork_variant.url)
|
||||||
|
|
||||||
|
target: Target = artwork_variant.target
|
||||||
|
with target.open("wb") as f:
|
||||||
|
f.write(r.content)
|
||||||
|
|
||||||
|
with Image.open(target.file_path) as img:
|
||||||
|
# https://stackoverflow.com/a/59476938/16804841
|
||||||
|
if img.mode != 'RGB':
|
||||||
|
img = img.convert('RGB')
|
||||||
|
|
||||||
|
try:
|
||||||
|
image_hash = imagehash.crop_resistant_hash(img)
|
||||||
|
except Exception as e:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if image_hash in artwork_hashes:
|
||||||
|
artwork.variants.pop(index)
|
||||||
|
target.delete()
|
||||||
|
continue
|
||||||
|
artwork_hashes.append(image_hash)
|
||||||
|
width, height = img.size
|
||||||
|
if width != height:
|
||||||
|
if width > height:
|
||||||
|
img = img.crop((width // 2 - height // 2, 0, width // 2 + height // 2, height))
|
||||||
|
else:
|
||||||
|
img = img.crop((0, height // 2 - width // 2, width, height // 2 + width // 2))
|
||||||
|
|
||||||
|
# resize the image to the preferred resolution
|
||||||
|
img.thumbnail((main_settings["preferred_artwork_resolution"], main_settings["preferred_artwork_resolution"]))
|
||||||
|
index =+ 1
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def __merge__(self, other: ArtworkCollection, **kwargs) -> None:
|
def __merge__(self, other: ArtworkCollection, **kwargs) -> None:
|
||||||
self.parent_artworks.update(other.parent_artworks)
|
self.parent_artworks.update(other.parent_artworks)
|
||||||
|
for other_artwork in other._data:
|
||||||
|
for other_variant in other_artwork.variants:
|
||||||
|
if self.__contains__(other_variant.url):
|
||||||
|
continue
|
||||||
|
self.append(ArtworkVariant(other_variant.url))
|
||||||
|
|
||||||
for key, value in other._variant_mapping.items():
|
|
||||||
if key not in self._variant_mapping:
|
|
||||||
self._variant_mapping[key] = value
|
|
||||||
|
|
||||||
def __hash__(self) -> int:
|
def __hash__(self) -> int:
|
||||||
return id(self)
|
return id(self)
|
||||||
@@ -165,21 +239,5 @@ class ArtworkCollection:
|
|||||||
def get_urls(self) -> Generator[str, None, None]:
|
def get_urls(self) -> Generator[str, None, None]:
|
||||||
yield from (artwork.url for artwork in self._data if artwork.url is not None)
|
yield from (artwork.url for artwork in self._data if artwork.url is not None)
|
||||||
|
|
||||||
"""
|
|
||||||
@property
|
|
||||||
def flat_empty(self) -> bool:
|
|
||||||
return len(self._variant_mapping.keys()) <= 0
|
|
||||||
|
|
||||||
def _get_best_from_list(self, artwork_variants: List[ArtworkVariant]) -> Optional[ArtworkVariant]:
|
|
||||||
return min(artwork_variants, key=lambda x: x["deviation"])
|
|
||||||
|
|
||||||
@property
|
|
||||||
def best_variant(self) -> ArtworkVariant:
|
|
||||||
if self.flat_empty:
|
|
||||||
return self._get_best_from_list([parent.best_variant for parent in self.parent_artworks])
|
|
||||||
return self._get_best_from_list(self._variant_mapping.values())
|
|
||||||
|
|
||||||
def get_variant_name(self, variant: ArtworkVariant) -> str:
|
|
||||||
return f"artwork_{variant['width']}x{variant['height']}_{hash_url(variant['url']).replace('/', '_')}"
|
|
||||||
"""
|
|
||||||
|
|
||||||
@@ -1,17 +1,17 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import List, Tuple, TextIO, Union, Optional
|
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import List, Optional, TextIO, Tuple, Union
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
|
|
||||||
from .parents import OuterProxy
|
from ..utils.config import logging_settings, main_settings
|
||||||
from ..utils.shared import HIGHEST_ID
|
from ..utils.shared import HIGHEST_ID
|
||||||
from ..utils.config import main_settings, logging_settings
|
|
||||||
from ..utils.string_processing import fit_to_file_system
|
from ..utils.string_processing import fit_to_file_system
|
||||||
|
from .parents import OuterProxy
|
||||||
|
|
||||||
LOGGER = logging.getLogger("target")
|
LOGGER = logging.getLogger("target")
|
||||||
|
|
||||||
@@ -31,7 +31,8 @@ class Target(OuterProxy):
|
|||||||
}
|
}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def temp(cls, name: str = str(random.randint(0, HIGHEST_ID)), file_extension: Optional[str] = None) -> P:
|
def temp(cls, name: str = None, file_extension: Optional[str] = None) -> P:
|
||||||
|
name = name or str(random.randint(0, HIGHEST_ID))
|
||||||
if file_extension is not None:
|
if file_extension is not None:
|
||||||
name = f"{name}.{file_extension}"
|
name = f"{name}.{file_extension}"
|
||||||
|
|
||||||
@@ -117,3 +118,11 @@ class Target(OuterProxy):
|
|||||||
|
|
||||||
def read_bytes(self) -> bytes:
|
def read_bytes(self) -> bytes:
|
||||||
return self.file_path.read_bytes()
|
return self.file_path.read_bytes()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def raw_content(self) -> bytes:
|
||||||
|
return self.file_path.read_bytes()
|
||||||
|
|
||||||
|
@raw_content.setter
|
||||||
|
def raw_content(self, content: bytes):
|
||||||
|
self.file_path.write_bytes(content)
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import json
|
import simplejson as json
|
||||||
|
from json_unescape import escape_json, unescape_json
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import List, Optional, Type
|
from typing import List, Optional, Type
|
||||||
from urllib.parse import urlencode, urlparse, urlunparse
|
from urllib.parse import urlencode, urlparse, urlunparse
|
||||||
@@ -268,7 +269,8 @@ class Genius(Page):
|
|||||||
# get the contents that are between `JSON.parse('` and `');`
|
# get the contents that are between `JSON.parse('` and `');`
|
||||||
content = self.get_json_content_from_response(r, start="window.__PRELOADED_STATE__ = JSON.parse('", end="');\n window.__APP_CONFIG__ = ")
|
content = self.get_json_content_from_response(r, start="window.__PRELOADED_STATE__ = JSON.parse('", end="');\n window.__APP_CONFIG__ = ")
|
||||||
if content is not None:
|
if content is not None:
|
||||||
content = content.replace("\\\\", "\\").replace('\\"', '"').replace("\\'", "'")
|
#IMPLEMENT FIX FROM HAZEL
|
||||||
|
content = escape_json(content)
|
||||||
data = json.loads(content)
|
data = json.loads(content)
|
||||||
|
|
||||||
lyrics_html = traverse_json_path(data, "songPage.lyricsData.body.html", default=None)
|
lyrics_html = traverse_json_path(data, "songPage.lyricsData.body.html", default=None)
|
||||||
|
|||||||
@@ -8,9 +8,10 @@ import pycountry
|
|||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
|
|
||||||
from ..connection import Connection
|
from ..connection import Connection
|
||||||
from ..objects import (Album, Artist, ArtworkCollection, DatabaseObject,
|
from ..objects import (Album, Artist, DatabaseObject,
|
||||||
FormattedText, ID3Timestamp, Label, Lyrics, Song,
|
FormattedText, ID3Timestamp, Label, Lyrics, Song,
|
||||||
Source, Target)
|
Source, Target)
|
||||||
|
from ..objects.artwork import (Artwork, ArtworkVariant, ArtworkCollection)
|
||||||
from ..utils import shared, string_processing
|
from ..utils import shared, string_processing
|
||||||
from ..utils.config import logging_settings, main_settings
|
from ..utils.config import logging_settings, main_settings
|
||||||
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
|
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
|
||||||
@@ -1069,7 +1070,7 @@ class Musify(Page):
|
|||||||
gallery_body_content: BeautifulSoup = artwork_gallery.find(id="bodyContent")
|
gallery_body_content: BeautifulSoup = artwork_gallery.find(id="bodyContent")
|
||||||
gallery_image_element_list: List[BeautifulSoup] = gallery_body_content.find_all("img")
|
gallery_image_element_list: List[BeautifulSoup] = gallery_body_content.find_all("img")
|
||||||
for gallery_image_element in gallery_image_element_list:
|
for gallery_image_element in gallery_image_element_list:
|
||||||
artist.artwork.add_data(url=gallery_image_element.get("data-src", gallery_image_element.get("src")), width=247, heigth=247)
|
artist.artwork.append(ArtworkVariant(url=gallery_image_element.get("data-src", gallery_image_element.get("src")), width=247, heigth=247))
|
||||||
|
|
||||||
|
|
||||||
def fetch_artist(self, source: Source, **kwargs) -> Artist:
|
def fetch_artist(self, source: Source, **kwargs) -> Artist:
|
||||||
|
|||||||
@@ -441,7 +441,7 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
# fetch artist artwork
|
# fetch artist artwork
|
||||||
artist_thumbnails = musicImmersiveHeaderRenderer.get("thumbnail", {}).get("musicThumbnailRenderer", {}).get("thumbnail", {}).get("thumbnails", {})
|
artist_thumbnails = musicImmersiveHeaderRenderer.get("thumbnail", {}).get("musicThumbnailRenderer", {}).get("thumbnail", {}).get("thumbnails", {})
|
||||||
for artist_thumbnail in artist_thumbnails:
|
for artist_thumbnail in artist_thumbnails:
|
||||||
artist.artwork.append(**artist_thumbnail)
|
artist.artwork.append(artist_thumbnail)
|
||||||
|
|
||||||
if DEBUG:
|
if DEBUG:
|
||||||
for i, content in enumerate(renderer_list):
|
for i, content in enumerate(renderer_list):
|
||||||
@@ -493,7 +493,7 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
# album artwork
|
# album artwork
|
||||||
album_thumbnails = musicDetailHeaderRenderer.get("thumbnail", {}).get("croppedSquareThumbnailRenderer", {}).get("thumbnail", {}).get("thumbnails", {})
|
album_thumbnails = musicDetailHeaderRenderer.get("thumbnail", {}).get("croppedSquareThumbnailRenderer", {}).get("thumbnail", {}).get("thumbnails", {})
|
||||||
for album_thumbnail in album_thumbnails:
|
for album_thumbnail in album_thumbnails:
|
||||||
album.artwork.append(**album_thumbnail)
|
album.artwork.append(value=album_thumbnail)
|
||||||
|
|
||||||
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
|
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
|
||||||
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
|
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
|
||||||
|
|||||||
@@ -156,6 +156,7 @@ def create_dataclass_instance(t, data: dict):
|
|||||||
Tuple[Type, dict]: The created instance and a dict, containing the data, which was not used in the creation
|
Tuple[Type, dict]: The created instance and a dict, containing the data, which was not used in the creation
|
||||||
"""
|
"""
|
||||||
|
|
||||||
data = {k: v for k, v in data.items() if hasattr(t, k)}
|
needed_data = {k: v for k, v in data.items() if k in t.__dataclass_fields__}
|
||||||
removed_data = {k: v for k, v in data.items() if not hasattr(t, k)}
|
removed_data = {k: v for k, v in data.items() if k not in t.__dataclass_fields__}
|
||||||
return t(**data), removed_data
|
|
||||||
|
return t(**needed_data), removed_data
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import re
|
||||||
import string
|
import string
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -238,4 +239,5 @@ def is_url(value: Any) -> bool:
|
|||||||
if not isinstance(value, str):
|
if not isinstance(value, str):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
return re.match(URL_PATTERN, query) is not None
|
# value has to be a string
|
||||||
|
return re.match(URL_PATTERN, value) is not None
|
||||||
|
|||||||
@@ -1,9 +1,13 @@
|
|||||||
from dataclasses import dataclass, field
|
from __future__ import annotations
|
||||||
from typing import List, Tuple
|
|
||||||
|
|
||||||
from ...utils.config import main_settings, logging_settings
|
from dataclasses import dataclass, field
|
||||||
|
from typing import TYPE_CHECKING, List, Tuple
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ...objects import Target
|
||||||
|
|
||||||
|
from ...utils.config import logging_settings, main_settings
|
||||||
from ...utils.enums.colors import BColors
|
from ...utils.enums.colors import BColors
|
||||||
from ...objects import Target
|
|
||||||
|
|
||||||
UNIT_PREFIXES: List[str] = ["", "k", "m", "g", "t"]
|
UNIT_PREFIXES: List[str] = ["", "k", "m", "g", "t"]
|
||||||
UNIT_DIVISOR = 1024
|
UNIT_DIVISOR = 1024
|
||||||
|
|||||||
Reference in New Issue
Block a user