32 Commits

Author SHA1 Message Date
7b0b830d64 feat: removed legacy key
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2024-05-23 13:24:25 +02:00
1ba6c97f5a feat: more extensive browse id 2024-05-23 13:20:34 +02:00
c8cbfc7cb9 feat: improved output of clearing the cache 2024-05-23 13:17:14 +02:00
c131924577 Merge pull request 'fix/clean_feature_artists' (#38) from fix/clean_feature_artists into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
Reviewed-on: #38
2024-05-21 12:08:04 +00:00
8cdb5c1f99 fix: tests were a mess and didn't properly test the functionality but random things that worked with implementation
All checks were successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-21 13:52:20 +02:00
356ba658ce fix: lyrics enpoint could crash the whole program 2024-05-21 13:37:26 +02:00
000a6c0dba feat: added type identifier to option strings 2024-05-17 18:24:56 +02:00
83a3334f1a changed default value
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-17 13:55:58 +02:00
ab61ff7e9b feat: added the option to select all at options at the same time
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-17 13:37:49 +02:00
3cb35909d1 feat: added option to just fetch the objects in select 2024-05-17 13:31:46 +02:00
e87075a809 feat: changed syncing from event based to reference
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:39:12 +02:00
86e985acec fix: files don't contain id anymore
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:30:53 +02:00
a70a24d93e feat: minor adjustments
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:14:18 +02:00
2c1ac0f12d fix: wrong collection 2024-05-16 17:09:36 +02:00
897897dba2 feat: added recursive structure 2024-05-16 14:29:50 +02:00
adcf26b518 feat: renamed main album collection to album collection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 14:10:00 +02:00
8ccc28daf8 draft: added feature artist collection attr 2024-05-16 14:09:34 +02:00
2b3f4d82d9 feat: renamed main_artist_collection to artist_collection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 14:05:33 +02:00
41a91a6afe feat: removing dependence beteen artist.album and album.artist 2024-05-16 13:41:06 +02:00
82df96a193 Merge pull request 'feature/move_download_code_to_download' (#34) from feature/move_download_code_to_download into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #34
2024-05-15 15:24:30 +00:00
80ad2727de fix: stream retry
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-15 17:14:01 +02:00
19b83ce880 fix: saving streaming progress on retry
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 15:04:00 +02:00
1bf04439f0 fix: setting the genre of the song
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 14:51:30 +02:00
bab6aeb45d fix: removed double linebreaks from formated text, plaintext
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 14:26:19 +02:00
98afe5047d fix: wrong creation of source types
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 14:21:15 +02:00
017752c4d0 feat: better download output
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 14:10:32 +02:00
ea4c73158e fix: audio format is replaced completely
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 13:58:44 +02:00
0096dfe5cb feat: copying the downloaded music into the final locations
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 13:17:36 +02:00
bedd0fe819 fix: runtime errors
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-15 13:16:11 +02:00
ac6c513d56 draft: post process song
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-15 12:30:54 +02:00
cc14253239 draft: streaming the audio 2024-05-15 12:18:08 +02:00
14f986a497 draft: rewrote sources 2024-05-15 11:44:39 +02:00
28 changed files with 446 additions and 443 deletions

View File

@@ -25,6 +25,7 @@
"encyclopaedia", "encyclopaedia",
"ENDC", "ENDC",
"Gitea", "Gitea",
"iframe",
"isrc", "isrc",
"levenshtein", "levenshtein",
"metallum", "metallum",

View File

@@ -6,8 +6,9 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__": if __name__ == "__main__":
commands = [ commands = [
"s: #a Crystal F", "s: #a I'm in a coffin",
"d: 20", "0",
"d: 0",
] ]

View File

@@ -13,7 +13,7 @@ if __name__ == "__main__":
song_2 = Song( song_2 = Song(
title = "song", title = "song",
main_artist_list=[other_artist] artist_list=[other_artist]
) )
other_artist.name = "main_artist" other_artist.name = "main_artist"
@@ -21,5 +21,5 @@ if __name__ == "__main__":
song_1.merge(song_2) song_1.merge(song_2)
print("#" * 120) print("#" * 120)
print("main", *song_1.main_artist_collection) print("main", *song_1.artist_collection)
print("feat", *song_1.feature_artist_collection) print("feat", *song_1.feature_artist_collection)

View File

@@ -10,12 +10,12 @@ from ..objects import Target
LOGGER = logging_settings["codex_logger"] LOGGER = logging_settings["codex_logger"]
def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], interval_list: List[Tuple[float, float]] = None): def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], skip_intervals: List[Tuple[float, float]] = None):
if not target.exists: if not target.exists:
LOGGER.warning(f"Target doesn't exist: {target.file_path}") LOGGER.warning(f"Target doesn't exist: {target.file_path}")
return return
interval_list = interval_list or [] skip_intervals = skip_intervals or []
bitrate_b = int(bitrate_kb / 1024) bitrate_b = int(bitrate_kb / 1024)
@@ -29,7 +29,7 @@ def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], au
start = 0 start = 0
next_start = 0 next_start = 0
for end, next_start in interval_list: for end, next_start in skip_intervals:
aselect_list.append(f"between(t,{start},{end})") aselect_list.append(f"between(t,{start},{end})")
start = next_start start = next_start
aselect_list.append(f"gte(t,{next_start})") aselect_list.append(f"gte(t,{next_start})")

View File

@@ -178,8 +178,6 @@ class Downloader:
page_count = 0 page_count = 0
for option in self.current_results.formatted_generator(): for option in self.current_results.formatted_generator():
if isinstance(option, Option): if isinstance(option, Option):
_downloadable = self.pages.is_downloadable(option.music_object)
r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}" r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}"
print(r) print(r)
else: else:
@@ -228,7 +226,7 @@ class Downloader:
if album is not None: if album is not None:
song.album_collection.append(album) song.album_collection.append(album)
if artist is not None: if artist is not None:
song.main_artist_collection.append(artist) song.artist_collection.append(artist)
return Query(raw_query=query, music_object=song) return Query(raw_query=query, music_object=song)
if album is not None: if album is not None:
@@ -319,7 +317,7 @@ class Downloader:
for database_object in data_objects: for database_object in data_objects:
r = self.pages.download( r = self.pages.download(
music_object=database_object, data_object=database_object,
genre=self.genre, genre=self.genre,
**kwargs **kwargs
) )
@@ -356,21 +354,23 @@ class Downloader:
command, query = _[0], ":".join(_[1:]) command, query = _[0], ":".join(_[1:])
do_search = "s" in command do_search = "s" in command
do_fetch = "f" in command
do_download = "d" in command do_download = "d" in command
do_merge = "m" in command do_merge = "m" in command
if do_search and do_download: if do_search and (do_download or do_fetch or do_merge):
raise MKInvalidInputException(message="You can't search and download at the same time.") raise MKInvalidInputException(message="You can't search and do another operation at the same time.")
if do_search and do_merge:
raise MKInvalidInputException(message="You can't search and merge at the same time.")
if do_search: if do_search:
self.search(":".join(input_str.split(":")[1:])) self.search(":".join(input_str.split(":")[1:]))
return False return False
def get_selected_objects(q: str):
if q.strip().lower() == "all":
return list(self.current_results)
indices = [] indices = []
for possible_index in query.split(","): for possible_index in q.split(","):
possible_index = possible_index.strip() possible_index = possible_index.strip()
if possible_index == "": if possible_index == "":
continue continue
@@ -386,7 +386,9 @@ class Downloader:
indices.append(i) indices.append(i)
selected_objects = [self.current_results[i] for i in indices] return [self.current_results[i] for i in indices]
selected_objects = get_selected_objects(query)
if do_merge: if do_merge:
old_selected_objects = selected_objects old_selected_objects = selected_objects
@@ -399,6 +401,13 @@ class Downloader:
selected_objects = [a] selected_objects = [a]
if do_fetch:
for data_object in selected_objects:
self.pages.fetch_details(data_object)
self.print_current_options()
return False
if do_download: if do_download:
self.download(selected_objects) self.download(selected_objects)
return False return False

View File

@@ -6,6 +6,7 @@ from typing import List, Optional
from functools import lru_cache from functools import lru_cache
import logging import logging
from ..utils import output, BColors
from ..utils.config import main_settings from ..utils.config import main_settings
from ..utils.string_processing import fit_to_file_system from ..utils.string_processing import fit_to_file_system
@@ -204,9 +205,12 @@ class Cache:
for path in self._dir.iterdir(): for path in self._dir.iterdir():
if path.is_dir(): if path.is_dir():
for file in path.iterdir(): for file in path.iterdir():
output(f"Deleting file {file}", color=BColors.GREY)
file.unlink() file.unlink()
output(f"Deleting folder {path}", color=BColors.HEADER)
path.rmdir() path.rmdir()
else: else:
output(f"Deleting folder {path}", color=BColors.HEADER)
path.unlink() path.unlink()
self.cached_attributes.clear() self.cached_attributes.clear()

View File

@@ -317,7 +317,7 @@ class Connection:
name = kwargs.pop("description") name = kwargs.pop("description")
if progress > 0: if progress > 0:
headers = dict() if headers is None else headers headers = kwargs.get("headers", dict())
headers["Range"] = f"bytes={target.size}-" headers["Range"] = f"bytes={target.size}-"
r = self.request( r = self.request(
@@ -366,6 +366,7 @@ class Connection:
if retry: if retry:
self.LOGGER.warning(f"Retrying stream...") self.LOGGER.warning(f"Retrying stream...")
accepted_response_codes.add(206) accepted_response_codes.add(206)
stream_kwargs["progress"] = progress
return Connection.stream_into(**stream_kwargs) return Connection.stream_into(**stream_kwargs)
return DownloadResult() return DownloadResult()

View File

@@ -1,4 +1,5 @@
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Set
from ..utils.config import main_settings from ..utils.config import main_settings
from ..utils.enums.album import AlbumType from ..utils.enums.album import AlbumType

View File

@@ -2,6 +2,7 @@ from typing import Tuple, Type, Dict, Set, Optional, List
from collections import defaultdict from collections import defaultdict
from pathlib import Path from pathlib import Path
import re import re
import logging
from . import FetchOptions, DownloadOptions from . import FetchOptions, DownloadOptions
from .results import SearchResults from .results import SearchResults
@@ -16,10 +17,12 @@ from ..objects import (
Artist, Artist,
Label, Label,
) )
from ..audio import write_metadata_to_target, correct_codec
from ..utils import output, BColors
from ..utils.string_processing import fit_to_file_system from ..utils.string_processing import fit_to_file_system
from ..utils.config import youtube_settings, main_settings from ..utils.config import youtube_settings, main_settings
from ..utils.path_manager import LOCATIONS from ..utils.path_manager import LOCATIONS
from ..utils.enums import SourceType from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
@@ -68,6 +71,8 @@ if DEBUG_PAGES:
class Pages: class Pages:
def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, download_options: DownloadOptions = None, fetch_options: FetchOptions = None): def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
self.LOGGER = logging.getLogger("download")
self.download_options: DownloadOptions = download_options or DownloadOptions() self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions() self.fetch_options: FetchOptions = fetch_options or FetchOptions()
@@ -117,7 +122,9 @@ class Pages:
return data_object return data_object
source: Source source: Source
for source in data_object.source_collection.get_sources(): for source in data_object.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
}):
new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level) new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level)
if new_data_object is not None: if new_data_object is not None:
data_object.merge(new_data_object) data_object.merge(new_data_object)
@@ -125,37 +132,28 @@ class Pages:
return data_object return data_object
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]: def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
page: Page = self._get_page_from_enum(source.source_type) if not source.has_page:
if page is None:
return None return None
# getting the appropriate function for the page and the object type source_type = source.page.get_source_type(source=source)
source_type = page.get_source_type(source) if source_type is None:
if not hasattr(page, fetch_map[source_type]): self.LOGGER.debug(f"Could not determine source type for {source}.")
return None return None
func = getattr(page, fetch_map[source_type])(source=source, **kwargs)
func = getattr(source.page, fetch_map[source_type])
# fetching the data object and marking it as fetched # fetching the data object and marking it as fetched
data_object: DataObject = func(source=source) data_object: DataObject = func(source=source, **kwargs)
data_object.mark_as_fetched(source.hash_url) data_object.mark_as_fetched(source.hash_url)
return data_object return data_object
def fetch_from_url(self, url: str) -> Optional[DataObject]: def fetch_from_url(self, url: str) -> Optional[DataObject]:
source = Source.match_url(url, SourceType.MANUAL) source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None: if source is None:
return None return None
return self.fetch_from_source(source=source) return self.fetch_from_source(source=source)
def is_downloadable(self, music_object: DataObject) -> bool:
_page_types = set(self._source_to_page)
for src in music_object.source_collection.source_pages:
if src in self._source_to_page:
_page_types.add(self._source_to_page[src])
audio_pages = self._audio_pages_set.intersection(_page_types)
return len(audio_pages) > 0
def _skip_object(self, data_object: DataObject) -> bool: def _skip_object(self, data_object: DataObject) -> bool:
if isinstance(data_object, Album): if isinstance(data_object, Album):
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist: if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
@@ -166,6 +164,7 @@ class Pages:
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult: def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
# fetch the given object # fetch the given object
self.fetch_details(data_object) self.fetch_details(data_object)
output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
# fetching all parent objects (e.g. if you only download a song) # fetching all parent objects (e.g. if you only download a song)
if not kwargs.get("fetched_upwards", False): if not kwargs.get("fetched_upwards", False):
@@ -188,7 +187,7 @@ class Pages:
# download all children # download all children
download_result: DownloadResult = DownloadResult() download_result: DownloadResult = DownloadResult()
for c in data_object.get_children(): for c in data_object.get_child_collections():
for d in c: for d in c:
if self._skip_object(d): if self._skip_object(d):
continue continue
@@ -205,7 +204,7 @@ class Pages:
self._download_song(data_object, naming={ self._download_song(data_object, naming={
"genre": [genre], "genre": [genre],
"audio_format": main_settings["audio_format"], "audio_format": [main_settings["audio_format"]],
}) })
return download_result return download_result
@@ -222,12 +221,7 @@ class Pages:
path_template = path_template.replace(f"{{{field}}}", naming[field][0]) path_template = path_template.replace(f"{{{field}}}", naming[field][0])
return possible_parts return path_template
def _get_pages_with_source(self, data_object: DataObject, sort_by_attribute: str = "DOWNLOAD_PRIORITY") -> List[Page]:
pages = [self._get_page_from_enum(s.source_type) for s in data_object.source_collection.get_sources()]
pages.sort(key=lambda p: getattr(p, sort_by_attribute), reverse=True)
return list(pages)
def _download_song(self, song: Song, naming: dict) -> DownloadOptions: def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
""" """
@@ -241,23 +235,22 @@ class Pages:
# manage the naming # manage the naming
naming: Dict[str, List[str]] = defaultdict(list, naming) naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["song"].append(song.title_string) naming["song"].append(song.title_value)
naming["genre"].append(song.genre)
naming["isrc"].append(song.isrc) naming["isrc"].append(song.isrc)
naming["album"].extend(a.title_string for a in song.album_collection) naming["album"].extend(a.title_value for a in song.album_collection)
naming["album_type"].extend(a.album_type.value for a in song.album_collection) naming["album_type"].extend(a.album_type.value for a in song.album_collection)
naming["artist"].extend(a.name for a in song.main_artist_collection) naming["artist"].extend(a.name for a in song.artist_collection)
naming["artist"].extend(a.name for a in song.feature_artist_collection) naming["artist"].extend(a.name for a in song.feature_artist_collection)
for a in song.album_collection: for a in song.album_collection:
naming["label"].extend([l.title_string for l in a.label_collection]) naming["label"].extend([l.title_value for l in a.label_collection])
# removing duplicates from the naming, and process the strings # removing duplicates from the naming, and process the strings
for key, value in naming.items(): for key, value in naming.items():
# https://stackoverflow.com/a/17016257 # https://stackoverflow.com/a/17016257
naming[key] = list(dict.fromkeys(items)) naming[key] = list(dict.fromkeys(value))
song.genre = naming["genre"][0]
# manage the targets # manage the targets
tmp: Target = Target.temp(file_extension=main_settings["audio_format"]) tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
found_on_disc = False
song.target_collection.append(Target( song.target_collection.append(Target(
relative_to_music_dir=True, relative_to_music_dir=True,
@@ -267,24 +260,64 @@ class Pages:
) )
)) ))
for target in song.target_collection: for target in song.target_collection:
if target.exists(): if target.exists:
output(f'- {target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY) output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
found_on_disc = True
r.found_on_disk += 1 r.found_on_disk += 1
if not self.download_options.download_again_if_found:
target.copy_content(tmp) target.copy_content(tmp)
else: else:
target.create_parent_directories() target.create_path()
output(f'- {target.file_path}', color=BColors.GREY) output(f'{target.file_path}', color=BColors.GREY)
# actually download # this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
for page in self._get_pages_with_source(song, sort_by_attribute="DOWNLOAD_PRIORITY"): used_source: Optional[Source] = None
r = page.download_song_to_target(song, tmp, r) skip_intervals: List[Tuple[float, float]] = []
for source in song.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
"sort_key": lambda page: page.download_priority,
"reverse": True,
}):
if tmp.exists:
break
used_source = source
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download")
skip_intervals = source.page.get_skip_intervals(song=song, source=source)
# if something has been downloaded but it somehow failed, delete the file
if streaming_results.is_fatal_error and tmp.exists:
tmp.delete()
# if everything went right, the file should exist now
if not tmp.exists:
if used_source is None:
r.error_message = f"No source found for {song.option_string}."
else:
r.error_message = f"Something went wrong downloading {song.option_string}."
return r
# post process the audio
found_on_disk = used_source is None
if not found_on_disk or self.download_options.process_audio_if_found:
correct_codec(target=tmp, skip_intervals=skip_intervals)
r.sponsor_segments = len(skip_intervals)
if used_source is not None:
used_source.page.post_process_hook(song=song, temp_target=tmp)
if not found_on_disk or self.download_options.process_metadata_if_found:
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)
# copy the tmp target to the final locations
for target in song.target_collection:
tmp.copy_content(target)
tmp.delete()
return r return r
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]: def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]:
source = Source.match_url(url, SourceType.MANUAL) source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None: if source is None:
raise UrlNotFoundException(url=url) raise UrlNotFoundException(url=url)

View File

@@ -2,7 +2,6 @@ from typing import Tuple, Type, Dict, List, Generator, Union
from dataclasses import dataclass from dataclasses import dataclass
from ..objects import DatabaseObject from ..objects import DatabaseObject
from ..utils.enums.source import SourceType
from ..pages import Page, EncyclopaediaMetallum, Musify from ..pages import Page, EncyclopaediaMetallum, Musify

View File

@@ -115,13 +115,6 @@ class Collection(Generic[T]):
self._data.append(other) self._data.append(other)
other._inner._is_in_collection.add(self) other._inner._is_in_collection.add(self)
# all of the existing hooks to get the defined datastructures
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
for attribute, a in self.sync_on_append.items(): for attribute, a in self.sync_on_append.items():
# syncing two collections by reference # syncing two collections by reference
b = other.__getattribute__(attribute) b = other.__getattribute__(attribute)
@@ -141,6 +134,13 @@ class Collection(Generic[T]):
a.extend(b_data, **kwargs) a.extend(b_data, **kwargs)
# all of the existing hooks to get the defined datastructures
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
def append(self, other: Optional[T], **kwargs): def append(self, other: Optional[T], **kwargs):
""" """
If an object, that represents the same entity exists in a relevant collection, If an object, that represents the same entity exists in a relevant collection,
@@ -160,6 +160,7 @@ class Collection(Generic[T]):
object_trace(f"Appending {other.option_string} to {self}") object_trace(f"Appending {other.option_string} to {self}")
# switching collection in the case of push to # switching collection in the case of push to
for c in self.push_to: for c in self.push_to:
r = c._find_object(other) r = c._find_object(other)

View File

@@ -38,8 +38,13 @@ class FormattedText:
def markdown(self) -> str: def markdown(self) -> str:
return md(self.html).strip() return md(self.html).strip()
@property
def plain(self) -> str:
md = self.markdown
return md.replace("\n\n", "\n")
def __str__(self) -> str: def __str__(self) -> str:
return self.markdown return self.markdown
plaintext = markdown plaintext = plain

View File

@@ -34,6 +34,6 @@ class Lyrics(OuterProxy):
@property @property
def metadata(self) -> Metadata: def metadata(self) -> Metadata:
return Metadata({ return Metadata({
id3Mapping.UNSYNCED_LYRICS: [self.text.markdown] id3Mapping.UNSYNCED_LYRICS: [self.text.plaintext]
}) })

View File

@@ -8,6 +8,7 @@ from typing import Optional, Dict, Tuple, List, Type, Generic, Any, TypeVar, Set
from pathlib import Path from pathlib import Path
import inspect import inspect
from .source import SourceCollection
from .metadata import Metadata from .metadata import Metadata
from ..utils import get_unix_time, object_trace, generate_id from ..utils import get_unix_time, object_trace, generate_id
from ..utils.config import logging_settings, main_settings from ..utils.config import logging_settings, main_settings
@@ -338,6 +339,10 @@ class OuterProxy:
def title_string(self) -> str: def title_string(self) -> str:
return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "") return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "")
@property
def title_value(self) -> str:
return str(self.__getattribute__(self.TITEL))
def __repr__(self): def __repr__(self):
return f"{type(self).__name__}({self.title_string})" return f"{type(self).__name__}({self.title_string})"

View File

@@ -95,7 +95,7 @@ class Song(Base):
target_collection: Collection[Target] target_collection: Collection[Target]
lyrics_collection: Collection[Lyrics] lyrics_collection: Collection[Lyrics]
main_artist_collection: Collection[Artist] artist_collection: Collection[Artist]
feature_artist_collection: Collection[Artist] feature_artist_collection: Collection[Artist]
album_collection: Collection[Album] album_collection: Collection[Album]
@@ -107,8 +107,8 @@ class Song(Base):
"lyrics_collection": Collection, "lyrics_collection": Collection,
"artwork": Artwork, "artwork": Artwork,
"main_artist_collection": Collection,
"album_collection": Collection, "album_collection": Collection,
"artist_collection": Collection,
"feature_artist_collection": Collection, "feature_artist_collection": Collection,
"title": lambda: None, "title": lambda: None,
@@ -129,7 +129,7 @@ class Song(Base):
source_list: List[Source] = None, source_list: List[Source] = None,
target_list: List[Target] = None, target_list: List[Target] = None,
lyrics_list: List[Lyrics] = None, lyrics_list: List[Lyrics] = None,
main_artist_list: List[Artist] = None, artist_list: List[Artist] = None,
feature_artist_list: List[Artist] = None, feature_artist_list: List[Artist] = None,
album_list: List[Album] = None, album_list: List[Album] = None,
tracksort: int = 0, tracksort: int = 0,
@@ -141,27 +141,27 @@ class Song(Base):
Base.__init__(**real_kwargs) Base.__init__(**real_kwargs)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_artist_collection", "feature_artist_collection", "album_collection") UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "feature_artist_collection", "album_collection")
TITEL = "title" TITEL = "title"
def __init_collections__(self) -> None: def __init_collections__(self) -> None:
self.feature_artist_collection.push_to = [self.artist_collection]
self.artist_collection.pull_from = [self.feature_artist_collection]
self.album_collection.sync_on_append = { self.album_collection.sync_on_append = {
"artist_collection": self.main_artist_collection, "artist_collection": self.artist_collection,
} }
self.album_collection.append_object_to_attribute = { self.album_collection.append_object_to_attribute = {
"song_collection": self, "song_collection": self,
} }
self.main_artist_collection.extend_object_to_attribute = { self.artist_collection.extend_object_to_attribute = {
"main_album_collection": self.album_collection "album_collection": self.album_collection
} }
self.feature_artist_collection.append_object_to_attribute = { self.feature_artist_collection.extend_object_to_attribute = {
"feature_song_collection": self "album_collection": self.album_collection
} }
self.feature_artist_collection.push_to = [self.main_artist_collection]
self.main_artist_collection.pull_from = [self.feature_artist_collection]
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song: if object_type is Song:
return return
@@ -203,14 +203,14 @@ class Song(Base):
# metadata.merge_many([s.get_song_metadata() for s in self.source_collection]) album sources have no relevant metadata for id3 # metadata.merge_many([s.get_song_metadata() for s in self.source_collection]) album sources have no relevant metadata for id3
metadata.merge_many([a.metadata for a in self.album_collection]) metadata.merge_many([a.metadata for a in self.album_collection])
metadata.merge_many([a.metadata for a in self.main_artist_collection]) metadata.merge_many([a.metadata for a in self.artist_collection])
metadata.merge_many([a.metadata for a in self.feature_artist_collection]) metadata.merge_many([a.metadata for a in self.feature_artist_collection])
metadata.merge_many([lyrics.metadata for lyrics in self.lyrics_collection]) metadata.merge_many([lyrics.metadata for lyrics in self.lyrics_collection])
return metadata return metadata
def get_artist_credits(self) -> str: def get_artist_credits(self) -> str:
main_artists = ", ".join([artist.name for artist in self.main_artist_collection]) main_artists = ", ".join([artist.name for artist in self.artist_collection])
feature_artists = ", ".join([artist.name for artist in self.feature_artist_collection]) feature_artists = ", ".join([artist.name for artist in self.feature_artist_collection])
if len(feature_artists) == 0: if len(feature_artists) == 0:
@@ -219,10 +219,11 @@ class Song(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = "song "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title}) r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
r += get_collection_string(self.main_artist_collection, " by {}") r += get_collection_string(self.artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}") r += get_collection_string(self.feature_artist_collection, " feat. {}" if len(self.artist_collection) > 0 else " by {}")
return r return r
@property @property
@@ -237,11 +238,6 @@ class Song(Base):
return f"{self.tracksort}/{len(self.album_collection[0].song_collection) or 1}" return f"{self.tracksort}/{len(self.album_collection[0].song_collection) or 1}"
"""
All objects dependent on Album
"""
class Album(Base): class Album(Base):
title: str title: str
unified_title: str unified_title: str
@@ -255,8 +251,9 @@ class Album(Base):
source_collection: SourceCollection source_collection: SourceCollection
artist_collection: Collection[Artist]
song_collection: Collection[Song] song_collection: Collection[Song]
artist_collection: Collection[Artist]
feature_artist_collection: Collection[Artist]
label_collection: Collection[Label] label_collection: Collection[Label]
_default_factories = { _default_factories = {
@@ -272,8 +269,10 @@ class Album(Base):
"notes": FormattedText, "notes": FormattedText,
"source_collection": SourceCollection, "source_collection": SourceCollection,
"artist_collection": Collection,
"song_collection": Collection, "song_collection": Collection,
"artist_collection": Collection,
"feature_artist_collection": Collection,
"label_collection": Collection, "label_collection": Collection,
} }
@@ -306,15 +305,18 @@ class Album(Base):
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection") UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection")
def __init_collections__(self): def __init_collections__(self):
self.feature_artist_collection.push_to = [self.artist_collection]
self.artist_collection.pull_from = [self.feature_artist_collection]
self.song_collection.append_object_to_attribute = { self.song_collection.append_object_to_attribute = {
"album_collection": self "album_collection": self
} }
self.song_collection.sync_on_append = { self.song_collection.sync_on_append = {
"main_artist_collection": self.artist_collection "artist_collection": self.artist_collection
} }
self.artist_collection.append_object_to_attribute = { self.artist_collection.append_object_to_attribute = {
"main_album_collection": self "album_collection": self
} }
self.artist_collection.extend_object_to_attribute = { self.artist_collection.extend_object_to_attribute = {
"label_collection": self.label_collection "label_collection": self.label_collection
@@ -368,8 +370,11 @@ class Album(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = "album "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.artist_collection, " by {}") r += get_collection_string(self.artist_collection, " by {}")
if len(self.artist_collection) <= 0:
r += get_collection_string(self.feature_artist_collection, " by {}")
r += get_collection_string(self.label_collection, " under {}") r += get_collection_string(self.label_collection, " under {}")
if len(self.song_collection) > 0: if len(self.song_collection) > 0:
@@ -379,6 +384,7 @@ class Album(Base):
def _compile(self): def _compile(self):
self.analyze_implied_album_type() self.analyze_implied_album_type()
self.update_tracksort() self.update_tracksort()
self.fix_artist_collection()
def analyze_implied_album_type(self): def analyze_implied_album_type(self):
# if the song collection has only one song, it is reasonable to assume that it is a single # if the song collection has only one song, it is reasonable to assume that it is a single
@@ -420,6 +426,16 @@ class Album(Base):
tracksort_map[i] = existing_list.pop(0) tracksort_map[i] = existing_list.pop(0)
tracksort_map[i].tracksort = i tracksort_map[i].tracksort = i
def fix_artist_collection(self):
"""
I add artists, that could only be feature artists to the feature artist collection.
They get automatically moved to main artist collection, if a matching artist exists in the main artist collection or is appended to it later on.
If I am not sure for any artist, I try to analyze the most common artist in the song collection of one album.
"""
# move all artists that are in all feature_artist_collections, of every song, to the artist_collection
pass
@property @property
def copyright(self) -> str: def copyright(self) -> str:
if self.date is None: if self.date is None:
@@ -464,8 +480,7 @@ class Artist(Base):
source_collection: SourceCollection source_collection: SourceCollection
contact_collection: Collection[Contact] contact_collection: Collection[Contact]
feature_song_collection: Collection[Song] album_collection: Collection[Album]
main_album_collection: Collection[Album]
label_collection: Collection[Label] label_collection: Collection[Label]
_default_factories = { _default_factories = {
@@ -479,8 +494,7 @@ class Artist(Base):
"general_genre": lambda: "", "general_genre": lambda: "",
"source_collection": SourceCollection, "source_collection": SourceCollection,
"feature_song_collection": Collection, "album_collection": Collection,
"main_album_collection": Collection,
"contact_collection": Collection, "contact_collection": Collection,
"label_collection": Collection, "label_collection": Collection,
} }
@@ -501,7 +515,7 @@ class Artist(Base):
source_list: List[Source] = None, source_list: List[Source] = None,
contact_list: List[Contact] = None, contact_list: List[Contact] = None,
feature_song_list: List[Song] = None, feature_song_list: List[Song] = None,
main_album_list: List[Album] = None, album_list: List[Album] = None,
label_list: List[Label] = None, label_list: List[Label] = None,
**kwargs **kwargs
) -> None: ) -> None:
@@ -511,18 +525,14 @@ class Artist(Base):
Base.__init__(**real_kwargs) Base.__init__(**real_kwargs)
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_album_collection", "feature_song_collection") DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("album_collection",)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",) UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",)
def __init_collections__(self): def __init_collections__(self):
self.feature_song_collection.append_object_to_attribute = { self.album_collection.append_object_to_attribute = {
"feature_artist_collection": self "feature_artist_collection": self
} }
self.main_album_collection.append_object_to_attribute = {
"artist_collection": self
}
self.label_collection.append_object_to_attribute = { self.label_collection.append_object_to_attribute = {
"current_artist_collection": self "current_artist_collection": self
} }
@@ -530,14 +540,13 @@ class Artist(Base):
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song: if object_type is Song:
# this doesn't really make sense # this doesn't really make sense
# self.feature_song_collection.extend(object_list)
return return
if object_type is Artist: if object_type is Artist:
return return
if object_type is Album: if object_type is Album:
self.main_album_collection.extend(object_list) self.album_collection.extend(object_list)
return return
if object_type is Label: if object_type is Label:
@@ -550,7 +559,7 @@ class Artist(Base):
def update_albumsort(self): def update_albumsort(self):
""" """
This updates the albumsort attributes, of the albums in This updates the albumsort attributes, of the albums in
`self.main_album_collection`, and sorts the albums, if possible. `self.album_collection`, and sorts the albums, if possible.
It is advised to only call this function, once all the albums are It is advised to only call this function, once all the albums are
added to the artist. added to the artist.
@@ -568,7 +577,7 @@ class Artist(Base):
# order albums in the previously defined section # order albums in the previously defined section
album: Album album: Album
for album in self.main_album_collection: for album in self.album_collection:
sections[type_section[album.album_type]].append(album) sections[type_section[album.album_type]].append(album)
def sort_section(_section: List[Album], last_albumsort: int) -> int: def sort_section(_section: List[Album], last_albumsort: int) -> int:
@@ -599,7 +608,7 @@ class Artist(Base):
album_list.extend(sections[section_index]) album_list.extend(sections[section_index])
# replace the old collection with the new one # replace the old collection with the new one
self.main_album_collection._data = album_list self.album_collection._data = album_list
INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection") INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection")
@property @property
@@ -621,15 +630,14 @@ class Artist(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = "artist "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.label_collection, " under {}") r += get_collection_string(self.label_collection, " under {}")
r += OPTION_BACKGROUND.value r += OPTION_BACKGROUND.value
if len(self.main_album_collection) > 0: if len(self.album_collection) > 0:
r += f" with {len(self.main_album_collection)} albums" r += f" with {len(self.album_collection)} albums"
if len(self.feature_song_collection) > 0:
r += f" featured in {len(self.feature_song_collection)} songs"
r += BColors.ENDC.value r += BColors.ENDC.value
return r return r
@@ -716,4 +724,4 @@ class Label(Base):
@property @property
def option_string(self): def option_string(self):
return OPTION_FOREGROUND.value + self.name + BColors.ENDC.value return "label " + OPTION_FOREGROUND.value + self.name + BColors.ENDC.value

View File

@@ -2,19 +2,31 @@ from __future__ import annotations
from collections import defaultdict from collections import defaultdict
from enum import Enum from enum import Enum
from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator from typing import (
List,
Dict,
Set,
Tuple,
Optional,
Iterable,
Generator,
TypedDict,
Callable,
Any,
TYPE_CHECKING
)
from urllib.parse import urlparse, ParseResult from urllib.parse import urlparse, ParseResult
from dataclasses import dataclass, field from dataclasses import dataclass, field
from functools import cached_property from functools import cached_property
from ..utils import generate_id from ..utils import generate_id
from ..utils.enums import SourceType from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.config import youtube_settings from ..utils.config import youtube_settings
from ..utils.string_processing import hash_url, shorten_display_url from ..utils.string_processing import hash_url, shorten_display_url
from .metadata import Mapping, Metadata from .metadata import Mapping, Metadata
from .parents import OuterProxy if TYPE_CHECKING:
from .collection import Collection from ..pages.abstract import Page
@@ -30,10 +42,6 @@ class Source:
def __post_init__(self): def __post_init__(self):
self.referrer_page = self.referrer_page or self.source_type self.referrer_page = self.referrer_page or self.source_type
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
@classmethod @classmethod
def match_url(cls, url: str, referrer_page: SourceType) -> Optional[Source]: def match_url(cls, url: str, referrer_page: SourceType) -> Optional[Source]:
""" """
@@ -44,38 +52,50 @@ class Source:
url = parsed_url.geturl() url = parsed_url.geturl()
if "musify" in parsed_url.netloc: if "musify" in parsed_url.netloc:
return cls(SourceType.MUSIFY, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.MUSIFY, url, referrer_page=referrer_page)
if parsed_url.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]: if parsed_url.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]:
return cls(SourceType.YOUTUBE, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.YOUTUBE, url, referrer_page=referrer_page)
if url.startswith("https://www.deezer"): if url.startswith("https://www.deezer"):
return cls(SourceType.DEEZER, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.DEEZER, url, referrer_page=referrer_page)
if url.startswith("https://open.spotify.com"): if url.startswith("https://open.spotify.com"):
return cls(SourceType.SPOTIFY, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.SPOTIFY, url, referrer_page=referrer_page)
if "bandcamp" in url: if "bandcamp" in url:
return cls(SourceType.BANDCAMP, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.BANDCAMP, url, referrer_page=referrer_page)
if "wikipedia" in parsed_url.netloc: if "wikipedia" in parsed_url.netloc:
return cls(SourceType.WIKIPEDIA, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.WIKIPEDIA, url, referrer_page=referrer_page)
if url.startswith("https://www.metal-archives.com/"): if url.startswith("https://www.metal-archives.com/"):
return cls(SourceType.ENCYCLOPAEDIA_METALLUM, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM, url, referrer_page=referrer_page)
# the less important once # the less important once
if url.startswith("https://www.facebook"): if url.startswith("https://www.facebook"):
return cls(SourceType.FACEBOOK, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.FACEBOOK, url, referrer_page=referrer_page)
if url.startswith("https://www.instagram"): if url.startswith("https://www.instagram"):
return cls(SourceType.INSTAGRAM, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.INSTAGRAM, url, referrer_page=referrer_page)
if url.startswith("https://twitter"): if url.startswith("https://twitter"):
return cls(SourceType.TWITTER, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.TWITTER, url, referrer_page=referrer_page)
if url.startswith("https://myspace.com"): if url.startswith("https://myspace.com"):
return cls(SourceType.MYSPACE, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.MYSPACE, url, referrer_page=referrer_page)
@property
def has_page(self) -> bool:
return self.source_type.page is not None
@property
def page(self) -> Page:
return self.source_type.page
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
@property @property
def hash_url(self) -> str: def hash_url(self) -> str:
@@ -99,27 +119,72 @@ class Source:
page_str = property(fget=lambda self: self.source_type.value) page_str = property(fget=lambda self: self.source_type.value)
class SourceTypeSorting(TypedDict):
sort_key: Callable[[SourceType], Any]
reverse: bool
only_with_page: bool
class SourceCollection: class SourceCollection:
__change_version__ = generate_id() __change_version__ = generate_id()
_indexed_sources: Dict[str, Source] _indexed_sources: Dict[str, Source]
_page_to_source_list: Dict[SourceType, List[Source]] _sources_by_type: Dict[SourceType, List[Source]]
def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs): def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
self._page_to_source_list = defaultdict(list) self._sources_by_type = defaultdict(list)
self._indexed_sources = {} self._indexed_sources = {}
self.extend(data or []) self.extend(data or [])
def has_source_page(self, *source_pages: SourceType) -> bool: def source_types(
return any(source_page in self._page_to_source_list for source_page in source_pages) self,
only_with_page: bool = False,
sort_key = lambda page: page.name,
reverse: bool = False
) -> Iterable[SourceType]:
"""
Returns a list of all source types contained in this source collection.
def get_sources(self, *source_pages: List[Source]) -> Generator[Source]: Args:
if not len(source_pages): only_with_page (bool, optional): If True, only returns source types that have a page, meaning you can download from them.
source_pages = self.source_pages sort_key (function, optional): A function that defines the sorting key for the source types. Defaults to lambda page: page.name.
reverse (bool, optional): If True, sorts the source types in reverse order. Defaults to False.
for page in source_pages: Returns:
yield from self._page_to_source_list[page] Iterable[SourceType]: A list of source types.
"""
source_types: List[SourceType] = self._sources_by_type.keys()
if only_with_page:
source_types = filter(lambda st: st.has_page, source_types)
return sorted(
source_types,
key=sort_key,
reverse=reverse
)
def get_sources(self, *source_types: List[SourceType], source_type_sorting: SourceTypeSorting = None) -> Generator[Source]:
"""
Retrieves sources based on the provided source types and source type sorting.
Args:
*source_types (List[Source]): Variable number of source types to filter the sources.
source_type_sorting (SourceTypeSorting): Sorting criteria for the source types. This is only relevant if no source types are provided.
Yields:
Generator[Source]: A generator that yields the sources based on the provided filters.
Returns:
None
"""
if not len(source_types):
source_type_sorting = source_type_sorting or {}
source_types = self.source_types(**source_type_sorting)
for source_type in source_types:
yield from self._sources_by_type[source_type]
def append(self, source: Source): def append(self, source: Source):
if source is None: if source is None:
@@ -135,7 +200,7 @@ class SourceCollection:
existing_source.__merge__(source) existing_source.__merge__(source)
source = existing_source source = existing_source
else: else:
self._page_to_source_list[source.source_type].append(source) self._sources_by_type[source.source_type].append(source)
changed = False changed = False
for key in source.indexing_values: for key in source.indexing_values:
@@ -156,10 +221,6 @@ class SourceCollection:
def __merge__(self, other: SourceCollection, **kwargs): def __merge__(self, other: SourceCollection, **kwargs):
self.extend(other) self.extend(other)
@property
def source_pages(self) -> Iterable[SourceType]:
return sorted(self._page_to_source_list.keys(), key=lambda page: page.value)
@property @property
def hash_url_list(self) -> List[str]: def hash_url_list(self) -> List[str]:
return [hash_url(source.url) for source in self.get_sources()] return [hash_url(source.url) for source in self.get_sources()]
@@ -170,7 +231,7 @@ class SourceCollection:
@property @property
def homepage_list(self) -> List[str]: def homepage_list(self) -> List[str]:
return [source.homepage for source in self.source_pages] return [source_type.homepage for source_type in self._sources_by_type.keys()]
def indexing_values(self) -> Generator[Tuple[str, str], None, None]: def indexing_values(self) -> Generator[Tuple[str, str], None, None]:
for index in self._indexed_sources: for index in self._indexed_sources:

View File

@@ -49,15 +49,16 @@ class DownloadOptions:
class Page: class Page:
SOURCE_TYPE: SourceType SOURCE_TYPE: SourceType
LOGGER: LOGGER LOGGER: logging.Logger
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
cls.SOURCE_TYPE.register_page(cls)
cls.LOGGER = logging.getLogger(cls.__name__) cls.LOGGER = logging.getLogger(cls.__name__)
return super().__new__(cls) return super().__new__(cls)
def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None): def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
self.SOURCE_TYPE.register_page(self)
self.download_options: DownloadOptions = download_options or DownloadOptions() self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions() self.fetch_options: FetchOptions = fetch_options or FetchOptions()
@@ -145,153 +146,7 @@ class Page:
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label() return Label()
def download( # to download stuff
self,
music_object: DatabaseObject,
genre: str,
) -> DownloadResult:
naming_dict: NamingDict = NamingDict({"genre": genre})
def fill_naming_objects(naming_music_object: DatabaseObject):
nonlocal naming_dict
for collection_name in naming_music_object.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
collection: Collection = getattr(naming_music_object, collection_name)
if collection.empty:
continue
dom_ordered_music_object: DatabaseObject = collection[0]
naming_dict.add_object(dom_ordered_music_object)
return fill_naming_objects(dom_ordered_music_object)
fill_naming_objects(music_object)
return self._download(music_object, naming_dict)
def _download(
self,
music_object: DatabaseObject,
naming_dict: NamingDict,
**kwargs
) -> DownloadResult:
if isinstance(music_object, Song):
output(f"Downloading {music_object.option_string} to:", color=BColors.BOLD)
else:
output(f"Downloading {music_object.option_string}...", color=BColors.BOLD)
# Skips all releases, that are defined in shared.ALBUM_TYPE_BLACKLIST, if download_all is False
if isinstance(music_object, Album):
if not self.download_options.download_all and music_object.album_type in self.download_options.album_type_blacklist:
return DownloadResult()
if not (isinstance(music_object, Song) and self.NO_ADDITIONAL_DATA_FROM_SONG):
self.fetch_details(music_object=music_object, stop_at_level=1)
if isinstance(music_object, Album):
music_object.update_tracksort()
naming_dict.add_object(music_object)
if isinstance(music_object, Song):
return self._download_song(music_object, naming_dict)
download_result: DownloadResult = DownloadResult()
for collection_name in music_object.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
collection: Collection = getattr(music_object, collection_name)
sub_ordered_music_object: DatabaseObject
for sub_ordered_music_object in collection:
download_result.merge(self._download(sub_ordered_music_object, naming_dict.copy()))
return download_result
def _download_song(self, song: Song, naming_dict: NamingDict):
song.compile()
if "genre" not in naming_dict and song.genre is not None:
naming_dict["genre"] = song.genre
if song.genre is None:
song.genre = naming_dict["genre"]
path_parts = Formatter().parse(main_settings["download_path"])
file_parts = Formatter().parse(main_settings["download_file"])
new_target = Target(
relative_to_music_dir=True,
file_path=Path(
main_settings["download_path"].format(**{part[1]: naming_dict[part[1]] for part in path_parts}),
main_settings["download_file"].format(**{part[1]: naming_dict[part[1]] for part in file_parts})
)
)
if song.target_collection.empty:
song.target_collection.append(new_target)
r = DownloadResult(1)
temp_target: Target = Target.temp(file_extension=main_settings["audio_format"])
found_on_disc = False
target: Target
for target in song.target_collection:
current_exists = target.exists
if current_exists:
output(f'- {target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
target.copy_content(temp_target)
found_on_disc = True
r.found_on_disk += 1
r.add_target(target)
else:
output(f'- {target.file_path}', color=BColors.GREY)
if not song.source_collection.has_source_page(self.SOURCE_TYPE):
return DownloadResult(error_message=f"No {self.__class__.__name__} source found for {song.option_string}.")
sources = song.source_collection.get_sources(self.SOURCE_TYPE)
skip_intervals = []
if not found_on_disc:
for source in sources:
r = self.download_song_to_target(source=source, target=temp_target, desc="downloading")
if not r.is_fatal_error:
skip_intervals = self.get_skip_intervals(song, source)
break
if temp_target.exists:
r.merge(self._post_process_targets(
song=song,
temp_target=temp_target,
interval_list=skip_intervals,
found_on_disc=found_on_disc,
))
return r
def _post_process_targets(self, song: Song, temp_target: Target, interval_list: List, found_on_disc: bool) -> DownloadResult:
if not found_on_disc or self.download_options.process_audio_if_found:
correct_codec(temp_target, interval_list=interval_list)
self.post_process_hook(song, temp_target)
if not found_on_disc or self.download_options.process_metadata_if_found:
write_metadata_to_target(song.metadata, temp_target, song)
r = DownloadResult()
target: Target
for target in song.target_collection:
if temp_target is not target:
temp_target.copy_content(target)
r.add_target(target)
temp_target.delete()
r.sponsor_segments += len(interval_list)
return r
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]: def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
return [] return []

View File

@@ -51,7 +51,6 @@ class BandcampTypes(Enum):
class Bandcamp(Page): class Bandcamp(Page):
SOURCE_TYPE = ALL_SOURCE_TYPES.BANDCAMP SOURCE_TYPE = ALL_SOURCE_TYPES.BANDCAMP
LOGGER = logging_settings["bandcamp_logger"]
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.connection: Connection = Connection( self.connection: Connection = Connection(
@@ -63,8 +62,7 @@ class Bandcamp(Page):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]: def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
parsed_url = urlparse(source.url) path = source.parsed_url.path.replace("/", "")
path = parsed_url.path.replace("/", "")
if path == "" or path.startswith("music"): if path == "" or path.startswith("music"):
return Artist return Artist
@@ -119,7 +117,7 @@ class Bandcamp(Page):
return Song( return Song(
title=clean_song_title(name, artist_name=data["band_name"]), title=clean_song_title(name, artist_name=data["band_name"]),
source_list=source_list, source_list=source_list,
main_artist_list=[ artist_list=[
Artist( Artist(
name=data["band_name"], name=data["band_name"],
source_list=[ source_list=[
@@ -239,7 +237,7 @@ class Bandcamp(Page):
html_music_grid = soup.find("ol", {"id": "music-grid"}) html_music_grid = soup.find("ol", {"id": "music-grid"})
if html_music_grid is not None: if html_music_grid is not None:
for subsoup in html_music_grid.find_all("li"): for subsoup in html_music_grid.find_all("li"):
artist.main_album_collection.append(self._parse_album(soup=subsoup, initial_source=source)) artist.album_collection.append(self._parse_album(soup=subsoup, initial_source=source))
for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})): for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})):
data_blob = data_blob_soup["data-blob"] data_blob = data_blob_soup["data-blob"]
@@ -248,7 +246,7 @@ class Bandcamp(Page):
dump_to_file(f"bandcamp_artist_data_blob_{i}.json", data_blob, is_json=True, exit_after_dump=False) dump_to_file(f"bandcamp_artist_data_blob_{i}.json", data_blob, is_json=True, exit_after_dump=False)
if data_blob is not None: if data_blob is not None:
artist.main_album_collection.extend( artist.album_collection.extend(
self._parse_artist_data_blob(json.loads(data_blob), source.url) self._parse_artist_data_blob(json.loads(data_blob), source.url)
) )
@@ -372,7 +370,7 @@ class Bandcamp(Page):
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"), date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
source_list=[Source(self.SOURCE_TYPE, album_data["@id"])] source_list=[Source(self.SOURCE_TYPE, album_data["@id"])]
)], )],
main_artist_list=[Artist( artist_list=[Artist(
name=artist_data["name"].strip(), name=artist_data["name"].strip(),
source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))] source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
)], )],

View File

@@ -7,7 +7,7 @@ from urllib.parse import urlparse, urlencode
from ..connection import Connection from ..connection import Connection
from ..utils.config import logging_settings from ..utils.config import logging_settings
from .abstract import Page from .abstract import Page
from ..utils.enums.source import SourceType from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.enums.album import AlbumType from ..utils.enums.album import AlbumType
from ..utils.support_classes.query import Query from ..utils.support_classes.query import Query
from ..objects import ( from ..objects import (
@@ -52,14 +52,14 @@ def _song_from_json(artist_html=None, album_html=None, release_type=None, title=
return Song( return Song(
title=title, title=title,
main_artist_list=[ artist_list=[
_artist_from_json(artist_html=artist_html) _artist_from_json(artist_html=artist_html)
], ],
album_list=[ album_list=[
_album_from_json(album_html=album_html, release_type=release_type, artist_html=artist_html) _album_from_json(album_html=album_html, release_type=release_type, artist_html=artist_html)
], ],
source_list=[ source_list=[
Source(SourceType.ENCYCLOPAEDIA_METALLUM, song_id) Source(ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM, song_id)
] ]
) )
@@ -85,7 +85,7 @@ def _artist_from_json(artist_html=None, genre=None, country=None) -> Artist:
return Artist( return Artist(
name=artist_name, name=artist_name,
source_list=[ source_list=[
Source(SourceType.ENCYCLOPAEDIA_METALLUM, artist_url) Source(ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM, artist_url)
] ]
) )
@@ -105,7 +105,7 @@ def _album_from_json(album_html=None, release_type=None, artist_html=None) -> Al
title=album_name, title=album_name,
album_type=album_type, album_type=album_type,
source_list=[ source_list=[
Source(SourceType.ENCYCLOPAEDIA_METALLUM, album_url) Source(ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM, album_url)
], ],
artist_list=[ artist_list=[
_artist_from_json(artist_html=artist_html) _artist_from_json(artist_html=artist_html)
@@ -207,7 +207,7 @@ def create_grid(
class EncyclopaediaMetallum(Page): class EncyclopaediaMetallum(Page):
SOURCE_TYPE = SourceType.ENCYCLOPAEDIA_METALLUM SOURCE_TYPE = ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM
LOGGER = logging_settings["metal_archives_logger"] LOGGER = logging_settings["metal_archives_logger"]
def __init__(self, **kwargs): def __init__(self, **kwargs):
@@ -266,7 +266,7 @@ class EncyclopaediaMetallum(Page):
song_title = song.title.strip() song_title = song.title.strip()
album_titles = ["*"] if song.album_collection.empty else [album.title.strip() for album in song.album_collection] album_titles = ["*"] if song.album_collection.empty else [album.title.strip() for album in song.album_collection]
artist_titles = ["*"] if song.main_artist_collection.empty else [artist.name.strip() for artist in song.main_artist_collection] artist_titles = ["*"] if song.artist_collection.empty else [artist.name.strip() for artist in song.artist_collection]
search_results = [] search_results = []
@@ -663,7 +663,7 @@ class EncyclopaediaMetallum(Page):
artist.notes = band_notes artist.notes = band_notes
discography: List[Album] = self._fetch_artist_discography(artist_id) discography: List[Album] = self._fetch_artist_discography(artist_id)
artist.main_album_collection.extend(discography) artist.album_collection.extend(discography)
return artist return artist

View File

@@ -502,9 +502,18 @@ class Musify(Page):
for video_container in video_container_list: for video_container in video_container_list:
iframe_list: List[BeautifulSoup] = video_container.findAll("iframe") iframe_list: List[BeautifulSoup] = video_container.findAll("iframe")
for iframe in iframe_list: for iframe in iframe_list:
"""
the url could look like this
https://www.youtube.com/embed/sNObCkhzOYA?si=dNVgnZMBNVlNb0P_
"""
parsed_url = urlparse(iframe["src"])
path_parts = parsed_url.path.strip("/").split("/")
if path_parts[0] != "embed" or len(path_parts) < 2:
continue
source_list.append(Source( source_list.append(Source(
SourceType.YOUTUBE, ALL_SOURCE_TYPES.YOUTUBE,
iframe["src"], f"https://music.youtube.com/watch?v={path_parts[1]}",
referrer_page=self.SOURCE_TYPE referrer_page=self.SOURCE_TYPE
)) ))
@@ -1045,7 +1054,7 @@ class Musify(Page):
if not self.fetch_options.download_all and album.album_type in self.fetch_options.album_type_blacklist: if not self.fetch_options.download_all and album.album_type in self.fetch_options.album_type_blacklist:
continue continue
artist.main_album_collection.append(album) artist.album_collection.append(album)
def fetch_artist(self, source: Source, **kwargs) -> Artist: def fetch_artist(self, source: Source, **kwargs) -> Artist:
""" """

View File

@@ -41,8 +41,6 @@ class YouTube(SuperYouTube):
# CHANGE # CHANGE
SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
NO_ADDITIONAL_DATA_FROM_SONG = False
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.connection: Connection = Connection( self.connection: Connection = Connection(
host=get_invidious_url(), host=get_invidious_url(),
@@ -145,7 +143,7 @@ class YouTube(SuperYouTube):
self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={data['videoId']}") self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={data['videoId']}")
)], )],
notes=FormattedText(html=data["descriptionHtml"] + f"\n<p>{license_str}</ p>" ), notes=FormattedText(html=data["descriptionHtml"] + f"\n<p>{license_str}</ p>" ),
main_artist_list=artist_list artist_list=artist_list
), int(data["published"]) ), int(data["published"])
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
@@ -286,7 +284,7 @@ class YouTube(SuperYouTube):
self.LOGGER.warning(f"didn't found any playlists with piped, falling back to invidious. (it is unusual)") self.LOGGER.warning(f"didn't found any playlists with piped, falling back to invidious. (it is unusual)")
album_list, artist_name = self.fetch_invidious_album_list(parsed.id) album_list, artist_name = self.fetch_invidious_album_list(parsed.id)
return Artist(name=artist_name, main_album_list=album_list, source_list=[source]) return Artist(name=artist_name, album_list=album_list, source_list=[source])
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
""" """

View File

@@ -58,6 +58,19 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
song.album_collection.extend(album_list) song.album_collection.extend(album_list)
return [song] return [song]
if len(album_list) == 1:
album = album_list[0]
album.artist_collection.extend(artist_list)
album.song_collection.extend(song_list)
return [album]
"""
if len(artist_list) == 1:
artist = artist_list[0]
artist.main_album_collection.extend(album_list)
return [artist]
"""
return results return results

View File

@@ -22,20 +22,22 @@ from ...utils import get_current_millis, traverse_json_path
from ...utils import dump_to_file from ...utils import dump_to_file
from ...objects import Source, DatabaseObject, ID3Timestamp, Artwork
from ..abstract import Page from ..abstract import Page
from ...objects import ( from ...objects import (
Artist, DatabaseObject as DataObject,
Source, Source,
SourceType, FormattedText,
ID3Timestamp,
Artwork,
Artist,
Song, Song,
Album, Album,
Label, Label,
Target, Target,
Lyrics, Lyrics,
FormattedText
) )
from ...connection import Connection from ...connection import Connection
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
from ...utils.enums.album import AlbumType from ...utils.enums.album import AlbumType
from ...utils.support_classes.download_result import DownloadResult from ...utils.support_classes.download_result import DownloadResult
@@ -176,8 +178,7 @@ ALBUM_TYPE_MAP = {
class YoutubeMusic(SuperYouTube): class YoutubeMusic(SuperYouTube):
# CHANGE # CHANGE
SOURCE_TYPE = SourceType.YOUTUBE_MUSIC SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
LOGGER = logging_settings["youtube_music_logger"]
def __init__(self, *args, ydl_opts: dict = None, **kwargs): def __init__(self, *args, ydl_opts: dict = None, **kwargs):
self.yt_music_connection: YoutubeMusicConnection = YoutubeMusicConnection( self.yt_music_connection: YoutubeMusicConnection = YoutubeMusicConnection(
@@ -348,10 +349,10 @@ class YoutubeMusic(SuperYouTube):
default='{}' default='{}'
)) or {} )) or {}
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]: def get_source_type(self, source: Source) -> Optional[Type[DataObject]]:
return super().get_source_type(source) return super().get_source_type(source)
def general_search(self, search_query: str) -> List[DatabaseObject]: def general_search(self, search_query: str) -> List[DataObject]:
search_query = search_query.strip() search_query = search_query.strip()
urlescaped_query: str = quote(search_query.strip().replace(" ", "+")) urlescaped_query: str = quote(search_query.strip().replace(" ", "+"))
@@ -548,6 +549,11 @@ class YoutubeMusic(SuperYouTube):
return album return album
def fetch_lyrics(self, video_id: str, playlist_id: str = None) -> str: def fetch_lyrics(self, video_id: str, playlist_id: str = None) -> str:
"""
1. fetches the tabs of a song, to get the browse id
2. finds the browse id of the lyrics
3. fetches the lyrics with the browse id
"""
request_data = { request_data = {
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}}, "context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
"videoId": video_id, "videoId": video_id,
@@ -574,6 +580,7 @@ class YoutubeMusic(SuperYouTube):
pageType = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseEndpointContextSupportedConfigs.browseEndpointContextMusicConfig.pageType", default="") pageType = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseEndpointContextSupportedConfigs.browseEndpointContextMusicConfig.pageType", default="")
if pageType in ("MUSIC_TAB_TYPE_LYRICS", "MUSIC_PAGE_TYPE_TRACK_LYRICS") or "lyrics" in pageType.lower(): if pageType in ("MUSIC_TAB_TYPE_LYRICS", "MUSIC_PAGE_TYPE_TRACK_LYRICS") or "lyrics" in pageType.lower():
browse_id = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseId", default=None) browse_id = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseId", default=None)
if browse_id is not None:
break break
if browse_id is None: if browse_id is None:
@@ -588,6 +595,8 @@ class YoutubeMusic(SuperYouTube):
}, },
name=f"fetch_song_lyrics_{video_id}.json" name=f"fetch_song_lyrics_{video_id}.json"
) )
if r is None:
return None
dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False) dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False)
@@ -619,7 +628,7 @@ class YoutubeMusic(SuperYouTube):
Artist( Artist(
name=name, name=name,
source_list=[Source( source_list=[Source(
SourceType.YOUTUBE_MUSIC, self.SOURCE_TYPE,
f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}" f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}"
)] )]
) for name in artist_names] ) for name in artist_names]
@@ -638,9 +647,9 @@ class YoutubeMusic(SuperYouTube):
album_list=album_list, album_list=album_list,
length=int(ydl_res.get("duration", 0)) * 1000, length=int(ydl_res.get("duration", 0)) * 1000,
artwork=Artwork(*ydl_res.get("thumbnails", [])), artwork=Artwork(*ydl_res.get("thumbnails", [])),
main_artist_list=artist_list, artist_list=artist_list,
source_list=[Source( source_list=[Source(
SourceType.YOUTUBE_MUSIC, self.SOURCE_TYPE,
f"https://music.youtube.com/watch?v={ydl_res.get('id')}" f"https://music.youtube.com/watch?v={ydl_res.get('id')}"
), source], ), source],
) )
@@ -718,7 +727,6 @@ class YoutubeMusic(SuperYouTube):
self.download_values_by_url[source.url] = { self.download_values_by_url[source.url] = {
"url": _best_format.get("url"), "url": _best_format.get("url"),
"chunk_size": _best_format.get("downloader_options", {}).get("http_chunk_size", main_settings["chunk_size"]),
"headers": _best_format.get("http_headers", {}), "headers": _best_format.get("http_headers", {}),
} }

View File

@@ -19,7 +19,7 @@ config = Config((
You can use Audio formats which support ID3.2 and ID3.1, You can use Audio formats which support ID3.2 and ID3.1,
but you will have cleaner Metadata using ID3.2."""), but you will have cleaner Metadata using ID3.2."""),
Attribute(name="result_history", default_value=False, description="""If enabled, you can go back to the previous results. Attribute(name="result_history", default_value=True, description="""If enabled, you can go back to the previous results.
The consequence is a higher meory consumption, because every result is saved."""), The consequence is a higher meory consumption, because every result is saved."""),
Attribute(name="history_length", default_value=8, description="""You can choose how far back you can go in the result history. Attribute(name="history_length", default_value=8, description="""You can choose how far back you can go in the result history.
The further you choose to be able to go back, the higher the memory usage. The further you choose to be able to go back, the higher the memory usage.

View File

@@ -14,10 +14,11 @@ class SourceType:
page_type: Type[Page] = None page_type: Type[Page] = None
page: Page = None page: Page = None
def register_page(self, page: Page):
self.page = page
def register_page(self, page_type: Type[Page]): def __hash__(self):
self.page_type = page return hash(self.name)
self.page = page_type()
@property @property
def has_page(self) -> bool: def has_page(self) -> bool:

View File

@@ -12,10 +12,10 @@ if not load_dotenv(Path(__file__).parent.parent.parent / ".env"):
__stage__ = os.getenv("STAGE", "prod") __stage__ = os.getenv("STAGE", "prod")
DEBUG = (__stage__ == "dev") and False DEBUG = (__stage__ == "dev") and True
DEBUG_LOGGING = DEBUG and False DEBUG_LOGGING = DEBUG and False
DEBUG_TRACE = DEBUG and True DEBUG_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and False DEBUG_OBJECT_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False DEBUG_PAGES = DEBUG and False

View File

@@ -24,7 +24,7 @@ class Query:
return [self.music_object.name] return [self.music_object.name]
if isinstance(self.music_object, Song): if isinstance(self.music_object, Song):
return [f"{artist.name} - {self.music_object}" for artist in self.music_object.main_artist_collection] return [f"{artist.name} - {self.music_object}" for artist in self.music_object.artist_collection]
if isinstance(self.music_object, Album): if isinstance(self.music_object, Album):
return [f"{artist.name} - {self.music_object}" for artist in self.music_object.artist_collection] return [f"{artist.name} - {self.music_object}" for artist in self.music_object.artist_collection]

View File

@@ -3,78 +3,76 @@ import unittest
from music_kraken.objects import Song, Album, Artist, Collection, Country from music_kraken.objects import Song, Album, Artist, Collection, Country
class TestCollection(unittest.TestCase): class TestCollection(unittest.TestCase):
@staticmethod def test_song_contains_album(self):
def complicated_object() -> Artist: """
return Artist( Tests that every song contains the album it is added to in its album_collection
name="artist", """
country=Country.by_alpha_2("DE"),
main_album_list=[ a_1 = Album(
Album(
title="album", title="album",
song_list=[ song_list= [
Song( Song(title="song"),
title="song",
album_list=[
Album(title="album", albumsort=123),
],
),
Song(
title="other_song",
album_list=[
Album(title="album", albumsort=423),
],
),
]
),
Album(title="album", barcode="1234567890123"),
] ]
) )
a_2 = a_1.song_collection[0].album_collection[0]
self.assertTrue(a_1.id == a_2.id)
def test_song_album_relation(self): def test_album_contains_song(self):
""" """
Tests that Tests that every album contains the song it is added to in its song_collection
album = album.any_song.one_album """
is the same object s_1 = Song(
title="song",
album_list=[
Album(title="album"),
]
)
s_2 = s_1.album_collection[0].song_collection[0]
self.assertTrue(s_1.id == s_2.id)
def test_auto_add_artist_to_album_feature_artist(self):
"""
Tests that every artist is added to the album's feature_artist_collection per default
""" """
a = self.complicated_object().main_album_collection[0] a_1 = Artist(
b = a.song_collection[0].album_collection[0] name="artist",
c = a.song_collection[1].album_collection[0] album_list=[
d = b.song_collection[0].album_collection[0] Album(title="album")
e = d.song_collection[0].album_collection[0] ]
f = e.song_collection[0].album_collection[0] )
g = f.song_collection[0].album_collection[0] a_2 = a_1.album_collection[0].feature_artist_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id == e.id == f.id == g.id) self.assertTrue(a_1.id == a_2.id)
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "album")
self.assertTrue(a.barcode == b.barcode == c.barcode == d.barcode == e.barcode == f.barcode == g.barcode == "1234567890123")
self.assertTrue(a.albumsort == b.albumsort == c.albumsort == d.albumsort == e.albumsort == f.albumsort == g.albumsort == 123)
d.title = "new_title" def test_auto_add_artist_to_album_feature_artist_push(self):
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "new_title")
def test_album_artist_relation(self):
""" """
Tests that Tests that every artist is added to the album's feature_artist_collection per default but pulled into the album's artist_collection if a merge exitst
artist = artist.any_album.any_song.one_artist
is the same object
""" """
a = self.complicated_object() a_1 = Artist(
b = a.main_album_collection[0].artist_collection[0] name="artist",
c = b.main_album_collection[0].artist_collection[0] album_list=[
d = c.main_album_collection[0].artist_collection[0] Album(
title="album",
artist_list=[
Artist(name="artist"),
]
)
]
)
a_2 = a_1.album_collection[0].artist_collection[0]
self.assertTrue(a_1.id == a_2.id)
self.assertTrue(a.id == b.id == c.id == d.id)
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
def test_artist_artist_relation(self): def test_artist_artist_relation(self):
artist = Artist( """
name="artist", Tests the proper syncing between album.artist_collection and song.artist_collection
main_album_list=[ """
Album(
album = Album(
title="album", title="album",
song_list=[ song_list=[
Song(title="song"), Song(title="song"),
@@ -83,16 +81,20 @@ class TestCollection(unittest.TestCase):
Artist(name="artist"), Artist(name="artist"),
] ]
) )
] a_1 = album.artist_collection[0]
) a_2 = album.song_collection[0].artist_collection[0]
self.assertTrue(artist.id == artist.main_album_collection[0].song_collection[0].main_artist_collection[0].id) self.assertTrue(a_1.id == a_2.id)
def test_artist_collection_sync(self): def test_artist_collection_sync(self):
"""
tests the actual implementation of the test above
"""
album_1 = Album( album_1 = Album(
title="album", title="album",
song_list=[ song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]), Song(title="song", artist_list=[Artist(name="artist")]),
], ],
artist_list=[ artist_list=[
Artist(name="artist"), Artist(name="artist"),
@@ -102,7 +104,7 @@ class TestCollection(unittest.TestCase):
album_2 = Album( album_2 = Album(
title="album", title="album",
song_list=[ song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]), Song(title="song", artist_list=[Artist(name="artist")]),
], ],
artist_list=[ artist_list=[
Artist(name="artist"), Artist(name="artist"),
@@ -111,17 +113,7 @@ class TestCollection(unittest.TestCase):
album_1.merge(album_2) album_1.merge(album_2)
self.assertTrue(id(album_1.artist_collection) == id(album_1.artist_collection) == id(album_1.song_collection[0].main_artist_collection) == id(album_1.song_collection[0].main_artist_collection)) self.assertTrue(id(album_1.artist_collection) == id(album_1.artist_collection) == id(album_1.song_collection[0].artist_collection) == id(album_1.song_collection[0].artist_collection))
def test_song_artist_relations(self):
a = self.complicated_object()
b = a.main_album_collection[0].song_collection[0].main_artist_collection[0]
c = b.main_album_collection[0].song_collection[0].main_artist_collection[0]
d = c.main_album_collection[0].song_collection[0].main_artist_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id)
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()