WIP: feature/cleanup_programming_interface #40

Draft
Hazel wants to merge 35 commits from feature/cleanup_programming_interface into experimental
41 changed files with 1193 additions and 905 deletions

6
.vscode/launch.json vendored
View File

@ -17,6 +17,12 @@
"request": "launch",
"program": "development/actual_donwload.py",
"console": "integratedTerminal"
},
{
"name": "Python Debugger: Music Kraken",
"type": "debugpy",
"request": "launch", // run the module
"program": "${workspaceFolder}/.vscode/run_script.py",
}
]
}

3
.vscode/run_script.py vendored Normal file
View File

@ -0,0 +1,3 @@
from music_kraken.__main__ import cli
cli()

View File

@ -1,13 +1,13 @@
import logging
import gc
import logging
import sys
from pathlib import Path
from rich.logging import RichHandler
from rich.console import Console
from rich.logging import RichHandler
from .utils.shared import DEBUG, DEBUG_LOGGING
from .utils.config import logging_settings, main_settings, read_config
from .utils.shared import DEBUG, DEBUG_LOGGING
read_config()

3
music_kraken/__meta__.py Normal file
View File

@ -0,0 +1,3 @@
PROGRAMM: str = "music-kraken"
DESCRIPTION: str = """This program will first get the metadata of various songs from metadata providers like musicbrainz, and then search for download links on pages like bandcamp.
Then it will download the song and edit the metadata accordingly."""

View File

@ -1,14 +1,15 @@
import mutagen
from mutagen.id3 import ID3, Frame, APIC, USLT
import logging
from pathlib import Path
from typing import List
import logging
import mutagen
from mutagen.id3 import APIC, ID3, USLT, Frame
from PIL import Image
from ..utils.config import logging_settings, main_settings
from ..objects import Song, Target, Metadata
from ..objects.metadata import Mapping
from ..connection import Connection
from ..objects import Metadata, Song, Target
from ..objects.metadata import Mapping
from ..utils.config import logging_settings, main_settings
LOGGER = logging_settings["tagging_logger"]
@ -105,7 +106,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
APIC(
encoding=0,
mime="image/jpeg",
type=3,
type=mutagen.id3.PictureType.COVER_FRONT,
desc=u"Cover",
data=converted_target.read_bytes(),
)

View File

@ -1,5 +0,0 @@
from .informations import print_paths
from .main_downloader import download
from .options.settings import settings
from .options.frontend import set_frontend

View File

@ -1,42 +0,0 @@
from ..utils.shared import get_random_message
def cli_function(function):
def wrapper(*args, **kwargs):
silent = kwargs.get("no_cli", False)
if "no_cli" in kwargs:
del kwargs["no_cli"]
if silent:
return function(*args, **kwargs)
return
code = 0
print_cute_message()
print()
try:
code = function(*args, **kwargs)
except KeyboardInterrupt:
print("\n\nRaise an issue if I fucked up:\nhttps://github.com/HeIIow2/music-downloader/issues")
finally:
print()
print_cute_message()
print("See you soon! :3")
exit()
return wrapper
def print_cute_message():
message = get_random_message()
try:
print(message)
except UnicodeEncodeError:
message = str(c for c in message if 0 < ord(c) < 127)
print(message)

View File

@ -0,0 +1,82 @@
import argparse
from functools import cached_property
from ..__meta__ import DESCRIPTION, PROGRAMM
from ..download import Downloader
from ..utils import BColors
from ..utils.string_processing import unify
from .utils import HELP_MESSAGE, ask_for_bool, ask_for_create
class DevelopmentCli:
def __init__(self, args: argparse.Namespace):
self.args = args
if args.genre:
self.genre = args.genre
self.downloader: Downloader = Downloader()
@cached_property
def genre(self) -> str:
"""This is a cached property, which means if it isn't set in the constructor or before it is accessed,
the program will be thrown in a shell
Returns:
str: the genre that should be used
"""
option_string = f"{BColors.HEADER}Genres{BColors.ENDC}"
genre_map = {}
_string_list = []
for i, genre in enumerate(self.downloader.get_existing_genres()):
option_string += f"\n{BColors.BOLD}{i}{BColors.ENDC}: {genre}"
genre_map[str(i)] = genre
genre_map[unify(genre)] = genre
genre = None
while genre is None:
print(option_string)
print()
i = input("> ")
u = unify(i)
if u in genre_map:
genre = genre_map[u]
break
if ask_for_create("genre", i):
genre = i
break
return genre
def help_screen(self) -> None:
print(HELP_MESSAGE)
def shell(self) -> None:
print(f"Welcome to the {PROGRAMM} shell!")
print(f"Type '{BColors.OKBLUE}help{BColors.ENDC}' for a list of commands.")
print("")
while True:
i = input("> ")
if i == "help":
self.help_screen()
elif i == "genre":
self.genre
elif i == "exit":
break
else:
print("Unknown command. Type 'help' for a list of commands.")
def main():
parser = argparse.ArgumentParser(
prog=PROGRAMM,
description=DESCRIPTION,
epilog='This is just a development cli. The real frontend is coming soon.'
)
parser.add_argument('--genre', '-g', action='store_const', required=False, help="choose a genre to download from")
args = parser.parse_args()

View File

@ -1,89 +1,26 @@
import random
from typing import Set, Type, Dict, List
from pathlib import Path
import re
from pathlib import Path
from typing import Dict, Generator, List, Set, Type, Union
from .utils import cli_function
from .options.first_config import initial_config
from ..utils import output, BColors
from ..utils.config import write_config, main_settings
from ..utils.shared import URL_PATTERN
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from .. import console
from ..download import Downloader, Page, components
from ..download.results import GoToResults
from ..download.results import Option as ResultOption
from ..download.results import PageResults, Results
from ..objects import Album, Artist, DatabaseObject, Song
from ..utils import BColors, output
from ..utils.config import main_settings, write_config
from ..utils.enums.colors import BColors
from ..utils.exception import MKInvalidInputException
from ..utils.exception.download import UrlNotFoundException
from ..utils.enums.colors import BColors
from .. import console
from ..download.results import Results, Option, PageResults, GoToResults
from ..download.page_attributes import Pages
from ..pages import Page
from ..objects import Song, Album, Artist, DatabaseObject
"""
This is the implementation of the Shell
# Behaviour
## Searching
```mkshell
> s: {querry or url}
# examples
> s: https://musify.club/release/some-random-release-183028492
> s: r: #a an Artist #r some random Release
```
Searches for an url, or an query
### Query Syntax
```
#a {artist} #r {release} #t {track}
```
You can escape stuff like `#` doing this: `\#`
## Downloading
To download something, you either need a direct link, or you need to have already searched for options
```mkshell
> d: {option ids or direct url}
# examples
> d: 0, 3, 4
> d: 1
> d: https://musify.club/release/some-random-release-183028492
```
## Misc
### Exit
```mkshell
> q
> quit
> exit
> abort
```
### Current Options
```mkshell
> .
```
### Previous Options
```
> ..
```
"""
from ..utils.shared import HELP_MESSAGE, URL_PATTERN
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from .genre import get_genre
from .options.first_config import initial_config
from .utils import ask_for_bool, cli_function
EXIT_COMMANDS = {"q", "quit", "exit", "abort"}
ALPHABET = "abcdefghijklmnopqrstuvwxyz"
@ -91,59 +28,17 @@ PAGE_NAME_FILL = "-"
MAX_PAGE_LEN = 21
def get_existing_genre() -> List[str]:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any patern
from shared.NOT_A_GENRE_REGEX.
"""
existing_genres: List[str] = []
# get all subdirectories of MUSIC_DIR, not the files in the dir.
existing_subdirectories: List[Path] = [f for f in main_settings["music_directory"].iterdir() if f.is_dir()]
for subdirectory in existing_subdirectories:
name: str = subdirectory.name
if not any(re.match(regex_pattern, name) for regex_pattern in main_settings["not_a_genre_regex"]):
existing_genres.append(name)
existing_genres.sort()
return existing_genres
def get_genre():
existing_genres = get_existing_genre()
for i, genre_option in enumerate(existing_genres):
print(f"{i + 1:0>2}: {genre_option}")
while True:
genre = input("Id or new genre: ")
if genre.isdigit():
genre_id = int(genre) - 1
if genre_id >= len(existing_genres):
print(f"No genre under the id {genre_id + 1}.")
continue
return existing_genres[genre_id]
new_genre = fit_to_file_system(genre)
agree_inputs = {"y", "yes", "ok"}
verification = input(f"create new genre \"{new_genre}\"? (Y/N): ").lower()
if verification in agree_inputs:
return new_genre
def help_message():
print(HELP_MESSAGE)
print()
print(random.choice(main_settings["happy_messages"]))
print()
class Downloader:
class CliDownloader:
def __init__(
self,
exclude_pages: Set[Type[Page]] = None,
@ -153,7 +48,7 @@ class Downloader:
genre: str = None,
process_metadata_anyway: bool = False,
) -> None:
self.pages: Pages = Pages(exclude_pages=exclude_pages, exclude_shady=exclude_shady)
self.downloader: Downloader = Downloader(exclude_pages=exclude_pages, exclude_shady=exclude_shady)
self.page_dict: Dict[str, Type[Page]] = dict()
@ -171,13 +66,16 @@ class Downloader:
output()
def print_current_options(self):
self.page_dict = dict()
print()
print(self.current_results.pprint())
"""
self.page_dict = dict()
page_count = 0
for option in self.current_results.formatted_generator():
if isinstance(option, Option):
if isinstance(option, ResultOption):
r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}"
print(r)
else:
@ -189,10 +87,13 @@ class Downloader:
self.page_dict[option.__name__] = option
page_count += 1
"""
print()
def set_current_options(self, current_options: Results):
def set_current_options(self, current_options: Union[Generator[DatabaseObject, None, None], components.Select]):
current_options = current_options if isinstance(current_options, components.Select) else components.DataObjectSelect(current_options)
if main_settings["result_history"]:
self._result_history.append(current_options)
@ -242,7 +143,7 @@ class Downloader:
def search(self, query: str):
if re.match(URL_PATTERN, query) is not None:
try:
page, data_object = self.pages.fetch_url(query)
data_object = self.downloader.fetch_url(query)
except UrlNotFoundException as e:
print(f"{e.url} could not be attributed/parsed to any yet implemented site.\n"
f"PR appreciated if the site isn't implemented.\n"
@ -296,15 +197,17 @@ class Downloader:
parsed_query: Query = self._process_parsed(key_text, query)
self.set_current_options(self.pages.search(parsed_query))
self.set_current_options(self.downloader.search(parsed_query))
self.print_current_options()
def goto(self, data_object: DatabaseObject):
def goto(self, data_object: Union[DatabaseObject, components.Select]):
page: Type[Page]
self.pages.fetch_details(data_object, stop_at_level=1)
self.set_current_options(GoToResults(data_object.options, max_items_per_page=self.max_displayed_options))
if isinstance(data_object, components.Select):
self.set_current_options(data_object)
else:
self.downloader.fetch_details(data_object, stop_at_level=1)
self.set_current_options(data_object.options)
self.print_current_options()
@ -316,7 +219,7 @@ class Downloader:
_result_map: Dict[DatabaseObject, DownloadResult] = dict()
for database_object in data_objects:
r = self.pages.download(
r = self.downloader.download(
data_object=database_object,
genre=self.genre,
**kwargs
@ -371,24 +274,15 @@ class Downloader:
indices = []
for possible_index in q.split(","):
possible_index = possible_index.strip()
if possible_index == "":
continue
if possible_index not in self.current_results:
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not in the current options.")
i = 0
try:
i = int(possible_index)
except ValueError:
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.")
yield self.current_results[possible_index]
if i < 0 or i >= len(self.current_results):
raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results) - 1}.")
indices.append(i)
return [self.current_results[i] for i in indices]
selected_objects = get_selected_objects(query)
selected_objects = list(get_selected_objects(query))
if do_merge:
old_selected_objects = selected_objects
@ -403,19 +297,19 @@ class Downloader:
if do_fetch:
for data_object in selected_objects:
self.pages.fetch_details(data_object)
self.downloader.fetch_details(data_object)
self.print_current_options()
return False
if do_download:
self.download(selected_objects)
self.download(list(o.value for o in selected_objects))
return False
if len(selected_objects) != 1:
raise MKInvalidInputException(message="You can only go to one object at a time without merging.")
self.goto(selected_objects[0])
self.goto(selected_objects[0].value)
return False
except MKInvalidInputException as e:
output("\n" + e.message + "\n", color=BColors.FAIL)
@ -446,7 +340,7 @@ def download(
else:
print(f"{BColors.FAIL.value}Something went wrong configuring.{BColors.ENDC.value}")
shell = Downloader(genre=genre, process_metadata_anyway=process_metadata_anyway)
shell = CliDownloader(genre=genre, process_metadata_anyway=process_metadata_anyway)
if command_list is not None:
for command in command_list:

View File

@ -0,0 +1,85 @@
from ..utils import BColors
from ..utils.shared import get_random_message
def cli_function(function):
def wrapper(*args, **kwargs):
silent = kwargs.get("no_cli", False)
if "no_cli" in kwargs:
del kwargs["no_cli"]
if silent:
return function(*args, **kwargs)
return
code = 0
print_cute_message()
print()
try:
code = function(*args, **kwargs)
except KeyboardInterrupt:
print("\n\nRaise an issue if I fucked up:\nhttps://github.com/HeIIow2/music-downloader/issues")
finally:
print()
print_cute_message()
print("See you soon! :3")
exit()
return wrapper
def print_cute_message():
message = get_random_message()
try:
print(message)
except UnicodeEncodeError:
message = str(c for c in message if 0 < ord(c) < 127)
print(message)
def highlight_placeholder(text: str) -> str:
return text.replace("<", f"{BColors.BOLD}<").replace(">", f">{BColors.ENDC}")
HELP_MESSAGE = highlight_placeholder(f"""{BColors.HEADER}To search:{BColors.ENDC}
> s: <query/url>
> s: https://musify.club/release/some-random-release-183028492
> s: #a <artist> #r <release> #t <track>
If you found the same object twice from different sources you can merge those objects.
Then it will use those sources. To do so, use the {BColors.BOLD}m{BColors.ENDC} command.
{BColors.HEADER}To download:{BColors.ENDC}
> d: <id/url>
> dm: 0, 3, 4 # merge all objects into one and download this object
> d: 1
> d: https://musify.club/release/some-random-release-183028492
{BColors.HEADER}To inspect an object:{BColors.ENDC}
If you inspect an object, you see its adjacent object. This means for example the releases of an artist, or the tracks of a release.
You can also merge objects with the {BColors.BOLD}m{BColors.ENDC} command here.
> g: <id/url>
> gm: 0, 3, 4 # merge all objects into one and inspect this object
> g: 1
> g: https://musify.club/release/some-random-release-183028492""")
class COMMANDS:
AGREE = {"y", "yes", "ok"}
DISAGREE = {"n", "no"}
EXIT = {"exit"}
HELP = {"help", "h"}
def ask_for_bool(msg: str) -> bool:
i = input(f"{msg} ({BColors.OKGREEN.value}Y{BColors.ENDC.value}/{BColors.FAIL.value}N{BColors.ENDC.value})? ").lower()
return i in COMMANDS.AGREE
def ask_for_create(name: str, value: str) -> bool:
return ask_for_bool(f"Do you want to create the {name} {BColors.OKBLUE}{value}{BColors.ENDC}?")

View File

@ -1,8 +1,36 @@
from dataclasses import dataclass, field
from typing import Set
from __future__ import annotations
from ..utils.config import main_settings
import logging
import random
import re
from collections import defaultdict
from copy import copy
from dataclasses import dataclass, field
from pathlib import Path
from string import Formatter
from typing import (TYPE_CHECKING, Any, Callable, Dict, Generator, List,
Optional, Set, Tuple, Type, TypedDict, Union)
import requests
from bs4 import BeautifulSoup
from ..audio import correct_codec, write_metadata_to_target
from ..connection import Connection
from ..objects import Album, Artist, Collection
from ..objects import DatabaseObject as DataObject
from ..objects import Label, Options, Song, Source, Target
from ..utils import BColors, limit_generator, output, trace
from ..utils.config import main_settings, youtube_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.enums.album import AlbumType
from ..utils.exception import MKComposeException, MKMissingNameException
from ..utils.exception.download import UrlNotFoundException
from ..utils.path_manager import LOCATIONS
from ..utils.shared import DEBUG_PAGES
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from .results import SearchResults
@dataclass
@ -19,3 +47,512 @@ class DownloadOptions:
download_again_if_found: bool = False
process_audio_if_found: bool = False
process_metadata_if_found: bool = True
fetch_map = {
Song: "fetch_song",
Album: "fetch_album",
Artist: "fetch_artist",
Label: "fetch_label",
}
class Downloader:
def __init__(
self,
auto_register_pages: bool = True,
download_options: DownloadOptions = None,
fetch_options: FetchOptions = None,
**kwargs
):
self.LOGGER = logging.getLogger("download")
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
self._registered_pages: Dict[Type[Page], Set[Page]] = defaultdict(set)
if auto_register_pages:
self.scan_for_pages(**kwargs)
# manage which pages to use
def register_page(self, page_type: Type[Page], **kwargs):
if page_type in self._registered_pages:
return
self._registered_pages[page_type].add(page_type(
download_options=self.download_options,
fetch_options=self.fetch_options,
**kwargs
))
def deregister_page(self, page_type: Type[Page]):
if page_type not in _registered_pages:
return
for p in self._registered_pages[page_type]:
p.__del__()
del self._registered_pages[page_type]
def scan_for_pages(self, **kwargs):
# assuming the wanted pages are the leaf classes of the interface
from .. import pages
leaf_classes = []
class_list = [Page]
while len(class_list):
_class = class_list.pop()
class_subclasses = _class.__subclasses__()
if len(class_subclasses) == 0:
if _class.REGISTER:
leaf_classes.append(_class)
else:
class_list.extend(class_subclasses)
if Page in leaf_classes:
self.LOGGER.warn("couldn't find any data source")
return
for leaf_class in leaf_classes:
self.register_page(leaf_class, **kwargs)
def get_pages(self, *page_types: List[Type[Page]]) -> Generator[Page, None, None]:
if len(page_types) == 0:
page_types = self._registered_pages.keys()
for page_type in page_types:
yield from self._registered_pages[page_type]
# fetching/downloading data
def search(self, query: Query) -> Generator[DataObject, None, None]:
"""Yields all data objects that were found by the query.
Other than `Downloader.search_yield_pages`, this function just yields all data objects.
This looses the data, where th objects were searched originally, so this might not be the best choice.
Args:
query (Query): The query to search for.
Yields:
Generator[DataObject, None, None]: A generator that yields all found data objects.
"""
for page in self.get_pages():
yield from page.search(query=query)
def search_yield_pages(self, query: Query, results_per_page: Optional[int] = None) -> Generator[Tuple[Page, Generator[DataObject, None, None]], None, None]:
"""Yields all data objects that were found by the query, grouped by the page they were found on.
every yield is a tuple of the page and a generator that yields the data objects.
So this could be how it is used:
```python
for page, data_objects in downloader.search_yield_pages(query):
print(f"Found on {page}:")
for data_object in data_objects:
print(data_object)
```
Args:
query (Query): The query to search for.
results_per_page (Optional[int], optional): If this is set, the generators only yield this amount of data objects per page.
Yields:
Generator[Tuple[Page, Generator[DataObject, None, None]], None, None]: yields the page and a generator that yields the data objects.
"""
for page in self.get_pages():
yield page, limit_generator(page.search(query=query), limit=results_per_page)
def fetch_details(self, data_object: DataObject, **kwargs) -> DataObject:
"""Fetches more details for the given data object.
This uses every source contained in data_object.source_collection that has a page.
Args:
data_object (DataObject): The data object to fetch details for.
Returns:
DataObject: The same data object, but with more details.
"""
source: Source
for source in data_object.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
}):
new_data_object = self.fetch_from_source(source=source, **kwargs)
if new_data_object is not None:
data_object.merge(new_data_object)
return data_object
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
"""Gets a data object from the given source.
Args:
source (Source): The source to get the data object from.
Returns:
Optional[DataObject]: If a data object can be retrieved, it is returned. Otherwise, None is returned.
"""
if not source.has_page:
return None
source_type = source.page.get_source_type(source=source)
if source_type is None:
self.LOGGER.debug(f"Could not determine source type for {source}.")
return None
func = getattr(source.page, fetch_map[source_type])
# fetching the data object and marking it as fetched
data_object: DataObject = func(source=source, **kwargs)
data_object.mark_as_fetched(source.hash_url)
return data_object
def fetch_from_url(self, url: str) -> Optional[DataObject]:
"""This function tries to detect the source of the given url and fetches the data object from it.
Args:
url (str): The url to fetch the data object from.
Returns:
Optional[DataObject]: The fetched data object, or None if no source could be detected or if no object could be retrieved.
"""
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None:
return None
return self.fetch_from_source(source=source)
def _skip_object(self, data_object: DataObject) -> bool:
"""Determine if the given data object should be downloaded or not.
Args:
data_object (DataObject): The data object in question.
Returns:
bool: Returns True if the given data object should be skipped.
"""
if isinstance(data_object, Album):
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
return True
return False
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
"""Downloads the given data object.
It will recursively fetch all available details for this and every related object.
Then it will create the folder structure and download the audio file.
In the end the metadata will be written to the file.
Args:
data_object (DataObject): The data object to download. If it is a artist, it will download the whole discography, if it is an Album it will download the whole Tracklist.
Returns:
DownloadResult: Relevant information about the download success.
"""
# fetch the given object
self.fetch_details(data_object)
output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
# fetching all parent objects (e.g. if you only download a song)
if not kwargs.get("fetched_upwards", False):
to_fetch: List[DataObject] = [data_object]
while len(to_fetch) > 0:
new_to_fetch = []
for d in to_fetch:
if self._skip_object(d):
continue
self.fetch_details(d)
for c in d.get_parent_collections():
new_to_fetch.extend(c)
to_fetch = new_to_fetch
kwargs["fetched_upwards"] = True
# download all children
download_result: DownloadResult = DownloadResult()
for c in data_object.get_child_collections():
for d in c:
if self._skip_object(d):
continue
download_result.merge(self.download(d, genre, **kwargs))
# actually download if the object is a song
if isinstance(data_object, Song):
"""
TODO
add the traced artist and album to the naming.
I am able to do that, because duplicate values are removed later on.
"""
self._download_song(data_object, naming={
"genre": [genre],
"audio_format": [main_settings["audio_format"]],
})
return download_result
def _extract_fields_from_template(self, path_template: str) -> Set[str]:
return set(re.findall(r"{([^}]+)}", path_template))
def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str:
field_names: Set[str] = self._extract_fields_from_template(path_template)
for field in field_names:
if len(naming[field]) == 0:
raise MKMissingNameException(f"Missing field for {field}.")
path_template = path_template.replace(f"{{{field}}}", naming[field][0])
return path_template
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
"""
TODO
Search the song in the file system.
"""
r = DownloadResult(total=1)
# pre process the data recursively
song.compile()
# manage the naming
naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["song"].append(song.title_value)
naming["isrc"].append(song.isrc)
naming["album"].extend(a.title_value for a in song.album_collection)
naming["album_type"].extend(a.album_type.value for a in song.album_collection)
naming["artist"].extend(a.name for a in song.artist_collection)
naming["artist"].extend(a.name for a in song.feature_artist_collection)
for a in song.album_collection:
naming["label"].extend([l.title_value for l in a.label_collection])
# removing duplicates from the naming, and process the strings
for key, value in naming.items():
# https://stackoverflow.com/a/17016257
naming[key] = list(dict.fromkeys(value))
song.genre = naming["genre"][0]
# manage the targets
tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
song.target_collection.append(Target(
relative_to_music_dir=True,
file_path=Path(
self._parse_path_template(main_settings["download_path"], naming=naming),
self._parse_path_template(main_settings["download_file"], naming=naming),
)
))
for target in song.target_collection:
if target.exists:
output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
r.found_on_disk += 1
if not self.download_options.download_again_if_found:
target.copy_content(tmp)
else:
target.create_path()
output(f'{target.file_path}', color=BColors.GREY)
# this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
used_source: Optional[Source] = None
skip_intervals: List[Tuple[float, float]] = []
for source in song.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
"sort_key": lambda page: page.download_priority,
"reverse": True,
}):
if tmp.exists:
break
used_source = source
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download")
skip_intervals = source.page.get_skip_intervals(song=song, source=source)
# if something has been downloaded but it somehow failed, delete the file
if streaming_results.is_fatal_error and tmp.exists:
tmp.delete()
# if everything went right, the file should exist now
if not tmp.exists:
if used_source is None:
r.error_message = f"No source found for {song.option_string}."
else:
r.error_message = f"Something went wrong downloading {song.option_string}."
return r
# post process the audio
found_on_disk = used_source is None
if not found_on_disk or self.download_options.process_audio_if_found:
correct_codec(target=tmp, skip_intervals=skip_intervals)
r.sponsor_segments = len(skip_intervals)
if used_source is not None:
used_source.page.post_process_hook(song=song, temp_target=tmp)
if not found_on_disk or self.download_options.process_metadata_if_found:
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)
# copy the tmp target to the final locations
for target in song.target_collection:
tmp.copy_content(target)
tmp.delete()
return r
def fetch_url(self, url: str, **kwargs) -> DataObject:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None or source.page is None:
raise UrlNotFoundException(url=url)
return source.page.fetch_object_from_source(source=source, **kwargs)
# misc function
def get_existing_genres(self) -> Generator[str, None, None]:
"""Yields every existing genre, for the user to select from.
Yields:
Generator[str, None, None]: a generator that yields every existing genre.
"""
def is_valid_genre(genre: Path) -> bool:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any Patern
from shared.NOT_A_GENRE_REGEX.
"""
if not genre.is_dir():
return False
if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]):
return False
return True
for genre in filter(is_valid_genre, main_settings["music_directory"].iterdir()):
yield genre.name
class Page:
REGISTER = True
SOURCE_TYPE: SourceType
LOGGER: logging.Logger
def __new__(cls, *args, **kwargs):
cls.LOGGER = logging.getLogger(cls.__name__)
return super().__new__(cls)
@classmethod
def is_leaf_page(cls) -> bool:
return len(cls.__subclasses__()) == 0
def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None, **kwargs):
self.SOURCE_TYPE.register_page(self)
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
def __del__(self):
self.SOURCE_TYPE.deregister_page()
def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None):
"""
Perform a regex search on the given string, using a single or a list of
patterns returning the first matching group.
In case of failure return a default value or raise a WARNING or a
RegexNotFoundError, depending on fatal, specifying the field name.
"""
if isinstance(pattern, str):
mobj = re.search(pattern, string, flags)
else:
for p in pattern:
mobj = re.search(p, string, flags)
if mobj:
break
if mobj:
if group is None:
# return the first matching group
return next(g for g in mobj.groups() if g is not None)
elif isinstance(group, (list, tuple)):
return tuple(mobj.group(g) for g in group)
else:
return mobj.group(group)
return default
def get_source_type(self, source: Source) -> Optional[Type[DataObject]]:
return None
def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup:
return BeautifulSoup(r.content, "html.parser")
# to search stuff
def search(self, query: Query) -> List[DataObject]:
music_object = query.music_object
search_functions = {
Song: self.song_search,
Album: self.album_search,
Artist: self.artist_search,
Label: self.label_search
}
if type(music_object) in search_functions:
r = search_functions[type(music_object)](music_object)
if r is not None and len(r) > 0:
return r
r = []
for default_query in query.default_search:
for single_option in self.general_search(default_query):
r.append(single_option)
return r
def general_search(self, search_query: str) -> List[DataObject]:
return []
def label_search(self, label: Label) -> List[Label]:
return []
def artist_search(self, artist: Artist) -> List[Artist]:
return []
def album_search(self, album: Album) -> List[Album]:
return []
def song_search(self, song: Song) -> List[Song]:
return []
# to fetch stuff
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song()
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
return Album()
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
return Artist()
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
# to download stuff
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
return []
def post_process_hook(self, song: Song, temp_target: Target, **kwargs):
pass
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
return DownloadResult()

View File

@ -0,0 +1,214 @@
from __future__ import annotations
import copy
import re
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import (Any, Callable, Dict, Generator, Generic, Hashable, List,
Optional, Tuple, TypeVar, Union)
from ...objects import OuterProxy as DataObject
from ...utils import BColors
from ...utils.config import main_settings
from ...utils.enums import SourceType
from ...utils.exception import MKComposeException
from ...utils.shared import ALPHABET
from ...utils.string_processing import unify
P = TypeVar('P')
class HumanIO:
@staticmethod
def ask_to_create(option: Option) -> bool:
return True
@staticmethod
def not_found(key: Any) -> None:
return None
class Option(Generic[P]):
"""
This could represent a data object, a string or a page.
"""
TEXT_TEMPLATE: str = f"{BColors.BOLD.value}{{index}}{BColors.ENDC.value}: {{value}}"
ATTRIBUTES_FORMATTING: Tuple[str, ...] = ("index", "value")
ATTRIBUTES_KEY: Tuple[str, ...] = ("index", )
def __init__(
self,
value: P,
hidden: bool = False,
additional_keys: List[Hashable] = None,
**kwargs
):
self.value = value
self.hidden = hidden
self._additional_keys = set(self._to_hash(key) for key in additional_keys or [])
for key, value in kwargs.items():
setattr(self, key, value)
super(Option, self).__init__()
def _to_option_string(self, value: Any) -> str:
if hasattr(value, "option_string"):
return value.option_string
return str(value)
@property
def text(self) -> str:
text = self.TEXT_TEMPLATE
for attribute_key in self.ATTRIBUTES_FORMATTING:
text = text.replace(f"{{{attribute_key}}}", self._to_option_string(getattr(self, attribute_key)))
return text
def _to_hash(self, key: Any) -> int:
try:
key = int(key)
except ValueError:
pass
if isinstance(key, str):
return hash(unify(key))
return hash(key)
@property
def keys(self) -> set:
keys = self._additional_keys.copy()
for key in self.ATTRIBUTES_KEY:
keys.add(self._to_hash(getattr(self, key)))
def __contains__(self, key: Any) -> bool:
return self._to_hash(key) in self.keys
def __str__(self):
return self.text
def __iter__(self) -> Generator[Option[P], None, None]:
yield self
class Select(Generic[P]):
OPTION: Type[Option[P]] = Option
HUMAN_IO: Type[HumanIO] = HumanIO
CAN_CREATE_OPTIONS: bool = False
def __init__(
self,
data: Generator[P, None, None],
**kwargs
):
self.option: Type[Option[P]] = kwargs.get("option", self.OPTION)
self.human_io: Type[HumanIO] = kwargs.get("human_io", self.HUMAN_IO)
self.can_create_options: bool = kwargs.get("can_create_options", self.CAN_CREATE_OPTIONS)
self._options: List[Option[P]] = []
self.extend(data)
super(Select, self).__init__(**kwargs)
def append(self, value: P) -> Option[P]:
option = self.option(value)
self._options.append(option)
return option
def extend(self, values: Generator[P, None, None]):
for value in values:
self.append(value)
@property
def _options_to_show(self) -> Generator[Option[P], None, None]:
for option in self._options:
if option.hidden:
continue
yield option
def __iter__(self) -> Generator[Option, None, None]:
_index = 0
for i, o in enumerate(self._options_to_show):
for option in o:
option.index = _index
yield option
_index += 1
def __contains__(self, key: Any) -> bool:
for option in self._options:
if key in option:
return True
return False
def __getitem__(self, key: Any) -> Option[P]:
for option in self._options:
if key in option:
return option
raise KeyError(key)
def choose(self, key: Any) -> Optional[Option[P]]:
try:
return self[key]
except KeyError:
if self.can_create_options:
return self.append(key)
self.human_io.not_found(key)
def pprint(self) -> str:
return "\n".join(str(option) for option in self)
class Node(Generator[P]):
def __init__(
self,
value: Optional[P] = None,
children: List[Node[P]] = None,
parent: Node[P] = None,
**kwargs
):
self.value = value
self.depth = 0
self.index: int = 0
self.children: List[Node[P]] = kwargs.get("children", [])
self.parent: Optional[Node[P]] = kwargs.get("parent", None)
super(Node, self).__init__(**kwargs)
@property
def is_root(self) -> bool:
return self.parent is None
@property
def is_leaf(self) -> bool:
return not self.children
def __iter__(self, **kwargs) -> Generator[Node[P], None, None]:
_level_index_map: Dict[int, int] = kwargs.get("level_index_map", defaultdict(lambda: 0))
self.index = _level_index_map[self.depth]
yield self
_level_index_map[self.depth] += 1
for child in self.children:
child.depth = self.depth + 1
for node in child.__iter__(level_index_map=_level_index_map):
yield node
def __getitem__(self, key: Any) -> Option[P]:
pass
def __contains__(self, key: Any) -> bool:
if key in self.option:
return True

View File

@ -0,0 +1 @@
from . import Option, Select

View File

@ -1,328 +0,0 @@
from typing import Tuple, Type, Dict, Set, Optional, List
from collections import defaultdict
from pathlib import Path
import re
import logging
from . import FetchOptions, DownloadOptions
from .results import SearchResults
from ..objects import (
DatabaseObject as DataObject,
Collection,
Target,
Source,
Options,
Song,
Album,
Artist,
Label,
)
from ..audio import write_metadata_to_target, correct_codec
from ..utils import output, BColors
from ..utils.string_processing import fit_to_file_system
from ..utils.config import youtube_settings, main_settings
from ..utils.path_manager import LOCATIONS
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.exception import MKMissingNameException
from ..utils.exception.download import UrlNotFoundException
from ..utils.shared import DEBUG_PAGES
from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, Genius, INDEPENDENT_DB_OBJECTS
ALL_PAGES: Set[Type[Page]] = {
# EncyclopaediaMetallum,
Genius,
Musify,
YoutubeMusic,
Bandcamp
}
if youtube_settings["use_youtube_alongside_youtube_music"]:
ALL_PAGES.add(YouTube)
AUDIO_PAGES: Set[Type[Page]] = {
Musify,
YouTube,
YoutubeMusic,
Bandcamp
}
SHADY_PAGES: Set[Type[Page]] = {
Musify,
}
fetch_map = {
Song: "fetch_song",
Album: "fetch_album",
Artist: "fetch_artist",
Label: "fetch_label",
}
if DEBUG_PAGES:
DEBUGGING_PAGE = Bandcamp
print(f"Only downloading from page {DEBUGGING_PAGE}.")
ALL_PAGES = {DEBUGGING_PAGE}
AUDIO_PAGES = ALL_PAGES.union(AUDIO_PAGES)
class Pages:
def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
self.LOGGER = logging.getLogger("download")
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
# initialize all page instances
self._page_instances: Dict[Type[Page], Page] = dict()
self._source_to_page: Dict[SourceType, Type[Page]] = dict()
exclude_pages = exclude_pages if exclude_pages is not None else set()
if exclude_shady:
exclude_pages = exclude_pages.union(SHADY_PAGES)
if not exclude_pages.issubset(ALL_PAGES):
raise ValueError(f"The excluded pages have to be a subset of all pages: {exclude_pages} | {ALL_PAGES}")
def _set_to_tuple(page_set: Set[Type[Page]]) -> Tuple[Type[Page], ...]:
return tuple(sorted(page_set, key=lambda page: page.__name__))
self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages)
self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set)
self._audio_pages_set: Set[Type[Page]] = self._pages_set.intersection(AUDIO_PAGES)
self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(self._audio_pages_set)
for page_type in self.pages:
self._page_instances[page_type] = page_type(fetch_options=self.fetch_options, download_options=self.download_options)
self._source_to_page[page_type.SOURCE_TYPE] = page_type
def _get_page_from_enum(self, source_page: SourceType) -> Page:
if source_page not in self._source_to_page:
return None
return self._page_instances[self._source_to_page[source_page]]
def search(self, query: Query) -> SearchResults:
result = SearchResults()
for page_type in self.pages:
result.add(
page=page_type,
search_result=self._page_instances[page_type].search(query=query)
)
return result
def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject:
if not isinstance(data_object, INDEPENDENT_DB_OBJECTS):
return data_object
source: Source
for source in data_object.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
}):
new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level)
if new_data_object is not None:
data_object.merge(new_data_object)
return data_object
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
if not source.has_page:
return None
source_type = source.page.get_source_type(source=source)
if source_type is None:
self.LOGGER.debug(f"Could not determine source type for {source}.")
return None
func = getattr(source.page, fetch_map[source_type])
# fetching the data object and marking it as fetched
data_object: DataObject = func(source=source, **kwargs)
data_object.mark_as_fetched(source.hash_url)
return data_object
def fetch_from_url(self, url: str) -> Optional[DataObject]:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None:
return None
return self.fetch_from_source(source=source)
def _skip_object(self, data_object: DataObject) -> bool:
if isinstance(data_object, Album):
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
return True
return False
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
# fetch the given object
self.fetch_details(data_object)
output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
# fetching all parent objects (e.g. if you only download a song)
if not kwargs.get("fetched_upwards", False):
to_fetch: List[DataObject] = [data_object]
while len(to_fetch) > 0:
new_to_fetch = []
for d in to_fetch:
if self._skip_object(d):
continue
self.fetch_details(d)
for c in d.get_parent_collections():
new_to_fetch.extend(c)
to_fetch = new_to_fetch
kwargs["fetched_upwards"] = True
# download all children
download_result: DownloadResult = DownloadResult()
for c in data_object.get_child_collections():
for d in c:
if self._skip_object(d):
continue
download_result.merge(self.download(d, genre, **kwargs))
# actually download if the object is a song
if isinstance(data_object, Song):
"""
TODO
add the traced artist and album to the naming.
I am able to do that, because duplicate values are removed later on.
"""
self._download_song(data_object, naming={
"genre": [genre],
"audio_format": [main_settings["audio_format"]],
})
return download_result
def _extract_fields_from_template(self, path_template: str) -> Set[str]:
return set(re.findall(r"{([^}]+)}", path_template))
def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str:
field_names: Set[str] = self._extract_fields_from_template(path_template)
for field in field_names:
if len(naming[field]) == 0:
raise MKMissingNameException(f"Missing field for {field}.")
path_template = path_template.replace(f"{{{field}}}", naming[field][0])
return path_template
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
"""
TODO
Search the song in the file system.
"""
r = DownloadResult(total=1)
# pre process the data recursively
song.compile()
# manage the naming
naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["song"].append(song.title_value)
naming["isrc"].append(song.isrc)
naming["album"].extend(a.title_value for a in song.album_collection)
naming["album_type"].extend(a.album_type.value for a in song.album_collection)
naming["artist"].extend(a.name for a in song.artist_collection)
naming["artist"].extend(a.name for a in song.feature_artist_collection)
for a in song.album_collection:
naming["label"].extend([l.title_value for l in a.label_collection])
# removing duplicates from the naming, and process the strings
for key, value in naming.items():
# https://stackoverflow.com/a/17016257
naming[key] = list(dict.fromkeys(value))
song.genre = naming["genre"][0]
# manage the targets
tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
song.target_collection.append(Target(
relative_to_music_dir=True,
file_path=Path(
self._parse_path_template(main_settings["download_path"], naming=naming),
self._parse_path_template(main_settings["download_file"], naming=naming),
)
))
for target in song.target_collection:
if target.exists:
output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
r.found_on_disk += 1
if not self.download_options.download_again_if_found:
target.copy_content(tmp)
else:
target.create_path()
output(f'{target.file_path}', color=BColors.GREY)
# this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
used_source: Optional[Source] = None
skip_intervals: List[Tuple[float, float]] = []
for source in song.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
"sort_key": lambda page: page.download_priority,
"reverse": True,
}):
if tmp.exists:
break
used_source = source
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download")
skip_intervals = source.page.get_skip_intervals(song=song, source=source)
# if something has been downloaded but it somehow failed, delete the file
if streaming_results.is_fatal_error and tmp.exists:
tmp.delete()
# if everything went right, the file should exist now
if not tmp.exists:
if used_source is None:
r.error_message = f"No source found for {song.option_string}."
else:
r.error_message = f"Something went wrong downloading {song.option_string}."
return r
# post process the audio
found_on_disk = used_source is None
if not found_on_disk or self.download_options.process_audio_if_found:
correct_codec(target=tmp, skip_intervals=skip_intervals)
r.sponsor_segments = len(skip_intervals)
if used_source is not None:
used_source.page.post_process_hook(song=song, temp_target=tmp)
if not found_on_disk or self.download_options.process_metadata_if_found:
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)
# copy the tmp target to the final locations
for target in song.target_collection:
tmp.copy_content(target)
tmp.delete()
return r
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None:
raise UrlNotFoundException(url=url)
_actual_page = self._source_to_page[source.source_type]
return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level)

View File

@ -1,8 +1,12 @@
from typing import Tuple, Type, Dict, List, Generator, Union
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, Generator, List, Tuple, Type, Union
from ..objects import DatabaseObject
from ..pages import Page, EncyclopaediaMetallum, Musify
if TYPE_CHECKING:
from . import Page
@dataclass

View File

@ -1,35 +1,32 @@
from __future__ import annotations
import copy
import random
from collections import defaultdict
from typing import List, Optional, Dict, Tuple, Type, Union
import copy
from typing import Dict, List, Optional, Tuple, Type, Union
import pycountry
from ..utils.enums.album import AlbumType, AlbumStatus
from .collection import Collection
from .formatted_text import FormattedText
from .lyrics import Lyrics
from .contact import Contact
from .artwork import Artwork
from .metadata import (
Mapping as id3Mapping,
ID3Timestamp,
Metadata
)
from .option import Options
from .parents import OuterProxy, P
from .source import Source, SourceCollection
from .target import Target
from .country import Language, Country
from ..utils.config import main_settings
from ..utils.enums.album import AlbumStatus, AlbumType
from ..utils.enums.colors import BColors
from ..utils.shared import DEBUG_PRINT_ID
from ..utils.string_processing import unify
from .artwork import Artwork
from .collection import Collection
from .contact import Contact
from .country import Country, Language
from .formatted_text import FormattedText
from .lyrics import Lyrics
from .metadata import ID3Timestamp
from .metadata import Mapping as id3Mapping
from .metadata import Metadata
from .option import Options
from .parents import OuterProxy
from .parents import OuterProxy as Base
from ..utils.config import main_settings
from ..utils.enums.colors import BColors
from .parents import P
from .source import Source, SourceCollection
from .target import Target
"""
All Objects dependent

View File

@ -1,8 +1,52 @@
from .encyclopaedia_metallum import EncyclopaediaMetallum
from .musify import Musify
from .youtube import YouTube
from .youtube_music import YoutubeMusic
from .bandcamp import Bandcamp
from .genius import Genius
import importlib
import inspect
import logging
import pkgutil
import sys
from collections import defaultdict
from copy import copy
from pathlib import Path
from typing import Dict, Generator, List, Set, Type
from .abstract import Page, INDEPENDENT_DB_OBJECTS
from ._bandcamp import Bandcamp
from ._encyclopaedia_metallum import EncyclopaediaMetallum
from ._genius import Genius
from ._musify import Musify
from ._youtube import YouTube
from ._youtube_music import YoutubeMusic
def import_children():
_page_directory = Path(__file__).parent
_stem_blacklist = set(["__pycache__", "__init__"])
for _file in _page_directory.iterdir():
if _file.stem in _stem_blacklist:
continue
logging.debug(f"importing {_file.absolute()}")
exec(f"from . import {_file.stem}")
# module_blacklist = set(sys.modules.keys())
import_children()
"""
classes = set()
print(__name__)
for module_name, module in sys.modules.items():
if module_name in module_blacklist or not module_name.startswith(__name__):
continue
print("scanning module", module_name)
for name, obj in inspect.getmembers(module, predicate=inspect.isclass):
_module = obj.__module__
if _module.startswith(__name__) and hasattr(obj, "SOURCE_TYPE"):
print("checking object", name, obj.__module__)
classes.add(obj)
print()
print(*(c.__name__ for c in classes), sep=",\t")
__all__ = [c.__name__ for c in classes]
"""

View File

@ -1,33 +1,22 @@
from typing import List, Optional, Type
from urllib.parse import urlparse, urlunparse
import json
from enum import Enum
from bs4 import BeautifulSoup
import pycountry
from typing import List, Optional, Type
from urllib.parse import urlparse, urlunparse
import pycountry
from bs4 import BeautifulSoup
from ..objects import Source, DatabaseObject
from .abstract import Page
from ..objects import (
Artist,
Source,
SourceType,
Song,
Album,
Label,
Target,
Contact,
ID3Timestamp,
Lyrics,
FormattedText,
Artwork,
)
from ..connection import Connection
from ..download import Page
from ..objects import (Album, Artist, Artwork, Contact, DatabaseObject,
FormattedText, ID3Timestamp, Label, Lyrics, Song,
Source, SourceType, Target)
from ..utils import dump_to_file
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import clean_song_title
from ..utils.config import main_settings, logging_settings
from ..utils.config import logging_settings, main_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.shared import DEBUG
from ..utils.string_processing import clean_song_title
from ..utils.support_classes.download_result import DownloadResult
if DEBUG:
from ..utils import dump_to_file

View File

@ -1,31 +1,20 @@
from collections import defaultdict
from typing import List, Optional, Dict, Type, Union
from bs4 import BeautifulSoup
from typing import Dict, List, Optional, Type, Union
from urllib.parse import urlencode, urlparse
import pycountry
from urllib.parse import urlparse, urlencode
from bs4 import BeautifulSoup
from ..connection import Connection
from ..utils.config import logging_settings
from .abstract import Page
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.enums.album import AlbumType
from ..utils.support_classes.query import Query
from ..objects import (
Lyrics,
Artist,
Source,
Song,
Album,
ID3Timestamp,
FormattedText,
Label,
Options,
DatabaseObject
)
from ..utils.shared import DEBUG
from ..download import Page
from ..objects import (Album, Artist, DatabaseObject, FormattedText,
ID3Timestamp, Label, Lyrics, Options, Song, Source)
from ..utils import dump_to_file
from ..utils.config import logging_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.enums.album import AlbumType
from ..utils.shared import DEBUG
from ..utils.support_classes.query import Query
ALBUM_TYPE_MAP: Dict[str, AlbumType] = defaultdict(lambda: AlbumType.OTHER, {
"Full-length": AlbumType.STUDIO_ALBUM,
@ -207,6 +196,7 @@ def create_grid(
class EncyclopaediaMetallum(Page):
REGISTER = False
SOURCE_TYPE = ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM
LOGGER = logging_settings["metal_archives_logger"]

View File

@ -1,33 +1,22 @@
from typing import List, Optional, Type
from urllib.parse import urlparse, urlunparse, urlencode
import json
from enum import Enum
from bs4 import BeautifulSoup
import pycountry
from typing import List, Optional, Type
from urllib.parse import urlencode, urlparse, urlunparse
import pycountry
from bs4 import BeautifulSoup
from ..objects import Source, DatabaseObject
from .abstract import Page
from ..objects import (
Artist,
Source,
SourceType,
Song,
Album,
Label,
Target,
Contact,
ID3Timestamp,
Lyrics,
FormattedText,
Artwork,
)
from ..connection import Connection
from ..download import Page
from ..objects import (Album, Artist, Artwork, Contact, DatabaseObject,
FormattedText, ID3Timestamp, Label, Lyrics, Song,
Source, SourceType, Target)
from ..utils import dump_to_file, traverse_json_path
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import clean_song_title
from ..utils.config import main_settings, logging_settings
from ..utils.config import logging_settings, main_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.shared import DEBUG
from ..utils.string_processing import clean_song_title
from ..utils.support_classes.download_result import DownloadResult
if DEBUG:
from ..utils import dump_to_file

View File

@ -1,34 +1,23 @@
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Type, Union, Generator, Dict, Any
from typing import Any, Dict, Generator, List, Optional, Type, Union
from urllib.parse import urlparse
import pycountry
from bs4 import BeautifulSoup
from ..connection import Connection
from .abstract import Page
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.enums.album import AlbumType, AlbumStatus
from ..objects import (
Artist,
Source,
Song,
Album,
ID3Timestamp,
FormattedText,
Label,
Target,
DatabaseObject,
Lyrics,
Artwork
)
from ..download import Page
from ..objects import (Album, Artist, Artwork, DatabaseObject, FormattedText,
ID3Timestamp, Label, Lyrics, Song, Source, Target)
from ..utils import shared, string_processing
from ..utils.config import logging_settings, main_settings
from ..utils import string_processing, shared
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.enums.album import AlbumStatus, AlbumType
from ..utils.string_processing import clean_song_title
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
"""
https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent

View File

@ -1,29 +1,19 @@
from typing import List, Optional, Type, Tuple
from urllib.parse import urlparse, urlunparse, parse_qs
from enum import Enum
from typing import List, Optional, Tuple, Type
from urllib.parse import parse_qs, urlparse, urlunparse
import python_sponsorblock
from ..objects import Source, DatabaseObject, Song, Target
from .abstract import Page
from ..objects import (
Artist,
Source,
Song,
Album,
Label,
Target,
FormattedText,
ID3Timestamp
)
from ..connection import Connection
from ..download import Page
from ..objects import (Album, Artist, DatabaseObject, FormattedText,
ID3Timestamp, Label, Song, Source, Target)
from ..utils.config import logging_settings, main_settings, youtube_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.string_processing import clean_song_title
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult
from ..utils.config import youtube_settings, main_settings, logging_settings
from .youtube_music.super_youtube import SuperYouTube, YouTubeUrl, get_invidious_url, YouTubeUrlType
from ._youtube_music.super_youtube import (SuperYouTube, YouTubeUrl,
YouTubeUrlType, get_invidious_url)
"""
- https://yt.artemislena.eu/api/v1/search?q=Zombiez+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance
@ -38,7 +28,7 @@ def get_piped_url(path: str = "", params: str = "", query: str = "", fragment: s
class YouTube(SuperYouTube):
# CHANGE
REGISTER = youtube_settings["use_youtube_alongside_youtube_music"]
SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
def __init__(self, *args, **kwargs):

View File

@ -3,7 +3,6 @@ from enum import Enum
from ...utils.config import logging_settings
from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import (
Artist,
Source,

View File

@ -6,7 +6,6 @@ from ...utils.string_processing import clean_song_title
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import (
Artist,
Source,

View File

@ -1,26 +1,17 @@
from typing import List, Optional, Type, Tuple
from urllib.parse import urlparse, urlunparse, parse_qs
from enum import Enum
import requests
from typing import List, Optional, Tuple, Type
from urllib.parse import parse_qs, urlparse, urlunparse
import python_sponsorblock
import requests
from ...objects import Source, DatabaseObject, Song, Target
from ..abstract import Page
from ...objects import (
Artist,
Source,
Song,
Album,
Label,
Target,
FormattedText,
ID3Timestamp
)
from ...connection import Connection
from ...download import Page
from ...objects import (Album, Artist, DatabaseObject, FormattedText,
ID3Timestamp, Label, Song, Source, Target)
from ...utils.config import logging_settings, main_settings, youtube_settings
from ...utils.enums import ALL_SOURCE_TYPES, SourceType
from ...utils.support_classes.download_result import DownloadResult
from ...utils.config import youtube_settings, logging_settings, main_settings
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:

View File

@ -1,46 +1,33 @@
from __future__ import unicode_literals, annotations
from __future__ import annotations, unicode_literals
from typing import Dict, List, Optional, Set, Type
from urllib.parse import urlparse, urlunparse, quote, parse_qs, urlencode
import json
import logging
import random
import json
from dataclasses import dataclass
import re
from functools import lru_cache
from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
from typing import Dict, List, Optional, Set, Type
from urllib.parse import parse_qs, quote, urlencode, urlparse, urlunparse
import youtube_dl
from youtube_dl.extractor.youtube import YoutubeIE
from youtube_dl.utils import DownloadError
from ...connection import Connection
from ...download import Page
from ...objects import Album, Artist, Artwork
from ...objects import DatabaseObject as DataObject
from ...objects import (FormattedText, ID3Timestamp, Label, Lyrics, Song,
Source, Target)
from ...utils import dump_to_file, get_current_millis, traverse_json_path
from ...utils.config import logging_settings, main_settings, youtube_settings
from ...utils.enums import ALL_SOURCE_TYPES, SourceType
from ...utils.enums.album import AlbumType
from ...utils.exception.config import SettingValueError
from ...utils.config import main_settings, youtube_settings, logging_settings
from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING
from ...utils.string_processing import clean_song_title
from ...utils import get_current_millis, traverse_json_path
from ...utils import dump_to_file
from ..abstract import Page
from ...objects import (
DatabaseObject as DataObject,
Source,
FormattedText,
ID3Timestamp,
Artwork,
Artist,
Song,
Album,
Label,
Target,
Lyrics,
)
from ...connection import Connection
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
from ...utils.enums.album import AlbumType
from ...utils.support_classes.download_result import DownloadResult
from ._list_render import parse_renderer
from ._music_object_render import parse_run_element
from .super_youtube import SuperYouTube

View File

@ -1,157 +0,0 @@
import logging
import random
import re
from copy import copy
from pathlib import Path
from typing import Optional, Union, Type, Dict, Set, List, Tuple, TypedDict
from string import Formatter
from dataclasses import dataclass, field
import requests
from bs4 import BeautifulSoup
from ..connection import Connection
from ..objects import (
Song,
Source,
Album,
Artist,
Target,
DatabaseObject,
Options,
Collection,
Label,
)
from ..utils.enums import SourceType
from ..utils.enums.album import AlbumType
from ..audio import write_metadata_to_target, correct_codec
from ..utils.config import main_settings
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import fit_to_file_system
from ..utils import trace, output, BColors
INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song]
INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]]
@dataclass
class FetchOptions:
download_all: bool = False
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
@dataclass
class DownloadOptions:
download_all: bool = False
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
process_audio_if_found: bool = False
process_metadata_if_found: bool = True
class Page:
SOURCE_TYPE: SourceType
LOGGER: logging.Logger
def __new__(cls, *args, **kwargs):
cls.LOGGER = logging.getLogger(cls.__name__)
return super().__new__(cls)
def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
self.SOURCE_TYPE.register_page(self)
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None):
"""
Perform a regex search on the given string, using a single or a list of
patterns returning the first matching group.
In case of failure return a default value or raise a WARNING or a
RegexNotFoundError, depending on fatal, specifying the field name.
"""
if isinstance(pattern, str):
mobj = re.search(pattern, string, flags)
else:
for p in pattern:
mobj = re.search(p, string, flags)
if mobj:
break
if mobj:
if group is None:
# return the first matching group
return next(g for g in mobj.groups() if g is not None)
elif isinstance(group, (list, tuple)):
return tuple(mobj.group(g) for g in group)
else:
return mobj.group(group)
return default
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
return None
def get_soup_from_response(self, r: requests.Response) -> BeautifulSoup:
return BeautifulSoup(r.content, "html.parser")
# to search stuff
def search(self, query: Query) -> List[DatabaseObject]:
music_object = query.music_object
search_functions = {
Song: self.song_search,
Album: self.album_search,
Artist: self.artist_search,
Label: self.label_search
}
if type(music_object) in search_functions:
r = search_functions[type(music_object)](music_object)
if r is not None and len(r) > 0:
return r
r = []
for default_query in query.default_search:
for single_option in self.general_search(default_query):
r.append(single_option)
return r
def general_search(self, search_query: str) -> List[DatabaseObject]:
return []
def label_search(self, label: Label) -> List[Label]:
return []
def artist_search(self, artist: Artist) -> List[Artist]:
return []
def album_search(self, album: Album) -> List[Album]:
return []
def song_search(self, song: Song) -> List[Song]:
return []
# to fetch stuff
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song()
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
return Album()
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
return Artist()
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
# to download stuff
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
return []
def post_process_hook(self, song: Song, temp_target: Target, **kwargs):
pass
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
return DownloadResult()

View File

@ -1,15 +1,17 @@
from datetime import datetime
from pathlib import Path
import inspect
import json
import logging
import inspect
from datetime import datetime
from itertools import takewhile
from pathlib import Path
from typing import List, Union
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
from .config import config, read_config, write_config
from .enums.colors import BColors
from .path_manager import LOCATIONS
from .hacking import merge_args
from .path_manager import LOCATIONS
from .shared import (DEBUG, DEBUG_DUMP, DEBUG_LOGGING, DEBUG_OBJECT_TRACE,
DEBUG_OBJECT_TRACE_CALLSTACK, DEBUG_TRACE)
"""
IO functions
@ -125,4 +127,9 @@ def get_current_millis() -> int:
def get_unix_time() -> int:
return int(datetime.now().timestamp())
return int(datetime.now().timestamp())
def limit_generator(generator, limit: Optional[int] = None):
return takewhile(lambda x: x < limit, generator) if limit is not None else generator

View File

@ -17,6 +17,9 @@ class SourceType:
def register_page(self, page: Page):
self.page = page
def deregister_page(self):
self.page = None
def __hash__(self):
return hash(self.name)

View File

@ -1,7 +1,7 @@
from enum import Enum
class BColors(Enum):
class BColors:
# https://stackoverflow.com/a/287944
HEADER = "\033[95m"
OKBLUE = "\033[94m"

View File

@ -3,6 +3,9 @@ class MKBaseException(Exception):
self.message = message
super().__init__(message, **kwargs)
# Compose exceptions. Those usually mean a bug on my side.
class MKComposeException(MKBaseException):
pass
# Downloading
class MKDownloadException(MKBaseException):

View File

@ -1,11 +1,11 @@
import random
from dotenv import load_dotenv
from pathlib import Path
import os
import random
from pathlib import Path
from dotenv import load_dotenv
from .path_manager import LOCATIONS
from .config import main_settings
from .path_manager import LOCATIONS
if not load_dotenv(Path(__file__).parent.parent.parent / ".env"):
load_dotenv(Path(__file__).parent.parent.parent / ".env.example")
@ -51,3 +51,6 @@ have fun :3""".strip()
URL_PATTERN = r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+"
INT_PATTERN = r"^\d*$"
FLOAT_PATTERN = r"^[\d|\,|\.]*$"
ALPHABET = "abcdefghijklmnopqrstuvwxyz"

View File

@ -1,13 +1,12 @@
from typing import Tuple, Union, Optional
from pathlib import Path
import string
from functools import lru_cache
from pathlib import Path
from typing import Optional, Tuple, Union
from urllib.parse import ParseResult, parse_qs, urlparse
from transliterate.exceptions import LanguageDetectionError
from transliterate import translit
from pathvalidate import sanitize_filename
from urllib.parse import urlparse, ParseResult, parse_qs
from transliterate import translit
from transliterate.exceptions import LanguageDetectionError
COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = (
"(official video)",
@ -180,6 +179,17 @@ def hash_url(url: Union[str, ParseResult]) -> str:
r = r.lower().strip()
return r
def hash(self, key: Any) -> int:
try:
key = int(key)
except ValueError:
pass
if isinstance(key, str):
return hash(unify(key))
return hash(key)
def remove_feature_part_from_track(title: str) -> str:
if ")" != title[-1]: