music-kraken-core/music_kraken/download/components.py

238 lines
7.5 KiB
Python

from __future__ import annotations
import re
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable, Dict, Generator, List, Optional
from ..objects import OuterProxy as DataObject
from ..utils import BColors
from ..utils.config import main_settings
from ..utils.enums import SourceType
from ..utils.exception import MKComposeException
from ..utils.shared import ALPHABET
from ..utils.string_processing import unify
class HumanIO:
@staticmethod
def ask_to_create(option: Option) -> bool:
return True
@staticmethod
def not_found(key: Any) -> None:
return None
class Option:
"""
This could represent a data object, a string or a page.
"""
def __init__(
self,
value: Any,
text: Optional[str] = None,
keys: List[Any] = None,
hidden: bool = False,
parse_key: Callable[[Any], Any] = lambda x: x,
):
self._parse_key: Callable[[Any], Any] = parse_key
self.value = value
self.text = text or str(value)
self.hidden = hidden
self._raw_keys = set(keys or [])
self._raw_keys.add(self.text)
try:
self._raw_keys.add(self.value)
except TypeError:
pass
self._raw_keys.add(str(self.value))
self.keys = set(self.parse_key(key) for key in self._raw_keys)
def register_key(self, key: Any):
self._raw_keys.add(key)
self.keys.add(self._parse_key(key))
@property
def parse_key(self) -> Callable[[Any], Any]:
return self._parse_key
@parse_key.setter
def parse_key(self, value: Callable[[Any], Any]):
self._parse_key = value
self.keys = set(self._parse_key(key) for key in self._raw_keys)
def __str__(self):
return self.text
class Select:
def __init__(
self,
options: Generator[Option, None, None] = None,
option_factory: Callable[[Any], Option] = None,
raw_options: List[Any] = None,
parse_option_key: Callable[[Any], Any] = lambda x: x,
human_io: HumanIO = HumanIO,
sort: bool = False,
**kwargs
):
self._parse_option_key: Callable[[Any], Any] = parse_option_key
self.human_io: HumanIO = human_io
self._key_to_option: Dict[Any, Option] = dict()
self._options: List[Option] = []
options = options or []
self.option_factory: Optional[Callable[[Any], Option]] = option_factory
if self.can_create_options:
_raw_options = raw_options or []
if sort:
_raw_options = sorted(_raw_options)
for raw_option in _raw_options:
self.append(self.option_factory(raw_option))
elif raw_options is not None:
raise MKComposeException("Cannot create options without a factory.")
self.extend(options)
@property
def can_create_options(self) -> bool:
return self.option_factory is not None
def append(self, option: Option):
option.parse_key = self._parse_option_key
self._options.append(option)
for key in option.keys:
self._key_to_option[key] = option
def extend(self, options: List[Option]):
for option in options:
self.append(option)
def __iter__(self) -> Generator[Option, None, None]:
for option in self._options:
if option.hidden:
continue
yield option
def __contains__(self, key: Any) -> bool:
return self._parse_option_key(key) in self._key_to_option
def __getitem__(self, key: Any) -> Option:
return self._key_to_option[self._parse_option_key(key)]
def create_option(self, key: Any, **kwargs) -> Option:
if not self.can_create_options:
raise MKComposeException("Cannot create options without a factory.")
option = self.option_factory(key, **kwargs)
self.append(option)
return option
def choose(self, key: Any) -> Optional[Option]:
if key not in self:
if self.can_create_options:
c = self.create_option(key)
if self.human_io.ask_to_create(c):
return c
self.human_io.not_found(key)
return None
return self[key]
def pprint(self) -> str:
return "\n".join(str(option) for option in self)
class StringSelect(Select):
def __init__(self, **kwargs):
self._current_index = 0
kwargs["option_factory"] = self.next_option
kwargs["parse_option_key"] = lambda x: unify(str(x))
super().__init__(**kwargs)
def next_option(self, value: Any) -> Optional[Option]:
o = Option(value=value, keys=[self._current_index], text=f"{BColors.BOLD.value}{self._current_index: >2}{BColors.ENDC.value}: {value}")
self._current_index += 1
return o
class GenreSelect(StringSelect):
@staticmethod
def is_valid_genre(genre: Path) -> bool:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any Patern
from shared.NOT_A_GENRE_REGEX.
"""
if not genre.is_dir():
return False
if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]):
return False
return True
def __init__(self):
super().__init__(sort=True, raw_options=(genre.name for genre in filter(self.is_valid_genre, main_settings["music_directory"].iterdir())))
class DataObjectSelect(Select):
def __init__(self, data_objects: Generator[DataObject]):
self._source_type_to_data_objects: Dict[SourceType, List[Option]] = defaultdict(list)
self._data_object_index: int = 0
self._source_type_index: int = 0
super().__init__()
self.extend(data_objects)
def option_from_data_object(self, data_object: DataObject) -> Option:
index = self._data_object_index
self._data_object_index += 1
return Option(
value=data_object,
keys=[index, data_object.option_string, data_object.title_string],
text=f"{BColors.BOLD.value}{index: >2}{BColors.ENDC.value}: {data_object.option_string}",
)
def option_from_source_type(self, source_type: SourceType) -> Option:
index = ALPHABET[self._source_type_index % len(ALPHABET)]
self._source_type_index += 1
return Option(
value=self._source_type_to_data_objects[source_type],
keys=[index, source_type],
text=f"{BColors.HEADER.value}({index}) --------------------------------{source_type.name:{'-'}<{21}}--------------------{BColors.ENDC.value}",
)
def append(self, option: Union[Option, DataObject]):
if isinstance(option, DataObject):
data_object = option
option = self.option_from_data_object(data_object)
else:
data_object = option.value
for source_type in data_object.source_collection.source_types(only_with_page=True):
if source_type not in self._source_type_to_data_objects:
st_option = self.option_from_source_type(source_type)
self._source_type_to_data_objects[source_type].append(st_option)
super().append(st_option)
self._source_type_to_data_objects[source_type].append(option)
super().append(option)
def __iter__(self):
for options in self._source_type_to_data_objects.values():
yield from options