Compare commits
22 Commits
49145a7d93
...
feature/cl
| Author | SHA1 | Date | |
|---|---|---|---|
| e53e50b5d2 | |||
| 240bd105f0 | |||
| ed2eeabd6a | |||
| b236291378 | |||
| 097211b3cd | |||
| f839cdf906 | |||
| c306da7934 | |||
| 684c90a7b4 | |||
| b30824baf9 | |||
| 130f5edcfe | |||
| 636645e862 | |||
| df1743c695 | |||
| ead4f83456 | |||
| 4b2dd4a36a | |||
| d4fe99ffc7 | |||
| 413d422e2f | |||
| 999299c32a | |||
| a0e42fc6ee | |||
| 5cdd4fb6a9 | |||
| 71ec309953 | |||
| 850c68f3e5 | |||
| 7219048422 |
2
.vscode/launch.json
vendored
2
.vscode/launch.json
vendored
@@ -22,7 +22,7 @@
|
||||
"name": "Python Debugger: Music Kraken",
|
||||
"type": "debugpy",
|
||||
"request": "launch", // run the module
|
||||
"module": "music_kraken",
|
||||
"program": "${workspaceFolder}/.vscode/run_script.py",
|
||||
}
|
||||
]
|
||||
}
|
||||
3
.vscode/run_script.py
vendored
Normal file
3
.vscode/run_script.py
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
from music_kraken.__main__ import cli
|
||||
|
||||
cli()
|
||||
@@ -1,13 +1,13 @@
|
||||
import logging
|
||||
import gc
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from rich.logging import RichHandler
|
||||
from rich.console import Console
|
||||
from rich.logging import RichHandler
|
||||
|
||||
from .utils.shared import DEBUG, DEBUG_LOGGING
|
||||
from .utils.config import logging_settings, main_settings, read_config
|
||||
from .utils.shared import DEBUG, DEBUG_LOGGING
|
||||
|
||||
read_config()
|
||||
|
||||
|
||||
3
music_kraken/__meta__.py
Normal file
3
music_kraken/__meta__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
PROGRAMM: str = "music-kraken"
|
||||
DESCRIPTION: str = """This program will first get the metadata of various songs from metadata providers like musicbrainz, and then search for download links on pages like bandcamp.
|
||||
Then it will download the song and edit the metadata accordingly."""
|
||||
@@ -1,14 +1,15 @@
|
||||
import mutagen
|
||||
from mutagen.id3 import ID3, Frame, APIC, USLT
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
import logging
|
||||
|
||||
import mutagen
|
||||
from mutagen.id3 import APIC, ID3, USLT, Frame
|
||||
from PIL import Image
|
||||
|
||||
from ..utils.config import logging_settings, main_settings
|
||||
from ..objects import Song, Target, Metadata
|
||||
from ..objects.metadata import Mapping
|
||||
from ..connection import Connection
|
||||
from ..objects import Metadata, Song, Target
|
||||
from ..objects.metadata import Mapping
|
||||
from ..utils.config import logging_settings, main_settings
|
||||
|
||||
LOGGER = logging_settings["tagging_logger"]
|
||||
|
||||
@@ -105,7 +106,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
|
||||
APIC(
|
||||
encoding=0,
|
||||
mime="image/jpeg",
|
||||
type=3,
|
||||
type=mutagen.id3.PictureType.COVER_FRONT,
|
||||
desc=u"Cover",
|
||||
data=converted_target.read_bytes(),
|
||||
)
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
from .informations import print_paths
|
||||
from .main_downloader import download
|
||||
from .options.settings import settings
|
||||
from .options.frontend import set_frontend
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
from ..utils.shared import get_random_message
|
||||
|
||||
|
||||
def cli_function(function):
|
||||
def wrapper(*args, **kwargs):
|
||||
silent = kwargs.get("no_cli", False)
|
||||
if "no_cli" in kwargs:
|
||||
del kwargs["no_cli"]
|
||||
|
||||
if silent:
|
||||
return function(*args, **kwargs)
|
||||
return
|
||||
|
||||
code = 0
|
||||
|
||||
print_cute_message()
|
||||
print()
|
||||
try:
|
||||
code = function(*args, **kwargs)
|
||||
except KeyboardInterrupt:
|
||||
print("\n\nRaise an issue if I fucked up:\nhttps://github.com/HeIIow2/music-downloader/issues")
|
||||
|
||||
finally:
|
||||
print()
|
||||
print_cute_message()
|
||||
print("See you soon! :3")
|
||||
|
||||
exit()
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def print_cute_message():
|
||||
message = get_random_message()
|
||||
try:
|
||||
print(message)
|
||||
except UnicodeEncodeError:
|
||||
message = str(c for c in message if 0 < ord(c) < 127)
|
||||
print(message)
|
||||
|
||||
|
||||
AGREE_INPUTS = {"y", "yes", "ok"}
|
||||
def ask_for_bool(msg: str) -> bool:
|
||||
i = input(msg + " (Y/N):").lower()
|
||||
return i in AGREE_INPUTS
|
||||
|
||||
82
music_kraken/development_cli/__init__.py
Normal file
82
music_kraken/development_cli/__init__.py
Normal file
@@ -0,0 +1,82 @@
|
||||
import argparse
|
||||
from functools import cached_property
|
||||
|
||||
from ..__meta__ import DESCRIPTION, PROGRAMM
|
||||
from ..download import Downloader
|
||||
from ..utils import BColors
|
||||
from ..utils.string_processing import unify
|
||||
from .utils import HELP_MESSAGE, ask_for_bool, ask_for_create
|
||||
|
||||
|
||||
class DevelopmentCli:
|
||||
def __init__(self, args: argparse.Namespace):
|
||||
self.args = args
|
||||
|
||||
if args.genre:
|
||||
self.genre = args.genre
|
||||
|
||||
self.downloader: Downloader = Downloader()
|
||||
|
||||
@cached_property
|
||||
def genre(self) -> str:
|
||||
"""This is a cached property, which means if it isn't set in the constructor or before it is accessed,
|
||||
the program will be thrown in a shell
|
||||
|
||||
Returns:
|
||||
str: the genre that should be used
|
||||
"""
|
||||
option_string = f"{BColors.HEADER}Genres{BColors.ENDC}"
|
||||
genre_map = {}
|
||||
|
||||
_string_list = []
|
||||
for i, genre in enumerate(self.downloader.get_existing_genres()):
|
||||
option_string += f"\n{BColors.BOLD}{i}{BColors.ENDC}: {genre}"
|
||||
|
||||
genre_map[str(i)] = genre
|
||||
genre_map[unify(genre)] = genre
|
||||
|
||||
genre = None
|
||||
while genre is None:
|
||||
print(option_string)
|
||||
print()
|
||||
|
||||
i = input("> ")
|
||||
u = unify(i)
|
||||
if u in genre_map:
|
||||
genre = genre_map[u]
|
||||
break
|
||||
|
||||
if ask_for_create("genre", i):
|
||||
genre = i
|
||||
break
|
||||
|
||||
return genre
|
||||
|
||||
|
||||
def help_screen(self) -> None:
|
||||
print(HELP_MESSAGE)
|
||||
|
||||
def shell(self) -> None:
|
||||
print(f"Welcome to the {PROGRAMM} shell!")
|
||||
print(f"Type '{BColors.OKBLUE}help{BColors.ENDC}' for a list of commands.")
|
||||
print("")
|
||||
|
||||
while True:
|
||||
i = input("> ")
|
||||
if i == "help":
|
||||
self.help_screen()
|
||||
elif i == "genre":
|
||||
self.genre
|
||||
elif i == "exit":
|
||||
break
|
||||
else:
|
||||
print("Unknown command. Type 'help' for a list of commands.")
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
prog=PROGRAMM,
|
||||
description=DESCRIPTION,
|
||||
epilog='This is just a development cli. The real frontend is coming soon.'
|
||||
)
|
||||
parser.add_argument('--genre', '-g', action='store_const', required=False, help="choose a genre to download from")
|
||||
args = parser.parse_args()
|
||||
@@ -1,7 +1,7 @@
|
||||
import random
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Dict, Generator, List, Set, Type
|
||||
from typing import Dict, Generator, List, Set, Type, Union
|
||||
|
||||
from .. import console
|
||||
from ..download import Downloader, Page, components
|
||||
@@ -14,10 +14,11 @@ from ..utils.config import main_settings, write_config
|
||||
from ..utils.enums.colors import BColors
|
||||
from ..utils.exception import MKInvalidInputException
|
||||
from ..utils.exception.download import UrlNotFoundException
|
||||
from ..utils.shared import URL_PATTERN
|
||||
from ..utils.shared import HELP_MESSAGE, URL_PATTERN
|
||||
from ..utils.string_processing import fit_to_file_system
|
||||
from ..utils.support_classes.download_result import DownloadResult
|
||||
from ..utils.support_classes.query import Query
|
||||
from .genre import get_genre
|
||||
from .options.first_config import initial_config
|
||||
from .utils import ask_for_bool, cli_function
|
||||
|
||||
@@ -27,22 +28,11 @@ PAGE_NAME_FILL = "-"
|
||||
MAX_PAGE_LEN = 21
|
||||
|
||||
|
||||
def get_genre():
|
||||
select_genre = components.GenreSelect()
|
||||
select_genre._ask_for_creating_option = lambda key: ask_for_bool(f"Create the genre \"{key}\"")
|
||||
|
||||
genre: Optional[components.Option] = None
|
||||
|
||||
while genre is None:
|
||||
for genre in select_genre:
|
||||
print(genre)
|
||||
|
||||
genre = select_genre.choose(input("Id or new genre: "))
|
||||
|
||||
return genre.value
|
||||
|
||||
|
||||
def help_message():
|
||||
print(HELP_MESSAGE)
|
||||
print()
|
||||
print(random.choice(main_settings["happy_messages"]))
|
||||
print()
|
||||
@@ -58,7 +48,7 @@ class CliDownloader:
|
||||
genre: str = None,
|
||||
process_metadata_anyway: bool = False,
|
||||
) -> None:
|
||||
self.pages: Downloader = Downloader(exclude_pages=exclude_pages, exclude_shady=exclude_shady)
|
||||
self.downloader: Downloader = Downloader(exclude_pages=exclude_pages, exclude_shady=exclude_shady)
|
||||
|
||||
self.page_dict: Dict[str, Type[Page]] = dict()
|
||||
|
||||
@@ -76,10 +66,13 @@ class CliDownloader:
|
||||
output()
|
||||
|
||||
def print_current_options(self):
|
||||
self.page_dict = dict()
|
||||
|
||||
print()
|
||||
print(self.current_results.pprint())
|
||||
|
||||
"""
|
||||
self.page_dict = dict()
|
||||
|
||||
page_count = 0
|
||||
for option in self.current_results.formatted_generator():
|
||||
if isinstance(option, ResultOption):
|
||||
@@ -94,10 +87,13 @@ class CliDownloader:
|
||||
self.page_dict[option.__name__] = option
|
||||
|
||||
page_count += 1
|
||||
"""
|
||||
|
||||
print()
|
||||
|
||||
def set_current_options(self, current_options: Results):
|
||||
def set_current_options(self, current_options: Union[Generator[DatabaseObject, None, None], components.Select]):
|
||||
current_options = current_options if isinstance(current_options, components.Select) else components.DataObjectSelect(current_options)
|
||||
|
||||
if main_settings["result_history"]:
|
||||
self._result_history.append(current_options)
|
||||
|
||||
@@ -147,7 +143,7 @@ class CliDownloader:
|
||||
def search(self, query: str):
|
||||
if re.match(URL_PATTERN, query) is not None:
|
||||
try:
|
||||
page, data_object = self.pages.fetch_url(query)
|
||||
data_object = self.downloader.fetch_url(query)
|
||||
except UrlNotFoundException as e:
|
||||
print(f"{e.url} could not be attributed/parsed to any yet implemented site.\n"
|
||||
f"PR appreciated if the site isn't implemented.\n"
|
||||
@@ -201,15 +197,17 @@ class CliDownloader:
|
||||
|
||||
parsed_query: Query = self._process_parsed(key_text, query)
|
||||
|
||||
self.set_current_options(self.pages.search(parsed_query))
|
||||
self.set_current_options(self.downloader.search(parsed_query))
|
||||
self.print_current_options()
|
||||
|
||||
def goto(self, data_object: DatabaseObject):
|
||||
def goto(self, data_object: Union[DatabaseObject, components.Select]):
|
||||
page: Type[Page]
|
||||
|
||||
self.pages.fetch_details(data_object, stop_at_level=1)
|
||||
|
||||
self.set_current_options(GoToResults(data_object.options, max_items_per_page=self.max_displayed_options))
|
||||
if isinstance(data_object, components.Select):
|
||||
self.set_current_options(data_object)
|
||||
else:
|
||||
self.downloader.fetch_details(data_object, stop_at_level=1)
|
||||
self.set_current_options(data_object.options)
|
||||
|
||||
self.print_current_options()
|
||||
|
||||
@@ -221,7 +219,7 @@ class CliDownloader:
|
||||
_result_map: Dict[DatabaseObject, DownloadResult] = dict()
|
||||
|
||||
for database_object in data_objects:
|
||||
r = self.pages.download(
|
||||
r = self.downloader.download(
|
||||
data_object=database_object,
|
||||
genre=self.genre,
|
||||
**kwargs
|
||||
@@ -276,24 +274,15 @@ class CliDownloader:
|
||||
|
||||
indices = []
|
||||
for possible_index in q.split(","):
|
||||
possible_index = possible_index.strip()
|
||||
if possible_index == "":
|
||||
continue
|
||||
|
||||
if possible_index not in self.current_results:
|
||||
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not in the current options.")
|
||||
|
||||
i = 0
|
||||
try:
|
||||
i = int(possible_index)
|
||||
except ValueError:
|
||||
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.")
|
||||
yield self.current_results[possible_index]
|
||||
|
||||
if i < 0 or i >= len(self.current_results):
|
||||
raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results) - 1}.")
|
||||
|
||||
indices.append(i)
|
||||
|
||||
return [self.current_results[i] for i in indices]
|
||||
|
||||
selected_objects = get_selected_objects(query)
|
||||
selected_objects = list(get_selected_objects(query))
|
||||
|
||||
if do_merge:
|
||||
old_selected_objects = selected_objects
|
||||
@@ -308,19 +297,19 @@ class CliDownloader:
|
||||
|
||||
if do_fetch:
|
||||
for data_object in selected_objects:
|
||||
self.pages.fetch_details(data_object)
|
||||
self.downloader.fetch_details(data_object)
|
||||
|
||||
self.print_current_options()
|
||||
return False
|
||||
|
||||
if do_download:
|
||||
self.download(selected_objects)
|
||||
self.download(list(o.value for o in selected_objects))
|
||||
return False
|
||||
|
||||
if len(selected_objects) != 1:
|
||||
raise MKInvalidInputException(message="You can only go to one object at a time without merging.")
|
||||
|
||||
self.goto(selected_objects[0])
|
||||
self.goto(selected_objects[0].value)
|
||||
return False
|
||||
except MKInvalidInputException as e:
|
||||
output("\n" + e.message + "\n", color=BColors.FAIL)
|
||||
85
music_kraken/development_cli/utils.py
Normal file
85
music_kraken/development_cli/utils.py
Normal file
@@ -0,0 +1,85 @@
|
||||
from ..utils import BColors
|
||||
from ..utils.shared import get_random_message
|
||||
|
||||
|
||||
def cli_function(function):
|
||||
def wrapper(*args, **kwargs):
|
||||
silent = kwargs.get("no_cli", False)
|
||||
if "no_cli" in kwargs:
|
||||
del kwargs["no_cli"]
|
||||
|
||||
if silent:
|
||||
return function(*args, **kwargs)
|
||||
return
|
||||
|
||||
code = 0
|
||||
|
||||
print_cute_message()
|
||||
print()
|
||||
try:
|
||||
code = function(*args, **kwargs)
|
||||
except KeyboardInterrupt:
|
||||
print("\n\nRaise an issue if I fucked up:\nhttps://github.com/HeIIow2/music-downloader/issues")
|
||||
|
||||
finally:
|
||||
print()
|
||||
print_cute_message()
|
||||
print("See you soon! :3")
|
||||
|
||||
exit()
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def print_cute_message():
|
||||
message = get_random_message()
|
||||
try:
|
||||
print(message)
|
||||
except UnicodeEncodeError:
|
||||
message = str(c for c in message if 0 < ord(c) < 127)
|
||||
print(message)
|
||||
|
||||
|
||||
def highlight_placeholder(text: str) -> str:
|
||||
return text.replace("<", f"{BColors.BOLD}<").replace(">", f">{BColors.ENDC}")
|
||||
|
||||
|
||||
HELP_MESSAGE = highlight_placeholder(f"""{BColors.HEADER}To search:{BColors.ENDC}
|
||||
> s: <query/url>
|
||||
> s: https://musify.club/release/some-random-release-183028492
|
||||
> s: #a <artist> #r <release> #t <track>
|
||||
|
||||
If you found the same object twice from different sources you can merge those objects.
|
||||
Then it will use those sources. To do so, use the {BColors.BOLD}m{BColors.ENDC} command.
|
||||
|
||||
{BColors.HEADER}To download:{BColors.ENDC}
|
||||
> d: <id/url>
|
||||
> dm: 0, 3, 4 # merge all objects into one and download this object
|
||||
> d: 1
|
||||
> d: https://musify.club/release/some-random-release-183028492
|
||||
|
||||
{BColors.HEADER}To inspect an object:{BColors.ENDC}
|
||||
If you inspect an object, you see its adjacent object. This means for example the releases of an artist, or the tracks of a release.
|
||||
You can also merge objects with the {BColors.BOLD}m{BColors.ENDC} command here.
|
||||
|
||||
> g: <id/url>
|
||||
> gm: 0, 3, 4 # merge all objects into one and inspect this object
|
||||
> g: 1
|
||||
> g: https://musify.club/release/some-random-release-183028492""")
|
||||
|
||||
|
||||
class COMMANDS:
|
||||
AGREE = {"y", "yes", "ok"}
|
||||
DISAGREE = {"n", "no"}
|
||||
EXIT = {"exit"}
|
||||
HELP = {"help", "h"}
|
||||
|
||||
|
||||
def ask_for_bool(msg: str) -> bool:
|
||||
i = input(f"{msg} ({BColors.OKGREEN.value}Y{BColors.ENDC.value}/{BColors.FAIL.value}N{BColors.ENDC.value})? ").lower()
|
||||
return i in COMMANDS.AGREE
|
||||
|
||||
|
||||
def ask_for_create(name: str, value: str) -> bool:
|
||||
return ask_for_bool(f"Do you want to create the {name} {BColors.OKBLUE}{value}{BColors.ENDC}?")
|
||||
|
||||
@@ -19,7 +19,7 @@ from ..connection import Connection
|
||||
from ..objects import Album, Artist, Collection
|
||||
from ..objects import DatabaseObject as DataObject
|
||||
from ..objects import Label, Options, Song, Source, Target
|
||||
from ..utils import BColors, output, trace
|
||||
from ..utils import BColors, limit_generator, output, trace
|
||||
from ..utils.config import main_settings, youtube_settings
|
||||
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
|
||||
from ..utils.enums.album import AlbumType
|
||||
@@ -74,6 +74,8 @@ class Downloader:
|
||||
if auto_register_pages:
|
||||
self.scan_for_pages(**kwargs)
|
||||
|
||||
# manage which pages to use
|
||||
|
||||
def register_page(self, page_type: Type[Page], **kwargs):
|
||||
if page_type in self._registered_pages:
|
||||
return
|
||||
@@ -117,34 +119,81 @@ class Downloader:
|
||||
|
||||
def get_pages(self, *page_types: List[Type[Page]]) -> Generator[Page, None, None]:
|
||||
if len(page_types) == 0:
|
||||
page_types = _registered_pages.keys()
|
||||
page_types = self._registered_pages.keys()
|
||||
|
||||
for page_type in page_types:
|
||||
yield from self._registered_pages[page_type]
|
||||
|
||||
def search(self, query: Query) -> SearchResults:
|
||||
result = SearchResults()
|
||||
# fetching/downloading data
|
||||
|
||||
def search(self, query: Query) -> Generator[DataObject, None, None]:
|
||||
"""Yields all data objects that were found by the query.
|
||||
Other than `Downloader.search_yield_pages`, this function just yields all data objects.
|
||||
This looses the data, where th objects were searched originally, so this might not be the best choice.
|
||||
|
||||
Args:
|
||||
query (Query): The query to search for.
|
||||
|
||||
Yields:
|
||||
Generator[DataObject, None, None]: A generator that yields all found data objects.
|
||||
"""
|
||||
|
||||
for page in self.get_pages():
|
||||
yield from page.search(query=query)
|
||||
|
||||
def search_yield_pages(self, query: Query, results_per_page: Optional[int] = None) -> Generator[Tuple[Page, Generator[DataObject, None, None]], None, None]:
|
||||
"""Yields all data objects that were found by the query, grouped by the page they were found on.
|
||||
every yield is a tuple of the page and a generator that yields the data objects.
|
||||
So this could be how it is used:
|
||||
|
||||
```python
|
||||
for page, data_objects in downloader.search_yield_pages(query):
|
||||
print(f"Found on {page}:")
|
||||
for data_object in data_objects:
|
||||
print(data_object)
|
||||
```
|
||||
|
||||
Args:
|
||||
query (Query): The query to search for.
|
||||
results_per_page (Optional[int], optional): If this is set, the generators only yield this amount of data objects per page.
|
||||
|
||||
Yields:
|
||||
Generator[Tuple[Page, Generator[DataObject, None, None]], None, None]: yields the page and a generator that yields the data objects.
|
||||
"""
|
||||
|
||||
for page in self.get_pages():
|
||||
result.add(
|
||||
page=type(page),
|
||||
search_result=page.search(query=query)
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject:
|
||||
yield page, limit_generator(page.search(query=query), limit=results_per_page)
|
||||
|
||||
def fetch_details(self, data_object: DataObject, **kwargs) -> DataObject:
|
||||
"""Fetches more details for the given data object.
|
||||
This uses every source contained in data_object.source_collection that has a page.
|
||||
|
||||
Args:
|
||||
data_object (DataObject): The data object to fetch details for.
|
||||
|
||||
Returns:
|
||||
DataObject: The same data object, but with more details.
|
||||
"""
|
||||
|
||||
source: Source
|
||||
for source in data_object.source_collection.get_sources(source_type_sorting={
|
||||
"only_with_page": True,
|
||||
}):
|
||||
new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level)
|
||||
new_data_object = self.fetch_from_source(source=source, **kwargs)
|
||||
if new_data_object is not None:
|
||||
data_object.merge(new_data_object)
|
||||
|
||||
return data_object
|
||||
|
||||
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
|
||||
"""Gets a data object from the given source.
|
||||
|
||||
Args:
|
||||
source (Source): The source to get the data object from.
|
||||
|
||||
Returns:
|
||||
Optional[DataObject]: If a data object can be retrieved, it is returned. Otherwise, None is returned.
|
||||
"""
|
||||
if not source.has_page:
|
||||
return None
|
||||
|
||||
@@ -161,6 +210,15 @@ class Downloader:
|
||||
return data_object
|
||||
|
||||
def fetch_from_url(self, url: str) -> Optional[DataObject]:
|
||||
"""This function tries to detect the source of the given url and fetches the data object from it.
|
||||
|
||||
Args:
|
||||
url (str): The url to fetch the data object from.
|
||||
|
||||
Returns:
|
||||
Optional[DataObject]: The fetched data object, or None if no source could be detected or if no object could be retrieved.
|
||||
"""
|
||||
|
||||
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
|
||||
if source is None:
|
||||
return None
|
||||
@@ -168,6 +226,14 @@ class Downloader:
|
||||
return self.fetch_from_source(source=source)
|
||||
|
||||
def _skip_object(self, data_object: DataObject) -> bool:
|
||||
"""Determine if the given data object should be downloaded or not.
|
||||
|
||||
Args:
|
||||
data_object (DataObject): The data object in question.
|
||||
|
||||
Returns:
|
||||
bool: Returns True if the given data object should be skipped.
|
||||
"""
|
||||
if isinstance(data_object, Album):
|
||||
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
|
||||
return True
|
||||
@@ -175,6 +241,18 @@ class Downloader:
|
||||
return False
|
||||
|
||||
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
|
||||
"""Downloads the given data object.
|
||||
It will recursively fetch all available details for this and every related object.
|
||||
Then it will create the folder structure and download the audio file.
|
||||
In the end the metadata will be written to the file.
|
||||
|
||||
Args:
|
||||
data_object (DataObject): The data object to download. If it is a artist, it will download the whole discography, if it is an Album it will download the whole Tracklist.
|
||||
|
||||
Returns:
|
||||
DownloadResult: Relevant information about the download success.
|
||||
"""
|
||||
|
||||
# fetch the given object
|
||||
self.fetch_details(data_object)
|
||||
output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
|
||||
@@ -337,6 +415,31 @@ class Downloader:
|
||||
|
||||
return source.page.fetch_object_from_source(source=source, **kwargs)
|
||||
|
||||
# misc function
|
||||
|
||||
def get_existing_genres(self) -> Generator[str, None, None]:
|
||||
"""Yields every existing genre, for the user to select from.
|
||||
|
||||
Yields:
|
||||
Generator[str, None, None]: a generator that yields every existing genre.
|
||||
"""
|
||||
|
||||
def is_valid_genre(genre: Path) -> bool:
|
||||
"""
|
||||
gets the name of all subdirectories of shared.MUSIC_DIR,
|
||||
but filters out all directories, where the name matches with any Patern
|
||||
from shared.NOT_A_GENRE_REGEX.
|
||||
"""
|
||||
if not genre.is_dir():
|
||||
return False
|
||||
|
||||
if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
for genre in filter(is_valid_genre, main_settings["music_directory"].iterdir()):
|
||||
yield genre.name
|
||||
|
||||
class Page:
|
||||
REGISTER = True
|
||||
|
||||
@@ -1,158 +0,0 @@
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, Generator, List, Optional
|
||||
|
||||
from ..utils.config import main_settings
|
||||
from ..utils.exception import MKComposeException
|
||||
from ..utils.string_processing import unify
|
||||
|
||||
|
||||
class Option:
|
||||
"""
|
||||
This could represent a data object, a string or a page.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
value: Any,
|
||||
text: Optional[str] = None,
|
||||
keys: List[Any] = None,
|
||||
hidden: bool = False,
|
||||
parse_key: Callable[[Any], Any] = lambda x: x,
|
||||
):
|
||||
self._parse_key: Callable[[Any], Any] = parse_key
|
||||
|
||||
self.value = value
|
||||
self.text = text or str(value)
|
||||
self.hidden = hidden
|
||||
|
||||
self._raw_keys = set(keys or [])
|
||||
self._raw_keys.add(self.text)
|
||||
self.keys = set(self.parse_key(key) for key in self._raw_keys)
|
||||
|
||||
def register_key(self, key: Any):
|
||||
self._raw_keys.add(key)
|
||||
self.keys.add(self._parse_key(key))
|
||||
|
||||
@property
|
||||
def parse_key(self) -> Callable[[Any], Any]:
|
||||
return self._parse_key
|
||||
|
||||
@parse_key.setter
|
||||
def parse_key(self, value: Callable[[Any], Any]):
|
||||
self._parse_key = value
|
||||
self.keys = set(self._parse_key(key) for key in self._raw_keys)
|
||||
|
||||
def __str__(self):
|
||||
return self.text
|
||||
|
||||
|
||||
class Select:
|
||||
def __init__(
|
||||
self,
|
||||
options: Generator[Option, None, None] = None,
|
||||
option_factory: Callable[[Any], Option] = None,
|
||||
raw_options: List[Any] = None,
|
||||
parse_option_key: Callable[[Any], Any] = lambda x: x,
|
||||
ask_for_creating_option: Callable[[Option], bool] = lambda x: True,
|
||||
sort: bool = False,
|
||||
**kwargs
|
||||
):
|
||||
self._parse_option_key: Callable[[Any], Any] = parse_option_key
|
||||
self._ask_for_creating_option: Callable[[Option], bool] = ask_for_creating_option
|
||||
|
||||
self._key_to_option: Dict[Any, Option] = dict()
|
||||
self._options: List[Option] = []
|
||||
|
||||
options = options or []
|
||||
self.option_factory: Optional[Callable[[Any], Option]] = option_factory
|
||||
if self.can_create_options:
|
||||
_raw_options = raw_options or []
|
||||
if sort:
|
||||
_raw_options = sorted(_raw_options)
|
||||
|
||||
for raw_option in _raw_options:
|
||||
self.append(self.option_factory(raw_option))
|
||||
elif raw_options is not None:
|
||||
raise MKComposeException("Cannot create options without a factory.")
|
||||
|
||||
self.extend(options)
|
||||
|
||||
@property
|
||||
def can_create_options(self) -> bool:
|
||||
return self.option_factory is not None
|
||||
|
||||
def append(self, option: Option):
|
||||
option.parse_key = self._parse_option_key
|
||||
self._options.append(option)
|
||||
for key in option.keys:
|
||||
self._key_to_option[key] = option
|
||||
|
||||
def extend(self, options: List[Option]):
|
||||
for option in options:
|
||||
self.append(option)
|
||||
|
||||
def __iter__(self) -> Generator[Option, None, None]:
|
||||
for option in self._options:
|
||||
if option.hidden:
|
||||
continue
|
||||
|
||||
yield option
|
||||
|
||||
def __contains__(self, key: Any) -> bool:
|
||||
return key in self._key_to_option
|
||||
|
||||
def __getitem__(self, key: Any) -> Option:
|
||||
return self._key_to_option[key]
|
||||
|
||||
def create_option(self, key: Any, **kwargs) -> Option:
|
||||
if not self.can_create_options:
|
||||
raise MKComposeException("Cannot create options without a factory.")
|
||||
|
||||
option = self.option_factory(key, **kwargs)
|
||||
self.append(option)
|
||||
return option
|
||||
|
||||
def choose(self, key: Any) -> Optional[Option]:
|
||||
if key not in self:
|
||||
if self.can_create_options and self._ask_for_creating_option(key):
|
||||
return self.create_option(key)
|
||||
return None
|
||||
|
||||
return self[key]
|
||||
|
||||
|
||||
|
||||
class StringSelect(Select):
|
||||
def __init__(self, **kwargs):
|
||||
self._current_index = 0
|
||||
kwargs["option_factory"] = self.next_option
|
||||
kwargs["parse_option_key"] = lambda x: unify(str(x))
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def next_option(self, value: Any) -> Optional[Option]:
|
||||
o = Option(value=value, keys=[self._current_index], text=f"{self._current_index:0>2}: {value}")
|
||||
self._current_index += 1
|
||||
return o
|
||||
|
||||
|
||||
class GenreSelect(StringSelect):
|
||||
@staticmethod
|
||||
def is_valid_genre(genre: Path) -> bool:
|
||||
"""
|
||||
gets the name of all subdirectories of shared.MUSIC_DIR,
|
||||
but filters out all directories, where the name matches with any Patern
|
||||
from shared.NOT_A_GENRE_REGEX.
|
||||
"""
|
||||
if not genre.is_dir():
|
||||
return False
|
||||
|
||||
if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(sort=True, raw_options=(genre.name for genre in filter(self.is_valid_genre, main_settings["music_directory"].iterdir())))
|
||||
|
||||
214
music_kraken/download/components/__init__.py
Normal file
214
music_kraken/download/components/__init__.py
Normal file
@@ -0,0 +1,214 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
import re
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import (Any, Callable, Dict, Generator, Generic, Hashable, List,
|
||||
Optional, Tuple, TypeVar, Union)
|
||||
|
||||
from ...objects import OuterProxy as DataObject
|
||||
from ...utils import BColors
|
||||
from ...utils.config import main_settings
|
||||
from ...utils.enums import SourceType
|
||||
from ...utils.exception import MKComposeException
|
||||
from ...utils.shared import ALPHABET
|
||||
from ...utils.string_processing import unify
|
||||
|
||||
P = TypeVar('P')
|
||||
|
||||
|
||||
class HumanIO:
|
||||
@staticmethod
|
||||
def ask_to_create(option: Option) -> bool:
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def not_found(key: Any) -> None:
|
||||
return None
|
||||
|
||||
class Option(Generic[P]):
|
||||
"""
|
||||
This could represent a data object, a string or a page.
|
||||
"""
|
||||
TEXT_TEMPLATE: str = f"{BColors.BOLD.value}{{index}}{BColors.ENDC.value}: {{value}}"
|
||||
ATTRIBUTES_FORMATTING: Tuple[str, ...] = ("index", "value")
|
||||
ATTRIBUTES_KEY: Tuple[str, ...] = ("index", )
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
value: P,
|
||||
hidden: bool = False,
|
||||
additional_keys: List[Hashable] = None,
|
||||
**kwargs
|
||||
):
|
||||
self.value = value
|
||||
self.hidden = hidden
|
||||
self._additional_keys = set(self._to_hash(key) for key in additional_keys or [])
|
||||
|
||||
for key, value in kwargs.items():
|
||||
setattr(self, key, value)
|
||||
|
||||
super(Option, self).__init__()
|
||||
|
||||
def _to_option_string(self, value: Any) -> str:
|
||||
if hasattr(value, "option_string"):
|
||||
return value.option_string
|
||||
|
||||
return str(value)
|
||||
|
||||
@property
|
||||
def text(self) -> str:
|
||||
text = self.TEXT_TEMPLATE
|
||||
|
||||
for attribute_key in self.ATTRIBUTES_FORMATTING:
|
||||
text = text.replace(f"{{{attribute_key}}}", self._to_option_string(getattr(self, attribute_key)))
|
||||
|
||||
return text
|
||||
|
||||
def _to_hash(self, key: Any) -> int:
|
||||
try:
|
||||
key = int(key)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if isinstance(key, str):
|
||||
return hash(unify(key))
|
||||
|
||||
return hash(key)
|
||||
|
||||
@property
|
||||
def keys(self) -> set:
|
||||
keys = self._additional_keys.copy()
|
||||
|
||||
for key in self.ATTRIBUTES_KEY:
|
||||
keys.add(self._to_hash(getattr(self, key)))
|
||||
|
||||
def __contains__(self, key: Any) -> bool:
|
||||
return self._to_hash(key) in self.keys
|
||||
|
||||
def __str__(self):
|
||||
return self.text
|
||||
|
||||
def __iter__(self) -> Generator[Option[P], None, None]:
|
||||
yield self
|
||||
|
||||
|
||||
class Select(Generic[P]):
|
||||
OPTION: Type[Option[P]] = Option
|
||||
HUMAN_IO: Type[HumanIO] = HumanIO
|
||||
CAN_CREATE_OPTIONS: bool = False
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
data: Generator[P, None, None],
|
||||
**kwargs
|
||||
):
|
||||
self.option: Type[Option[P]] = kwargs.get("option", self.OPTION)
|
||||
self.human_io: Type[HumanIO] = kwargs.get("human_io", self.HUMAN_IO)
|
||||
self.can_create_options: bool = kwargs.get("can_create_options", self.CAN_CREATE_OPTIONS)
|
||||
|
||||
self._options: List[Option[P]] = []
|
||||
self.extend(data)
|
||||
|
||||
super(Select, self).__init__(**kwargs)
|
||||
|
||||
def append(self, value: P) -> Option[P]:
|
||||
option = self.option(value)
|
||||
self._options.append(option)
|
||||
return option
|
||||
|
||||
def extend(self, values: Generator[P, None, None]):
|
||||
for value in values:
|
||||
self.append(value)
|
||||
|
||||
@property
|
||||
def _options_to_show(self) -> Generator[Option[P], None, None]:
|
||||
for option in self._options:
|
||||
if option.hidden:
|
||||
continue
|
||||
|
||||
yield option
|
||||
|
||||
def __iter__(self) -> Generator[Option, None, None]:
|
||||
_index = 0
|
||||
|
||||
for i, o in enumerate(self._options_to_show):
|
||||
for option in o:
|
||||
option.index = _index
|
||||
yield option
|
||||
_index += 1
|
||||
|
||||
def __contains__(self, key: Any) -> bool:
|
||||
for option in self._options:
|
||||
if key in option:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def __getitem__(self, key: Any) -> Option[P]:
|
||||
for option in self._options:
|
||||
if key in option:
|
||||
return option
|
||||
|
||||
raise KeyError(key)
|
||||
|
||||
def choose(self, key: Any) -> Optional[Option[P]]:
|
||||
try:
|
||||
return self[key]
|
||||
except KeyError:
|
||||
if self.can_create_options:
|
||||
return self.append(key)
|
||||
|
||||
self.human_io.not_found(key)
|
||||
|
||||
def pprint(self) -> str:
|
||||
return "\n".join(str(option) for option in self)
|
||||
|
||||
|
||||
class Node(Generator[P]):
|
||||
def __init__(
|
||||
self,
|
||||
value: Optional[P] = None,
|
||||
children: List[Node[P]] = None,
|
||||
parent: Node[P] = None,
|
||||
**kwargs
|
||||
):
|
||||
self.value = value
|
||||
self.depth = 0
|
||||
self.index: int = 0
|
||||
|
||||
|
||||
self.children: List[Node[P]] = kwargs.get("children", [])
|
||||
self.parent: Optional[Node[P]] = kwargs.get("parent", None)
|
||||
|
||||
super(Node, self).__init__(**kwargs)
|
||||
|
||||
@property
|
||||
def is_root(self) -> bool:
|
||||
return self.parent is None
|
||||
|
||||
@property
|
||||
def is_leaf(self) -> bool:
|
||||
return not self.children
|
||||
|
||||
def __iter__(self, **kwargs) -> Generator[Node[P], None, None]:
|
||||
_level_index_map: Dict[int, int] = kwargs.get("level_index_map", defaultdict(lambda: 0))
|
||||
|
||||
self.index = _level_index_map[self.depth]
|
||||
yield self
|
||||
_level_index_map[self.depth] += 1
|
||||
|
||||
for child in self.children:
|
||||
child.depth = self.depth + 1
|
||||
|
||||
for node in child.__iter__(level_index_map=_level_index_map):
|
||||
yield node
|
||||
|
||||
def __getitem__(self, key: Any) -> Option[P]:
|
||||
pass
|
||||
|
||||
def __contains__(self, key: Any) -> bool:
|
||||
if key in self.option:
|
||||
return True
|
||||
1
music_kraken/download/components/generic.py
Normal file
1
music_kraken/download/components/generic.py
Normal file
@@ -0,0 +1 @@
|
||||
from . import Option, Select
|
||||
@@ -1,35 +1,32 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
import random
|
||||
from collections import defaultdict
|
||||
from typing import List, Optional, Dict, Tuple, Type, Union
|
||||
import copy
|
||||
from typing import Dict, List, Optional, Tuple, Type, Union
|
||||
|
||||
import pycountry
|
||||
|
||||
from ..utils.enums.album import AlbumType, AlbumStatus
|
||||
from .collection import Collection
|
||||
from .formatted_text import FormattedText
|
||||
from .lyrics import Lyrics
|
||||
from .contact import Contact
|
||||
from .artwork import Artwork
|
||||
from .metadata import (
|
||||
Mapping as id3Mapping,
|
||||
ID3Timestamp,
|
||||
Metadata
|
||||
)
|
||||
from .option import Options
|
||||
from .parents import OuterProxy, P
|
||||
from .source import Source, SourceCollection
|
||||
from .target import Target
|
||||
from .country import Language, Country
|
||||
from ..utils.config import main_settings
|
||||
from ..utils.enums.album import AlbumStatus, AlbumType
|
||||
from ..utils.enums.colors import BColors
|
||||
from ..utils.shared import DEBUG_PRINT_ID
|
||||
from ..utils.string_processing import unify
|
||||
|
||||
from .artwork import Artwork
|
||||
from .collection import Collection
|
||||
from .contact import Contact
|
||||
from .country import Country, Language
|
||||
from .formatted_text import FormattedText
|
||||
from .lyrics import Lyrics
|
||||
from .metadata import ID3Timestamp
|
||||
from .metadata import Mapping as id3Mapping
|
||||
from .metadata import Metadata
|
||||
from .option import Options
|
||||
from .parents import OuterProxy
|
||||
from .parents import OuterProxy as Base
|
||||
|
||||
from ..utils.config import main_settings
|
||||
from ..utils.enums.colors import BColors
|
||||
from .parents import P
|
||||
from .source import Source, SourceCollection
|
||||
from .target import Target
|
||||
|
||||
"""
|
||||
All Objects dependent
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
import inspect
|
||||
import json
|
||||
import logging
|
||||
import inspect
|
||||
from datetime import datetime
|
||||
from itertools import takewhile
|
||||
from pathlib import Path
|
||||
from typing import List, Union
|
||||
|
||||
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
|
||||
from .config import config, read_config, write_config
|
||||
from .enums.colors import BColors
|
||||
from .path_manager import LOCATIONS
|
||||
from .hacking import merge_args
|
||||
from .path_manager import LOCATIONS
|
||||
from .shared import (DEBUG, DEBUG_DUMP, DEBUG_LOGGING, DEBUG_OBJECT_TRACE,
|
||||
DEBUG_OBJECT_TRACE_CALLSTACK, DEBUG_TRACE)
|
||||
|
||||
"""
|
||||
IO functions
|
||||
@@ -125,4 +127,9 @@ def get_current_millis() -> int:
|
||||
|
||||
|
||||
def get_unix_time() -> int:
|
||||
return int(datetime.now().timestamp())
|
||||
return int(datetime.now().timestamp())
|
||||
|
||||
|
||||
def limit_generator(generator, limit: Optional[int] = None):
|
||||
return takewhile(lambda x: x < limit, generator) if limit is not None else generator
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class BColors(Enum):
|
||||
class BColors:
|
||||
# https://stackoverflow.com/a/287944
|
||||
HEADER = "\033[95m"
|
||||
OKBLUE = "\033[94m"
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import random
|
||||
from dotenv import load_dotenv
|
||||
from pathlib import Path
|
||||
import os
|
||||
import random
|
||||
from pathlib import Path
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from .path_manager import LOCATIONS
|
||||
from .config import main_settings
|
||||
from .path_manager import LOCATIONS
|
||||
|
||||
if not load_dotenv(Path(__file__).parent.parent.parent / ".env"):
|
||||
load_dotenv(Path(__file__).parent.parent.parent / ".env.example")
|
||||
@@ -51,3 +51,6 @@ have fun :3""".strip()
|
||||
URL_PATTERN = r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+"
|
||||
INT_PATTERN = r"^\d*$"
|
||||
FLOAT_PATTERN = r"^[\d|\,|\.]*$"
|
||||
|
||||
|
||||
ALPHABET = "abcdefghijklmnopqrstuvwxyz"
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
from typing import Tuple, Union, Optional
|
||||
from pathlib import Path
|
||||
import string
|
||||
from functools import lru_cache
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple, Union
|
||||
from urllib.parse import ParseResult, parse_qs, urlparse
|
||||
|
||||
from transliterate.exceptions import LanguageDetectionError
|
||||
from transliterate import translit
|
||||
from pathvalidate import sanitize_filename
|
||||
from urllib.parse import urlparse, ParseResult, parse_qs
|
||||
|
||||
from transliterate import translit
|
||||
from transliterate.exceptions import LanguageDetectionError
|
||||
|
||||
COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = (
|
||||
"(official video)",
|
||||
@@ -180,6 +179,17 @@ def hash_url(url: Union[str, ParseResult]) -> str:
|
||||
r = r.lower().strip()
|
||||
return r
|
||||
|
||||
def hash(self, key: Any) -> int:
|
||||
try:
|
||||
key = int(key)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if isinstance(key, str):
|
||||
return hash(unify(key))
|
||||
|
||||
return hash(key)
|
||||
|
||||
|
||||
def remove_feature_part_from_track(title: str) -> str:
|
||||
if ")" != title[-1]:
|
||||
|
||||
Reference in New Issue
Block a user