from __future__ import annotations

import re
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable, Dict, Generator, List, Optional, Union

from ..objects import OuterProxy as DataObject
from ..utils import BColors
from ..utils.config import main_settings
from ..utils.enums import SourceType
from ..utils.exception import MKComposeException
from ..utils.shared import ALPHABET
from ..utils.string_processing import unify


class HumanIO:
    """Default interaction hooks used by ``Select``; accepts everything silently."""

    @staticmethod
    def ask_to_create(option: Option) -> bool:
        """Return whether the freshly created ``option`` should be used (always True here)."""
        return True

    @staticmethod
    def not_found(key: Any) -> None:
        """Hook called by ``Select.choose`` when ``key`` matches no option; does nothing here."""
        return None


class Option:
    """
    This could represent a data object, a string or a page.

    An option carries a ``value``, a display ``text`` and a set of lookup
    ``keys``. ``_raw_keys`` holds the keys as they were registered, ``keys``
    holds them after being passed through ``parse_key``.
    """

    def __init__(
        self,
        value: Any,
        text: Optional[str] = None,
        keys: List[Any] = None,
        hidden: bool = False,
        parse_key: Callable[[Any], Any] = lambda x: x,
        index: int = None,
    ):
        """
        :param value: the wrapped object.
        :param text: display text; falls back to ``str(value)``. The literal
            ``{index}`` inside it is replaced by the current index on access.
        :param keys: additional raw lookup keys.
        :param hidden: hidden options are skipped by ``Select.__iter__``.
        :param parse_key: normalisation applied to every raw key.
        :param index: optional index, also registered as a lookup key.
        """
        self._parse_key: Callable[[Any], Any] = parse_key
        self._index = index
        self.value = value
        self._text = text or str(value)
        self.hidden = hidden

        self._raw_keys = set(keys or [])
        self._raw_keys.add(self.text)
        try:
            self._raw_keys.add(self.value)
        except TypeError:
            # unhashable values simply can't serve as a key themselves
            pass
        self._raw_keys.add(str(self.value))
        # A missing index must not become a key: otherwise every un-indexed
        # option would be reachable through the parsed form of ``None``.
        if self._index is not None:
            self._raw_keys.add(self._index)

        self.keys = set(self.parse_key(key) for key in self._raw_keys)

    @property
    def text(self) -> str:
        """Display text with the ``{index}`` placeholder filled in."""
        return self._text.replace("{index}", str(self.index))

    @text.setter
    def text(self, value: str):
        self._text = value

    @property
    def index(self) -> int:
        return self._index

    @index.setter
    def index(self, value: int):
        """Replace the index and swap the corresponding lookup key."""
        if self._index is not None:
            # drop the key belonging to the previous index
            self.keys.discard(self._parse_key(self._index))
            self._raw_keys.discard(self._index)

        self._index = value

        if value is not None:
            # register the NEW index; keeping _raw_keys in sync makes sure a
            # later parse_key reassignment doesn't resurrect the old index
            self._raw_keys.add(value)
            self.keys.add(self._parse_key(value))

    def register_key(self, key: Any):
        """Add ``key`` as an additional lookup key."""
        self._raw_keys.add(key)
        self.keys.add(self._parse_key(key))

    @property
    def parse_key(self) -> Callable[[Any], Any]:
        return self._parse_key

    @parse_key.setter
    def parse_key(self, value: Callable[[Any], Any]):
        """Swap the key parser and rebuild ``keys`` from the raw keys."""
        self._parse_key = value
        self.keys = set(self._parse_key(key) for key in self._raw_keys)

    def __str__(self):
        return self.text


class Select:
    """A collection of ``Option`` objects that can be looked up by any of their keys."""

    def __init__(
        self,
        options: Generator[Option, None, None] = None,
        option_factory: Callable[[Any], Option] = None,
        raw_options: List[Any] = None,
        parse_option_key: Callable[[Any], Any] = lambda x: x,
        human_io: HumanIO = HumanIO,
        sort: bool = False,
        **kwargs
    ):
        """
        :param options: ready made options, appended after the raw ones.
        :param option_factory: builds an ``Option`` from a raw value; required
            if ``raw_options`` is given.
        :param raw_options: raw values turned into options via the factory.
        :param parse_option_key: key parser that is forced onto every option.
        :param human_io: hooks used by ``choose``.
        :param sort: sort ``raw_options`` before creating options from them.
        :raises MKComposeException: if ``raw_options`` is given without a factory.
        """
        self._parse_option_key: Callable[[Any], Any] = parse_option_key
        self.human_io: HumanIO = human_io

        self._key_to_option: Dict[Any, Option] = dict()
        self._options: List[Option] = []

        options = options or []

        self.option_factory: Optional[Callable[[Any], Option]] = option_factory
        if self.can_create_options:
            _raw_options = raw_options or []
            if sort:
                _raw_options = sorted(_raw_options)

            for raw_option in _raw_options:
                self.append(self.option_factory(raw_option))
        elif raw_options is not None:
            raise MKComposeException("Cannot create options without a factory.")

        self.extend(options)

    @property
    def can_create_options(self) -> bool:
        """True if an option factory is available."""
        return self.option_factory is not None

    def append(self, option: Option):
        """Add ``option``, apply this select's key parser to it and index its keys."""
        option.parse_key = self._parse_option_key
        self._options.append(option)
        for key in option.keys:
            self._key_to_option[key] = option

    def _remap(self):
        """Rebuild the key -> option mapping from the current keys of all options."""
        self._key_to_option = dict()
        for option in self._options:
            for key in option.keys:
                self._key_to_option[key] = option

    def extend(self, options: List[Option]):
        """Append every option of ``options``."""
        for option in options:
            self.append(option)

    def __iter__(self) -> Generator[Option, None, None]:
        """Yield all options that are not hidden, in insertion order."""
        for option in self._options:
            if option.hidden:
                continue

            yield option

    def __contains__(self, key: Any) -> bool:
        return self._parse_option_key(key) in self._key_to_option

    def __getitem__(self, key: Any) -> Option:
        """
        Return the option registered under ``key``.

        A callable ``value`` is called once and replaced by its result, so
        values can be created lazily.

        :raises KeyError: if the parsed key is unknown.
        """
        r = self._key_to_option[self._parse_option_key(key)]
        if callable(r):
            r = r()
        if callable(r.value):
            r.value = r.value()
        return r

    def create_option(self, key: Any, **kwargs) -> Option:
        """
        Build an option for ``key`` with the factory and append it.

        :raises MKComposeException: if no factory is configured.
        """
        if not self.can_create_options:
            raise MKComposeException("Cannot create options without a factory.")

        option = self.option_factory(key, **kwargs)
        self.append(option)
        return option

    def choose(self, key: Any) -> Optional[Option]:
        """
        Return the option for ``key``.

        For an unknown key a new option is created if a factory exists and
        ``human_io.ask_to_create`` agrees; otherwise ``human_io.not_found`` is
        called and ``None`` is returned.
        """
        if key not in self:
            if self.can_create_options:
                # NOTE(review): create_option already registers the option, so
                # it stays in the select even if ask_to_create declines.
                c = self.create_option(key)
                if self.human_io.ask_to_create(c):
                    return c

            self.human_io.not_found(key)
            return None

        return self[key]

    def pprint(self) -> str:
        """Return the texts of all iterated options, one per line."""
        return "\n".join(str(option) for option in self)


class StringSelect(Select):
    """A select whose options are created from plain values and numbered consecutively."""

    def __init__(self, **kwargs):
        self._current_index = 0
        # factory and key parser are fixed for this kind of select
        kwargs["option_factory"] = self.next_option
        kwargs["parse_option_key"] = lambda x: unify(str(x))

        super().__init__(**kwargs)

    def next_option(self, value: Any) -> Optional[Option]:
        """Create the option for ``value`` using the next running number as key and label."""
        o = Option(
            value=value,
            keys=[self._current_index],
            text=f"{BColors.BOLD.value}{self._current_index: >2}{BColors.ENDC.value}: {value}",
        )
        self._current_index += 1
        return o


class GenreSelect(StringSelect):
    """A string select filled with the genre directories found in the music directory."""

    @staticmethod
    def is_valid_genre(genre: Path) -> bool:
        """
        Return True if ``genre`` is a directory whose name matches none of the
        patterns in ``main_settings["not_a_genre_regex"]``.
        """
        if not genre.is_dir():
            return False

        if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]):
            return False

        return True

    def __init__(self):
        super().__init__(
            sort=True,
            raw_options=(
                genre.name
                for genre in filter(self.is_valid_genre, main_settings["music_directory"].iterdir())
            ),
        )


class SourceTypeToOption(dict):
    """A dict that creates missing entries by calling ``callback(key)``."""

    def __init__(self, callback):
        super().__init__()

        self.callback = callback

    def __missing__(self, key):
        self[key] = self.callback(key)
        return self[key]


class DataObjectSelect(Select):
    """A select over data objects, grouped by the source types of each object."""

    def __init__(self, data_objects: Generator[DataObject]):
        # options of the data objects, grouped by source type
        self._source_type_to_data_objects: Dict[SourceType, List[Option]] = defaultdict(list)
        # header option per source type, created on first access
        self._source_type_to_option: Dict[SourceType, Option] = SourceTypeToOption(self.option_from_source_type)

        self._data_object_index: int = 0
        self._source_type_index: int = 0

        super().__init__(
            parse_option_key=lambda x: unify(str(x)),
        )

        self.extend(data_objects)

    def option_from_data_object(self, data_object: DataObject) -> Option:
        """Wrap ``data_object`` in an option carrying the next running index."""
        index = self._data_object_index
        self._data_object_index += 1

        return Option(
            value=data_object,
            keys=[index, data_object.option_string, data_object.title_string],
            # {{index}} stays a literal placeholder, filled in by Option.text
            text=f"{BColors.BOLD.value}{{index}}{BColors.ENDC.value}: {data_object.option_string}",
            index=index,
        )

    def option_from_source_type(self, source_type: SourceType) -> Option:
        """
        Create and register the header option of ``source_type``.

        Its value is created lazily: a new ``DataObjectSelect`` over the
        options collected for that source type.
        """
        index = ALPHABET[self._source_type_index % len(ALPHABET)]
        self._source_type_index += 1

        o = Option(
            value=lambda: DataObjectSelect(self._source_type_to_data_objects[source_type]),
            keys=[index, source_type],
            text=f"{BColors.HEADER.value}({index}) --------------------------------{source_type.name:{'-'}<{21}}--------------------{BColors.ENDC.value}",
        )

        # bypass DataObjectSelect.append, the header wraps no data object
        super().append(o)

        return o

    def append(self, option: Union[Option, DataObject]):
        """Add a data object (or an option wrapping one) and file it under its source types."""
        if isinstance(option, DataObject):
            data_object = option
            option = self.option_from_data_object(data_object)
        else:
            data_object = option.value

        for source_type in data_object.source_collection.source_types(only_with_page=True):
            self._source_type_to_data_objects[source_type].append(option)

        super().append(option)

    def __iter__(self):
        """
        Yield the options grouped by source type, sorted by source type name.

        With more than one source type every group is preceded by its header
        option and cut off after 15 entries. The yielded options are renumbered
        consecutively, and the key mapping is rebuilt once the iteration is
        exhausted.
        """
        source_types = sorted(self._source_type_to_data_objects.keys(), key=lambda x: x.name)
        multiple_sources = len(source_types) > 1

        j = 0
        for st in source_types:
            group = self._source_type_to_data_objects[st]

            if multiple_sources:
                yield self._source_type_to_option[st]
                group = group[:15]

            for o in group:
                o.index = j
                yield o
                j += 1

        self._remap()