feat: fixed headers

Hellow 2024-01-22 21:39:39 +01:00
parent 3f14f933c0
commit 2a77f75e6f
7 changed files with 302 additions and 60 deletions

View File

@@ -1,5 +1,8 @@
import music_kraken
import logging
print("Setting logging-level to DEBUG")
logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__":
normally_download = [

View File

@@ -1,17 +1,18 @@
from ..utils import cli_function
from ...utils.path_manager import LOCATIONS
from ...utils import shared
from ...utils.config import main_settings
def all_paths():
return {
"Temp dir": LOCATIONS.TEMP_DIRECTORY,
"Music dir": LOCATIONS.MUSIC_DIRECTORY,
"Log file": shared.LOG_PATH,
"Temp dir": main_settings["temp_directory"],
"Music dir": main_settings["music_directory"],
"Conf dir": LOCATIONS.CONFIG_DIRECTORY,
"Conf file": LOCATIONS.CONFIG_FILE,
"FFMPEG bin": LOCATIONS.FFMPEG_BIN,
"logging file": main_settings["log_file"],
"FFMPEG bin": main_settings["ffmpeg_binary"],
"Cache Dir": main_settings["cache_directory"],
}

View File

@@ -23,6 +23,9 @@ class CacheAttribute:
@property
def is_valid(self):
if isinstance(self.expires, str):
pass
# self.expires = datetime.fromisoformat(self.expires)
return datetime.now() < self.expires
def __eq__(self, other):
@@ -96,7 +99,7 @@ class Cache:
return True
def set(self, content: bytes, name: str, expires_in: float = 10):
def set(self, content: bytes, name: str, expires_in: float = 10, module: str = ""):
"""
:param content:
:param module:
@@ -107,10 +110,12 @@ class Cache:
if name == "":
return
module_path = self._init_module(self.module)
module = self.module if module == "" else module
module_path = self._init_module(module)
cache_attribute = CacheAttribute(
module=self.module,
module=module,
name=name,
created=datetime.now(),
expires=datetime.now() + timedelta(days=expires_in),
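
Note: the new optional module argument lets a single entry be filed under a different cache module than the instance default. A minimal sketch, assuming an existing Cache instance named cache; only the set() signature and the "failed_requests" module name come from this commit:

    # stored under the instance's own module (previous behaviour)
    cache.set(b"<html>...</html>", "song_search", expires_in=10)

    # stored under an explicit module instead, as Connection.save() below does for error responses
    cache.set(b"<html>...</html>", "song_search", expires_in=10, module="failed_requests")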

View File

@@ -60,11 +60,6 @@ class Connection:
self.heartbeat_thread = None
self.heartbeat_interval = heartbeat_interval
@property
def user_agent(self) -> str:
return self.session.headers.get("user-agent",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36")
def start_heartbeat(self):
if self.heartbeat_interval <= 0:
self.LOGGER.warning(f"Can't start a heartbeat with {self.heartbeat_interval}s in between.")
@@ -101,10 +96,13 @@ class Connection:
def get_header(self, **header_values) -> Dict[str, str]:
return {
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
"user-agent": main_settings["user_agent"],
"User-Agent": main_settings["user_agent"],
"Connection": "keep-alive",
# "Host": self.HOST.netloc,
"Host": self.HOST.netloc,
"authority": self.HOST.netloc,
"Referer": self.base_url(),
"Accept-Language": main_settings["language"],
**header_values
}
@@ -117,16 +115,18 @@ class Connection:
refer_from_origin: bool,
url: ParseResult
) -> Dict[str, str]:
if headers is None:
headers = dict()
headers = self.get_header(**(headers or {}))
if not refer_from_origin:
headers["Referer"] = self.base_url(url=url)
return headers
def save(self, r: requests.Response, name: str, **kwargs):
self.cache.set(r.content, name, expires_in=kwargs.get("expires_in", self.cache_expiring_duration))
def save(self, r: requests.Response, name: str, error: bool = False, **kwargs):
n_kwargs = {}
if error:
n_kwargs["module"] = "failed_requests"
self.cache.set(r.content, name, expires_in=kwargs.get("expires_in", self.cache_expiring_duration), **n_kwargs)
def request(
self,
@@ -135,7 +135,7 @@ class Connection:
accepted_response_codes: set,
url: str,
timeout: float,
headers: dict,
headers: Optional[dict],
refer_from_origin: bool = True,
raw_url: bool = False,
sleep_after_404: float = None,
@@ -143,16 +143,28 @@ class Connection:
name: str = "",
**kwargs
) -> Optional[requests.Response]:
if name != "":
parsed_url = urlparse(url)
headers = self._update_headers(
headers=headers,
refer_from_origin=refer_from_origin,
url=parsed_url
)
disable_cache = headers.get("Cache-Control") == "no-cache" or kwargs.get("disable_cache", False)
if name != "" and not disable_cache:
cached = self.cache.get(name)
with responses.RequestsMock() as resp:
resp.add(
method=method,
url=url,
body=cached,
)
return requests.request(method=method, url=url, timeout=timeout, headers=headers, **kwargs)
if cached is not None:
with responses.RequestsMock() as resp:
resp.add(
method=method,
url=url,
body=cached,
)
return requests.request(method=method, url=url, timeout=timeout, headers=headers, **kwargs)
if sleep_after_404 is None:
sleep_after_404 = self.sleep_after_404
@@ -162,16 +174,9 @@ class Connection:
if timeout is None:
timeout = self.TIMEOUT
parsed_url = urlparse(url)
headers = self._update_headers(
headers=headers,
refer_from_origin=refer_from_origin,
url=parsed_url
)
request_url = parsed_url.geturl() if not raw_url else url
r = None
connection_failed = False
try:
if self.session_is_occupied and not is_heartbeat:
@@ -179,10 +184,12 @@ class Connection:
while self.session_is_occupied and not is_heartbeat:
pass
print(headers)
r: requests.Response = requests.request(method=method, url=url, timeout=timeout, headers=headers, **kwargs)
if r.status_code in accepted_response_codes:
self.save(r, name, **kwargs)
if not disable_cache:
self.save(r, name, **kwargs)
return r
if self.SEMANTIC_NOT_FOUND and r.status_code == 404:
@@ -199,7 +206,13 @@ class Connection:
if not connection_failed:
self.LOGGER.warning(f"{self.HOST.netloc} responded wit {r.status_code} "
f"at {url}. ({try_count}-{self.TRIES})")
self.LOGGER.debug(r.content)
if r is not None:
self.LOGGER.debug("request headers:\n\t"+ "\n\t".join(f"{k}\t=\t{v}" for k, v in r.request.headers.items()))
self.LOGGER.debug("response headers:\n\t"+ "\n\t".join(f"{k}\t=\t{v}" for k, v in r.headers.items()))
self.LOGGER.debug(r.content)
if name != "":
self.save(r, name, error=True, **kwargs)
if sleep_after_404 != 0:
self.LOGGER.warning(f"Waiting for {sleep_after_404} seconds.")
time.sleep(sleep_after_404)
@@ -219,6 +232,7 @@ class Connection:
sleep_after_404=sleep_after_404,
is_heartbeat=is_heartbeat,
name=name,
user_agent=main_settings["user_agent"],
**kwargs
)
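
A minimal usage sketch of the new caching and header behaviour. The Connection constructor arguments and the get() parameters are the ones visible in this commit; the URL, cache name, and logger are illustrative:

    import logging

    connection = Connection(
        host="https://www.metal-archives.com/",
        logger=logging.getLogger("demo"),
        module="EncyclopaediaMetallum",
    )

    # first call goes over the network; the response body is cached under the given name
    r = connection.get("https://www.metal-archives.com/search/...", name="artist_search.json")

    # a later call with the same name is replayed from the cache through responses.RequestsMock
    r = connection.get("https://www.metal-archives.com/search/...", name="artist_search.json")

    # "Cache-Control: no-cache" skips both the cache lookup and the save
    r = connection.get(
        "https://www.metal-archives.com/search/...",
        headers={"Cache-Control": "no-cache"},
        name="artist_search.json",
    )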

View File

@@ -2,7 +2,7 @@ from collections import defaultdict
from typing import List, Optional, Dict, Type, Union
from bs4 import BeautifulSoup
import pycountry
from urllib.parse import urlparse
from urllib.parse import urlparse, urlencode
from ..connection import Connection
from ..utils.config import logging_settings
@@ -38,6 +38,10 @@ ALBUM_TYPE_MAP: Dict[str, AlbumType] = defaultdict(lambda: AlbumType.OTHER, {
"Compilation": AlbumType.COMPILATION_ALBUM
})
URL_SITE = 'https://www.metal-archives.com/'
URL_IMAGES = 'https://www.metal-archives.com/images/'
URL_CSS = 'https://www.metal-archives.com/css/'
def _song_from_json(artist_html=None, album_html=None, release_type=None, title=None, lyrics_html=None) -> Song:
song_id = None
@@ -110,6 +114,99 @@ def _album_from_json(album_html=None, release_type=None, artist_html=None) -> Al
)
def create_grid(
tableOrId: str = "#searchResultsSong",
nbrPerPage: int = 200,
ajaxUrl: str = "search/ajax-advanced/searching/songs/?songTitle=high&bandName=&releaseTitle=&lyrics=&genre=",
extraOptions: dict = None
):
"""
function createGrid(tableOrId, nbrPerPage, ajaxUrl, extraOptions) {
var table = null;
if (typeof tableOrId == "string") {
table = $(tableOrId);
} else {
table = tableOrId;
}
if (ajaxUrl == undefined) {
ajaxUrl = null;
}
var options = {
bAutoWidth: false,
bFilter: false,
bLengthChange: false,
bProcessing: true,
bServerSide: ajaxUrl != null,
iDisplayLength: nbrPerPage,
sAjaxSource: URL_SITE + ajaxUrl,
sPaginationType: 'full_numbers',
sDom: 'ipl<"block_spacer_5"><"clear"r>f<t>rip',
oLanguage: {
sProcessing: 'Loading...',
sEmptyTable: 'No records to display.',
sZeroRecords: 'No records found.'
},
"fnDrawCallback": autoScrollUp
};
if (typeof extraOptions == "object") {
for (var key in extraOptions) {
options[key] = extraOptions[key];
if (key == 'fnDrawCallback') {
var callback = options[key];
options[key] = function(o) {
autoScrollUp(o);
callback(o);
}
}
}
}
return table.dataTable(options);
}
:return:
"""
def onDrawCallback(o):
"""
this gets executed once the ajax request is done
:param o:
:return:
"""
extraOptions = extraOptions or {
"bSort": False,
"oLanguage": {
"sProcessing": 'Searching, please wait...',
"sEmptyTable": 'No matches found. Please try with different search terms.'
}
}
options = {
"bAutoWidth": False,
"bFilter": False,
"bLengthChange": False,
"bProcessing": True,
"bServerSide": ajaxUrl is not None,
"iDisplayLength": nbrPerPage,
"sAjaxSource": URL_SITE + ajaxUrl,
"sPaginationType": 'full_numbers',
"sDom": 'ipl<"block_spacer_5"><"clear"r>f<t>rip',
"oLanguage": {
"sProcessing": 'Loading...',
"sEmptyTable": 'No records to display.',
"sZeroRecords": 'No records found.'
},
"fnDrawCallback": onDrawCallback
}
for key, value in extraOptions.items():
options[key] = value
if key == 'fnDrawCallback':
callback = value
# wrap so the built-in draw hook runs first and the caller-supplied one second, mirroring the JS above
options[key] = lambda o, _callback=callback: (onDrawCallback(o), _callback(o))
# implement jquery datatable
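
create_grid is still a stub, but the extraOptions merging and the draw-callback wrapping can already be exercised. A minimal sketch with a made-up hook; nothing is rendered or requested at this point:

    def my_draw_hook(o):
        print("grid drawn:", o)

    # builds the DataTables-style options dict and wraps the caller's draw callback around onDrawCallback
    create_grid(extraOptions={"fnDrawCallback": my_draw_hook})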
class EncyclopaediaMetallum(Page):
SOURCE_TYPE = SourcePages.ENCYCLOPAEDIA_METALLUM
LOGGER = logging_settings["metal_archives_logger"]
@@ -117,16 +214,20 @@ class EncyclopaediaMetallum(Page):
def __init__(self, **kwargs):
self.connection: Connection = Connection(
host="https://www.metal-archives.com/",
logger=self.LOGGER
logger=self.LOGGER,
module=type(self).__name__
)
super().__init__(**kwargs)
def song_search(self, song: Song) -> List[Song]:
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/songs/?"
"""
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/songs/?songTitle={song}&bandName={" \
"artist}&releaseTitle={album}&lyrics=&genre=&sEcho=1&iColumns=5&sColumns=&iDisplayStart=0" \
"&iDisplayLength=200&mDataProp_0=0&mDataProp_1=1&mDataProp_2=2&mDataProp_3=3&mDataProp_4=4&_" \
"=1674550595663"
"""
"""
The difficult question I am facing is that if I try every artist, with every song, with every album,
@@ -136,17 +237,54 @@
Is not good.
"""
song_title = song.title
album_titles = ["*"] if song.album_collection.empty else [album.title for album in song.album_collection]
artist_titles = ["*"] if song.main_artist_collection.empty else [artist.name for artist in song.main_artist_collection]
search_params = {
"songTitle": song.title,
"bandName": "*",
"releaseTitle": "*",
"lyrics": "",
"genre": "",
"sEcho": 1,
"iColumns": 5,
"sColumns": "",
"iDisplayStart": 0,
"iDisplayLength": 200,
"mDataProp_0": 0,
"mDataProp_1": 1,
"mDataProp_2": 2,
"mDataProp_3": 3,
"mDataProp_4": 4,
"_": 1705946986092
}
referer_params = {
"songTitle": song.title,
"bandName": "*",
"releaseTitle": "*",
"lyrics": "",
"genre": "",
}
urlencode(search_params)
song_title = song.title.strip()
album_titles = ["*"] if song.album_collection.empty else [album.title.strip() for album in song.album_collection]
artist_titles = ["*"] if song.main_artist_collection.empty else [artist.name.strip() for artist in song.main_artist_collection]
search_results = []
for artist in artist_titles:
for album in album_titles:
r = self.connection.get(
endpoint.format(song=song_title, artist=artist, album=album)
)
_search = search_params.copy()
_referer_params = referer_params.copy()
_search["bandName"] = _referer_params["bandName"] = artist
_search["releaseTitle"] = _referer_params["releaseTitle"] = album
r = self.connection.get(endpoint + urlencode(_search), headers={
"Referer": "https://www.metal-archives.com/search/advanced/searching/songs?" + urlencode(_referer_params),
"Cache-Control": "no-cache",
"Pragma": "no-cache",
"X-Requested-With": "XMLHttpRequest",
}, name="song_search")
if r is None:
return []
@@ -162,20 +300,59 @@ class EncyclopaediaMetallum(Page):
return search_results
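
For reference, a short sketch of the Referer query string the loop above produces once urlencode has run; the parameter names are the ones in referer_params, the song title is illustrative:

    from urllib.parse import urlencode

    params = {"songTitle": "high", "bandName": "*", "releaseTitle": "*", "lyrics": "", "genre": ""}
    print("https://www.metal-archives.com/search/advanced/searching/songs?" + urlencode(params))
    # ...songs?songTitle=high&bandName=%2A&releaseTitle=%2A&lyrics=&genre=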
def album_search(self, album: Album) -> List[Album]:
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/albums/?bandName={" \
"artist}&releaseTitle={album}&releaseYearFrom=&releaseMonthFrom=&releaseYearTo=&releaseMonthTo" \
"=&country=&location=&releaseLabelName=&releaseCatalogNumber=&releaseIdentifiers" \
"=&releaseRecordingInfo=&releaseDescription=&releaseNotes=&genre=&sEcho=1&iColumns=3&sColumns" \
"=&iDisplayStart=0&iDisplayLength=200&mDataProp_0=0&mDataProp_1=1&mDataProp_2=2&_=1674563943747"
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/albums/?"
search_params = {
"bandName": "*",
"releaseTitle": album.title.strip(),
"releaseYearFrom": "",
"releaseMonthFrom": "",
"releaseYearTo": "",
"releaseMonthTo": "",
"country": "",
"location": "",
"releaseLabelName": "",
"releaseCatalogNumber": "",
"releaseIdentifiers": "",
"releaseRecordingInfo": "",
"releaseDescription": "",
"releaseNotes": "",
"genre": "",
"sEcho": 1,
"iColumns": 3,
"sColumns": "",
"iDisplayStart": 0,
"iDisplayLength": 200,
"mDataProp_0": 0,
"mDataProp_1": 1,
"mDataProp_2": 2,
"_": 1705946986092
}
referer_params = {
"bandName": "*",
"releaseTitle": album.title.strip(),
}
album_title = album.title
artist_titles = ["*"] if album.artist_collection.empty else [artist.name for artist in album.artist_collection]
artist_titles = ["*"] if album.artist_collection.empty else [artist.name.strip() for artist in album.artist_collection]
search_results = []
for artist in artist_titles:
r = self.connection.get(endpoint.format(artist=artist, album=album_title))
_search = search_params.copy()
_referer_params = referer_params.copy()
_search["bandName"] = _referer_params["bandName"] = artist
r = self.connection.get(endpoint + urlencode(_search), headers={
"Referer": "https://www.metal-archives.com/search/advanced/searching/albums?" + urlencode(_referer_params),
"Cache-Control": "no-cache",
"Pragma": "no-cache",
"X-Requested-With": "XMLHttpRequest",
"Accept": "application/json, text/javascript, */*; q=0.01",
})
#r = self.connection.get(endpoint.format(artist=artist, album=album_title))
if r is None:
return []
@@ -186,12 +363,37 @@ class EncyclopaediaMetallum(Page):
) for raw_album in r.json()['aaData'])
def artist_search(self, artist: Artist) -> List[Artist]:
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/bands/?bandName={" \
"artist}&genre=&country=&yearCreationFrom=&yearCreationTo=&bandNotes=&status=&themes=&location" \
"=&bandLabelName=&sEcho=1&iColumns=3&sColumns=&iDisplayStart=0&iDisplayLength=200&mDataProp_0=0" \
"&mDataProp_1=1&mDataProp_2=2&_=1674565459976"
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/bands/?"
r = self.connection.get(endpoint.format(artist=artist.name))
search_params = {
"bandName": artist.name.strip(),
"genre": "",
"country": "",
"yearCreationFrom": "",
"yearCreationTo": "",
"bandNotes": "",
"status": "",
"themes": "",
"location": "",
"bandLabelName": "",
"sEcho": 1,
"iColumns": 3,
"sColumns": "",
"iDisplayStart": 0,
"iDisplayLength": 200,
"mDataProp_0": 0,
"mDataProp_1": 1,
"mDataProp_2": 2,
"_": 1705946986092
}
r = self.connection.get(endpoint + urlencode(search_params), headers={
"Referer": "https://www.metal-archives.com/search/advanced/searching/bands?" + urlencode({"bandName": artist.name.strip()}),
"Cache-Control": "no-cache",
"Pragma": "no-cache",
"X-Requested-With": "XMLHttpRequest",
"Accept": "application/json, text/javascript, */*; q=0.01",
}, name="artist_search.json")
if r is None:
return []

View File

@@ -58,6 +58,21 @@ If you use Tor, make sure the Tor browser is installed, and running. I can't guar
Attribute(name="show_download_errors_threshold", default_value=0.3, description="""If the percentage of failed downloads goes over this threshold,
all the error messages are shown."""),
Attribute(
name="language",
default_value="en-US,en;q=0.6",
description="The language of the program. This will be used to translate the program in the future.\n"
"Currently it just sets the Accept-Language header.\n"
"https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Language"
),
Attribute(
name="user_agent",
default_value="Mozilla/5.0 (X11; Linux x86_64; rv:90.0) Gecko/20100101 Firefox/90.0",
description="The user agent of the program. This will be used to translate the program in the future.\n"
"Currently it just sets the User-Agent header.\n"
"https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent"
),
EmptyLine(),
PathAttribute(name="music_directory", default_value=LOCATIONS.MUSIC_DIRECTORY.resolve(), description="The directory, all the music will be downloaded to."),
@@ -121,6 +136,8 @@ class SettingsStructure(TypedDict):
tor_port: int
chunk_size: int
show_download_errors_threshold: float
language: str
user_agent: str
# paths
music_directory: Path
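
A minimal sketch of how the two new settings are read at runtime; the dictionary keys are the ones added above, the absolute import path is an assumption based on the package name:

    from music_kraken.utils.config import main_settings  # absolute import path is an assumption

    request_headers = {
        "User-Agent": main_settings["user_agent"],      # defaults to the Firefox 90 string above
        "Accept-Language": main_settings["language"],   # defaults to "en-US,en;q=0.6"
    }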

View File

@@ -4,7 +4,7 @@ from .config import main_settings
DEBUG = True
DEBUG_LOGGING = DEBUG and True
DEBUG_YOUTUBE_INITIALIZING = DEBUG and True
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False
if DEBUG: