Merge branch 'audio' of github.com:HeIIow2/music-downloader into audio

Hellow committed 2023-04-03 16:46:13 +02:00
commit 255776a8fd
10 changed files with 354 additions and 211 deletions

View File

@@ -72,7 +72,7 @@ def cli():
 print("The given url couldn't be downloaded")
 return
-page = search.get_page_from_query(parsed)
+page = search._get_page_from_query(parsed)
 if page is not None:
 search.choose_page(page)
 return

View File

@@ -9,6 +9,7 @@ from .parents import DatabaseObject
 class AppendResult:
 was_in_collection: bool
 current_element: DatabaseObject
+was_the_same: bool
 class Collection:
@@ -82,12 +83,17 @@ class Collection:
 if self.element_type is not None and not isinstance(element, self.element_type):
 raise TypeError(f"{type(element)} is not the set type {self.element_type}")
+# return if the same instance of the object is in the list
+for existing in self._data:
+if element is existing:
+return AppendResult(True, element, True)
 for name, value in element.indexing_values:
 if value in self._attribute_to_object_map[name]:
 existing_object = self._attribute_to_object_map[name][value]
 if not merge_on_conflict:
-return AppendResult(True, existing_object)
+return AppendResult(True, existing_object, False)
 # if the object does already exist
 # thus merging and don't add it afterwards
@@ -95,7 +101,7 @@ class Collection:
 existing_object.merge(element)
 # in case any relevant data has been added (e.g. it remaps the old object)
 self.map_element(existing_object)
-return AppendResult(True, existing_object)
+return AppendResult(True, existing_object, False)
 element.merge(existing_object)
@@ -104,12 +110,12 @@ class Collection:
 self.unmap_element(existing_object)
 self.map_element(element)
-return AppendResult(True, existing_object)
+return AppendResult(True, existing_object, False)
 self._data.append(element)
 self.map_element(element)
-return AppendResult(False, element)
+return AppendResult(False, element, False)
 def extend(self, element_list: Iterable[DatabaseObject], merge_on_conflict: bool = True,
 merge_into_existing: bool = True):

View File

@@ -63,16 +63,24 @@ class Target(DatabaseObject):
 with open(copy_to.file_path, "wb") as write_to:
 write_to.write(read_from.read())
-def stream_into(self, r: requests.Response):
+def stream_into(self, r: requests.Response) -> bool:
+if r is None:
+return False
 self.create_path()
 chunk_size = 1024
 total_size = int(r.headers.get('content-length'))
 initial_pos = 0
 with open(self.file_path,'wb') as f:
-for chunk in r.iter_content(chunk_size=chunk_size):
-size = f.write(chunk)
+try:
+for chunk in r.iter_content(chunk_size=chunk_size):
+size = f.write(chunk)
+except requests.exceptions.Timeout:
+shared.DOWNLOAD_LOGGER.error("Stream timed out.")
+return False
 """
 # doesn't work yet due to
@@ -85,3 +93,5 @@ class Target(DatabaseObject):
 size = f.write(chunk)
 pbar.update(size)
 """
+return True
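For context, a minimal standalone sketch (not part of the commit) of the pattern the reworked stream_into follows: write the response with iter_content and catch requests.exceptions.Timeout, which can also fire mid-download. The URL handling and return convention here are illustrative only.

```python
import requests

def stream_to_file(url: str, file_path: str, chunk_size: int = 1024) -> bool:
    # illustrative fetch; the real Target.stream_into receives the Response directly
    try:
        r = requests.get(url, stream=True, timeout=5)
    except requests.exceptions.RequestException:
        return False
    if r is None or r.status_code != 200:
        return False
    try:
        with open(file_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
                f.write(chunk)
    except requests.exceptions.Timeout:
        # a read timeout can interrupt the stream after the headers arrived
        return False
    return True
```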

View File

@@ -3,6 +3,8 @@ from typing import Optional, Union, Type, Dict, List
 from bs4 import BeautifulSoup
 import requests
 import logging
+from dataclasses import dataclass
+from copy import copy
 from ..utils import shared
 from ..objects import (
@@ -19,9 +21,35 @@ from ..objects import (
 Label
 )
 from ..tagging import write_metadata_to_target
+from ..utils.shared import DOWNLOAD_PATH, DOWNLOAD_FILE, DEFAULT_VALUES
+from ..utils.string_processing import fit_to_file_system
 LOGGER = logging.getLogger("this shouldn't be used")
+@dataclass
+class DefaultTarget:
+genre: str = DEFAULT_VALUES["genre"]
+label: str = DEFAULT_VALUES["label"]
+artist: str = DEFAULT_VALUES["artist"]
+album: str = DEFAULT_VALUES["album"]
+song: str = DEFAULT_VALUES["song"]
+def __setattr__(self, __name: str, __value: str) -> None:
+if __name in DEFAULT_VALUES:
+if self.__getattribute__(__name) == DEFAULT_VALUES[__name]:
+super().__setattr__(__name, fit_to_file_system(__value))
+return
+super().__setattr__(__name, __value)
+@property
+def target(self) -> Target:
+return Target(
+relative_to_music_dir=True,
+path=DOWNLOAD_PATH.format(genre=self.genre, label=self.label, artist=self.artist, album=self.album, song=self.song),
+file=DOWNLOAD_FILE.format(genre=self.genre, label=self.label, artist=self.artist, album=self.album, song=self.song)
+)
 class Page:
 """
@@ -32,6 +60,7 @@ class Page:
 API_SESSION.proxies = shared.proxies
 TIMEOUT = 5
 TRIES = 5
+LOGGER = LOGGER
 SOURCE_TYPE: SourcePages
@@ -50,14 +79,14 @@ class Page:
 return r
 if not retry:
-LOGGER.warning(f"{cls.__name__} responded wit {r.status_code} at GET:{url}. ({trie}-{cls.TRIES})")
+cls.LOGGER.warning(f"{cls.__name__} responded wit {r.status_code} at GET:{url}. ({trie}-{cls.TRIES})")
-LOGGER.debug(r.content)
+cls.LOGGER.debug(r.content)
 if trie >= cls.TRIES:
-LOGGER.warning("to many tries. Aborting.")
+cls.LOGGER.warning("to many tries. Aborting.")
 return None
-return cls.get_request(url, accepted_response_codes, trie + 1)
+return cls.get_request(url=url, stream=stream, accepted_response_codes=accepted_response_codes, trie=trie + 1)
 @classmethod
 def post_request(cls, url: str, json: dict, accepted_response_codes: set = set((200,)), trie: int = 0) -> Optional[
@@ -74,14 +103,14 @@ class Page:
 return r
 if not retry:
-LOGGER.warning(f"{cls.__name__} responded wit {r.status_code} at POST:{url}. ({trie}-{cls.TRIES})")
+cls.LOGGER.warning(f"{cls.__name__} responded wit {r.status_code} at POST:{url}. ({trie}-{cls.TRIES})")
-LOGGER.debug(r.content)
+cls.LOGGER.debug(r.content)
 if trie >= cls.TRIES:
-LOGGER.warning("to many tries. Aborting.")
+cls.LOGGER.warning("to many tries. Aborting.")
 return None
-return cls.post_request(url, json, accepted_response_codes, trie + 1)
+return cls.post_request(url=url, json=json, accepted_response_codes=accepted_response_codes, trie=trie + 1)
 @classmethod
 def get_soup_from_response(cls, r: requests.Response) -> BeautifulSoup:
@@ -251,13 +280,11 @@ class Page:
 return
 for i, element in enumerate(collection):
-r = collection_dict[collection.element_type].append(element)
+r = collection_dict[collection.element_type].append(element, merge_into_existing=True)
-if not r.was_in_collection:
-cls._clean_music_object(r.current_element, collection_dict)
-continue
 collection[i] = r.current_element
-cls._clean_music_object(r.current_element, collection_dict)
+if not r.was_the_same:
+cls._clean_music_object(r.current_element, collection_dict)
 @classmethod
 def _clean_label(cls, label: Label, collections: Dict[Union[Type[Song], Type[Album], Type[Artist], Type[Label]], Collection]):
@@ -283,56 +310,98 @@ class Page:
 cls._clean_collection(song.main_artist_collection, collections)
 @classmethod
-def download(cls, music_object: Union[Song, Album, Artist, Label], download_features: bool = True):
-print("downloading")
-print(music_object)
+def download(
+cls,
+music_object: Union[Song, Album, Artist, Label],
+download_features: bool = True,
+default_target: DefaultTarget = None
+) -> bool:
+if default_target is None:
+default_target = DefaultTarget()
 if type(music_object) is Song:
-return cls.download_song(music_object)
+return cls.download_song(music_object, default_target)
 if type(music_object) is Album:
-return cls.download_album(music_object)
+return cls.download_album(music_object, default_target)
 if type(music_object) is Artist:
-return cls.download_artist(music_object, download_features=download_features)
+return cls.download_artist(music_object, default_target)
 if type(music_object) is Label:
-return cls.download_label(music_object, download_features=download_features)
+return cls.download_label(music_object, download_features=download_features, default_target=default_target)
+return False
 @classmethod
-def download_label(cls, label: Label, download_features: bool = True, override_existing: bool = False):
+def download_label(cls, label: Label, download_features: bool = True, override_existing: bool = False, default_target: DefaultTarget = None):
+if default_target is None:
+default_target = DefaultTarget()
+else:
+default_target = copy(default_target)
+default_target.label = label.name
 cls.fetch_details(label)
 for artist in label.current_artist_collection:
-cls.download_artist(artist, download_features=download_features, override_existing=override_existing)
+cls.download_artist(artist, download_features=download_features, override_existing=override_existing, default_target=default_target)
 for album in label.album_collection:
-cls.download_album(album, override_existing=override_existing)
+cls.download_album(album, override_existing=override_existing, default_target=default_target)
 @classmethod
-def download_artist(cls, artist: Artist, download_features: bool = True, override_existing: bool = False):
+def download_artist(cls, artist: Artist, download_features: bool = True, override_existing: bool = False, default_target: DefaultTarget = None):
+if default_target is None:
+default_target = DefaultTarget()
+else:
+default_target = copy(default_target)
+default_target.artist = artist.name
+if not artist.label_collection.empty:
+default_target.label = artist.label_collection[0].name
 cls.fetch_details(artist)
 for album in artist.main_album_collection:
-cls.download_album(album, override_existing=override_existing)
+cls.download_album(album, override_existing=override_existing, default_target=default_target)
 if download_features:
 for song in artist.feature_album.song_collection:
-cls.download_song(song, override_existing=override_existing)
+cls.download_song(song, override_existing=override_existing, default_target=default_target)
 @classmethod
-def download_album(cls, album: Album, override_existing: bool = False):
+def download_album(cls, album: Album, override_existing: bool = False, default_target: DefaultTarget = None):
+if default_target is None:
+default_target = DefaultTarget()
+else:
+default_target = copy(default_target)
+default_target.album = album.title
+if not album.artist_collection.empty:
+default_target.artist = album.artist_collection[0].name
+if not album.label_collection.empty:
+default_target.label = album.label_collection[0].name
 cls.fetch_details(album)
+album.update_tracksort()
 for song in album.song_collection:
-cls.download_song(song, override_existing=override_existing)
+cls.download_song(song, override_existing=override_existing, default_target=default_target)
 @classmethod
-def download_song(cls, song: Song, override_existing: bool = False, create_target_on_demand: bool = True):
+def download_song(cls, song: Song, override_existing: bool = False, create_target_on_demand: bool = True, default_target: DefaultTarget = None):
+if default_target is None:
+default_target = DefaultTarget()
+else:
+default_target = copy(default_target)
+default_target.song = song.title
+if not song.album_collection.empty:
+default_target.album = song.album_collection[0].title
+if not song.main_artist_collection.empty:
+artist: Artist = song.main_artist_collection[0]
+default_target.artist = artist.name
+if not artist.label_collection.empty:
+default_target.label = artist.label_collection[0].name
 cls.fetch_details(song)
 if song.target_collection.empty:
 if create_target_on_demand and not song.main_artist_collection.empty and not song.album_collection.empty:
-song.target_collection.append(
-Target(
-file=f"{song.title}.mp3",
-relative_to_music_dir=True,
-path=f"{song.main_artist_collection[0].name}/{song.album_collection[0].title}"
-)
-)
+song.target_collection.append(default_target.target)
 else:
 return
@@ -348,20 +417,26 @@ class Page:
 continue
 existing_target.copy_content(target)
+return True
 sources = song.source_collection.get_sources_from_page(cls.SOURCE_TYPE)
 if len(sources) == 0:
-return
+return False
-print("great")
 temp_target: Target = Target(
 path=shared.TEMP_DIR,
 file=str(random.randint(0, 999999))
 )
-cls._download_song_to_targets(source=sources[0], target=temp_target)
-cls._post_process_targets(song, temp_target)
+success = True
+if not cls._download_song_to_targets(source=sources[0], target=temp_target):
+success = False
+if not cls._post_process_targets(song, temp_target):
+success = False
+return success
 @classmethod
 def _post_process_targets(cls, song: Song, temp_target: Target):
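As a side note, a standalone sketch (not from the commit) of the rule DefaultTarget.__setattr__ encodes: a field is only filled while it still holds its default, so the outermost download call wins and each level works on its own copy. DEFAULT_VALUES mirrors the diff; the class, the example values, and the omission of fit_to_file_system are illustrative choices.

```python
from copy import copy
from dataclasses import dataclass

DEFAULT_VALUES = {
    "genre": "Various Genre",
    "label": "Various Labels",
    "artist": "Various Artists",
    "album": "Various Album",
    "song": "Various Song",
}

@dataclass
class DefaultTargetSketch:
    artist: str = DEFAULT_VALUES["artist"]
    album: str = DEFAULT_VALUES["album"]
    song: str = DEFAULT_VALUES["song"]

    def __setattr__(self, name: str, value: str) -> None:
        # only overwrite a field while it still holds its default value
        if name in DEFAULT_VALUES and getattr(self, name) != DEFAULT_VALUES[name]:
            return
        super().__setattr__(name, value)

outer = DefaultTargetSketch()
outer.artist = "Ghost Bath"           # accepted: the field still held its default
inner = copy(outer)                   # download_album/download_song work on a copy
inner.artist = "Someone Else"         # ignored: artist was already set upstream
inner.song = "Golden Number"          # accepted
print(inner.artist, "/", inner.song)  # Ghost Bath / Golden Number
```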

View File

@@ -0,0 +1,100 @@
from collections import defaultdict
from typing import Tuple, List, Dict, Type
from . import page_attributes
from ..abstract import Page
from ...objects import Options, DatabaseObject, Source
class MultiPageOptions:
def __init__(
self,
max_displayed_options: int = 10,
option_digits: int = 3,
derived_from: DatabaseObject = None
) -> None:
self.max_displayed_options = max_displayed_options
self.option_digits: int = option_digits
self._length = 0
self._current_option_dict: Dict[Type[Page], Options] = defaultdict(lambda: Options())
self._derive_from = derived_from
def __getitem__(self, key: Type[Page]):
return self._current_option_dict[key]
def __setitem__(self, key: Type[Page], value: Options):
self._current_option_dict[key] = value
self._length = 0
for key in self._current_option_dict:
self._length += 1
def __len__(self) -> int:
return self._length
def get_page_str(self, page: Type[Page]) -> str:
page_name_fill = "-"
max_page_len = 21
return f"({page_attributes.PAGE_NAME_MAP[page]}) ------------------------{page.__name__:{page_name_fill}<{max_page_len}}------------"
def string_from_all_pages(self) -> str:
if self._length == 1:
for key in self._current_option_dict:
return self.string_from_single_page(key)
lines: List[str] = []
j = 0
for page, options in self._current_option_dict.items():
lines.append(self.get_page_str(page))
i = -1
option_obj: DatabaseObject
for i, option_obj in enumerate(options):
if i >= self.max_displayed_options:
lines.append("...")
break
lines.append(f"{j + i:0{self.option_digits}} {option_obj.option_string}")
j += i + 1
return "\n".join(lines)
def choose_from_all_pages(self, index: int) -> Tuple[DatabaseObject, Type[Page]]:
if self._length == 1:
for key in self._current_option_dict:
return self.choose_from_single_page(key, index), key
sum_of_length = 0
for page, options in self._current_option_dict.items():
option_len = min((len(options), self.max_displayed_options))
index_of_list = index - sum_of_length
if index_of_list < option_len:
return options[index_of_list], page
sum_of_length += option_len
raise IndexError("index is out of range")
def string_from_single_page(self, page: Type[Page]) -> str:
lines: List[str] = [self.get_page_str(page)]
option_obj: DatabaseObject
for i, option_obj in enumerate(self._current_option_dict[page]):
lines.append(f"{i:0{self.option_digits}} {option_obj.option_string}")
return "\n".join(lines)
def choose_from_single_page(self, page: Type[Page], index: int) -> DatabaseObject:
return self._current_option_dict[page][index]
def __repr__(self) -> str:
return self.string_from_all_pages()
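A small self-contained illustration (not part of the commit) of the offset arithmetic choose_from_all_pages uses: each page contributes at most max_displayed_options entries to the global numbering, and the chosen index is translated back into a per-page index. Plain lists stand in for the per-page Options here.

```python
from typing import List, Tuple

def choose_across_pages(option_lists: List[List[str]], index: int,
                        max_displayed_options: int = 10) -> Tuple[str, int]:
    sum_of_length = 0
    for page_index, options in enumerate(option_lists):
        option_len = min(len(options), max_displayed_options)
        index_of_list = index - sum_of_length
        if index_of_list < option_len:
            return options[index_of_list], page_index
        sum_of_length += option_len
    raise IndexError("index is out of range")

# global option 3 resolves to entry 1 of the second "page"
print(choose_across_pages([["a", "b"], ["c", "d", "e"]], 3))  # ('d', 1)
```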

View File

@@ -3,117 +3,10 @@ from typing import Tuple, List, Set, Dict, Type, Union, Optional
 from . import page_attributes
 from .download import Download
+from .multiple_options import MultiPageOptions
 from ..abstract import Page
 from ...objects import Options, DatabaseObject, Source
+from ...utils.shared import DOWNLOAD_LOGGER as LOGGER
-class MultiPageOptions:
-def __init__(
-self,
-max_displayed_options: int = 10,
-option_digits: int = 3,
-database_object: DatabaseObject = None,
-page: Type[Page] = None
-) -> None:
-self.max_displayed_options = max_displayed_options
-self.option_digits: int = option_digits
-self._length = 0
-self._current_option_dict: Dict[Type[Page], Options] = defaultdict(lambda: Options())
-self.database_object = database_object
-self.page = page
-if database_object is not None and page is not None:
-self[page] = database_object.options
-def __getitem__(self, key: Type[Page]):
-return self._current_option_dict[key]
-def __setitem__(self, key: Type[Page], value: Options):
-self._current_option_dict[key] = value
-self._length = 0
-for key in self._current_option_dict:
-self._length += 1
-def __len__(self) -> int:
-return self._length
-def get_page_str(self, page: Type[Page]) -> str:
-page_name_fill = "-"
-max_page_len = 21
-return f"({page_attributes.PAGE_NAME_MAP[page]}) ------------------------{page.__name__:{page_name_fill}<{max_page_len}}------------"
-def string_from_all_pages(self) -> str:
-if self._length == 1:
-for key in self._current_option_dict:
-return self.string_from_single_page(key)
-lines: List[str] = []
-j = 0
-for page, options in self._current_option_dict.items():
-lines.append(self.get_page_str(page))
-i = -1
-option_obj: DatabaseObject
-for i, option_obj in enumerate(options):
-if i >= self.max_displayed_options:
-lines.append("...")
-break
-lines.append(f"{j + i:0{self.option_digits}} {option_obj.option_string}")
-j += i + 1
-return "\n".join(lines)
-def choose_from_all_pages(self, index: int) -> Tuple[DatabaseObject, Type[Page]]:
-if self._length == 1:
-for key in self._current_option_dict:
-return self.choose_from_single_page(key, index), key
-sum_of_length = 0
-for page, options in self._current_option_dict.items():
-option_len = min((len(options), self.max_displayed_options))
-index_of_list = index - sum_of_length
-if index_of_list < option_len:
-return options[index_of_list], page
-sum_of_length += option_len
-raise IndexError("index is out of range")
-def string_from_single_page(self, page: Type[Page]) -> str:
-lines: List[str] = [self.get_page_str(page)]
-option_obj: DatabaseObject
-for i, option_obj in enumerate(self._current_option_dict[page]):
-lines.append(f"{i:0{self.option_digits}} {option_obj.option_string}")
-return "\n".join(lines)
-def choose_from_single_page(self, page: Type[Page], index: int) -> DatabaseObject:
-return self._current_option_dict[page][index]
-def __repr__(self) -> str:
-return self.string_from_all_pages()
-def download(self) -> bool:
-if self._length != 1:
-return False
-if self.database_object is None or self.page is None:
-return False
-self.page.download(self.database_object)
-return True
 class Search(Download):
@@ -142,31 +35,22 @@ class Search(Download):
 def __repr__(self):
 return self._current_option.__repr__()
-@property
-def next_options(self) -> MultiPageOptions:
-mpo = MultiPageOptions(
-max_displayed_options=self.max_displayed_options,
-option_digits=self.option_digits
-)
-self._option_history.append(mpo)
-self._current_option = mpo
-return mpo
-def next_options_from_music_obj(self, database_obj: DatabaseObject, page: Type[Page]) -> MultiPageOptions:
+def next_options(self, derive_from: DatabaseObject = None) -> MultiPageOptions:
 mpo = MultiPageOptions(
 max_displayed_options=self.max_displayed_options,
 option_digits=self.option_digits,
-database_object=database_obj,
-page=page
+derived_from=derive_from
 )
 self._option_history.append(mpo)
 self._current_option = mpo
 return mpo
-@property
 def _previous_options(self) -> MultiPageOptions:
 self._option_history.pop()
 self._current_option = self._option_history[-1]
 return self._option_history[-1]
 def search(self, query: str):
@@ -177,40 +61,62 @@ class Search(Download):
 the letter behind it defines the *type* of parameter,
 followed by a space "#a Psychonaut 4 #r Tired, Numb and #t Drop by Drop"
 if no # is in the query it gets treated as "unspecified query"
+doesn't set derived_from thus,
+can't download right after
 """
 for page in self.pages:
 self._current_option[page] = page.search_by_query(query=query)
 def choose_page(self, page: Type[Page]):
+"""
+doesn't set derived_from thus,
+can't download right after
+"""
 if page not in page_attributes.ALL_PAGES:
 raise ValueError(f"Page \"{page.__name__}\" does not exist in page_attributes.ALL_PAGES")
 prev_mpo = self._current_option
-mpo = self.next_options
+mpo = self.next_options()
 mpo[page] = prev_mpo[page]
-def get_page_from_query(self, query: str) -> Optional[Type[Page]]:
+def _get_page_from_query(self, query: str) -> Optional[Type[Page]]:
+"""
+query can be for example:
+"a" or "EncyclopaediaMetallum" to choose a page
+"""
 page = page_attributes.NAME_PAGE_MAP.get(query.lower().strip())
 if page in self.pages:
 return page
+def _get_page_from_source(self, source: Source) -> Optional[Type[Page]]:
+return page_attributes.SOURCE_PAGE_MAP.get(source.page_enum)
 def choose_index(self, index: int):
 db_object, page = self._current_option.choose_from_all_pages(index=index)
 music_object = self.fetch_details(db_object)
-mpo = self.next_options_from_music_obj(music_object, page)
+mpo = self.next_options(derive_from=music_object)
+mpo[page] = music_object.options
 def goto_previous(self):
 try:
-self._current_option = self._previous_options
+self._previous_options()
 except IndexError:
 pass
 def search_url(self, url: str) -> bool:
+"""
+sets derived_from, thus
+can download directly after
+"""
 source = Source.match_url(url=url)
 if source is None:
 return False
@@ -220,10 +126,22 @@ class Search(Download):
 return False
 page = page_attributes.SOURCE_PAGE_MAP[source.page_enum]
-mpo = self.next_options
+mpo = self.next_options(derive_from=new_object)
 mpo[page] = new_object.options
 return True
 def download_chosen(self) -> bool:
-return self._current_option.download()
+if self._current_option._derive_from is None:
+LOGGER.warning(f"can't download from an non choosen stuff")
+return False
+source: Source
+for source in self._current_option._derive_from.source_collection:
+page = self._get_page_from_source(source=source)
+if page in self.audio_pages:
+return page.download(music_object=self._current_option._derive_from)
+return False

View File

@@ -9,11 +9,6 @@ from dataclasses import dataclass
 from pathlib import Path
 import random
-from ..utils.shared import (
-ENCYCLOPAEDIA_METALLUM_LOGGER as LOGGER,
-TEMP_FOLDER
-)
 from .abstract import Page
 from ..objects import (
 DatabaseObject,
@@ -90,6 +85,8 @@ class Musify(Page):
 SOURCE_TYPE = SourcePages.MUSIFY
+LOGGER = LOGGER
 @classmethod
 def parse_url(cls, url: str) -> MusifyUrl:
 parsed = urlparse(url)
@@ -568,6 +565,7 @@ class Musify(Page):
 for card_soup in soup.find_all("div", {"class": "card"}):
 new_album: Album = cls.parse_album_card(card_soup, artist_name)
+album_source: Source
 if stop_at_level > 1:
 for album_source in new_album.source_collection.get_sources_from_page(cls.SOURCE_TYPE):
 new_album.merge(cls._fetch_album_from_source(album_source, stop_at_level=stop_at_level-1))
@@ -856,15 +854,15 @@ class Musify(Page):
 eg. 'https://musify.club/release/linkin-park-hybrid-theory-2000-188'
 /html/musify/album_overview.html
-[] tracklist
-[] attributes
-[] ratings
+- [x] tracklist
+- [ ] attributes
+- [ ] ratings
 :param stop_at_level:
 :param source:
 :return:
 """
-album = Album(title="Hi :)")
+album = Album(title="Hi :)", source_list=[source])
 url = cls.parse_url(source.url)
@@ -881,6 +879,14 @@ class Musify(Page):
 card_soup: BeautifulSoup
 for card_soup in cards_soup.find_all("div", {"class": "playlist__item"}):
 album.song_collection.append(cls.parse_song_card(card_soup))
+if stop_at_level > 1:
+song: Song
+for song in album.song_collection:
+sources = song.source_collection.get_sources_from_page(cls.SOURCE_TYPE)
+for source in sources:
+song.merge(cls._fetch_song_from_source(source=source))
 album.update_tracksort()
 return album

View File

@@ -4,18 +4,13 @@ import tempfile
 import os
 import configparser
 from sys import platform as current_os
+from pathlib import Path
-TEMP_FOLDER = "music-downloader"
 LOG_FILE = "download_logs.log"
 TEMP_DATABASE_FILE = "metadata.db"
-DATABASE_STRUCTURE_FILE = "database_structure.sql"
-DATABASE_STRUCTURE_FALLBACK = "https://raw.githubusercontent.com/HeIIow2/music-downloader/master/assets/database_structure.sql"
+TEMP_DIR = Path(tempfile.gettempdir(), "music-downloader")
+TEMP_DIR.mkdir(exist_ok=True)
-TEMP_DIR = os.path.join(tempfile.gettempdir(), TEMP_FOLDER)
-if not os.path.exists(TEMP_DIR):
-os.mkdir(TEMP_DIR)
-TEMP_DATABASE_PATH = os.path.join(TEMP_DIR, TEMP_DATABASE_FILE)
 # configure logger default
 logging.basicConfig(
@@ -33,18 +28,17 @@ INIT_PATH_LOGGER = logging.getLogger("init_path")
 DATABASE_LOGGER = logging.getLogger("database")
 METADATA_DOWNLOAD_LOGGER = logging.getLogger("metadata")
 URL_DOWNLOAD_LOGGER = logging.getLogger("AudioSource")
-TAGGING_LOGGER = logging.getLogger("tagging")
 YOUTUBE_LOGGER = logging.getLogger("Youtube")
 MUSIFY_LOGGER = logging.getLogger("Musify")
 PATH_LOGGER = logging.getLogger("create-paths")
 DOWNLOAD_LOGGER = logging.getLogger("download")
 LYRICS_LOGGER = logging.getLogger("lyrics")
 GENIUS_LOGGER = logging.getLogger("genius")
+TAGGING_LOGGER = logging.getLogger("tagging")
 ENCYCLOPAEDIA_METALLUM_LOGGER = logging.getLogger("ma")
 NOT_A_GENRE = ".", "..", "misc_scripts", "Music", "script", ".git", ".idea"
-MUSIC_DIR = os.path.join(os.path.expanduser("~"), "Music")
+MUSIC_DIR = Path(os.path.expanduser("~"), "Music")
 if current_os == "linux":
 # XDG_USER_DIRS_FILE reference: https://freedesktop.org/wiki/Software/xdg-user-dirs/
@@ -58,17 +52,34 @@ if current_os == "linux":
 config.read_string(data)
 xdg_config = config['XDG_USER_DIRS']
 MUSIC_DIR = os.path.expandvars(xdg_config['xdg_music_dir'].strip('"'))
 except (FileNotFoundError, KeyError) as E:
-logger.warning(f'''
-Missing file or No entry found for "xdg_music_dir" in: \'{XDG_USER_DIRS_FILE}\'.
-Will fallback on default '$HOME/Music'.
-----
-''')
+logger.warning(
+f"Missing file or No entry found for \"xdg_music_dir\" in: \"{XDG_USER_DIRS_FILE}\".\n" \
+f"Will fallback on default \"$HOME/Music\"."
+)
 TOR = False
 proxies = {
 'http': 'socks5h://127.0.0.1:9150',
 'https': 'socks5h://127.0.0.1:9150'
 } if TOR else {}
-# only the sources here will get downloaded, in the order the list is ordered
-AUDIO_SOURCES = ["Musify", "Youtube"]
+"""
+available variables:
+- genre
+- label
+- artist
+- album
+- song
+"""
+DOWNLOAD_PATH = "{genre}/{artist}/{album}"
+DOWNLOAD_FILE = "{song}.mp3"
+DEFAULT_VALUES = {
+"genre": "Various Genre",
+"label": "Various Labels",
+"artist": "Various Artists",
+"album": "Various Album",
+"song": "Various Song",
+}
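A quick standalone check (not part of the commit) of how the new templates and fallback values combine; the overridden values below are only examples, "Drop by Drop" being borrowed from the search docstring.

```python
DOWNLOAD_PATH = "{genre}/{artist}/{album}"
DOWNLOAD_FILE = "{song}.mp3"
DEFAULT_VALUES = {
    "genre": "Various Genre",
    "label": "Various Labels",
    "artist": "Various Artists",
    "album": "Various Album",
    "song": "Various Song",
}

# unset fields fall back to the "Various ..." placeholders
values = {**DEFAULT_VALUES, "artist": "Psychonaut 4", "album": "Neurasthenia", "song": "Drop by Drop"}
print(DOWNLOAD_PATH.format(**values))  # Various Genre/Psychonaut 4/Neurasthenia
print(DOWNLOAD_FILE.format(**values))  # Drop by Drop.mp3
```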

View File

@@ -7,3 +7,16 @@ def unify(string: str) -> str:
 return string.lower()
+def fit_to_file_system(string: str) -> str:
+string = string.strip()
+while string[0] == ".":
+if len(string) == 0:
+return string
+string = string[1:]
+string = string.replace("/", "|").replace("\\", "|")
+return string

View File

@@ -12,6 +12,10 @@ def fetch_artist():
 source_list=[objects.Source(objects.SourcePages.MUSIFY, "https://musify.club/artist/psychonaut-4-83193")]
 )
+artist = objects.Artist(
+source_list=[objects.Source(objects.SourcePages.MUSIFY, "https://musify.club/artist/ghost-bath-280348/")]
+)
 artist = Musify.fetch_details(artist)
 print(artist.options)
@@ -33,4 +37,4 @@ def fetch_album():
 print(artist.id, artist.name)
 if __name__ == "__main__":
-search()
+fetch_artist()