26 Commits

Author SHA1 Message Date
130f5edcfe draft: rewrite of interface
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-05 12:02:12 +02:00
636645e862 feat: implemented genre to external file 2024-06-04 11:00:12 +02:00
df1743c695 feat: implemented better components 2024-06-03 15:04:47 +02:00
ead4f83456 feat: launch programm
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-28 13:45:20 +02:00
4b2dd4a36a feat: launch programm
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-28 13:45:08 +02:00
d4fe99ffc7 feat: dynamic indices
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-27 16:59:51 +02:00
413d422e2f draft: improved searching one page only 2024-05-27 16:38:03 +02:00
999299c32a draft: improved searching one page only 2024-05-27 16:37:55 +02:00
a0e42fc6ee draft: outline of better select 2024-05-27 15:50:04 +02:00
5cdd4fb6a9 feat: renamed pages
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-27 14:16:00 +02:00
71ec309953 feat: reworked the genre select
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-27 14:13:18 +02:00
850c68f3e5 feat: implemented functionality to type out the choice
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-27 14:00:01 +02:00
7219048422 feat: colored asking for bool 2024-05-27 13:55:18 +02:00
49145a7d93 feat: moved cli helpers to seperate file
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-27 13:41:24 +02:00
0f2229b0f2 feat: completely dynamified the datasource import
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-24 17:00:39 +02:00
5af95f1b03 feat: auto import pages in page module 2024-05-24 15:46:42 +02:00
c24cf701c1 fix: pages were not in the subclasses because the module was never importet 2024-05-24 15:28:47 +02:00
cef87460a7 draft
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-24 14:46:38 +02:00
c0fbd16929 feat: basic select layout 2024-05-24 13:21:07 +02:00
b5a5559f7b feat: created layout for option
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-23 18:05:18 +02:00
906ddb679d draft:
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-23 17:27:24 +02:00
cd2e7d7173 draft: moving page interface to downloader module
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-23 16:33:40 +02:00
c683394228 feat: imports 2024-05-23 16:13:47 +02:00
aafbba3b1c feat: implemented consistent settings
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-23 14:36:19 +02:00
40e9366a0b feat: implemented the new page mechanics in the downloader 2024-05-23 14:32:31 +02:00
8255ad5264 feat: added detection to autoscann pages
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-23 14:24:20 +02:00
40 changed files with 1053 additions and 1377 deletions

6
.vscode/launch.json vendored
View File

@@ -17,6 +17,12 @@
"request": "launch", "request": "launch",
"program": "development/actual_donwload.py", "program": "development/actual_donwload.py",
"console": "integratedTerminal" "console": "integratedTerminal"
},
{
"name": "Python Debugger: Music Kraken",
"type": "debugpy",
"request": "launch", // run the module
"program": "${workspaceFolder}/.vscode/run_script.py",
} }
] ]
} }

3
.vscode/run_script.py vendored Normal file
View File

@@ -0,0 +1,3 @@
from music_kraken.__main__ import cli
cli()

View File

@@ -20,7 +20,6 @@
"APIC", "APIC",
"Bandcamp", "Bandcamp",
"bitrate", "bitrate",
"CALLSTACK",
"DEEZER", "DEEZER",
"dotenv", "dotenv",
"encyclopaedia", "encyclopaedia",

View File

@@ -1,13 +1,15 @@
import logging
import music_kraken import music_kraken
import logging
print("Setting logging-level to DEBUG") print("Setting logging-level to DEBUG")
logging.getLogger().setLevel(logging.DEBUG) logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__": if __name__ == "__main__":
commands = [ commands = [
"s: #a Ghost Bath", "s: #a Crystal F",
"10",
"1",
"3",
] ]

View File

@@ -1,13 +1,13 @@
import logging
import gc import gc
import logging
import sys import sys
from pathlib import Path from pathlib import Path
from rich.logging import RichHandler
from rich.console import Console from rich.console import Console
from rich.logging import RichHandler
from .utils.shared import DEBUG, DEBUG_LOGGING
from .utils.config import logging_settings, main_settings, read_config from .utils.config import logging_settings, main_settings, read_config
from .utils.shared import DEBUG, DEBUG_LOGGING
read_config() read_config()

View File

@@ -16,6 +16,7 @@ LOGGER = logging_settings["tagging_logger"]
artwork_connection: Connection = Connection() artwork_connection: Connection = Connection()
class AudioMetadata: class AudioMetadata:
def __init__(self, file_location: str = None) -> None: def __init__(self, file_location: str = None) -> None:
self._file_location = None self._file_location = None
@@ -67,14 +68,13 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
id3_object = AudioMetadata(file_location=target.file_path) id3_object = AudioMetadata(file_location=target.file_path)
LOGGER.info(str(metadata)) LOGGER.info(str(metadata))
## REWRITE COMPLETLY !!!!!!!!!!!!
if len(song.artwork._data) != 0: if song.artwork.best_variant is not None:
variants = song.artwork._data.__getitem__(0) best_variant = song.artwork.best_variant
best_variant = variants.variants.__getitem__(0)
r = artwork_connection.get( r = artwork_connection.get(
url=best_variant.url, url=best_variant["url"],
name=best_variant.url, name=song.artwork.get_variant_name(best_variant),
) )
temp_target: Target = Target.temp() temp_target: Target = Target.temp()
@@ -106,9 +106,9 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
APIC( APIC(
encoding=0, encoding=0,
mime="image/jpeg", mime="image/jpeg",
type=3, type=mutagen.id3.PictureType.COVER_FRONT,
desc=u"Cover", desc=u"Cover",
data=converted_target.raw_content, data=converted_target.read_bytes(),
) )
) )
id3_object.frames.delall("USLT") id3_object.frames.delall("USLT")

53
music_kraken/cli/genre.py Normal file
View File

@@ -0,0 +1,53 @@
import re
from pathlib import Path
from typing import Dict, Generator, List, Set, Type, Union
from ..download import Downloader, Page, components
from ..utils.config import main_settings
from .utils import ask_for_bool, cli_function
class GenreIO(components.HumanIO):
@staticmethod
def ask_to_create(option: components.Option) -> bool:
output()
return ask_for_bool(f"create the genre {BColors.OKBLUE.value}{option.value}{BColors.ENDC.value}")
@staticmethod
def not_found(key: str) -> None:
output(f"\ngenre {BColors.BOLD.value}{key}{BColors.ENDC.value} not found\n", color=BColors.FAIL)
def _genre_generator() -> Generator[str, None, None]:
def is_valid_genre(genre: Path) -> bool:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any Patern
from shared.NOT_A_GENRE_REGEX.
"""
if not genre.is_dir():
return False
if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]):
return False
return True
for genre in filter(is_valid_genre, main_settings["music_directory"].iterdir()):
yield genre.name
def get_genre() -> str:
select_genre = components.Select(
human_io=GenreIO,
can_create_options=True,
data=_genre_generator(),
)
genre: Optional[components.Option[str]] = None
while genre is None:
print(select_genre.pprint())
print()
genre = select_genre.choose(input("> "))
return genre.value

View File

@@ -1,89 +1,26 @@
import random import random
from typing import Set, Type, Dict, List
from pathlib import Path
import re import re
from pathlib import Path
from typing import Dict, Generator, List, Set, Type, Union
from .utils import cli_function from .. import console
from .options.first_config import initial_config from ..download import Downloader, Page, components
from ..download.results import GoToResults
from ..utils import output, BColors from ..download.results import Option as ResultOption
from ..utils.config import write_config, main_settings from ..download.results import PageResults, Results
from ..utils.shared import URL_PATTERN from ..objects import Album, Artist, DatabaseObject, Song
from ..utils.string_processing import fit_to_file_system from ..utils import BColors, output
from ..utils.support_classes.query import Query from ..utils.config import main_settings, write_config
from ..utils.support_classes.download_result import DownloadResult from ..utils.enums.colors import BColors
from ..utils.exception import MKInvalidInputException from ..utils.exception import MKInvalidInputException
from ..utils.exception.download import UrlNotFoundException from ..utils.exception.download import UrlNotFoundException
from ..utils.enums.colors import BColors from ..utils.shared import HELP_MESSAGE, URL_PATTERN
from .. import console from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.download_result import DownloadResult
from ..download.results import Results, Option, PageResults, GoToResults from ..utils.support_classes.query import Query
from ..download.page_attributes import Pages from .genre import get_genre
from ..pages import Page from .options.first_config import initial_config
from ..objects import Song, Album, Artist, DatabaseObject from .utils import ask_for_bool, cli_function
"""
This is the implementation of the Shell
# Behaviour
## Searching
```mkshell
> s: {querry or url}
# examples
> s: https://musify.club/release/some-random-release-183028492
> s: r: #a an Artist #r some random Release
```
Searches for an url, or an query
### Query Syntax
```
#a {artist} #r {release} #t {track}
```
You can escape stuff like `#` doing this: `\#`
## Downloading
To download something, you either need a direct link, or you need to have already searched for options
```mkshell
> d: {option ids or direct url}
# examples
> d: 0, 3, 4
> d: 1
> d: https://musify.club/release/some-random-release-183028492
```
## Misc
### Exit
```mkshell
> q
> quit
> exit
> abort
```
### Current Options
```mkshell
> .
```
### Previous Options
```
> ..
```
"""
EXIT_COMMANDS = {"q", "quit", "exit", "abort"} EXIT_COMMANDS = {"q", "quit", "exit", "abort"}
ALPHABET = "abcdefghijklmnopqrstuvwxyz" ALPHABET = "abcdefghijklmnopqrstuvwxyz"
@@ -91,59 +28,17 @@ PAGE_NAME_FILL = "-"
MAX_PAGE_LEN = 21 MAX_PAGE_LEN = 21
def get_existing_genre() -> List[str]:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any patern
from shared.NOT_A_GENRE_REGEX.
"""
existing_genres: List[str] = []
# get all subdirectories of MUSIC_DIR, not the files in the dir.
existing_subdirectories: List[Path] = [f for f in main_settings["music_directory"].iterdir() if f.is_dir()]
for subdirectory in existing_subdirectories:
name: str = subdirectory.name
if not any(re.match(regex_pattern, name) for regex_pattern in main_settings["not_a_genre_regex"]):
existing_genres.append(name)
existing_genres.sort()
return existing_genres
def get_genre():
existing_genres = get_existing_genre()
for i, genre_option in enumerate(existing_genres):
print(f"{i + 1:0>2}: {genre_option}")
while True:
genre = input("Id or new genre: ")
if genre.isdigit():
genre_id = int(genre) - 1
if genre_id >= len(existing_genres):
print(f"No genre under the id {genre_id + 1}.")
continue
return existing_genres[genre_id]
new_genre = fit_to_file_system(genre)
agree_inputs = {"y", "yes", "ok"}
verification = input(f"create new genre \"{new_genre}\"? (Y/N): ").lower()
if verification in agree_inputs:
return new_genre
def help_message(): def help_message():
print(HELP_MESSAGE)
print() print()
print(random.choice(main_settings["happy_messages"])) print(random.choice(main_settings["happy_messages"]))
print() print()
class Downloader: class CliDownloader:
def __init__( def __init__(
self, self,
exclude_pages: Set[Type[Page]] = None, exclude_pages: Set[Type[Page]] = None,
@@ -153,7 +48,7 @@ class Downloader:
genre: str = None, genre: str = None,
process_metadata_anyway: bool = False, process_metadata_anyway: bool = False,
) -> None: ) -> None:
self.pages: Pages = Pages(exclude_pages=exclude_pages, exclude_shady=exclude_shady) self.downloader: Downloader = Downloader(exclude_pages=exclude_pages, exclude_shady=exclude_shady)
self.page_dict: Dict[str, Type[Page]] = dict() self.page_dict: Dict[str, Type[Page]] = dict()
@@ -171,13 +66,16 @@ class Downloader:
output() output()
def print_current_options(self): def print_current_options(self):
self.page_dict = dict()
print() print()
print(self.current_results.pprint())
"""
self.page_dict = dict()
page_count = 0 page_count = 0
for option in self.current_results.formatted_generator(): for option in self.current_results.formatted_generator():
if isinstance(option, Option): if isinstance(option, ResultOption):
r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}" r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}"
print(r) print(r)
else: else:
@@ -189,10 +87,13 @@ class Downloader:
self.page_dict[option.__name__] = option self.page_dict[option.__name__] = option
page_count += 1 page_count += 1
"""
print() print()
def set_current_options(self, current_options: Results): def set_current_options(self, current_options: Union[Generator[DatabaseObject, None, None], components.Select]):
current_options = current_options if isinstance(current_options, components.Select) else components.DataObjectSelect(current_options)
if main_settings["result_history"]: if main_settings["result_history"]:
self._result_history.append(current_options) self._result_history.append(current_options)
@@ -242,7 +143,7 @@ class Downloader:
def search(self, query: str): def search(self, query: str):
if re.match(URL_PATTERN, query) is not None: if re.match(URL_PATTERN, query) is not None:
try: try:
page, data_object = self.pages.fetch_url(query) data_object = self.downloader.fetch_url(query)
except UrlNotFoundException as e: except UrlNotFoundException as e:
print(f"{e.url} could not be attributed/parsed to any yet implemented site.\n" print(f"{e.url} could not be attributed/parsed to any yet implemented site.\n"
f"PR appreciated if the site isn't implemented.\n" f"PR appreciated if the site isn't implemented.\n"
@@ -296,15 +197,17 @@ class Downloader:
parsed_query: Query = self._process_parsed(key_text, query) parsed_query: Query = self._process_parsed(key_text, query)
self.set_current_options(self.pages.search(parsed_query)) self.set_current_options(self.downloader.search(parsed_query))
self.print_current_options() self.print_current_options()
def goto(self, data_object: DatabaseObject): def goto(self, data_object: Union[DatabaseObject, components.Select]):
page: Type[Page] page: Type[Page]
self.pages.fetch_details(data_object, stop_at_level=1) if isinstance(data_object, components.Select):
self.set_current_options(data_object)
self.set_current_options(GoToResults(data_object.options, max_items_per_page=self.max_displayed_options)) else:
self.downloader.fetch_details(data_object, stop_at_level=1)
self.set_current_options(data_object.options)
self.print_current_options() self.print_current_options()
@@ -316,7 +219,7 @@ class Downloader:
_result_map: Dict[DatabaseObject, DownloadResult] = dict() _result_map: Dict[DatabaseObject, DownloadResult] = dict()
for database_object in data_objects: for database_object in data_objects:
r = self.pages.download( r = self.downloader.download(
data_object=database_object, data_object=database_object,
genre=self.genre, genre=self.genre,
**kwargs **kwargs
@@ -371,24 +274,15 @@ class Downloader:
indices = [] indices = []
for possible_index in q.split(","): for possible_index in q.split(","):
possible_index = possible_index.strip()
if possible_index == "": if possible_index == "":
continue continue
i = 0 if possible_index not in self.current_results:
try: raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not in the current options.")
i = int(possible_index)
except ValueError:
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.")
if i < 0 or i >= len(self.current_results): yield self.current_results[possible_index]
raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results) - 1}.")
indices.append(i) selected_objects = list(get_selected_objects(query))
return [self.current_results[i] for i in indices]
selected_objects = get_selected_objects(query)
if do_merge: if do_merge:
old_selected_objects = selected_objects old_selected_objects = selected_objects
@@ -403,19 +297,19 @@ class Downloader:
if do_fetch: if do_fetch:
for data_object in selected_objects: for data_object in selected_objects:
self.pages.fetch_details(data_object) self.downloader.fetch_details(data_object)
self.print_current_options() self.print_current_options()
return False return False
if do_download: if do_download:
self.download(selected_objects) self.download(list(o.value for o in selected_objects))
return False return False
if len(selected_objects) != 1: if len(selected_objects) != 1:
raise MKInvalidInputException(message="You can only go to one object at a time without merging.") raise MKInvalidInputException(message="You can only go to one object at a time without merging.")
self.goto(selected_objects[0]) self.goto(selected_objects[0].value)
return False return False
except MKInvalidInputException as e: except MKInvalidInputException as e:
output("\n" + e.message + "\n", color=BColors.FAIL) output("\n" + e.message + "\n", color=BColors.FAIL)
@@ -446,7 +340,7 @@ def download(
else: else:
print(f"{BColors.FAIL.value}Something went wrong configuring.{BColors.ENDC.value}") print(f"{BColors.FAIL.value}Something went wrong configuring.{BColors.ENDC.value}")
shell = Downloader(genre=genre, process_metadata_anyway=process_metadata_anyway) shell = CliDownloader(genre=genre, process_metadata_anyway=process_metadata_anyway)
if command_list is not None: if command_list is not None:
for command in command_list: for command in command_list:

View File

@@ -1,3 +1,4 @@
from ..utils import BColors
from ..utils.shared import get_random_message from ..utils.shared import get_random_message
@@ -39,4 +40,8 @@ def print_cute_message():
print(message) print(message)
AGREE_INPUTS = {"y", "yes", "ok"}
def ask_for_bool(msg: str) -> bool:
i = input(f"{msg} ({BColors.OKGREEN.value}Y{BColors.ENDC.value}/{BColors.FAIL.value}N{BColors.ENDC.value})? ").lower()
return i in AGREE_INPUTS

View File

@@ -1,12 +1,12 @@
from __future__ import annotations from __future__ import annotations
import copy
import inspect
import logging import logging
import threading import threading
import time import time
from typing import TYPE_CHECKING, Dict, List, Optional, Set from typing import List, Dict, Optional, Set
from urllib.parse import ParseResult, urlparse, urlunsplit from urllib.parse import urlparse, urlunsplit, ParseResult
import copy
import inspect
import requests import requests
import responses import responses
@@ -14,15 +14,12 @@ from tqdm import tqdm
from .cache import Cache from .cache import Cache
from .rotating import RotatingProxy from .rotating import RotatingProxy
from ..objects import Target
if TYPE_CHECKING:
from ..objects import Target
from ..utils import request_trace from ..utils import request_trace
from ..utils.config import main_settings
from ..utils.hacking import merge_args
from ..utils.string_processing import shorten_display_url from ..utils.string_processing import shorten_display_url
from ..utils.config import main_settings
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from ..utils.hacking import merge_args
class Connection: class Connection:

View File

@@ -1,8 +1,36 @@
from dataclasses import dataclass, field from __future__ import annotations
from typing import Set
from ..utils.config import main_settings import logging
import random
import re
from collections import defaultdict
from copy import copy
from dataclasses import dataclass, field
from pathlib import Path
from string import Formatter
from typing import (TYPE_CHECKING, Any, Callable, Dict, Generator, List,
Optional, Set, Tuple, Type, TypedDict, Union)
import requests
from bs4 import BeautifulSoup
from ..audio import correct_codec, write_metadata_to_target
from ..connection import Connection
from ..objects import Album, Artist, Collection
from ..objects import DatabaseObject as DataObject
from ..objects import Label, Options, Song, Source, Target
from ..utils import BColors, output, trace
from ..utils.config import main_settings, youtube_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.enums.album import AlbumType from ..utils.enums.album import AlbumType
from ..utils.exception import MKComposeException, MKMissingNameException
from ..utils.exception.download import UrlNotFoundException
from ..utils.path_manager import LOCATIONS
from ..utils.shared import DEBUG_PAGES
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from .results import SearchResults
@dataclass @dataclass
@@ -19,3 +47,402 @@ class DownloadOptions:
download_again_if_found: bool = False download_again_if_found: bool = False
process_audio_if_found: bool = False process_audio_if_found: bool = False
process_metadata_if_found: bool = True process_metadata_if_found: bool = True
fetch_map = {
Song: "fetch_song",
Album: "fetch_album",
Artist: "fetch_artist",
Label: "fetch_label",
}
class Downloader:
def __init__(
self,
auto_register_pages: bool = True,
download_options: DownloadOptions = None,
fetch_options: FetchOptions = None,
**kwargs
):
self.LOGGER = logging.getLogger("download")
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
self._registered_pages: Dict[Type[Page], Set[Page]] = defaultdict(set)
if auto_register_pages:
self.scan_for_pages(**kwargs)
def register_page(self, page_type: Type[Page], **kwargs):
if page_type in self._registered_pages:
return
self._registered_pages[page_type].add(page_type(
download_options=self.download_options,
fetch_options=self.fetch_options,
**kwargs
))
def deregister_page(self, page_type: Type[Page]):
if page_type not in _registered_pages:
return
for p in self._registered_pages[page_type]:
p.__del__()
del self._registered_pages[page_type]
def scan_for_pages(self, **kwargs):
# assuming the wanted pages are the leaf classes of the interface
from .. import pages
leaf_classes = []
class_list = [Page]
while len(class_list):
_class = class_list.pop()
class_subclasses = _class.__subclasses__()
if len(class_subclasses) == 0:
if _class.REGISTER:
leaf_classes.append(_class)
else:
class_list.extend(class_subclasses)
if Page in leaf_classes:
self.LOGGER.warn("couldn't find any data source")
return
for leaf_class in leaf_classes:
self.register_page(leaf_class, **kwargs)
def get_pages(self, *page_types: List[Type[Page]]) -> Generator[Page, None, None]:
if len(page_types) == 0:
page_types = self._registered_pages.keys()
for page_type in page_types:
yield from self._registered_pages[page_type]
def search(self, query: Query) -> Generator[DataObject, None, None]:
for page in self.get_pages():
yield from page.search(query=query)
def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject:
source: Source
for source in data_object.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
}):
new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level)
if new_data_object is not None:
data_object.merge(new_data_object)
return data_object
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
if not source.has_page:
return None
source_type = source.page.get_source_type(source=source)
if source_type is None:
self.LOGGER.debug(f"Could not determine source type for {source}.")
return None
func = getattr(source.page, fetch_map[source_type])
# fetching the data object and marking it as fetched
data_object: DataObject = func(source=source, **kwargs)
data_object.mark_as_fetched(source.hash_url)
return data_object
def fetch_from_url(self, url: str) -> Optional[DataObject]:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None:
return None
return self.fetch_from_source(source=source)
def _skip_object(self, data_object: DataObject) -> bool:
if isinstance(data_object, Album):
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
return True
return False
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
# fetch the given object
self.fetch_details(data_object)
output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
# fetching all parent objects (e.g. if you only download a song)
if not kwargs.get("fetched_upwards", False):
to_fetch: List[DataObject] = [data_object]
while len(to_fetch) > 0:
new_to_fetch = []
for d in to_fetch:
if self._skip_object(d):
continue
self.fetch_details(d)
for c in d.get_parent_collections():
new_to_fetch.extend(c)
to_fetch = new_to_fetch
kwargs["fetched_upwards"] = True
# download all children
download_result: DownloadResult = DownloadResult()
for c in data_object.get_child_collections():
for d in c:
if self._skip_object(d):
continue
download_result.merge(self.download(d, genre, **kwargs))
# actually download if the object is a song
if isinstance(data_object, Song):
"""
TODO
add the traced artist and album to the naming.
I am able to do that, because duplicate values are removed later on.
"""
self._download_song(data_object, naming={
"genre": [genre],
"audio_format": [main_settings["audio_format"]],
})
return download_result
def _extract_fields_from_template(self, path_template: str) -> Set[str]:
return set(re.findall(r"{([^}]+)}", path_template))
def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str:
field_names: Set[str] = self._extract_fields_from_template(path_template)
for field in field_names:
if len(naming[field]) == 0:
raise MKMissingNameException(f"Missing field for {field}.")
path_template = path_template.replace(f"{{{field}}}", naming[field][0])
return path_template
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
"""
TODO
Search the song in the file system.
"""
r = DownloadResult(total=1)
# pre process the data recursively
song.compile()
# manage the naming
naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["song"].append(song.title_value)
naming["isrc"].append(song.isrc)
naming["album"].extend(a.title_value for a in song.album_collection)
naming["album_type"].extend(a.album_type.value for a in song.album_collection)
naming["artist"].extend(a.name for a in song.artist_collection)
naming["artist"].extend(a.name for a in song.feature_artist_collection)
for a in song.album_collection:
naming["label"].extend([l.title_value for l in a.label_collection])
# removing duplicates from the naming, and process the strings
for key, value in naming.items():
# https://stackoverflow.com/a/17016257
naming[key] = list(dict.fromkeys(value))
song.genre = naming["genre"][0]
# manage the targets
tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
song.target_collection.append(Target(
relative_to_music_dir=True,
file_path=Path(
self._parse_path_template(main_settings["download_path"], naming=naming),
self._parse_path_template(main_settings["download_file"], naming=naming),
)
))
for target in song.target_collection:
if target.exists:
output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
r.found_on_disk += 1
if not self.download_options.download_again_if_found:
target.copy_content(tmp)
else:
target.create_path()
output(f'{target.file_path}', color=BColors.GREY)
# this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
used_source: Optional[Source] = None
skip_intervals: List[Tuple[float, float]] = []
for source in song.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
"sort_key": lambda page: page.download_priority,
"reverse": True,
}):
if tmp.exists:
break
used_source = source
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download")
skip_intervals = source.page.get_skip_intervals(song=song, source=source)
# if something has been downloaded but it somehow failed, delete the file
if streaming_results.is_fatal_error and tmp.exists:
tmp.delete()
# if everything went right, the file should exist now
if not tmp.exists:
if used_source is None:
r.error_message = f"No source found for {song.option_string}."
else:
r.error_message = f"Something went wrong downloading {song.option_string}."
return r
# post process the audio
found_on_disk = used_source is None
if not found_on_disk or self.download_options.process_audio_if_found:
correct_codec(target=tmp, skip_intervals=skip_intervals)
r.sponsor_segments = len(skip_intervals)
if used_source is not None:
used_source.page.post_process_hook(song=song, temp_target=tmp)
if not found_on_disk or self.download_options.process_metadata_if_found:
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)
# copy the tmp target to the final locations
for target in song.target_collection:
tmp.copy_content(target)
tmp.delete()
return r
def fetch_url(self, url: str, **kwargs) -> DataObject:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None or source.page is None:
raise UrlNotFoundException(url=url)
return source.page.fetch_object_from_source(source=source, **kwargs)
class Page:
REGISTER = True
SOURCE_TYPE: SourceType
LOGGER: logging.Logger
def __new__(cls, *args, **kwargs):
cls.LOGGER = logging.getLogger(cls.__name__)
return super().__new__(cls)
@classmethod
def is_leaf_page(cls) -> bool:
return len(cls.__subclasses__()) == 0
def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None, **kwargs):
self.SOURCE_TYPE.register_page(self)
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
def __del__(self):
self.SOURCE_TYPE.deregister_page()
def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None):
"""
Perform a regex search on the given string, using a single or a list of
patterns returning the first matching group.
In case of failure return a default value or raise a WARNING or a
RegexNotFoundError, depending on fatal, specifying the field name.
"""
if isinstance(pattern, str):
mobj = re.search(pattern, string, flags)
else:
for p in pattern:
mobj = re.search(p, string, flags)
if mobj:
break
if mobj:
if group is None:
# return the first matching group
return next(g for g in mobj.groups() if g is not None)
elif isinstance(group, (list, tuple)):
return tuple(mobj.group(g) for g in group)
else:
return mobj.group(group)
return default
def get_source_type(self, source: Source) -> Optional[Type[DataObject]]:
return None
def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup:
return BeautifulSoup(r.content, "html.parser")
# to search stuff
def search(self, query: Query) -> List[DataObject]:
music_object = query.music_object
search_functions = {
Song: self.song_search,
Album: self.album_search,
Artist: self.artist_search,
Label: self.label_search
}
if type(music_object) in search_functions:
r = search_functions[type(music_object)](music_object)
if r is not None and len(r) > 0:
return r
r = []
for default_query in query.default_search:
for single_option in self.general_search(default_query):
r.append(single_option)
return r
def general_search(self, search_query: str) -> List[DataObject]:
return []
def label_search(self, label: Label) -> List[Label]:
return []
def artist_search(self, artist: Artist) -> List[Artist]:
return []
def album_search(self, album: Album) -> List[Album]:
return []
def song_search(self, song: Song) -> List[Song]:
return []
# to fetch stuff
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song()
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
return Album()
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
return Artist()
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
# to download stuff
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
return []
def post_process_hook(self, song: Song, temp_target: Target, **kwargs):
pass
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
return DownloadResult()

View File

@@ -0,0 +1,225 @@
from __future__ import annotations
import copy
import re
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import (Any, Callable, Dict, Generator, Generic, Hashable, List,
Optional, Tuple, TypeVar, Union)
from ...objects import OuterProxy as DataObject
from ...utils import BColors
from ...utils.config import main_settings
from ...utils.enums import SourceType
from ...utils.exception import MKComposeException
from ...utils.shared import ALPHABET
from ...utils.string_processing import unify
P = TypeVar('P')
class HumanIO:
@staticmethod
def ask_to_create(option: Option) -> bool:
return True
@staticmethod
def not_found(key: Any) -> None:
return None
class Option(Generic[P]):
"""
This could represent a data object, a string or a page.
"""
TEXT_TEMPLATE: str = f"{BColors.BOLD.value}{{index}}{BColors.ENDC.value}: {{value}}"
ATTRIBUTES_FORMATTING: Tuple[str, ...] = ("index", "value")
ATTRIBUTES_KEY: Tuple[str, ...] = ("index", )
def __init__(
self,
value: P,
hidden: bool = False,
additional_keys: List[Hashable] = None,
**kwargs
):
self.value = value
self.hidden = hidden
self._additional_keys = set(self._to_hash(key) for key in additional_keys or [])
for key, value in kwargs.items():
setattr(self, key, value)
super(Option, self).__init__()
def _to_option_string(self, value: Any) -> str:
if hasattr(value, "option_string"):
return value.option_string
return str(value)
@property
def text(self) -> str:
text = self.TEXT_TEMPLATE
for attribute_key in self.ATTRIBUTES_FORMATTING:
text = text.replace(f"{{{attribute_key}}}", self._to_option_string(getattr(self, attribute_key)))
return text
def _to_hash(self, key: Any) -> int:
try:
key = int(key)
except ValueError:
pass
if isinstance(key, str):
return hash(unify(key))
return hash(key)
@property
def keys(self) -> set:
keys = self._additional_keys.copy()
for key in self.ATTRIBUTES_KEY:
keys.add(self._to_hash(getattr(self, key)))
def __contains__(self, key: Any) -> bool:
return self._to_hash(key) in self.keys
def __str__(self):
return self.text
def __iter__(self) -> Generator[Option[P], None, None]:
yield self
class Select(Generic[P]):
OPTION: Type[Option[P]] = Option
HUMAN_IO: Type[HumanIO] = HumanIO
CAN_CREATE_OPTIONS: bool = False
def __init__(
self,
data: Generator[P, None, None],
**kwargs
):
self.option: Type[Option[P]] = kwargs.get("option", self.OPTION)
self.human_io: Type[HumanIO] = kwargs.get("human_io", self.HUMAN_IO)
self.can_create_options: bool = kwargs.get("can_create_options", self.CAN_CREATE_OPTIONS)
self._options: List[Option[P]] = []
self.extend(data)
super(Select, self).__init__(**kwargs)
def append(self, value: P) -> Option[P]:
option = self.option(value)
self._options.append(option)
return option
def extend(self, values: Generator[P, None, None]):
for value in values:
self.append(value)
@property
def _options_to_show(self) -> Generator[Option[P], None, None]:
for option in self._options:
if option.hidden:
continue
yield option
def __iter__(self) -> Generator[Option, None, None]:
_index = 0
for i, o in enumerate(self._options_to_show):
for option in o:
option.index = _index
yield option
_index += 1
def __contains__(self, key: Any) -> bool:
for option in self._options:
if key in option:
return True
return False
def __getitem__(self, key: Any) -> Option[P]:
for option in self._options:
if key in option:
return option
raise KeyError(key)
def choose(self, key: Any) -> Optional[Option[P]]:
try:
return self[key]
except KeyError:
if self.can_create_options:
return self.append(key)
self.human_io.not_found(key)
def pprint(self) -> str:
return "\n".join(str(option) for option in self)
class Node(Generator[P]):
def __init__(
self,
value: Optional[P] = None,
children: List[Node[P]] = None,
parent: Node[P] = None,
**kwargs
):
self.value = value
self.depth = 0
self.same_level_index: int = 0
self.children: List[Node[P]] = kwargs.get("children", [])
self.parent: Optional[Node[P]] = kwargs.get("parent", None)
super(Node, self).__init__(**kwargs)
def hash_key(self, key: Any) -> int:
try:
key = int(key)
except ValueError:
pass
if isinstance(key, str):
return hash(unify(key))
return hash(key)
@property
def is_root(self) -> bool:
return self.parent is None
@property
def is_leaf(self) -> bool:
return not self.children
def __iter__(self, **kwargs) -> Generator[Node[P], None, None]:
_level_index_map: Dict[int, int] = kwargs.get("level_index_map", defaultdict(lambda: 0))
self.same_level_index = _level_index_map[self.depth]
yield self
_level_index_map[self.depth] += 1
for child in self.children:
child.depth = self.depth + 1
for node in child.__iter__(level_index_map=_level_index_map):
yield node
def __getitem__(self, key: Any) -> Option[P]:
pass
def __contains__(self, key: Any) -> bool:
if key in self.option:
return True

View File

@@ -0,0 +1 @@
from . import Option, Select

View File

@@ -1,382 +0,0 @@
from typing import Tuple, Type, Dict, Set, Optional, List
from collections import defaultdict
from pathlib import Path
import re
import logging
import subprocess
from PIL import Image
from . import FetchOptions, DownloadOptions
from .results import SearchResults
from ..objects import (
DatabaseObject as DataObject,
Collection,
Target,
Source,
Options,
Song,
Album,
Artist,
Label,
)
from ..objects.artwork import ArtworkVariant
from ..audio import write_metadata_to_target, correct_codec
from ..utils import output, BColors
from ..utils.string_processing import fit_to_file_system
from ..utils.config import youtube_settings, main_settings
from ..utils.path_manager import LOCATIONS
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.exception import MKMissingNameException
from ..utils.exception.download import UrlNotFoundException
from ..utils.shared import DEBUG_PAGES
from ..connection import Connection
from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, Genius, INDEPENDENT_DB_OBJECTS
ALL_PAGES: Set[Type[Page]] = {
# EncyclopaediaMetallum,
Genius,
Musify,
YoutubeMusic,
Bandcamp
}
if youtube_settings["use_youtube_alongside_youtube_music"]:
ALL_PAGES.add(YouTube)
AUDIO_PAGES: Set[Type[Page]] = {
Musify,
YouTube,
YoutubeMusic,
Bandcamp
}
SHADY_PAGES: Set[Type[Page]] = {
Musify,
}
fetch_map = {
Song: "fetch_song",
Album: "fetch_album",
Artist: "fetch_artist",
Label: "fetch_label",
}
if DEBUG_PAGES:
DEBUGGING_PAGE = Bandcamp
print(f"Only downloading from page {DEBUGGING_PAGE}.")
ALL_PAGES = {DEBUGGING_PAGE}
AUDIO_PAGES = ALL_PAGES.union(AUDIO_PAGES)
class Pages:
def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
self.LOGGER = logging.getLogger("download")
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
# initialize all page instances
self._page_instances: Dict[Type[Page], Page] = dict()
self._source_to_page: Dict[SourceType, Type[Page]] = dict()
exclude_pages = exclude_pages if exclude_pages is not None else set()
if exclude_shady:
exclude_pages = exclude_pages.union(SHADY_PAGES)
if not exclude_pages.issubset(ALL_PAGES):
raise ValueError(
f"The excluded pages have to be a subset of all pages: {exclude_pages} | {ALL_PAGES}")
def _set_to_tuple(page_set: Set[Type[Page]]) -> Tuple[Type[Page], ...]:
return tuple(sorted(page_set, key=lambda page: page.__name__))
self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages)
self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set)
self._audio_pages_set: Set[Type[Page]
] = self._pages_set.intersection(AUDIO_PAGES)
self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(
self._audio_pages_set)
for page_type in self.pages:
self._page_instances[page_type] = page_type(
fetch_options=self.fetch_options, download_options=self.download_options)
self._source_to_page[page_type.SOURCE_TYPE] = page_type
def _get_page_from_enum(self, source_page: SourceType) -> Page:
if source_page not in self._source_to_page:
return None
return self._page_instances[self._source_to_page[source_page]]
def search(self, query: Query) -> SearchResults:
result = SearchResults()
for page_type in self.pages:
result.add(
page=page_type,
search_result=self._page_instances[page_type].search(
query=query)
)
return result
def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject:
if not isinstance(data_object, INDEPENDENT_DB_OBJECTS):
return data_object
source: Source
for source in data_object.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
}):
new_data_object = self.fetch_from_source(
source=source, stop_at_level=stop_at_level)
if new_data_object is not None:
data_object.merge(new_data_object)
return data_object
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
if not source.has_page:
return None
source_type = source.page.get_source_type(source=source)
if source_type is None:
self.LOGGER.debug(f"Could not determine source type for {source}.")
return None
func = getattr(source.page, fetch_map[source_type])
# fetching the data object and marking it as fetched
data_object: DataObject = func(source=source, **kwargs)
data_object.mark_as_fetched(source.hash_url)
return data_object
def fetch_from_url(self, url: str) -> Optional[DataObject]:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None:
return None
return self.fetch_from_source(source=source)
def _skip_object(self, data_object: DataObject) -> bool:
if isinstance(data_object, Album):
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
return True
return False
def _fetch_artist_artwork(self, artist: Artist, naming: dict):
naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["artist"].append(artist.name)
naming["label"].extend(
[l.title_value for l in artist.label_collection])
# removing duplicates from the naming, and process the strings
for key, value in naming.items():
# https://stackoverflow.com/a/17016257
naming[key] = list(dict.fromkeys(value))
artwork_collection: ArtworkCollection = artist.artwork
artwork_collection.compile()
for image_number, artwork in enumerate(artwork_collection):
for artwork_variant in artwork.variants:
naming["image_number"] = [str(image_number)]
target = Target(
relative_to_music_dir=True,
file_path=Path(self._parse_path_template(
main_settings["artist_artwork_path"], naming=naming))
)
if not target.file_path.parent.exists():
target.create_path()
subprocess.Popen(["gio", "set", target.file_path.parent, "metadata::custom-icon", "file://"+str(target.file_path)])
with Image.open(artwork_variant.target.file_path) as img:
img.save(target.file_path, main_settings["image_format"])
artwork_variant.target = Target
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
# fetch the given object
self.fetch_details(data_object)
output(
f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
# fetching all parent objects (e.g. if you only download a song)
if not kwargs.get("fetched_upwards", False):
to_fetch: List[DataObject] = [data_object]
while len(to_fetch) > 0:
new_to_fetch = []
for d in to_fetch:
if self._skip_object(d):
continue
self.fetch_details(d)
for c in d.get_parent_collections():
new_to_fetch.extend(c)
to_fetch = new_to_fetch
kwargs["fetched_upwards"] = True
naming = kwargs.get("naming", {
"genre": [genre],
"audio_format": [main_settings["audio_format"]],
"image_format": [main_settings["image_format"]]
})
# download artist artwork
if isinstance(data_object, Artist):
self._fetch_artist_artwork(artist=data_object, naming=naming)
# download all children
download_result: DownloadResult = DownloadResult()
for c in data_object.get_child_collections():
for d in c:
if self._skip_object(d):
continue
download_result.merge(self.download(d, genre, **kwargs))
# actually download if the object is a song
if isinstance(data_object, Song):
"""
TODO
add the traced artist and album to the naming.
I am able to do that, because duplicate values are removed later on.
"""
self._download_song(data_object, naming=naming)
return download_result
def _extract_fields_from_template(self, path_template: str) -> Set[str]:
return set(re.findall(r"{([^}]+)}", path_template))
def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str:
field_names: Set[str] = self._extract_fields_from_template(
path_template)
for field in field_names:
if len(naming[field]) == 0:
raise MKMissingNameException(f"Missing field for {field}.")
path_template = path_template.replace(
f"{{{field}}}", naming[field][0])
return path_template
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
"""
TODO
Search the song in the file system.
"""
r = DownloadResult(total=1)
# pre process the data recursively
song.compile()
# manage the naming
naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["song"].append(song.title_value)
naming["isrc"].append(song.isrc)
naming["album"].extend(a.title_value for a in song.album_collection)
naming["album_type"].extend(
a.album_type.value for a in song.album_collection)
naming["artist"].extend(a.name for a in song.artist_collection)
naming["artist"].extend(a.name for a in song.feature_artist_collection)
for a in song.album_collection:
naming["label"].extend([l.title_value for l in a.label_collection])
# removing duplicates from the naming, and process the strings
for key, value in naming.items():
# https://stackoverflow.com/a/17016257
naming[key] = list(dict.fromkeys(value))
song.genre = naming["genre"][0]
# manage the targets
tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
song.target_collection.append(Target(
relative_to_music_dir=True,
file_path=Path(
self._parse_path_template(
main_settings["download_path"], naming=naming),
self._parse_path_template(
main_settings["download_file"], naming=naming),
)
))
for target in song.target_collection:
if target.exists:
output(
f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
r.found_on_disk += 1
if not self.download_options.download_again_if_found:
target.copy_content(tmp)
else:
target.create_path()
output(f'{target.file_path}', color=BColors.GREY)
# this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
used_source: Optional[Source] = None
skip_intervals: List[Tuple[float, float]] = []
for source in song.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
"sort_key": lambda page: page.download_priority,
"reverse": True,
}):
if tmp.exists:
break
used_source = source
streaming_results = source.page.download_song_to_target(
source=source, target=tmp, desc="download")
skip_intervals = source.page.get_skip_intervals(
song=song, source=source)
# if something has been downloaded but it somehow failed, delete the file
if streaming_results.is_fatal_error and tmp.exists:
tmp.delete()
# if everything went right, the file should exist now
if not tmp.exists:
if used_source is None:
r.error_message = f"No source found for {song.option_string}."
else:
r.error_message = f"Something went wrong downloading {song.option_string}."
return r
# post process the audio
found_on_disk = used_source is None
if not found_on_disk or self.download_options.process_audio_if_found:
correct_codec(target=tmp, skip_intervals=skip_intervals)
r.sponsor_segments = len(skip_intervals)
if used_source is not None:
used_source.page.post_process_hook(song=song, temp_target=tmp)
if not found_on_disk or self.download_options.process_metadata_if_found:
write_metadata_to_target(
metadata=song.metadata, target=tmp, song=song)
# copy the tmp target to the final locations
for target in song.target_collection:
tmp.copy_content(target)
tmp.delete()
return r
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None:
raise UrlNotFoundException(url=url)
_actual_page = self._source_to_page[source.source_type]
return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level)

View File

@@ -1,8 +1,12 @@
from typing import Tuple, Type, Dict, List, Generator, Union from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, Generator, List, Tuple, Type, Union
from ..objects import DatabaseObject from ..objects import DatabaseObject
from ..pages import Page, EncyclopaediaMetallum, Musify
if TYPE_CHECKING:
from . import Page
@dataclass @dataclass

View File

@@ -1,16 +1,27 @@
from typing_extensions import TypeVar from typing_extensions import TypeVar
from .artwork import ArtworkCollection
from .collection import Collection
from .contact import Contact
from .country import Country
from .formatted_text import FormattedText
from .metadata import ID3Timestamp
from .metadata import Mapping as ID3Mapping
from .metadata import Metadata
from .option import Options from .option import Options
from .parents import OuterProxy
from .song import Album, Artist, Label, Lyrics, Song, Target from .metadata import Metadata, Mapping as ID3Mapping, ID3Timestamp
from .source import Source, SourceType from .source import Source, SourceType
from .song import (
Song,
Album,
Artist,
Target,
Lyrics,
Label
)
from .formatted_text import FormattedText
from .collection import Collection
from .country import Country
from .contact import Contact
from .parents import OuterProxy
from .artwork import Artwork
DatabaseObject = OuterProxy DatabaseObject = OuterProxy

View File

@@ -1,243 +1,64 @@
from __future__ import annotations from __future__ import annotations
from copy import copy from typing import List, Optional, Dict, Tuple, Type, Union, TypedDict
from dataclasses import dataclass, field
from functools import cached_property
from typing import Dict, List, Optional, Set, Tuple, Type, TypedDict, Union
from ..connection import Connection
from ..utils import create_dataclass_instance, custom_hash
from ..utils.config import main_settings
from ..utils.enums import PictureType
from ..utils.string_processing import hash_url, unify
from .collection import Collection from .collection import Collection
from .metadata import ID3Timestamp from .metadata import (
from .metadata import Mapping as id3Mapping Mapping as id3Mapping,
from .metadata import Metadata ID3Timestamp,
Metadata
)
from ..utils.string_processing import unify, hash_url
from .parents import OuterProxy as Base from .parents import OuterProxy as Base
from .target import Target
from PIL import Image
import imagehash from ..utils.config import main_settings
artwork_connection: Connection = Connection(module="artwork")
@dataclass class ArtworkVariant(TypedDict):
class ArtworkVariant:
url: str url: str
width: Optional[int] = None width: int
heigth: Optional[int] = None height: int
image_format: Optional[str] = None deviation: float
def __hash__(self) -> int:
return custom_hash(self.url)
def __eq__(self, other: ArtworkVariant) -> bool: class Artwork:
return hash(self) == hash(other) def __init__(self, *variants: List[ArtworkVariant]) -> None:
self._variant_mapping: Dict[str, ArtworkVariant] = {}
def __contains__(self, other: str) -> bool: for variant in variants:
return custom_hash(other) == hash(self.url) self.append(**variant)
def __merge__(self, other: ArtworkVariant) -> None: @staticmethod
for key, value in other.__dict__.items(): def _calculate_deviation(*dimensions: List[int]) -> float:
if value is None: return sum(abs(d - main_settings["preferred_artwork_resolution"]) for d in dimensions) / len(dimensions)
continue
if getattr(self, key) is None: def append(self, url: str, width: int = main_settings["preferred_artwork_resolution"], height: int = main_settings["preferred_artwork_resolution"], **kwargs) -> None:
setattr(self, key, value) if url is None:
@cached_property
def target(self) -> Target:
return Target.temp()
def fetch(self) -> None:
global artwork_connection
r = artwork_connection.get(self.url, name=hash_url(self.url))
if r is None:
return return
self.target.raw_content = r.content self._variant_mapping[hash_url(url=url)] = {
"url": url,
@dataclass "width": width,
class Artwork: "height": height,
variants: List[ArtworkVariant] = field(default_factory=list) "deviation": self._calculate_deviation(width, height),
}
artwork_type: PictureType = PictureType.OTHER
def search_variant(self, url: str) -> Optional[ArtworkVariant]:
if url is None:
return None
for variant in self.variants:
if url in variant:
return variant
return None
def __contains__(self, other: str) -> bool:
return self.search_variant(other) is not None
def add_data(self, **kwargs) -> None:
variant = self.search_variant(kwargs.get("url"))
if variant is None:
variant, kwargs = create_dataclass_instance(ArtworkVariant, kwargs)
self.variants.append(variant)
variant.__dict__.update(kwargs)
@property @property
def url(self) -> Optional[str]: def best_variant(self) -> ArtworkVariant:
if len(self.variants) <= 0: if len(self._variant_mapping.keys()) <= 0:
return None return None
return self.variants[0].url return min(self._variant_mapping.values(), key=lambda x: x["deviation"])
def fetch(self) -> None:
for variant in self.variants:
variant.fetch()
class ArtworkCollection:
"""
Stores all the images/artworks for one data object.
There could be duplicates before calling ArtworkCollection.compile()
_this is called before one object is downloaded automatically._
"""
artwork_type: PictureType = PictureType.OTHER
def __init__(
self,
*data: List[Artwork],
parent_artworks: Set[ArtworkCollection] = None,
crop_images: bool = True,
) -> None:
# this is used for the song artwork, to fall back to the song artwork
self.parent_artworks: Set[ArtworkCollection] = parent_artworks or set()
self.crop_images: bool = crop_images
self._data = []
self.extend(data)
def search_artwork(self, url: str) -> Optional[ArtworkVariant]:
for artwork in self._data:
if url in artwork:
return artwork
return None
def __contains__(self, other: str) -> bool:
return self.search_artwork(other) is not None
def _create_new_artwork(self, **kwargs) -> Tuple[Artwork, dict]:
kwargs["artwork_type"] = kwargs.get("artwork_type", self.artwork_type)
return create_dataclass_instance(Artwork, dict(**kwargs))
def add_data(self, url: str, **kwargs) -> Artwork:
kwargs["url"] = url
artwork = self.search_artwork(url)
if artwork is None:
artwork, kwargs = self._create_new_artwork(**kwargs)
self._data.append(artwork)
artwork.add_data(**kwargs)
return artwork
def append(self, value: Union[Artwork, ArtworkVariant, dict], **kwargs):
"""
You can append the types Artwork, ArtworkVariant or dict
the best option would be to use Artwork and avoid the other options.
"""
if isinstance(value, dict):
kwargs.update(value)
value, kwargs = create_dataclass_instance(ArtworkVariant, kwargs)
if isinstance(value, ArtworkVariant):
kwargs["variants"] = [value]
value, kwargs = create_dataclass_instance(Artwork, kwargs)
if isinstance(value, Artwork):
self._data.append(value)
return
def extend(self, values: List[Union[Artwork, ArtworkVariant, dict]], **kwargs):
for value in values:
self.append(value, **kwargs)
def compile(self, **kwargs) -> None:
"""
This will make the artworks ready for download and delete duplicates.
"""
artwork_hashes: list = list()
artwork_urls: list = list()
for artwork in self._data:
index = 0
for artwork_variant in artwork.variants:
r = artwork_connection.get(
url=artwork_variant.url,
name=artwork_variant.url,
)
if artwork_variant.url in artwork_urls:
artwork.variants.pop(index)
continue
artwork_urls.append(artwork_variant.url)
target: Target = artwork_variant.target
with target.open("wb") as f:
f.write(r.content)
with Image.open(target.file_path) as img:
# https://stackoverflow.com/a/59476938/16804841
if img.mode != 'RGB':
img = img.convert('RGB')
try:
image_hash = imagehash.crop_resistant_hash(img)
except Exception as e:
continue
if image_hash in artwork_hashes:
artwork.variants.pop(index)
target.delete()
continue
artwork_hashes.append(image_hash)
width, height = img.size
if width != height:
if width > height:
img = img.crop((width // 2 - height // 2, 0, width // 2 + height // 2, height))
else:
img = img.crop((0, height // 2 - width // 2, width, height // 2 + width // 2))
# resize the image to the preferred resolution
img.thumbnail((main_settings["preferred_artwork_resolution"], main_settings["preferred_artwork_resolution"]))
index =+ 1
def __merge__(self, other: ArtworkCollection, **kwargs) -> None:
self.parent_artworks.update(other.parent_artworks)
for other_artwork in other._data:
for other_variant in other_artwork.variants:
if self.__contains__(other_variant.url):
continue
self.append(ArtworkVariant(other_variant.url))
def __hash__(self) -> int:
return id(self)
def __iter__(self) -> Generator[Artwork, None, None]:
yield from self._data
def get_urls(self) -> Generator[str, None, None]:
yield from (artwork.url for artwork in self._data if artwork.url is not None)
def get_variant_name(self, variant: ArtworkVariant) -> str:
return f"artwork_{variant['width']}x{variant['height']}_{hash_url(variant['url']).replace('/', '_')}"
def __merge__(self, other: Artwork, **kwargs) -> None:
for key, value in other._variant_mapping.items():
if key not in self._variant_mapping:
self._variant_mapping[key] = value
def __eq__(self, other: Artwork) -> bool:
if not isinstance(other, Artwork):
return False
return any(a == b for a, b in zip(self._variant_mapping.keys(), other._variant_mapping.keys()))

View File

@@ -1,43 +1,16 @@
from __future__ import annotations from __future__ import annotations
import copy
from collections import defaultdict from collections import defaultdict
from dataclasses import dataclass from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set
from typing import (Any, Callable, Dict, Generator, Generic, Iterable, import copy
Iterator, List, Optional, Set, Tuple, TypeVar, Union)
from ..utils import BColors, object_trace, output from .parents import OuterProxy
from .parents import InnerData, OuterProxy from ..utils import object_trace
from ..utils import output, BColors
T = TypeVar('T', bound=OuterProxy) T = TypeVar('T', bound=OuterProxy)
@dataclass
class AppendHookArguments:
"""
This class is used to store the arguments for the append hook.
The best explanation is with an examples:
```
album = Album()
song = Song()
album.song_collection.append(song)
```
In this case, the append hook is triggered with the following arguments:
```
AppendHookArguments(
collection=album.song_collection,
new_object=song,
collection_root_objects=[album]
)
```
"""
collection: Collection
new_object: T
collection_root_objects: Set[InnerData]
class Collection(Generic[T]): class Collection(Generic[T]):
__is_collection__ = True __is_collection__ = True
@@ -54,7 +27,6 @@ class Collection(Generic[T]):
sync_on_append: Dict[str, Collection] = None, sync_on_append: Dict[str, Collection] = None,
append_object_to_attribute: Dict[str, T] = None, append_object_to_attribute: Dict[str, T] = None,
extend_object_to_attribute: Dict[str, Collection] = None, extend_object_to_attribute: Dict[str, Collection] = None,
append_callbacks: Set[Callable[[AppendHookArguments], None]] = None,
) -> None: ) -> None:
self._collection_for: dict = dict() self._collection_for: dict = dict()
@@ -69,7 +41,6 @@ class Collection(Generic[T]):
self.sync_on_append: Dict[str, Collection] = sync_on_append or {} self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self.pull_from: List[Collection] = [] self.pull_from: List[Collection] = []
self.push_to: List[Collection] = [] self.push_to: List[Collection] = []
self.append_callbacks: Set[Callable[[AppendHookArguments], None]] = append_callbacks or set()
# This is to cleanly unmap previously mapped items by their id # This is to cleanly unmap previously mapped items by their id
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict) self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
@@ -170,14 +141,6 @@ class Collection(Generic[T]):
for attribute, new_object in self.append_object_to_attribute.items(): for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs) other.__getattribute__(attribute).append(new_object, **kwargs)
append_hook_args = AppendHookArguments(
collection=self,
new_object=other,
collection_root_objects=self._collection_for.keys(),
)
for callback in self.append_callbacks:
callback(append_hook_args)
def append(self, other: Optional[T], **kwargs): def append(self, other: Optional[T], **kwargs):
""" """
If an object, that represents the same entity exists in a relevant collection, If an object, that represents the same entity exists in a relevant collection,

View File

@@ -12,8 +12,8 @@ from ..utils.enums.album import AlbumStatus, AlbumType
from ..utils.enums.colors import BColors from ..utils.enums.colors import BColors
from ..utils.shared import DEBUG_PRINT_ID from ..utils.shared import DEBUG_PRINT_ID
from ..utils.string_processing import unify from ..utils.string_processing import unify
from .artwork import ArtworkCollection from .artwork import Artwork
from .collection import AppendHookArguments, Collection from .collection import Collection
from .contact import Contact from .contact import Contact
from .country import Country, Language from .country import Country, Language
from .formatted_text import FormattedText from .formatted_text import FormattedText
@@ -86,7 +86,7 @@ class Song(Base):
genre: str genre: str
note: FormattedText note: FormattedText
tracksort: int tracksort: int
artwork: ArtworkCollection artwork: Artwork
source_collection: SourceCollection source_collection: SourceCollection
target_collection: Collection[Target] target_collection: Collection[Target]
@@ -102,7 +102,7 @@ class Song(Base):
"source_collection": SourceCollection, "source_collection": SourceCollection,
"target_collection": Collection, "target_collection": Collection,
"lyrics_collection": Collection, "lyrics_collection": Collection,
"artwork": ArtworkCollection, "artwork": Artwork,
"album_collection": Collection, "album_collection": Collection,
"artist_collection": Collection, "artist_collection": Collection,
@@ -130,7 +130,7 @@ class Song(Base):
feature_artist_list: List[Artist] = None, feature_artist_list: List[Artist] = None,
album_list: List[Album] = None, album_list: List[Album] = None,
tracksort: int = 0, tracksort: int = 0,
artwork: Optional[ArtworkCollection] = None, artwork: Optional[Artwork] = None,
**kwargs **kwargs
) -> None: ) -> None:
real_kwargs = copy.copy(locals()) real_kwargs = copy.copy(locals())
@@ -141,14 +141,6 @@ class Song(Base):
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "feature_artist_collection", "album_collection") UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "feature_artist_collection", "album_collection")
TITEL = "title" TITEL = "title"
@staticmethod
def register_artwork_parent(append_hook_arguments: AppendHookArguments):
album: Album = append_hook_arguments.new_object
song: Song
for song in append_hook_arguments.collection_root_objects:
song.artwork.parent_artworks.add(album.artwork)
def __init_collections__(self) -> None: def __init_collections__(self) -> None:
self.feature_artist_collection.push_to = [self.artist_collection] self.feature_artist_collection.push_to = [self.artist_collection]
self.artist_collection.pull_from = [self.feature_artist_collection] self.artist_collection.pull_from = [self.feature_artist_collection]
@@ -166,7 +158,6 @@ class Song(Base):
self.feature_artist_collection.extend_object_to_attribute = { self.feature_artist_collection.extend_object_to_attribute = {
"album_collection": self.album_collection "album_collection": self.album_collection
} }
self.album_collection.append_callbacks = set((Song.register_artwork_parent, ))
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song: if object_type is Song:
@@ -184,10 +175,6 @@ class Song(Base):
self.album_collection.extend(object_list) self.album_collection.extend(object_list)
return return
def _compile(self):
self.artwork.compile()
INDEX_DEPENDS_ON = ("title", "isrc", "source_collection") INDEX_DEPENDS_ON = ("title", "isrc", "source_collection")
@property @property
@@ -259,7 +246,6 @@ class Album(Base):
albumsort: int albumsort: int
notes: FormattedText notes: FormattedText
artwork: ArtworkCollection
source_collection: SourceCollection source_collection: SourceCollection
song_collection: Collection[Song] song_collection: Collection[Song]
@@ -279,7 +265,6 @@ class Album(Base):
"date": ID3Timestamp, "date": ID3Timestamp,
"notes": FormattedText, "notes": FormattedText,
"artwork": lambda: ArtworkCollection(crop_images=False),
"source_collection": SourceCollection, "source_collection": SourceCollection,
"song_collection": Collection, "song_collection": Collection,
@@ -302,7 +287,6 @@ class Album(Base):
barcode: str = None, barcode: str = None,
albumsort: int = None, albumsort: int = None,
notes: FormattedText = None, notes: FormattedText = None,
artwork: ArtworkCollection = None,
source_list: List[Source] = None, source_list: List[Source] = None,
artist_list: List[Artist] = None, artist_list: List[Artist] = None,
song_list: List[Song] = None, song_list: List[Song] = None,
@@ -317,13 +301,6 @@ class Album(Base):
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection",) DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection",)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection") UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection")
@staticmethod
def register_artwork_parent(append_hook_arguments: AppendHookArguments):
song: Song = append_hook_arguments.new_object
for root_object in append_hook_arguments.collection_root_objects:
song.artwork.parent_artworks.add(root_object.artwork)
def __init_collections__(self): def __init_collections__(self):
self.feature_artist_collection.push_to = [self.artist_collection] self.feature_artist_collection.push_to = [self.artist_collection]
self.artist_collection.pull_from = [self.feature_artist_collection] self.artist_collection.pull_from = [self.feature_artist_collection]
@@ -342,8 +319,6 @@ class Album(Base):
"label_collection": self.label_collection "label_collection": self.label_collection
} }
self.song_collection.append_callbacks = set((Album.register_artwork_parent, ))
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song: if object_type is Song:
self.song_collection.extend(object_list) self.song_collection.extend(object_list)
@@ -499,8 +474,6 @@ class Artist(Base):
general_genre: str general_genre: str
unformatted_location: str unformatted_location: str
artwork: ArtworkCollection
source_collection: SourceCollection source_collection: SourceCollection
contact_collection: Collection[Contact] contact_collection: Collection[Contact]
@@ -517,8 +490,6 @@ class Artist(Base):
"lyrical_themes": list, "lyrical_themes": list,
"general_genre": lambda: "", "general_genre": lambda: "",
"artwork": ArtworkCollection,
"source_collection": SourceCollection, "source_collection": SourceCollection,
"album_collection": Collection, "album_collection": Collection,
"contact_collection": Collection, "contact_collection": Collection,
@@ -537,7 +508,6 @@ class Artist(Base):
notes: FormattedText = None, notes: FormattedText = None,
lyrical_themes: List[str] = None, lyrical_themes: List[str] = None,
general_genre: str = None, general_genre: str = None,
artwork: ArtworkCollection = None,
unformatted_location: str = None, unformatted_location: str = None,
source_list: List[Source] = None, source_list: List[Source] = None,
contact_list: List[Contact] = None, contact_list: List[Contact] = None,

View File

@@ -1,17 +1,17 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path
from typing import List, Tuple, TextIO, Union, Optional
import logging import logging
import random import random
from pathlib import Path
from typing import List, Optional, TextIO, Tuple, Union
import requests import requests
from tqdm import tqdm from tqdm import tqdm
from ..utils.config import logging_settings, main_settings
from ..utils.shared import HIGHEST_ID
from ..utils.string_processing import fit_to_file_system
from .parents import OuterProxy from .parents import OuterProxy
from ..utils.shared import HIGHEST_ID
from ..utils.config import main_settings, logging_settings
from ..utils.string_processing import fit_to_file_system
LOGGER = logging.getLogger("target") LOGGER = logging.getLogger("target")
@@ -31,8 +31,7 @@ class Target(OuterProxy):
} }
@classmethod @classmethod
def temp(cls, name: str = None, file_extension: Optional[str] = None) -> P: def temp(cls, name: str = str(random.randint(0, HIGHEST_ID)), file_extension: Optional[str] = None) -> P:
name = name or str(random.randint(0, HIGHEST_ID))
if file_extension is not None: if file_extension is not None:
name = f"{name}.{file_extension}" name = f"{name}.{file_extension}"
@@ -118,11 +117,3 @@ class Target(OuterProxy):
def read_bytes(self) -> bytes: def read_bytes(self) -> bytes:
return self.file_path.read_bytes() return self.file_path.read_bytes()
@property
def raw_content(self) -> bytes:
return self.file_path.read_bytes()
@raw_content.setter
def raw_content(self, content: bytes):
self.file_path.write_bytes(content)

View File

@@ -1,8 +1,52 @@
from .encyclopaedia_metallum import EncyclopaediaMetallum import importlib
from .musify import Musify import inspect
from .youtube import YouTube import logging
from .youtube_music import YoutubeMusic import pkgutil
from .bandcamp import Bandcamp import sys
from .genius import Genius from collections import defaultdict
from copy import copy
from pathlib import Path
from typing import Dict, Generator, List, Set, Type
from .abstract import Page, INDEPENDENT_DB_OBJECTS from ._bandcamp import Bandcamp
from ._encyclopaedia_metallum import EncyclopaediaMetallum
from ._genius import Genius
from ._musify import Musify
from ._youtube import YouTube
from ._youtube_music import YoutubeMusic
def import_children():
_page_directory = Path(__file__).parent
_stem_blacklist = set(["__pycache__", "__init__"])
for _file in _page_directory.iterdir():
if _file.stem in _stem_blacklist:
continue
logging.debug(f"importing {_file.absolute()}")
exec(f"from . import {_file.stem}")
# module_blacklist = set(sys.modules.keys())
import_children()
"""
classes = set()
print(__name__)
for module_name, module in sys.modules.items():
if module_name in module_blacklist or not module_name.startswith(__name__):
continue
print("scanning module", module_name)
for name, obj in inspect.getmembers(module, predicate=inspect.isclass):
_module = obj.__module__
if _module.startswith(__name__) and hasattr(obj, "SOURCE_TYPE"):
print("checking object", name, obj.__module__)
classes.add(obj)
print()
print(*(c.__name__ for c in classes), sep=",\t")
__all__ = [c.__name__ for c in classes]
"""

View File

@@ -7,16 +7,16 @@ import pycountry
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from ..connection import Connection from ..connection import Connection
from ..objects import (Album, Artist, ArtworkCollection, Contact, from ..download import Page
DatabaseObject, FormattedText, ID3Timestamp, Label, from ..objects import (Album, Artist, Artwork, Contact, DatabaseObject,
Lyrics, Song, Source, SourceType, Target) FormattedText, ID3Timestamp, Label, Lyrics, Song,
Source, SourceType, Target)
from ..utils import dump_to_file from ..utils import dump_to_file
from ..utils.config import logging_settings, main_settings from ..utils.config import logging_settings, main_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.shared import DEBUG from ..utils.shared import DEBUG
from ..utils.string_processing import clean_song_title from ..utils.string_processing import clean_song_title
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from .abstract import Page
if DEBUG: if DEBUG:
from ..utils import dump_to_file from ..utils import dump_to_file
@@ -228,11 +228,6 @@ class Bandcamp(Page):
for subsoup in html_music_grid.find_all("li"): for subsoup in html_music_grid.find_all("li"):
artist.album_collection.append(self._parse_album(soup=subsoup, initial_source=source)) artist.album_collection.append(self._parse_album(soup=subsoup, initial_source=source))
# artist artwork
artist_artwork: BeautifulSoup = soup.find("img", {"class":"band-photo"})
if artist_artwork is not None:
artist.artwork.add_data(artist_artwork.get("data-src", artist_artwork.get("src")))
for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})): for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})):
data_blob = data_blob_soup["data-blob"] data_blob = data_blob_soup["data-blob"]
@@ -247,7 +242,7 @@ class Bandcamp(Page):
artist.source_collection.append(source) artist.source_collection.append(source)
return artist return artist
def _parse_track_element(self, track: dict, artwork: ArtworkCollection) -> Optional[Song]: def _parse_track_element(self, track: dict, artwork: Artwork) -> Optional[Song]:
lyrics_list: List[Lyrics] = [] lyrics_list: List[Lyrics] = []
_lyrics: Optional[str] = track.get("item", {}).get("recordingOf", {}).get("lyrics", {}).get("text") _lyrics: Optional[str] = track.get("item", {}).get("recordingOf", {}).get("lyrics", {}).get("text")
@@ -281,15 +276,9 @@ class Bandcamp(Page):
artist_source_list = [] artist_source_list = []
if "@id" in artist_data: if "@id" in artist_data:
artist_source_list = [Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))] artist_source_list = [Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
source_list: List[Source] = [source]
if "mainEntityOfPage" in data or "@id" in data:
source_list.append(Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"])))
album = Album( album = Album(
title=data["name"].strip(), title=data["name"].strip(),
source_list=source_list, source_list=[Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]))],
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"), date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
artist_list=[Artist( artist_list=[Artist(
name=artist_data["name"].strip(), name=artist_data["name"].strip(),
@@ -297,7 +286,7 @@ class Bandcamp(Page):
)] )]
) )
artwork: ArtworkCollection = ArtworkCollection() artwork: Artwork = Artwork()
def _get_artwork_url(_data: dict) -> Optional[str]: def _get_artwork_url(_data: dict) -> Optional[str]:
if "image" in _data: if "image" in _data:
@@ -308,14 +297,15 @@ class Bandcamp(Page):
_artwork_url = _get_artwork_url(data) _artwork_url = _get_artwork_url(data)
if _artwork_url is not None: if _artwork_url is not None:
artwork.add_data(url=_artwork_url, width=350, height=350) artwork.append(url=_artwork_url, width=350, height=350)
else: else:
for album_release in data.get("albumRelease", []): for album_release in data.get("albumRelease", []):
_artwork_url = _get_artwork_url(album_release) _artwork_url = _get_artwork_url(album_release)
if _artwork_url is not None: if _artwork_url is not None:
artwork.add_data(url=_artwork_url, width=350, height=350) artwork.append(url=_artwork_url, width=350, height=350)
break break
for i, track_json in enumerate(data.get("track", {}).get("itemListElement", [])): for i, track_json in enumerate(data.get("track", {}).get("itemListElement", [])):
if DEBUG: if DEBUG:
dump_to_file(f"album_track_{i}.json", json.dumps(track_json), is_json=True, exit_after_dump=False) dump_to_file(f"album_track_{i}.json", json.dumps(track_json), is_json=True, exit_after_dump=False)
@@ -361,29 +351,17 @@ class Bandcamp(Page):
for key, value in other_data.get("trackinfo", [{}])[0].get("file", {"": None}).items(): for key, value in other_data.get("trackinfo", [{}])[0].get("file", {"": None}).items():
mp3_url = value mp3_url = value
source_list: List[Source] = [source]
if "mainEntityOfPage" in data or "@id" in data:
source_list.append(Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]), audio_url=mp3_url))
source_list_album: List[Source] = [source]
if "@id" in album_data:
source_list_album.append(Source(self.SOURCE_TYPE, album_data["@id"]))
source_list_artist: List[Source] = [source]
if "@id" in artist_data:
source_list_artist.append(Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"])))
song = Song( song = Song(
title=clean_song_title(data["name"], artist_name=artist_data["name"]), title=clean_song_title(data["name"], artist_name=artist_data["name"]),
source_list=source_list, source_list=[source, Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]), audio_url=mp3_url)],
album_list=[Album( album_list=[Album(
title=album_data["name"].strip(), title=album_data["name"].strip(),
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"), date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
source_list=source_list_album source_list=[Source(self.SOURCE_TYPE, album_data["@id"])]
)], )],
artist_list=[Artist( artist_list=[Artist(
name=artist_data["name"].strip(), name=artist_data["name"].strip(),
source_list=source_list_artist source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
)], )],
lyrics_list=self._fetch_lyrics(soup=soup) lyrics_list=self._fetch_lyrics(soup=soup)
) )

View File

@@ -1,31 +1,20 @@
from collections import defaultdict from collections import defaultdict
from typing import List, Optional, Dict, Type, Union from typing import Dict, List, Optional, Type, Union
from bs4 import BeautifulSoup from urllib.parse import urlencode, urlparse
import pycountry import pycountry
from urllib.parse import urlparse, urlencode from bs4 import BeautifulSoup
from ..connection import Connection from ..connection import Connection
from ..utils.config import logging_settings from ..download import Page
from .abstract import Page from ..objects import (Album, Artist, DatabaseObject, FormattedText,
from ..utils.enums import SourceType, ALL_SOURCE_TYPES ID3Timestamp, Label, Lyrics, Options, Song, Source)
from ..utils.enums.album import AlbumType
from ..utils.support_classes.query import Query
from ..objects import (
Lyrics,
Artist,
Source,
Song,
Album,
ID3Timestamp,
FormattedText,
Label,
Options,
DatabaseObject
)
from ..utils.shared import DEBUG
from ..utils import dump_to_file from ..utils import dump_to_file
from ..utils.config import logging_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.enums.album import AlbumType
from ..utils.shared import DEBUG
from ..utils.support_classes.query import Query
ALBUM_TYPE_MAP: Dict[str, AlbumType] = defaultdict(lambda: AlbumType.OTHER, { ALBUM_TYPE_MAP: Dict[str, AlbumType] = defaultdict(lambda: AlbumType.OTHER, {
"Full-length": AlbumType.STUDIO_ALBUM, "Full-length": AlbumType.STUDIO_ALBUM,
@@ -207,6 +196,7 @@ def create_grid(
class EncyclopaediaMetallum(Page): class EncyclopaediaMetallum(Page):
REGISTER = False
SOURCE_TYPE = ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM SOURCE_TYPE = ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM
LOGGER = logging_settings["metal_archives_logger"] LOGGER = logging_settings["metal_archives_logger"]

View File

@@ -1,5 +1,4 @@
import simplejson as json import json
from json_unescape import escape_json, unescape_json
from enum import Enum from enum import Enum
from typing import List, Optional, Type from typing import List, Optional, Type
from urllib.parse import urlencode, urlparse, urlunparse from urllib.parse import urlencode, urlparse, urlunparse
@@ -8,16 +7,16 @@ import pycountry
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from ..connection import Connection from ..connection import Connection
from ..objects import (Album, Artist, ArtworkCollection, Contact, from ..download import Page
DatabaseObject, FormattedText, ID3Timestamp, Label, from ..objects import (Album, Artist, Artwork, Contact, DatabaseObject,
Lyrics, Song, Source, SourceType, Target) FormattedText, ID3Timestamp, Label, Lyrics, Song,
Source, SourceType, Target)
from ..utils import dump_to_file, traverse_json_path from ..utils import dump_to_file, traverse_json_path
from ..utils.config import logging_settings, main_settings from ..utils.config import logging_settings, main_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.shared import DEBUG from ..utils.shared import DEBUG
from ..utils.string_processing import clean_song_title from ..utils.string_processing import clean_song_title
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from .abstract import Page
if DEBUG: if DEBUG:
from ..utils import dump_to_file from ..utils import dump_to_file
@@ -46,34 +45,34 @@ class Genius(Page):
return Song return Song
def add_to_artwork(self, artwork: ArtworkCollection, url: str): def add_to_artwork(self, artwork: Artwork, url: str):
if url is None: if url is None:
return return
url_frags = url.split(".") url_frags = url.split(".")
if len(url_frags) < 2: if len(url_frags) < 2:
artwork.add_data(url=url) artwork.append(url=url)
return return
dimensions = url_frags[-2].split("x") dimensions = url_frags[-2].split("x")
if len(dimensions) < 2: if len(dimensions) < 2:
artwork.add_data(url=url) artwork.append(url=url)
return return
if len(dimensions) == 3: if len(dimensions) == 3:
dimensions = dimensions[:-1] dimensions = dimensions[:-1]
try: try:
artwork.add_data(url=url, width=int(dimensions[0]), height=int(dimensions[1])) artwork.append(url=url, width=int(dimensions[0]), height=int(dimensions[1]))
except ValueError: except ValueError:
artwork.add_data(url=url) artwork.append(url=url)
def parse_api_object(self, data: dict) -> Optional[DatabaseObject]: def parse_api_object(self, data: dict) -> Optional[DatabaseObject]:
if data is None: if data is None:
return None return None
object_type = data.get("_type") object_type = data.get("_type")
artwork = ArtworkCollection() artwork = Artwork()
self.add_to_artwork(artwork, data.get("header_image_url")) self.add_to_artwork(artwork, data.get("header_image_url"))
self.add_to_artwork(artwork, data.get("image_url")) self.add_to_artwork(artwork, data.get("image_url"))
@@ -124,7 +123,7 @@ class Genius(Page):
source_list=[source], source_list=[source],
artist_list=[self.parse_api_object(data.get("artist"))], artist_list=[self.parse_api_object(data.get("artist"))],
artwork=artwork, artwork=artwork,
date=ID3Timestamp(**(data.get("release_date_components") or {})), date=ID3Timestamp(**data.get("release_date_components", {})),
) )
if object_type == "song": if object_type == "song":
@@ -269,8 +268,7 @@ class Genius(Page):
# get the contents that are between `JSON.parse('` and `');` # get the contents that are between `JSON.parse('` and `');`
content = self.get_json_content_from_response(r, start="window.__PRELOADED_STATE__ = JSON.parse('", end="');\n window.__APP_CONFIG__ = ") content = self.get_json_content_from_response(r, start="window.__PRELOADED_STATE__ = JSON.parse('", end="');\n window.__APP_CONFIG__ = ")
if content is not None: if content is not None:
#IMPLEMENT FIX FROM HAZEL content = content.replace("\\\\", "\\").replace('\\"', '"').replace("\\'", "'")
content = escape_json(content)
data = json.loads(content) data = json.loads(content)
lyrics_html = traverse_json_path(data, "songPage.lyricsData.body.html", default=None) lyrics_html = traverse_json_path(data, "songPage.lyricsData.body.html", default=None)

View File

@@ -8,10 +8,9 @@ import pycountry
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from ..connection import Connection from ..connection import Connection
from ..objects import (Album, Artist, DatabaseObject, from ..download import Page
FormattedText, ID3Timestamp, Label, Lyrics, Song, from ..objects import (Album, Artist, Artwork, DatabaseObject, FormattedText,
Source, Target) ID3Timestamp, Label, Lyrics, Song, Source, Target)
from ..objects.artwork import (Artwork, ArtworkVariant, ArtworkCollection)
from ..utils import shared, string_processing from ..utils import shared, string_processing
from ..utils.config import logging_settings, main_settings from ..utils.config import logging_settings, main_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType from ..utils.enums import ALL_SOURCE_TYPES, SourceType
@@ -19,7 +18,6 @@ from ..utils.enums.album import AlbumStatus, AlbumType
from ..utils.string_processing import clean_song_title from ..utils.string_processing import clean_song_title
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query from ..utils.support_classes.query import Query
from .abstract import Page
""" """
https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent
@@ -476,11 +474,11 @@ class Musify(Page):
track_name = list_points[4].text.strip() track_name = list_points[4].text.strip()
# album artwork # artwork
artwork: ArtworkCollection = ArtworkCollection() artwork: Artwork = Artwork()
album_image_element_list: List[BeautifulSoup] = soup.find_all("img", {"class": "album-img"}) album_image_element_list: List[BeautifulSoup] = soup.find_all("img", {"class": "album-img"})
for album_image_element in album_image_element_list: for album_image_element in album_image_element_list:
artwork.add_data(url=album_image_element.get("data-src", album_image_element.get("src"))) artwork.append(url=album_image_element.get("data-src", album_image_element.get("src")))
# lyrics # lyrics
lyrics_container: List[BeautifulSoup] = soup.find_all("div", {"id": "tabLyrics"}) lyrics_container: List[BeautifulSoup] = soup.find_all("div", {"id": "tabLyrics"})
@@ -745,18 +743,11 @@ class Musify(Page):
except ValueError: except ValueError:
self.LOGGER.debug(f"Raw datetime doesn't match time format %Y-%m-%d: {raw_datetime}") self.LOGGER.debug(f"Raw datetime doesn't match time format %Y-%m-%d: {raw_datetime}")
# album artwork
album_artwork: ArtworkCollection = ArtworkCollection()
album_artwork_list: List[BeautifulSoup] = soup.find_all("img", {"class":"artist-img"})
for album_artwork in album_artwork_list:
album_artwork.add_data(url=album_artwork.get("data-src", album_artwork.get("src")))
return Album( return Album(
title=name, title=name,
source_list=source_list, source_list=source_list,
artist_list=artist_list, artist_list=artist_list,
date=date, date=date
artwork=album_artwork
) )
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album: def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
@@ -793,8 +784,6 @@ class Musify(Page):
new_song = self._parse_song_card(card_soup) new_song = self._parse_song_card(card_soup)
album.song_collection.append(new_song) album.song_collection.append(new_song)
album.update_tracksort() album.update_tracksort()
return album return album
@@ -914,18 +903,11 @@ class Musify(Page):
if note_soup is not None: if note_soup is not None:
notes.html = note_soup.decode_contents() notes.html = note_soup.decode_contents()
# get artist profile artwork
main_artist_artwork: ArtworkCollection = ArtworkCollection()
artist_image_element_list: List[BeautifulSoup] = soup.find_all("img", {"class":"artist-img"})
for artist_image_element in artist_image_element_list:
main_artist_artwork.add_data(url=artist_image_element.get("data-src", artist_image_element.get("src")))
return Artist( return Artist(
name=name, name=name,
country=country, country=country,
source_list=source_list, source_list=source_list,
notes=notes, notes=notes
artwork=main_artist_artwork
) )
def _parse_album_card(self, album_card: BeautifulSoup, artist_name: str = None, **kwargs) -> Album: def _parse_album_card(self, album_card: BeautifulSoup, artist_name: str = None, **kwargs) -> Album:
@@ -1063,29 +1045,19 @@ class Musify(Page):
artist.album_collection.append(album) artist.album_collection.append(album)
def _fetch_artist_artwork(self, source: str, artist: Artist, **kwargs):
# artist artwork
artwork_gallery = self.get_soup_from_response(self.connection.get(source.strip().strip("/") + "/photos"))
if artwork_gallery is not None:
gallery_body_content: BeautifulSoup = artwork_gallery.find(id="bodyContent")
gallery_image_element_list: List[BeautifulSoup] = gallery_body_content.find_all("img")
for gallery_image_element in gallery_image_element_list:
artist.artwork.append(ArtworkVariant(url=gallery_image_element.get("data-src", gallery_image_element.get("src")), width=247, heigth=247))
def fetch_artist(self, source: Source, **kwargs) -> Artist: def fetch_artist(self, source: Source, **kwargs) -> Artist:
""" """
TODO TODO
[x] discography [x] discography
[x] attributes [x] attributes
[x] picture gallery [] picture gallery
""" """
url = parse_url(source.url) url = parse_url(source.url)
artist = self._fetch_initial_artist(url, source=source, **kwargs) artist = self._fetch_initial_artist(url, source=source, **kwargs)
self._fetch_artist_discography(artist, url, artist.name, **kwargs) self._fetch_artist_discography(artist, url, artist.name, **kwargs)
self._fetch_artist_artwork(url.url, artist, **kwargs)
return artist return artist
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:

View File

@@ -1,29 +1,19 @@
from typing import List, Optional, Type, Tuple
from urllib.parse import urlparse, urlunparse, parse_qs
from enum import Enum from enum import Enum
from typing import List, Optional, Tuple, Type
from urllib.parse import parse_qs, urlparse, urlunparse
import python_sponsorblock import python_sponsorblock
from ..objects import Source, DatabaseObject, Song, Target
from .abstract import Page
from ..objects import (
Artist,
Source,
Song,
Album,
Label,
Target,
FormattedText,
ID3Timestamp
)
from ..connection import Connection from ..connection import Connection
from ..download import Page
from ..objects import (Album, Artist, DatabaseObject, FormattedText,
ID3Timestamp, Label, Song, Source, Target)
from ..utils.config import logging_settings, main_settings, youtube_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.string_processing import clean_song_title from ..utils.string_processing import clean_song_title
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from ..utils.config import youtube_settings, main_settings, logging_settings from ._youtube_music.super_youtube import (SuperYouTube, YouTubeUrl,
YouTubeUrlType, get_invidious_url)
from .youtube_music.super_youtube import SuperYouTube, YouTubeUrl, get_invidious_url, YouTubeUrlType
""" """
- https://yt.artemislena.eu/api/v1/search?q=Zombiez+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance - https://yt.artemislena.eu/api/v1/search?q=Zombiez+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance
@@ -38,7 +28,7 @@ def get_piped_url(path: str = "", params: str = "", query: str = "", fragment: s
class YouTube(SuperYouTube): class YouTube(SuperYouTube):
# CHANGE REGISTER = youtube_settings["use_youtube_alongside_youtube_music"]
SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):

View File

@@ -3,7 +3,6 @@ from enum import Enum
from ...utils.config import logging_settings from ...utils.config import logging_settings
from ...objects import Source, DatabaseObject from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import ( from ...objects import (
Artist, Artist,
Source, Source,

View File

@@ -6,7 +6,6 @@ from ...utils.string_processing import clean_song_title
from ...utils.enums import SourceType, ALL_SOURCE_TYPES from ...utils.enums import SourceType, ALL_SOURCE_TYPES
from ...objects import Source, DatabaseObject from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import ( from ...objects import (
Artist, Artist,
Source, Source,

View File

@@ -1,26 +1,17 @@
from typing import List, Optional, Type, Tuple
from urllib.parse import urlparse, urlunparse, parse_qs
from enum import Enum from enum import Enum
import requests from typing import List, Optional, Tuple, Type
from urllib.parse import parse_qs, urlparse, urlunparse
import python_sponsorblock import python_sponsorblock
import requests
from ...objects import Source, DatabaseObject, Song, Target
from ..abstract import Page
from ...objects import (
Artist,
Source,
Song,
Album,
Label,
Target,
FormattedText,
ID3Timestamp
)
from ...connection import Connection from ...connection import Connection
from ...download import Page
from ...objects import (Album, Artist, DatabaseObject, FormattedText,
ID3Timestamp, Label, Song, Source, Target)
from ...utils.config import logging_settings, main_settings, youtube_settings
from ...utils.enums import ALL_SOURCE_TYPES, SourceType
from ...utils.support_classes.download_result import DownloadResult from ...utils.support_classes.download_result import DownloadResult
from ...utils.config import youtube_settings, logging_settings, main_settings
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str: def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:

View File

@@ -15,7 +15,8 @@ from youtube_dl.extractor.youtube import YoutubeIE
from youtube_dl.utils import DownloadError from youtube_dl.utils import DownloadError
from ...connection import Connection from ...connection import Connection
from ...objects import Album, Artist, ArtworkCollection from ...download import Page
from ...objects import Album, Artist, Artwork
from ...objects import DatabaseObject as DataObject from ...objects import DatabaseObject as DataObject
from ...objects import (FormattedText, ID3Timestamp, Label, Lyrics, Song, from ...objects import (FormattedText, ID3Timestamp, Label, Lyrics, Song,
Source, Target) Source, Target)
@@ -27,7 +28,6 @@ from ...utils.exception.config import SettingValueError
from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING
from ...utils.string_processing import clean_song_title from ...utils.string_processing import clean_song_title
from ...utils.support_classes.download_result import DownloadResult from ...utils.support_classes.download_result import DownloadResult
from ..abstract import Page
from ._list_render import parse_renderer from ._list_render import parse_renderer
from ._music_object_render import parse_run_element from ._music_object_render import parse_run_element
from .super_youtube import SuperYouTube from .super_youtube import SuperYouTube
@@ -425,7 +425,6 @@ class YoutubeMusic(SuperYouTube):
data: dict = r.json() data: dict = r.json()
header = data.get("header", {}) header = data.get("header", {})
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {}) musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
musicImmersiveHeaderRenderer = header.get("musicImmersiveHeaderRenderer", {})
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", []) title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", []) subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
@@ -438,11 +437,6 @@ class YoutubeMusic(SuperYouTube):
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[ renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", []) 0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
# fetch artist artwork
artist_thumbnails = musicImmersiveHeaderRenderer.get("thumbnail", {}).get("musicThumbnailRenderer", {}).get("thumbnail", {}).get("thumbnails", {})
for artist_thumbnail in artist_thumbnails:
artist.artwork.append(artist_thumbnail)
if DEBUG: if DEBUG:
for i, content in enumerate(renderer_list): for i, content in enumerate(renderer_list):
dump_to_file(f"{i}-artists-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False) dump_to_file(f"{i}-artists-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
@@ -490,11 +484,6 @@ class YoutubeMusic(SuperYouTube):
header = data.get("header", {}) header = data.get("header", {})
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {}) musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
# album artwork
album_thumbnails = musicDetailHeaderRenderer.get("thumbnail", {}).get("croppedSquareThumbnailRenderer", {}).get("thumbnail", {}).get("thumbnails", {})
for album_thumbnail in album_thumbnails:
album.artwork.append(value=album_thumbnail)
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", []) title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", []) subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
@@ -644,7 +633,7 @@ class YoutubeMusic(SuperYouTube):
note=ydl_res.get("descriptions"), note=ydl_res.get("descriptions"),
album_list=album_list, album_list=album_list,
length=int(ydl_res.get("duration", 0)) * 1000, length=int(ydl_res.get("duration", 0)) * 1000,
artwork=ArtworkCollection(*ydl_res.get("thumbnails", [])), artwork=Artwork(*ydl_res.get("thumbnails", [])),
artist_list=artist_list, artist_list=artist_list,
source_list=[Source( source_list=[Source(
self.SOURCE_TYPE, self.SOURCE_TYPE,
@@ -683,7 +672,7 @@ class YoutubeMusic(SuperYouTube):
for album in song.album_list: for album in song.album_list:
album.album_type = AlbumType.LIVE_ALBUM album.album_type = AlbumType.LIVE_ALBUM
for thumbnail in video_details.get("thumbnails", []): for thumbnail in video_details.get("thumbnails", []):
song.artwork.add_data(**thumbnail) song.artwork.append(**thumbnail)
song.lyrics_collection.append(self.fetch_lyrics(browse_id, playlist_id=request_data.get("playlistId"))) song.lyrics_collection.append(self.fetch_lyrics(browse_id, playlist_id=request_data.get("playlistId")))

View File

@@ -1,157 +0,0 @@
import logging
import random
import re
from copy import copy
from pathlib import Path
from typing import Optional, Union, Type, Dict, Set, List, Tuple, TypedDict
from string import Formatter
from dataclasses import dataclass, field
import requests
from bs4 import BeautifulSoup
from ..connection import Connection
from ..objects import (
Song,
Source,
Album,
Artist,
Target,
DatabaseObject,
Options,
Collection,
Label,
)
from ..utils.enums import SourceType
from ..utils.enums.album import AlbumType
from ..audio import write_metadata_to_target, correct_codec
from ..utils.config import main_settings
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import fit_to_file_system
from ..utils import trace, output, BColors
INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song]
INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]]
@dataclass
class FetchOptions:
download_all: bool = False
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
@dataclass
class DownloadOptions:
download_all: bool = False
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
process_audio_if_found: bool = False
process_metadata_if_found: bool = True
class Page:
SOURCE_TYPE: SourceType
LOGGER: logging.Logger
def __new__(cls, *args, **kwargs):
cls.LOGGER = logging.getLogger(cls.__name__)
return super().__new__(cls)
def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
self.SOURCE_TYPE.register_page(self)
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None):
"""
Perform a regex search on the given string, using a single or a list of
patterns returning the first matching group.
In case of failure return a default value or raise a WARNING or a
RegexNotFoundError, depending on fatal, specifying the field name.
"""
if isinstance(pattern, str):
mobj = re.search(pattern, string, flags)
else:
for p in pattern:
mobj = re.search(p, string, flags)
if mobj:
break
if mobj:
if group is None:
# return the first matching group
return next(g for g in mobj.groups() if g is not None)
elif isinstance(group, (list, tuple)):
return tuple(mobj.group(g) for g in group)
else:
return mobj.group(group)
return default
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
return None
def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup:
return BeautifulSoup(r.content, "html.parser")
# to search stuff
def search(self, query: Query) -> List[DatabaseObject]:
music_object = query.music_object
search_functions = {
Song: self.song_search,
Album: self.album_search,
Artist: self.artist_search,
Label: self.label_search
}
if type(music_object) in search_functions:
r = search_functions[type(music_object)](music_object)
if r is not None and len(r) > 0:
return r
r = []
for default_query in query.default_search:
for single_option in self.general_search(default_query):
r.append(single_option)
return r
def general_search(self, search_query: str) -> List[DatabaseObject]:
return []
def label_search(self, label: Label) -> List[Label]:
return []
def artist_search(self, artist: Artist) -> List[Artist]:
return []
def album_search(self, album: Album) -> List[Album]:
return []
def song_search(self, song: Song) -> List[Song]:
return []
# to fetch stuff
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song()
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
return Album()
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
return Artist()
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
# to download stuff
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
return []
def post_process_hook(self, song: Song, temp_target: Target, **kwargs):
pass
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
return DownloadResult()

View File

@@ -2,17 +2,15 @@ import inspect
import json import json
import logging import logging
from datetime import datetime from datetime import datetime
from functools import lru_cache
from pathlib import Path from pathlib import Path
from typing import Any, List, Union from typing import List, Union
from .config import config, read_config, write_config from .config import config, read_config, write_config
from .enums.colors import BColors from .enums.colors import BColors
from .hacking import merge_args from .hacking import merge_args
from .path_manager import LOCATIONS from .path_manager import LOCATIONS
from .shared import (DEBUG, DEBUG_DUMP, DEBUG_LOGGING, DEBUG_OBJECT_TRACE, from .shared import (DEBUG, DEBUG_DUMP, DEBUG_LOGGING, DEBUG_OBJECT_TRACE,
DEBUG_OBJECT_TRACE_CALLSTACK, DEBUG_TRACE, URL_PATTERN) DEBUG_OBJECT_TRACE_CALLSTACK, DEBUG_TRACE)
from .string_processing import hash_url, is_url, unify
""" """
IO functions IO functions
@@ -129,34 +127,3 @@ def get_current_millis() -> int:
def get_unix_time() -> int: def get_unix_time() -> int:
return int(datetime.now().timestamp()) return int(datetime.now().timestamp())
@lru_cache
def custom_hash(value: Any) -> int:
if is_url(value):
value = hash_url(value)
elif isinstance(value, str):
try:
value = int(value)
except ValueError:
value = unify(value)
return hash(value)
def create_dataclass_instance(t, data: dict):
"""Creates an instance of a dataclass with the given data.
It filters out all data key, which has no attribute in the dataclass.
Args:
t (Type): The dataclass type class
data (dict): the attribute to pass into the constructor
Returns:
Tuple[Type, dict]: The created instance and a dict, containing the data, which was not used in the creation
"""
needed_data = {k: v for k, v in data.items() if k in t.__dataclass_fields__}
removed_data = {k: v for k, v in data.items() if k not in t.__dataclass_fields__}
return t(**needed_data), removed_data

View File

@@ -1,8 +1,11 @@
from typing import Tuple from typing import Tuple
from .config import Config from .config import Config
from .config_files import main_config, logging_config, youtube_config from .config_files import (
main_config,
logging_config,
youtube_config,
)
_sections: Tuple[Config, ...] = ( _sections: Tuple[Config, ...] = (
main_config.config, main_config.config,

View File

@@ -18,7 +18,6 @@ config = Config((
AudioFormatAttribute(name="audio_format", default_value="mp3", description="""Music Kraken will stream the audio into this format. AudioFormatAttribute(name="audio_format", default_value="mp3", description="""Music Kraken will stream the audio into this format.
You can use Audio formats which support ID3.2 and ID3.1, You can use Audio formats which support ID3.2 and ID3.1,
but you will have cleaner Metadata using ID3.2."""), but you will have cleaner Metadata using ID3.2."""),
Attribute(name="image_format", default_value="jpeg", description="This Changes the format in which images are getting downloaded"),
Attribute(name="result_history", default_value=True, description="""If enabled, you can go back to the previous results. Attribute(name="result_history", default_value=True, description="""If enabled, you can go back to the previous results.
The consequence is a higher meory consumption, because every result is saved."""), The consequence is a higher meory consumption, because every result is saved."""),
@@ -29,7 +28,6 @@ The further you choose to be able to go back, the higher the memory usage.
EmptyLine(), EmptyLine(),
Attribute(name="preferred_artwork_resolution", default_value=1000), Attribute(name="preferred_artwork_resolution", default_value=1000),
Attribute(name="download_artist_artworks", default_value=True, description="Enables the fetching of artist galleries."),
EmptyLine(), EmptyLine(),
@@ -46,7 +44,6 @@ This means for example, the Studio Albums and EP's are always in front of Single
- album_type - album_type
The folder music kraken should put the songs into."""), The folder music kraken should put the songs into."""),
Attribute(name="download_file", default_value="{song}.{audio_format}", description="The filename of the audio file."), Attribute(name="download_file", default_value="{song}.{audio_format}", description="The filename of the audio file."),
Attribute(name="artist_artwork_path", default_value="{genre}/{artist}/{artist}_{image_number}.{image_format}", description="The Path to download artist images to."),
SelectAttribute(name="album_type_blacklist", default_value=[ SelectAttribute(name="album_type_blacklist", default_value=[
"Compilation Album", "Compilation Album",
"Live Album", "Live Album",
@@ -155,13 +152,10 @@ class SettingsStructure(TypedDict):
# artwork # artwork
preferred_artwork_resolution: int preferred_artwork_resolution: int
image_format: str
download_artist_artworks: bool
# paths # paths
music_directory: Path music_directory: Path
temp_directory: Path temp_directory: Path
artist_artwork_path: Path
log_file: Path log_file: Path
not_a_genre_regex: List[str] not_a_genre_regex: List[str]
ffmpeg_binary: Path ffmpeg_binary: Path

View File

@@ -1,11 +1,7 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from typing import Optional, TYPE_CHECKING, Type
from typing import TYPE_CHECKING, Optional, Type
from mutagen.id3 import PictureType
if TYPE_CHECKING: if TYPE_CHECKING:
from ...pages.abstract import Page from ...pages.abstract import Page
@@ -21,6 +17,9 @@ class SourceType:
def register_page(self, page: Page): def register_page(self, page: Page):
self.page = page self.page = page
def deregister_page(self):
self.page = None
def __hash__(self): def __hash__(self):
return hash(self.name) return hash(self.name)
@@ -56,73 +55,3 @@ class ALL_SOURCE_TYPES:
MANUAL = SourceType(name="manual") MANUAL = SourceType(name="manual")
PRESET = SourceType(name="preset") PRESET = SourceType(name="preset")
class PictureType(Enum):
"""Enumeration of image types defined by the ID3 standard for the APIC
frame, but also reused in WMA/FLAC/VorbisComment.
This is copied from mutagen.id3.PictureType
"""
OTHER = 0
FILE_ICON = 1
"""32x32 pixels 'file icon' (PNG only)"""
OTHER_FILE_ICON = 2
"""Other file icon"""
COVER_FRONT = 3
"""Cover (front)"""
COVER_BACK = 4
"""Cover (back)"""
LEAFLET_PAGE = 5
"""Leaflet page"""
MEDIA = 6
"""Media (e.g. label side of CD)"""
LEAD_ARTIST = 7
"""Lead artist/lead performer/soloist"""
ARTIST = 8
"""Artist/performer"""
CONDUCTOR = 9
"""Conductor"""
BAND = 10
"""Band/Orchestra"""
COMPOSER = 11
"""Composer"""
LYRICIST = 12
"""Lyricist/text writer"""
RECORDING_LOCATION = 13
"""Recording Location"""
DURING_RECORDING = 14
"""During recording"""
DURING_PERFORMANCE = 15
"""During performance"""
SCREEN_CAPTURE = 16
"""Movie/video screen capture"""
FISH = 17
"""A bright colored fish"""
ILLUSTRATION = 18
"""Illustration"""
BAND_LOGOTYPE = 19
"""Band/artist logotype"""
PUBLISHER_LOGOTYPE = 20
"""Publisher/Studio logotype"""

View File

@@ -3,6 +3,9 @@ class MKBaseException(Exception):
self.message = message self.message = message
super().__init__(message, **kwargs) super().__init__(message, **kwargs)
# Compose exceptions. Those usually mean a bug on my side.
class MKComposeException(MKBaseException):
pass
# Downloading # Downloading
class MKDownloadException(MKBaseException): class MKDownloadException(MKBaseException):

View File

@@ -1,11 +1,11 @@
import random
from dotenv import load_dotenv
from pathlib import Path
import os import os
import random
from pathlib import Path
from dotenv import load_dotenv
from .path_manager import LOCATIONS
from .config import main_settings from .config import main_settings
from .path_manager import LOCATIONS
if not load_dotenv(Path(__file__).parent.parent.parent / ".env"): if not load_dotenv(Path(__file__).parent.parent.parent / ".env"):
load_dotenv(Path(__file__).parent.parent.parent / ".env.example") load_dotenv(Path(__file__).parent.parent.parent / ".env.example")
@@ -51,3 +51,6 @@ have fun :3""".strip()
URL_PATTERN = r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+" URL_PATTERN = r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+"
INT_PATTERN = r"^\d*$" INT_PATTERN = r"^\d*$"
FLOAT_PATTERN = r"^[\d|\,|\.]*$" FLOAT_PATTERN = r"^[\d|\,|\.]*$"
ALPHABET = "abcdefghijklmnopqrstuvwxyz"

View File

@@ -1,16 +1,13 @@
import re
import string import string
from functools import lru_cache from functools import lru_cache
from pathlib import Path from pathlib import Path
from typing import Any, Optional, Tuple, Union from typing import Optional, Tuple, Union
from urllib.parse import ParseResult, parse_qs, urlparse from urllib.parse import ParseResult, parse_qs, urlparse
from pathvalidate import sanitize_filename from pathvalidate import sanitize_filename
from transliterate import translit from transliterate import translit
from transliterate.exceptions import LanguageDetectionError from transliterate.exceptions import LanguageDetectionError
from .shared import URL_PATTERN
COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = ( COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = (
"(official video)", "(official video)",
) )
@@ -182,6 +179,17 @@ def hash_url(url: Union[str, ParseResult]) -> str:
r = r.lower().strip() r = r.lower().strip()
return r return r
def hash(self, key: Any) -> int:
try:
key = int(key)
except ValueError:
pass
if isinstance(key, str):
return hash(unify(key))
return hash(key)
def remove_feature_part_from_track(title: str) -> str: def remove_feature_part_from_track(title: str) -> str:
if ")" != title[-1]: if ")" != title[-1]:
@@ -231,13 +239,3 @@ def shorten_display_url(url: str, max_length: int = 150, chars_at_end: int = 4,
return url return url
return url[:max_length] + shorten_string + url[-chars_at_end:] return url[:max_length] + shorten_string + url[-chars_at_end:]
def is_url(value: Any) -> bool:
if isinstance(value, ParseResult):
return True
if not isinstance(value, str):
return True
# value has to be a string
return re.match(URL_PATTERN, value) is not None

View File

@@ -1,13 +1,9 @@
from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import TYPE_CHECKING, List, Tuple from typing import List, Tuple
if TYPE_CHECKING: from ...utils.config import main_settings, logging_settings
from ...objects import Target
from ...utils.config import logging_settings, main_settings
from ...utils.enums.colors import BColors from ...utils.enums.colors import BColors
from ...objects import Target
UNIT_PREFIXES: List[str] = ["", "k", "m", "g", "t"] UNIT_PREFIXES: List[str] = ["", "k", "m", "g", "t"]
UNIT_DIVISOR = 1024 UNIT_DIVISOR = 1024