Compare commits
15 Commits
fix/musify
...
8c369d79e4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8c369d79e4 | ||
|
|
b09d6f2691 | ||
| 0e6fe8187a | |||
| 0343c11a62 | |||
| 9769cf4033 | |||
| 55024bd987 | |||
| d85498869d | |||
| c3350b016d | |||
| 788103a68e | |||
| 5179c64161 | |||
| 04405f88eb | |||
| 949583225a | |||
| 4e0b005170 | |||
| e3e7aea959 | |||
| 709c5ebaa8 |
1
.vscode/settings.json
vendored
1
.vscode/settings.json
vendored
@@ -24,6 +24,7 @@
|
||||
"encyclopaedia",
|
||||
"ENDC",
|
||||
"Gitea",
|
||||
"isrc",
|
||||
"levenshtein",
|
||||
"metallum",
|
||||
"musify",
|
||||
|
||||
@@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
if __name__ == "__main__":
|
||||
commands = [
|
||||
"s: #a Psychonaut 4",
|
||||
"d: 0",
|
||||
"s: #a Crystal F",
|
||||
"d: 20",
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import mutagen
|
||||
from mutagen.id3 import ID3, Frame, APIC
|
||||
from mutagen.id3 import ID3, Frame, APIC, USLT
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
import logging
|
||||
@@ -7,6 +7,7 @@ from PIL import Image
|
||||
|
||||
from ..utils.config import logging_settings, main_settings
|
||||
from ..objects import Song, Target, Metadata
|
||||
from ..objects.metadata import Mapping
|
||||
from ..connection import Connection
|
||||
|
||||
LOGGER = logging_settings["tagging_logger"]
|
||||
@@ -105,8 +106,11 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
|
||||
data=converted_target.read_bytes(),
|
||||
)
|
||||
)
|
||||
|
||||
mutagen_file = mutagen.File(target.file_path)
|
||||
id3_object.frames.delall("USLT")
|
||||
uslt_val = metadata.get_id3_value(Mapping.UNSYNCED_LYRICS)
|
||||
id3_object.frames.add(
|
||||
USLT(encoding=3, lang=u'eng', desc=u'desc', text=uslt_val)
|
||||
)
|
||||
|
||||
id3_object.add_metadata(metadata)
|
||||
id3_object.save()
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from ..utils.config import main_settings
|
||||
from ..utils.enums.album import AlbumType
|
||||
|
||||
|
||||
@dataclass
|
||||
class FetchOptions:
|
||||
download_all: bool = False
|
||||
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
|
||||
|
||||
|
||||
@dataclass
|
||||
class DownloadOptions:
|
||||
download_all: bool = False
|
||||
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
|
||||
|
||||
process_audio_if_found: bool = False
|
||||
process_metadata_if_found: bool = True
|
||||
|
||||
@@ -1,12 +1,16 @@
|
||||
from typing import Tuple, Type, Dict, Set
|
||||
from typing import Tuple, Type, Dict, Set, Optional, List
|
||||
from collections import defaultdict
|
||||
|
||||
from . import FetchOptions, DownloadOptions
|
||||
from .results import SearchResults
|
||||
from ..objects import DatabaseObject, Source
|
||||
from ..objects import DatabaseObject as DataObject, Source, Album, Song, Artist, Label
|
||||
|
||||
from ..utils.string_processing import fit_to_file_system
|
||||
from ..utils.config import youtube_settings
|
||||
from ..utils.enums.source import SourcePages
|
||||
from ..utils.support_classes.download_result import DownloadResult
|
||||
from ..utils.support_classes.query import Query
|
||||
from ..utils.support_classes.download_result import DownloadResult
|
||||
from ..utils.exception.download import UrlNotFoundException
|
||||
from ..utils.shared import DEBUG_PAGES
|
||||
|
||||
@@ -34,6 +38,13 @@ SHADY_PAGES: Set[Type[Page]] = {
|
||||
Musify,
|
||||
}
|
||||
|
||||
fetch_map = {
|
||||
Song: "fetch_song",
|
||||
Album: "fetch_album",
|
||||
Artist: "fetch_artist",
|
||||
Label: "fetch_label",
|
||||
}
|
||||
|
||||
if DEBUG_PAGES:
|
||||
DEBUGGING_PAGE = Bandcamp
|
||||
print(f"Only downloading from page {DEBUGGING_PAGE}.")
|
||||
@@ -43,7 +54,10 @@ if DEBUG_PAGES:
|
||||
|
||||
|
||||
class Pages:
|
||||
def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False) -> None:
|
||||
def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
|
||||
self.download_options: DownloadOptions = download_options or DownloadOptions()
|
||||
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
|
||||
|
||||
# initialize all page instances
|
||||
self._page_instances: Dict[Type[Page], Page] = dict()
|
||||
self._source_to_page: Dict[SourcePages, Type[Page]] = dict()
|
||||
@@ -61,14 +75,19 @@ class Pages:
|
||||
|
||||
self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages)
|
||||
self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set)
|
||||
|
||||
|
||||
self._audio_pages_set: Set[Type[Page]] = self._pages_set.intersection(AUDIO_PAGES)
|
||||
self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(self._audio_pages_set)
|
||||
|
||||
for page_type in self.pages:
|
||||
self._page_instances[page_type] = page_type()
|
||||
self._page_instances[page_type] = page_type(fetch_options=self.fetch_options, download_options=self.download_options)
|
||||
self._source_to_page[page_type.SOURCE_TYPE] = page_type
|
||||
|
||||
|
||||
def _get_page_from_enum(self, source_page: SourcePages) -> Page:
|
||||
if source_page not in self._source_to_page:
|
||||
return None
|
||||
return self._page_instances[self._source_to_page[source_page]]
|
||||
|
||||
def search(self, query: Query) -> SearchResults:
|
||||
result = SearchResults()
|
||||
|
||||
@@ -80,22 +99,42 @@ class Pages:
|
||||
|
||||
return result
|
||||
|
||||
def fetch_details(self, music_object: DatabaseObject, stop_at_level: int = 1) -> DatabaseObject:
|
||||
if not isinstance(music_object, INDEPENDENT_DB_OBJECTS):
|
||||
return music_object
|
||||
def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject:
|
||||
if not isinstance(data_object, INDEPENDENT_DB_OBJECTS):
|
||||
return data_object
|
||||
|
||||
for source_page in music_object.source_collection.source_pages:
|
||||
if source_page not in self._source_to_page:
|
||||
continue
|
||||
source: Source
|
||||
for source in data_object.source_collection.get_sources():
|
||||
new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level)
|
||||
if new_data_object is not None:
|
||||
data_object.merge(new_data_object)
|
||||
|
||||
page_type = self._source_to_page[source_page]
|
||||
|
||||
if page_type in self._pages_set:
|
||||
music_object.merge(self._page_instances[page_type].fetch_details(music_object=music_object, stop_at_level=stop_at_level))
|
||||
return data_object
|
||||
|
||||
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
|
||||
page: Page = self._get_page_from_enum(source.page_enum)
|
||||
if page is None:
|
||||
return None
|
||||
|
||||
return music_object
|
||||
# getting the appropriate function for the page and the object type
|
||||
source_type = page.get_source_type(source)
|
||||
if not hasattr(page, fetch_map[source_type]):
|
||||
return None
|
||||
func = getattr(page, fetch_map[source_type])(source=source, **kwargs)
|
||||
|
||||
# fetching the data object and marking it as fetched
|
||||
data_object: DataObject = func(source=source)
|
||||
data_object.mark_as_fetched(source.hash_url)
|
||||
return data_object
|
||||
|
||||
def is_downloadable(self, music_object: DatabaseObject) -> bool:
|
||||
def fetch_from_url(self, url: str) -> Optional[DataObject]:
|
||||
source = Source.match_url(url, SourcePages.MANUAL)
|
||||
if source is None:
|
||||
return None
|
||||
|
||||
return self.fetch_from_source(source=source)
|
||||
|
||||
def is_downloadable(self, music_object: DataObject) -> bool:
|
||||
_page_types = set(self._source_to_page)
|
||||
for src in music_object.source_collection.source_pages:
|
||||
if src in self._source_to_page:
|
||||
@@ -104,25 +143,88 @@ class Pages:
|
||||
audio_pages = self._audio_pages_set.intersection(_page_types)
|
||||
return len(audio_pages) > 0
|
||||
|
||||
def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False, process_metadata_anyway: bool = False) -> DownloadResult:
|
||||
if not isinstance(music_object, INDEPENDENT_DB_OBJECTS):
|
||||
return DownloadResult(error_message=f"{type(music_object).__name__} can't be downloaded.")
|
||||
|
||||
self.fetch_details(music_object)
|
||||
|
||||
_page_types = set(self._source_to_page)
|
||||
for src in music_object.source_collection.source_pages:
|
||||
if src in self._source_to_page:
|
||||
_page_types.add(self._source_to_page[src])
|
||||
|
||||
audio_pages = self._audio_pages_set.intersection(_page_types)
|
||||
def _skip_object(self, data_object: DataObject) -> bool:
|
||||
if isinstance(data_object, Album):
|
||||
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
|
||||
return True
|
||||
|
||||
for download_page in audio_pages:
|
||||
return self._page_instances[download_page].download(music_object=music_object, genre=genre)
|
||||
|
||||
return DownloadResult(error_message=f"No audio source has been found for {music_object}.")
|
||||
return False
|
||||
|
||||
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DatabaseObject]:
|
||||
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
|
||||
# fetch the given object
|
||||
self.fetch_details(data_object)
|
||||
|
||||
# fetching all parent objects (e.g. if you only download a song)
|
||||
if not kwargs.get("fetched_upwards", False):
|
||||
to_fetch: List[DataObject] = [data_object]
|
||||
|
||||
while len(to_fetch) > 0:
|
||||
new_to_fetch = []
|
||||
for d in to_fetch:
|
||||
if self._skip_object(d):
|
||||
continue
|
||||
|
||||
self.fetch_details(d)
|
||||
|
||||
for c in d.get_parent_collections():
|
||||
new_to_fetch.extend(c)
|
||||
|
||||
to_fetch = new_to_fetch
|
||||
|
||||
kwargs["fetched_upwards"] = True
|
||||
|
||||
# download all children
|
||||
download_result: DownloadResult = DownloadResult()
|
||||
for c in data_object.get_children():
|
||||
for d in c:
|
||||
if self._skip_object(d):
|
||||
continue
|
||||
|
||||
download_result.merge(self.download(d, genre, **kwargs))
|
||||
|
||||
# actually download if the object is a song
|
||||
if isinstance(data_object, Song):
|
||||
"""
|
||||
TODO
|
||||
add the traced artist and album to the naming.
|
||||
I am able to do that, because duplicate values are removed later on.
|
||||
"""
|
||||
|
||||
self._download_song(data_object, naming={
|
||||
"genre": [genre],
|
||||
"audio_format": main_settings["audio_format"],
|
||||
})
|
||||
|
||||
return download_result
|
||||
|
||||
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
|
||||
# pre process the data recursively
|
||||
song.compile()
|
||||
|
||||
# manage the naming
|
||||
naming: Dict[str, List[str]] = defaultdict(list, naming)
|
||||
naming["song"].append(song.title_string)
|
||||
naming["isrc"].append(song.isrc)
|
||||
naming["album"].extend(a.title_string for a in song.album_collection)
|
||||
naming["album_type"].extend(a.album_type.value for a in song.album_collection)
|
||||
naming["artist"].extend(a.name for a in song.main_artist_collection)
|
||||
naming["artist"].extend(a.name for a in song.feature_artist_collection)
|
||||
for a in song.album_collection:
|
||||
naming["label"].extend([l.title_string for l in a.label_collection])
|
||||
# removing duplicates from the naming, and process the strings
|
||||
for key, value in naming.items():
|
||||
# https://stackoverflow.com/a/17016257
|
||||
naming[key] = list(dict.fromkeys(items))
|
||||
naming[key] = [fit_to_file_system(i) for i in naming[key] if i is not None]
|
||||
|
||||
# get every possible path
|
||||
path_format = [*main_settings["download_path"].split("/"), main_settings["download_file"]]
|
||||
every_possible_path: Set[str] = set()
|
||||
|
||||
|
||||
return DownloadOptions()
|
||||
|
||||
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]:
|
||||
source = Source.match_url(url, SourcePages.MANUAL)
|
||||
|
||||
if source is None:
|
||||
|
||||
@@ -24,4 +24,4 @@ from .parents import OuterProxy
|
||||
|
||||
from .artwork import Artwork
|
||||
|
||||
DatabaseObject = TypeVar('T', bound=OuterProxy)
|
||||
DatabaseObject = OuterProxy
|
||||
|
||||
@@ -92,7 +92,7 @@ class Mapping(Enum):
|
||||
key = attribute.value
|
||||
|
||||
if key[0] == 'T':
|
||||
# a text fiel
|
||||
# a text field
|
||||
return cls.get_text_instance(key, value)
|
||||
if key[0] == "W":
|
||||
# an url field
|
||||
@@ -355,7 +355,12 @@ class Metadata:
|
||||
return None
|
||||
|
||||
list_data = self.id3_dict[field]
|
||||
|
||||
#correct duplications
|
||||
correct_list_data = list()
|
||||
for data in list_data:
|
||||
if data not in correct_list_data:
|
||||
correct_list_data.append(data)
|
||||
list_data = correct_list_data
|
||||
# convert for example the time objects to timestamps
|
||||
for i, element in enumerate(list_data):
|
||||
# for performance’s sake I don't do other checks if it is already the right type
|
||||
@@ -368,7 +373,7 @@ class Metadata:
|
||||
if type(element) == ID3Timestamp:
|
||||
list_data[i] = element.timestamp
|
||||
continue
|
||||
|
||||
|
||||
"""
|
||||
Version 2.4 of the specification prescribes that all text fields (the fields that start with a T, except for TXXX) can contain multiple values separated by a null character.
|
||||
Thus if above conditions are met, I concatenate the list,
|
||||
@@ -376,7 +381,7 @@ class Metadata:
|
||||
"""
|
||||
if field.value[0].upper() == "T" and field.value.upper() != "TXXX":
|
||||
return self.NULL_BYTE.join(list_data)
|
||||
|
||||
|
||||
return list_data[0]
|
||||
|
||||
def get_mutagen_object(self, field):
|
||||
@@ -395,6 +400,5 @@ class Metadata:
|
||||
"""
|
||||
# set the tagging timestamp to the current time
|
||||
self.__setitem__(Mapping.TAGGING_TIME, [ID3Timestamp.now()])
|
||||
|
||||
for field in self.id3_dict:
|
||||
yield self.get_mutagen_object(field)
|
||||
|
||||
@@ -99,7 +99,9 @@ class OuterProxy:
|
||||
Wraps the inner data, and provides apis, to naturally access those values.
|
||||
"""
|
||||
|
||||
_default_factories: dict = {}
|
||||
source_collection: SourceCollection
|
||||
|
||||
_default_factories: dict = {"source_collection": SourceCollection}
|
||||
_outer_attribute: Set[str] = {"options", "metadata", "indexing_values", "option_string"}
|
||||
|
||||
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = tuple()
|
||||
@@ -203,6 +205,7 @@ class OuterProxy:
|
||||
if __other is None:
|
||||
return
|
||||
|
||||
a_id = self.id
|
||||
|
||||
a = self
|
||||
b = __other
|
||||
@@ -225,6 +228,8 @@ class OuterProxy:
|
||||
a._inner.__merge__(old_inner, **kwargs)
|
||||
del old_inner
|
||||
|
||||
self.id = a_id
|
||||
|
||||
def __merge__(self, __other: Optional[OuterProxy], **kwargs):
|
||||
self.merge(__other, **kwargs)
|
||||
|
||||
@@ -301,6 +306,33 @@ class OuterProxy:
|
||||
|
||||
return r
|
||||
|
||||
@property
|
||||
def root_collections(self) -> List[Collection]:
|
||||
if len(self.UPWARDS_COLLECTION_STRING_ATTRIBUTES) == 0:
|
||||
return [self]
|
||||
|
||||
r = []
|
||||
for collection_string_attribute in self.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
|
||||
r.extend(self.__getattribute__(collection_string_attribute))
|
||||
|
||||
return r
|
||||
|
||||
def _compile(self, **kwargs):
|
||||
pass
|
||||
|
||||
def compile(self, from_root=False, **kwargs):
|
||||
# compile from the root
|
||||
if not from_root:
|
||||
for c in self.root_collections:
|
||||
c.compile(from_root=True, **kwargs)
|
||||
return
|
||||
|
||||
self._compile(**kwargs)
|
||||
|
||||
for c_attribute in self.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
|
||||
for c in self.__getattribute__(c_attribute):
|
||||
c.compile(from_root=True, **kwargs)
|
||||
|
||||
TITEL = "id"
|
||||
@property
|
||||
def title_string(self) -> str:
|
||||
@@ -308,3 +340,11 @@ class OuterProxy:
|
||||
|
||||
def __repr__(self):
|
||||
return f"{type(self).__name__}({self.title_string})"
|
||||
|
||||
def get_child_collections(self):
|
||||
for collection_string_attribute in self.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
|
||||
yield self.__getattribute__(collection_string_attribute)
|
||||
|
||||
def get_parent_collections(self):
|
||||
for collection_string_attribute in self.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
|
||||
yield self.__getattribute__(collection_string_attribute)
|
||||
|
||||
@@ -376,6 +376,25 @@ class Album(Base):
|
||||
r += f" with {len(self.song_collection)} songs"
|
||||
return r
|
||||
|
||||
def _compile(self):
|
||||
self.analyze_implied_album_type()
|
||||
self.update_tracksort()
|
||||
|
||||
def analyze_implied_album_type(self):
|
||||
# if the song collection has only one song, it is reasonable to assume that it is a single
|
||||
if len(self.song_collection) == 1:
|
||||
self.album_type = AlbumType.SINGLE
|
||||
return
|
||||
|
||||
# if the album already has an album type, we don't need to do anything
|
||||
if self.album_type is not AlbumType.OTHER:
|
||||
return
|
||||
|
||||
# for information on EP's I looked at https://www.reddit.com/r/WeAreTheMusicMakers/comments/a354ql/whats_the_cutoff_length_between_ep_and_album/
|
||||
if len(self.song_collection) < 9:
|
||||
self.album_type = AlbumType.EP
|
||||
return
|
||||
|
||||
def update_tracksort(self):
|
||||
"""
|
||||
This updates the tracksort attributes, of the songs in
|
||||
@@ -525,6 +544,9 @@ class Artist(Base):
|
||||
self.label_collection.extend(object_list)
|
||||
return
|
||||
|
||||
def _compile(self):
|
||||
self.update_albumsort()
|
||||
|
||||
def update_albumsort(self):
|
||||
"""
|
||||
This updates the albumsort attributes, of the albums in
|
||||
@@ -535,9 +557,6 @@ class Artist(Base):
|
||||
|
||||
:return:
|
||||
"""
|
||||
if len(self.main_album_collection) <= 0:
|
||||
return
|
||||
|
||||
type_section: Dict[AlbumType, int] = defaultdict(lambda: 2, {
|
||||
AlbumType.OTHER: 0, # if I don't know it, I add it to the first section
|
||||
AlbumType.STUDIO_ALBUM: 0,
|
||||
@@ -580,7 +599,7 @@ class Artist(Base):
|
||||
album_list.extend(sections[section_index])
|
||||
|
||||
# replace the old collection with the new one
|
||||
self.main_album_collection: Collection = Collection(data=album_list, element_type=Album)
|
||||
self.main_album_collection._data = album_list
|
||||
|
||||
INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection")
|
||||
@property
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple, TextIO, Union
|
||||
from typing import List, Tuple, TextIO, Union, Optional
|
||||
import logging
|
||||
import random
|
||||
import requests
|
||||
@@ -31,7 +31,10 @@ class Target(OuterProxy):
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def temp(cls, name: str = str(random.randint(0, HIGHEST_ID))) -> P:
|
||||
def temp(cls, name: str = str(random.randint(0, HIGHEST_ID)), file_extension: Optional[str] = None) -> P:
|
||||
if file_extension is not None:
|
||||
name = f"{name}.{file_extension}"
|
||||
|
||||
return cls(main_settings["temp_directory"] / name)
|
||||
|
||||
# This is automatically generated
|
||||
|
||||
@@ -107,7 +107,7 @@ class Page:
|
||||
This is an abstract class, laying out the
|
||||
functionality for every other class fetching something
|
||||
"""
|
||||
|
||||
DOWNLOAD_PRIORITY: int = 0
|
||||
SOURCE_TYPE: SourcePages
|
||||
LOGGER = logging.getLogger("this shouldn't be used")
|
||||
|
||||
@@ -189,103 +189,7 @@ class Page:
|
||||
def song_search(self, song: Song) -> List[Song]:
|
||||
return []
|
||||
|
||||
def fetch_details(
|
||||
self,
|
||||
music_object: DatabaseObject,
|
||||
stop_at_level: int = 1,
|
||||
) -> DatabaseObject:
|
||||
"""
|
||||
when a music object with lacking data is passed in, it returns
|
||||
the SAME object **(no copy)** with more detailed data.
|
||||
If you for example put in, an album, it fetches the tracklist
|
||||
|
||||
:param music_object:
|
||||
:param stop_at_level:
|
||||
This says the depth of the level the scraper will recurse to.
|
||||
If this is for example set to 2, then the levels could be:
|
||||
1. Level: the album
|
||||
2. Level: every song of the album + every artist of the album
|
||||
If no additional requests are needed to get the data one level below the supposed stop level
|
||||
this gets ignored
|
||||
:return detailed_music_object: IT MODIFIES THE INPUT OBJ
|
||||
"""
|
||||
# creating a new object, of the same type
|
||||
new_music_object: Optional[DatabaseObject] = None
|
||||
fetched_from_url: List[str] = []
|
||||
|
||||
# only certain database objects, have a source list
|
||||
if isinstance(music_object, INDEPENDENT_DB_OBJECTS):
|
||||
source: Source
|
||||
for source in music_object.source_collection.get_sources(self.SOURCE_TYPE):
|
||||
if music_object.already_fetched_from(source.hash_url):
|
||||
continue
|
||||
|
||||
tmp = self.fetch_object_from_source(
|
||||
source=source,
|
||||
enforce_type=type(music_object),
|
||||
stop_at_level=stop_at_level,
|
||||
type_string=type(music_object).__name__,
|
||||
entity_string=music_object.option_string,
|
||||
)
|
||||
|
||||
if new_music_object is None:
|
||||
new_music_object = tmp
|
||||
else:
|
||||
new_music_object.merge(tmp)
|
||||
fetched_from_url.append(source.hash_url)
|
||||
|
||||
if new_music_object is not None:
|
||||
music_object.merge(new_music_object)
|
||||
|
||||
music_object.mark_as_fetched(*fetched_from_url)
|
||||
return music_object
|
||||
|
||||
def fetch_object_from_source(
|
||||
self,
|
||||
source: Source,
|
||||
stop_at_level: int = 2,
|
||||
enforce_type: Type[DatabaseObject] = None,
|
||||
type_string: str = "",
|
||||
entity_string: str = "",
|
||||
) -> Optional[DatabaseObject]:
|
||||
|
||||
obj_type = self.get_source_type(source)
|
||||
|
||||
if obj_type is None:
|
||||
return None
|
||||
|
||||
if enforce_type != obj_type and enforce_type is not None:
|
||||
self.LOGGER.warning(f"Object type isn't type to enforce: {enforce_type}, {obj_type}")
|
||||
return None
|
||||
|
||||
music_object: DatabaseObject = None
|
||||
|
||||
fetch_map = {
|
||||
Song: self.fetch_song,
|
||||
Album: self.fetch_album,
|
||||
Artist: self.fetch_artist,
|
||||
Label: self.fetch_label
|
||||
}
|
||||
|
||||
if obj_type in fetch_map:
|
||||
music_object = fetch_map[obj_type](source, stop_at_level=stop_at_level)
|
||||
else:
|
||||
self.LOGGER.warning(f"Can't fetch details of type: {obj_type}")
|
||||
return None
|
||||
|
||||
if stop_at_level > 0:
|
||||
trace(f"fetching {type_string} [{entity_string}] [stop_at_level={stop_at_level}]")
|
||||
|
||||
collection: Collection
|
||||
for collection_str in music_object.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
|
||||
collection = music_object.__getattribute__(collection_str)
|
||||
|
||||
for sub_element in collection:
|
||||
sub_element.merge(
|
||||
self.fetch_details(sub_element, stop_at_level=stop_at_level - 1))
|
||||
|
||||
return music_object
|
||||
|
||||
# to fetch stuff
|
||||
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
|
||||
return Song()
|
||||
|
||||
@@ -361,6 +265,7 @@ class Page:
|
||||
return download_result
|
||||
|
||||
def _download_song(self, song: Song, naming_dict: NamingDict):
|
||||
song.compile()
|
||||
if "genre" not in naming_dict and song.genre is not None:
|
||||
naming_dict["genre"] = song.genre
|
||||
|
||||
@@ -381,7 +286,7 @@ class Page:
|
||||
song.target_collection.append(new_target)
|
||||
|
||||
r = DownloadResult(1)
|
||||
temp_target: Target = Target.temp()
|
||||
temp_target: Target = Target.temp(file_extension=main_settings["audio_format"])
|
||||
|
||||
found_on_disc = False
|
||||
target: Target
|
||||
|
||||
@@ -49,7 +49,7 @@ class BandcampTypes(Enum):
|
||||
|
||||
|
||||
class Bandcamp(Page):
|
||||
# CHANGE
|
||||
DOWNLOAD_PRIORITY = 10
|
||||
SOURCE_TYPE = SourcePages.BANDCAMP
|
||||
LOGGER = logging_settings["bandcamp_logger"]
|
||||
|
||||
|
||||
@@ -111,7 +111,7 @@ def parse_url(url: str) -> MusifyUrl:
|
||||
|
||||
|
||||
class Musify(Page):
|
||||
# CHANGE
|
||||
DOWNLOAD_PRIORITY = 9
|
||||
SOURCE_TYPE = SourcePages.MUSIFY
|
||||
LOGGER = logging_settings["musify_logger"]
|
||||
|
||||
|
||||
@@ -145,6 +145,8 @@ class SuperYouTube(Page):
|
||||
_sponsorblock_connection: Connection = Connection()
|
||||
self.sponsorblock = python_sponsorblock.SponsorBlock(silent=True, session=_sponsorblock_connection.session)
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
|
||||
_url_type = {
|
||||
YouTubeUrlType.CHANNEL: Artist,
|
||||
|
||||
@@ -193,8 +193,7 @@ class YoutubeMusic(SuperYouTube):
|
||||
|
||||
self.start_millis = get_current_millis()
|
||||
|
||||
if self.credentials.api_key == "" or DEBUG_YOUTUBE_INITIALIZING:
|
||||
self._fetch_from_main_page()
|
||||
self._fetch_from_main_page()
|
||||
|
||||
SuperYouTube.__init__(self, *args, **kwargs)
|
||||
|
||||
@@ -215,6 +214,8 @@ class YoutubeMusic(SuperYouTube):
|
||||
self.download_values_by_url: dict = {}
|
||||
self.not_download: Dict[str, DownloadError] = {}
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def _fetch_from_main_page(self):
|
||||
"""
|
||||
===API=KEY===
|
||||
@@ -736,8 +737,9 @@ class YoutubeMusic(SuperYouTube):
|
||||
raw_headers=True,
|
||||
disable_cache=True,
|
||||
headers=media.get("headers", {}),
|
||||
# chunk_size=media.get("chunk_size", main_settings["chunk_size"]),
|
||||
chunk_size=main_settings["chunk_size"],
|
||||
method="GET",
|
||||
timeout=5,
|
||||
)
|
||||
else:
|
||||
result = DownloadResult(error_message=str(media.get("error") or self.not_download[source.hash_url]))
|
||||
|
||||
@@ -69,7 +69,7 @@ dependencies = [
|
||||
"toml~=0.10.2",
|
||||
"typing_extensions~=4.7.1",
|
||||
|
||||
"python-sponsorblock~=0.0.dev1",
|
||||
"python-sponsorblock~=0.1",
|
||||
"youtube_dl",
|
||||
]
|
||||
dynamic = [
|
||||
|
||||
Reference in New Issue
Block a user