3 Commits

Author SHA1 Message Date
130f5edcfe draft: rewrite of interface
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-05 12:02:12 +02:00
636645e862 feat: implemented genre to external file 2024-06-04 11:00:12 +02:00
df1743c695 feat: implemented better components 2024-06-03 15:04:47 +02:00
10 changed files with 333 additions and 370 deletions

View File

@@ -1,13 +1,13 @@
import logging
import gc import gc
import logging
import sys import sys
from pathlib import Path from pathlib import Path
from rich.logging import RichHandler
from rich.console import Console from rich.console import Console
from rich.logging import RichHandler
from .utils.shared import DEBUG, DEBUG_LOGGING
from .utils.config import logging_settings, main_settings, read_config from .utils.config import logging_settings, main_settings, read_config
from .utils.shared import DEBUG, DEBUG_LOGGING
read_config() read_config()

View File

@@ -1,14 +1,15 @@
import mutagen import logging
from mutagen.id3 import ID3, Frame, APIC, USLT
from pathlib import Path from pathlib import Path
from typing import List from typing import List
import logging
import mutagen
from mutagen.id3 import APIC, ID3, USLT, Frame
from PIL import Image from PIL import Image
from ..utils.config import logging_settings, main_settings
from ..objects import Song, Target, Metadata
from ..objects.metadata import Mapping
from ..connection import Connection from ..connection import Connection
from ..objects import Metadata, Song, Target
from ..objects.metadata import Mapping
from ..utils.config import logging_settings, main_settings
LOGGER = logging_settings["tagging_logger"] LOGGER = logging_settings["tagging_logger"]
@@ -105,7 +106,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
APIC( APIC(
encoding=0, encoding=0,
mime="image/jpeg", mime="image/jpeg",
type=3, type=mutagen.id3.PictureType.COVER_FRONT,
desc=u"Cover", desc=u"Cover",
data=converted_target.read_bytes(), data=converted_target.read_bytes(),
) )

53
music_kraken/cli/genre.py Normal file
View File

@@ -0,0 +1,53 @@
import re
from pathlib import Path
from typing import Dict, Generator, List, Set, Type, Union
from ..download import Downloader, Page, components
from ..utils.config import main_settings
from .utils import ask_for_bool, cli_function
class GenreIO(components.HumanIO):
@staticmethod
def ask_to_create(option: components.Option) -> bool:
output()
return ask_for_bool(f"create the genre {BColors.OKBLUE.value}{option.value}{BColors.ENDC.value}")
@staticmethod
def not_found(key: str) -> None:
output(f"\ngenre {BColors.BOLD.value}{key}{BColors.ENDC.value} not found\n", color=BColors.FAIL)
def _genre_generator() -> Generator[str, None, None]:
def is_valid_genre(genre: Path) -> bool:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any Patern
from shared.NOT_A_GENRE_REGEX.
"""
if not genre.is_dir():
return False
if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]):
return False
return True
for genre in filter(is_valid_genre, main_settings["music_directory"].iterdir()):
yield genre.name
def get_genre() -> str:
select_genre = components.Select(
human_io=GenreIO,
can_create_options=True,
data=_genre_generator(),
)
genre: Optional[components.Option[str]] = None
while genre is None:
print(select_genre.pprint())
print()
genre = select_genre.choose(input("> "))
return genre.value

View File

@@ -18,6 +18,7 @@ from ..utils.shared import HELP_MESSAGE, URL_PATTERN
from ..utils.string_processing import fit_to_file_system from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query from ..utils.support_classes.query import Query
from .genre import get_genre
from .options.first_config import initial_config from .options.first_config import initial_config
from .utils import ask_for_bool, cli_function from .utils import ask_for_bool, cli_function
@@ -27,30 +28,7 @@ PAGE_NAME_FILL = "-"
MAX_PAGE_LEN = 21 MAX_PAGE_LEN = 21
class GenreIO(components.HumanIO):
@staticmethod
def ask_to_create(option: components.Option) -> bool:
output()
return ask_for_bool(f"create the genre {BColors.OKBLUE.value}{option.value}{BColors.ENDC.value}")
@staticmethod
def not_found(key: str) -> None:
output(f"\ngenre {BColors.BOLD.value}{key}{BColors.ENDC.value} not found\n", color=BColors.FAIL)
def get_genre():
select_genre = components.GenreSelect()
select_genre.human_io = GenreIO
genre: Optional[components.Option] = None
while genre is None:
print(select_genre.pprint())
print()
genre = select_genre.choose(input("> "))
return genre.value
def help_message(): def help_message():

View File

@@ -1,303 +0,0 @@
from __future__ import annotations
import re
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable, Dict, Generator, List, Optional
from ..objects import OuterProxy as DataObject
from ..utils import BColors
from ..utils.config import main_settings
from ..utils.enums import SourceType
from ..utils.exception import MKComposeException
from ..utils.shared import ALPHABET
from ..utils.string_processing import unify
class HumanIO:
@staticmethod
def ask_to_create(option: Option) -> bool:
return True
@staticmethod
def not_found(key: Any) -> None:
return None
class Option:
"""
This could represent a data object, a string or a page.
"""
def __init__(
self,
value: Any,
text: Optional[str] = None,
keys: List[Any] = None,
hidden: bool = False,
parse_key: Callable[[Any], Any] = lambda x: x,
index: int = None,
):
self._parse_key: Callable[[Any], Any] = parse_key
self._index = index
self.value = value
self._text = text or str(value)
self.hidden = hidden
self._raw_keys = set(keys or [])
self._raw_keys.add(self.text)
try:
self._raw_keys.add(self.value)
except TypeError:
pass
self._raw_keys.add(str(self.value))
self._raw_keys.add(self._index)
self.keys = set(self.parse_key(key) for key in self._raw_keys)
@property
def text(self) -> str:
return self._text.replace("{index}", str(self.index))
@text.setter
def text(self, value: str):
self._text = value
@property
def index(self) -> int:
return self._index
@index.setter
def index(self, value: int):
p = self._parse_key(self._index)
if p in self.keys:
self.keys.remove(p)
self._index = value
self.keys.add(p)
def register_key(self, key: Any):
self._raw_keys.add(key)
self.keys.add(self._parse_key(key))
@property
def parse_key(self) -> Callable[[Any], Any]:
return self._parse_key
@parse_key.setter
def parse_key(self, value: Callable[[Any], Any]):
self._parse_key = value
self.keys = set(self._parse_key(key) for key in self._raw_keys)
def __str__(self):
return self.text
class Select:
def __init__(
self,
options: Generator[Option, None, None] = None,
option_factory: Callable[[Any], Option] = None,
raw_options: List[Any] = None,
parse_option_key: Callable[[Any], Any] = lambda x: x,
human_io: HumanIO = HumanIO,
sort: bool = False,
**kwargs
):
self._parse_option_key: Callable[[Any], Any] = parse_option_key
self.human_io: HumanIO = human_io
self._key_to_option: Dict[Any, Option] = dict()
self._options: List[Option] = []
options = options or []
self.option_factory: Optional[Callable[[Any], Option]] = option_factory
if self.can_create_options:
_raw_options = raw_options or []
if sort:
_raw_options = sorted(_raw_options)
for raw_option in _raw_options:
self.append(self.option_factory(raw_option))
elif raw_options is not None:
raise MKComposeException("Cannot create options without a factory.")
self.extend(options)
@property
def can_create_options(self) -> bool:
return self.option_factory is not None
def append(self, option: Option):
option.parse_key = self._parse_option_key
self._options.append(option)
for key in option.keys:
self._key_to_option[key] = option
def _remap(self):
self._key_to_option = dict()
for option in self._options:
for key in option.keys:
self._key_to_option[key] = option
def extend(self, options: List[Option]):
for option in options:
self.append(option)
def __iter__(self) -> Generator[Option, None, None]:
for option in self._options:
if option.hidden:
continue
yield option
def __contains__(self, key: Any) -> bool:
return self._parse_option_key(key) in self._key_to_option
def __getitem__(self, key: Any) -> Option:
r = self._key_to_option[self._parse_option_key(key)]
if callable(r):
r = r()
if callable(r.value):
r.value = r.value()
return r
def create_option(self, key: Any, **kwargs) -> Option:
if not self.can_create_options:
raise MKComposeException("Cannot create options without a factory.")
option = self.option_factory(key, **kwargs)
self.append(option)
return option
def choose(self, key: Any) -> Optional[Option]:
if key not in self:
if self.can_create_options:
c = self.create_option(key)
if self.human_io.ask_to_create(c):
return c
self.human_io.not_found(key)
return None
return self[key]
def pprint(self) -> str:
return "\n".join(str(option) for option in self)
class StringSelect(Select):
def __init__(self, **kwargs):
self._current_index = 0
kwargs["option_factory"] = self.next_option
kwargs["parse_option_key"] = lambda x: unify(str(x))
super().__init__(**kwargs)
def next_option(self, value: Any) -> Optional[Option]:
o = Option(value=value, keys=[self._current_index], text=f"{BColors.BOLD.value}{self._current_index: >2}{BColors.ENDC.value}: {value}")
self._current_index += 1
return o
class GenreSelect(StringSelect):
@staticmethod
def is_valid_genre(genre: Path) -> bool:
"""
gets the name of all subdirectories of shared.MUSIC_DIR,
but filters out all directories, where the name matches with any Patern
from shared.NOT_A_GENRE_REGEX.
"""
if not genre.is_dir():
return False
if any(re.match(regex_pattern, genre.name) for regex_pattern in main_settings["not_a_genre_regex"]):
return False
return True
def __init__(self):
super().__init__(sort=True, raw_options=(genre.name for genre in filter(self.is_valid_genre, main_settings["music_directory"].iterdir())))
class SourceTypeToOption(dict):
def __init__(self, callback):
super().__init__()
self.callback = callback
def __missing__(self, key):
self[key] = self.callback(key)
return self[key]
class DataObjectSelect(Select):
def __init__(self, data_objects: Generator[DataObject]):
self._source_type_to_data_objects: Dict[SourceType, List[Option]] = defaultdict(list)
self._source_type_to_option: Dict[SourceType, Option] = SourceTypeToOption(self.option_from_source_type)
self._data_object_index: int = 0
self._source_type_index: int = 0
super().__init__(
parse_option_key=lambda x: unify(str(x)),
)
self.extend(data_objects)
def option_from_data_object(self, data_object: DataObject) -> Option:
index = self._data_object_index
self._data_object_index += 1
return Option(
value=data_object,
keys=[index, data_object.option_string, data_object.title_string],
text=f"{BColors.BOLD.value}{{index}}{BColors.ENDC.value}: {data_object.option_string}",
index=index,
)
def option_from_source_type(self, source_type: SourceType) -> Option:
index = ALPHABET[self._source_type_index % len(ALPHABET)]
self._source_type_index += 1
o = Option(
value=lambda: DataObjectSelect(self._source_type_to_data_objects[source_type]),
keys=[index, source_type],
text=f"{BColors.HEADER.value}({index}) --------------------------------{source_type.name:{'-'}<{21}}--------------------{BColors.ENDC.value}",
)
super().append(o)
return o
def append(self, option: Union[Option, DataObject]):
if isinstance(option, DataObject):
data_object = option
option = self.option_from_data_object(data_object)
else:
data_object = option.value
for source_type in data_object.source_collection.source_types(only_with_page=True):
self._source_type_to_data_objects[source_type].append(option)
super().append(option)
def __iter__(self):
source_types = list(sorted(self._source_type_to_data_objects.keys(), key=lambda x: x.name))
single_source = len(source_types) > 1
j = 0
for st in source_types:
if single_source:
yield self._source_type_to_option[st]
limit = min(15, len(self._source_type_to_data_objects[st])) if single_source else len(self._source_type_to_data_objects[st])
for i in range(limit):
o = self._source_type_to_data_objects[st][i]
o.index = j
yield o
j += 1
self._remap()

View File

@@ -0,0 +1,225 @@
from __future__ import annotations
import copy
import re
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import (Any, Callable, Dict, Generator, Generic, Hashable, List,
Optional, Tuple, TypeVar, Union)
from ...objects import OuterProxy as DataObject
from ...utils import BColors
from ...utils.config import main_settings
from ...utils.enums import SourceType
from ...utils.exception import MKComposeException
from ...utils.shared import ALPHABET
from ...utils.string_processing import unify
P = TypeVar('P')
class HumanIO:
@staticmethod
def ask_to_create(option: Option) -> bool:
return True
@staticmethod
def not_found(key: Any) -> None:
return None
class Option(Generic[P]):
"""
This could represent a data object, a string or a page.
"""
TEXT_TEMPLATE: str = f"{BColors.BOLD.value}{{index}}{BColors.ENDC.value}: {{value}}"
ATTRIBUTES_FORMATTING: Tuple[str, ...] = ("index", "value")
ATTRIBUTES_KEY: Tuple[str, ...] = ("index", )
def __init__(
self,
value: P,
hidden: bool = False,
additional_keys: List[Hashable] = None,
**kwargs
):
self.value = value
self.hidden = hidden
self._additional_keys = set(self._to_hash(key) for key in additional_keys or [])
for key, value in kwargs.items():
setattr(self, key, value)
super(Option, self).__init__()
def _to_option_string(self, value: Any) -> str:
if hasattr(value, "option_string"):
return value.option_string
return str(value)
@property
def text(self) -> str:
text = self.TEXT_TEMPLATE
for attribute_key in self.ATTRIBUTES_FORMATTING:
text = text.replace(f"{{{attribute_key}}}", self._to_option_string(getattr(self, attribute_key)))
return text
def _to_hash(self, key: Any) -> int:
try:
key = int(key)
except ValueError:
pass
if isinstance(key, str):
return hash(unify(key))
return hash(key)
@property
def keys(self) -> set:
keys = self._additional_keys.copy()
for key in self.ATTRIBUTES_KEY:
keys.add(self._to_hash(getattr(self, key)))
def __contains__(self, key: Any) -> bool:
return self._to_hash(key) in self.keys
def __str__(self):
return self.text
def __iter__(self) -> Generator[Option[P], None, None]:
yield self
class Select(Generic[P]):
OPTION: Type[Option[P]] = Option
HUMAN_IO: Type[HumanIO] = HumanIO
CAN_CREATE_OPTIONS: bool = False
def __init__(
self,
data: Generator[P, None, None],
**kwargs
):
self.option: Type[Option[P]] = kwargs.get("option", self.OPTION)
self.human_io: Type[HumanIO] = kwargs.get("human_io", self.HUMAN_IO)
self.can_create_options: bool = kwargs.get("can_create_options", self.CAN_CREATE_OPTIONS)
self._options: List[Option[P]] = []
self.extend(data)
super(Select, self).__init__(**kwargs)
def append(self, value: P) -> Option[P]:
option = self.option(value)
self._options.append(option)
return option
def extend(self, values: Generator[P, None, None]):
for value in values:
self.append(value)
@property
def _options_to_show(self) -> Generator[Option[P], None, None]:
for option in self._options:
if option.hidden:
continue
yield option
def __iter__(self) -> Generator[Option, None, None]:
_index = 0
for i, o in enumerate(self._options_to_show):
for option in o:
option.index = _index
yield option
_index += 1
def __contains__(self, key: Any) -> bool:
for option in self._options:
if key in option:
return True
return False
def __getitem__(self, key: Any) -> Option[P]:
for option in self._options:
if key in option:
return option
raise KeyError(key)
def choose(self, key: Any) -> Optional[Option[P]]:
try:
return self[key]
except KeyError:
if self.can_create_options:
return self.append(key)
self.human_io.not_found(key)
def pprint(self) -> str:
return "\n".join(str(option) for option in self)
class Node(Generator[P]):
def __init__(
self,
value: Optional[P] = None,
children: List[Node[P]] = None,
parent: Node[P] = None,
**kwargs
):
self.value = value
self.depth = 0
self.same_level_index: int = 0
self.children: List[Node[P]] = kwargs.get("children", [])
self.parent: Optional[Node[P]] = kwargs.get("parent", None)
super(Node, self).__init__(**kwargs)
def hash_key(self, key: Any) -> int:
try:
key = int(key)
except ValueError:
pass
if isinstance(key, str):
return hash(unify(key))
return hash(key)
@property
def is_root(self) -> bool:
return self.parent is None
@property
def is_leaf(self) -> bool:
return not self.children
def __iter__(self, **kwargs) -> Generator[Node[P], None, None]:
_level_index_map: Dict[int, int] = kwargs.get("level_index_map", defaultdict(lambda: 0))
self.same_level_index = _level_index_map[self.depth]
yield self
_level_index_map[self.depth] += 1
for child in self.children:
child.depth = self.depth + 1
for node in child.__iter__(level_index_map=_level_index_map):
yield node
def __getitem__(self, key: Any) -> Option[P]:
pass
def __contains__(self, key: Any) -> bool:
if key in self.option:
return True

View File

@@ -0,0 +1 @@
from . import Option, Select

View File

@@ -1,35 +1,32 @@
from __future__ import annotations from __future__ import annotations
import copy
import random import random
from collections import defaultdict from collections import defaultdict
from typing import List, Optional, Dict, Tuple, Type, Union from typing import Dict, List, Optional, Tuple, Type, Union
import copy
import pycountry import pycountry
from ..utils.enums.album import AlbumType, AlbumStatus from ..utils.config import main_settings
from .collection import Collection from ..utils.enums.album import AlbumStatus, AlbumType
from .formatted_text import FormattedText from ..utils.enums.colors import BColors
from .lyrics import Lyrics
from .contact import Contact
from .artwork import Artwork
from .metadata import (
Mapping as id3Mapping,
ID3Timestamp,
Metadata
)
from .option import Options
from .parents import OuterProxy, P
from .source import Source, SourceCollection
from .target import Target
from .country import Language, Country
from ..utils.shared import DEBUG_PRINT_ID from ..utils.shared import DEBUG_PRINT_ID
from ..utils.string_processing import unify from ..utils.string_processing import unify
from .artwork import Artwork
from .collection import Collection
from .contact import Contact
from .country import Country, Language
from .formatted_text import FormattedText
from .lyrics import Lyrics
from .metadata import ID3Timestamp
from .metadata import Mapping as id3Mapping
from .metadata import Metadata
from .option import Options
from .parents import OuterProxy
from .parents import OuterProxy as Base from .parents import OuterProxy as Base
from .parents import P
from ..utils.config import main_settings from .source import Source, SourceCollection
from ..utils.enums.colors import BColors from .target import Target
""" """
All Objects dependent All Objects dependent

View File

@@ -1,15 +1,16 @@
from datetime import datetime import inspect
from pathlib import Path
import json import json
import logging import logging
import inspect from datetime import datetime
from pathlib import Path
from typing import List, Union from typing import List, Union
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
from .config import config, read_config, write_config from .config import config, read_config, write_config
from .enums.colors import BColors from .enums.colors import BColors
from .path_manager import LOCATIONS
from .hacking import merge_args from .hacking import merge_args
from .path_manager import LOCATIONS
from .shared import (DEBUG, DEBUG_DUMP, DEBUG_LOGGING, DEBUG_OBJECT_TRACE,
DEBUG_OBJECT_TRACE_CALLSTACK, DEBUG_TRACE)
""" """
IO functions IO functions
@@ -125,4 +126,4 @@ def get_current_millis() -> int:
def get_unix_time() -> int: def get_unix_time() -> int:
return int(datetime.now().timestamp()) return int(datetime.now().timestamp())

View File

@@ -1,13 +1,12 @@
from typing import Tuple, Union, Optional
from pathlib import Path
import string import string
from functools import lru_cache from functools import lru_cache
from pathlib import Path
from typing import Optional, Tuple, Union
from urllib.parse import ParseResult, parse_qs, urlparse
from transliterate.exceptions import LanguageDetectionError
from transliterate import translit
from pathvalidate import sanitize_filename from pathvalidate import sanitize_filename
from urllib.parse import urlparse, ParseResult, parse_qs from transliterate import translit
from transliterate.exceptions import LanguageDetectionError
COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = ( COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = (
"(official video)", "(official video)",
@@ -180,6 +179,17 @@ def hash_url(url: Union[str, ParseResult]) -> str:
r = r.lower().strip() r = r.lower().strip()
return r return r
def hash(self, key: Any) -> int:
try:
key = int(key)
except ValueError:
pass
if isinstance(key, str):
return hash(unify(key))
return hash(key)
def remove_feature_part_from_track(title: str) -> str: def remove_feature_part_from_track(title: str) -> str:
if ")" != title[-1]: if ")" != title[-1]: