music-kraken-core/src/music_kraken/pages/abstract.py
2023-03-30 12:23:57 +02:00

296 lines
11 KiB
Python

from typing import Optional, Union, Type, Dict
from bs4 import BeautifulSoup
import requests
import logging
from ..utils import shared
from ..objects import (
Song,
Source,
Album,
Artist,
Lyrics,
Target,
DatabaseObject,
Options,
SourcePages,
Collection,
Label
)
LOGGER = logging.getLogger("this shouldn't be used")
class Page:
"""
This is an abstract class, laying out the
functionality for every other class fetching something
"""
API_SESSION: requests.Session = requests.Session()
API_SESSION.proxies = shared.proxies
TIMEOUT = 5
TRIES = 5
SOURCE_TYPE: SourcePages
@classmethod
def get_request(cls, url: str, accepted_response_codes: set = set((200,)), trie: int = 0) -> Optional[
requests.Response]:
retry = False
try:
r = cls.API_SESSION.get(url, timeout=cls.TIMEOUT)
except requests.exceptions.Timeout:
retry = True
if not retry and r.status_code in accepted_response_codes:
return r
LOGGER.warning(f"{cls.__name__} responded wit {r.status_code} at GET:{url}. ({trie}-{cls.TRIES})")
LOGGER.debug(r.content)
if trie >= cls.TRIES:
LOGGER.warning("to many tries. Aborting.")
return None
return cls.get_request(url, accepted_response_codes, trie + 1)
@classmethod
def post_request(cls, url: str, json: dict, accepted_response_codes: set = set((200,)), trie: int = 0) -> Optional[
requests.Response]:
retry = False
try:
r = cls.API_SESSION.post(url, json=json, timeout=cls.TIMEOUT)
except requests.exceptions.Timeout:
retry = True
if not retry and r.status_code in accepted_response_codes:
return r
LOGGER.warning(f"{cls.__name__} responded wit {r.status_code} at POST:{url}. ({trie}-{cls.TRIES})")
LOGGER.debug(r.content)
if trie >= cls.TRIES:
LOGGER.warning("to many tries. Aborting.")
return None
return cls.post_request(url, accepted_response_codes, trie + 1)
@classmethod
def get_soup_from_response(cls, r: requests.Response) -> BeautifulSoup:
return BeautifulSoup(r.content, "html.parser")
class Query:
def __init__(self, query: str):
self.query = query
self.is_raw = False
self.artist = None
self.album = None
self.song = None
self.parse_query(query=query)
def __str__(self):
if self.is_raw:
return self.query
return f"{self.artist}; {self.album}; {self.song}"
def parse_query(self, query: str):
if not '#' in query:
self.is_raw = True
return
query = query.strip()
parameters = query.split('#')
parameters.remove('')
for parameter in parameters:
splitted = parameter.split(" ")
type_ = splitted[0]
input_ = " ".join(splitted[1:]).strip()
if type_ == "a":
self.artist = input_
continue
if type_ == "r":
self.album = input_
continue
if type_ == "t":
self.song = input_
continue
def get_str(self, string):
if string is None:
return ""
return string
artist_str = property(fget=lambda self: self.get_str(self.artist))
album_str = property(fget=lambda self: self.get_str(self.album))
song_str = property(fget=lambda self: self.get_str(self.song))
@classmethod
def search_by_query(cls, query: str) -> Options:
"""
# The Query
You can define a new parameter with "#",
the letter behind it defines the *type* of parameter, followed by a space
"#a Psychonaut 4 #r Tired, Numb and #t Drop by Drop"
if no # is in the query it gets treated as "unspecified query"
# Functionality
Returns the best matches from this page for the query, passed in.
:param query:
:return possible_music_objects:
"""
return Options()
@classmethod
def fetch_details(cls, music_object: Union[Song, Album, Artist, Label], stop_at_level: int = 1) -> DatabaseObject:
"""
when a music object with laccing data is passed in, it returns
the SAME object **(no copy)** with more detailed data.
If you for example put in an album, it fetches the tracklist
:param music_object:
:param stop_at_level:
This says the depth of the level the scraper will recurse to.
If this is for example set to 2, then the levels could be:
1. Level: the album
2. Level: every song of the album + every artist of the album
If no additional requests are needed to get the data one level below the supposed stop level
this gets ignored
:return detailed_music_object: IT MODIFIES THE INPUT OBJ
"""
new_music_object: DatabaseObject = type(music_object)()
had_sources = False
source: Source
for source in music_object.source_collection.get_sources_from_page(cls.SOURCE_TYPE):
new_music_object.merge(cls._fetch_object_from_source(source=source, obj_type=type(music_object), stop_at_level=stop_at_level))
had_sources = True
if not had_sources:
music_object.compile(merge_into=True)
return music_object
collections = {
Label: Collection(element_type=Label),
Artist: Collection(element_type=Artist),
Album: Collection(element_type=Album),
Song: Collection(element_type=Song)
}
cls._clean_music_object(new_music_object, collections)
music_object.merge(new_music_object)
music_object.compile(merge_into=True)
return music_object
@classmethod
def fetch_object_from_source(cls, source: Source, stop_at_level: int = 2):
obj_type = cls._get_type_of_url(source.url)
if obj_type is None:
return None
music_object = cls._fetch_object_from_source(source=source, obj_type=obj_type, stop_at_level=stop_at_level)
collections = {
Label: Collection(element_type=Label),
Artist: Collection(element_type=Artist),
Album: Collection(element_type=Album),
Song: Collection(element_type=Song)
}
cls._clean_music_object(music_object, collections)
music_object.compile(merge_into=True)
return music_object
@classmethod
def _fetch_object_from_source(cls, source: Source, obj_type: Union[Type[Song], Type[Album], Type[Artist], Type[Label]], stop_at_level: int = 1):
if obj_type == Artist:
return cls._fetch_artist_from_source(source=source, stop_at_level=stop_at_level)
if obj_type == Song:
return cls._fetch_song_from_source(source=source, stop_at_level=stop_at_level)
if obj_type == Album:
return cls._fetch_album_from_source(source=source, stop_at_level=stop_at_level)
if obj_type == Label:
return cls._fetch_label_from_source(source=source, stop_at_level=stop_at_level)
@classmethod
def _clean_music_object(cls, music_object: Union[Label, Album, Artist, Song], collections: Dict[Union[Type[Song], Type[Album], Type[Artist], Type[Label]], Collection]):
if type(music_object) == Label:
return cls._clean_label(label=music_object, collections=collections)
if type(music_object) == Artist:
return cls._clean_artist(artist=music_object, collections=collections)
if type(music_object) == Album:
return cls._clean_album(album=music_object, collections=collections)
if type(music_object) == Song:
return cls._clean_song(song=music_object, collections=collections)
@classmethod
def _clean_collection(cls, collection: Collection, collection_dict: Dict[Union[Type[Song], Type[Album], Type[Artist], Type[Label]], Collection]):
if collection.element_type not in collection_dict:
return
for i, element in enumerate(collection):
r = collection_dict[collection.element_type].append(element)
if not r.was_in_collection:
cls._clean_music_object(r.current_element, collection_dict)
continue
collection[i] = r.current_element
cls._clean_music_object(r.current_element, collection_dict)
@classmethod
def _clean_label(cls, label: Label, collections: Dict[Union[Type[Song], Type[Album], Type[Artist], Type[Label]], Collection]):
cls._clean_collection(label.current_artist_collection, collections)
cls._clean_collection(label.album_collection, collections)
@classmethod
def _clean_artist(cls, artist: Artist, collections: Dict[Union[Type[Song], Type[Album], Type[Artist], Type[Label]], Collection]):
cls._clean_collection(artist.main_album_collection, collections)
cls._clean_collection(artist.feature_song_collection, collections)
cls._clean_collection(artist.label_collection, collections)
@classmethod
def _clean_album(cls, album: Album, collections: Dict[Union[Type[Song], Type[Album], Type[Artist], Type[Label]], Collection]):
cls._clean_collection(album.label_collection, collections)
cls._clean_collection(album.song_collection, collections)
cls._clean_collection(album.artist_collection, collections)
@classmethod
def _clean_song(cls, song: Song, collections: Dict[Union[Type[Song], Type[Album], Type[Artist], Type[Label]], Collection]):
cls._clean_collection(song.album_collection, collections)
cls._clean_collection(song.feature_artist_collection, collections)
cls._clean_collection(song.main_artist_collection, collections)
@classmethod
def _fetch_song_from_source(cls, source: Source, stop_at_level: int = 1) -> Song:
return Song()
@classmethod
def _fetch_album_from_source(cls, source: Source, stop_at_level: int = 1) -> Album:
return Album()
@classmethod
def _fetch_artist_from_source(cls, source: Source, stop_at_level: int = 1) -> Artist:
return Artist()
@classmethod
def _fetch_label_from_source(cls, source: Source, stop_at_level: int = 1) -> Label:
return Label()
@classmethod
def _get_type_of_url(cls, url: str) -> Optional[Union[Type[Song], Type[Album], Type[Artist], Type[Label]]]:
return None