12 Commits

Author SHA1 Message Date
e53e50b5d2 draft: proper help message
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-19 11:23:17 +02:00
240bd105f0 draft: genre is optional 2024-06-19 11:05:52 +02:00
ed2eeabd6a draft: new cli
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-19 11:02:58 +02:00
b236291378 feat: implemented downloader 2024-06-19 10:04:41 +02:00
097211b3cd feat: commented the download function
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-12 14:42:08 +02:00
f839cdf906 feat: commented fetch functions
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-12 14:25:37 +02:00
c306da7934 feat: improved and documented the search functions
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-12 14:18:52 +02:00
684c90a7b4 feat: renamed
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-10 13:46:40 +02:00
b30824baf9 fix: removed
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-10 11:56:26 +02:00
130f5edcfe draft: rewrite of interface
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-05 12:02:12 +02:00
636645e862 feat: implemented genre to external file 2024-06-04 11:00:12 +02:00
df1743c695 feat: implemented better components 2024-06-03 15:04:47 +02:00
23 changed files with 560 additions and 427 deletions

View File

@@ -1,13 +1,13 @@
import logging
import gc
import logging
import sys
from pathlib import Path
from rich.logging import RichHandler
from rich.console import Console
from rich.logging import RichHandler
from .utils.shared import DEBUG, DEBUG_LOGGING
from .utils.config import logging_settings, main_settings, read_config
from .utils.shared import DEBUG, DEBUG_LOGGING
read_config()

3
music_kraken/__meta__.py Normal file
View File

@@ -0,0 +1,3 @@
PROGRAMM: str = "music-kraken"
DESCRIPTION: str = """This program will first get the metadata of various songs from metadata providers like musicbrainz, and then search for download links on pages like bandcamp.
Then it will download the song and edit the metadata accordingly."""

View File

@@ -1,14 +1,15 @@
import mutagen
from mutagen.id3 import ID3, Frame, APIC, USLT
import logging
from pathlib import Path
from typing import List
import logging
import mutagen
from mutagen.id3 import APIC, ID3, USLT, Frame
from PIL import Image
from ..utils.config import logging_settings, main_settings
from ..objects import Song, Target, Metadata
from ..objects.metadata import Mapping
from ..connection import Connection
from ..objects import Metadata, Song, Target
from ..objects.metadata import Mapping
from ..utils.config import logging_settings, main_settings
LOGGER = logging_settings["tagging_logger"]
@@ -105,7 +106,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
APIC(
encoding=0,
mime="image/jpeg",
type=3,
type=mutagen.id3.PictureType.COVER_FRONT,
desc=u"Cover",
data=converted_target.read_bytes(),
)

View File

@@ -1,5 +0,0 @@
from .informations import print_paths
from .main_downloader import download
from .options.settings import settings
from .options.frontend import set_frontend

View File

@@ -1,47 +0,0 @@
from ..utils import BColors
from ..utils.shared import get_random_message
def cli_function(function):
def wrapper(*args, **kwargs):
silent = kwargs.get("no_cli", False)
if "no_cli" in kwargs:
del kwargs["no_cli"]
if silent:
return function(*args, **kwargs)
return
code = 0
print_cute_message()
print()
try:
code = function(*args, **kwargs)
except KeyboardInterrupt:
print("\n\nRaise an issue if I fucked up:\nhttps://github.com/HeIIow2/music-downloader/issues")
finally:
print()
print_cute_message()
print("See you soon! :3")
exit()
return wrapper
def print_cute_message():
message = get_random_message()
try:
print(message)
except UnicodeEncodeError:
message = str(c for c in message if 0 < ord(c) < 127)
print(message)
AGREE_INPUTS = {"y", "yes", "ok"}
def ask_for_bool(msg: str) -> bool:
i = input(f"{msg} ({BColors.OKGREEN.value}Y{BColors.ENDC.value}/{BColors.FAIL.value}N{BColors.ENDC.value})? ").lower()
return i in AGREE_INPUTS

View File

@@ -0,0 +1,82 @@
import argparse
from functools import cached_property
from ..__meta__ import DESCRIPTION, PROGRAMM
from ..download import Downloader
from ..utils import BColors
from ..utils.string_processing import unify
from .utils import HELP_MESSAGE, ask_for_bool, ask_for_create
class DevelopmentCli:
def __init__(self, args: argparse.Namespace):
self.args = args
if args.genre:
self.genre = args.genre
self.downloader: Downloader = Downloader()
@cached_property
def genre(self) -> str:
"""This is a cached property, which means if it isn't set in the constructor or before it is accessed,
the program will be thrown in a shell
Returns:
str: the genre that should be used
"""
option_string = f"{BColors.HEADER}Genres{BColors.ENDC}"
genre_map = {}
_string_list = []
for i, genre in enumerate(self.downloader.get_existing_genres()):
option_string += f"\n{BColors.BOLD}{i}{BColors.ENDC}: {genre}"
genre_map[str(i)] = genre
genre_map[unify(genre)] = genre
genre = None
while genre is None:
print(option_string)
print()
i = input("> ")
u = unify(i)
if u in genre_map:
genre = genre_map[u]
break
if ask_for_create("genre", i):
genre = i
break
return genre
def help_screen(self) -> None:
print(HELP_MESSAGE)
def shell(self) -> None:
print(f"Welcome to the {PROGRAMM} shell!")
print(f"Type '{BColors.OKBLUE}help{BColors.ENDC}' for a list of commands.")
print("")
while True:
i = input("> ")
if i == "help":
self.help_screen()
elif i == "genre":
self.genre
elif i == "exit":
break
else:
print("Unknown command. Type 'help' for a list of commands.")
def main():
parser = argparse.ArgumentParser(
prog=PROGRAMM,
description=DESCRIPTION,
epilog='This is just a development cli. The real frontend is coming soon.'
)
parser.add_argument('--genre', '-g', action='store_const', required=False, help="choose a genre to download from")
args = parser.parse_args()

View File

@@ -18,6 +18,7 @@ from ..utils.shared import HELP_MESSAGE, URL_PATTERN
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from .genre import get_genre
from .options.first_config import initial_config
from .utils import ask_for_bool, cli_function
@@ -27,30 +28,7 @@ PAGE_NAME_FILL = "-"
MAX_PAGE_LEN = 21
class GenreIO(components.HumanIO):
@staticmethod
def ask_to_create(option: components.Option) -> bool:
output()
return ask_for_bool(f"create the genre {BColors.OKBLUE.value}{option.value}{BColors.ENDC.value}")
@staticmethod
def not_found(key: str) -> None:
output(f"\ngenre {BColors.BOLD.value}{key}{BColors.ENDC.value} not found\n", color=BColors.FAIL)
def get_genre():
select_genre = components.GenreSelect()
select_genre.human_io = GenreIO
genre: Optional[components.Option] = None
while genre is None:
print(select_genre.pprint())
print()
genre = select_genre.choose(input("> "))
return genre.value
def help_message():

View File

@@ -0,0 +1,85 @@
from ..utils import BColors
from ..utils.shared import get_random_message
def cli_function(function):
def wrapper(*args, **kwargs):
silent = kwargs.get("no_cli", False)
if "no_cli" in kwargs:
del kwargs["no_cli"]
if silent:
return function(*args, **kwargs)
return
code = 0
print_cute_message()
print()
try:
code = function(*args, **kwargs)
except KeyboardInterrupt:
print("\n\nRaise an issue if I fucked up:\nhttps://github.com/HeIIow2/music-downloader/issues")
finally:
print()
print_cute_message()
print("See you soon! :3")
exit()
return wrapper
def print_cute_message():
message = get_random_message()
try:
print(message)
except UnicodeEncodeError:
message = str(c for c in message if 0 < ord(c) < 127)
print(message)
def highlight_placeholder(text: str) -> str:
return text.replace("<", f"{BColors.BOLD}<").replace(">", f">{BColors.ENDC}")
HELP_MESSAGE = highlight_placeholder(f"""{BColors.HEADER}To search:{BColors.ENDC}
> s: <query/url>
> s: https://musify.club/release/some-random-release-183028492
> s: #a <artist> #r <release> #t <track>
If you found the same object twice from different sources you can merge those objects.
Then it will use those sources. To do so, use the {BColors.BOLD}m{BColors.ENDC} command.
{BColors.HEADER}To download:{BColors.ENDC}
> d: <id/url>
> dm: 0, 3, 4 # merge all objects into one and download this object
> d: 1
> d: https://musify.club/release/some-random-release-183028492
{BColors.HEADER}To inspect an object:{BColors.ENDC}
If you inspect an object, you see its adjacent object. This means for example the releases of an artist, or the tracks of a release.
You can also merge objects with the {BColors.BOLD}m{BColors.ENDC} command here.
> g: <id/url>
> gm: 0, 3, 4 # merge all objects into one and inspect this object
> g: 1
> g: https://musify.club/release/some-random-release-183028492""")
class COMMANDS:
AGREE = {"y", "yes", "ok"}
DISAGREE = {"n", "no"}
EXIT = {"exit"}
HELP = {"help", "h"}
def ask_for_bool(msg: str) -> bool:
i = input(f"{msg} ({BColors.OKGREEN.value}Y{BColors.ENDC.value}/{BColors.FAIL.value}N{BColors.ENDC.value})? ").lower()
return i in COMMANDS.AGREE
def ask_for_create(name: str, value: str) -> bool:
return ask_for_bool(f"Do you want to create the {name} {BColors.OKBLUE}{value}{BColors.ENDC}?")

View File

@@ -19,7 +19,7 @@ from ..connection import Connection
from ..objects import Album, Artist, Collection
from ..objects import DatabaseObject as DataObject
from ..objects import Label, Options, Song, Source, Target
from ..utils import BColors, output, trace
from ..utils import BColors, limit_generator, output, trace
from ..utils.config import main_settings, youtube_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.enums.album import AlbumType
@@ -74,6 +74,8 @@ class Downloader:
if auto_register_pages:
self.scan_for_pages(**kwargs)
# manage which pages to use
def register_page(self, page_type: Type[Page], **kwargs):
if page_type in self._registered_pages:
return
@@ -122,22 +124,76 @@ class Downloader:
for page_type in page_types:
yield from self._registered_pages[page_type]
# fetching/downloading data
def search(self, query: Query) -> Generator[DataObject, None, None]:
"""Yields all data objects that were found by the query.
Other than `Downloader.search_yield_pages`, this function just yields all data objects.
This looses the data, where th objects were searched originally, so this might not be the best choice.
Args:
query (Query): The query to search for.
Yields:
Generator[DataObject, None, None]: A generator that yields all found data objects.
"""
for page in self.get_pages():
yield from page.search(query=query)
def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject:
def search_yield_pages(self, query: Query, results_per_page: Optional[int] = None) -> Generator[Tuple[Page, Generator[DataObject, None, None]], None, None]:
"""Yields all data objects that were found by the query, grouped by the page they were found on.
every yield is a tuple of the page and a generator that yields the data objects.
So this could be how it is used:
```python
for page, data_objects in downloader.search_yield_pages(query):
print(f"Found on {page}:")
for data_object in data_objects:
print(data_object)
```
Args:
query (Query): The query to search for.
results_per_page (Optional[int], optional): If this is set, the generators only yield this amount of data objects per page.
Yields:
Generator[Tuple[Page, Generator[DataObject, None, None]], None, None]: yields the page and a generator that yields the data objects.
"""
for page in self.get_pages():
yield page, limit_generator(page.search(query=query), limit=results_per_page)
def fetch_details(self, data_object: DataObject, **kwargs) -> DataObject:
"""Fetches more details for the given data object.
This uses every source contained in data_object.source_collection that has a page.
Args:
data_object (DataObject): The data object to fetch details for.
Returns:
DataObject: The same data object, but with more details.
"""
source: Source
for source in data_object.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
}):
new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level)
new_data_object = self.fetch_from_source(source=source, **kwargs)
if new_data_object is not None:
data_object.merge(new_data_object)
return data_object
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
"""Gets a data object from the given source.
Args:
source (Source): The source to get the data object from.
Returns:
Optional[DataObject]: If a data object can be retrieved, it is returned. Otherwise, None is returned.
"""
if not source.has_page:
return None
@@ -154,6 +210,15 @@ class Downloader:
return data_object
def fetch_from_url(self, url: str) -> Optional[DataObject]:
"""This function tries to detect the source of the given url and fetches the data object from it.
Args:
url (str): The url to fetch the data object from.
Returns:
Optional[DataObject]: The fetched data object, or None if no source could be detected or if no object could be retrieved.
"""
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None:
return None
@@ -161,6 +226,14 @@ class Downloader:
return self.fetch_from_source(source=source)
def _skip_object(self, data_object: DataObject) -> bool:
"""Determine if the given data object should be downloaded or not.
Args:
data_object (DataObject): The data object in question.
Returns:
bool: Returns True if the given data object should be skipped.
"""
if isinstance(data_object, Album):
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
return True
@@ -168,6 +241,18 @@ class Downloader:
return False
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
"""Downloads the given data object.
It will recursively fetch all available details for this and every related object.
Then it will create the folder structure and download the audio file.
In the end the metadata will be written to the file.
Args:
data_object (DataObject): The data object to download. If it is a artist, it will download the whole discography, if it is an Album it will download the whole Tracklist.
Returns:
DownloadResult: Relevant information about the download success.
"""
# fetch the given object
self.fetch_details(data_object)
output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
@@ -330,6 +415,31 @@ class Downloader:
return source.page.fetch_object_from_source(source=source, **kwargs)
# misc function
def get_existing_genres(self) -> Generator[str, None, None]:
"""Yields every existing genre, for the user to select from.
Yields:
Generator[str, None, None]: a generator that yields every existing genre.
"""
def is_valid_genre(genre: Path) -> bool:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any Patern
from shared.NOT_A_GENRE_REGEX.
"""
if not genre.is_dir():
return False
if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]):
return False
return True
for genre in filter(is_valid_genre, main_settings["music_directory"].iterdir()):
yield genre.name
class Page:
REGISTER = True

View File

@@ -1,303 +0,0 @@
from __future__ import annotations
import re
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable, Dict, Generator, List, Optional
from ..objects import OuterProxy as DataObject
from ..utils import BColors
from ..utils.config import main_settings
from ..utils.enums import SourceType
from ..utils.exception import MKComposeException
from ..utils.shared import ALPHABET
from ..utils.string_processing import unify
class HumanIO:
@staticmethod
def ask_to_create(option: Option) -> bool:
return True
@staticmethod
def not_found(key: Any) -> None:
return None
class Option:
"""
This could represent a data object, a string or a page.
"""
def __init__(
self,
value: Any,
text: Optional[str] = None,
keys: List[Any] = None,
hidden: bool = False,
parse_key: Callable[[Any], Any] = lambda x: x,
index: int = None,
):
self._parse_key: Callable[[Any], Any] = parse_key
self._index = index
self.value = value
self._text = text or str(value)
self.hidden = hidden
self._raw_keys = set(keys or [])
self._raw_keys.add(self.text)
try:
self._raw_keys.add(self.value)
except TypeError:
pass
self._raw_keys.add(str(self.value))
self._raw_keys.add(self._index)
self.keys = set(self.parse_key(key) for key in self._raw_keys)
@property
def text(self) -> str:
return self._text.replace("{index}", str(self.index))
@text.setter
def text(self, value: str):
self._text = value
@property
def index(self) -> int:
return self._index
@index.setter
def index(self, value: int):
p = self._parse_key(self._index)
if p in self.keys:
self.keys.remove(p)
self._index = value
self.keys.add(p)
def register_key(self, key: Any):
self._raw_keys.add(key)
self.keys.add(self._parse_key(key))
@property
def parse_key(self) -> Callable[[Any], Any]:
return self._parse_key
@parse_key.setter
def parse_key(self, value: Callable[[Any], Any]):
self._parse_key = value
self.keys = set(self._parse_key(key) for key in self._raw_keys)
def __str__(self):
return self.text
class Select:
def __init__(
self,
options: Generator[Option, None, None] = None,
option_factory: Callable[[Any], Option] = None,
raw_options: List[Any] = None,
parse_option_key: Callable[[Any], Any] = lambda x: x,
human_io: HumanIO = HumanIO,
sort: bool = False,
**kwargs
):
self._parse_option_key: Callable[[Any], Any] = parse_option_key
self.human_io: HumanIO = human_io
self._key_to_option: Dict[Any, Option] = dict()
self._options: List[Option] = []
options = options or []
self.option_factory: Optional[Callable[[Any], Option]] = option_factory
if self.can_create_options:
_raw_options = raw_options or []
if sort:
_raw_options = sorted(_raw_options)
for raw_option in _raw_options:
self.append(self.option_factory(raw_option))
elif raw_options is not None:
raise MKComposeException("Cannot create options without a factory.")
self.extend(options)
@property
def can_create_options(self) -> bool:
return self.option_factory is not None
def append(self, option: Option):
option.parse_key = self._parse_option_key
self._options.append(option)
for key in option.keys:
self._key_to_option[key] = option
def _remap(self):
self._key_to_option = dict()
for option in self._options:
for key in option.keys:
self._key_to_option[key] = option
def extend(self, options: List[Option]):
for option in options:
self.append(option)
def __iter__(self) -> Generator[Option, None, None]:
for option in self._options:
if option.hidden:
continue
yield option
def __contains__(self, key: Any) -> bool:
return self._parse_option_key(key) in self._key_to_option
def __getitem__(self, key: Any) -> Option:
r = self._key_to_option[self._parse_option_key(key)]
if callable(r):
r = r()
if callable(r.value):
r.value = r.value()
return r
def create_option(self, key: Any, **kwargs) -> Option:
if not self.can_create_options:
raise MKComposeException("Cannot create options without a factory.")
option = self.option_factory(key, **kwargs)
self.append(option)
return option
def choose(self, key: Any) -> Optional[Option]:
if key not in self:
if self.can_create_options:
c = self.create_option(key)
if self.human_io.ask_to_create(c):
return c
self.human_io.not_found(key)
return None
return self[key]
def pprint(self) -> str:
return "\n".join(str(option) for option in self)
class StringSelect(Select):
def __init__(self, **kwargs):
self._current_index = 0
kwargs["option_factory"] = self.next_option
kwargs["parse_option_key"] = lambda x: unify(str(x))
super().__init__(**kwargs)
def next_option(self, value: Any) -> Optional[Option]:
o = Option(value=value, keys=[self._current_index], text=f"{BColors.BOLD.value}{self._current_index: >2}{BColors.ENDC.value}: {value}")
self._current_index += 1
return o
class GenreSelect(StringSelect):
@staticmethod
def is_valid_genre(genre: Path) -> bool:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any Patern
from shared.NOT_A_GENRE_REGEX.
"""
if not genre.is_dir():
return False
if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]):
return False
return True
def __init__(self):
super().__init__(sort=True, raw_options=(genre.name for genre in filter(self.is_valid_genre, main_settings["music_directory"].iterdir())))
class SourceTypeToOption(dict):
def __init__(self, callback):
super().__init__()
self.callback = callback
def __missing__(self, key):
self[key] = self.callback(key)
return self[key]
class DataObjectSelect(Select):
def __init__(self, data_objects: Generator[DataObject]):
self._source_type_to_data_objects: Dict[SourceType, List[Option]] = defaultdict(list)
self._source_type_to_option: Dict[SourceType, Option] = SourceTypeToOption(self.option_from_source_type)
self._data_object_index: int = 0
self._source_type_index: int = 0
super().__init__(
parse_option_key=lambda x: unify(str(x)),
)
self.extend(data_objects)
def option_from_data_object(self, data_object: DataObject) -> Option:
index = self._data_object_index
self._data_object_index += 1
return Option(
value=data_object,
keys=[index, data_object.option_string, data_object.title_string],
text=f"{BColors.BOLD.value}{{index}}{BColors.ENDC.value}: {data_object.option_string}",
index=index,
)
def option_from_source_type(self, source_type: SourceType) -> Option:
index = ALPHABET[self._source_type_index % len(ALPHABET)]
self._source_type_index += 1
o = Option(
value=lambda: DataObjectSelect(self._source_type_to_data_objects[source_type]),
keys=[index, source_type],
text=f"{BColors.HEADER.value}({index}) --------------------------------{source_type.name:{'-'}<{21}}--------------------{BColors.ENDC.value}",
)
super().append(o)
return o
def append(self, option: Union[Option, DataObject]):
if isinstance(option, DataObject):
data_object = option
option = self.option_from_data_object(data_object)
else:
data_object = option.value
for source_type in data_object.source_collection.source_types(only_with_page=True):
self._source_type_to_data_objects[source_type].append(option)
super().append(option)
def __iter__(self):
source_types = list(sorted(self._source_type_to_data_objects.keys(), key=lambda x: x.name))
single_source = len(source_types) > 1
j = 0
for st in source_types:
if single_source:
yield self._source_type_to_option[st]
limit = min(15, len(self._source_type_to_data_objects[st])) if single_source else len(self._source_type_to_data_objects[st])
for i in range(limit):
o = self._source_type_to_data_objects[st][i]
o.index = j
yield o
j += 1
self._remap()

View File

@@ -0,0 +1,214 @@
from __future__ import annotations
import copy
import re
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import (Any, Callable, Dict, Generator, Generic, Hashable, List,
Optional, Tuple, TypeVar, Union)
from ...objects import OuterProxy as DataObject
from ...utils import BColors
from ...utils.config import main_settings
from ...utils.enums import SourceType
from ...utils.exception import MKComposeException
from ...utils.shared import ALPHABET
from ...utils.string_processing import unify
P = TypeVar('P')
class HumanIO:
@staticmethod
def ask_to_create(option: Option) -> bool:
return True
@staticmethod
def not_found(key: Any) -> None:
return None
class Option(Generic[P]):
"""
This could represent a data object, a string or a page.
"""
TEXT_TEMPLATE: str = f"{BColors.BOLD.value}{{index}}{BColors.ENDC.value}: {{value}}"
ATTRIBUTES_FORMATTING: Tuple[str, ...] = ("index", "value")
ATTRIBUTES_KEY: Tuple[str, ...] = ("index", )
def __init__(
self,
value: P,
hidden: bool = False,
additional_keys: List[Hashable] = None,
**kwargs
):
self.value = value
self.hidden = hidden
self._additional_keys = set(self._to_hash(key) for key in additional_keys or [])
for key, value in kwargs.items():
setattr(self, key, value)
super(Option, self).__init__()
def _to_option_string(self, value: Any) -> str:
if hasattr(value, "option_string"):
return value.option_string
return str(value)
@property
def text(self) -> str:
text = self.TEXT_TEMPLATE
for attribute_key in self.ATTRIBUTES_FORMATTING:
text = text.replace(f"{{{attribute_key}}}", self._to_option_string(getattr(self, attribute_key)))
return text
def _to_hash(self, key: Any) -> int:
try:
key = int(key)
except ValueError:
pass
if isinstance(key, str):
return hash(unify(key))
return hash(key)
@property
def keys(self) -> set:
keys = self._additional_keys.copy()
for key in self.ATTRIBUTES_KEY:
keys.add(self._to_hash(getattr(self, key)))
def __contains__(self, key: Any) -> bool:
return self._to_hash(key) in self.keys
def __str__(self):
return self.text
def __iter__(self) -> Generator[Option[P], None, None]:
yield self
class Select(Generic[P]):
OPTION: Type[Option[P]] = Option
HUMAN_IO: Type[HumanIO] = HumanIO
CAN_CREATE_OPTIONS: bool = False
def __init__(
self,
data: Generator[P, None, None],
**kwargs
):
self.option: Type[Option[P]] = kwargs.get("option", self.OPTION)
self.human_io: Type[HumanIO] = kwargs.get("human_io", self.HUMAN_IO)
self.can_create_options: bool = kwargs.get("can_create_options", self.CAN_CREATE_OPTIONS)
self._options: List[Option[P]] = []
self.extend(data)
super(Select, self).__init__(**kwargs)
def append(self, value: P) -> Option[P]:
option = self.option(value)
self._options.append(option)
return option
def extend(self, values: Generator[P, None, None]):
for value in values:
self.append(value)
@property
def _options_to_show(self) -> Generator[Option[P], None, None]:
for option in self._options:
if option.hidden:
continue
yield option
def __iter__(self) -> Generator[Option, None, None]:
_index = 0
for i, o in enumerate(self._options_to_show):
for option in o:
option.index = _index
yield option
_index += 1
def __contains__(self, key: Any) -> bool:
for option in self._options:
if key in option:
return True
return False
def __getitem__(self, key: Any) -> Option[P]:
for option in self._options:
if key in option:
return option
raise KeyError(key)
def choose(self, key: Any) -> Optional[Option[P]]:
try:
return self[key]
except KeyError:
if self.can_create_options:
return self.append(key)
self.human_io.not_found(key)
def pprint(self) -> str:
return "\n".join(str(option) for option in self)
class Node(Generator[P]):
def __init__(
self,
value: Optional[P] = None,
children: List[Node[P]] = None,
parent: Node[P] = None,
**kwargs
):
self.value = value
self.depth = 0
self.index: int = 0
self.children: List[Node[P]] = kwargs.get("children", [])
self.parent: Optional[Node[P]] = kwargs.get("parent", None)
super(Node, self).__init__(**kwargs)
@property
def is_root(self) -> bool:
return self.parent is None
@property
def is_leaf(self) -> bool:
return not self.children
def __iter__(self, **kwargs) -> Generator[Node[P], None, None]:
_level_index_map: Dict[int, int] = kwargs.get("level_index_map", defaultdict(lambda: 0))
self.index = _level_index_map[self.depth]
yield self
_level_index_map[self.depth] += 1
for child in self.children:
child.depth = self.depth + 1
for node in child.__iter__(level_index_map=_level_index_map):
yield node
def __getitem__(self, key: Any) -> Option[P]:
pass
def __contains__(self, key: Any) -> bool:
if key in self.option:
return True

View File

@@ -0,0 +1 @@
from . import Option, Select

View File

@@ -1,35 +1,32 @@
from __future__ import annotations
import copy
import random
from collections import defaultdict
from typing import List, Optional, Dict, Tuple, Type, Union
import copy
from typing import Dict, List, Optional, Tuple, Type, Union
import pycountry
from ..utils.enums.album import AlbumType, AlbumStatus
from .collection import Collection
from .formatted_text import FormattedText
from .lyrics import Lyrics
from .contact import Contact
from .artwork import Artwork
from .metadata import (
Mapping as id3Mapping,
ID3Timestamp,
Metadata
)
from .option import Options
from .parents import OuterProxy, P
from .source import Source, SourceCollection
from .target import Target
from .country import Language, Country
from ..utils.config import main_settings
from ..utils.enums.album import AlbumStatus, AlbumType
from ..utils.enums.colors import BColors
from ..utils.shared import DEBUG_PRINT_ID
from ..utils.string_processing import unify
from .artwork import Artwork
from .collection import Collection
from .contact import Contact
from .country import Country, Language
from .formatted_text import FormattedText
from .lyrics import Lyrics
from .metadata import ID3Timestamp
from .metadata import Mapping as id3Mapping
from .metadata import Metadata
from .option import Options
from .parents import OuterProxy
from .parents import OuterProxy as Base
from ..utils.config import main_settings
from ..utils.enums.colors import BColors
from .parents import P
from .source import Source, SourceCollection
from .target import Target
"""
All Objects dependent

View File

@@ -1,15 +1,17 @@
from datetime import datetime
from pathlib import Path
import inspect
import json
import logging
import inspect
from datetime import datetime
from itertools import takewhile
from pathlib import Path
from typing import List, Union
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
from .config import config, read_config, write_config
from .enums.colors import BColors
from .path_manager import LOCATIONS
from .hacking import merge_args
from .path_manager import LOCATIONS
from .shared import (DEBUG, DEBUG_DUMP, DEBUG_LOGGING, DEBUG_OBJECT_TRACE,
DEBUG_OBJECT_TRACE_CALLSTACK, DEBUG_TRACE)
"""
IO functions
@@ -125,4 +127,9 @@ def get_current_millis() -> int:
def get_unix_time() -> int:
return int(datetime.now().timestamp())
return int(datetime.now().timestamp())
def limit_generator(generator, limit: Optional[int] = None):
return takewhile(lambda x: x < limit, generator) if limit is not None else generator

View File

@@ -1,7 +1,7 @@
from enum import Enum
class BColors(Enum):
class BColors:
# https://stackoverflow.com/a/287944
HEADER = "\033[95m"
OKBLUE = "\033[94m"

View File

@@ -1,13 +1,12 @@
from typing import Tuple, Union, Optional
from pathlib import Path
import string
from functools import lru_cache
from pathlib import Path
from typing import Optional, Tuple, Union
from urllib.parse import ParseResult, parse_qs, urlparse
from transliterate.exceptions import LanguageDetectionError
from transliterate import translit
from pathvalidate import sanitize_filename
from urllib.parse import urlparse, ParseResult, parse_qs
from transliterate import translit
from transliterate.exceptions import LanguageDetectionError
COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = (
"(official video)",
@@ -180,6 +179,17 @@ def hash_url(url: Union[str, ParseResult]) -> str:
r = r.lower().strip()
return r
def hash(self, key: Any) -> int:
try:
key = int(key)
except ValueError:
pass
if isinstance(key, str):
return hash(unify(key))
return hash(key)
def remove_feature_part_from_track(title: str) -> str:
if ")" != title[-1]: