61 Commits

Author SHA1 Message Date
c131924577 Merge pull request 'fix/clean_feature_artists' (#38) from fix/clean_feature_artists into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
Reviewed-on: #38
2024-05-21 12:08:04 +00:00
8cdb5c1f99 fix: tests were a mess and didn't properly test the functionality but random things that worked with implementation
All checks were successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-21 13:52:20 +02:00
356ba658ce fix: lyrics enpoint could crash the whole program 2024-05-21 13:37:26 +02:00
000a6c0dba feat: added type identifier to option strings 2024-05-17 18:24:56 +02:00
83a3334f1a changed default value
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-17 13:55:58 +02:00
ab61ff7e9b feat: added the option to select all at options at the same time
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-17 13:37:49 +02:00
3cb35909d1 feat: added option to just fetch the objects in select 2024-05-17 13:31:46 +02:00
e87075a809 feat: changed syncing from event based to reference
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:39:12 +02:00
86e985acec fix: files don't contain id anymore
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:30:53 +02:00
a70a24d93e feat: minor adjustments
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:14:18 +02:00
2c1ac0f12d fix: wrong collection 2024-05-16 17:09:36 +02:00
897897dba2 feat: added recursive structure 2024-05-16 14:29:50 +02:00
adcf26b518 feat: renamed main album collection to album collection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 14:10:00 +02:00
8ccc28daf8 draft: added feature artist collection attr 2024-05-16 14:09:34 +02:00
2b3f4d82d9 feat: renamed main_artist_collection to artist_collection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 14:05:33 +02:00
41a91a6afe feat: removing dependence beteen artist.album and album.artist 2024-05-16 13:41:06 +02:00
82df96a193 Merge pull request 'feature/move_download_code_to_download' (#34) from feature/move_download_code_to_download into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #34
2024-05-15 15:24:30 +00:00
80ad2727de fix: stream retry
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-15 17:14:01 +02:00
19b83ce880 fix: saving streaming progress on retry
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 15:04:00 +02:00
1bf04439f0 fix: setting the genre of the song
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 14:51:30 +02:00
bab6aeb45d fix: removed double linebreaks from formated text, plaintext
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 14:26:19 +02:00
98afe5047d fix: wrong creation of source types
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 14:21:15 +02:00
017752c4d0 feat: better download output
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 14:10:32 +02:00
ea4c73158e fix: audio format is replaced completely
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 13:58:44 +02:00
0096dfe5cb feat: copying the downloaded music into the final locations
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 13:17:36 +02:00
bedd0fe819 fix: runtime errors
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-15 13:16:11 +02:00
ac6c513d56 draft: post process song
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-15 12:30:54 +02:00
cc14253239 draft: streaming the audio 2024-05-15 12:18:08 +02:00
14f986a497 draft: rewrote sources 2024-05-15 11:44:39 +02:00
da8887b279 draft: rewriting soure
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-14 15:18:17 +02:00
Hellow
bb32fc7647 draft: rewriting downloading
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-14 00:28:05 +02:00
Hellow
8c369d79e4 draft: rewriting downloading
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-13 21:51:32 +02:00
Hellow
b09d6f2691 draft: rewriting downloading 2024-05-13 21:45:12 +02:00
0e6fe8187a feat: fetch_from_url
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-13 18:09:11 +02:00
0343c11a62 feat: migrated fetch details and from source
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-13 18:03:20 +02:00
9769cf4033 Merge pull request 'fix/caching_signatures' (#32) from fix/caching_signatures into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #32
2024-05-13 15:15:37 +00:00
55024bd987 fix: key error
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-13 17:15:15 +02:00
d85498869d feat: tracksort and albumsort + some other stuff
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-13 14:22:33 +02:00
c3350b016d fix: timeout for yt music stream
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-13 13:39:57 +02:00
788103a68e fix: removed invalid stuff
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-13 13:28:54 +02:00
5179c64161 Merge branch 'experimental' of ssh://gitea.elara.ws:2222/music-kraken/music-kraken-core into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-10 17:53:39 +02:00
04405f88eb Merge branch 'fix/musify_scrapes_year_as_artist' into experimental 2024-05-10 17:52:11 +02:00
acd183c90e fix: bandcamp
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-10 17:39:30 +02:00
7186f06ce6 feat: improved interface
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-10 17:33:07 +02:00
6e354af0d1 feat: added proper settings
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-10 17:06:40 +02:00
155f239c8a feat: changed ids for audio tempfiles to random id instead of increment id
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-10 15:32:14 +02:00
36db651dfa fix: cleaning the song name deleted the song if the song name was the same as the artist name
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-10 15:25:11 +02:00
8426f6e2ea fix: filtered another year
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-10 15:20:22 +02:00
75d0a83d14 fix: changed dependency
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-09 10:57:55 +02:00
Hellow
2af577c0cd fix: removed empty objects
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-08 21:06:40 +02:00
Hellow
3780f05e58 feat: added launch.json 2024-05-08 16:48:27 +02:00
Hellow
a0305a7a6e fix: don't add year as artist 2024-05-08 16:47:56 +02:00
949583225a Merge pull request 'Correct duplicate values' (#22) from issue16 into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #22
2024-05-08 12:33:34 +00:00
4e0b005170 Merge branch 'experimental' into issue16
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-08 12:33:56 +02:00
e3d7ed8837 Merge pull request 'fix/musify_artist_spam' (#27) from fix/musify_artist_spam into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #27
2024-05-08 10:31:23 +00:00
e3e7aea959 fix for lyrics
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-08 12:27:56 +02:00
9d4e3e8545 fix: bounds get respected
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-08 12:23:16 +02:00
9c63e8e55a fix: correct collections
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-08 12:09:41 +02:00
12c0bf6b83 Merge pull request 'ci: make tags release to the music-kraken pypi package instead of music-kraken-stable' (#24) from ci/remove-stable-package into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #24
2024-05-07 21:17:11 +00:00
ac9a74138c ci: make tags release to the music-kraken pypi package instead of music-kraken-stable
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-07 16:07:45 +00:00
709c5ebaa8 Correct duplicate values
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-07 12:34:24 +02:00
40 changed files with 929 additions and 826 deletions

22
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,22 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "Python Debugger: Download script",
"type": "debugpy",
"request": "launch",
"program": "development/actual_donwload.py",
"console": "integratedTerminal"
}
]
}

View File

@@ -19,14 +19,20 @@
"albumsort", "albumsort",
"APIC", "APIC",
"Bandcamp", "Bandcamp",
"bitrate",
"DEEZER",
"dotenv", "dotenv",
"encyclopaedia", "encyclopaedia",
"ENDC", "ENDC",
"Gitea", "Gitea",
"iframe",
"isrc",
"levenshtein", "levenshtein",
"metallum", "metallum",
"MUSICBRAINZ",
"musify", "musify",
"OKBLUE", "OKBLUE",
"OKGREEN",
"pathvalidate", "pathvalidate",
"Referer", "Referer",
"sponsorblock", "sponsorblock",

View File

@@ -11,7 +11,6 @@ steps:
build-stable: build-stable:
image: python image: python
commands: commands:
- sed -i 's/name = "music-kraken"/name = "music-kraken-stable"/' pyproject.toml
- python -m pip install -r requirements-dev.txt - python -m pip install -r requirements-dev.txt
- python3 -m build - python3 -m build
environment: environment:

View File

@@ -6,8 +6,9 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__": if __name__ == "__main__":
commands = [ commands = [
"s: #a Crystal F", "s: #a I'm in a coffin",
"10" "0",
"d: 0",
] ]

View File

@@ -13,7 +13,7 @@ if __name__ == "__main__":
song_2 = Song( song_2 = Song(
title = "song", title = "song",
main_artist_list=[other_artist] artist_list=[other_artist]
) )
other_artist.name = "main_artist" other_artist.name = "main_artist"
@@ -21,5 +21,5 @@ if __name__ == "__main__":
song_1.merge(song_2) song_1.merge(song_2)
print("#" * 120) print("#" * 120)
print("main", *song_1.main_artist_collection) print("main", *song_1.artist_collection)
print("feat", *song_1.feature_artist_collection) print("feat", *song_1.feature_artist_collection)

View File

@@ -10,12 +10,12 @@ from ..objects import Target
LOGGER = logging_settings["codex_logger"] LOGGER = logging_settings["codex_logger"]
def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], interval_list: List[Tuple[float, float]] = None): def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], skip_intervals: List[Tuple[float, float]] = None):
if not target.exists: if not target.exists:
LOGGER.warning(f"Target doesn't exist: {target.file_path}") LOGGER.warning(f"Target doesn't exist: {target.file_path}")
return return
interval_list = interval_list or [] skip_intervals = skip_intervals or []
bitrate_b = int(bitrate_kb / 1024) bitrate_b = int(bitrate_kb / 1024)
@@ -29,7 +29,7 @@ def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], au
start = 0 start = 0
next_start = 0 next_start = 0
for end, next_start in interval_list: for end, next_start in skip_intervals:
aselect_list.append(f"between(t,{start},{end})") aselect_list.append(f"between(t,{start},{end})")
start = next_start start = next_start
aselect_list.append(f"gte(t,{next_start})") aselect_list.append(f"gte(t,{next_start})")
@@ -47,7 +47,7 @@ def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], au
# run the ffmpeg command with a progressbar # run the ffmpeg command with a progressbar
ff = FfmpegProgress(ffmpeg_command) ff = FfmpegProgress(ffmpeg_command)
with tqdm(total=100, desc=f"removing {len(interval_list)} segments") as pbar: with tqdm(total=100, desc=f"processing") as pbar:
for progress in ff.run_command_with_progress(): for progress in ff.run_command_with_progress():
pbar.update(progress-pbar.n) pbar.update(progress-pbar.n)

View File

@@ -1,5 +1,5 @@
import mutagen import mutagen
from mutagen.id3 import ID3, Frame, APIC from mutagen.id3 import ID3, Frame, APIC, USLT
from pathlib import Path from pathlib import Path
from typing import List from typing import List
import logging import logging
@@ -7,6 +7,7 @@ from PIL import Image
from ..utils.config import logging_settings, main_settings from ..utils.config import logging_settings, main_settings
from ..objects import Song, Target, Metadata from ..objects import Song, Target, Metadata
from ..objects.metadata import Mapping
from ..connection import Connection from ..connection import Connection
LOGGER = logging_settings["tagging_logger"] LOGGER = logging_settings["tagging_logger"]
@@ -105,8 +106,11 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
data=converted_target.read_bytes(), data=converted_target.read_bytes(),
) )
) )
id3_object.frames.delall("USLT")
mutagen_file = mutagen.File(target.file_path) uslt_val = metadata.get_id3_value(Mapping.UNSYNCED_LYRICS)
id3_object.frames.add(
USLT(encoding=3, lang=u'eng', desc=u'desc', text=uslt_val)
)
id3_object.add_metadata(metadata) id3_object.add_metadata(metadata)
id3_object.save() id3_object.save()

View File

@@ -166,9 +166,9 @@ class Downloader:
self.genre = genre or get_genre() self.genre = genre or get_genre()
self.process_metadata_anyway = process_metadata_anyway self.process_metadata_anyway = process_metadata_anyway
print() output()
print(f"Downloading to: \"{self.genre}\"") output(f"Downloading to: \"{self.genre}\"", color=BColors.HEADER)
print() output()
def print_current_options(self): def print_current_options(self):
self.page_dict = dict() self.page_dict = dict()
@@ -178,8 +178,6 @@ class Downloader:
page_count = 0 page_count = 0
for option in self.current_results.formatted_generator(): for option in self.current_results.formatted_generator():
if isinstance(option, Option): if isinstance(option, Option):
_downloadable = self.pages.is_downloadable(option.music_object)
r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}" r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}"
print(r) print(r)
else: else:
@@ -228,7 +226,7 @@ class Downloader:
if album is not None: if album is not None:
song.album_collection.append(album) song.album_collection.append(album)
if artist is not None: if artist is not None:
song.main_artist_collection.append(artist) song.artist_collection.append(artist)
return Query(raw_query=query, music_object=song) return Query(raw_query=query, music_object=song)
if album is not None: if album is not None:
@@ -312,16 +310,14 @@ class Downloader:
def download(self, data_objects: List[DatabaseObject], **kwargs) -> bool: def download(self, data_objects: List[DatabaseObject], **kwargs) -> bool:
output() output()
if len(data_objects) == 1: if len(data_objects) > 1:
output(f"Downloading {data_objects[0].option_string}...", color=BColors.BOLD) output(f"Downloading {len(data_objects)} objects...", *("- " + o.option_string for o in data_objects), color=BColors.BOLD, sep="\n")
else:
output(f"Downloading {len(data_objects)} objects...", *("- " + o.option_string for o in data_objects), color=BColors.BOLD, sep="\n")
_result_map: Dict[DatabaseObject, DownloadResult] = dict() _result_map: Dict[DatabaseObject, DownloadResult] = dict()
for database_object in data_objects: for database_object in data_objects:
r = self.pages.download( r = self.pages.download(
music_object=database_object, data_object=database_object,
genre=self.genre, genre=self.genre,
**kwargs **kwargs
) )
@@ -358,37 +354,41 @@ class Downloader:
command, query = _[0], ":".join(_[1:]) command, query = _[0], ":".join(_[1:])
do_search = "s" in command do_search = "s" in command
do_fetch = "f" in command
do_download = "d" in command do_download = "d" in command
do_merge = "m" in command do_merge = "m" in command
if do_search and do_download: if do_search and (do_download or do_fetch or do_merge):
raise MKInvalidInputException(message="You can't search and download at the same time.") raise MKInvalidInputException(message="You can't search and do another operation at the same time.")
if do_search and do_merge:
raise MKInvalidInputException(message="You can't search and merge at the same time.")
if do_search: if do_search:
self.search(":".join(input_str.split(":")[1:])) self.search(":".join(input_str.split(":")[1:]))
return False return False
indices = [] def get_selected_objects(q: str):
for possible_index in query.split(","): if q.strip().lower() == "all":
possible_index = possible_index.strip() return list(self.current_results)
if possible_index == "":
continue
i = 0 indices = []
if possible_index.isdigit(): for possible_index in q.split(","):
i = int(possible_index) possible_index = possible_index.strip()
else: if possible_index == "":
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.") continue
if i < 0 and i >= len(self.current_results): i = 0
raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results)}.") try:
i = int(possible_index)
except ValueError:
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.")
indices.append(i) if i < 0 or i >= len(self.current_results):
raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results) - 1}.")
selected_objects = [self.current_results[i] for i in indices] indices.append(i)
return [self.current_results[i] for i in indices]
selected_objects = get_selected_objects(query)
if do_merge: if do_merge:
old_selected_objects = selected_objects old_selected_objects = selected_objects
@@ -401,6 +401,13 @@ class Downloader:
selected_objects = [a] selected_objects = [a]
if do_fetch:
for data_object in selected_objects:
self.pages.fetch_details(data_object)
self.print_current_options()
return False
if do_download: if do_download:
self.download(selected_objects) self.download(selected_objects)
return False return False

View File

@@ -317,7 +317,7 @@ class Connection:
name = kwargs.pop("description") name = kwargs.pop("description")
if progress > 0: if progress > 0:
headers = dict() if headers is None else headers headers = kwargs.get("headers", dict())
headers["Range"] = f"bytes={target.size}-" headers["Range"] = f"bytes={target.size}-"
r = self.request( r = self.request(
@@ -366,6 +366,7 @@ class Connection:
if retry: if retry:
self.LOGGER.warning(f"Retrying stream...") self.LOGGER.warning(f"Retrying stream...")
accepted_response_codes.add(206) accepted_response_codes.add(206)
stream_kwargs["progress"] = progress
return Connection.stream_into(**stream_kwargs) return Connection.stream_into(**stream_kwargs)
return DownloadResult() return DownloadResult()

View File

@@ -0,0 +1,21 @@
from dataclasses import dataclass, field
from typing import Set
from ..utils.config import main_settings
from ..utils.enums.album import AlbumType
@dataclass
class FetchOptions:
download_all: bool = False
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
@dataclass
class DownloadOptions:
download_all: bool = False
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
download_again_if_found: bool = False
process_audio_if_found: bool = False
process_metadata_if_found: bool = True

View File

@@ -1,12 +1,32 @@
from typing import Tuple, Type, Dict, Set from typing import Tuple, Type, Dict, Set, Optional, List
from collections import defaultdict
from pathlib import Path
import re
import logging
from . import FetchOptions, DownloadOptions
from .results import SearchResults from .results import SearchResults
from ..objects import DatabaseObject, Source from ..objects import (
DatabaseObject as DataObject,
from ..utils.config import youtube_settings Collection,
from ..utils.enums.source import SourcePages Target,
Source,
Options,
Song,
Album,
Artist,
Label,
)
from ..audio import write_metadata_to_target, correct_codec
from ..utils import output, BColors
from ..utils.string_processing import fit_to_file_system
from ..utils.config import youtube_settings, main_settings
from ..utils.path_manager import LOCATIONS
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.exception import MKMissingNameException
from ..utils.exception.download import UrlNotFoundException from ..utils.exception.download import UrlNotFoundException
from ..utils.shared import DEBUG_PAGES from ..utils.shared import DEBUG_PAGES
@@ -34,6 +54,13 @@ SHADY_PAGES: Set[Type[Page]] = {
Musify, Musify,
} }
fetch_map = {
Song: "fetch_song",
Album: "fetch_album",
Artist: "fetch_artist",
Label: "fetch_label",
}
if DEBUG_PAGES: if DEBUG_PAGES:
DEBUGGING_PAGE = Bandcamp DEBUGGING_PAGE = Bandcamp
print(f"Only downloading from page {DEBUGGING_PAGE}.") print(f"Only downloading from page {DEBUGGING_PAGE}.")
@@ -43,10 +70,15 @@ if DEBUG_PAGES:
class Pages: class Pages:
def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False) -> None: def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
self.LOGGER = logging.getLogger("download")
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
# initialize all page instances # initialize all page instances
self._page_instances: Dict[Type[Page], Page] = dict() self._page_instances: Dict[Type[Page], Page] = dict()
self._source_to_page: Dict[SourcePages, Type[Page]] = dict() self._source_to_page: Dict[SourceType, Type[Page]] = dict()
exclude_pages = exclude_pages if exclude_pages is not None else set() exclude_pages = exclude_pages if exclude_pages is not None else set()
@@ -66,9 +98,14 @@ class Pages:
self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(self._audio_pages_set) self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(self._audio_pages_set)
for page_type in self.pages: for page_type in self.pages:
self._page_instances[page_type] = page_type() self._page_instances[page_type] = page_type(fetch_options=self.fetch_options, download_options=self.download_options)
self._source_to_page[page_type.SOURCE_TYPE] = page_type self._source_to_page[page_type.SOURCE_TYPE] = page_type
def _get_page_from_enum(self, source_page: SourceType) -> Page:
if source_page not in self._source_to_page:
return None
return self._page_instances[self._source_to_page[source_page]]
def search(self, query: Query) -> SearchResults: def search(self, query: Query) -> SearchResults:
result = SearchResults() result = SearchResults()
@@ -80,54 +117,211 @@ class Pages:
return result return result
def fetch_details(self, music_object: DatabaseObject, stop_at_level: int = 1) -> DatabaseObject: def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject:
if not isinstance(music_object, INDEPENDENT_DB_OBJECTS): if not isinstance(data_object, INDEPENDENT_DB_OBJECTS):
return music_object return data_object
for source_page in music_object.source_collection.source_pages: source: Source
if source_page not in self._source_to_page: for source in data_object.source_collection.get_sources(source_type_sorting={
continue "only_with_page": True,
}):
new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level)
if new_data_object is not None:
data_object.merge(new_data_object)
page_type = self._source_to_page[source_page] return data_object
if page_type in self._pages_set: def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
music_object.merge(self._page_instances[page_type].fetch_details(music_object=music_object, stop_at_level=stop_at_level)) if not source.has_page:
return None
return music_object source_type = source.page.get_source_type(source=source)
if source_type is None:
self.LOGGER.debug(f"Could not determine source type for {source}.")
return None
def is_downloadable(self, music_object: DatabaseObject) -> bool: func = getattr(source.page, fetch_map[source_type])
_page_types = set(self._source_to_page)
for src in music_object.source_collection.source_pages:
if src in self._source_to_page:
_page_types.add(self._source_to_page[src])
audio_pages = self._audio_pages_set.intersection(_page_types) # fetching the data object and marking it as fetched
return len(audio_pages) > 0 data_object: DataObject = func(source=source, **kwargs)
data_object.mark_as_fetched(source.hash_url)
return data_object
def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False, process_metadata_anyway: bool = False) -> DownloadResult: def fetch_from_url(self, url: str) -> Optional[DataObject]:
if not isinstance(music_object, INDEPENDENT_DB_OBJECTS): source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
return DownloadResult(error_message=f"{type(music_object).__name__} can't be downloaded.") if source is None:
return None
self.fetch_details(music_object) return self.fetch_from_source(source=source)
_page_types = set(self._source_to_page) def _skip_object(self, data_object: DataObject) -> bool:
for src in music_object.source_collection.source_pages: if isinstance(data_object, Album):
if src in self._source_to_page: if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
_page_types.add(self._source_to_page[src]) return True
audio_pages = self._audio_pages_set.intersection(_page_types) return False
for download_page in audio_pages: def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
return self._page_instances[download_page].download(music_object=music_object, genre=genre, download_all=download_all, process_metadata_anyway=process_metadata_anyway) # fetch the given object
self.fetch_details(data_object)
output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
return DownloadResult(error_message=f"No audio source has been found for {music_object}.") # fetching all parent objects (e.g. if you only download a song)
if not kwargs.get("fetched_upwards", False):
to_fetch: List[DataObject] = [data_object]
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DatabaseObject]: while len(to_fetch) > 0:
source = Source.match_url(url, SourcePages.MANUAL) new_to_fetch = []
for d in to_fetch:
if self._skip_object(d):
continue
self.fetch_details(d)
for c in d.get_parent_collections():
new_to_fetch.extend(c)
to_fetch = new_to_fetch
kwargs["fetched_upwards"] = True
# download all children
download_result: DownloadResult = DownloadResult()
for c in data_object.get_child_collections():
for d in c:
if self._skip_object(d):
continue
download_result.merge(self.download(d, genre, **kwargs))
# actually download if the object is a song
if isinstance(data_object, Song):
"""
TODO
add the traced artist and album to the naming.
I am able to do that, because duplicate values are removed later on.
"""
self._download_song(data_object, naming={
"genre": [genre],
"audio_format": [main_settings["audio_format"]],
})
return download_result
def _extract_fields_from_template(self, path_template: str) -> Set[str]:
return set(re.findall(r"{([^}]+)}", path_template))
def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str:
field_names: Set[str] = self._extract_fields_from_template(path_template)
for field in field_names:
if len(naming[field]) == 0:
raise MKMissingNameException(f"Missing field for {field}.")
path_template = path_template.replace(f"{{{field}}}", naming[field][0])
return path_template
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
"""
TODO
Search the song in the file system.
"""
r = DownloadResult(total=1)
# pre process the data recursively
song.compile()
# manage the naming
naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["song"].append(song.title_value)
naming["isrc"].append(song.isrc)
naming["album"].extend(a.title_value for a in song.album_collection)
naming["album_type"].extend(a.album_type.value for a in song.album_collection)
naming["artist"].extend(a.name for a in song.artist_collection)
naming["artist"].extend(a.name for a in song.feature_artist_collection)
for a in song.album_collection:
naming["label"].extend([l.title_value for l in a.label_collection])
# removing duplicates from the naming, and process the strings
for key, value in naming.items():
# https://stackoverflow.com/a/17016257
naming[key] = list(dict.fromkeys(value))
song.genre = naming["genre"][0]
# manage the targets
tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
song.target_collection.append(Target(
relative_to_music_dir=True,
file_path=Path(
self._parse_path_template(main_settings["download_path"], naming=naming),
self._parse_path_template(main_settings["download_file"], naming=naming),
)
))
for target in song.target_collection:
if target.exists:
output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
r.found_on_disk += 1
if not self.download_options.download_again_if_found:
target.copy_content(tmp)
else:
target.create_path()
output(f'{target.file_path}', color=BColors.GREY)
# this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
used_source: Optional[Source] = None
skip_intervals: List[Tuple[float, float]] = []
for source in song.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
"sort_key": lambda page: page.download_priority,
"reverse": True,
}):
if tmp.exists:
break
used_source = source
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download")
skip_intervals = source.page.get_skip_intervals(song=song, source=source)
# if something has been downloaded but it somehow failed, delete the file
if streaming_results.is_fatal_error and tmp.exists:
tmp.delete()
# if everything went right, the file should exist now
if not tmp.exists:
if used_source is None:
r.error_message = f"No source found for {song.option_string}."
else:
r.error_message = f"Something went wrong downloading {song.option_string}."
return r
# post process the audio
found_on_disk = used_source is None
if not found_on_disk or self.download_options.process_audio_if_found:
correct_codec(target=tmp, skip_intervals=skip_intervals)
r.sponsor_segments = len(skip_intervals)
if used_source is not None:
used_source.page.post_process_hook(song=song, temp_target=tmp)
if not found_on_disk or self.download_options.process_metadata_if_found:
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)
# copy the tmp target to the final locations
for target in song.target_collection:
tmp.copy_content(target)
tmp.delete()
return r
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None: if source is None:
raise UrlNotFoundException(url=url) raise UrlNotFoundException(url=url)
_actual_page = self._source_to_page[source.page_enum] _actual_page = self._source_to_page[source.source_type]
return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level) return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level)

View File

@@ -2,7 +2,6 @@ from typing import Tuple, Type, Dict, List, Generator, Union
from dataclasses import dataclass from dataclasses import dataclass
from ..objects import DatabaseObject from ..objects import DatabaseObject
from ..utils.enums.source import SourcePages
from ..pages import Page, EncyclopaediaMetallum, Musify from ..pages import Page, EncyclopaediaMetallum, Musify
@@ -28,6 +27,9 @@ class Results:
self._by_index = dict() self._by_index = dict()
self._page_by_index = dict() self._page_by_index = dict()
def __len__(self) -> int:
return max(self._by_index.keys())
def __getitem__(self, index: int): def __getitem__(self, index: int):
return self._by_index[index] return self._by_index[index]

View File

@@ -3,7 +3,7 @@ from .option import Options
from .metadata import Metadata, Mapping as ID3Mapping, ID3Timestamp from .metadata import Metadata, Mapping as ID3Mapping, ID3Timestamp
from .source import Source, SourcePages, SourceTypes from .source import Source, SourceType
from .song import ( from .song import (
Song, Song,
@@ -24,4 +24,4 @@ from .parents import OuterProxy
from .artwork import Artwork from .artwork import Artwork
DatabaseObject = TypeVar('T', bound=OuterProxy) DatabaseObject = OuterProxy

View File

@@ -2,6 +2,8 @@ from __future__ import annotations
from collections import defaultdict from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set
import copy
from .parents import OuterProxy from .parents import OuterProxy
from ..utils import object_trace from ..utils import object_trace
from ..utils import output, BColors from ..utils import output, BColors
@@ -47,8 +49,15 @@ class Collection(Generic[T]):
self.extend(data) self.extend(data)
def __hash__(self) -> int:
return id(self)
@property
def collection_names(self) -> List[str]:
return list(set(self._collection_for.values()))
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Collection({' | '.join(self._collection_for.values())} {id(self)})" return f"Collection({' | '.join(self.collection_names)} {id(self)})"
def _map_element(self, __object: T, no_unmap: bool = False, **kwargs): def _map_element(self, __object: T, no_unmap: bool = False, **kwargs):
if not no_unmap: if not no_unmap:
@@ -104,13 +113,7 @@ class Collection(Generic[T]):
""" """
self._data.append(other) self._data.append(other)
other._inner._is_in_collection.add(self)
# all of the existing hooks to get the defined datastructure
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
for attribute, a in self.sync_on_append.items(): for attribute, a in self.sync_on_append.items():
# syncing two collections by reference # syncing two collections by reference
@@ -131,6 +134,13 @@ class Collection(Generic[T]):
a.extend(b_data, **kwargs) a.extend(b_data, **kwargs)
# all of the existing hooks to get the defined datastructures
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
def append(self, other: Optional[T], **kwargs): def append(self, other: Optional[T], **kwargs):
""" """
If an object, that represents the same entity exists in a relevant collection, If an object, that represents the same entity exists in a relevant collection,
@@ -143,37 +153,36 @@ class Collection(Generic[T]):
if other is None: if other is None:
return return
if not other._inner._has_data:
return
if other.id in self._indexed_from_id: if other.id in self._indexed_from_id:
return return
object_trace(f"Appending {other.option_string} to {self}") object_trace(f"Appending {other.option_string} to {self}")
for c in self.pull_from:
r = c._find_object(other)
if r is not None:
output("found pull from", r, other, self, color=BColors.RED, sep="\t")
other.merge(r, **kwargs)
c.remove(r, existing=r, **kwargs)
break
existing_object = self._find_object(other)
# switching collection in the case of push to # switching collection in the case of push to
for c in self.push_to: for c in self.push_to:
r = c._find_object(other) r = c._find_object(other)
if r is not None: if r is not None:
output("found push to", r, other, self, color=BColors.RED, sep="\t") # output("found push to", r, other, c, self, color=BColors.RED, sep="\t")
if existing_object is not None:
self.remove(existing_object)
return c.append(other, **kwargs) return c.append(other, **kwargs)
if existing_object is None: for c in self.pull_from:
r = c._find_object(other)
if r is not None:
# output("found pull from", r, other, c, self, color=BColors.RED, sep="\t")
c.remove(r, existing=r, **kwargs)
existing = self._find_object(other)
if existing is None:
self._append_new_object(other, **kwargs) self._append_new_object(other, **kwargs)
else: else:
existing_object.merge(other, **kwargs) existing.merge(other, **kwargs)
def remove(self, *other_list: List[T], silent: bool = False, existing: Optional[T] = None, **kwargs): def remove(self, *other_list: List[T], silent: bool = False, existing: Optional[T] = None, remove_from_other_collection=True, **kwargs):
other: T
for other in other_list: for other in other_list:
existing: Optional[T] = existing or self._indexed_values["id"].get(other.id, None) existing: Optional[T] = existing or self._indexed_values["id"].get(other.id, None)
if existing is None: if existing is None:
@@ -181,14 +190,13 @@ class Collection(Generic[T]):
raise ValueError(f"Object {other} not found in {self}") raise ValueError(f"Object {other} not found in {self}")
return other return other
for collection_attribute, generator in self.extend_object_to_attribute.items(): if remove_from_other_collection:
other.__getattribute__(collection_attribute).remove(*generator, silent=True, **kwargs) for c in copy.copy(other._inner._is_in_collection):
c.remove(other, silent=True, remove_from_other_collection=False, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items(): other._inner._is_in_collection = set()
other.__getattribute__(attribute).remove(new_object, silent=True, **kwargs) else:
self._data.remove(existing)
self._data.remove(existing) self._unmap_element(existing)
self._unmap_element(existing)
def contains(self, __object: T) -> bool: def contains(self, __object: T) -> bool:
return self._find_object(__object) is not None return self._find_object(__object) is not None

View File

@@ -32,14 +32,19 @@ class FormattedText:
if self.is_empty and other.is_empty: if self.is_empty and other.is_empty:
return True return True
return self.doc == other.doc return self.html == other.html
@property @property
def markdown(self) -> str: def markdown(self) -> str:
return md(self.html).strip() return md(self.html).strip()
@property
def plain(self) -> str:
md = self.markdown
return md.replace("\n\n", "\n")
def __str__(self) -> str: def __str__(self) -> str:
return self.markdown return self.markdown
plaintext = markdown plaintext = plain

View File

@@ -34,6 +34,6 @@ class Lyrics(OuterProxy):
@property @property
def metadata(self) -> Metadata: def metadata(self) -> Metadata:
return Metadata({ return Metadata({
id3Mapping.UNSYNCED_LYRICS: [self.text.markdown] id3Mapping.UNSYNCED_LYRICS: [self.text.plaintext]
}) })

View File

@@ -92,7 +92,7 @@ class Mapping(Enum):
key = attribute.value key = attribute.value
if key[0] == 'T': if key[0] == 'T':
# a text fiel # a text field
return cls.get_text_instance(key, value) return cls.get_text_instance(key, value)
if key[0] == "W": if key[0] == "W":
# an url field # an url field
@@ -355,7 +355,12 @@ class Metadata:
return None return None
list_data = self.id3_dict[field] list_data = self.id3_dict[field]
#correct duplications
correct_list_data = list()
for data in list_data:
if data not in correct_list_data:
correct_list_data.append(data)
list_data = correct_list_data
# convert for example the time objects to timestamps # convert for example the time objects to timestamps
for i, element in enumerate(list_data): for i, element in enumerate(list_data):
# for performances sake I don't do other checks if it is already the right type # for performances sake I don't do other checks if it is already the right type
@@ -395,6 +400,5 @@ class Metadata:
""" """
# set the tagging timestamp to the current time # set the tagging timestamp to the current time
self.__setitem__(Mapping.TAGGING_TIME, [ID3Timestamp.now()]) self.__setitem__(Mapping.TAGGING_TIME, [ID3Timestamp.now()])
for field in self.id3_dict: for field in self.id3_dict:
yield self.get_mutagen_object(field) yield self.get_mutagen_object(field)

View File

@@ -8,6 +8,7 @@ from typing import Optional, Dict, Tuple, List, Type, Generic, Any, TypeVar, Set
from pathlib import Path from pathlib import Path
import inspect import inspect
from .source import SourceCollection
from .metadata import Metadata from .metadata import Metadata
from ..utils import get_unix_time, object_trace, generate_id from ..utils import get_unix_time, object_trace, generate_id
from ..utils.config import logging_settings, main_settings from ..utils.config import logging_settings, main_settings
@@ -29,12 +30,17 @@ class InnerData:
""" """
_refers_to_instances: set = None _refers_to_instances: set = None
_is_in_collection: set = None
_has_data: bool = False
""" """
Attribute versions keep track, of if the attribute has been changed. Attribute versions keep track, of if the attribute has been changed.
""" """
def __init__(self, object_type, **kwargs): def __init__(self, object_type, **kwargs):
self._refers_to_instances = set() self._refers_to_instances = set()
self._is_in_collection = set()
self._fetched_from: dict = {} self._fetched_from: dict = {}
# initialize the default values # initialize the default values
@@ -48,6 +54,16 @@ class InnerData:
self.__setattr__(key, value) self.__setattr__(key, value)
if self._has_data:
continue
def __setattr__(self, key: str, value):
if self._has_data or not hasattr(self, "_default_values"):
return super().__setattr__(key, value)
super().__setattr__("_has_data", not (key in self._default_values and self._default_values[key] == value))
return super().__setattr__(key, value)
def __hash__(self): def __hash__(self):
return self.id return self.id
@@ -58,6 +74,7 @@ class InnerData:
""" """
self._fetched_from.update(__other._fetched_from) self._fetched_from.update(__other._fetched_from)
self._is_in_collection.update(__other._is_in_collection)
for key, value in __other.__dict__.copy().items(): for key, value in __other.__dict__.copy().items():
if key.startswith("_"): if key.startswith("_"):
@@ -83,7 +100,9 @@ class OuterProxy:
Wraps the inner data, and provides apis, to naturally access those values. Wraps the inner data, and provides apis, to naturally access those values.
""" """
_default_factories: dict = {} source_collection: SourceCollection
_default_factories: dict = {"source_collection": SourceCollection}
_outer_attribute: Set[str] = {"options", "metadata", "indexing_values", "option_string"} _outer_attribute: Set[str] = {"options", "metadata", "indexing_values", "option_string"}
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = tuple() DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = tuple()
@@ -187,6 +206,7 @@ class OuterProxy:
if __other is None: if __other is None:
return return
a_id = self.id
a = self a = self
b = __other b = __other
@@ -209,6 +229,8 @@ class OuterProxy:
a._inner.__merge__(old_inner, **kwargs) a._inner.__merge__(old_inner, **kwargs)
del old_inner del old_inner
self.id = a_id
def __merge__(self, __other: Optional[OuterProxy], **kwargs): def __merge__(self, __other: Optional[OuterProxy], **kwargs):
self.merge(__other, **kwargs) self.merge(__other, **kwargs)
@@ -285,10 +307,49 @@ class OuterProxy:
return r return r
@property
def root_collections(self) -> List[Collection]:
if len(self.UPWARDS_COLLECTION_STRING_ATTRIBUTES) == 0:
return [self]
r = []
for collection_string_attribute in self.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
r.extend(self.__getattribute__(collection_string_attribute))
return r
def _compile(self, **kwargs):
pass
def compile(self, from_root=False, **kwargs):
# compile from the root
if not from_root:
for c in self.root_collections:
c.compile(from_root=True, **kwargs)
return
self._compile(**kwargs)
for c_attribute in self.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
for c in self.__getattribute__(c_attribute):
c.compile(from_root=True, **kwargs)
TITEL = "id" TITEL = "id"
@property @property
def title_string(self) -> str: def title_string(self) -> str:
return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "") return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "")
@property
def title_value(self) -> str:
return str(self.__getattribute__(self.TITEL))
def __repr__(self): def __repr__(self):
return f"{type(self).__name__}({self.title_string})" return f"{type(self).__name__}({self.title_string})"
def get_child_collections(self):
for collection_string_attribute in self.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
yield self.__getattribute__(collection_string_attribute)
def get_parent_collections(self):
for collection_string_attribute in self.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
yield self.__getattribute__(collection_string_attribute)

View File

@@ -95,7 +95,7 @@ class Song(Base):
target_collection: Collection[Target] target_collection: Collection[Target]
lyrics_collection: Collection[Lyrics] lyrics_collection: Collection[Lyrics]
main_artist_collection: Collection[Artist] artist_collection: Collection[Artist]
feature_artist_collection: Collection[Artist] feature_artist_collection: Collection[Artist]
album_collection: Collection[Album] album_collection: Collection[Album]
@@ -107,11 +107,11 @@ class Song(Base):
"lyrics_collection": Collection, "lyrics_collection": Collection,
"artwork": Artwork, "artwork": Artwork,
"main_artist_collection": Collection,
"album_collection": Collection, "album_collection": Collection,
"artist_collection": Collection,
"feature_artist_collection": Collection, "feature_artist_collection": Collection,
"title": lambda: "", "title": lambda: None,
"unified_title": lambda: None, "unified_title": lambda: None,
"isrc": lambda: None, "isrc": lambda: None,
"genre": lambda: None, "genre": lambda: None,
@@ -129,7 +129,7 @@ class Song(Base):
source_list: List[Source] = None, source_list: List[Source] = None,
target_list: List[Target] = None, target_list: List[Target] = None,
lyrics_list: List[Lyrics] = None, lyrics_list: List[Lyrics] = None,
main_artist_list: List[Artist] = None, artist_list: List[Artist] = None,
feature_artist_list: List[Artist] = None, feature_artist_list: List[Artist] = None,
album_list: List[Album] = None, album_list: List[Album] = None,
tracksort: int = 0, tracksort: int = 0,
@@ -141,27 +141,27 @@ class Song(Base):
Base.__init__(**real_kwargs) Base.__init__(**real_kwargs)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_artist_collection", "feature_artist_collection", "album_collection") UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "feature_artist_collection", "album_collection")
TITEL = "title" TITEL = "title"
def __init_collections__(self) -> None: def __init_collections__(self) -> None:
self.feature_artist_collection.push_to = [self.artist_collection]
self.artist_collection.pull_from = [self.feature_artist_collection]
self.album_collection.sync_on_append = { self.album_collection.sync_on_append = {
"artist_collection": self.main_artist_collection, "artist_collection": self.artist_collection,
} }
self.album_collection.append_object_to_attribute = { self.album_collection.append_object_to_attribute = {
"song_collection": self, "song_collection": self,
} }
self.main_artist_collection.extend_object_to_attribute = { self.artist_collection.extend_object_to_attribute = {
"main_album_collection": self.album_collection "album_collection": self.album_collection
} }
self.feature_artist_collection.append_object_to_attribute = { self.feature_artist_collection.extend_object_to_attribute = {
"feature_song_collection": self "album_collection": self.album_collection
} }
self.feature_artist_collection.push_to = [self.main_artist_collection]
self.main_artist_collection.pull_from = [self.feature_artist_collection]
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song: if object_type is Song:
return return
@@ -203,14 +203,14 @@ class Song(Base):
# metadata.merge_many([s.get_song_metadata() for s in self.source_collection]) album sources have no relevant metadata for id3 # metadata.merge_many([s.get_song_metadata() for s in self.source_collection]) album sources have no relevant metadata for id3
metadata.merge_many([a.metadata for a in self.album_collection]) metadata.merge_many([a.metadata for a in self.album_collection])
metadata.merge_many([a.metadata for a in self.main_artist_collection]) metadata.merge_many([a.metadata for a in self.artist_collection])
metadata.merge_many([a.metadata for a in self.feature_artist_collection]) metadata.merge_many([a.metadata for a in self.feature_artist_collection])
metadata.merge_many([lyrics.metadata for lyrics in self.lyrics_collection]) metadata.merge_many([lyrics.metadata for lyrics in self.lyrics_collection])
return metadata return metadata
def get_artist_credits(self) -> str: def get_artist_credits(self) -> str:
main_artists = ", ".join([artist.name for artist in self.main_artist_collection]) main_artists = ", ".join([artist.name for artist in self.artist_collection])
feature_artists = ", ".join([artist.name for artist in self.feature_artist_collection]) feature_artists = ", ".join([artist.name for artist in self.feature_artist_collection])
if len(feature_artists) == 0: if len(feature_artists) == 0:
@@ -219,20 +219,13 @@ class Song(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = "song "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title}) r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
r += get_collection_string(self.main_artist_collection, " by {}") r += get_collection_string(self.artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}" if not self.main_artist_collection.empty or True else " by {}") r += get_collection_string(self.feature_artist_collection, " feat. {}" if len(self.artist_collection) > 0 else " by {}")
return r return r
@property
def options(self) -> List[P]:
options = self.main_artist_collection.shallow_list
options.extend(self.feature_artist_collection)
options.extend(self.album_collection)
options.append(self)
return options
@property @property
def tracksort_str(self) -> str: def tracksort_str(self) -> str:
""" """
@@ -245,11 +238,6 @@ class Song(Base):
return f"{self.tracksort}/{len(self.album_collection[0].song_collection) or 1}" return f"{self.tracksort}/{len(self.album_collection[0].song_collection) or 1}"
"""
All objects dependent on Album
"""
class Album(Base): class Album(Base):
title: str title: str
unified_title: str unified_title: str
@@ -260,12 +248,12 @@ class Album(Base):
barcode: str barcode: str
albumsort: int albumsort: int
notes: FormattedText notes: FormattedText
artwork: Artwork
source_collection: SourceCollection source_collection: SourceCollection
artist_collection: Collection[Artist]
song_collection: Collection[Song] song_collection: Collection[Song]
artist_collection: Collection[Artist]
feature_artist_collection: Collection[Artist]
label_collection: Collection[Label] label_collection: Collection[Label]
_default_factories = { _default_factories = {
@@ -279,11 +267,12 @@ class Album(Base):
"language": lambda: Language.by_alpha_2("en"), "language": lambda: Language.by_alpha_2("en"),
"date": ID3Timestamp, "date": ID3Timestamp,
"notes": FormattedText, "notes": FormattedText,
"artwork": Artwork,
"source_collection": SourceCollection, "source_collection": SourceCollection,
"artist_collection": Collection,
"song_collection": Collection, "song_collection": Collection,
"artist_collection": Collection,
"feature_artist_collection": Collection,
"label_collection": Collection, "label_collection": Collection,
} }
@@ -316,15 +305,18 @@ class Album(Base):
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection") UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection")
def __init_collections__(self): def __init_collections__(self):
self.feature_artist_collection.push_to = [self.artist_collection]
self.artist_collection.pull_from = [self.feature_artist_collection]
self.song_collection.append_object_to_attribute = { self.song_collection.append_object_to_attribute = {
"album_collection": self "album_collection": self
} }
self.song_collection.sync_on_append = { self.song_collection.sync_on_append = {
"main_artist_collection": self.artist_collection "artist_collection": self.artist_collection
} }
self.artist_collection.append_object_to_attribute = { self.artist_collection.append_object_to_attribute = {
"main_album_collection": self "album_collection": self
} }
self.artist_collection.extend_object_to_attribute = { self.artist_collection.extend_object_to_attribute = {
"label_collection": self.label_collection "label_collection": self.label_collection
@@ -378,14 +370,37 @@ class Album(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = "album "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.artist_collection, " by {}") r += get_collection_string(self.artist_collection, " by {}")
if len(self.artist_collection) <= 0:
r += get_collection_string(self.feature_artist_collection, " by {}")
r += get_collection_string(self.label_collection, " under {}") r += get_collection_string(self.label_collection, " under {}")
if len(self.song_collection) > 0: if len(self.song_collection) > 0:
r += f" with {len(self.song_collection)} songs" r += f" with {len(self.song_collection)} songs"
return r return r
def _compile(self):
self.analyze_implied_album_type()
self.update_tracksort()
self.fix_artist_collection()
def analyze_implied_album_type(self):
# if the song collection has only one song, it is reasonable to assume that it is a single
if len(self.song_collection) == 1:
self.album_type = AlbumType.SINGLE
return
# if the album already has an album type, we don't need to do anything
if self.album_type is not AlbumType.OTHER:
return
# for information on EP's I looked at https://www.reddit.com/r/WeAreTheMusicMakers/comments/a354ql/whats_the_cutoff_length_between_ep_and_album/
if len(self.song_collection) < 9:
self.album_type = AlbumType.EP
return
def update_tracksort(self): def update_tracksort(self):
""" """
This updates the tracksort attributes, of the songs in This updates the tracksort attributes, of the songs in
@@ -411,6 +426,16 @@ class Album(Base):
tracksort_map[i] = existing_list.pop(0) tracksort_map[i] = existing_list.pop(0)
tracksort_map[i].tracksort = i tracksort_map[i].tracksort = i
def fix_artist_collection(self):
"""
I add artists, that could only be feature artists to the feature artist collection.
They get automatically moved to main artist collection, if a matching artist exists in the main artist collection or is appended to it later on.
If I am not sure for any artist, I try to analyze the most common artist in the song collection of one album.
"""
# move all artists that are in all feature_artist_collections, of every song, to the artist_collection
pass
@property @property
def copyright(self) -> str: def copyright(self) -> str:
if self.date is None: if self.date is None:
@@ -455,8 +480,7 @@ class Artist(Base):
source_collection: SourceCollection source_collection: SourceCollection
contact_collection: Collection[Contact] contact_collection: Collection[Contact]
feature_song_collection: Collection[Song] album_collection: Collection[Album]
main_album_collection: Collection[Album]
label_collection: Collection[Label] label_collection: Collection[Label]
_default_factories = { _default_factories = {
@@ -470,8 +494,7 @@ class Artist(Base):
"general_genre": lambda: "", "general_genre": lambda: "",
"source_collection": SourceCollection, "source_collection": SourceCollection,
"feature_song_collection": Collection, "album_collection": Collection,
"main_album_collection": Collection,
"contact_collection": Collection, "contact_collection": Collection,
"label_collection": Collection, "label_collection": Collection,
} }
@@ -492,7 +515,7 @@ class Artist(Base):
source_list: List[Source] = None, source_list: List[Source] = None,
contact_list: List[Contact] = None, contact_list: List[Contact] = None,
feature_song_list: List[Song] = None, feature_song_list: List[Song] = None,
main_album_list: List[Album] = None, album_list: List[Album] = None,
label_list: List[Label] = None, label_list: List[Label] = None,
**kwargs **kwargs
) -> None: ) -> None:
@@ -502,18 +525,14 @@ class Artist(Base):
Base.__init__(**real_kwargs) Base.__init__(**real_kwargs)
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_album_collection", "feature_song_collection") DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("album_collection",)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",) UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",)
def __init_collections__(self): def __init_collections__(self):
self.feature_song_collection.append_object_to_attribute = { self.album_collection.append_object_to_attribute = {
"feature_artist_collection": self "feature_artist_collection": self
} }
self.main_album_collection.append_object_to_attribute = {
"artist_collection": self
}
self.label_collection.append_object_to_attribute = { self.label_collection.append_object_to_attribute = {
"current_artist_collection": self "current_artist_collection": self
} }
@@ -521,33 +540,32 @@ class Artist(Base):
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song: if object_type is Song:
# this doesn't really make sense # this doesn't really make sense
# self.feature_song_collection.extend(object_list)
return return
if object_type is Artist: if object_type is Artist:
return return
if object_type is Album: if object_type is Album:
self.main_album_collection.extend(object_list) self.album_collection.extend(object_list)
return return
if object_type is Label: if object_type is Label:
self.label_collection.extend(object_list) self.label_collection.extend(object_list)
return return
def _compile(self):
self.update_albumsort()
def update_albumsort(self): def update_albumsort(self):
""" """
This updates the albumsort attributes, of the albums in This updates the albumsort attributes, of the albums in
`self.main_album_collection`, and sorts the albums, if possible. `self.album_collection`, and sorts the albums, if possible.
It is advised to only call this function, once all the albums are It is advised to only call this function, once all the albums are
added to the artist. added to the artist.
:return: :return:
""" """
if len(self.main_album_collection) <= 0:
return
type_section: Dict[AlbumType, int] = defaultdict(lambda: 2, { type_section: Dict[AlbumType, int] = defaultdict(lambda: 2, {
AlbumType.OTHER: 0, # if I don't know it, I add it to the first section AlbumType.OTHER: 0, # if I don't know it, I add it to the first section
AlbumType.STUDIO_ALBUM: 0, AlbumType.STUDIO_ALBUM: 0,
@@ -559,7 +577,7 @@ class Artist(Base):
# order albums in the previously defined section # order albums in the previously defined section
album: Album album: Album
for album in self.main_album_collection: for album in self.album_collection:
sections[type_section[album.album_type]].append(album) sections[type_section[album.album_type]].append(album)
def sort_section(_section: List[Album], last_albumsort: int) -> int: def sort_section(_section: List[Album], last_albumsort: int) -> int:
@@ -590,7 +608,7 @@ class Artist(Base):
album_list.extend(sections[section_index]) album_list.extend(sections[section_index])
# replace the old collection with the new one # replace the old collection with the new one
self.main_album_collection: Collection = Collection(data=album_list, element_type=Album) self.album_collection._data = album_list
INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection") INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection")
@property @property
@@ -612,15 +630,14 @@ class Artist(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = "artist "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.label_collection, " under {}") r += get_collection_string(self.label_collection, " under {}")
r += OPTION_BACKGROUND.value r += OPTION_BACKGROUND.value
if len(self.main_album_collection) > 0: if len(self.album_collection) > 0:
r += f" with {len(self.main_album_collection)} albums" r += f" with {len(self.album_collection)} albums"
if len(self.feature_song_collection) > 0:
r += f" featured in {len(self.feature_song_collection)} songs"
r += BColors.ENDC.value r += BColors.ENDC.value
return r return r
@@ -707,4 +724,4 @@ class Label(Base):
@property @property
def option_string(self): def option_string(self):
return OPTION_FOREGROUND.value + self.name + BColors.ENDC.value return "label " + OPTION_FOREGROUND.value + self.name + BColors.ENDC.value

View File

@@ -2,40 +2,48 @@ from __future__ import annotations
from collections import defaultdict from collections import defaultdict
from enum import Enum from enum import Enum
from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator from typing import (
List,
Dict,
Set,
Tuple,
Optional,
Iterable,
Generator,
TypedDict,
Callable,
Any,
TYPE_CHECKING
)
from urllib.parse import urlparse, ParseResult from urllib.parse import urlparse, ParseResult
from dataclasses import dataclass, field from dataclasses import dataclass, field
from functools import cached_property from functools import cached_property
from ..utils import generate_id from ..utils import generate_id
from ..utils.enums.source import SourcePages, SourceTypes from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.config import youtube_settings from ..utils.config import youtube_settings
from ..utils.string_processing import hash_url, shorten_display_url from ..utils.string_processing import hash_url, shorten_display_url
from .metadata import Mapping, Metadata from .metadata import Mapping, Metadata
from .parents import OuterProxy if TYPE_CHECKING:
from .collection import Collection from ..pages.abstract import Page
@dataclass @dataclass
class Source: class Source:
page_enum: SourcePages source_type: SourceType
url: str url: str
referrer_page: SourcePages = None referrer_page: SourceType = None
audio_url: Optional[str] = None audio_url: Optional[str] = None
additional_data: dict = field(default_factory=dict) additional_data: dict = field(default_factory=dict)
def __post_init__(self): def __post_init__(self):
self.referrer_page = self.referrer_page or self.page_enum self.referrer_page = self.referrer_page or self.source_type
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
@classmethod @classmethod
def match_url(cls, url: str, referrer_page: SourcePages) -> Optional[Source]: def match_url(cls, url: str, referrer_page: SourceType) -> Optional[Source]:
""" """
this shouldn't be used, unless you are not certain what the source is for this shouldn't be used, unless you are not certain what the source is for
the reason is that it is more inefficient the reason is that it is more inefficient
@@ -44,38 +52,50 @@ class Source:
url = parsed_url.geturl() url = parsed_url.geturl()
if "musify" in parsed_url.netloc: if "musify" in parsed_url.netloc:
return cls(SourcePages.MUSIFY, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.MUSIFY, url, referrer_page=referrer_page)
if parsed_url.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]: if parsed_url.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]:
return cls(SourcePages.YOUTUBE, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.YOUTUBE, url, referrer_page=referrer_page)
if url.startswith("https://www.deezer"): if url.startswith("https://www.deezer"):
return cls(SourcePages.DEEZER, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.DEEZER, url, referrer_page=referrer_page)
if url.startswith("https://open.spotify.com"): if url.startswith("https://open.spotify.com"):
return cls(SourcePages.SPOTIFY, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.SPOTIFY, url, referrer_page=referrer_page)
if "bandcamp" in url: if "bandcamp" in url:
return cls(SourcePages.BANDCAMP, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.BANDCAMP, url, referrer_page=referrer_page)
if "wikipedia" in parsed_url.netloc: if "wikipedia" in parsed_url.netloc:
return cls(SourcePages.WIKIPEDIA, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.WIKIPEDIA, url, referrer_page=referrer_page)
if url.startswith("https://www.metal-archives.com/"): if url.startswith("https://www.metal-archives.com/"):
return cls(SourcePages.ENCYCLOPAEDIA_METALLUM, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM, url, referrer_page=referrer_page)
# the less important once # the less important once
if url.startswith("https://www.facebook"): if url.startswith("https://www.facebook"):
return cls(SourcePages.FACEBOOK, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.FACEBOOK, url, referrer_page=referrer_page)
if url.startswith("https://www.instagram"): if url.startswith("https://www.instagram"):
return cls(SourcePages.INSTAGRAM, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.INSTAGRAM, url, referrer_page=referrer_page)
if url.startswith("https://twitter"): if url.startswith("https://twitter"):
return cls(SourcePages.TWITTER, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.TWITTER, url, referrer_page=referrer_page)
if url.startswith("https://myspace.com"): if url.startswith("https://myspace.com"):
return cls(SourcePages.MYSPACE, url, referrer_page=referrer_page) return cls(ALL_SOURCE_TYPES.MYSPACE, url, referrer_page=referrer_page)
@property
def has_page(self) -> bool:
return self.source_type.page is not None
@property
def page(self) -> Page:
return self.source_type.page
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
@property @property
def hash_url(self) -> str: def hash_url(self) -> str:
@@ -89,37 +109,82 @@ class Source:
return r return r
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Src({self.page_enum.value}: {shorten_display_url(self.url)})" return f"Src({self.source_type.value}: {shorten_display_url(self.url)})"
def __merge__(self, other: Source, **kwargs): def __merge__(self, other: Source, **kwargs):
if self.audio_url is None: if self.audio_url is None:
self.audio_url = other.audio_url self.audio_url = other.audio_url
self.additional_data.update(other.additional_data) self.additional_data.update(other.additional_data)
page_str = property(fget=lambda self: self.page_enum.value) page_str = property(fget=lambda self: self.source_type.value)
class SourceTypeSorting(TypedDict):
sort_key: Callable[[SourceType], Any]
reverse: bool
only_with_page: bool
class SourceCollection: class SourceCollection:
__change_version__ = generate_id() __change_version__ = generate_id()
_indexed_sources: Dict[str, Source] _indexed_sources: Dict[str, Source]
_page_to_source_list: Dict[SourcePages, List[Source]] _sources_by_type: Dict[SourceType, List[Source]]
def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs): def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
self._page_to_source_list = defaultdict(list) self._sources_by_type = defaultdict(list)
self._indexed_sources = {} self._indexed_sources = {}
self.extend(data or []) self.extend(data or [])
def has_source_page(self, *source_pages: SourcePages) -> bool: def source_types(
return any(source_page in self._page_to_source_list for source_page in source_pages) self,
only_with_page: bool = False,
sort_key = lambda page: page.name,
reverse: bool = False
) -> Iterable[SourceType]:
"""
Returns a list of all source types contained in this source collection.
def get_sources(self, *source_pages: List[Source]) -> Generator[Source]: Args:
if not len(source_pages): only_with_page (bool, optional): If True, only returns source types that have a page, meaning you can download from them.
source_pages = self.source_pages sort_key (function, optional): A function that defines the sorting key for the source types. Defaults to lambda page: page.name.
reverse (bool, optional): If True, sorts the source types in reverse order. Defaults to False.
for page in source_pages: Returns:
yield from self._page_to_source_list[page] Iterable[SourceType]: A list of source types.
"""
source_types: List[SourceType] = self._sources_by_type.keys()
if only_with_page:
source_types = filter(lambda st: st.has_page, source_types)
return sorted(
source_types,
key=sort_key,
reverse=reverse
)
def get_sources(self, *source_types: List[SourceType], source_type_sorting: SourceTypeSorting = None) -> Generator[Source]:
"""
Retrieves sources based on the provided source types and source type sorting.
Args:
*source_types (List[Source]): Variable number of source types to filter the sources.
source_type_sorting (SourceTypeSorting): Sorting criteria for the source types. This is only relevant if no source types are provided.
Yields:
Generator[Source]: A generator that yields the sources based on the provided filters.
Returns:
None
"""
if not len(source_types):
source_type_sorting = source_type_sorting or {}
source_types = self.source_types(**source_type_sorting)
for source_type in source_types:
yield from self._sources_by_type[source_type]
def append(self, source: Source): def append(self, source: Source):
if source is None: if source is None:
@@ -135,7 +200,7 @@ class SourceCollection:
existing_source.__merge__(source) existing_source.__merge__(source)
source = existing_source source = existing_source
else: else:
self._page_to_source_list[source.page_enum].append(source) self._sources_by_type[source.source_type].append(source)
changed = False changed = False
for key in source.indexing_values: for key in source.indexing_values:
@@ -156,10 +221,6 @@ class SourceCollection:
def __merge__(self, other: SourceCollection, **kwargs): def __merge__(self, other: SourceCollection, **kwargs):
self.extend(other) self.extend(other)
@property
def source_pages(self) -> Iterable[SourcePages]:
return sorted(self._page_to_source_list.keys(), key=lambda page: page.value)
@property @property
def hash_url_list(self) -> List[str]: def hash_url_list(self) -> List[str]:
return [hash_url(source.url) for source in self.get_sources()] return [hash_url(source.url) for source in self.get_sources()]
@@ -170,7 +231,7 @@ class SourceCollection:
@property @property
def homepage_list(self) -> List[str]: def homepage_list(self) -> List[str]:
return [source.homepage for source in self.source_pages] return [source_type.homepage for source_type in self._sources_by_type.keys()]
def indexing_values(self) -> Generator[Tuple[str, str], None, None]: def indexing_values(self) -> Generator[Tuple[str, str], None, None]:
for index in self._indexed_sources: for index in self._indexed_sources:

View File

@@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import List, Tuple, TextIO, Union from typing import List, Tuple, TextIO, Union, Optional
import logging import logging
import random import random
import requests import requests
@@ -31,7 +31,10 @@ class Target(OuterProxy):
} }
@classmethod @classmethod
def temp(cls, name: str = str(random.randint(0, HIGHEST_ID))) -> P: def temp(cls, name: str = str(random.randint(0, HIGHEST_ID)), file_extension: Optional[str] = None) -> P:
if file_extension is not None:
name = f"{name}.{file_extension}"
return cls(main_settings["temp_directory"] / name) return cls(main_settings["temp_directory"] / name)
# This is automatically generated # This is automatically generated

View File

@@ -3,8 +3,9 @@ import random
import re import re
from copy import copy from copy import copy
from pathlib import Path from pathlib import Path
from typing import Optional, Union, Type, Dict, Set, List, Tuple from typing import Optional, Union, Type, Dict, Set, List, Tuple, TypedDict
from string import Formatter from string import Formatter
from dataclasses import dataclass, field
import requests import requests
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
@@ -21,85 +22,45 @@ from ..objects import (
Collection, Collection,
Label, Label,
) )
from ..utils.enums.source import SourcePages from ..utils.enums import SourceType
from ..utils.enums.album import AlbumType from ..utils.enums.album import AlbumType
from ..audio import write_metadata_to_target, correct_codec from ..audio import write_metadata_to_target, correct_codec
from ..utils.config import main_settings from ..utils.config import main_settings
from ..utils.support_classes.query import Query from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import fit_to_file_system from ..utils.string_processing import fit_to_file_system
from ..utils import trace from ..utils import trace, output, BColors
INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song] INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song]
INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]] INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]]
@dataclass
class FetchOptions:
download_all: bool = False
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
class NamingDict(dict): @dataclass
CUSTOM_KEYS: Dict[str, str] = { class DownloadOptions:
"label": "label.name", download_all: bool = False
"artist": "artist.name", album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
"song": "song.title",
"isrc": "song.isrc",
"album": "album.title",
"album_type": "album.album_type_string"
}
def __init__(self, values: dict, object_mappings: Dict[str, DatabaseObject] = None):
self.object_mappings: Dict[str, DatabaseObject] = object_mappings or dict()
super().__init__(values)
self["audio_format"] = main_settings["audio_format"]
def add_object(self, music_object: DatabaseObject):
self.object_mappings[type(music_object).__name__.lower()] = music_object
def copy(self) -> dict:
return type(self)(super().copy(), self.object_mappings.copy())
def __getitem__(self, key: str) -> str:
return fit_to_file_system(super().__getitem__(key))
def default_value_for_name(self, name: str) -> str:
return f'Various {name.replace("_", " ").title()}'
def __missing__(self, key: str) -> str:
if "." not in key:
if key not in self.CUSTOM_KEYS:
return self.default_value_for_name(key)
key = self.CUSTOM_KEYS[key]
frag_list = key.split(".")
object_name = frag_list[0].strip().lower()
attribute_name = frag_list[-1].strip().lower()
if object_name not in self.object_mappings:
return self.default_value_for_name(attribute_name)
music_object = self.object_mappings[object_name]
try:
value = getattr(music_object, attribute_name)
if value is None:
return self.default_value_for_name(attribute_name)
return str(value)
except AttributeError:
return self.default_value_for_name(attribute_name)
process_audio_if_found: bool = False
process_metadata_if_found: bool = True
class Page: class Page:
""" SOURCE_TYPE: SourceType
This is an abstract class, laying out the LOGGER: logging.Logger
functionality for every other class fetching something
"""
SOURCE_TYPE: SourcePages def __new__(cls, *args, **kwargs):
LOGGER = logging.getLogger("this shouldn't be used") cls.LOGGER = logging.getLogger(cls.__name__)
# set this to true, if all song details can also be fetched by fetching album details return super().__new__(cls)
NO_ADDITIONAL_DATA_FROM_SONG = False
def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
self.SOURCE_TYPE.register_page(self)
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None): def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None):
""" """
@@ -172,106 +133,7 @@ class Page:
def song_search(self, song: Song) -> List[Song]: def song_search(self, song: Song) -> List[Song]:
return [] return []
def fetch_details( # to fetch stuff
self,
music_object: DatabaseObject,
stop_at_level: int = 1,
post_process: bool = True
) -> DatabaseObject:
"""
when a music object with lacking data is passed in, it returns
the SAME object **(no copy)** with more detailed data.
If you for example put in, an album, it fetches the tracklist
:param music_object:
:param stop_at_level:
This says the depth of the level the scraper will recurse to.
If this is for example set to 2, then the levels could be:
1. Level: the album
2. Level: every song of the album + every artist of the album
If no additional requests are needed to get the data one level below the supposed stop level
this gets ignored
:return detailed_music_object: IT MODIFIES THE INPUT OBJ
"""
# creating a new object, of the same type
new_music_object: Optional[DatabaseObject] = None
fetched_from_url: List[str] = []
# only certain database objects, have a source list
if isinstance(music_object, INDEPENDENT_DB_OBJECTS):
source: Source
for source in music_object.source_collection.get_sources(self.SOURCE_TYPE):
if music_object.already_fetched_from(source.hash_url):
continue
tmp = self.fetch_object_from_source(
source=source,
enforce_type=type(music_object),
stop_at_level=stop_at_level,
post_process=False,
type_string=type(music_object).__name__,
entity_string=music_object.option_string,
)
if new_music_object is None:
new_music_object = tmp
else:
new_music_object.merge(tmp)
fetched_from_url.append(source.hash_url)
if new_music_object is not None:
music_object.merge(new_music_object)
music_object.mark_as_fetched(*fetched_from_url)
return music_object
def fetch_object_from_source(
self,
source: Source,
stop_at_level: int = 2,
enforce_type: Type[DatabaseObject] = None,
post_process: bool = True,
type_string: str = "",
entity_string: str = "",
) -> Optional[DatabaseObject]:
obj_type = self.get_source_type(source)
if obj_type is None:
return None
if enforce_type != obj_type and enforce_type is not None:
self.LOGGER.warning(f"Object type isn't type to enforce: {enforce_type}, {obj_type}")
return None
music_object: DatabaseObject = None
fetch_map = {
Song: self.fetch_song,
Album: self.fetch_album,
Artist: self.fetch_artist,
Label: self.fetch_label
}
if obj_type in fetch_map:
music_object = fetch_map[obj_type](source, stop_at_level=stop_at_level)
else:
self.LOGGER.warning(f"Can't fetch details of type: {obj_type}")
return None
if stop_at_level > 0:
trace(f"fetching {type_string} [{entity_string}] [stop_at_level={stop_at_level}]")
collection: Collection
for collection_str in music_object.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
collection = music_object.__getattribute__(collection_str)
for sub_element in collection:
sub_element.merge(
self.fetch_details(sub_element, stop_at_level=stop_at_level - 1, post_process=False))
return music_object
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song() return Song()
@@ -284,163 +146,7 @@ class Page:
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label() return Label()
def download( # to download stuff
self,
music_object: DatabaseObject,
genre: str,
download_all: bool = False,
process_metadata_anyway: bool = True
) -> DownloadResult:
naming_dict: NamingDict = NamingDict({"genre": genre})
def fill_naming_objects(naming_music_object: DatabaseObject):
nonlocal naming_dict
for collection_name in naming_music_object.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
collection: Collection = getattr(naming_music_object, collection_name)
if collection.empty:
continue
dom_ordered_music_object: DatabaseObject = collection[0]
naming_dict.add_object(dom_ordered_music_object)
return fill_naming_objects(dom_ordered_music_object)
fill_naming_objects(music_object)
return self._download(music_object, naming_dict, download_all, process_metadata_anyway=process_metadata_anyway)
def _download(
self,
music_object: DatabaseObject,
naming_dict: NamingDict,
download_all: bool = False,
skip_details: bool = False,
process_metadata_anyway: bool = True
) -> DownloadResult:
trace(f"downloading {type(music_object).__name__} [{music_object.option_string}]")
skip_next_details = skip_details
# Skips all releases, that are defined in shared.ALBUM_TYPE_BLACKLIST, if download_all is False
if isinstance(music_object, Album):
if self.NO_ADDITIONAL_DATA_FROM_SONG:
skip_next_details = True
if not download_all and music_object.album_type.value in main_settings["album_type_blacklist"]:
return DownloadResult()
if not (isinstance(music_object, Song) and self.NO_ADDITIONAL_DATA_FROM_SONG):
self.fetch_details(music_object=music_object, stop_at_level=1)
if isinstance(music_object, Album):
music_object.update_tracksort()
naming_dict.add_object(music_object)
if isinstance(music_object, Song):
return self._download_song(music_object, naming_dict, process_metadata_anyway=process_metadata_anyway)
download_result: DownloadResult = DownloadResult()
for collection_name in music_object.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
collection: Collection = getattr(music_object, collection_name)
sub_ordered_music_object: DatabaseObject
for sub_ordered_music_object in collection:
download_result.merge(self._download(sub_ordered_music_object, naming_dict.copy(), download_all,
skip_details=skip_next_details,
process_metadata_anyway=process_metadata_anyway))
return download_result
def _download_song(self, song: Song, naming_dict: NamingDict, process_metadata_anyway: bool = True):
if "genre" not in naming_dict and song.genre is not None:
naming_dict["genre"] = song.genre
if song.genre is None:
song.genre = naming_dict["genre"]
path_parts = Formatter().parse(main_settings["download_path"])
file_parts = Formatter().parse(main_settings["download_file"])
new_target = Target(
relative_to_music_dir=True,
file_path=Path(
main_settings["download_path"].format(**{part[1]: naming_dict[part[1]] for part in path_parts}),
main_settings["download_file"].format(**{part[1]: naming_dict[part[1]] for part in file_parts})
)
)
if song.target_collection.empty:
song.target_collection.append(new_target)
if not song.source_collection.has_source_page(self.SOURCE_TYPE):
return DownloadResult(error_message=f"No {self.__class__.__name__} source found for {song.option_string}.")
sources = song.source_collection.get_sources(self.SOURCE_TYPE)
temp_target: Target = Target(
relative_to_music_dir=False,
file_path=Path(
main_settings["temp_directory"],
str(song.id)
)
)
r = DownloadResult(1)
found_on_disc = False
target: Target
for target in song.target_collection:
if target.exists:
if process_metadata_anyway:
target.copy_content(temp_target)
found_on_disc = True
r.found_on_disk += 1
r.add_target(target)
if found_on_disc and not process_metadata_anyway:
self.LOGGER.info(f"{song.option_string} already exists, thus not downloading again.")
return r
skip_intervals = []
if not found_on_disc:
for source in sources:
r = self.download_song_to_target(source=source, target=temp_target, desc=song.option_string)
if not r.is_fatal_error:
skip_intervals = self.get_skip_intervals(song, source)
break
if temp_target.exists:
r.merge(self._post_process_targets(
song=song,
temp_target=temp_target,
interval_list=skip_intervals,
))
return r
def _post_process_targets(self, song: Song, temp_target: Target, interval_list: List) -> DownloadResult:
correct_codec(temp_target, interval_list=interval_list)
self.post_process_hook(song, temp_target)
write_metadata_to_target(song.metadata, temp_target, song)
r = DownloadResult()
target: Target
for target in song.target_collection:
if temp_target is not target:
temp_target.copy_content(target)
r.add_target(target)
temp_target.delete()
r.sponsor_segments += len(interval_list)
return r
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]: def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
return [] return []

View File

@@ -10,7 +10,7 @@ from .abstract import Page
from ..objects import ( from ..objects import (
Artist, Artist,
Source, Source,
SourcePages, SourceType,
Song, Song,
Album, Album,
Label, Label,
@@ -22,6 +22,8 @@ from ..objects import (
Artwork, Artwork,
) )
from ..connection import Connection from ..connection import Connection
from ..utils import dump_to_file
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import clean_song_title from ..utils.string_processing import clean_song_title
from ..utils.config import main_settings, logging_settings from ..utils.config import main_settings, logging_settings
@@ -48,9 +50,7 @@ class BandcampTypes(Enum):
class Bandcamp(Page): class Bandcamp(Page):
# CHANGE SOURCE_TYPE = ALL_SOURCE_TYPES.BANDCAMP
SOURCE_TYPE = SourcePages.BANDCAMP
LOGGER = logging_settings["bandcamp_logger"]
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.connection: Connection = Connection( self.connection: Connection = Connection(
@@ -62,8 +62,7 @@ class Bandcamp(Page):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]: def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
parsed_url = urlparse(source.url) path = source.parsed_url.path.replace("/", "")
path = parsed_url.path.replace("/", "")
if path == "" or path.startswith("music"): if path == "" or path.startswith("music"):
return Artist return Artist
@@ -118,7 +117,7 @@ class Bandcamp(Page):
return Song( return Song(
title=clean_song_title(name, artist_name=data["band_name"]), title=clean_song_title(name, artist_name=data["band_name"]),
source_list=source_list, source_list=source_list,
main_artist_list=[ artist_list=[
Artist( Artist(
name=data["band_name"], name=data["band_name"],
source_list=[ source_list=[
@@ -238,7 +237,7 @@ class Bandcamp(Page):
html_music_grid = soup.find("ol", {"id": "music-grid"}) html_music_grid = soup.find("ol", {"id": "music-grid"})
if html_music_grid is not None: if html_music_grid is not None:
for subsoup in html_music_grid.find_all("li"): for subsoup in html_music_grid.find_all("li"):
artist.main_album_collection.append(self._parse_album(soup=subsoup, initial_source=source)) artist.album_collection.append(self._parse_album(soup=subsoup, initial_source=source))
for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})): for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})):
data_blob = data_blob_soup["data-blob"] data_blob = data_blob_soup["data-blob"]
@@ -247,7 +246,7 @@ class Bandcamp(Page):
dump_to_file(f"bandcamp_artist_data_blob_{i}.json", data_blob, is_json=True, exit_after_dump=False) dump_to_file(f"bandcamp_artist_data_blob_{i}.json", data_blob, is_json=True, exit_after_dump=False)
if data_blob is not None: if data_blob is not None:
artist.main_album_collection.extend( artist.album_collection.extend(
self._parse_artist_data_blob(json.loads(data_blob), source.url) self._parse_artist_data_blob(json.loads(data_blob), source.url)
) )
@@ -371,7 +370,7 @@ class Bandcamp(Page):
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"), date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
source_list=[Source(self.SOURCE_TYPE, album_data["@id"])] source_list=[Source(self.SOURCE_TYPE, album_data["@id"])]
)], )],
main_artist_list=[Artist( artist_list=[Artist(
name=artist_data["name"].strip(), name=artist_data["name"].strip(),
source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))] source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
)], )],

View File

@@ -7,7 +7,7 @@ from urllib.parse import urlparse, urlencode
from ..connection import Connection from ..connection import Connection
from ..utils.config import logging_settings from ..utils.config import logging_settings
from .abstract import Page from .abstract import Page
from ..utils.enums.source import SourcePages from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.enums.album import AlbumType from ..utils.enums.album import AlbumType
from ..utils.support_classes.query import Query from ..utils.support_classes.query import Query
from ..objects import ( from ..objects import (
@@ -52,14 +52,14 @@ def _song_from_json(artist_html=None, album_html=None, release_type=None, title=
return Song( return Song(
title=title, title=title,
main_artist_list=[ artist_list=[
_artist_from_json(artist_html=artist_html) _artist_from_json(artist_html=artist_html)
], ],
album_list=[ album_list=[
_album_from_json(album_html=album_html, release_type=release_type, artist_html=artist_html) _album_from_json(album_html=album_html, release_type=release_type, artist_html=artist_html)
], ],
source_list=[ source_list=[
Source(SourcePages.ENCYCLOPAEDIA_METALLUM, song_id) Source(ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM, song_id)
] ]
) )
@@ -85,7 +85,7 @@ def _artist_from_json(artist_html=None, genre=None, country=None) -> Artist:
return Artist( return Artist(
name=artist_name, name=artist_name,
source_list=[ source_list=[
Source(SourcePages.ENCYCLOPAEDIA_METALLUM, artist_url) Source(ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM, artist_url)
] ]
) )
@@ -105,7 +105,7 @@ def _album_from_json(album_html=None, release_type=None, artist_html=None) -> Al
title=album_name, title=album_name,
album_type=album_type, album_type=album_type,
source_list=[ source_list=[
Source(SourcePages.ENCYCLOPAEDIA_METALLUM, album_url) Source(ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM, album_url)
], ],
artist_list=[ artist_list=[
_artist_from_json(artist_html=artist_html) _artist_from_json(artist_html=artist_html)
@@ -207,7 +207,7 @@ def create_grid(
class EncyclopaediaMetallum(Page): class EncyclopaediaMetallum(Page):
SOURCE_TYPE = SourcePages.ENCYCLOPAEDIA_METALLUM SOURCE_TYPE = ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM
LOGGER = logging_settings["metal_archives_logger"] LOGGER = logging_settings["metal_archives_logger"]
def __init__(self, **kwargs): def __init__(self, **kwargs):
@@ -266,7 +266,7 @@ class EncyclopaediaMetallum(Page):
song_title = song.title.strip() song_title = song.title.strip()
album_titles = ["*"] if song.album_collection.empty else [album.title.strip() for album in song.album_collection] album_titles = ["*"] if song.album_collection.empty else [album.title.strip() for album in song.album_collection]
artist_titles = ["*"] if song.main_artist_collection.empty else [artist.name.strip() for artist in song.main_artist_collection] artist_titles = ["*"] if song.artist_collection.empty else [artist.name.strip() for artist in song.artist_collection]
search_results = [] search_results = []
@@ -663,7 +663,7 @@ class EncyclopaediaMetallum(Page):
artist.notes = band_notes artist.notes = band_notes
discography: List[Album] = self._fetch_artist_discography(artist_id) discography: List[Album] = self._fetch_artist_discography(artist_id)
artist.main_album_collection.extend(discography) artist.album_collection.extend(discography)
return artist return artist
@@ -832,7 +832,7 @@ class EncyclopaediaMetallum(Page):
) )
def get_source_type(self, source: Source): def get_source_type(self, source: Source):
if self.SOURCE_TYPE != source.page_enum: if self.SOURCE_TYPE != source.source_type:
return None return None
url = source.url url = source.url

View File

@@ -9,7 +9,7 @@ from bs4 import BeautifulSoup
from ..connection import Connection from ..connection import Connection
from .abstract import Page from .abstract import Page
from ..utils.enums.source import SourcePages from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.enums.album import AlbumType, AlbumStatus from ..utils.enums.album import AlbumType, AlbumStatus
from ..objects import ( from ..objects import (
Artist, Artist,
@@ -111,9 +111,7 @@ def parse_url(url: str) -> MusifyUrl:
class Musify(Page): class Musify(Page):
# CHANGE SOURCE_TYPE = ALL_SOURCE_TYPES.MUSIFY
SOURCE_TYPE = SourcePages.MUSIFY
LOGGER = logging_settings["musify_logger"]
HOST = "https://musify.club" HOST = "https://musify.club"
@@ -418,6 +416,10 @@ class Musify(Page):
href = artist_soup["href"] href = artist_soup["href"]
if href is not None: if href is not None:
href_parts = href.split("/")
if len(href_parts) <= 1 or href_parts[-2] != "artist":
return
artist_src_list.append(Source(self.SOURCE_TYPE, self.HOST + href)) artist_src_list.append(Source(self.SOURCE_TYPE, self.HOST + href))
name_elem: BeautifulSoup = artist_soup.find("span", {"itemprop": "name"}) name_elem: BeautifulSoup = artist_soup.find("span", {"itemprop": "name"})
@@ -500,9 +502,18 @@ class Musify(Page):
for video_container in video_container_list: for video_container in video_container_list:
iframe_list: List[BeautifulSoup] = video_container.findAll("iframe") iframe_list: List[BeautifulSoup] = video_container.findAll("iframe")
for iframe in iframe_list: for iframe in iframe_list:
"""
the url could look like this
https://www.youtube.com/embed/sNObCkhzOYA?si=dNVgnZMBNVlNb0P_
"""
parsed_url = urlparse(iframe["src"])
path_parts = parsed_url.path.strip("/").split("/")
if path_parts[0] != "embed" or len(path_parts) < 2:
continue
source_list.append(Source( source_list.append(Source(
SourcePages.YOUTUBE, ALL_SOURCE_TYPES.YOUTUBE,
iframe["src"], f"https://music.youtube.com/watch?v={path_parts[1]}",
referrer_page=self.SOURCE_TYPE referrer_page=self.SOURCE_TYPE
)) ))
@@ -681,17 +692,20 @@ class Musify(Page):
anchor: BeautifulSoup = artist_crumb.find("a") anchor: BeautifulSoup = artist_crumb.find("a")
if anchor is not None: if anchor is not None:
href = anchor.get("href") href = anchor.get("href")
artist_source_list: List[Source] = []
if href is not None: href_parts = href.split("/")
artist_source_list.append(Source(self.SOURCE_TYPE, self.HOST + href.strip())) if not(len(href_parts) <= 1 or href_parts[-2] != "artist"):
artist_source_list: List[Source] = []
span: BeautifulSoup = anchor.find("span") if href is not None:
if span is not None: artist_source_list.append(Source(self.SOURCE_TYPE, self.HOST + href.strip()))
artist_list.append(Artist(
name=span.get_text(strip=True), span: BeautifulSoup = anchor.find("span")
source_list=artist_source_list if span is not None:
)) artist_list.append(Artist(
name=span.get_text(strip=True),
source_list=artist_source_list
))
else: else:
self.LOGGER.debug("there are not 4 breadcrumb items, which shouldn't be the case") self.LOGGER.debug("there are not 4 breadcrumb items, which shouldn't be the case")
@@ -938,10 +952,10 @@ class Musify(Page):
album_status_id = album_card.get("data-type") album_status_id = album_card.get("data-type")
if album_status_id.isdigit(): if album_status_id.isdigit():
album_status_id = int(album_status_id) album_status_id = int(album_status_id)
album_type = ALBUM_TYPE_MAP[album_status_id] album_kwargs["album_type"] = ALBUM_TYPE_MAP[album_status_id]
if album_status_id == 5: if album_status_id == 5:
album_status = AlbumStatus.BOOTLEG album_kwargs["album_status"] = AlbumStatus.BOOTLEG
def parse_release_anchor(_anchor: BeautifulSoup, text_is_name=False): def parse_release_anchor(_anchor: BeautifulSoup, text_is_name=False):
nonlocal album_kwargs nonlocal album_kwargs
@@ -1037,10 +1051,10 @@ class Musify(Page):
for card_soup in soup.find_all("div", {"class": "card"}): for card_soup in soup.find_all("div", {"class": "card"}):
album = self._parse_album_card(card_soup, artist_name, **kwargs) album = self._parse_album_card(card_soup, artist_name, **kwargs)
if album.album_type in _album_type_blacklist: if not self.fetch_options.download_all and album.album_type in self.fetch_options.album_type_blacklist:
continue continue
artist.main_album_collection.append(album) artist.album_collection.append(album)
def fetch_artist(self, source: Source, **kwargs) -> Artist: def fetch_artist(self, source: Source, **kwargs) -> Artist:
""" """

View File

@@ -1,65 +0,0 @@
from typing import List, Optional, Type
from urllib.parse import urlparse
import logging
from ..objects import Source, DatabaseObject
from .abstract import Page
from ..objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target
)
from ..connection import Connection
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
class Preset(Page):
# CHANGE
SOURCE_TYPE = SourcePages.PRESET
LOGGER = logging.getLogger("preset")
def __init__(self, *args, **kwargs):
self.connection: Connection = Connection(
host="https://www.preset.cum/",
logger=self.LOGGER
)
super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
return super().get_source_type(source)
def general_search(self, search_query: str) -> List[DatabaseObject]:
return []
def label_search(self, label: Label) -> List[Label]:
return []
def artist_search(self, artist: Artist) -> List[Artist]:
return []
def album_search(self, album: Album) -> List[Album]:
return []
def song_search(self, song: Song) -> List[Song]:
return []
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song()
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
return Album()
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
return Artist()
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
return DownloadResult()

View File

@@ -9,7 +9,6 @@ from .abstract import Page
from ..objects import ( from ..objects import (
Artist, Artist,
Source, Source,
SourcePages,
Song, Song,
Album, Album,
Label, Label,
@@ -19,6 +18,7 @@ from ..objects import (
) )
from ..connection import Connection from ..connection import Connection
from ..utils.string_processing import clean_song_title from ..utils.string_processing import clean_song_title
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from ..utils.config import youtube_settings, main_settings, logging_settings from ..utils.config import youtube_settings, main_settings, logging_settings
@@ -39,10 +39,7 @@ def get_piped_url(path: str = "", params: str = "", query: str = "", fragment: s
class YouTube(SuperYouTube): class YouTube(SuperYouTube):
# CHANGE # CHANGE
SOURCE_TYPE = SourcePages.YOUTUBE SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
LOGGER = logging_settings["youtube_logger"]
NO_ADDITIONAL_DATA_FROM_SONG = True
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.connection: Connection = Connection( self.connection: Connection = Connection(
@@ -146,7 +143,7 @@ class YouTube(SuperYouTube):
self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={data['videoId']}") self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={data['videoId']}")
)], )],
notes=FormattedText(html=data["descriptionHtml"] + f"\n<p>{license_str}</ p>" ), notes=FormattedText(html=data["descriptionHtml"] + f"\n<p>{license_str}</ p>" ),
main_artist_list=artist_list artist_list=artist_list
), int(data["published"]) ), int(data["published"])
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
@@ -287,7 +284,7 @@ class YouTube(SuperYouTube):
self.LOGGER.warning(f"didn't found any playlists with piped, falling back to invidious. (it is unusual)") self.LOGGER.warning(f"didn't found any playlists with piped, falling back to invidious. (it is unusual)")
album_list, artist_name = self.fetch_invidious_album_list(parsed.id) album_list, artist_name = self.fetch_invidious_album_list(parsed.id)
return Artist(name=artist_name, main_album_list=album_list, source_list=[source]) return Artist(name=artist_name, album_list=album_list, source_list=[source])
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
""" """

View File

@@ -7,7 +7,6 @@ from ..abstract import Page
from ...objects import ( from ...objects import (
Artist, Artist,
Source, Source,
SourcePages,
Song, Song,
Album, Album,
Label, Label,
@@ -59,6 +58,19 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
song.album_collection.extend(album_list) song.album_collection.extend(album_list)
return [song] return [song]
if len(album_list) == 1:
album = album_list[0]
album.artist_collection.extend(artist_list)
album.song_collection.extend(song_list)
return [album]
"""
if len(artist_list) == 1:
artist = artist_list[0]
artist.main_album_collection.extend(album_list)
return [artist]
"""
return results return results

View File

@@ -3,12 +3,13 @@ from enum import Enum
from ...utils.config import youtube_settings, logging_settings from ...utils.config import youtube_settings, logging_settings
from ...utils.string_processing import clean_song_title from ...utils.string_processing import clean_song_title
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
from ...objects import Source, DatabaseObject from ...objects import Source, DatabaseObject
from ..abstract import Page from ..abstract import Page
from ...objects import ( from ...objects import (
Artist, Artist,
Source, Source,
SourcePages,
Song, Song,
Album, Album,
Label, Label,
@@ -18,7 +19,7 @@ from ...objects import (
LOGGER = logging_settings["youtube_music_logger"] LOGGER = logging_settings["youtube_music_logger"]
SOURCE_PAGE = SourcePages.YOUTUBE_MUSIC SOURCE_PAGE = ALL_SOURCE_TYPES.YOUTUBE
class PageType(Enum): class PageType(Enum):

View File

@@ -10,7 +10,6 @@ from ..abstract import Page
from ...objects import ( from ...objects import (
Artist, Artist,
Source, Source,
SourcePages,
Song, Song,
Album, Album,
Label, Label,
@@ -21,6 +20,7 @@ from ...objects import (
from ...connection import Connection from ...connection import Connection
from ...utils.support_classes.download_result import DownloadResult from ...utils.support_classes.download_result import DownloadResult
from ...utils.config import youtube_settings, logging_settings, main_settings from ...utils.config import youtube_settings, logging_settings, main_settings
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str: def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
@@ -50,7 +50,7 @@ class YouTubeUrl:
""" """
def __init__(self, url: str) -> None: def __init__(self, url: str) -> None:
self.SOURCE_TYPE = SourcePages.YOUTUBE self.SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
""" """
Raises Index exception for wrong url, and value error for not found enum type Raises Index exception for wrong url, and value error for not found enum type
@@ -58,9 +58,6 @@ class YouTubeUrl:
self.id = "" self.id = ""
parsed = urlparse(url=url) parsed = urlparse(url=url)
if parsed.netloc == "music.youtube.com":
self.SOURCE_TYPE = SourcePages.YOUTUBE_MUSIC
self.url_type: YouTubeUrlType self.url_type: YouTubeUrlType
type_frag_list = parsed.path.split("/") type_frag_list = parsed.path.split("/")
@@ -124,8 +121,7 @@ class YouTubeUrl:
class SuperYouTube(Page): class SuperYouTube(Page):
# CHANGE # CHANGE
SOURCE_TYPE = SourcePages.YOUTUBE SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
LOGGER = logging_settings["youtube_logger"]
NO_ADDITIONAL_DATA_FROM_SONG = False NO_ADDITIONAL_DATA_FROM_SONG = False
@@ -145,6 +141,8 @@ class SuperYouTube(Page):
_sponsorblock_connection: Connection = Connection() _sponsorblock_connection: Connection = Connection()
self.sponsorblock = python_sponsorblock.SponsorBlock(silent=True, session=_sponsorblock_connection.session) self.sponsorblock = python_sponsorblock.SponsorBlock(silent=True, session=_sponsorblock_connection.session)
super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]: def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
_url_type = { _url_type = {
YouTubeUrlType.CHANNEL: Artist, YouTubeUrlType.CHANNEL: Artist,

View File

@@ -22,20 +22,22 @@ from ...utils import get_current_millis, traverse_json_path
from ...utils import dump_to_file from ...utils import dump_to_file
from ...objects import Source, DatabaseObject, ID3Timestamp, Artwork
from ..abstract import Page from ..abstract import Page
from ...objects import ( from ...objects import (
Artist, DatabaseObject as DataObject,
Source, Source,
SourcePages, FormattedText,
ID3Timestamp,
Artwork,
Artist,
Song, Song,
Album, Album,
Label, Label,
Target, Target,
Lyrics, Lyrics,
FormattedText
) )
from ...connection import Connection from ...connection import Connection
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
from ...utils.enums.album import AlbumType from ...utils.enums.album import AlbumType
from ...utils.support_classes.download_result import DownloadResult from ...utils.support_classes.download_result import DownloadResult
@@ -176,8 +178,7 @@ ALBUM_TYPE_MAP = {
class YoutubeMusic(SuperYouTube): class YoutubeMusic(SuperYouTube):
# CHANGE # CHANGE
SOURCE_TYPE = SourcePages.YOUTUBE_MUSIC SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
LOGGER = logging_settings["youtube_music_logger"]
def __init__(self, *args, ydl_opts: dict = None, **kwargs): def __init__(self, *args, ydl_opts: dict = None, **kwargs):
self.yt_music_connection: YoutubeMusicConnection = YoutubeMusicConnection( self.yt_music_connection: YoutubeMusicConnection = YoutubeMusicConnection(
@@ -193,8 +194,7 @@ class YoutubeMusic(SuperYouTube):
self.start_millis = get_current_millis() self.start_millis = get_current_millis()
if self.credentials.api_key == "" or DEBUG_YOUTUBE_INITIALIZING: self._fetch_from_main_page()
self._fetch_from_main_page()
SuperYouTube.__init__(self, *args, **kwargs) SuperYouTube.__init__(self, *args, **kwargs)
@@ -215,6 +215,8 @@ class YoutubeMusic(SuperYouTube):
self.download_values_by_url: dict = {} self.download_values_by_url: dict = {}
self.not_download: Dict[str, DownloadError] = {} self.not_download: Dict[str, DownloadError] = {}
super().__init__(*args, **kwargs)
def _fetch_from_main_page(self): def _fetch_from_main_page(self):
""" """
===API=KEY=== ===API=KEY===
@@ -347,10 +349,10 @@ class YoutubeMusic(SuperYouTube):
default='{}' default='{}'
)) or {} )) or {}
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]: def get_source_type(self, source: Source) -> Optional[Type[DataObject]]:
return super().get_source_type(source) return super().get_source_type(source)
def general_search(self, search_query: str) -> List[DatabaseObject]: def general_search(self, search_query: str) -> List[DataObject]:
search_query = search_query.strip() search_query = search_query.strip()
urlescaped_query: str = quote(search_query.strip().replace(" ", "+")) urlescaped_query: str = quote(search_query.strip().replace(" ", "+"))
@@ -587,6 +589,8 @@ class YoutubeMusic(SuperYouTube):
}, },
name=f"fetch_song_lyrics_{video_id}.json" name=f"fetch_song_lyrics_{video_id}.json"
) )
if r is None:
return None
dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False) dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False)
@@ -618,7 +622,7 @@ class YoutubeMusic(SuperYouTube):
Artist( Artist(
name=name, name=name,
source_list=[Source( source_list=[Source(
SourcePages.YOUTUBE_MUSIC, self.SOURCE_TYPE,
f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}" f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}"
)] )]
) for name in artist_names] ) for name in artist_names]
@@ -637,9 +641,9 @@ class YoutubeMusic(SuperYouTube):
album_list=album_list, album_list=album_list,
length=int(ydl_res.get("duration", 0)) * 1000, length=int(ydl_res.get("duration", 0)) * 1000,
artwork=Artwork(*ydl_res.get("thumbnails", [])), artwork=Artwork(*ydl_res.get("thumbnails", [])),
main_artist_list=artist_list, artist_list=artist_list,
source_list=[Source( source_list=[Source(
SourcePages.YOUTUBE_MUSIC, self.SOURCE_TYPE,
f"https://music.youtube.com/watch?v={ydl_res.get('id')}" f"https://music.youtube.com/watch?v={ydl_res.get('id')}"
), source], ), source],
) )
@@ -736,8 +740,9 @@ class YoutubeMusic(SuperYouTube):
raw_headers=True, raw_headers=True,
disable_cache=True, disable_cache=True,
headers=media.get("headers", {}), headers=media.get("headers", {}),
# chunk_size=media.get("chunk_size", main_settings["chunk_size"]), chunk_size=main_settings["chunk_size"],
method="GET", method="GET",
timeout=5,
) )
else: else:
result = DownloadResult(error_message=str(media.get("error") or self.not_download[source.hash_url])) result = DownloadResult(error_message=str(media.get("error") or self.not_download[source.hash_url]))

View File

@@ -19,7 +19,7 @@ config = Config((
You can use Audio formats which support ID3.2 and ID3.1, You can use Audio formats which support ID3.2 and ID3.1,
but you will have cleaner Metadata using ID3.2."""), but you will have cleaner Metadata using ID3.2."""),
Attribute(name="result_history", default_value=False, description="""If enabled, you can go back to the previous results. Attribute(name="result_history", default_value=True, description="""If enabled, you can go back to the previous results.
The consequence is a higher meory consumption, because every result is saved."""), The consequence is a higher meory consumption, because every result is saved."""),
Attribute(name="history_length", default_value=8, description="""You can choose how far back you can go in the result history. Attribute(name="history_length", default_value=8, description="""You can choose how far back you can go in the result history.
The further you choose to be able to go back, the higher the memory usage. The further you choose to be able to go back, the higher the memory usage.

View File

@@ -1 +1,54 @@
from .source import SourcePages from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, TYPE_CHECKING, Type
if TYPE_CHECKING:
from ...pages.abstract import Page
@dataclass
class SourceType:
name: str
homepage: Optional[str] = None
download_priority: int = 0
page_type: Type[Page] = None
page: Page = None
def register_page(self, page: Page):
self.page = page
def __hash__(self):
return hash(self.name)
@property
def has_page(self) -> bool:
return self.page is not None
# for backwards compatibility
@property
def value(self) -> str:
return self.name
class ALL_SOURCE_TYPES:
YOUTUBE = SourceType(name="youtube", homepage="https://music.youtube.com/")
BANDCAMP = SourceType(name="bandcamp", homepage="https://bandcamp.com/", download_priority=10)
MUSIFY = SourceType(name="musify", homepage="https://musify.club/", download_priority=7)
GENIUS = SourceType(name="genius", homepage="https://genius.com/")
MUSICBRAINZ = SourceType(name="musicbrainz", homepage="https://musicbrainz.org/")
ENCYCLOPAEDIA_METALLUM = SourceType(name="encyclopaedia metallum")
DEEZER = SourceType(name="deezer", homepage="https://www.deezer.com/")
SPOTIFY = SourceType(name="spotify", homepage="https://open.spotify.com/")
# This has nothing to do with audio, but bands can be here
WIKIPEDIA = SourceType(name="wikipedia", homepage="https://en.wikipedia.org/wiki/Main_Page")
INSTAGRAM = SourceType(name="instagram", homepage="https://www.instagram.com/")
FACEBOOK = SourceType(name="facebook", homepage="https://www.facebook.com/")
TWITTER = SourceType(name="twitter", homepage="https://twitter.com/")
# Yes somehow this ancient site is linked EVERYWHERE
MYSPACE = SourceType(name="myspace", homepage="https://myspace.com/")
MANUAL = SourceType(name="manual")
PRESET = SourceType(name="preset")

View File

@@ -1,40 +0,0 @@
from enum import Enum
class SourceTypes(Enum):
SONG = "song"
ALBUM = "album"
ARTIST = "artist"
LYRICS = "lyrics"
class SourcePages(Enum):
YOUTUBE = "youtube", "https://www.youtube.com/"
MUSIFY = "musify", "https://musify.club/"
YOUTUBE_MUSIC = "youtube music", "https://music.youtube.com/"
GENIUS = "genius", "https://genius.com/"
MUSICBRAINZ = "musicbrainz", "https://musicbrainz.org/"
ENCYCLOPAEDIA_METALLUM = "encyclopaedia metallum"
BANDCAMP = "bandcamp", "https://bandcamp.com/"
DEEZER = "deezer", "https://www.deezer.com/"
SPOTIFY = "spotify", "https://open.spotify.com/"
# This has nothing to do with audio, but bands can be here
WIKIPEDIA = "wikipedia", "https://en.wikipedia.org/wiki/Main_Page"
INSTAGRAM = "instagram", "https://www.instagram.com/"
FACEBOOK = "facebook", "https://www.facebook.com/"
TWITTER = "twitter", "https://twitter.com/"
MYSPACE = "myspace", "https://myspace.com/" # Yes somehow this ancient site is linked EVERYWHERE
MANUAL = "manual", ""
PRESET = "preset", ""
def __new__(cls, value, homepage = None):
member = object.__new__(cls)
member._value_ = value
member.homepage = homepage
return member

View File

@@ -4,8 +4,20 @@ class MKBaseException(Exception):
super().__init__(message, **kwargs) super().__init__(message, **kwargs)
# Downloading
class MKDownloadException(MKBaseException):
pass
class MKMissingNameException(MKDownloadException):
pass
# Frontend
class MKFrontendException(MKBaseException): class MKFrontendException(MKBaseException):
pass pass
class MKInvalidInputException(MKFrontendException): class MKInvalidInputException(MKFrontendException):
pass pass

View File

@@ -15,7 +15,7 @@ __stage__ = os.getenv("STAGE", "prod")
DEBUG = (__stage__ == "dev") and True DEBUG = (__stage__ == "dev") and True
DEBUG_LOGGING = DEBUG and False DEBUG_LOGGING = DEBUG and False
DEBUG_TRACE = DEBUG and True DEBUG_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and False DEBUG_OBJECT_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False DEBUG_PAGES = DEBUG and False

View File

@@ -116,10 +116,13 @@ def clean_song_title(raw_song_title: str, artist_name: Optional[str] = None) ->
# Remove artist from the start of the title # Remove artist from the start of the title
if raw_song_title.lower().startswith(artist_name.lower()): if raw_song_title.lower().startswith(artist_name.lower()):
raw_song_title = raw_song_title[len(artist_name):].strip()
if raw_song_title.startswith("-"): possible_new_name = raw_song_title[len(artist_name):].strip()
raw_song_title = raw_song_title[1:].strip()
for char in ("-", "", ":", "|"):
if possible_new_name.startswith(char):
raw_song_title = possible_new_name[1:].strip()
break
return raw_song_title.strip() return raw_song_title.strip()

View File

@@ -24,7 +24,7 @@ class Query:
return [self.music_object.name] return [self.music_object.name]
if isinstance(self.music_object, Song): if isinstance(self.music_object, Song):
return [f"{artist.name} - {self.music_object}" for artist in self.music_object.main_artist_collection] return [f"{artist.name} - {self.music_object}" for artist in self.music_object.artist_collection]
if isinstance(self.music_object, Album): if isinstance(self.music_object, Album):
return [f"{artist.name} - {self.music_object}" for artist in self.music_object.artist_collection] return [f"{artist.name} - {self.music_object}" for artist in self.music_object.artist_collection]

View File

@@ -69,7 +69,7 @@ dependencies = [
"toml~=0.10.2", "toml~=0.10.2",
"typing_extensions~=4.7.1", "typing_extensions~=4.7.1",
"python-sponsorblock~=0.0.0", "python-sponsorblock~=0.1",
"youtube_dl", "youtube_dl",
] ]
dynamic = [ dynamic = [

View File

@@ -3,96 +3,98 @@ import unittest
from music_kraken.objects import Song, Album, Artist, Collection, Country from music_kraken.objects import Song, Album, Artist, Collection, Country
class TestCollection(unittest.TestCase): class TestCollection(unittest.TestCase):
@staticmethod def test_song_contains_album(self):
def complicated_object() -> Artist: """
return Artist( Tests that every song contains the album it is added to in its album_collection
name="artist", """
country=Country.by_alpha_2("DE"),
main_album_list=[ a_1 = Album(
Album( title="album",
title="album", song_list= [
song_list=[ Song(title="song"),
Song(
title="song",
album_list=[
Album(title="album", albumsort=123),
],
),
Song(
title="other_song",
album_list=[
Album(title="album", albumsort=423),
],
),
]
),
Album(title="album", barcode="1234567890123"),
] ]
) )
a_2 = a_1.song_collection[0].album_collection[0]
self.assertTrue(a_1.id == a_2.id)
def test_song_album_relation(self): def test_album_contains_song(self):
""" """
Tests that Tests that every album contains the song it is added to in its song_collection
album = album.any_song.one_album """
is the same object s_1 = Song(
title="song",
album_list=[
Album(title="album"),
]
)
s_2 = s_1.album_collection[0].song_collection[0]
self.assertTrue(s_1.id == s_2.id)
def test_auto_add_artist_to_album_feature_artist(self):
"""
Tests that every artist is added to the album's feature_artist_collection per default
""" """
a = self.complicated_object().main_album_collection[0] a_1 = Artist(
b = a.song_collection[0].album_collection[0]
c = a.song_collection[1].album_collection[0]
d = b.song_collection[0].album_collection[0]
e = d.song_collection[0].album_collection[0]
f = e.song_collection[0].album_collection[0]
g = f.song_collection[0].album_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id == e.id == f.id == g.id)
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "album")
self.assertTrue(a.barcode == b.barcode == c.barcode == d.barcode == e.barcode == f.barcode == g.barcode == "1234567890123")
self.assertTrue(a.albumsort == b.albumsort == c.albumsort == d.albumsort == e.albumsort == f.albumsort == g.albumsort == 123)
d.title = "new_title"
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "new_title")
def test_album_artist_relation(self):
"""
Tests that
artist = artist.any_album.any_song.one_artist
is the same object
"""
a = self.complicated_object()
b = a.main_album_collection[0].artist_collection[0]
c = b.main_album_collection[0].artist_collection[0]
d = c.main_album_collection[0].artist_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id)
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
def test_artist_artist_relation(self):
artist = Artist(
name="artist", name="artist",
main_album_list=[ album_list=[
Album(title="album")
]
)
a_2 = a_1.album_collection[0].feature_artist_collection[0]
self.assertTrue(a_1.id == a_2.id)
def test_auto_add_artist_to_album_feature_artist_push(self):
"""
Tests that every artist is added to the album's feature_artist_collection per default but pulled into the album's artist_collection if a merge exitst
"""
a_1 = Artist(
name="artist",
album_list=[
Album( Album(
title="album", title="album",
song_list=[
Song(title="song"),
],
artist_list=[ artist_list=[
Artist(name="artist"), Artist(name="artist"),
] ]
) )
] ]
) )
a_2 = a_1.album_collection[0].artist_collection[0]
self.assertTrue(artist.id == artist.main_album_collection[0].song_collection[0].main_artist_collection[0].id) self.assertTrue(a_1.id == a_2.id)
def test_artist_artist_relation(self):
"""
Tests the proper syncing between album.artist_collection and song.artist_collection
"""
album = Album(
title="album",
song_list=[
Song(title="song"),
],
artist_list=[
Artist(name="artist"),
]
)
a_1 = album.artist_collection[0]
a_2 = album.song_collection[0].artist_collection[0]
self.assertTrue(a_1.id == a_2.id)
def test_artist_collection_sync(self): def test_artist_collection_sync(self):
"""
tests the actual implementation of the test above
"""
album_1 = Album( album_1 = Album(
title="album", title="album",
song_list=[ song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]), Song(title="song", artist_list=[Artist(name="artist")]),
], ],
artist_list=[ artist_list=[
Artist(name="artist"), Artist(name="artist"),
@@ -102,7 +104,7 @@ class TestCollection(unittest.TestCase):
album_2 = Album( album_2 = Album(
title="album", title="album",
song_list=[ song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]), Song(title="song", artist_list=[Artist(name="artist")]),
], ],
artist_list=[ artist_list=[
Artist(name="artist"), Artist(name="artist"),
@@ -111,17 +113,7 @@ class TestCollection(unittest.TestCase):
album_1.merge(album_2) album_1.merge(album_2)
self.assertTrue(id(album_1.artist_collection) == id(album_1.artist_collection) == id(album_1.song_collection[0].main_artist_collection) == id(album_1.song_collection[0].main_artist_collection)) self.assertTrue(id(album_1.artist_collection) == id(album_1.artist_collection) == id(album_1.song_collection[0].artist_collection) == id(album_1.song_collection[0].artist_collection))
def test_song_artist_relations(self):
a = self.complicated_object()
b = a.main_album_collection[0].song_collection[0].main_artist_collection[0]
c = b.main_album_collection[0].song_collection[0].main_artist_collection[0]
d = c.main_album_collection[0].song_collection[0].main_artist_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id)
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()