finished the easy part of migration

This commit is contained in:
Hellow 2023-09-10 16:54:06 +02:00
parent d2dd015817
commit a81a4a05a7
10 changed files with 59 additions and 159 deletions

View File

@ -5,7 +5,7 @@ import re
from .utils import cli_function from .utils import cli_function
from .options.first_config import initial_config from .options.first_config import initial_config
from ..utils.config import set_name_to_value, write_config from ..utils.config import write_config, main_settings
from ..utils.regex import URL_PATTERN from ..utils.regex import URL_PATTERN
from ..utils.string_processing import fit_to_file_system from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes import Query, DownloadResult from ..utils.support_classes import Query, DownloadResult
@ -15,8 +15,6 @@ from ..download.page_attributes import Pages
from ..pages import Page from ..pages import Page
from ..objects import Song, Album, Artist, DatabaseObject from ..objects import Song, Album, Artist, DatabaseObject
from ..utils import settings
""" """
This is the implementation of the Shell This is the implementation of the Shell
@ -96,12 +94,12 @@ def get_existing_genre() -> List[str]:
existing_genres: List[str] = [] existing_genres: List[str] = []
# get all subdirectories of MUSIC_DIR, not the files in the dir. # get all subdirectories of MUSIC_DIR, not the files in the dir.
existing_subdirectories: List[Path] = [f for f in settings["music_directory"].iterdir() if f.is_dir()] existing_subdirectories: List[Path] = [f for f in main_settings["music_directory"].iterdir() if f.is_dir()]
for subdirectory in existing_subdirectories: for subdirectory in existing_subdirectories:
name: str = subdirectory.name name: str = subdirectory.name
if not any(re.match(regex_pattern, name) for regex_pattern in settings["not_a_genre_regex"]): if not any(re.match(regex_pattern, name) for regex_pattern in main_settings["not_a_genre_regex"]):
existing_genres.append(name) existing_genres.append(name)
existing_genres.sort() existing_genres.sort()
@ -134,7 +132,7 @@ def get_genre():
def help_message(): def help_message():
print() print()
print(settings["happy_messages"]) print(main_settings["happy_messages"])
print() print()
@ -188,18 +186,18 @@ class Downloader:
print() print()
def set_current_options(self, current_options: Results): def set_current_options(self, current_options: Results):
if settings["result_history"]: if main_settings["result_history"]:
self._result_history.append(current_options) self._result_history.append(current_options)
if settings["history_length"] != -1: if main_settings["history_length"] != -1:
if len(self._result_history) > settings["history_length"]: if len(self._result_history) > main_settings["history_length"]:
self._result_history.pop(0) self._result_history.pop(0)
self.current_results = current_options self.current_results = current_options
def previous_option(self) -> bool: def previous_option(self) -> bool:
if not settings["result_history"]: if not main_settings["result_history"]:
print("History is turned of.\nGo to settings, and change the value at 'result_history' to 'true'.") print("History is turned of.\nGo to main_settings, and change the value at 'result_history' to 'true'.")
return False return False
if len(self._result_history) <= 1: if len(self._result_history) <= 1:
@ -397,10 +395,10 @@ def download(
command_list: List[str] = None, command_list: List[str] = None,
process_metadata_anyway: bool = False, process_metadata_anyway: bool = False,
): ):
if settings["hasnt_yet_started"]: if main_settings["hasnt_yet_started"]:
code = initial_config() code = initial_config()
if code == 0: if code == 0:
set_name_to_value("hasnt_yet_started", "false") main_settings["hasnt_yet_started"] = False
write_config() write_config()
print("Restart the programm to use it.") print("Restart the programm to use it.")
return code return code

View File

@ -1,7 +1,7 @@
from typing import List, Optional, Dict, Type from typing import List, Optional, Dict, Type
from enum import Enum from enum import Enum
from ...utils.shared import YOUTUBE_MUSIC_LOGGER as LOGGER from ...utils.config import logging_settings
from ...objects import Source, DatabaseObject from ...objects import Source, DatabaseObject
from ..abstract import Page from ..abstract import Page
from ...objects import ( from ...objects import (
@ -16,6 +16,9 @@ from ...objects import (
from ._music_object_render import parse_run_list, parse_run_element from ._music_object_render import parse_run_list, parse_run_element
LOGGER = logging_settings["youtube_music_logger"]
def music_card_shelf_renderer(renderer: dict) -> List[DatabaseObject]: def music_card_shelf_renderer(renderer: dict) -> List[DatabaseObject]:
results = parse_run_list(renderer.get("title", {}).get("runs", [])) results = parse_run_list(renderer.get("title", {}).get("runs", []))

View File

@ -1,7 +1,7 @@
from typing import List, Optional from typing import List, Optional
from enum import Enum from enum import Enum
from ...utils.shared import YOUTUBE_MUSIC_LOGGER as LOGGER, YOUTUBE_MUSIC_CLEAN_DATA from ...utils.config import youtube_settings, logging_settings
from ...objects import Source, DatabaseObject from ...objects import Source, DatabaseObject
from ..abstract import Page from ..abstract import Page
from ...objects import ( from ...objects import (
@ -14,6 +14,8 @@ from ...objects import (
Target Target
) )
LOGGER = logging_settings["youtube_music_logger"]
SOURCE_PAGE = SourcePages.YOUTUBE_MUSIC SOURCE_PAGE = SourcePages.YOUTUBE_MUSIC
@ -50,15 +52,15 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
LOGGER.warning("Couldn't find either the id or text of a Youtube music element.") LOGGER.warning("Couldn't find either the id or text of a Youtube music element.")
return return
if element_type == PageType.SONG or (element_type == PageType.VIDEO and not YOUTUBE_MUSIC_CLEAN_DATA) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not YOUTUBE_MUSIC_CLEAN_DATA): if element_type == PageType.SONG or (element_type == PageType.VIDEO and not youtube_settings["youtube_music_clean_data"]) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}") source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}")
return Song(title=element_text, source_list=[source]) return Song(title=element_text, source_list=[source])
if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not YOUTUBE_MUSIC_CLEAN_DATA): if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}") source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}")
return Artist(name=element_text, source_list=[source]) return Artist(name=element_text, source_list=[source])
if element_type == PageType.ALBUM or (element_type == PageType.PLAYLIST and not YOUTUBE_MUSIC_CLEAN_DATA): if element_type == PageType.ALBUM or (element_type == PageType.PLAYLIST and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/playlist?list={element_id}") source = Source(SOURCE_PAGE, f"https://music.youtube.com/playlist?list={element_id}")
return Album(title=element_text, source_list=[source]) return Album(title=element_text, source_list=[source])

View File

@ -20,11 +20,11 @@ from ...objects import (
) )
from ...connection import Connection from ...connection import Connection
from ...utils.support_classes import DownloadResult from ...utils.support_classes import DownloadResult
from ...utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE, BITRATE, ENABLE_SPONSOR_BLOCK, PIPED_INSTANCE, SLEEP_AFTER_YOUTUBE_403 from ...utils.config import youtube_settings, logging_settings, main_settings
def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str: def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
return urlunparse((INVIDIOUS_INSTANCE.scheme, INVIDIOUS_INSTANCE.netloc, path, params, query, fragment)) return urlunparse((youtube_settings["invidious_instance"].scheme, youtube_settings["invidious_instance"].netloc, path, params, query, fragment))
class YouTubeUrlType(Enum): class YouTubeUrlType(Enum):
@ -94,7 +94,7 @@ class YouTubeUrl:
def couldnt_find_id(self, url: str): def couldnt_find_id(self, url: str):
YOUTUBE_LOGGER.warning(f"The id is missing: {url}") logging_settings["youtube_logger"].warning(f"The id is missing: {url}")
self.url_type = YouTubeUrlType.NONE self.url_type = YouTubeUrlType.NONE
@property @property
@ -125,7 +125,7 @@ class YouTubeUrl:
class SuperYouTube(Page): class SuperYouTube(Page):
# CHANGE # CHANGE
SOURCE_TYPE = SourcePages.YOUTUBE SOURCE_TYPE = SourcePages.YOUTUBE
LOGGER = YOUTUBE_LOGGER LOGGER = logging_settings["youtube_logger"]
NO_ADDITIONAL_DATA_FROM_SONG = True NO_ADDITIONAL_DATA_FROM_SONG = True
@ -133,7 +133,7 @@ class SuperYouTube(Page):
self.download_connection: Connection = Connection( self.download_connection: Connection = Connection(
host="https://www.youtube.com/", host="https://www.youtube.com/",
logger=self.LOGGER, logger=self.LOGGER,
sleep_after_404=SLEEP_AFTER_YOUTUBE_403 sleep_after_404=youtube_settings["sleep_after_youtube_403"]
) )
# the stuff with the connection is, to ensure sponsorblock uses the proxies, my programm does # the stuff with the connection is, to ensure sponsorblock uses the proxies, my programm does
@ -180,7 +180,7 @@ class SuperYouTube(Page):
bitrate = int(possible_format.get("bitrate", 0)) bitrate = int(possible_format.get("bitrate", 0))
if bitrate >= BITRATE: if bitrate >= main_settings["bitrate"]:
best_bitrate = bitrate best_bitrate = bitrate
audio_format = possible_format audio_format = possible_format
break break
@ -198,7 +198,7 @@ class SuperYouTube(Page):
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]: def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
if not ENABLE_SPONSOR_BLOCK: if not youtube_settings["use_sponsor_block"]:
return [] return []
parsed = YouTubeUrl(source.url) parsed = YouTubeUrl(source.url)

View File

@ -9,7 +9,6 @@ import re
from ...utils.exception.config import SettingValueError from ...utils.exception.config import SettingValueError
from ...utils.config import main_settings, youtube_settings, logging_settings from ...utils.config import main_settings, youtube_settings, logging_settings
from ...utils.shared import DEBUG from ...utils.shared import DEBUG
from ...utils.config import CONNECTION_SECTION, write_config
from ...utils.functions import get_current_millis from ...utils.functions import get_current_millis
if DEBUG: if DEBUG:
from ...utils.debug_utils import dump_to_file from ...utils.debug_utils import dump_to_file
@ -100,7 +99,7 @@ class YoutubeMusic(SuperYouTube):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.connection: YoutubeMusicConnection = YoutubeMusicConnection(logger=self.LOGGER, accept_language="en-US,en;q=0.5") self.connection: YoutubeMusicConnection = YoutubeMusicConnection(logger=self.LOGGER, accept_language="en-US,en;q=0.5")
self.credentials: YouTubeMusicCredentials = YouTubeMusicCredentials( self.credentials: YouTubeMusicCredentials = YouTubeMusicCredentials(
api_key=CONNECTION_SECTION.YOUTUBE_MUSIC_API_KEY.object_from_value, api_key=youtube_settings["youtube_music_api_key"],
ctoken="", ctoken="",
context= { context= {
"client": { "client": {
@ -191,7 +190,7 @@ class YoutubeMusic(SuperYouTube):
api_key = api_keys[0] api_key = api_keys[0]
try: try:
CONNECTION_SECTION.YOUTUBE_MUSIC_API_KEY.set_value(api_key) youtube_settings["youtube_music_api_key"] = api_key
except SettingValueError: except SettingValueError:
continue continue
@ -199,7 +198,6 @@ class YoutubeMusic(SuperYouTube):
break break
if found_a_good_api_key: if found_a_good_api_key:
write_config()
self.LOGGER.info(f"Found a valid API-KEY for {type(self).__name__}: \"{api_key}\"") self.LOGGER.info(f"Found a valid API-KEY for {type(self).__name__}: \"{api_key}\"")
else: else:
self.LOGGER.error(f"Couldn't find an API-KEY for {type(self).__name__}. :((") self.LOGGER.error(f"Couldn't find an API-KEY for {type(self).__name__}. :((")

View File

@ -1,127 +0,0 @@
from typing import Union, Tuple, Dict, Iterable, List
from datetime import datetime
import logging
import os
from ..exception.config import SettingNotFound, SettingValueError
from ..path_manager import LOCATIONS
from .base_classes import Description, Attribute, Section, EmptyLine, COMMENT_PREFIX
from .sections.audio import AUDIO_SECTION
from .sections.logging import LOGGING_SECTION
from .sections.connection import CONNECTION_SECTION
from .sections.misc import MISC_SECTION
from .sections.paths import PATHS_SECTION
LOGGER = logging.getLogger("config")
class Config:
def __init__(self):
self.config_elements: Tuple[Union[Description, Attribute, Section], ...] = (
Description("IMPORTANT: If you modify this file, the changes for the actual setting, will be kept as is.\n"
"The changes you make to the comments, will be discarded, next time you run music-kraken. "
"Have fun!"),
Description(f"Latest reset: {datetime.now()}"),
Description("Those are all Settings for the audio codec.\n"
"If you, for some reason wanna fill your drive real quickly, I mean enjoy HIFI music,\n"
"feel free to tinker with the Bitrate or smth. :)"),
AUDIO_SECTION,
Description("Modify how Music-Kraken connects to the internet:"),
CONNECTION_SECTION,
Description("Modify all your paths, except your config file..."),
PATHS_SECTION,
Description("For all your Logging needs.\n"
"If you found a bug, and wan't to report it, please set the Logging level to 0,\n"
"reproduce the bug, and attach the logfile in the bugreport. ^w^"),
LOGGING_SECTION,
Description("If there are stupid settings, they are here."),
MISC_SECTION,
Description("🏳️‍⚧️🏳️‍⚧️ Protect trans youth. 🏳️‍⚧️🏳️‍⚧️\n"),
)
self._length = 0
self._section_list: List[Section] = []
self.name_section_map: Dict[str, Section] = dict()
for element in self.config_elements:
if not isinstance(element, Section):
continue
self._section_list.append(element)
for name in element.name_attribute_map:
if name in self.name_section_map:
raise ValueError(f"Two sections have the same name: "
f"{name}: "
f"{element.__class__.__name__} {self.name_section_map[name].__class__.__name__}")
self.name_section_map[name] = element
self._length += 1
def set_name_to_value(self, name: str, value: str, silent: bool = True):
"""
:raises SettingValueError, SettingNotFound:
:param name:
:param value:
:return:
"""
if name not in self.name_section_map:
if silent:
LOGGER.warning(f"The setting \"{name}\" is either deprecated, or doesn't exist.")
return
raise SettingNotFound(setting_name=name)
LOGGER.debug(f"setting: {name} value: {value}")
self.name_section_map[name].modify_setting(setting_name=name, new_value=value)
def __len__(self):
return self._length
@property
def config_string(self) -> str:
return "\n\n".join(str(element) for element in self.config_elements)
def _parse_conf_line(self, line: str, index: int):
"""
:raises SettingValueError, SettingNotFound:
:param line:
:param index:
:return:
"""
line = line.strip()
if line.startswith(COMMENT_PREFIX):
return
if line == "":
return
if "=" not in line:
"""
TODO
No value error but custom conf error
"""
raise ValueError(f"Couldn't find the '=' in line {index}.")
line_segments = line.split("=")
name = line_segments[0]
value = "=".join(line_segments[1:])
self.set_name_to_value(name, value)
def read_from_config_file(self, path: os.PathLike):
with open(path, "r", encoding=LOCATIONS.FILE_ENCODING) as conf_file:
for section in self._section_list:
section.reset_list_attribute()
for i, line in enumerate(conf_file):
self._parse_conf_line(line, i+1)
def write_to_config_file(self, path: os.PathLike):
with open(path, "w", encoding=LOCATIONS.FILE_ENCODING) as conf_file:
conf_file.write(self.config_string)
def __iter__(self) -> Iterable[Attribute]:
for section in self._section_list:
for name, attribute in section.name_attribute_map.items():
yield attribute

View File

@ -143,7 +143,7 @@ class AudioFormatAttribute(Attribute):
class LoggerAttribute(Attribute): class LoggerAttribute(Attribute):
def parse_simple_value(self, value: str) -> logging.Logger: def parse_simple_value(self, value: str) -> logging.Logger:
return logging.getLogger(self.value) return logging.getLogger(value)
def unparse_simple_value(self, value: logging.Logger) -> any: def unparse_simple_value(self, value: logging.Logger) -> any:
return value.name return value.name

View File

@ -1,4 +1,4 @@
from typing import Tuple, Union from typing import Any, Tuple, Union
from pathlib import Path from pathlib import Path
import logging import logging
@ -7,6 +7,23 @@ import toml
from .attributes.attribute import Attribute, Description, EmptyLine from .attributes.attribute import Attribute, Description, EmptyLine
class ConfigDict(dict):
def __init__(self, config_reference: "Config", *args, **kwargs):
self.config_reference: Config = config_reference
super().__init__(*args, **kwargs)
def __getattribute__(self, __name: str) -> Any:
return super().__getattribute__(__name)
def __setitem__(self, __key: Any, __value: Any) -> None:
attribute: Attribute = self.config_reference.attribute_map[__key]
attribute.load_toml({attribute.name: __value})
self.config_reference.write()
return super().__setitem__(__key, attribute.value)
class Config: class Config:
def __init__(self, componet_list: Tuple[Union[Attribute, Description, EmptyLine]], config_file: Path) -> None: def __init__(self, componet_list: Tuple[Union[Attribute, Description, EmptyLine]], config_file: Path) -> None:
self.config_file: Path = config_file self.config_file: Path = config_file
@ -14,6 +31,14 @@ class Config:
self.component_list: Tuple[Union[Attribute, Description, EmptyLine]] = componet_list self.component_list: Tuple[Union[Attribute, Description, EmptyLine]] = componet_list
self.loaded_settings: dict = {} self.loaded_settings: dict = {}
self.attribute_map = {}
for component in self.component_list:
if not isinstance(component, Attribute):
continue
self.attribute_map[component.name] = component
self.loaded_settings[component.name] = component.value
@property @property
def toml_string(self): def toml_string(self):
"\n\n".join(component.toml_string for component in self.component_list) "\n\n".join(component.toml_string for component in self.component_list)

View File

@ -18,7 +18,7 @@ config = Config([
Reference for the logging formats: https://docs.python.org/3/library/logging.html#logrecord-attributes"""), Reference for the logging formats: https://docs.python.org/3/library/logging.html#logrecord-attributes"""),
IntegerSelect( IntegerSelect(
name="log_level", name="log_level",
default_value=str(logging.INFO), default_value=logging.INFO,
options={ options={
"CRITICAL": 50, "CRITICAL": 50,
"ERROR": 40, "ERROR": 40,

View File

@ -103,6 +103,7 @@ all the error messages are shown."""),
], description="""Just some nice and wholesome messages. ], description="""Just some nice and wholesome messages.
If your mindset has traits of a [file corruption], you might not agree. If your mindset has traits of a [file corruption], you might not agree.
But anyways... Freedom of thought, so go ahead and change the messages."""), But anyways... Freedom of thought, so go ahead and change the messages."""),
Attribute(name="modify_gc", default_value=True),
Attribute(name="id_bits", default_value=64, description="I really dunno why I even made this a setting.. Modifying this is a REALLY dumb idea."), Attribute(name="id_bits", default_value=64, description="I really dunno why I even made this a setting.. Modifying this is a REALLY dumb idea."),
Description("🏳️‍⚧️🏳️‍⚧️ Protect trans youth. 🏳️‍⚧️🏳️‍⚧️\n"), Description("🏳️‍⚧️🏳️‍⚧️ Protect trans youth. 🏳️‍⚧️🏳️‍⚧️\n"),