2023-03-30 12:39:28 +00:00
|
|
|
from typing import Optional, Union, Type, Dict, List
|
2023-03-27 14:28:34 +00:00
|
|
|
from bs4 import BeautifulSoup
|
2023-03-17 17:16:06 +00:00
|
|
|
import requests
|
|
|
|
import logging
|
|
|
|
|
|
|
|
from ..utils import shared
|
2023-03-10 09:13:35 +00:00
|
|
|
from ..objects import (
|
2023-01-23 13:53:35 +00:00
|
|
|
Song,
|
|
|
|
Source,
|
|
|
|
Album,
|
|
|
|
Artist,
|
|
|
|
Lyrics,
|
|
|
|
Target,
|
2023-03-24 14:58:21 +00:00
|
|
|
DatabaseObject,
|
2023-03-20 13:40:32 +00:00
|
|
|
Options,
|
2023-03-21 11:46:32 +00:00
|
|
|
SourcePages,
|
2023-03-24 13:28:19 +00:00
|
|
|
Collection,
|
|
|
|
Label
|
2023-01-23 13:53:35 +00:00
|
|
|
)
|
2023-03-30 13:58:29 +00:00
|
|
|
from ..tagging import write_metadata
|
2023-01-23 13:53:35 +00:00
|
|
|
|
2023-03-24 14:58:21 +00:00
|
|
|
# Module-level logger used by the request helpers of Page below.
# NOTE(review): the logger name suggests this is only a placeholder and that
# concrete pages are expected to log through their own logger - confirm.
LOGGER = logging.getLogger("this shouldn't be used")
|
|
|
|
|
2023-01-23 13:53:35 +00:00
|
|
|
|
|
|
|
class Page:
    """
    Abstract base class for everything that fetches data from one page.

    It lays out the functionality every concrete fetching class shares:
    the HTTP helpers, the query parsing and the fetch/download workflow.
    """

    # Retry policy of the request helpers: TRIES bounds the retry counter,
    # TIMEOUT is handed to requests as the `timeout` argument.
    TRIES = 5
    TIMEOUT = 5

    # A single session object defined on the class, using the proxies
    # configured in the shared utils.
    API_SESSION: requests.Session = requests.Session()
    API_SESSION.proxies = shared.proxies

    # Only declared here, never assigned.
    # NOTE(review): presumably every concrete page has to set this - confirm.
    SOURCE_TYPE: SourcePages
|
2023-03-17 17:16:06 +00:00
|
|
|
|
|
|
|
@classmethod
def get_request(cls, url: str, accepted_response_codes: set = set((200,)), trie: int = 0) -> Optional[requests.Response]:
    """
    Send a GET request through the shared session, retrying on failure.

    A request counts as failed if it times out or if its status code is
    not contained in `accepted_response_codes`. Failed requests are
    repeated until the retry counter reaches `cls.TRIES`.

    :param url: the url to request
    :param accepted_response_codes: status codes that count as success
    :param trie: the current retry counter; callers normally leave it at 0
    :return: the accepted response, or None once the retries are used up
    """
    try:
        r = cls.API_SESSION.get(url, timeout=cls.TIMEOUT)
    except requests.exceptions.Timeout:
        # After a timeout there is no response object at all, so nothing
        # below may touch its attributes.
        r = None

    if r is not None and r.status_code in accepted_response_codes:
        return r

    if r is None:
        LOGGER.warning(f"{cls.__name__} timed out at GET:{url}. ({trie}-{cls.TRIES})")
    else:
        LOGGER.warning(f"{cls.__name__} responded wit {r.status_code} at GET:{url}. ({trie}-{cls.TRIES})")
        LOGGER.debug(r.content)

    if trie >= cls.TRIES:
        LOGGER.warning("to many tries. Aborting.")
        return None

    return cls.get_request(url, accepted_response_codes, trie + 1)
|
|
|
|
|
|
|
|
@classmethod
def post_request(cls, url: str, json: dict, accepted_response_codes: set = set((200,)), trie: int = 0) -> Optional[requests.Response]:
    """
    Send a POST request with a JSON body through the shared session,
    retrying on failure.

    A request counts as failed if it times out or if its status code is
    not contained in `accepted_response_codes`. Failed requests are
    repeated with the same payload until the retry counter reaches
    `cls.TRIES`.

    :param url: the url to request
    :param json: the payload, passed to requests as the `json` argument
    :param accepted_response_codes: status codes that count as success
    :param trie: the current retry counter; callers normally leave it at 0
    :return: the accepted response, or None once the retries are used up
    """
    try:
        r = cls.API_SESSION.post(url, json=json, timeout=cls.TIMEOUT)
    except requests.exceptions.Timeout:
        # After a timeout there is no response object at all, so nothing
        # below may touch its attributes.
        r = None

    if r is not None and r.status_code in accepted_response_codes:
        return r

    if r is None:
        LOGGER.warning(f"{cls.__name__} timed out at POST:{url}. ({trie}-{cls.TRIES})")
    else:
        LOGGER.warning(f"{cls.__name__} responded wit {r.status_code} at POST:{url}. ({trie}-{cls.TRIES})")
        LOGGER.debug(r.content)

    if trie >= cls.TRIES:
        LOGGER.warning("to many tries. Aborting.")
        return None

    # The payload has to be forwarded as well; otherwise every following
    # argument ends up one position too far to the left.
    return cls.post_request(url, json, accepted_response_codes, trie + 1)
|
|
|
|
|
2023-03-27 14:28:34 +00:00
|
|
|
@classmethod
def get_soup_from_response(cls, r: requests.Response) -> BeautifulSoup:
    """
    Build a BeautifulSoup tree from the body of a response.

    :param r: the response whose raw content gets parsed
    :return: the content parsed with the "html.parser" parser
    """
    markup = r.content
    return BeautifulSoup(markup, "html.parser")
|
|
|
|
|
2023-01-23 23:16:10 +00:00
|
|
|
class Query:
    """
    A search query, split into its artist, album and song part.

    A parameter is introduced with "#", directly followed by the letter
    defining its type ("a" artist, "r" album, "t" song) and a space:

        "#a Psychonaut 4 #r Tired, Numb and #t Drop by Drop"

    A query without any "#" is kept as it is and flagged with `is_raw`.
    """

    def __init__(self, query: str):
        """
        :param query: the query string, either raw or using "#" parameters
        """
        self.query = query
        self.is_raw = False

        self.artist = None
        self.album = None
        self.song = None

        self.parse_query(query=query)

    def __str__(self):
        if self.is_raw:
            return self.query
        return f"{self.artist}; {self.album}; {self.song}"

    def parse_query(self, query: str):
        """
        Fill `artist`, `album` and `song` from the "#" parameters in query.

        Sets `is_raw` and leaves the three fields untouched if the query
        contains no "#". Parameters of an unknown type are ignored.

        :param query: the query string to parse
        """
        if '#' not in query:
            self.is_raw = True
            return

        # Splitting at "#" yields an empty chunk only if the query starts
        # with "#" (or contains "##"). Filtering copes with any number of
        # them, whereas list.remove('') fails if there is none.
        parameters = [chunk for chunk in query.strip().split('#') if chunk]

        for parameter in parameters:
            # Everything up to the first space is the type letter.
            type_, _, remainder = parameter.partition(" ")
            input_ = remainder.strip()

            if type_ == "a":
                self.artist = input_
            elif type_ == "r":
                self.album = input_
            elif type_ == "t":
                self.song = input_

    def get_str(self, string):
        """
        :param string: a string or None
        :return: the string itself, or "" if it is None
        """
        if string is None:
            return ""
        return string

    # The fields as strings which are never None.
    artist_str = property(fget=lambda self: self.get_str(self.artist))
    album_str = property(fget=lambda self: self.get_str(self.album))
    song_str = property(fget=lambda self: self.get_str(self.song))
|
|
|
|
|
2023-01-23 13:53:35 +00:00
|
|
|
@classmethod
def search_by_query(cls, query: str) -> Options:
    """
    # The Query
    A new parameter is defined with "#". The letter right behind it
    defines the *type* of the parameter and is followed by a space:
    "#a Psychonaut 4 #r Tired, Numb and #t Drop by Drop"
    A query without any "#" gets treated as an "unspecified query".

    # Functionality
    Returns the best matches from this page for the query passed in.
    This implementation always returns empty options.

    :param query: the query to search for
    :return possible_music_objects: the matching music objects
    """
    no_matches = Options()
    return no_matches
|
2023-01-23 13:53:35 +00:00
|
|
|
|
|
|
|
@classmethod
def fetch_details(cls, music_object: Union[Song, Album, Artist, Label], stop_at_level: int = 1) -> DatabaseObject:
    """
    Take a music object that lacks data and return the SAME object
    **(no copy)** with more detailed data. For an album, for example,
    this fetches the tracklist.

    :param music_object: the object to complete
    :param stop_at_level:
    The depth the scraper will recurse to.
    If this is for example set to 2, then the levels could be:
    1. Level: the album
    2. Level: every song of the album + every artist of the album
    If no additional requests are needed to get the data one level below
    the supposed stop level, this gets ignored.
    :return detailed_music_object: IT MODIFIES THE INPUT OBJ
    """
    object_type = type(music_object)
    # Everything fetched is gathered in a fresh object first and only
    # merged into the input at the very end.
    fetched: DatabaseObject = object_type()

    found_source = False
    source: Source
    for source in music_object.source_collection.get_sources_from_page(cls.SOURCE_TYPE):
        from_source = cls._fetch_object_from_source(source=source, obj_type=object_type, stop_at_level=stop_at_level)
        fetched.merge(from_source)
        found_source = True

    if found_source:
        # One collection per object type, shared by the whole cleaning pass.
        registry = {kind: Collection(element_type=kind) for kind in (Label, Artist, Album, Song)}
        cls._clean_music_object(fetched, registry)
        music_object.merge(fetched)

    # Without any source of this page there is nothing to merge, the
    # object just gets compiled.
    music_object.compile(merge_into=True)
    return music_object
|
2023-03-30 10:00:39 +00:00
|
|
|
|
|
|
|
@classmethod
def fetch_object_from_source(cls, source: Source, stop_at_level: int = 2):
    """
    Fetch the music object a source points to.

    The type of the object is derived from the url of the source.

    :param source: the source to fetch
    :param stop_at_level: passed on to the fetching of the object
    :return: the fetched, cleaned and compiled object, or None if no
    type could be derived from the url
    """
    detected_type = cls._get_type_of_url(source.url)
    if detected_type is None:
        return None

    fetched = cls._fetch_object_from_source(source=source, obj_type=detected_type, stop_at_level=stop_at_level)

    # One collection per object type, shared by the whole cleaning pass.
    registry = {kind: Collection(element_type=kind) for kind in (Label, Artist, Album, Song)}
    cls._clean_music_object(fetched, registry)

    fetched.compile(merge_into=True)
    return fetched
|
2023-03-30 10:00:39 +00:00
|
|
|
|
2023-03-20 13:40:32 +00:00
|
|
|
|
2023-02-06 14:06:38 +00:00
|
|
|
@classmethod
def _fetch_object_from_source(cls, source: Source, obj_type: Union[Type[Song], Type[Album], Type[Artist], Type[Label]], stop_at_level: int = 1) -> Union[Song, Album, Artist, Label]:
    """
    Pass the fetch on to the method responsible for the given type.

    :param source: the source to fetch from
    :param obj_type: the class of the object behind the source
    :param stop_at_level: handed unchanged to the type specific method
    :return: the result of the type specific method; None if obj_type
    is none of Artist, Song, Album or Label
    """
    # Checked from top to bottom; a method only gets looked up after its
    # type matched.
    fetchers = (
        (Artist, "_fetch_artist_from_source"),
        (Song, "_fetch_song_from_source"),
        (Album, "_fetch_album_from_source"),
        (Label, "_fetch_label_from_source"),
    )

    for candidate, fetcher_name in fetchers:
        if obj_type == candidate:
            return getattr(cls, fetcher_name)(source=source, stop_at_level=stop_at_level)

    return None
|
2023-01-23 14:52:50 +00:00
|
|
|
|
2023-03-24 14:58:21 +00:00
|
|
|
@classmethod
def _clean_music_object(cls, music_object: Union[Label, Album, Artist, Song], collections: Dict[Union[Type[Song], Type[Album], Type[Artist], Type[Label]], Collection]):
    """
    Pass the object on to the cleaning method of its exact type.

    :param music_object: the object to clean
    :param collections: one collection per object type, shared by the
    whole cleaning pass
    :return: the result of the type specific method; None if the object
    is neither a Label, an Artist, an Album nor a Song
    """
    kind = type(music_object)

    if kind == Label:
        outcome = cls._clean_label(label=music_object, collections=collections)
    elif kind == Artist:
        outcome = cls._clean_artist(artist=music_object, collections=collections)
    elif kind == Album:
        outcome = cls._clean_album(album=music_object, collections=collections)
    elif kind == Song:
        outcome = cls._clean_song(song=music_object, collections=collections)
    else:
        outcome = None

    return outcome
|
|
|
|
|
|
|
|
@classmethod
def _clean_collection(cls, collection: Collection, collection_dict: Dict[Union[Type[Song], Type[Album], Type[Artist], Type[Label]], Collection]):
    """
    Append every element of the collection to the shared collection of
    its type and clean the resulting element recursively.

    Collections whose element type has no entry in collection_dict are
    left untouched.

    :param collection: the collection to walk through
    :param collection_dict: one collection per object type, shared by
    the whole cleaning pass
    """
    if collection.element_type not in collection_dict:
        return

    for index, element in enumerate(collection):
        appended = collection_dict[collection.element_type].append(element)

        if appended.was_in_collection:
            # The shared collection already knew this element, so this
            # slot is pointed at the element reported back by the append.
            # NOTE(review): presumably that is the instance already held
            # by the shared collection - confirm against Collection.append.
            collection[index] = appended.current_element

        cls._clean_music_object(appended.current_element, collection_dict)
|
|
|
|
|
|
|
|
@classmethod
def _clean_label(cls, label: Label, collections: Dict[Union[Type[Song], Type[Album], Type[Artist], Type[Label]], Collection]):
    """
    Clean the collections of a label: first its current artists, then
    its albums.

    :param label: the label whose collections get cleaned
    :param collections: one collection per object type, shared by the
    whole cleaning pass
    """
    # Each attribute is read right before it gets cleaned.
    for attribute in ("current_artist_collection", "album_collection"):
        cls._clean_collection(getattr(label, attribute), collections)
|
|
|
|
|
|
|
|
@classmethod
def _clean_artist(cls, artist: Artist, collections: Dict[Union[Type[Song], Type[Album], Type[Artist], Type[Label]], Collection]):
    """
    Clean the collections of an artist: first the main albums, then the
    feature songs, then the labels.

    :param artist: the artist whose collections get cleaned
    :param collections: one collection per object type, shared by the
    whole cleaning pass
    """
    # Each attribute is read right before it gets cleaned.
    for attribute in ("main_album_collection", "feature_song_collection", "label_collection"):
        cls._clean_collection(getattr(artist, attribute), collections)
|
|
|
|
|
|
|
|
@classmethod
def _clean_album(cls, album: Album, collections: Dict[Union[Type[Song], Type[Album], Type[Artist], Type[Label]], Collection]):
    """
    Clean the collections of an album: first the labels, then the songs,
    then the artists.

    :param album: the album whose collections get cleaned
    :param collections: one collection per object type, shared by the
    whole cleaning pass
    """
    # Each attribute is read right before it gets cleaned.
    for attribute in ("label_collection", "song_collection", "artist_collection"):
        cls._clean_collection(getattr(album, attribute), collections)
|
|
|
|
|
|
|
|
@classmethod
def _clean_song(cls, song: Song, collections: Dict[Union[Type[Song], Type[Album], Type[Artist], Type[Label]], Collection]):
    """
    Clean the collections of a song: first the albums, then the feature
    artists, then the main artists.

    :param song: the song whose collections get cleaned
    :param collections: one collection per object type, shared by the
    whole cleaning pass
    """
    # Each attribute is read right before it gets cleaned.
    for attribute in ("album_collection", "feature_artist_collection", "main_artist_collection"):
        cls._clean_collection(getattr(song, attribute), collections)
|
|
|
|
|
2023-03-30 12:52:50 +00:00
|
|
|
@classmethod
def download(cls, music_object: Union[Song, Album, Artist, Label], download_features: bool = True):
    """
    Download a music object with the method matching its exact type.

    :param music_object: the song, album, artist or label to download
    :param download_features: only passed on for artists and labels
    :return: the result of the type specific method; None if the object
    is of none of the four types
    """
    kind = type(music_object)

    if kind is Song:
        outcome = cls.download_song(music_object)
    elif kind is Album:
        outcome = cls.download_album(music_object)
    elif kind is Artist:
        outcome = cls.download_artist(music_object, download_features=download_features)
    elif kind is Label:
        outcome = cls.download_label(music_object, download_features=download_features)
    else:
        outcome = None

    return outcome
|
|
|
|
|
|
|
|
@classmethod
def download_label(cls, label: Label, download_features: bool = True, override_existing: bool = False):
    """
    Download everything belonging to a label: first every current
    artist, then every album of the label.

    :param label: the label to download
    :param download_features: passed on to the download of each artist
    :param override_existing: passed on to every download
    """
    for signed_artist in label.current_artist_collection:
        cls.download_artist(
            signed_artist,
            download_features=download_features,
            override_existing=override_existing,
        )

    for label_album in label.album_collection:
        cls.download_album(label_album, override_existing=override_existing)
|
2023-03-30 12:52:50 +00:00
|
|
|
|
|
|
|
@classmethod
def download_artist(cls, artist: Artist, download_features: bool = True, override_existing: bool = False):
    """
    Download every main album of an artist and, if wanted, the features.

    :param artist: the artist to download
    :param download_features: if False, only the main albums are downloaded
    :param override_existing: passed on to every download
    """
    for main_album in artist.main_album_collection:
        cls.download_album(main_album, override_existing=override_existing)

    if not download_features:
        return

    # NOTE(review): iterating artist.feature_album is expected to yield
    # songs - confirm against the Artist class.
    for feature_song in artist.feature_album:
        cls.download_song(feature_song, override_existing=override_existing)
|
2023-03-30 12:52:50 +00:00
|
|
|
|
|
|
|
@classmethod
def download_album(cls, album: Album, override_existing: bool = False):
    """
    Download every song in the song collection of an album.

    :param album: the album to download
    :param override_existing: passed on to the download of each song
    """
    for track in album.song_collection:
        cls.download_song(track, override_existing=override_existing)
|
2023-03-30 12:52:50 +00:00
|
|
|
|
2023-03-30 12:39:28 +00:00
|
|
|
@classmethod
def download_song(cls, song: Song, override_existing: bool = False):
    """
    Download a song to all of its targets.

    Nothing happens if the song has no targets, or if it has no source
    belonging to this page. If one of the targets already exists and
    override_existing is False, the existing target gets copied to all
    the other targets and nothing is downloaded.

    :param song: the song to download
    :param override_existing: if True, the song is downloaded even if
    one of its targets already exists
    """
    if song.target_collection.empty:
        return

    if not override_existing:
        # The first target that is already present, if there is one.
        existing_target: Optional[Target] = next(
            (candidate for candidate in song.target_collection if candidate.exists),
            None,
        )

        if existing_target is not None:
            target: Target
            for target in song.target_collection:
                if target is not existing_target:
                    existing_target.copy_content(target)

            # Every target is served from the existing one now. Going on
            # would download the song again, although overriding was not
            # requested.
            return

    sources = song.source_collection.get_sources_from_page(cls.SOURCE_TYPE)
    if len(sources) == 0:
        return

    # Only the first source of this page is used.
    cls._download_song_to_targets(source=sources[0], target_list=song.target_collection.shallow_list)
|
2023-03-30 13:58:29 +00:00
|
|
|
|
|
|
|
@classmethod
def _post_process_targets(cls, song: Song):
    """
    Post process a song by handing it to `write_metadata`.

    :param song: the song passed to write_metadata
    """
    write_metadata(song)
|
2023-03-30 12:39:28 +00:00
|
|
|
|
|
|
|
|
2023-02-06 14:06:38 +00:00
|
|
|
@classmethod
def _fetch_song_from_source(cls, source: Source, stop_at_level: int = 1) -> Song:
    """
    Fetch a song from a source. This implementation ignores both
    arguments and returns a new, empty Song.

    :param source: the source to fetch from
    :param stop_at_level: the depth to recurse to
    :return: a new Song, created without arguments
    """
    empty_song = Song()
    return empty_song
|
|
|
|
|
|
|
|
@classmethod
def _fetch_album_from_source(cls, source: Source, stop_at_level: int = 1) -> Album:
    """
    Fetch an album from a source. This implementation ignores both
    arguments and returns a new, empty Album.

    :param source: the source to fetch from
    :param stop_at_level: the depth to recurse to
    :return: a new Album, created without arguments
    """
    empty_album = Album()
    return empty_album
|
2023-03-20 13:40:32 +00:00
|
|
|
|
|
|
|
@classmethod
def _fetch_artist_from_source(cls, source: Source, stop_at_level: int = 1) -> Artist:
    """
    Fetch an artist from a source. This implementation ignores both
    arguments and returns a new, empty Artist.

    :param source: the source to fetch from
    :param stop_at_level: the depth to recurse to
    :return: a new Artist, created without arguments
    """
    empty_artist = Artist()
    return empty_artist
|
2023-03-24 14:58:21 +00:00
|
|
|
|
2023-03-27 14:28:34 +00:00
|
|
|
@classmethod
def _fetch_label_from_source(cls, source: Source, stop_at_level: int = 1) -> Label:
    """
    Fetch a label from a source. This implementation ignores both
    arguments and returns a new, empty Label.

    :param source: the source to fetch from
    :param stop_at_level: the depth to recurse to
    :return: a new Label, created without arguments
    """
    empty_label = Label()
    return empty_label
|
2023-03-30 10:00:39 +00:00
|
|
|
|
|
|
|
@classmethod
def _get_type_of_url(cls, url: str) -> Optional[Union[Type[Song], Type[Album], Type[Artist], Type[Label]]]:
    """
    Tell which type of music object a url points to. This
    implementation recognises no url at all.

    :param url: the url to inspect
    :return: always None here
    """
    unknown_type = None
    return unknown_type
|
2023-03-30 12:39:28 +00:00
|
|
|
|
|
|
|
@classmethod
def _download_song_to_targets(cls, source: Source, target_list: List[Target]):
    """
    Download the song behind a source to every given target. This
    implementation only prints one line per target and downloads nothing.

    :param source: the source to download from
    :param target_list: the targets to download to
    """
    # One line on stdout per target, in the order of the list.
    for target in target_list:
        print(f"downloading {source} to {target}")
|