music-kraken-core/src/music_kraken/pages/abstract.py

495 lines
17 KiB
Python
Raw Normal View History

import logging
import random
2023-04-03 08:38:12 +00:00
from copy import copy
2023-05-03 12:43:08 +00:00
from typing import Optional, Union, Type, Dict, Set, List
2023-05-23 14:21:12 +00:00
import threading
import requests
from bs4 import BeautifulSoup
2023-04-20 17:45:29 +00:00
from ..connection import Connection
2023-03-10 09:13:35 +00:00
from ..objects import (
2023-01-23 13:53:35 +00:00
Song,
Source,
Album,
Artist,
Target,
2023-03-24 14:58:21 +00:00
DatabaseObject,
2023-03-20 13:40:32 +00:00
Options,
2023-03-24 13:28:19 +00:00
Collection,
2023-04-04 08:20:54 +00:00
Label,
2023-01-23 13:53:35 +00:00
)
2023-04-18 10:00:25 +00:00
from ..utils.enums.source import SourcePages
2023-04-18 10:14:34 +00:00
from ..utils.enums.album import AlbumType
from ..audio import write_metadata_to_target, correct_codec
from ..utils import shared
2023-05-23 08:49:52 +00:00
from ..utils.support_classes import Query, DownloadResult, DefaultTarget
2023-04-04 17:17:58 +00:00
2023-05-23 16:09:53 +00:00
# Instances of the database object classes that the module-level
# `_clean_*` helpers below accept.
INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song]
# The classes themselves; used as the key type of the per-class
# `Collection` dictionaries passed to the `_clean_*` helpers.
INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]]
def _clean_music_object(music_object: INDEPENDENT_DB_OBJECTS, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
    """
    Dispatch ``music_object`` to the cleaning helper for its exact type.

    The lookup is done on ``type(music_object)``, so instances of subclasses
    of Label/Artist/Album/Song are not dispatched.

    :param music_object: the object whose sub-collections should be cleaned
    :param collections: shared collections, keyed by element class
    :return: whatever the selected helper returns; None if no helper matches
    """
    handlers = {
        Label: _clean_label,
        Artist: _clean_artist,
        Album: _clean_album,
        Song: _clean_song,
    }

    handler = handlers.get(type(music_object))
    if handler is None:
        return None
    return handler(music_object, collections)
def _clean_collection(collection: Collection, collection_dict: Dict[INDEPENDENT_DB_TYPES, Collection]):
    """
    Route every element of ``collection`` through the shared collection that
    ``collection_dict`` holds for the same element type.

    Each element is appended to the shared collection with
    ``merge_into_existing=True`` and the slot in ``collection`` is overwritten
    with ``current_element`` of the append result. If the result reports
    ``was_the_same`` as false, that object is cleaned recursively.
    NOTE(review): presumably ``was_the_same`` means the element was already
    present in the shared collection - confirm against ``Collection.append``.

    :param collection: the collection to rewrite in place
    :param collection_dict: shared collections, keyed by element class
    """
    if collection.element_type in collection_dict:
        for index, item in enumerate(collection):
            append_result = collection_dict[collection.element_type].append(item, merge_into_existing=True)
            collection[index] = append_result.current_element

            if append_result.was_the_same:
                continue
            _clean_music_object(append_result.current_element, collection_dict)
def _clean_label(label: Label, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
    """
    Clean the artist and album collections of ``label``, in that order.

    :param label: the label whose sub-collections are cleaned in place
    :param collections: shared collections, keyed by element class
    """
    # Attributes are resolved one at a time, right before each is cleaned.
    for attribute_name in ("current_artist_collection", "album_collection"):
        _clean_collection(getattr(label, attribute_name), collections)
def _clean_artist(artist: Artist, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
    """
    Clean the main-album, feature-song and label collections of ``artist``,
    in that order.

    :param artist: the artist whose sub-collections are cleaned in place
    :param collections: shared collections, keyed by element class
    """
    # Attributes are resolved one at a time, right before each is cleaned.
    for attribute_name in ("main_album_collection", "feature_song_collection", "label_collection"):
        _clean_collection(getattr(artist, attribute_name), collections)
def _clean_album(album: Album, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
    """
    Clean the label, song and artist collections of ``album``, in that order.

    :param album: the album whose sub-collections are cleaned in place
    :param collections: shared collections, keyed by element class
    """
    # Attributes are resolved one at a time, right before each is cleaned.
    for attribute_name in ("label_collection", "song_collection", "artist_collection"):
        _clean_collection(getattr(album, attribute_name), collections)
def _clean_song(song: Song, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
    """
    Clean the album, feature-artist and main-artist collections of ``song``,
    in that order.

    :param song: the song whose sub-collections are cleaned in place
    :param collections: shared collections, keyed by element class
    """
    # Attributes are resolved one at a time, right before each is cleaned.
    for attribute_name in ("album_collection", "feature_artist_collection", "main_artist_collection"):
        _clean_collection(getattr(song, attribute_name), collections)
2023-05-23 14:21:12 +00:00
class Page(threading.Thread):
    """
    This is an abstract class, laying out the
    functionality for every other class fetching something.

    The search, fetch and download hooks defined here only return empty
    results.
    NOTE(review): concrete pages presumably override those hooks together
    with ``SOURCE_TYPE`` and ``LOGGER`` - confirm against the subclasses.
    """

    SOURCE_TYPE: SourcePages
    LOGGER = logging.getLogger("this shouldn't be used")

    def __init__(self):
        super().__init__()

    def run(self) -> None:
        """Thread entry point; does nothing in this base class."""

    @classmethod
    def get_soup_from_response(cls, r: requests.Response) -> BeautifulSoup:
        """
        Parse the content of a response with the ``html.parser`` backend.

        :param r: the response whose ``content`` is parsed
        :return: the parsed document
        """
        return BeautifulSoup(r.content, "html.parser")

    # to search stuff
    def search(self, query: Query) -> List[DatabaseObject]:
        """
        Search for the object described by ``query``.

        If ``query.music_object`` is exactly a Song, Album, Artist or Label,
        the matching ``*_search`` method is tried first and its result is
        returned when it is not empty. Otherwise every entry of
        ``query.default_search`` is passed to ``general_search`` and the
        results are concatenated.

        :param query: the search query
        :return: the list of found objects (possibly empty)
        """
        music_object = query.music_object

        search_functions = {
            Song: self.song_search,
            Album: self.album_search,
            Artist: self.artist_search,
            Label: self.label_search
        }

        if type(music_object) in search_functions:
            r = search_functions[type(music_object)](music_object)
            if len(r) > 0:
                return r

        r = []
        for default_query in query.default_search:
            r.extend(self.general_search(default_query))

        return r

    def general_search(self, search_query: str) -> List[DatabaseObject]:
        """Search by a plain string; returns an empty list in this base class."""
        return []

    def label_search(self, label: Label) -> List[Label]:
        """Search for a label; returns an empty list in this base class."""
        return []

    def artist_search(self, artist: Artist) -> List[Artist]:
        """Search for an artist; returns an empty list in this base class."""
        return []

    def album_search(self, album: Album) -> List[Album]:
        """Search for an album; returns an empty list in this base class."""
        return []

    def song_search(self, song: Song) -> List[Song]:
        """Search for a song; returns an empty list in this base class."""
        return []

    @classmethod
    def fetch_details(cls, music_object: Union[Song, Album, Artist, Label], stop_at_level: int = 1) -> DatabaseObject:
        """
        when a music object with lacking data is passed in, it returns
        the SAME object **(no copy)** with more detailed data.
        If you for example put in an album, it fetches the tracklist

        :param music_object:
        :param stop_at_level:
        This says the depth of the level the scraper will recurse to.
        If this is for example set to 2, then the levels could be:
        1. Level: the album
        2. Level: every song of the album + every artist of the album
        If no additional requests are needed to get the data one level below the supposed stop level
        this gets ignored
        :return detailed_music_object: IT MODIFIES THE INPUT OBJ
        :raises TypeError: if the freshly built object is not a Label, Album, Artist or Song
        """

        # Everything fetched is merged into an empty object of the same type
        # first, and only merged into the input object after cleaning.
        new_music_object: DatabaseObject = type(music_object)()

        had_sources = False

        source: Source
        for source in music_object.source_collection.get_sources_from_page(cls.SOURCE_TYPE):
            new_music_object.merge(
                cls._fetch_object_from_source(source=source, obj_type=type(music_object), stop_at_level=stop_at_level))
            had_sources = True

        if not had_sources:
            music_object.compile(merge_into=True)
            return music_object

        collections = {
            Label: Collection(element_type=Label),
            Artist: Collection(element_type=Artist),
            Album: Collection(element_type=Album),
            Song: Collection(element_type=Song)
        }

        # A tuple of classes is used instead of the typing.Union alias:
        # isinstance() raises TypeError for typing.Union before Python 3.10.
        if not isinstance(new_music_object, (Label, Album, Artist, Song)):
            raise TypeError(f"Can't clean the object, because it isn't a valid type: {type(new_music_object)} | {type(INDEPENDENT_DB_OBJECTS)}")

        _clean_music_object(new_music_object, collections)

        music_object.merge(new_music_object)
        music_object.compile(merge_into=True)

        return music_object

    @classmethod
    def fetch_object_from_source(cls, source: Source, stop_at_level: int = 2):
        """
        Fetch the object a source points to.

        The object type is derived from ``source.url`` via ``_get_type_of_url``.

        :param source: the source to fetch from
        :param stop_at_level: recursion depth handed to the fetch hooks
        :return: the fetched, cleaned and compiled object, or None if the
            type of the url could not be determined
        """
        obj_type = cls._get_type_of_url(source.url)
        if obj_type is None:
            return None

        music_object = cls._fetch_object_from_source(source=source, obj_type=obj_type, stop_at_level=stop_at_level)

        collections = {
            Label: Collection(element_type=Label),
            Artist: Collection(element_type=Artist),
            Album: Collection(element_type=Album),
            Song: Collection(element_type=Song)
        }

        # The cleaning helper is a module-level function, not a method of
        # this class; calling it through ``cls`` raised an AttributeError.
        _clean_music_object(music_object, collections)

        music_object.compile(merge_into=True)
        return music_object

    @classmethod
    def _fetch_object_from_source(cls, source: Source,
                                  obj_type: Union[Type[Song], Type[Album], Type[Artist], Type[Label]],
                                  stop_at_level: int = 1) -> Union[Song, Album, Artist, Label]:
        """
        Call the ``_fetch_*_from_source`` hook that matches ``obj_type``.

        :param source: the source to fetch from
        :param obj_type: the class of the object to fetch
        :param stop_at_level: recursion depth handed to the hook
        :return: the result of the hook; None if ``obj_type`` matches none of the four classes
        """
        if obj_type == Artist:
            return cls._fetch_artist_from_source(source=source, stop_at_level=stop_at_level)

        if obj_type == Song:
            return cls._fetch_song_from_source(source=source, stop_at_level=stop_at_level)

        if obj_type == Album:
            return cls._fetch_album_from_source(source=source, stop_at_level=stop_at_level)

        if obj_type == Label:
            return cls._fetch_label_from_source(source=source, stop_at_level=stop_at_level)

        # Unknown type: same result as before, now explicit.
        return None

    @classmethod
    def download(
            cls,
            music_object: Union[Song, Album, Artist, Label],
            download_features: bool = True,
            default_target: Optional[DefaultTarget] = None,
            genre: Optional[str] = None,
            override_existing: bool = False,
            create_target_on_demand: bool = True,
            download_all: bool = False,
            exclude_album_type: Set[AlbumType] = shared.ALBUM_TYPE_BLACKLIST
    ) -> DownloadResult:
        """
        Download a song, album, artist or label by delegating to the
        matching ``download_*`` method.

        :param genre: forwarded to the ``download_*`` methods; ``download_song`` assigns it to ``song.genre``
        :param music_object: the object to download; dispatched on its exact type
        :param download_features: forwarded to the artist and label download
        :param default_target: template target; a fresh ``DefaultTarget`` is used if None
        :param override_existing: forwarded to the selected ``download_*`` method
        :param create_target_on_demand: forwarded to the song download
        :param download_all: if True, the album type filter is replaced by an empty set
        :param exclude_album_type: album types to skip; None selects compilation albums, live albums and mixtapes
        :return: the merged download result, or a result carrying an error message if the type can't be downloaded
        """
        if default_target is None:
            default_target = DefaultTarget()

        if download_all:
            exclude_album_type = set()
        elif exclude_album_type is None:
            exclude_album_type = {
                AlbumType.COMPILATION_ALBUM,
                AlbumType.LIVE_ALBUM,
                AlbumType.MIXTAPE
            }

        if type(music_object) is Song:
            return cls.download_song(
                music_object,
                override_existing=override_existing,
                create_target_on_demand=create_target_on_demand,
                # previously dropped, so a caller supplied target was ignored for songs
                default_target=default_target,
                genre=genre
            )
        if type(music_object) is Album:
            return cls.download_album(
                music_object,
                default_target=default_target,
                override_existing=override_existing,
                genre=genre
            )
        if type(music_object) is Artist:
            return cls.download_artist(
                music_object,
                default_target=default_target,
                download_features=download_features,
                # previously dropped, so the flag had no effect for artists
                override_existing=override_existing,
                exclude_album_type=exclude_album_type,
                genre=genre
            )
        if type(music_object) is Label:
            return cls.download_label(
                music_object,
                download_features=download_features,
                # previously dropped, so the flag had no effect for labels
                override_existing=override_existing,
                default_target=default_target,
                exclude_album_type=exclude_album_type,
                genre=genre
            )

        return DownloadResult(error_message=f"{type(music_object)} can't be downloaded.")

    @classmethod
    def download_label(
            cls,
            label: Label,
            exclude_album_type: Set[AlbumType],
            download_features: bool = True,
            override_existing: bool = False,
            default_target: Optional[DefaultTarget] = None,
            genre: Optional[str] = None
    ) -> DownloadResult:
        """
        Download every artist and every album of a label.

        :param label: the label; its details are fetched first
        :param exclude_album_type: album types that are skipped
        :param download_features: forwarded to the artist download
        :param override_existing: forwarded to the artist and album download
        :param default_target: template target; it is copied, the original is not modified
        :param genre: forwarded to the artist and album download
        :return: the merged result of all downloads
        """
        default_target = DefaultTarget() if default_target is None else copy(default_target)
        default_target.label_object(label)

        r = DownloadResult()

        cls.fetch_details(label)
        for artist in label.current_artist_collection:
            r.merge(cls.download_artist(
                artist,
                download_features=download_features,
                override_existing=override_existing,
                default_target=default_target,
                exclude_album_type=exclude_album_type,
                genre=genre
            ))

        album: Album
        for album in label.album_collection:
            # Albums of type OTHER get their details fetched before the filter below is applied.
            if album.album_type == AlbumType.OTHER:
                cls.fetch_details(album)

            if album.album_type in exclude_album_type:
                cls.LOGGER.info(f"Skipping {album.option_string} due to the filter. ({album.album_type})")
                continue

            r.merge(cls.download_album(
                album,
                override_existing=override_existing,
                default_target=default_target,
                genre=genre
            ))

        return r

    @classmethod
    def download_artist(
            cls,
            artist: Artist,
            exclude_album_type: Set[AlbumType],
            download_features: bool = True,
            override_existing: bool = False,
            default_target: Optional[DefaultTarget] = None,
            genre: Optional[str] = None
    ) -> DownloadResult:
        """
        Download the main albums of an artist and, optionally, the songs of
        ``artist.feature_album``.

        :param artist: the artist; its details are fetched first
        :param exclude_album_type: album types that are skipped
        :param download_features: if True, the songs of ``artist.feature_album`` are downloaded too
        :param override_existing: forwarded to the album and song download
        :param default_target: template target; it is copied, the original is not modified
        :param genre: forwarded to the album and song download
        :return: the merged result of all downloads
        """
        default_target = DefaultTarget() if default_target is None else copy(default_target)
        default_target.artist_object(artist)

        r = DownloadResult()

        cls.fetch_details(artist)

        album: Album
        for album in artist.main_album_collection:
            if album.album_type in exclude_album_type:
                cls.LOGGER.info(f"Skipping {album.option_string} due to the filter. ({album.album_type})")
                continue

            r.merge(cls.download_album(
                album,
                override_existing=override_existing,
                default_target=default_target,
                genre=genre
            ))

        if download_features:
            for song in artist.feature_album.song_collection:
                r.merge(cls.download_song(
                    song,
                    override_existing=override_existing,
                    default_target=default_target,
                    genre=genre
                ))

        return r

    @classmethod
    def download_album(
            cls,
            album: Album,
            override_existing: bool = False,
            default_target: Optional[DefaultTarget] = None,
            genre: Optional[str] = None
    ) -> DownloadResult:
        """
        Download every song of an album.

        :param album: the album; its details are fetched and its tracksort updated first
        :param override_existing: forwarded to the song download
        :param default_target: template target; it is copied, the original is not modified
        :param genre: forwarded to the song download
        :return: the merged result of all song downloads
        """
        default_target = DefaultTarget() if default_target is None else copy(default_target)
        default_target.album_object(album)

        r = DownloadResult()

        cls.fetch_details(album)

        album.update_tracksort()

        cls.LOGGER.info(f"downloading album: {album.title}")
        for song in album.song_collection:
            r.merge(cls.download_song(
                song,
                override_existing=override_existing,
                default_target=default_target,
                genre=genre
            ))

        return r

    @classmethod
    def download_song(
            cls,
            song: Song,
            override_existing: bool = False,
            create_target_on_demand: bool = True,
            default_target: Optional[DefaultTarget] = None,
            genre: Optional[str] = None
    ) -> DownloadResult:
        """
        Download a single song into its targets.

        :param song: the song; its details are fetched first
        :param override_existing: if False and one of the targets already exists, nothing is
            downloaded and the existing target is post processed instead
        :param create_target_on_demand: if the song has no target, create one from ``default_target``;
            this needs a non-empty main artist and album collection
        :param default_target: template target; it is copied, the original is not modified
        :param genre: assigned to ``song.genre``
        :return: the download result; it carries an error message if no target or no source is available
        """
        # NOTE(review): the genre of the song is overwritten even if genre is None.
        cls.LOGGER.debug(f"Setting genre of {song.option_string} to {genre}")
        song.genre = genre

        default_target = DefaultTarget() if default_target is None else copy(default_target)
        default_target.song_object(song)

        cls.fetch_details(song)

        if song.target_collection.empty:
            if not create_target_on_demand:
                return DownloadResult(error_message=f"No target exists for {song.title}, but create_target_on_demand is False.")

            # The old message blamed create_target_on_demand for this case as well.
            if song.main_artist_collection.empty or song.album_collection.empty:
                return DownloadResult(error_message=f"No target exists for {song.title}, and none can be created without a main artist and an album.")

            song.target_collection.append(default_target.target)

        target: Target
        if any(target.exists for target in song.target_collection) and not override_existing:
            r = DownloadResult(total=1, fail=0)

            # Only the first existing target is used as the copy source.
            existing_target: Target
            for existing_target in song.target_collection:
                if existing_target.exists:
                    r.merge(cls._post_process_targets(song=song, temp_target=existing_target))
                    break

            return r

        sources = song.source_collection.get_sources_from_page(cls.SOURCE_TYPE)
        if len(sources) == 0:
            return DownloadResult(error_message=f"No source found for {song.title} as {cls.__name__}.")

        # The download goes to a randomly named file in the temp directory first.
        temp_target: Target = Target(
            path=shared.TEMP_DIR,
            file=str(random.randint(0, 999999))
        )

        r = cls._download_song_to_targets(source=sources[0], target=temp_target, desc=song.title)

        if not r.is_fatal_error:
            r.merge(cls._post_process_targets(song, temp_target))

        return r

    @classmethod
    def _post_process_targets(cls, song: Song, temp_target: Target) -> DownloadResult:
        """
        Fix the codec of ``temp_target``, write the metadata of ``song`` to it
        and copy its content to every other target of the song.

        :param song: the song whose metadata and targets are used
        :param temp_target: the file that holds the audio
        :return: a result to which every target of the song was added
        """
        correct_codec(temp_target)
        write_metadata_to_target(song.metadata, temp_target)

        r = DownloadResult()

        target: Target
        for target in song.target_collection:
            # Don't copy a target onto itself.
            if temp_target is not target:
                temp_target.copy_content(target)
            r.add_target(target)

        return r

    @classmethod
    def _fetch_song_from_source(cls, source: Source, stop_at_level: int = 1) -> Song:
        """Fetch a song from a source; returns an empty Song in this base class."""
        return Song()

    @classmethod
    def _fetch_album_from_source(cls, source: Source, stop_at_level: int = 1) -> Album:
        """Fetch an album from a source; returns an empty Album in this base class."""
        return Album()

    @classmethod
    def _fetch_artist_from_source(cls, source: Source, stop_at_level: int = 1) -> Artist:
        """Fetch an artist from a source; returns an empty Artist in this base class."""
        return Artist()

    @classmethod
    def _fetch_label_from_source(cls, source: Source, stop_at_level: int = 1) -> Label:
        """Fetch a label from a source; returns an empty Label in this base class."""
        return Label()

    @classmethod
    def _get_type_of_url(cls, url: str) -> Optional[Union[Type[Song], Type[Album], Type[Artist], Type[Label]]]:
        """Return the class of the object ``url`` points to; always None in this base class."""
        return None

    @classmethod
    def _download_song_to_targets(cls, source: Source, target: Target, desc: Optional[str] = None) -> DownloadResult:
        """Download the audio of ``source`` into ``target``; returns an empty result in this base class."""
        return DownloadResult()