47 Commits

Author SHA1 Message Date
49145a7d93 feat: moved cli helpers to seperate file
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-27 13:41:24 +02:00
0f2229b0f2 feat: completely dynamified the datasource import
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-24 17:00:39 +02:00
5af95f1b03 feat: auto import pages in page module 2024-05-24 15:46:42 +02:00
c24cf701c1 fix: pages were not in the subclasses because the module was never importet 2024-05-24 15:28:47 +02:00
cef87460a7 draft
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-24 14:46:38 +02:00
c0fbd16929 feat: basic select layout 2024-05-24 13:21:07 +02:00
b5a5559f7b feat: created layout for option
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-23 18:05:18 +02:00
906ddb679d draft:
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-23 17:27:24 +02:00
cd2e7d7173 draft: moving page interface to downloader module
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-23 16:33:40 +02:00
c683394228 feat: imports 2024-05-23 16:13:47 +02:00
aafbba3b1c feat: implemented consistent settings
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-23 14:36:19 +02:00
40e9366a0b feat: implemented the new page mechanics in the downloader 2024-05-23 14:32:31 +02:00
8255ad5264 feat: added detection to autoscann pages
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-23 14:24:20 +02:00
2aa0f02fa5 Merge branch 'adding_genius' into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-23 13:36:10 +02:00
7b0b830d64 feat: removed legacy key
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2024-05-23 13:24:25 +02:00
1ba6c97f5a feat: more extensive browse id 2024-05-23 13:20:34 +02:00
c8cbfc7cb9 feat: improved output of clearing the cache 2024-05-23 13:17:14 +02:00
344da0a0bf fix: converting pictures to rgb before saving
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-22 15:20:26 +02:00
49dc7093c8 fix: genius fallback
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-22 15:18:43 +02:00
90f70638b4 feat: better lyrics support
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 17:55:08 +02:00
7b4eee858a feat: parsed script json
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 17:14:58 +02:00
f61b34dd40 feat: improved feature artists by also adding writer and producer to it
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 16:52:01 +02:00
688b4fd357 feat: getting the album tracklist
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 16:47:38 +02:00
769d27dc5c feat: album details
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 16:43:52 +02:00
f5d953d9ce feat: theoretically fetching feature songs
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 16:34:04 +02:00
46b64b8f8d feat: fetched the flat artist details 2024-05-21 16:23:05 +02:00
adfce16d2a feat: fetched the flat artist details 2024-05-21 16:21:58 +02:00
e4fd9faf12 feat: detecting url type 2024-05-21 15:57:09 +02:00
f6caee41a8 feat: finished searching genious
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 15:52:41 +02:00
068c749c38 feat: implemented artist search 2024-05-21 15:27:10 +02:00
c131924577 Merge pull request 'fix/clean_feature_artists' (#38) from fix/clean_feature_artists into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
Reviewed-on: #38
2024-05-21 12:08:04 +00:00
8cdb5c1f99 fix: tests were a mess and didn't properly test the functionality but random things that worked with implementation
All checks were successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-21 13:52:20 +02:00
356ba658ce fix: lyrics enpoint could crash the whole program 2024-05-21 13:37:26 +02:00
000a6c0dba feat: added type identifier to option strings 2024-05-17 18:24:56 +02:00
83a3334f1a changed default value
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-17 13:55:58 +02:00
ab61ff7e9b feat: added the option to select all at options at the same time
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-17 13:37:49 +02:00
3cb35909d1 feat: added option to just fetch the objects in select 2024-05-17 13:31:46 +02:00
e87075a809 feat: changed syncing from event based to reference
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:39:12 +02:00
86e985acec fix: files don't contain id anymore
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:30:53 +02:00
a70a24d93e feat: minor adjustments
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:14:18 +02:00
2c1ac0f12d fix: wrong collection 2024-05-16 17:09:36 +02:00
897897dba2 feat: added recursive structure 2024-05-16 14:29:50 +02:00
adcf26b518 feat: renamed main album collection to album collection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 14:10:00 +02:00
8ccc28daf8 draft: added feature artist collection attr 2024-05-16 14:09:34 +02:00
2b3f4d82d9 feat: renamed main_artist_collection to artist_collection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 14:05:33 +02:00
41a91a6afe feat: removing dependence beteen artist.album and album.artist 2024-05-16 13:41:06 +02:00
82df96a193 Merge pull request 'feature/move_download_code_to_download' (#34) from feature/move_download_code_to_download into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #34
2024-05-15 15:24:30 +00:00
35 changed files with 1293 additions and 927 deletions

6
.vscode/launch.json vendored
View File

@@ -17,6 +17,12 @@
"request": "launch",
"program": "development/actual_donwload.py",
"console": "integratedTerminal"
},
{
"name": "Python Debugger: Music Kraken",
"type": "debugpy",
"request": "launch", // run the module
"module": "music_kraken",
}
]
}

View File

@@ -27,6 +27,7 @@
"Gitea",
"iframe",
"isrc",
"itemprop",
"levenshtein",
"metallum",
"MUSICBRAINZ",

View File

@@ -6,9 +6,10 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__":
commands = [
"s: #a I'm in a coffin",
"0",
"d: 0",
"s: #a Crystal F",
"10",
"1",
"3",
]

View File

@@ -13,7 +13,7 @@ if __name__ == "__main__":
song_2 = Song(
title = "song",
main_artist_list=[other_artist]
artist_list=[other_artist]
)
other_artist.name = "main_artist"
@@ -21,5 +21,5 @@ if __name__ == "__main__":
song_1.merge(song_2)
print("#" * 120)
print("main", *song_1.main_artist_collection)
print("main", *song_1.artist_collection)
print("feat", *song_1.feature_artist_collection)

View File

@@ -93,6 +93,10 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
# resize the image to the preferred resolution
img.thumbnail((main_settings["preferred_artwork_resolution"], main_settings["preferred_artwork_resolution"]))
# https://stackoverflow.com/a/59476938/16804841
if img.mode != 'RGB':
img = img.convert('RGB')
img.save(converted_target.file_path, "JPEG")
# https://stackoverflow.com/questions/70228440/mutagen-how-can-i-correctly-embed-album-art-into-mp3-file-so-that-i-can-see-t

View File

@@ -1,89 +1,25 @@
import random
from typing import Set, Type, Dict, List
from pathlib import Path
import re
from pathlib import Path
from typing import Dict, Generator, List, Set, Type
from .utils import cli_function
from .options.first_config import initial_config
from ..utils import output, BColors
from ..utils.config import write_config, main_settings
from ..utils.shared import URL_PATTERN
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from .. import console
from ..download import Downloader, Page, components
from ..download.results import GoToResults
from ..download.results import Option as ResultOption
from ..download.results import PageResults, Results
from ..objects import Album, Artist, DatabaseObject, Song
from ..utils import BColors, output
from ..utils.config import main_settings, write_config
from ..utils.enums.colors import BColors
from ..utils.exception import MKInvalidInputException
from ..utils.exception.download import UrlNotFoundException
from ..utils.enums.colors import BColors
from .. import console
from ..download.results import Results, Option, PageResults, GoToResults
from ..download.page_attributes import Pages
from ..pages import Page
from ..objects import Song, Album, Artist, DatabaseObject
"""
This is the implementation of the Shell
# Behaviour
## Searching
```mkshell
> s: {querry or url}
# examples
> s: https://musify.club/release/some-random-release-183028492
> s: r: #a an Artist #r some random Release
```
Searches for an url, or an query
### Query Syntax
```
#a {artist} #r {release} #t {track}
```
You can escape stuff like `#` doing this: `\#`
## Downloading
To download something, you either need a direct link, or you need to have already searched for options
```mkshell
> d: {option ids or direct url}
# examples
> d: 0, 3, 4
> d: 1
> d: https://musify.club/release/some-random-release-183028492
```
## Misc
### Exit
```mkshell
> q
> quit
> exit
> abort
```
### Current Options
```mkshell
> .
```
### Previous Options
```
> ..
```
"""
from ..utils.shared import URL_PATTERN
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from .options.first_config import initial_config
from .utils import ask_for_bool, cli_function
EXIT_COMMANDS = {"q", "quit", "exit", "abort"}
ALPHABET = "abcdefghijklmnopqrstuvwxyz"
@@ -91,50 +27,19 @@ PAGE_NAME_FILL = "-"
MAX_PAGE_LEN = 21
def get_existing_genre() -> List[str]:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any patern
from shared.NOT_A_GENRE_REGEX.
"""
existing_genres: List[str] = []
# get all subdirectories of MUSIC_DIR, not the files in the dir.
existing_subdirectories: List[Path] = [f for f in main_settings["music_directory"].iterdir() if f.is_dir()]
for subdirectory in existing_subdirectories:
name: str = subdirectory.name
if not any(re.match(regex_pattern, name) for regex_pattern in main_settings["not_a_genre_regex"]):
existing_genres.append(name)
existing_genres.sort()
return existing_genres
def get_genre():
existing_genres = get_existing_genre()
for i, genre_option in enumerate(existing_genres):
print(f"{i + 1:0>2}: {genre_option}")
select_genre = components.GenreSelect()
select_genre._ask_for_creating_option = lambda key: ask_for_bool(f"Create the genre \"{key}\"")
while True:
genre = input("Id or new genre: ")
genre: Optional[components.Option] = None
if genre.isdigit():
genre_id = int(genre) - 1
if genre_id >= len(existing_genres):
print(f"No genre under the id {genre_id + 1}.")
continue
while genre is None:
for genre in select_genre:
print(genre)
return existing_genres[genre_id]
genre = select_genre.choose(input("Id or new genre: "))
new_genre = fit_to_file_system(genre)
agree_inputs = {"y", "yes", "ok"}
verification = input(f"create new genre \"{new_genre}\"? (Y/N): ").lower()
if verification in agree_inputs:
return new_genre
return genre.value
def help_message():
@@ -143,7 +48,7 @@ def help_message():
print()
class Downloader:
class CliDownloader:
def __init__(
self,
exclude_pages: Set[Type[Page]] = None,
@@ -153,7 +58,7 @@ class Downloader:
genre: str = None,
process_metadata_anyway: bool = False,
) -> None:
self.pages: Pages = Pages(exclude_pages=exclude_pages, exclude_shady=exclude_shady)
self.pages: Downloader = Downloader(exclude_pages=exclude_pages, exclude_shady=exclude_shady)
self.page_dict: Dict[str, Type[Page]] = dict()
@@ -177,7 +82,7 @@ class Downloader:
page_count = 0
for option in self.current_results.formatted_generator():
if isinstance(option, Option):
if isinstance(option, ResultOption):
r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}"
print(r)
else:
@@ -226,7 +131,7 @@ class Downloader:
if album is not None:
song.album_collection.append(album)
if artist is not None:
song.main_artist_collection.append(artist)
song.artist_collection.append(artist)
return Query(raw_query=query, music_object=song)
if album is not None:
@@ -354,37 +259,41 @@ class Downloader:
command, query = _[0], ":".join(_[1:])
do_search = "s" in command
do_fetch = "f" in command
do_download = "d" in command
do_merge = "m" in command
if do_search and do_download:
raise MKInvalidInputException(message="You can't search and download at the same time.")
if do_search and do_merge:
raise MKInvalidInputException(message="You can't search and merge at the same time.")
if do_search and (do_download or do_fetch or do_merge):
raise MKInvalidInputException(message="You can't search and do another operation at the same time.")
if do_search:
self.search(":".join(input_str.split(":")[1:]))
return False
indices = []
for possible_index in query.split(","):
possible_index = possible_index.strip()
if possible_index == "":
continue
i = 0
try:
i = int(possible_index)
except ValueError:
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.")
def get_selected_objects(q: str):
if q.strip().lower() == "all":
return list(self.current_results)
if i < 0 or i >= len(self.current_results):
raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results) - 1}.")
indices.append(i)
indices = []
for possible_index in q.split(","):
possible_index = possible_index.strip()
if possible_index == "":
continue
i = 0
try:
i = int(possible_index)
except ValueError:
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.")
selected_objects = [self.current_results[i] for i in indices]
if i < 0 or i >= len(self.current_results):
raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results) - 1}.")
indices.append(i)
return [self.current_results[i] for i in indices]
selected_objects = get_selected_objects(query)
if do_merge:
old_selected_objects = selected_objects
@@ -397,6 +306,13 @@ class Downloader:
selected_objects = [a]
if do_fetch:
for data_object in selected_objects:
self.pages.fetch_details(data_object)
self.print_current_options()
return False
if do_download:
self.download(selected_objects)
return False
@@ -435,7 +351,7 @@ def download(
else:
print(f"{BColors.FAIL.value}Something went wrong configuring.{BColors.ENDC.value}")
shell = Downloader(genre=genre, process_metadata_anyway=process_metadata_anyway)
shell = CliDownloader(genre=genre, process_metadata_anyway=process_metadata_anyway)
if command_list is not None:
for command in command_list:

View File

@@ -39,4 +39,8 @@ def print_cute_message():
print(message)
AGREE_INPUTS = {"y", "yes", "ok"}
def ask_for_bool(msg: str) -> bool:
i = input(msg + " (Y/N):").lower()
return i in AGREE_INPUTS

View File

@@ -6,6 +6,7 @@ from typing import List, Optional
from functools import lru_cache
import logging
from ..utils import output, BColors
from ..utils.config import main_settings
from ..utils.string_processing import fit_to_file_system
@@ -136,13 +137,13 @@ class Cache:
)
self._write_attribute(cache_attribute)
cache_path = fit_to_file_system(Path(module_path, name), hidden_ok=True)
cache_path = fit_to_file_system(Path(module_path, name.replace("/", "_")), hidden_ok=True)
with cache_path.open("wb") as content_file:
self.logger.debug(f"writing cache to {cache_path}")
content_file.write(content)
def get(self, name: str) -> Optional[CacheResult]:
path = fit_to_file_system(Path(self._dir, self.module, name), hidden_ok=True)
path = fit_to_file_system(Path(self._dir, self.module, name.replace("/", "_")), hidden_ok=True)
if not path.is_file():
return None
@@ -165,7 +166,7 @@ class Cache:
if ca.name == "":
continue
file = fit_to_file_system(Path(self._dir, ca.module, ca.name), hidden_ok=True)
file = fit_to_file_system(Path(self._dir, ca.module, ca.name.replace("/", "_")), hidden_ok=True)
if not ca.is_valid:
self.logger.debug(f"deleting cache {ca.id}")
@@ -204,9 +205,12 @@ class Cache:
for path in self._dir.iterdir():
if path.is_dir():
for file in path.iterdir():
output(f"Deleting file {file}", color=BColors.GREY)
file.unlink()
output(f"Deleting folder {path}", color=BColors.HEADER)
path.rmdir()
else:
output(f"Deleting folder {path}", color=BColors.HEADER)
path.unlink()
self.cached_attributes.clear()

View File

@@ -1,8 +1,36 @@
from dataclasses import dataclass, field
from typing import Set
from __future__ import annotations
from ..utils.config import main_settings
import logging
import random
import re
from collections import defaultdict
from copy import copy
from dataclasses import dataclass, field
from pathlib import Path
from string import Formatter
from typing import (TYPE_CHECKING, Any, Callable, Dict, Generator, List,
Optional, Set, Tuple, Type, TypedDict, Union)
import requests
from bs4 import BeautifulSoup
from ..audio import correct_codec, write_metadata_to_target
from ..connection import Connection
from ..objects import Album, Artist, Collection
from ..objects import DatabaseObject as DataObject
from ..objects import Label, Options, Song, Source, Target
from ..utils import BColors, output, trace
from ..utils.config import main_settings, youtube_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.enums.album import AlbumType
from ..utils.exception import MKComposeException, MKMissingNameException
from ..utils.exception.download import UrlNotFoundException
from ..utils.path_manager import LOCATIONS
from ..utils.shared import DEBUG_PAGES
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from .results import SearchResults
@dataclass
@@ -19,3 +47,409 @@ class DownloadOptions:
download_again_if_found: bool = False
process_audio_if_found: bool = False
process_metadata_if_found: bool = True
fetch_map = {
Song: "fetch_song",
Album: "fetch_album",
Artist: "fetch_artist",
Label: "fetch_label",
}
class Downloader:
def __init__(
self,
auto_register_pages: bool = True,
download_options: DownloadOptions = None,
fetch_options: FetchOptions = None,
**kwargs
):
self.LOGGER = logging.getLogger("download")
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
self._registered_pages: Dict[Type[Page], Set[Page]] = defaultdict(set)
if auto_register_pages:
self.scan_for_pages(**kwargs)
def register_page(self, page_type: Type[Page], **kwargs):
if page_type in self._registered_pages:
return
self._registered_pages[page_type].add(page_type(
download_options=self.download_options,
fetch_options=self.fetch_options,
**kwargs
))
def deregister_page(self, page_type: Type[Page]):
if page_type not in _registered_pages:
return
for p in self._registered_pages[page_type]:
p.__del__()
del self._registered_pages[page_type]
def scan_for_pages(self, **kwargs):
# assuming the wanted pages are the leaf classes of the interface
from .. import pages
leaf_classes = []
class_list = [Page]
while len(class_list):
_class = class_list.pop()
class_subclasses = _class.__subclasses__()
if len(class_subclasses) == 0:
if _class.REGISTER:
leaf_classes.append(_class)
else:
class_list.extend(class_subclasses)
if Page in leaf_classes:
self.LOGGER.warn("couldn't find any data source")
return
for leaf_class in leaf_classes:
self.register_page(leaf_class, **kwargs)
def get_pages(self, *page_types: List[Type[Page]]) -> Generator[Page, None, None]:
if len(page_types) == 0:
page_types = _registered_pages.keys()
for page_type in page_types:
yield from self._registered_pages[page_type]
def search(self, query: Query) -> SearchResults:
result = SearchResults()
for page in self.get_pages():
result.add(
page=type(page),
search_result=page.search(query=query)
)
return result
def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject:
source: Source
for source in data_object.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
}):
new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level)
if new_data_object is not None:
data_object.merge(new_data_object)
return data_object
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
if not source.has_page:
return None
source_type = source.page.get_source_type(source=source)
if source_type is None:
self.LOGGER.debug(f"Could not determine source type for {source}.")
return None
func = getattr(source.page, fetch_map[source_type])
# fetching the data object and marking it as fetched
data_object: DataObject = func(source=source, **kwargs)
data_object.mark_as_fetched(source.hash_url)
return data_object
def fetch_from_url(self, url: str) -> Optional[DataObject]:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None:
return None
return self.fetch_from_source(source=source)
def _skip_object(self, data_object: DataObject) -> bool:
if isinstance(data_object, Album):
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
return True
return False
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
# fetch the given object
self.fetch_details(data_object)
output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
# fetching all parent objects (e.g. if you only download a song)
if not kwargs.get("fetched_upwards", False):
to_fetch: List[DataObject] = [data_object]
while len(to_fetch) > 0:
new_to_fetch = []
for d in to_fetch:
if self._skip_object(d):
continue
self.fetch_details(d)
for c in d.get_parent_collections():
new_to_fetch.extend(c)
to_fetch = new_to_fetch
kwargs["fetched_upwards"] = True
# download all children
download_result: DownloadResult = DownloadResult()
for c in data_object.get_child_collections():
for d in c:
if self._skip_object(d):
continue
download_result.merge(self.download(d, genre, **kwargs))
# actually download if the object is a song
if isinstance(data_object, Song):
"""
TODO
add the traced artist and album to the naming.
I am able to do that, because duplicate values are removed later on.
"""
self._download_song(data_object, naming={
"genre": [genre],
"audio_format": [main_settings["audio_format"]],
})
return download_result
def _extract_fields_from_template(self, path_template: str) -> Set[str]:
return set(re.findall(r"{([^}]+)}", path_template))
def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str:
field_names: Set[str] = self._extract_fields_from_template(path_template)
for field in field_names:
if len(naming[field]) == 0:
raise MKMissingNameException(f"Missing field for {field}.")
path_template = path_template.replace(f"{{{field}}}", naming[field][0])
return path_template
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
"""
TODO
Search the song in the file system.
"""
r = DownloadResult(total=1)
# pre process the data recursively
song.compile()
# manage the naming
naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["song"].append(song.title_value)
naming["isrc"].append(song.isrc)
naming["album"].extend(a.title_value for a in song.album_collection)
naming["album_type"].extend(a.album_type.value for a in song.album_collection)
naming["artist"].extend(a.name for a in song.artist_collection)
naming["artist"].extend(a.name for a in song.feature_artist_collection)
for a in song.album_collection:
naming["label"].extend([l.title_value for l in a.label_collection])
# removing duplicates from the naming, and process the strings
for key, value in naming.items():
# https://stackoverflow.com/a/17016257
naming[key] = list(dict.fromkeys(value))
song.genre = naming["genre"][0]
# manage the targets
tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
song.target_collection.append(Target(
relative_to_music_dir=True,
file_path=Path(
self._parse_path_template(main_settings["download_path"], naming=naming),
self._parse_path_template(main_settings["download_file"], naming=naming),
)
))
for target in song.target_collection:
if target.exists:
output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
r.found_on_disk += 1
if not self.download_options.download_again_if_found:
target.copy_content(tmp)
else:
target.create_path()
output(f'{target.file_path}', color=BColors.GREY)
# this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
used_source: Optional[Source] = None
skip_intervals: List[Tuple[float, float]] = []
for source in song.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
"sort_key": lambda page: page.download_priority,
"reverse": True,
}):
if tmp.exists:
break
used_source = source
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download")
skip_intervals = source.page.get_skip_intervals(song=song, source=source)
# if something has been downloaded but it somehow failed, delete the file
if streaming_results.is_fatal_error and tmp.exists:
tmp.delete()
# if everything went right, the file should exist now
if not tmp.exists:
if used_source is None:
r.error_message = f"No source found for {song.option_string}."
else:
r.error_message = f"Something went wrong downloading {song.option_string}."
return r
# post process the audio
found_on_disk = used_source is None
if not found_on_disk or self.download_options.process_audio_if_found:
correct_codec(target=tmp, skip_intervals=skip_intervals)
r.sponsor_segments = len(skip_intervals)
if used_source is not None:
used_source.page.post_process_hook(song=song, temp_target=tmp)
if not found_on_disk or self.download_options.process_metadata_if_found:
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)
# copy the tmp target to the final locations
for target in song.target_collection:
tmp.copy_content(target)
tmp.delete()
return r
def fetch_url(self, url: str, **kwargs) -> DataObject:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None or source.page is None:
raise UrlNotFoundException(url=url)
return source.page.fetch_object_from_source(source=source, **kwargs)
class Page:
REGISTER = True
SOURCE_TYPE: SourceType
LOGGER: logging.Logger
def __new__(cls, *args, **kwargs):
cls.LOGGER = logging.getLogger(cls.__name__)
return super().__new__(cls)
@classmethod
def is_leaf_page(cls) -> bool:
return len(cls.__subclasses__()) == 0
def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None, **kwargs):
self.SOURCE_TYPE.register_page(self)
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
def __del__(self):
self.SOURCE_TYPE.deregister_page()
def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None):
"""
Perform a regex search on the given string, using a single or a list of
patterns returning the first matching group.
In case of failure return a default value or raise a WARNING or a
RegexNotFoundError, depending on fatal, specifying the field name.
"""
if isinstance(pattern, str):
mobj = re.search(pattern, string, flags)
else:
for p in pattern:
mobj = re.search(p, string, flags)
if mobj:
break
if mobj:
if group is None:
# return the first matching group
return next(g for g in mobj.groups() if g is not None)
elif isinstance(group, (list, tuple)):
return tuple(mobj.group(g) for g in group)
else:
return mobj.group(group)
return default
def get_source_type(self, source: Source) -> Optional[Type[DataObject]]:
return None
def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup:
return BeautifulSoup(r.content, "html.parser")
# to search stuff
def search(self, query: Query) -> List[DataObject]:
music_object = query.music_object
search_functions = {
Song: self.song_search,
Album: self.album_search,
Artist: self.artist_search,
Label: self.label_search
}
if type(music_object) in search_functions:
r = search_functions[type(music_object)](music_object)
if r is not None and len(r) > 0:
return r
r = []
for default_query in query.default_search:
for single_option in self.general_search(default_query):
r.append(single_option)
return r
def general_search(self, search_query: str) -> List[DataObject]:
return []
def label_search(self, label: Label) -> List[Label]:
return []
def artist_search(self, artist: Artist) -> List[Artist]:
return []
def album_search(self, album: Album) -> List[Album]:
return []
def song_search(self, song: Song) -> List[Song]:
return []
# to fetch stuff
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song()
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
return Album()
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
return Artist()
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
# to download stuff
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
return []
def post_process_hook(self, song: Song, temp_target: Target, **kwargs):
pass
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
return DownloadResult()

View File

@@ -0,0 +1,158 @@
import re
from pathlib import Path
from typing import Any, Callable, Dict, Generator, List, Optional
from ..utils.config import main_settings
from ..utils.exception import MKComposeException
from ..utils.string_processing import unify
class Option:
"""
This could represent a data object, a string or a page.
"""
def __init__(
self,
value: Any,
text: Optional[str] = None,
keys: List[Any] = None,
hidden: bool = False,
parse_key: Callable[[Any], Any] = lambda x: x,
):
self._parse_key: Callable[[Any], Any] = parse_key
self.value = value
self.text = text or str(value)
self.hidden = hidden
self._raw_keys = set(keys or [])
self._raw_keys.add(self.text)
self.keys = set(self.parse_key(key) for key in self._raw_keys)
def register_key(self, key: Any):
self._raw_keys.add(key)
self.keys.add(self._parse_key(key))
@property
def parse_key(self) -> Callable[[Any], Any]:
return self._parse_key
@parse_key.setter
def parse_key(self, value: Callable[[Any], Any]):
self._parse_key = value
self.keys = set(self._parse_key(key) for key in self._raw_keys)
def __str__(self):
return self.text
class Select:
def __init__(
self,
options: Generator[Option, None, None] = None,
option_factory: Callable[[Any], Option] = None,
raw_options: List[Any] = None,
parse_option_key: Callable[[Any], Any] = lambda x: x,
ask_for_creating_option: Callable[[Option], bool] = lambda x: True,
sort: bool = False,
**kwargs
):
self._parse_option_key: Callable[[Any], Any] = parse_option_key
self._ask_for_creating_option: Callable[[Option], bool] = ask_for_creating_option
self._key_to_option: Dict[Any, Option] = dict()
self._options: List[Option] = []
options = options or []
self.option_factory: Optional[Callable[[Any], Option]] = option_factory
if self.can_create_options:
_raw_options = raw_options or []
if sort:
_raw_options = sorted(_raw_options)
for raw_option in _raw_options:
self.append(self.option_factory(raw_option))
elif raw_options is not None:
raise MKComposeException("Cannot create options without a factory.")
self.extend(options)
@property
def can_create_options(self) -> bool:
return self.option_factory is not None
def append(self, option: Option):
option.parse_key = self._parse_option_key
self._options.append(option)
for key in option.keys:
self._key_to_option[key] = option
def extend(self, options: List[Option]):
for option in options:
self.append(option)
def __iter__(self) -> Generator[Option, None, None]:
for option in self._options:
if option.hidden:
continue
yield option
def __contains__(self, key: Any) -> bool:
return key in self._key_to_option
def __getitem__(self, key: Any) -> Option:
return self._key_to_option[key]
def create_option(self, key: Any, **kwargs) -> Option:
if not self.can_create_options:
raise MKComposeException("Cannot create options without a factory.")
option = self.option_factory(key, **kwargs)
self.append(option)
return option
def choose(self, key: Any) -> Optional[Option]:
if key not in self:
if self.can_create_options and self._ask_for_creating_option(key):
return self.create_option(key)
return None
return self[key]
class StringSelect(Select):
def __init__(self, **kwargs):
self._current_index = 0
kwargs["option_factory"] = self.next_option
kwargs["parse_option_key"] = lambda x: unify(str(x))
super().__init__(**kwargs)
def next_option(self, value: Any) -> Optional[Option]:
o = Option(value=value, keys=[self._current_index], text=f"{self._current_index:0>2}: {value}")
self._current_index += 1
return o
class GenreSelect(StringSelect):
@staticmethod
def is_valid_genre(genre: Path) -> bool:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any Patern
from shared.NOT_A_GENRE_REGEX.
"""
if not genre.is_dir():
return False
if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]):
return False
return True
def __init__(self):
super().__init__(sort=True, raw_options=(genre.name for genre in filter(self.is_valid_genre, main_settings["music_directory"].iterdir())))

View File

@@ -1,327 +0,0 @@
from typing import Tuple, Type, Dict, Set, Optional, List
from collections import defaultdict
from pathlib import Path
import re
import logging
from . import FetchOptions, DownloadOptions
from .results import SearchResults
from ..objects import (
DatabaseObject as DataObject,
Collection,
Target,
Source,
Options,
Song,
Album,
Artist,
Label,
)
from ..audio import write_metadata_to_target, correct_codec
from ..utils import output, BColors
from ..utils.string_processing import fit_to_file_system
from ..utils.config import youtube_settings, main_settings
from ..utils.path_manager import LOCATIONS
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.exception import MKMissingNameException
from ..utils.exception.download import UrlNotFoundException
from ..utils.shared import DEBUG_PAGES
from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, INDEPENDENT_DB_OBJECTS
ALL_PAGES: Set[Type[Page]] = {
# EncyclopaediaMetallum,
Musify,
YoutubeMusic,
Bandcamp
}
if youtube_settings["use_youtube_alongside_youtube_music"]:
ALL_PAGES.add(YouTube)
AUDIO_PAGES: Set[Type[Page]] = {
Musify,
YouTube,
YoutubeMusic,
Bandcamp
}
SHADY_PAGES: Set[Type[Page]] = {
Musify,
}
fetch_map = {
Song: "fetch_song",
Album: "fetch_album",
Artist: "fetch_artist",
Label: "fetch_label",
}
if DEBUG_PAGES:
DEBUGGING_PAGE = Bandcamp
print(f"Only downloading from page {DEBUGGING_PAGE}.")
ALL_PAGES = {DEBUGGING_PAGE}
AUDIO_PAGES = ALL_PAGES.union(AUDIO_PAGES)
class Pages:
def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
self.LOGGER = logging.getLogger("download")
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
# initialize all page instances
self._page_instances: Dict[Type[Page], Page] = dict()
self._source_to_page: Dict[SourceType, Type[Page]] = dict()
exclude_pages = exclude_pages if exclude_pages is not None else set()
if exclude_shady:
exclude_pages = exclude_pages.union(SHADY_PAGES)
if not exclude_pages.issubset(ALL_PAGES):
raise ValueError(f"The excluded pages have to be a subset of all pages: {exclude_pages} | {ALL_PAGES}")
def _set_to_tuple(page_set: Set[Type[Page]]) -> Tuple[Type[Page], ...]:
return tuple(sorted(page_set, key=lambda page: page.__name__))
self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages)
self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set)
self._audio_pages_set: Set[Type[Page]] = self._pages_set.intersection(AUDIO_PAGES)
self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(self._audio_pages_set)
for page_type in self.pages:
self._page_instances[page_type] = page_type(fetch_options=self.fetch_options, download_options=self.download_options)
self._source_to_page[page_type.SOURCE_TYPE] = page_type
def _get_page_from_enum(self, source_page: SourceType) -> Page:
if source_page not in self._source_to_page:
return None
return self._page_instances[self._source_to_page[source_page]]
def search(self, query: Query) -> SearchResults:
result = SearchResults()
for page_type in self.pages:
result.add(
page=page_type,
search_result=self._page_instances[page_type].search(query=query)
)
return result
def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject:
if not isinstance(data_object, INDEPENDENT_DB_OBJECTS):
return data_object
source: Source
for source in data_object.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
}):
new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level)
if new_data_object is not None:
data_object.merge(new_data_object)
return data_object
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
if not source.has_page:
return None
source_type = source.page.get_source_type(source=source)
if source_type is None:
self.LOGGER.debug(f"Could not determine source type for {source}.")
return None
func = getattr(source.page, fetch_map[source_type])
# fetching the data object and marking it as fetched
data_object: DataObject = func(source=source, **kwargs)
data_object.mark_as_fetched(source.hash_url)
return data_object
def fetch_from_url(self, url: str) -> Optional[DataObject]:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None:
return None
return self.fetch_from_source(source=source)
def _skip_object(self, data_object: DataObject) -> bool:
if isinstance(data_object, Album):
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
return True
return False
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
# fetch the given object
self.fetch_details(data_object)
output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
# fetching all parent objects (e.g. if you only download a song)
if not kwargs.get("fetched_upwards", False):
to_fetch: List[DataObject] = [data_object]
while len(to_fetch) > 0:
new_to_fetch = []
for d in to_fetch:
if self._skip_object(d):
continue
self.fetch_details(d)
for c in d.get_parent_collections():
new_to_fetch.extend(c)
to_fetch = new_to_fetch
kwargs["fetched_upwards"] = True
# download all children
download_result: DownloadResult = DownloadResult()
for c in data_object.get_child_collections():
for d in c:
if self._skip_object(d):
continue
download_result.merge(self.download(d, genre, **kwargs))
# actually download if the object is a song
if isinstance(data_object, Song):
"""
TODO
add the traced artist and album to the naming.
I am able to do that, because duplicate values are removed later on.
"""
self._download_song(data_object, naming={
"genre": [genre],
"audio_format": [main_settings["audio_format"]],
})
return download_result
def _extract_fields_from_template(self, path_template: str) -> Set[str]:
return set(re.findall(r"{([^}]+)}", path_template))
def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str:
field_names: Set[str] = self._extract_fields_from_template(path_template)
for field in field_names:
if len(naming[field]) == 0:
raise MKMissingNameException(f"Missing field for {field}.")
path_template = path_template.replace(f"{{{field}}}", naming[field][0])
return path_template
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
"""
TODO
Search the song in the file system.
"""
r = DownloadResult(total=1)
# pre process the data recursively
song.compile()
# manage the naming
naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["song"].append(song.title_string)
naming["isrc"].append(song.isrc)
naming["album"].extend(a.title_string for a in song.album_collection)
naming["album_type"].extend(a.album_type.value for a in song.album_collection)
naming["artist"].extend(a.name for a in song.main_artist_collection)
naming["artist"].extend(a.name for a in song.feature_artist_collection)
for a in song.album_collection:
naming["label"].extend([l.title_string for l in a.label_collection])
# removing duplicates from the naming, and process the strings
for key, value in naming.items():
# https://stackoverflow.com/a/17016257
naming[key] = list(dict.fromkeys(value))
song.genre = naming["genre"][0]
# manage the targets
tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
song.target_collection.append(Target(
relative_to_music_dir=True,
file_path=Path(
self._parse_path_template(main_settings["download_path"], naming=naming),
self._parse_path_template(main_settings["download_file"], naming=naming),
)
))
for target in song.target_collection:
if target.exists:
output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
r.found_on_disk += 1
if not self.download_options.download_again_if_found:
target.copy_content(tmp)
else:
target.create_path()
output(f'{target.file_path}', color=BColors.GREY)
# this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
used_source: Optional[Source] = None
skip_intervals: List[Tuple[float, float]] = []
for source in song.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
"sort_key": lambda page: page.download_priority,
"reverse": True,
}):
if tmp.exists:
break
used_source = source
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download")
skip_intervals = source.page.get_skip_intervals(song=song, source=source)
# if something has been downloaded but it somehow failed, delete the file
if streaming_results.is_fatal_error and tmp.exists:
tmp.delete()
# if everything went right, the file should exist now
if not tmp.exists:
if used_source is None:
r.error_message = f"No source found for {song.option_string}."
else:
r.error_message = f"Something went wrong downloading {song.option_string}."
return r
# post process the audio
found_on_disk = used_source is None
if not found_on_disk or self.download_options.process_audio_if_found:
correct_codec(target=tmp, skip_intervals=skip_intervals)
r.sponsor_segments = len(skip_intervals)
if used_source is not None:
used_source.page.post_process_hook(song=song, temp_target=tmp)
if not found_on_disk or self.download_options.process_metadata_if_found:
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)
# copy the tmp target to the final locations
for target in song.target_collection:
tmp.copy_content(target)
tmp.delete()
return r
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None:
raise UrlNotFoundException(url=url)
_actual_page = self._source_to_page[source.source_type]
return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level)

View File

@@ -1,8 +1,12 @@
from typing import Tuple, Type, Dict, List, Generator, Union
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, Generator, List, Tuple, Type, Union
from ..objects import DatabaseObject
from ..pages import Page, EncyclopaediaMetallum, Musify
if TYPE_CHECKING:
from . import Page
@dataclass

View File

@@ -59,4 +59,6 @@ class Artwork:
self._variant_mapping[key] = value
def __eq__(self, other: Artwork) -> bool:
if not isinstance(other, Artwork):
return False
return any(a == b for a, b in zip(self._variant_mapping.keys(), other._variant_mapping.keys()))

View File

@@ -115,13 +115,6 @@ class Collection(Generic[T]):
self._data.append(other)
other._inner._is_in_collection.add(self)
# all of the existing hooks to get the defined datastructures
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
for attribute, a in self.sync_on_append.items():
# syncing two collections by reference
b = other.__getattribute__(attribute)
@@ -141,6 +134,13 @@ class Collection(Generic[T]):
a.extend(b_data, **kwargs)
# all of the existing hooks to get the defined datastructures
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
def append(self, other: Optional[T], **kwargs):
"""
If an object, that represents the same entity exists in a relevant collection,
@@ -160,6 +160,7 @@ class Collection(Generic[T]):
object_trace(f"Appending {other.option_string} to {self}")
# switching collection in the case of push to
for c in self.push_to:
r = c._find_object(other)

View File

@@ -37,11 +37,19 @@ class FormattedText:
@property
def markdown(self) -> str:
return md(self.html).strip()
@markdown.setter
def markdown(self, value: str) -> None:
self.html = mistune.markdown(value)
@property
def plain(self) -> str:
md = self.markdown
return md.replace("\n\n", "\n")
@plain.setter
def plain(self, value: str) -> None:
self.html = mistune.markdown(plain_to_markdown(value))
def __str__(self) -> str:
return self.markdown

View File

@@ -339,6 +339,10 @@ class OuterProxy:
def title_string(self) -> str:
return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "")
@property
def title_value(self) -> str:
return str(self.__getattribute__(self.TITEL))
def __repr__(self):
return f"{type(self).__name__}({self.title_string})"

View File

@@ -95,7 +95,7 @@ class Song(Base):
target_collection: Collection[Target]
lyrics_collection: Collection[Lyrics]
main_artist_collection: Collection[Artist]
artist_collection: Collection[Artist]
feature_artist_collection: Collection[Artist]
album_collection: Collection[Album]
@@ -107,8 +107,8 @@ class Song(Base):
"lyrics_collection": Collection,
"artwork": Artwork,
"main_artist_collection": Collection,
"album_collection": Collection,
"artist_collection": Collection,
"feature_artist_collection": Collection,
"title": lambda: None,
@@ -129,7 +129,7 @@ class Song(Base):
source_list: List[Source] = None,
target_list: List[Target] = None,
lyrics_list: List[Lyrics] = None,
main_artist_list: List[Artist] = None,
artist_list: List[Artist] = None,
feature_artist_list: List[Artist] = None,
album_list: List[Album] = None,
tracksort: int = 0,
@@ -141,23 +141,26 @@ class Song(Base):
Base.__init__(**real_kwargs)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_artist_collection", "feature_artist_collection", "album_collection")
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "feature_artist_collection", "album_collection")
TITEL = "title"
def __init_collections__(self) -> None:
self.feature_artist_collection.push_to = [self.artist_collection]
self.artist_collection.pull_from = [self.feature_artist_collection]
self.album_collection.sync_on_append = {
"artist_collection": self.main_artist_collection,
"artist_collection": self.artist_collection,
}
self.album_collection.append_object_to_attribute = {
"song_collection": self,
}
self.main_artist_collection.extend_object_to_attribute = {
"main_album_collection": self.album_collection
self.artist_collection.extend_object_to_attribute = {
"album_collection": self.album_collection
}
self.feature_artist_collection.extend_object_to_attribute = {
"album_collection": self.album_collection
}
self.feature_artist_collection.push_to = [self.main_artist_collection]
self.main_artist_collection.pull_from = [self.feature_artist_collection]
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song:
@@ -200,14 +203,14 @@ class Song(Base):
# metadata.merge_many([s.get_song_metadata() for s in self.source_collection]) album sources have no relevant metadata for id3
metadata.merge_many([a.metadata for a in self.album_collection])
metadata.merge_many([a.metadata for a in self.main_artist_collection])
metadata.merge_many([a.metadata for a in self.artist_collection])
metadata.merge_many([a.metadata for a in self.feature_artist_collection])
metadata.merge_many([lyrics.metadata for lyrics in self.lyrics_collection])
return metadata
def get_artist_credits(self) -> str:
main_artists = ", ".join([artist.name for artist in self.main_artist_collection])
main_artists = ", ".join([artist.name for artist in self.artist_collection])
feature_artists = ", ".join([artist.name for artist in self.feature_artist_collection])
if len(feature_artists) == 0:
@@ -216,10 +219,11 @@ class Song(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r = "song "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
r += get_collection_string(self.main_artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}")
r += get_collection_string(self.artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}" if len(self.artist_collection) > 0 else " by {}")
return r
@property
@@ -234,11 +238,6 @@ class Song(Base):
return f"{self.tracksort}/{len(self.album_collection[0].song_collection) or 1}"
"""
All objects dependent on Album
"""
class Album(Base):
title: str
unified_title: str
@@ -252,8 +251,9 @@ class Album(Base):
source_collection: SourceCollection
artist_collection: Collection[Artist]
song_collection: Collection[Song]
artist_collection: Collection[Artist]
feature_artist_collection: Collection[Artist]
label_collection: Collection[Label]
_default_factories = {
@@ -269,8 +269,10 @@ class Album(Base):
"notes": FormattedText,
"source_collection": SourceCollection,
"artist_collection": Collection,
"song_collection": Collection,
"artist_collection": Collection,
"feature_artist_collection": Collection,
"label_collection": Collection,
}
@@ -303,15 +305,18 @@ class Album(Base):
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection")
def __init_collections__(self):
self.feature_artist_collection.push_to = [self.artist_collection]
self.artist_collection.pull_from = [self.feature_artist_collection]
self.song_collection.append_object_to_attribute = {
"album_collection": self
}
self.song_collection.sync_on_append = {
"main_artist_collection": self.artist_collection
"artist_collection": self.artist_collection
}
self.artist_collection.append_object_to_attribute = {
"main_album_collection": self
"album_collection": self
}
self.artist_collection.extend_object_to_attribute = {
"label_collection": self.label_collection
@@ -365,8 +370,11 @@ class Album(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r = "album "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.artist_collection, " by {}")
if len(self.artist_collection) <= 0:
r += get_collection_string(self.feature_artist_collection, " by {}")
r += get_collection_string(self.label_collection, " under {}")
if len(self.song_collection) > 0:
@@ -376,6 +384,7 @@ class Album(Base):
def _compile(self):
self.analyze_implied_album_type()
self.update_tracksort()
self.fix_artist_collection()
def analyze_implied_album_type(self):
# if the song collection has only one song, it is reasonable to assume that it is a single
@@ -417,6 +426,16 @@ class Album(Base):
tracksort_map[i] = existing_list.pop(0)
tracksort_map[i].tracksort = i
def fix_artist_collection(self):
"""
I add artists, that could only be feature artists to the feature artist collection.
They get automatically moved to main artist collection, if a matching artist exists in the main artist collection or is appended to it later on.
If I am not sure for any artist, I try to analyze the most common artist in the song collection of one album.
"""
# move all artists that are in all feature_artist_collections, of every song, to the artist_collection
pass
@property
def copyright(self) -> str:
if self.date is None:
@@ -461,7 +480,7 @@ class Artist(Base):
source_collection: SourceCollection
contact_collection: Collection[Contact]
main_album_collection: Collection[Album]
album_collection: Collection[Album]
label_collection: Collection[Label]
_default_factories = {
@@ -475,7 +494,7 @@ class Artist(Base):
"general_genre": lambda: "",
"source_collection": SourceCollection,
"main_album_collection": Collection,
"album_collection": Collection,
"contact_collection": Collection,
"label_collection": Collection,
}
@@ -496,7 +515,7 @@ class Artist(Base):
source_list: List[Source] = None,
contact_list: List[Contact] = None,
feature_song_list: List[Song] = None,
main_album_list: List[Album] = None,
album_list: List[Album] = None,
label_list: List[Label] = None,
**kwargs
) -> None:
@@ -506,12 +525,12 @@ class Artist(Base):
Base.__init__(**real_kwargs)
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_album_collection",)
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("album_collection",)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",)
def __init_collections__(self):
self.main_album_collection.append_object_to_attribute = {
"artist_collection": self
self.album_collection.append_object_to_attribute = {
"feature_artist_collection": self
}
self.label_collection.append_object_to_attribute = {
@@ -527,7 +546,7 @@ class Artist(Base):
return
if object_type is Album:
self.main_album_collection.extend(object_list)
self.album_collection.extend(object_list)
return
if object_type is Label:
@@ -540,7 +559,7 @@ class Artist(Base):
def update_albumsort(self):
"""
This updates the albumsort attributes, of the albums in
`self.main_album_collection`, and sorts the albums, if possible.
`self.album_collection`, and sorts the albums, if possible.
It is advised to only call this function, once all the albums are
added to the artist.
@@ -558,7 +577,7 @@ class Artist(Base):
# order albums in the previously defined section
album: Album
for album in self.main_album_collection:
for album in self.album_collection:
sections[type_section[album.album_type]].append(album)
def sort_section(_section: List[Album], last_albumsort: int) -> int:
@@ -589,7 +608,7 @@ class Artist(Base):
album_list.extend(sections[section_index])
# replace the old collection with the new one
self.main_album_collection._data = album_list
self.album_collection._data = album_list
INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection")
@property
@@ -611,12 +630,13 @@ class Artist(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r = "artist "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.label_collection, " under {}")
r += OPTION_BACKGROUND.value
if len(self.main_album_collection) > 0:
r += f" with {len(self.main_album_collection)} albums"
if len(self.album_collection) > 0:
r += f" with {len(self.album_collection)} albums"
r += BColors.ENDC.value
@@ -704,4 +724,4 @@ class Label(Base):
@property
def option_string(self):
return OPTION_FOREGROUND.value + self.name + BColors.ENDC.value
return "label " + OPTION_FOREGROUND.value + self.name + BColors.ENDC.value

View File

@@ -1,7 +1,52 @@
from .encyclopaedia_metallum import EncyclopaediaMetallum
from .musify import Musify
from .youtube import YouTube
from .youtube_music import YoutubeMusic
from .bandcamp import Bandcamp
import importlib
import inspect
import logging
import pkgutil
import sys
from collections import defaultdict
from copy import copy
from pathlib import Path
from typing import Dict, Generator, List, Set, Type
from .abstract import Page, INDEPENDENT_DB_OBJECTS
from ._bandcamp import Bandcamp
from ._encyclopaedia_metallum import EncyclopaediaMetallum
from ._genius import Genius
from ._musify import Musify
from ._youtube import YouTube
from ._youtube_music import YoutubeMusic
def import_children():
_page_directory = Path(__file__).parent
_stem_blacklist = set(["__pycache__", "__init__"])
for _file in _page_directory.iterdir():
if _file.stem in _stem_blacklist:
continue
logging.debug(f"importing {_file.absolute()}")
exec(f"from . import {_file.stem}")
# module_blacklist = set(sys.modules.keys())
import_children()
"""
classes = set()
print(__name__)
for module_name, module in sys.modules.items():
if module_name in module_blacklist or not module_name.startswith(__name__):
continue
print("scanning module", module_name)
for name, obj in inspect.getmembers(module, predicate=inspect.isclass):
_module = obj.__module__
if _module.startswith(__name__) and hasattr(obj, "SOURCE_TYPE"):
print("checking object", name, obj.__module__)
classes.add(obj)
print()
print(*(c.__name__ for c in classes), sep=",\t")
__all__ = [c.__name__ for c in classes]
"""

View File

@@ -1,33 +1,22 @@
from typing import List, Optional, Type
from urllib.parse import urlparse, urlunparse
import json
from enum import Enum
from bs4 import BeautifulSoup
import pycountry
from typing import List, Optional, Type
from urllib.parse import urlparse, urlunparse
import pycountry
from bs4 import BeautifulSoup
from ..objects import Source, DatabaseObject
from .abstract import Page
from ..objects import (
Artist,
Source,
SourceType,
Song,
Album,
Label,
Target,
Contact,
ID3Timestamp,
Lyrics,
FormattedText,
Artwork,
)
from ..connection import Connection
from ..download import Page
from ..objects import (Album, Artist, Artwork, Contact, DatabaseObject,
FormattedText, ID3Timestamp, Label, Lyrics, Song,
Source, SourceType, Target)
from ..utils import dump_to_file
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import clean_song_title
from ..utils.config import main_settings, logging_settings
from ..utils.config import logging_settings, main_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.shared import DEBUG
from ..utils.string_processing import clean_song_title
from ..utils.support_classes.download_result import DownloadResult
if DEBUG:
from ..utils import dump_to_file
@@ -117,7 +106,7 @@ class Bandcamp(Page):
return Song(
title=clean_song_title(name, artist_name=data["band_name"]),
source_list=source_list,
main_artist_list=[
artist_list=[
Artist(
name=data["band_name"],
source_list=[
@@ -237,7 +226,7 @@ class Bandcamp(Page):
html_music_grid = soup.find("ol", {"id": "music-grid"})
if html_music_grid is not None:
for subsoup in html_music_grid.find_all("li"):
artist.main_album_collection.append(self._parse_album(soup=subsoup, initial_source=source))
artist.album_collection.append(self._parse_album(soup=subsoup, initial_source=source))
for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})):
data_blob = data_blob_soup["data-blob"]
@@ -246,7 +235,7 @@ class Bandcamp(Page):
dump_to_file(f"bandcamp_artist_data_blob_{i}.json", data_blob, is_json=True, exit_after_dump=False)
if data_blob is not None:
artist.main_album_collection.extend(
artist.album_collection.extend(
self._parse_artist_data_blob(json.loads(data_blob), source.url)
)
@@ -370,7 +359,7 @@ class Bandcamp(Page):
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
source_list=[Source(self.SOURCE_TYPE, album_data["@id"])]
)],
main_artist_list=[Artist(
artist_list=[Artist(
name=artist_data["name"].strip(),
source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
)],

View File

@@ -1,31 +1,20 @@
from collections import defaultdict
from typing import List, Optional, Dict, Type, Union
from bs4 import BeautifulSoup
from typing import Dict, List, Optional, Type, Union
from urllib.parse import urlencode, urlparse
import pycountry
from urllib.parse import urlparse, urlencode
from bs4 import BeautifulSoup
from ..connection import Connection
from ..utils.config import logging_settings
from .abstract import Page
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.enums.album import AlbumType
from ..utils.support_classes.query import Query
from ..objects import (
Lyrics,
Artist,
Source,
Song,
Album,
ID3Timestamp,
FormattedText,
Label,
Options,
DatabaseObject
)
from ..utils.shared import DEBUG
from ..download import Page
from ..objects import (Album, Artist, DatabaseObject, FormattedText,
ID3Timestamp, Label, Lyrics, Options, Song, Source)
from ..utils import dump_to_file
from ..utils.config import logging_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.enums.album import AlbumType
from ..utils.shared import DEBUG
from ..utils.support_classes.query import Query
ALBUM_TYPE_MAP: Dict[str, AlbumType] = defaultdict(lambda: AlbumType.OTHER, {
"Full-length": AlbumType.STUDIO_ALBUM,
@@ -52,7 +41,7 @@ def _song_from_json(artist_html=None, album_html=None, release_type=None, title=
return Song(
title=title,
main_artist_list=[
artist_list=[
_artist_from_json(artist_html=artist_html)
],
album_list=[
@@ -207,6 +196,7 @@ def create_grid(
class EncyclopaediaMetallum(Page):
REGISTER = False
SOURCE_TYPE = ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM
LOGGER = logging_settings["metal_archives_logger"]
@@ -266,7 +256,7 @@ class EncyclopaediaMetallum(Page):
song_title = song.title.strip()
album_titles = ["*"] if song.album_collection.empty else [album.title.strip() for album in song.album_collection]
artist_titles = ["*"] if song.main_artist_collection.empty else [artist.name.strip() for artist in song.main_artist_collection]
artist_titles = ["*"] if song.artist_collection.empty else [artist.name.strip() for artist in song.artist_collection]
search_results = []
@@ -663,7 +653,7 @@ class EncyclopaediaMetallum(Page):
artist.notes = band_notes
discography: List[Album] = self._fetch_artist_discography(artist_id)
artist.main_album_collection.extend(discography)
artist.album_collection.extend(discography)
return artist

View File

@@ -0,0 +1,286 @@
import json
from enum import Enum
from typing import List, Optional, Type
from urllib.parse import urlencode, urlparse, urlunparse
import pycountry
from bs4 import BeautifulSoup
from ..connection import Connection
from ..download import Page
from ..objects import (Album, Artist, Artwork, Contact, DatabaseObject,
FormattedText, ID3Timestamp, Label, Lyrics, Song,
Source, SourceType, Target)
from ..utils import dump_to_file, traverse_json_path
from ..utils.config import logging_settings, main_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.shared import DEBUG
from ..utils.string_processing import clean_song_title
from ..utils.support_classes.download_result import DownloadResult
if DEBUG:
from ..utils import dump_to_file
class Genius(Page):
SOURCE_TYPE = ALL_SOURCE_TYPES.GENIUS
HOST = "genius.com"
def __init__(self, *args, **kwargs):
self.connection: Connection = Connection(
host="https://genius.com/",
logger=self.LOGGER,
module="genius",
)
super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
path = source.parsed_url.path.replace("/", "")
if path.startswith("artists"):
return Artist
if path.startswith("albums"):
return Album
return Song
def add_to_artwork(self, artwork: Artwork, url: str):
if url is None:
return
url_frags = url.split(".")
if len(url_frags) < 2:
artwork.append(url=url)
return
dimensions = url_frags[-2].split("x")
if len(dimensions) < 2:
artwork.append(url=url)
return
if len(dimensions) == 3:
dimensions = dimensions[:-1]
try:
artwork.append(url=url, width=int(dimensions[0]), height=int(dimensions[1]))
except ValueError:
artwork.append(url=url)
def parse_api_object(self, data: dict) -> Optional[DatabaseObject]:
if data is None:
return None
object_type = data.get("_type")
artwork = Artwork()
self.add_to_artwork(artwork, data.get("header_image_url"))
self.add_to_artwork(artwork, data.get("image_url"))
additional_sources: List[Source] = []
source: Source = Source(self.SOURCE_TYPE, data.get("url"), additional_data={
"id": data.get("id"),
"slug": data.get("slug"),
"api_path": data.get("api_path"),
})
notes = FormattedText()
description = data.get("description") or {}
if "html" in description:
notes.html = description["html"]
elif "markdown" in description:
notes.markdown = description["markdown"]
elif "description_preview" in data:
notes.plaintext = data["description_preview"]
if source.url is None:
return None
if object_type == "artist":
if data.get("instagram_name") is not None:
additional_sources.append(Source(ALL_SOURCE_TYPES.INSTAGRAM, f"https://www.instagram.com/{data['instagram_name']}/"))
if data.get("facebook_name") is not None:
additional_sources.append(Source(ALL_SOURCE_TYPES.FACEBOOK, f"https://www.facebook.com/{data['facebook_name']}/"))
if data.get("twitter_name") is not None:
additional_sources.append(Source(ALL_SOURCE_TYPES.TWITTER, f"https://x.com/{data['twitter_name']}/"))
return Artist(
name=data["name"].strip() if data.get("name") is not None else None,
source_list=[source],
artwork=artwork,
notes=notes,
)
if object_type == "album":
self.add_to_artwork(artwork, data.get("cover_art_thumbnail_url"))
self.add_to_artwork(artwork, data.get("cover_art_url"))
for cover_art in data.get("cover_arts", []):
self.add_to_artwork(artwork, cover_art.get("image_url"))
self.add_to_artwork(artwork, cover_art.get("thumbnail_image_url"))
return Album(
title=data.get("name").strip(),
source_list=[source],
artist_list=[self.parse_api_object(data.get("artist"))],
artwork=artwork,
date=ID3Timestamp(**data.get("release_date_components", {})),
)
if object_type == "song":
self.add_to_artwork(artwork, data.get("song_art_image_thumbnail_url"))
self.add_to_artwork(artwork, data.get("song_art_image_url"))
main_artist_list = []
featured_artist_list = []
_artist_name = None
primary_artist = self.parse_api_object(data.get("primary_artist"))
if primary_artist is not None:
_artist_name = primary_artist.name
main_artist_list.append(primary_artist)
for feature_artist in (*(data.get("featured_artists") or []), *(data.get("producer_artists") or []), *(data.get("writer_artists") or [])):
artist = self.parse_api_object(feature_artist)
if artist is not None:
featured_artist_list.append(artist)
return Song(
title=clean_song_title(data.get("title"), artist_name=_artist_name),
source_list=[source],
artwork=artwork,
feature_artist_list=featured_artist_list,
artist_list=main_artist_list,
)
return None
def general_search(self, search_query: str, **kwargs) -> List[DatabaseObject]:
results = []
search_params = {
"q": search_query,
}
r = self.connection.get("https://genius.com/api/search/multi?" + urlencode(search_params), name=f"search_{search_query}")
if r is None:
return results
dump_to_file("search_genius.json", r.text, is_json=True, exit_after_dump=False)
data = r.json()
for elements in traverse_json_path(data, "response.sections", default=[]):
hits = elements.get("hits", [])
for hit in hits:
parsed = self.parse_api_object(hit.get("result"))
if parsed is not None:
results.append(parsed)
return results
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
artist: Artist = Artist()
# https://genius.com/api/artists/24527/albums?page=1
r = self.connection.get(source.url, name=source.url)
if r is None:
return artist
soup = self.get_soup_from_response(r)
# find the content attribute in the meta tag which is contained in the head
data_container = soup.find("meta", {"itemprop": "page_data"})
if data_container is not None:
content = data_container["content"]
dump_to_file("genius_itemprop_artist.json", content, is_json=True, exit_after_dump=False)
data = json.loads(content)
artist = self.parse_api_object(data.get("artist"))
for e in (data.get("artist_albums") or []):
r = self.parse_api_object(e)
if not isinstance(r, Album):
continue
artist.album_collection.append(r)
for e in (data.get("artist_songs") or []):
r = self.parse_api_object(e)
if not isinstance(r, Song):
continue
"""
TODO
fetch the album for these songs, because the api doesn't
return them
"""
artist.album_collection.extend(r.album_collection)
artist.source_collection.append(source)
return artist
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
album: Album = Album()
# https://genius.com/api/artists/24527/albums?page=1
r = self.connection.get(source.url, name=source.url)
if r is None:
return album
soup = self.get_soup_from_response(r)
# find the content attribute in the meta tag which is contained in the head
data_container = soup.find("meta", {"itemprop": "page_data"})
if data_container is not None:
content = data_container["content"]
dump_to_file("genius_itemprop_album.json", content, is_json=True, exit_after_dump=False)
data = json.loads(content)
album = self.parse_api_object(data.get("album"))
for e in data.get("album_appearances", []):
r = self.parse_api_object(e.get("song"))
if not isinstance(r, Song):
continue
album.song_collection.append(r)
album.source_collection.append(source)
return album
def get_json_content_from_response(self, response, start: str, end: str) -> Optional[str]:
content = response.text
start_index = content.find(start)
if start_index < 0:
return None
start_index += len(start)
end_index = content.find(end, start_index)
if end_index < 0:
return None
return content[start_index:end_index]
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
song: Song = Song()
r = self.connection.get(source.url, name=source.url)
if r is None:
return song
# get the contents that are between `JSON.parse('` and `');`
content = self.get_json_content_from_response(r, start="window.__PRELOADED_STATE__ = JSON.parse('", end="');\n window.__APP_CONFIG__ = ")
if content is not None:
content = content.replace("\\\\", "\\").replace('\\"', '"').replace("\\'", "'")
data = json.loads(content)
lyrics_html = traverse_json_path(data, "songPage.lyricsData.body.html", default=None)
if lyrics_html is not None:
song.lyrics_collection.append(Lyrics(FormattedText(html=lyrics_html)))
dump_to_file("genius_song_script_json.json", content, is_json=True, exit_after_dump=False)
soup = self.get_soup_from_response(r)
for lyrics in soup.find_all("div", {"data-lyrics-container": "true"}):
lyrics_object = Lyrics(FormattedText(html=lyrics.prettify()))
song.lyrics_collection.append(lyrics_object)
song.source_collection.append(source)
return song

View File

@@ -1,34 +1,23 @@
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Type, Union, Generator, Dict, Any
from typing import Any, Dict, Generator, List, Optional, Type, Union
from urllib.parse import urlparse
import pycountry
from bs4 import BeautifulSoup
from ..connection import Connection
from .abstract import Page
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.enums.album import AlbumType, AlbumStatus
from ..objects import (
Artist,
Source,
Song,
Album,
ID3Timestamp,
FormattedText,
Label,
Target,
DatabaseObject,
Lyrics,
Artwork
)
from ..download import Page
from ..objects import (Album, Artist, Artwork, DatabaseObject, FormattedText,
ID3Timestamp, Label, Lyrics, Song, Source, Target)
from ..utils import shared, string_processing
from ..utils.config import logging_settings, main_settings
from ..utils import string_processing, shared
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.enums.album import AlbumStatus, AlbumType
from ..utils.string_processing import clean_song_title
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
"""
https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent
@@ -1054,7 +1043,7 @@ class Musify(Page):
if not self.fetch_options.download_all and album.album_type in self.fetch_options.album_type_blacklist:
continue
artist.main_album_collection.append(album)
artist.album_collection.append(album)
def fetch_artist(self, source: Source, **kwargs) -> Artist:
"""

View File

@@ -1,29 +1,19 @@
from typing import List, Optional, Type, Tuple
from urllib.parse import urlparse, urlunparse, parse_qs
from enum import Enum
from typing import List, Optional, Tuple, Type
from urllib.parse import parse_qs, urlparse, urlunparse
import python_sponsorblock
from ..objects import Source, DatabaseObject, Song, Target
from .abstract import Page
from ..objects import (
Artist,
Source,
Song,
Album,
Label,
Target,
FormattedText,
ID3Timestamp
)
from ..connection import Connection
from ..download import Page
from ..objects import (Album, Artist, DatabaseObject, FormattedText,
ID3Timestamp, Label, Song, Source, Target)
from ..utils.config import logging_settings, main_settings, youtube_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.string_processing import clean_song_title
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult
from ..utils.config import youtube_settings, main_settings, logging_settings
from .youtube_music.super_youtube import SuperYouTube, YouTubeUrl, get_invidious_url, YouTubeUrlType
from ._youtube_music.super_youtube import (SuperYouTube, YouTubeUrl,
YouTubeUrlType, get_invidious_url)
"""
- https://yt.artemislena.eu/api/v1/search?q=Zombiez+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance
@@ -38,7 +28,7 @@ def get_piped_url(path: str = "", params: str = "", query: str = "", fragment: s
class YouTube(SuperYouTube):
# CHANGE
REGISTER = youtube_settings["use_youtube_alongside_youtube_music"]
SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
def __init__(self, *args, **kwargs):
@@ -143,7 +133,7 @@ class YouTube(SuperYouTube):
self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={data['videoId']}")
)],
notes=FormattedText(html=data["descriptionHtml"] + f"\n<p>{license_str}</ p>" ),
main_artist_list=artist_list
artist_list=artist_list
), int(data["published"])
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
@@ -284,7 +274,7 @@ class YouTube(SuperYouTube):
self.LOGGER.warning(f"didn't found any playlists with piped, falling back to invidious. (it is unusual)")
album_list, artist_name = self.fetch_invidious_album_list(parsed.id)
return Artist(name=artist_name, main_album_list=album_list, source_list=[source])
return Artist(name=artist_name, album_list=album_list, source_list=[source])
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
"""

View File

@@ -3,7 +3,6 @@ from enum import Enum
from ...utils.config import logging_settings
from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import (
Artist,
Source,
@@ -58,6 +57,19 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
song.album_collection.extend(album_list)
return [song]
if len(album_list) == 1:
album = album_list[0]
album.artist_collection.extend(artist_list)
album.song_collection.extend(song_list)
return [album]
"""
if len(artist_list) == 1:
artist = artist_list[0]
artist.main_album_collection.extend(album_list)
return [artist]
"""
return results

View File

@@ -6,7 +6,6 @@ from ...utils.string_processing import clean_song_title
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import (
Artist,
Source,

View File

@@ -1,26 +1,17 @@
from typing import List, Optional, Type, Tuple
from urllib.parse import urlparse, urlunparse, parse_qs
from enum import Enum
import requests
from typing import List, Optional, Tuple, Type
from urllib.parse import parse_qs, urlparse, urlunparse
import python_sponsorblock
import requests
from ...objects import Source, DatabaseObject, Song, Target
from ..abstract import Page
from ...objects import (
Artist,
Source,
Song,
Album,
Label,
Target,
FormattedText,
ID3Timestamp
)
from ...connection import Connection
from ...download import Page
from ...objects import (Album, Artist, DatabaseObject, FormattedText,
ID3Timestamp, Label, Song, Source, Target)
from ...utils.config import logging_settings, main_settings, youtube_settings
from ...utils.enums import ALL_SOURCE_TYPES, SourceType
from ...utils.support_classes.download_result import DownloadResult
from ...utils.config import youtube_settings, logging_settings, main_settings
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:

View File

@@ -1,46 +1,33 @@
from __future__ import unicode_literals, annotations
from __future__ import annotations, unicode_literals
from typing import Dict, List, Optional, Set, Type
from urllib.parse import urlparse, urlunparse, quote, parse_qs, urlencode
import json
import logging
import random
import json
from dataclasses import dataclass
import re
from functools import lru_cache
from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
from typing import Dict, List, Optional, Set, Type
from urllib.parse import parse_qs, quote, urlencode, urlparse, urlunparse
import youtube_dl
from youtube_dl.extractor.youtube import YoutubeIE
from youtube_dl.utils import DownloadError
from ...connection import Connection
from ...download import Page
from ...objects import Album, Artist, Artwork
from ...objects import DatabaseObject as DataObject
from ...objects import (FormattedText, ID3Timestamp, Label, Lyrics, Song,
Source, Target)
from ...utils import dump_to_file, get_current_millis, traverse_json_path
from ...utils.config import logging_settings, main_settings, youtube_settings
from ...utils.enums import ALL_SOURCE_TYPES, SourceType
from ...utils.enums.album import AlbumType
from ...utils.exception.config import SettingValueError
from ...utils.config import main_settings, youtube_settings, logging_settings
from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING
from ...utils.string_processing import clean_song_title
from ...utils import get_current_millis, traverse_json_path
from ...utils import dump_to_file
from ..abstract import Page
from ...objects import (
DatabaseObject as DataObject,
Source,
FormattedText,
ID3Timestamp,
Artwork,
Artist,
Song,
Album,
Label,
Target,
Lyrics,
)
from ...connection import Connection
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
from ...utils.enums.album import AlbumType
from ...utils.support_classes.download_result import DownloadResult
from ._list_render import parse_renderer
from ._music_object_render import parse_run_element
from .super_youtube import SuperYouTube
@@ -549,6 +536,11 @@ class YoutubeMusic(SuperYouTube):
return album
def fetch_lyrics(self, video_id: str, playlist_id: str = None) -> str:
"""
1. fetches the tabs of a song, to get the browse id
2. finds the browse id of the lyrics
3. fetches the lyrics with the browse id
"""
request_data = {
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
"videoId": video_id,
@@ -575,7 +567,8 @@ class YoutubeMusic(SuperYouTube):
pageType = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseEndpointContextSupportedConfigs.browseEndpointContextMusicConfig.pageType", default="")
if pageType in ("MUSIC_TAB_TYPE_LYRICS", "MUSIC_PAGE_TYPE_TRACK_LYRICS") or "lyrics" in pageType.lower():
browse_id = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseId", default=None)
break
if browse_id is not None:
break
if browse_id is None:
return None
@@ -589,6 +582,8 @@ class YoutubeMusic(SuperYouTube):
},
name=f"fetch_song_lyrics_{video_id}.json"
)
if r is None:
return None
dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False)
@@ -639,7 +634,7 @@ class YoutubeMusic(SuperYouTube):
album_list=album_list,
length=int(ydl_res.get("duration", 0)) * 1000,
artwork=Artwork(*ydl_res.get("thumbnails", [])),
main_artist_list=artist_list,
artist_list=artist_list,
source_list=[Source(
self.SOURCE_TYPE,
f"https://music.youtube.com/watch?v={ydl_res.get('id')}"
@@ -719,7 +714,6 @@ class YoutubeMusic(SuperYouTube):
self.download_values_by_url[source.url] = {
"url": _best_format.get("url"),
"chunk_size": _best_format.get("downloader_options", {}).get("http_chunk_size", main_settings["chunk_size"]),
"headers": _best_format.get("http_headers", {}),
}

View File

@@ -1,157 +0,0 @@
import logging
import random
import re
from copy import copy
from pathlib import Path
from typing import Optional, Union, Type, Dict, Set, List, Tuple, TypedDict
from string import Formatter
from dataclasses import dataclass, field
import requests
from bs4 import BeautifulSoup
from ..connection import Connection
from ..objects import (
Song,
Source,
Album,
Artist,
Target,
DatabaseObject,
Options,
Collection,
Label,
)
from ..utils.enums import SourceType
from ..utils.enums.album import AlbumType
from ..audio import write_metadata_to_target, correct_codec
from ..utils.config import main_settings
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import fit_to_file_system
from ..utils import trace, output, BColors
INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song]
INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]]
@dataclass
class FetchOptions:
download_all: bool = False
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
@dataclass
class DownloadOptions:
download_all: bool = False
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
process_audio_if_found: bool = False
process_metadata_if_found: bool = True
class Page:
SOURCE_TYPE: SourceType
LOGGER: logging.Logger
def __new__(cls, *args, **kwargs):
cls.LOGGER = logging.getLogger(cls.__name__)
return super().__new__(cls)
def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
self.SOURCE_TYPE.register_page(self)
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None):
"""
Perform a regex search on the given string, using a single or a list of
patterns returning the first matching group.
In case of failure return a default value or raise a WARNING or a
RegexNotFoundError, depending on fatal, specifying the field name.
"""
if isinstance(pattern, str):
mobj = re.search(pattern, string, flags)
else:
for p in pattern:
mobj = re.search(p, string, flags)
if mobj:
break
if mobj:
if group is None:
# return the first matching group
return next(g for g in mobj.groups() if g is not None)
elif isinstance(group, (list, tuple)):
return tuple(mobj.group(g) for g in group)
else:
return mobj.group(group)
return default
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
return None
def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup:
return BeautifulSoup(r.content, "html.parser")
# to search stuff
def search(self, query: Query) -> List[DatabaseObject]:
music_object = query.music_object
search_functions = {
Song: self.song_search,
Album: self.album_search,
Artist: self.artist_search,
Label: self.label_search
}
if type(music_object) in search_functions:
r = search_functions[type(music_object)](music_object)
if r is not None and len(r) > 0:
return r
r = []
for default_query in query.default_search:
for single_option in self.general_search(default_query):
r.append(single_option)
return r
def general_search(self, search_query: str) -> List[DatabaseObject]:
return []
def label_search(self, label: Label) -> List[Label]:
return []
def artist_search(self, artist: Artist) -> List[Artist]:
return []
def album_search(self, album: Album) -> List[Album]:
return []
def song_search(self, song: Song) -> List[Song]:
return []
# to fetch stuff
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song()
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
return Album()
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
return Artist()
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
# to download stuff
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
return []
def post_process_hook(self, song: Song, temp_target: Target, **kwargs):
pass
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
return DownloadResult()

View File

@@ -19,7 +19,7 @@ config = Config((
You can use Audio formats which support ID3.2 and ID3.1,
but you will have cleaner Metadata using ID3.2."""),
Attribute(name="result_history", default_value=False, description="""If enabled, you can go back to the previous results.
Attribute(name="result_history", default_value=True, description="""If enabled, you can go back to the previous results.
The consequence is a higher meory consumption, because every result is saved."""),
Attribute(name="history_length", default_value=8, description="""You can choose how far back you can go in the result history.
The further you choose to be able to go back, the higher the memory usage.

View File

@@ -17,6 +17,9 @@ class SourceType:
def register_page(self, page: Page):
self.page = page
def deregister_page(self):
self.page = None
def __hash__(self):
return hash(self.name)

View File

@@ -3,6 +3,9 @@ class MKBaseException(Exception):
self.message = message
super().__init__(message, **kwargs)
# Compose exceptions. Those usually mean a bug on my side.
class MKComposeException(MKBaseException):
pass
# Downloading
class MKDownloadException(MKBaseException):

View File

@@ -12,14 +12,14 @@ if not load_dotenv(Path(__file__).parent.parent.parent / ".env"):
__stage__ = os.getenv("STAGE", "prod")
DEBUG = (__stage__ == "dev") and False
DEBUG = (__stage__ == "dev") and True
DEBUG_LOGGING = DEBUG and False
DEBUG_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and False
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and False
DEBUG_DUMP = DEBUG and True
DEBUG_PRINT_ID = DEBUG and True
if DEBUG:

View File

@@ -24,7 +24,7 @@ class Query:
return [self.music_object.name]
if isinstance(self.music_object, Song):
return [f"{artist.name} - {self.music_object}" for artist in self.music_object.main_artist_collection]
return [f"{artist.name} - {self.music_object}" for artist in self.music_object.artist_collection]
if isinstance(self.music_object, Album):
return [f"{artist.name} - {self.music_object}" for artist in self.music_object.artist_collection]

View File

@@ -3,96 +3,98 @@ import unittest
from music_kraken.objects import Song, Album, Artist, Collection, Country
class TestCollection(unittest.TestCase):
@staticmethod
def complicated_object() -> Artist:
return Artist(
name="artist",
country=Country.by_alpha_2("DE"),
main_album_list=[
Album(
title="album",
song_list=[
Song(
title="song",
album_list=[
Album(title="album", albumsort=123),
],
),
Song(
title="other_song",
album_list=[
Album(title="album", albumsort=423),
],
),
]
),
Album(title="album", barcode="1234567890123"),
def test_song_contains_album(self):
"""
Tests that every song contains the album it is added to in its album_collection
"""
a_1 = Album(
title="album",
song_list= [
Song(title="song"),
]
)
a_2 = a_1.song_collection[0].album_collection[0]
self.assertTrue(a_1.id == a_2.id)
def test_song_album_relation(self):
def test_album_contains_song(self):
"""
Tests that
album = album.any_song.one_album
is the same object
Tests that every album contains the song it is added to in its song_collection
"""
s_1 = Song(
title="song",
album_list=[
Album(title="album"),
]
)
s_2 = s_1.album_collection[0].song_collection[0]
self.assertTrue(s_1.id == s_2.id)
def test_auto_add_artist_to_album_feature_artist(self):
"""
Tests that every artist is added to the album's feature_artist_collection per default
"""
a = self.complicated_object().main_album_collection[0]
b = a.song_collection[0].album_collection[0]
c = a.song_collection[1].album_collection[0]
d = b.song_collection[0].album_collection[0]
e = d.song_collection[0].album_collection[0]
f = e.song_collection[0].album_collection[0]
g = f.song_collection[0].album_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id == e.id == f.id == g.id)
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "album")
self.assertTrue(a.barcode == b.barcode == c.barcode == d.barcode == e.barcode == f.barcode == g.barcode == "1234567890123")
self.assertTrue(a.albumsort == b.albumsort == c.albumsort == d.albumsort == e.albumsort == f.albumsort == g.albumsort == 123)
d.title = "new_title"
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "new_title")
def test_album_artist_relation(self):
"""
Tests that
artist = artist.any_album.any_song.one_artist
is the same object
"""
a = self.complicated_object()
b = a.main_album_collection[0].artist_collection[0]
c = b.main_album_collection[0].artist_collection[0]
d = c.main_album_collection[0].artist_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id)
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
def test_artist_artist_relation(self):
artist = Artist(
a_1 = Artist(
name="artist",
main_album_list=[
album_list=[
Album(title="album")
]
)
a_2 = a_1.album_collection[0].feature_artist_collection[0]
self.assertTrue(a_1.id == a_2.id)
def test_auto_add_artist_to_album_feature_artist_push(self):
"""
Tests that every artist is added to the album's feature_artist_collection per default but pulled into the album's artist_collection if a merge exitst
"""
a_1 = Artist(
name="artist",
album_list=[
Album(
title="album",
song_list=[
Song(title="song"),
],
artist_list=[
Artist(name="artist"),
]
)
]
)
a_2 = a_1.album_collection[0].artist_collection[0]
self.assertTrue(artist.id == artist.main_album_collection[0].song_collection[0].main_artist_collection[0].id)
self.assertTrue(a_1.id == a_2.id)
def test_artist_artist_relation(self):
"""
Tests the proper syncing between album.artist_collection and song.artist_collection
"""
album = Album(
title="album",
song_list=[
Song(title="song"),
],
artist_list=[
Artist(name="artist"),
]
)
a_1 = album.artist_collection[0]
a_2 = album.song_collection[0].artist_collection[0]
self.assertTrue(a_1.id == a_2.id)
def test_artist_collection_sync(self):
"""
tests the actual implementation of the test above
"""
album_1 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]),
Song(title="song", artist_list=[Artist(name="artist")]),
],
artist_list=[
Artist(name="artist"),
@@ -102,7 +104,7 @@ class TestCollection(unittest.TestCase):
album_2 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]),
Song(title="song", artist_list=[Artist(name="artist")]),
],
artist_list=[
Artist(name="artist"),
@@ -111,17 +113,7 @@ class TestCollection(unittest.TestCase):
album_1.merge(album_2)
self.assertTrue(id(album_1.artist_collection) == id(album_1.artist_collection) == id(album_1.song_collection[0].main_artist_collection) == id(album_1.song_collection[0].main_artist_collection))
def test_song_artist_relations(self):
a = self.complicated_object()
b = a.main_album_collection[0].song_collection[0].main_artist_collection[0]
c = b.main_album_collection[0].song_collection[0].main_artist_collection[0]
d = c.main_album_collection[0].song_collection[0].main_artist_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id)
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
self.assertTrue(id(album_1.artist_collection) == id(album_1.artist_collection) == id(album_1.song_collection[0].artist_collection) == id(album_1.song_collection[0].artist_collection))
if __name__ == "__main__":
unittest.main()