music-kraken-core/music_kraken/pages/abstract.py

360 lines
12 KiB
Python
Raw Normal View History

import logging
import random
2024-01-22 17:36:16 +00:00
import re
2023-04-03 08:38:12 +00:00
from copy import copy
2024-01-15 09:56:59 +00:00
from pathlib import Path
2024-05-10 15:06:40 +00:00
from typing import Optional, Union, Type, Dict, Set, List, Tuple, TypedDict
2023-06-15 09:28:35 +00:00
from string import Formatter
2024-05-10 15:06:40 +00:00
from dataclasses import dataclass, field
import requests
from bs4 import BeautifulSoup
2023-04-20 17:45:29 +00:00
from ..connection import Connection
2023-03-10 09:13:35 +00:00
from ..objects import (
2023-01-23 13:53:35 +00:00
Song,
Source,
Album,
Artist,
Target,
2023-03-24 14:58:21 +00:00
DatabaseObject,
2023-03-20 13:40:32 +00:00
Options,
2023-03-24 13:28:19 +00:00
Collection,
2023-04-04 08:20:54 +00:00
Label,
2023-01-23 13:53:35 +00:00
)
2023-04-18 10:00:25 +00:00
from ..utils.enums.source import SourcePages
2023-04-18 10:14:34 +00:00
from ..utils.enums.album import AlbumType
from ..audio import write_metadata_to_target, correct_codec
2023-09-10 14:27:09 +00:00
from ..utils.config import main_settings
2023-10-23 14:21:44 +00:00
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
2023-09-13 18:02:36 +00:00
from ..utils.string_processing import fit_to_file_system
2024-05-10 15:33:07 +00:00
from ..utils import trace, output, BColors
2023-04-04 17:17:58 +00:00
2023-05-23 16:09:53 +00:00
# Type alias: an instance of any of the four object classes Label, Album, Artist or Song.
INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song]
# Type alias: the class object itself (Type[...]) of any of the same four classes.
INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]]
2024-05-10 15:06:40 +00:00
@dataclass
class FetchOptions:
    """Options bundle stored on a page as ``fetch_options``."""

    # Defaults to False.
    download_all: bool = False
    # Built per instance from main_settings["album_type_blacklist"], converting
    # every configured entry with AlbumType(...).
    album_type_blacklist: Set[AlbumType] = field(
        default_factory=lambda: set(map(AlbumType, main_settings["album_type_blacklist"]))
    )
@dataclass
class DownloadOptions:
    """Options bundle stored on a page as ``download_options``."""

    # When False, albums whose album_type is in album_type_blacklist are skipped
    # by the download routine that reads these options.
    download_all: bool = False
    # Built per instance from main_settings["album_type_blacklist"], converting
    # every configured entry with AlbumType(...).
    album_type_blacklist: Set[AlbumType] = field(
        default_factory=lambda: set(map(AlbumType, main_settings["album_type_blacklist"]))
    )

    # Whether the audio (codec) step also runs for songs that were already found on disk.
    process_audio_if_found: bool = False
    # Whether metadata is also (re)written for songs that were already found on disk.
    process_metadata_if_found: bool = True
2023-05-24 06:50:56 +00:00
2023-06-15 09:28:35 +00:00
class NamingDict(dict):
    """
    Dictionary used to fill path/file naming templates.

    Values stored directly in the dict win. Keys that are not stored are
    resolved through ``__missing__``: either via an ``object.attribute``
    lookup against ``object_mappings`` or by falling back to a generated
    ``Various ...`` placeholder.
    """

    # Short template keys and the ``object.attribute`` path they stand for.
    CUSTOM_KEYS: Dict[str, str] = {
        "label": "label.name",
        "artist": "artist.name",
        "song": "song.title",
        "isrc": "song.isrc",
        "album": "album.title",
        "album_type": "album.album_type_string"
    }

    def __init__(self, values: dict, object_mappings: Dict[str, DatabaseObject] = None):
        """
        :param values: initial key/value pairs of the dict
        :param object_mappings: lowercase class name -> object used for attribute lookups
        """
        self.object_mappings: Dict[str, DatabaseObject] = object_mappings if object_mappings else dict()

        super().__init__(values)
        # the configured audio format is always stored under this key
        self["audio_format"] = main_settings["audio_format"]

    def add_object(self, music_object: DatabaseObject):
        """Register ``music_object`` under the lowercased name of its class."""
        mapping_key = type(music_object).__name__.lower()
        self.object_mappings[mapping_key] = music_object

    def copy(self) -> dict:
        """Return a new instance of the same class with copied values and copied object mappings."""
        copied_values = super().copy()
        copied_mappings = self.object_mappings.copy()
        return type(self)(copied_values, copied_mappings)

    def __getitem__(self, key: str) -> str:
        """Return the value for ``key`` passed through ``fit_to_file_system``."""
        raw_value = super().__getitem__(key)
        return fit_to_file_system(raw_value)

    def default_value_for_name(self, name: str) -> str:
        """Build the placeholder for an unresolved name, e.g. ``album_type`` -> ``Various Album Type``."""
        readable_name = name.replace("_", " ").title()
        return f'Various {readable_name}'

    def __missing__(self, key: str) -> str:
        """
        Resolve a key that is not stored in the dict.

        Keys without a dot are translated through ``CUSTOM_KEYS``; unknown
        ones yield the placeholder. The text before the first dot names the
        mapped object and the text after the last dot names the attribute.
        A missing object, a missing attribute or a ``None`` value all yield
        the placeholder for the attribute name.
        """
        if "." not in key:
            if key not in self.CUSTOM_KEYS:
                return self.default_value_for_name(key)
            key = self.CUSTOM_KEYS[key]

        # first fragment -> object, last fragment -> attribute
        object_name = key.partition(".")[0].strip().lower()
        attribute_name = key.rpartition(".")[2].strip().lower()

        if object_name not in self.object_mappings:
            return self.default_value_for_name(attribute_name)
        music_object = self.object_mappings[object_name]

        try:
            value = getattr(music_object, attribute_name)
            rendered = None if value is None else str(value)
        except AttributeError:
            rendered = None

        if rendered is None:
            return self.default_value_for_name(attribute_name)
        return rendered
2023-06-12 17:46:46 +00:00
class Page:
    """
    This is an abstract class, laying out the
    functionality for every other class fetching something.

    The search and fetch hooks defined here return empty results, and
    ``download_song_to_target`` returns an empty ``DownloadResult``. The
    download flow (``download`` -> ``_download`` -> ``_download_song`` ->
    ``_post_process_targets``) is implemented here on top of those hooks.
    """
    DOWNLOAD_PRIORITY: int = 0
    SOURCE_TYPE: SourcePages
    LOGGER = logging.getLogger("this shouldn't be used")

    # set this to true, if all song details can also be fetched by fetching album details
    NO_ADDITIONAL_DATA_FROM_SONG = False

    def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
        """
        :param download_options: options used while downloading; a default ``DownloadOptions()`` if omitted
        :param fetch_options: options stored as ``fetch_options``; a default ``FetchOptions()`` if omitted
        """
        self.download_options: DownloadOptions = download_options or DownloadOptions()
        self.fetch_options: FetchOptions = fetch_options or FetchOptions()

    def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None):
        """
        Perform a regex search on the given string, using a single pattern or
        a list of patterns, and return a group of the first pattern that matches.

        :param pattern: a pattern string, a compiled pattern, or an iterable of those
        :param string: the text to search
        :param default: returned if nothing matches, or if ``group`` is None
            and the match has no captured (non-None) group
        :param fatal: accepted for signature compatibility; this implementation
            never raises on a failed search
        :param flags: flags passed to ``re.search``
        :param group: None for the first non-None captured group, a list/tuple
            of group ids for a tuple of groups, or a single group id
        """
        # a single pattern (string or compiled) is handled like a one-element list
        patterns = (pattern,) if isinstance(pattern, (str, re.Pattern)) else pattern

        # stays None for an empty pattern list instead of being unbound
        mobj = None
        for p in patterns:
            mobj = re.search(p, string, flags)
            if mobj:
                break

        if not mobj:
            return default

        if group is None:
            # return the first matching group, or the default if there is none
            return next((g for g in mobj.groups() if g is not None), default)
        if isinstance(group, (list, tuple)):
            return tuple(mobj.group(g) for g in group)
        return mobj.group(group)

    def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
        """Default implementation: always returns None, ``source`` is not inspected."""
        return None

    def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup:
        """Parse the content of the response with the ``html.parser`` backend."""
        return BeautifulSoup(r.content, "html.parser")

    # to search stuff
    def search(self, query: Query) -> List[DatabaseObject]:
        """
        Search for the given query.

        If the query's music object is exactly a Song, Album, Artist or Label,
        the matching ``*_search`` method is tried first and its result is
        returned when it is non-empty. Otherwise the results of
        ``general_search`` for every entry of ``query.default_search`` are
        collected into one list.
        """
        music_object = query.music_object

        search_functions = {
            Song: self.song_search,
            Album: self.album_search,
            Artist: self.artist_search,
            Label: self.label_search
        }

        # dispatch on the exact type, subclasses are deliberately not matched
        search_function = search_functions.get(type(music_object))
        if search_function is not None:
            r = search_function(music_object)
            if r is not None and len(r) > 0:
                return r

        r = []
        for default_query in query.default_search:
            r.extend(self.general_search(default_query))

        return r

    def general_search(self, search_query: str) -> List[DatabaseObject]:
        """Default implementation: returns an empty list."""
        return []

    def label_search(self, label: Label) -> List[Label]:
        """Default implementation: returns an empty list."""
        return []

    def artist_search(self, artist: Artist) -> List[Artist]:
        """Default implementation: returns an empty list."""
        return []

    def album_search(self, album: Album) -> List[Album]:
        """Default implementation: returns an empty list."""
        return []

    def song_search(self, song: Song) -> List[Song]:
        """Default implementation: returns an empty list."""
        return []

    # to fetch stuff
    def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
        """Default implementation: returns a new ``Song()``, the arguments are ignored."""
        return Song()

    def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
        """Default implementation: returns a new ``Album()``, the arguments are ignored."""
        return Album()

    def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
        """Default implementation: returns a new ``Artist()``, the arguments are ignored."""
        return Artist()

    def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
        """Default implementation: returns a new ``Label()``, the arguments are ignored."""
        return Label()

    def download(
            self,
            music_object: DatabaseObject,
            genre: str,
    ) -> DownloadResult:
        """
        Download ``music_object``.

        A ``NamingDict`` seeded with the genre is built and filled with the
        objects found by walking the upwards collections of ``music_object``,
        then the work is handed to ``_download``.
        """
        naming_dict: NamingDict = NamingDict({"genre": genre})

        def fill_naming_objects(naming_music_object: DatabaseObject):
            for collection_name in naming_music_object.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
                collection: Collection = getattr(naming_music_object, collection_name)

                if collection.empty:
                    continue

                # only the first element of the first non-empty collection is
                # followed, the walk then continues from that object
                dom_ordered_music_object: DatabaseObject = collection[0]
                naming_dict.add_object(dom_ordered_music_object)
                return fill_naming_objects(dom_ordered_music_object)

        fill_naming_objects(music_object)

        return self._download(music_object, naming_dict)

    def _download(
            self,
            music_object: DatabaseObject,
            naming_dict: NamingDict,
            **kwargs
    ) -> DownloadResult:
        """
        Download ``music_object`` and everything in its downwards collections.

        Songs are passed to ``_download_song``; for any other object the
        results of its children are merged into one ``DownloadResult``. Each
        child gets its own copy of ``naming_dict``. ``kwargs`` is accepted
        but not used here.
        """
        if isinstance(music_object, Song):
            output(f"Downloading {music_object.option_string} to:", color=BColors.BOLD)
        else:
            output(f"Downloading {music_object.option_string}...", color=BColors.BOLD)

        # Skips all albums whose type is in download_options.album_type_blacklist, if download_all is False
        if isinstance(music_object, Album):
            if not self.download_options.download_all and music_object.album_type in self.download_options.album_type_blacklist:
                return DownloadResult()

        if not (isinstance(music_object, Song) and self.NO_ADDITIONAL_DATA_FROM_SONG):
            # NOTE(review): fetch_details is not defined in this class body - confirm where it is provided
            self.fetch_details(music_object=music_object, stop_at_level=1)

        if isinstance(music_object, Album):
            music_object.update_tracksort()

        naming_dict.add_object(music_object)

        if isinstance(music_object, Song):
            return self._download_song(music_object, naming_dict)

        download_result: DownloadResult = DownloadResult()

        for collection_name in music_object.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
            collection: Collection = getattr(music_object, collection_name)

            sub_ordered_music_object: DatabaseObject
            for sub_ordered_music_object in collection:
                download_result.merge(self._download(sub_ordered_music_object, naming_dict.copy()))

        return download_result

    @staticmethod
    def _format_naming_template(template: str, naming_dict: Dict[str, str]) -> str:
        """
        Fill ``template`` with the values ``naming_dict`` provides for its fields.

        ``Formatter().parse`` reports a field name of None for literal text
        that is not followed by a replacement field (e.g. a trailing ``/``);
        those entries are skipped, because None is neither a usable lookup
        key nor a valid keyword for ``str.format``.
        """
        # NOTE(review): dotted field names are passed on as keyword names, while
        # str.format treats "a.b" as attribute access on argument "a" - confirm
        # whether dotted template fields need to be supported here
        field_names = [
            field_name
            for _, field_name, _, _ in Formatter().parse(template)
            if field_name is not None
        ]
        return template.format(**{name: naming_dict[name] for name in field_names})

    def _download_song(self, song: Song, naming_dict: NamingDict):
        """
        Download a single song into all of its targets.

        The target path is built from the ``download_path`` and
        ``download_file`` settings. If a target already exists, its content
        is reused instead of downloading. A ``DownloadResult`` carrying an
        error message is returned if the song has no source for this page.
        """
        song.compile()

        if "genre" not in naming_dict and song.genre is not None:
            naming_dict["genre"] = song.genre

        if song.genre is None:
            song.genre = naming_dict["genre"]

        new_target = Target(
            relative_to_music_dir=True,
            file_path=Path(
                self._format_naming_template(main_settings["download_path"], naming_dict),
                self._format_naming_template(main_settings["download_file"], naming_dict),
            )
        )

        # the generated target is only used if the song has no target yet
        if song.target_collection.empty:
            song.target_collection.append(new_target)

        r = DownloadResult(1)
        temp_target: Target = Target.temp(file_extension=main_settings["audio_format"])

        found_on_disc = False
        target: Target
        for target in song.target_collection:
            current_exists = target.exists

            if current_exists:
                output(f'- {target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
                target.copy_content(temp_target)
                found_on_disc = True

                r.found_on_disk += 1
                r.add_target(target)
            else:
                output(f'- {target.file_path}', color=BColors.GREY)

        if not song.source_collection.has_source_page(self.SOURCE_TYPE):
            return DownloadResult(error_message=f"No {self.__class__.__name__} source found for {song.option_string}.")

        sources = song.source_collection.get_sources(self.SOURCE_TYPE)

        skip_intervals = []
        if not found_on_disc:
            # try the sources in order and stop at the first one without a fatal error
            for source in sources:
                r = self.download_song_to_target(source=source, target=temp_target, desc="downloading")

                if not r.is_fatal_error:
                    skip_intervals = self.get_skip_intervals(song, source)
                    break

        if temp_target.exists:
            r.merge(self._post_process_targets(
                song=song,
                temp_target=temp_target,
                interval_list=skip_intervals,
                found_on_disc=found_on_disc,
            ))

        return r

    def _post_process_targets(self, song: Song, temp_target: Target, interval_list: List, found_on_disc: bool) -> DownloadResult:
        """
        Post-process ``temp_target`` and copy it to every target of the song.

        The codec step and the metadata step always run for freshly
        downloaded songs; for songs found on disk they only run if the
        corresponding ``*_if_found`` download option is set.
        ``post_process_hook`` runs in both cases. ``temp_target`` is deleted
        at the end.
        """
        if not found_on_disc or self.download_options.process_audio_if_found:
            correct_codec(temp_target, interval_list=interval_list)

        self.post_process_hook(song, temp_target)

        if not found_on_disc or self.download_options.process_metadata_if_found:
            write_metadata_to_target(song.metadata, temp_target, song)

        r = DownloadResult()

        target: Target
        for target in song.target_collection:
            # never copy the temporary target onto itself
            if temp_target is not target:
                temp_target.copy_content(target)
            r.add_target(target)

        temp_target.delete()
        r.sponsor_segments += len(interval_list)

        return r

    def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
        """Default implementation: returns an empty list."""
        return []

    def post_process_hook(self, song: Song, temp_target: Target, **kwargs):
        """Hook called from ``_post_process_targets``; does nothing here."""
        pass

    def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
        """Default implementation: returns an empty ``DownloadResult()`` without using the arguments."""
        return DownloadResult()