301 lines
10 KiB
Python
301 lines
10 KiB
Python
import logging
|
|
import random
|
|
import re
|
|
from copy import copy
|
|
from pathlib import Path
|
|
from typing import Optional, Union, Type, Dict, Set, List, Tuple, TypedDict
|
|
from string import Formatter
|
|
from dataclasses import dataclass, field
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
|
|
from ..connection import Connection
|
|
from ..objects import (
|
|
Song,
|
|
Source,
|
|
Album,
|
|
Artist,
|
|
Target,
|
|
DatabaseObject,
|
|
Options,
|
|
Collection,
|
|
Label,
|
|
)
|
|
from ..utils.enums import SourceType
|
|
from ..utils.enums.album import AlbumType
|
|
from ..audio import write_metadata_to_target, correct_codec
|
|
from ..utils.config import main_settings
|
|
from ..utils.support_classes.query import Query
|
|
from ..utils.support_classes.download_result import DownloadResult
|
|
from ..utils.string_processing import fit_to_file_system
|
|
from ..utils import trace, output, BColors
|
|
|
|
# Type alias: an *instance* of any of the four database object classes
# Label, Album, Artist or Song.
INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song]
# Type alias: one of those four *classes* themselves (not an instance).
INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]]
|
|
|
|
@dataclass
class FetchOptions:
    """Settings bundle handed to a page as its ``fetch_options``."""

    # Defaults to False.
    # NOTE(review): not read anywhere in the visible part of this file;
    # presumably mirrors DownloadOptions.download_all -- confirm.
    download_all: bool = False

    # Album types taken from main_settings["album_type_blacklist"]; every
    # configured entry is converted to an AlbumType when the instance is
    # created, so each instance owns its own set.
    album_type_blacklist: Set[AlbumType] = field(
        default_factory=lambda: {
            AlbumType(entry) for entry in main_settings["album_type_blacklist"]
        },
    )
|
|
|
|
@dataclass
class DownloadOptions:
    """Settings bundle handed to a page as its ``download_options``."""

    # When False, Page._download skips albums whose album_type is contained
    # in album_type_blacklist; when True the blacklist is ignored.
    download_all: bool = False

    # Album types taken from main_settings["album_type_blacklist"]; every
    # configured entry is converted to an AlbumType when the instance is
    # created, so each instance owns its own set.
    album_type_blacklist: Set[AlbumType] = field(
        default_factory=lambda: set(
            map(AlbumType, main_settings["album_type_blacklist"])
        ),
    )

    # Page._post_process_targets always runs the codec correction for freshly
    # downloaded files; for files that already existed it only does so when
    # this flag is True.
    process_audio_if_found: bool = False

    # Same rule as above, but for writing the metadata to the file.
    process_metadata_if_found: bool = True
|
|
|
|
class Page:
    """Base class for one music source ("page").

    The search, fetch and download-to-target methods defined here are
    placeholders that return empty results; concrete pages override the ones
    they support. The download orchestration (``download``, ``_download``,
    ``_download_song`` and ``_post_process_targets``) is implemented here and
    calls those overridable methods.
    """

    SOURCE_TYPE: SourceType
    LOGGER: logging.Logger

    # Read by ``_download``: when a subclass sets this to True, ``fetch_details``
    # is not called for Song objects. The default keeps the detail fetch enabled.
    NO_ADDITIONAL_DATA_FROM_SONG: bool = False

    def __new__(cls, *args, **kwargs):
        """Register the page class with its source type and attach a logger.

        Runs on every instantiation, before ``__init__``.
        """
        cls.SOURCE_TYPE.register_page(cls)
        cls.LOGGER = logging.getLogger(cls.__name__)

        return super().__new__(cls)

    def __init__(
        self,
        download_options: Optional[DownloadOptions] = None,
        fetch_options: Optional[FetchOptions] = None,
    ):
        """Store the given options, falling back to default-constructed ones."""
        self.download_options: DownloadOptions = download_options or DownloadOptions()
        self.fetch_options: FetchOptions = fetch_options or FetchOptions()

    def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None):
        """Search ``string`` with one pattern or with several patterns in order.

        :param pattern: a single pattern string, or an iterable of patterns
            that are tried one after another until one matches.
        :param string: the text to search.
        :param default: returned when nothing matches, or when ``group`` is
            None and the match has no capture group that took part in it.
        :param fatal: accepted for signature compatibility; this
            implementation never raises on a miss and ignores the value.
        :param flags: forwarded to ``re.search``.
        :param group: None returns the first capture group that is not None,
            a list/tuple returns a tuple with those groups, anything else is
            passed to ``Match.group`` as a single index or name.
        """
        candidates = (pattern,) if isinstance(pattern, str) else pattern

        # Initialised up front so that an empty iterable of patterns yields
        # ``default`` instead of an unbound local.
        mobj = None
        for candidate in candidates:
            mobj = re.search(candidate, string, flags)
            if mobj:
                break

        if not mobj:
            return default

        if group is None:
            # First capture group that took part in the match; fall back to
            # ``default`` rather than leaking a StopIteration.
            return next((g for g in mobj.groups() if g is not None), default)

        if isinstance(group, (list, tuple)):
            return tuple(mobj.group(g) for g in group)

        return mobj.group(group)

    def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
        """Return the database object class ``source`` points to.

        Placeholder: always None in this base class.
        """
        return None

    def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup:
        """Parse the body of ``r`` with the ``html.parser`` backend."""
        return BeautifulSoup(r.content, "html.parser")

    # to search stuff
    def search(self, query: Query) -> List[DatabaseObject]:
        """Search this page for ``query``.

        If the exact type of ``query.music_object`` is Song, Album, Artist or
        Label, the matching ``*_search`` method is tried first and its result
        is returned when it is non-empty. Otherwise every string in
        ``query.default_search`` is passed to ``general_search`` and all
        results are concatenated.
        """
        music_object = query.music_object

        search_functions = {
            Song: self.song_search,
            Album: self.album_search,
            Artist: self.artist_search,
            Label: self.label_search,
        }

        # Dispatch is on the exact type, subclasses are not matched.
        specific_search = search_functions.get(type(music_object))
        if specific_search is not None:
            specific_results = specific_search(music_object)
            if specific_results is not None and len(specific_results) > 0:
                return specific_results

        results = []
        for default_query in query.default_search:
            results.extend(self.general_search(default_query))

        return results

    def general_search(self, search_query: str) -> List[DatabaseObject]:
        """Free-text search. Placeholder: returns an empty list."""
        return []

    def label_search(self, label: Label) -> List[Label]:
        """Search for labels. Placeholder: returns an empty list."""
        return []

    def artist_search(self, artist: Artist) -> List[Artist]:
        """Search for artists. Placeholder: returns an empty list."""
        return []

    def album_search(self, album: Album) -> List[Album]:
        """Search for albums. Placeholder: returns an empty list."""
        return []

    def song_search(self, song: Song) -> List[Song]:
        """Search for songs. Placeholder: returns an empty list."""
        return []

    # to fetch stuff
    def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
        """Fetch a song from ``source``. Placeholder: returns an empty Song."""
        return Song()

    def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
        """Fetch an album from ``source``. Placeholder: returns an empty Album."""
        return Album()

    def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
        """Fetch an artist from ``source``. Placeholder: returns an empty Artist."""
        return Artist()

    def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
        """Fetch a label from ``source``. Placeholder: returns an empty Label."""
        return Label()

    def download(
        self,
        music_object: DatabaseObject,
        genre: str,
    ) -> DownloadResult:
        """Download ``music_object`` and everything below it.

        A naming dict is seeded with ``genre`` and then filled by walking
        upwards from ``music_object``: at each step the first non-empty
        collection named in ``UPWARDS_COLLECTION_STRING_ATTRIBUTES`` is taken,
        its first element is added to the naming dict, and the walk continues
        from that element until an object has no non-empty upward collection.
        """
        # NOTE(review): NamingDict is neither imported nor defined in the
        # visible part of this file -- confirm where it comes from.
        naming_dict: NamingDict = NamingDict({"genre": genre})

        current: Optional[DatabaseObject] = music_object
        while current is not None:
            parent: Optional[DatabaseObject] = None

            for collection_name in current.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
                collection: Collection = getattr(current, collection_name)
                if collection.empty:
                    continue

                # Only the first non-empty collection is followed.
                parent = collection[0]
                naming_dict.add_object(parent)
                break

            current = parent

        return self._download(music_object, naming_dict)

    def _download(
        self,
        music_object: DatabaseObject,
        naming_dict: "NamingDict",
        **kwargs
    ) -> DownloadResult:
        """Download ``music_object``, recursing into its downward collections.

        Songs are handed to ``_download_song``. For every other object each
        element of each collection named in
        ``DOWNWARDS_COLLECTION_STRING_ATTRIBUTES`` is downloaded with its own
        copy of ``naming_dict`` and the results are merged. ``kwargs`` is
        accepted but not used here.
        """
        if isinstance(music_object, Song):
            output(f"Downloading {music_object.option_string} to:", color=BColors.BOLD)
        else:
            output(f"Downloading {music_object.option_string}...", color=BColors.BOLD)

        # Skip albums whose type is blacklisted in the download options,
        # unless download_all is set.
        if isinstance(music_object, Album):
            if not self.download_options.download_all and music_object.album_type in self.download_options.album_type_blacklist:
                return DownloadResult()

        # NOTE(review): fetch_details is not defined in this class in the
        # visible part of the file -- confirm it is provided elsewhere.
        if not (isinstance(music_object, Song) and self.NO_ADDITIONAL_DATA_FROM_SONG):
            self.fetch_details(music_object=music_object, stop_at_level=1)

        if isinstance(music_object, Album):
            music_object.update_tracksort()

        naming_dict.add_object(music_object)

        if isinstance(music_object, Song):
            return self._download_song(music_object, naming_dict)

        download_result: DownloadResult = DownloadResult()

        for collection_name in music_object.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
            collection: Collection = getattr(music_object, collection_name)

            sub_ordered_music_object: DatabaseObject
            for sub_ordered_music_object in collection:
                # Each child gets its own copy so siblings do not see each
                # other's naming entries.
                download_result.merge(self._download(sub_ordered_music_object, naming_dict.copy()))

        return download_result

    @staticmethod
    def _render_path_template(template: str, naming_dict) -> str:
        """Fill the ``{name}`` placeholders of ``template`` from ``naming_dict``.

        Only the names that occur in the template are looked up.
        ``Formatter.parse`` reports a field name of None for trailing literal
        text (e.g. the ``.mp3`` in ``"{song}.mp3"``); those entries are skipped
        because None is not a valid keyword for ``str.format``.
        """
        values = {
            field_name: naming_dict[field_name]
            for _, field_name, _, _ in Formatter().parse(template)
            if field_name is not None
        }
        return template.format(**values)

    def _download_song(self, song: Song, naming_dict: "NamingDict") -> DownloadResult:
        """Download one song into every target of its target collection.

        If the song has no target yet, one is derived from the
        ``download_path`` and ``download_file`` settings. When at least one
        target already exists on disk its content is reused; otherwise the
        sources of this page's source type are tried in order until one
        download does not end in a fatal error. The audio ends up in a
        temporary target that ``_post_process_targets`` distributes.
        """
        song.compile()
        if "genre" not in naming_dict and song.genre is not None:
            naming_dict["genre"] = song.genre

        if song.genre is None:
            song.genre = naming_dict["genre"]

        new_target = Target(
            relative_to_music_dir=True,
            file_path=Path(
                self._render_path_template(main_settings["download_path"], naming_dict),
                self._render_path_template(main_settings["download_file"], naming_dict),
            ),
        )

        if song.target_collection.empty:
            song.target_collection.append(new_target)

        result = DownloadResult(1)
        temp_target: Target = Target.temp(file_extension=main_settings["audio_format"])

        found_on_disc = False
        target: Target
        for target in song.target_collection:
            if target.exists:
                output(f'- {target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
                # Reuse the existing file as the content of the temp target.
                target.copy_content(temp_target)
                found_on_disc = True

                result.found_on_disk += 1
                result.add_target(target)
            else:
                output(f'- {target.file_path}', color=BColors.GREY)

        sources = song.source_collection.get_sources(self.SOURCE_TYPE)

        skip_intervals = []
        if not found_on_disc:
            for source in sources:
                # The result of the download attempt replaces the running
                # result; the last attempt wins if all of them fail.
                result = self.download_song_to_target(source=source, target=temp_target, desc="downloading")

                if not result.is_fatal_error:
                    skip_intervals = self.get_skip_intervals(song, source)
                    break

        if temp_target.exists:
            result.merge(self._post_process_targets(
                song=song,
                temp_target=temp_target,
                interval_list=skip_intervals,
                found_on_disc=found_on_disc,
            ))

        return result

    def _post_process_targets(self, song: Song, temp_target: Target, interval_list: List, found_on_disc: bool) -> DownloadResult:
        """Post-process the temporary file and copy it to all targets of ``song``.

        Codec correction and metadata writing always run for freshly
        downloaded audio; for audio that was found on disk they only run when
        the corresponding ``process_*_if_found`` download option is set.
        ``post_process_hook`` runs in between, unconditionally. The temporary
        target is deleted afterwards.

        :return: a result holding every target of the song, with
            ``sponsor_segments`` increased by ``len(interval_list)``.
        """
        if not found_on_disc or self.download_options.process_audio_if_found:
            correct_codec(temp_target, interval_list=interval_list)

        self.post_process_hook(song, temp_target)

        if not found_on_disc or self.download_options.process_metadata_if_found:
            write_metadata_to_target(song.metadata, temp_target, song)

        result = DownloadResult()

        target: Target
        for target in song.target_collection:
            if temp_target is not target:
                temp_target.copy_content(target)
            result.add_target(target)

        temp_target.delete()
        result.sponsor_segments += len(interval_list)

        return result

    def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
        """Return intervals that are handed to the codec correction step.

        Placeholder: returns an empty list.
        """
        return []

    def post_process_hook(self, song: Song, temp_target: Target, **kwargs):
        """Hook called by ``_post_process_targets`` between codec and metadata step.

        Does nothing in this base class.
        """
        pass

    def download_song_to_target(self, source: Source, target: Target, desc: Optional[str] = None) -> DownloadResult:
        """Download the audio behind ``source`` into ``target``.

        Placeholder: downloads nothing and returns an empty DownloadResult.
        """
        return DownloadResult()
|