Merge branch 'source/bandcamp' into experimental

This commit is contained in:
2024-04-10 14:39:34 +02:00
192 changed files with 4692 additions and 7608 deletions

View File

@@ -0,0 +1,66 @@
from datetime import datetime
from pathlib import Path
import json
import logging
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE
from .config import config, read_config, write_config
from .enums.colors import BColors
from .path_manager import LOCATIONS
"""
IO functions
"""
def _apply_color(msg: str, color: BColors) -> str:
if color is BColors.ENDC:
return msg
return color.value + msg + BColors.ENDC.value
def output(msg: str, color: BColors = BColors.ENDC):
print(_apply_color(msg, color))
def user_input(msg: str, color: BColors = BColors.ENDC):
return input(_apply_color(msg, color)).strip()
def dump_to_file(file_name: str, payload: str, is_json: bool = False, exit_after_dump: bool = False):
if not DEBUG_DUMP:
return
path = Path(LOCATIONS.TEMP_DIRECTORY, file_name)
logging.warning(f"dumping {file_name} to: \"{path}\"")
if is_json and isinstance(payload, str):
payload = json.loads(payload)
if isinstance(payload, dict):
payload = json.dumps(payload, indent=4)
with path.open("w") as f:
f.write(payload)
if exit_after_dump:
exit()
def trace(msg: str):
if not DEBUG_TRACE:
return
output("trace: " + msg, BColors.OKBLUE)
"""
misc functions
"""
def get_current_millis() -> int:
dt = datetime.now()
return int(dt.microsecond / 1_000)
def get_unix_time() -> int:
return int(datetime.now().timestamp())

View File

@@ -0,0 +1,33 @@
from typing import Tuple
from .config import Config
from .config_files import (
main_config,
logging_config,
youtube_config,
)
_sections: Tuple[Config, ...] = (
main_config.config,
logging_config.config,
youtube_config.config
)
def read_config():
for section in _sections:
section.read()
# special cases
if main_settings['tor']:
main_settings['proxies'] = {
'http': f'socks5h://127.0.0.1:{main_settings["tor_port"]}',
'https': f'socks5h://127.0.0.1:{main_settings["tor_port"]}'
}
def write_config():
for section in _sections:
section.write()
main_settings: main_config.SettingsStructure = main_config.config.loaded_settings
logging_settings: logging_config.SettingsStructure = logging_config.config.loaded_settings
youtube_settings: youtube_config.SettingsStructure = youtube_config.config.loaded_settings

View File

@@ -0,0 +1,132 @@
import re
from typing import Optional, List, Union, Iterable, Callable
from dataclasses import dataclass
import logging
import toml
from copy import deepcopy, copy
from urllib.parse import urlparse, urlunparse, ParseResult
from ...exception.config import SettingValueError
from ..utils import comment
LOGGER = logging.getLogger("config")
COMMENT_PREFIX = "#"
def comment_string(uncommented: str) -> str:
unprocessed_lines = uncommented.split("\n")
processed_lines: List[str] = []
for line in unprocessed_lines:
if line.startswith(COMMENT_PREFIX) or line == "":
processed_lines.append(line)
continue
line = COMMENT_PREFIX + " " + line
processed_lines.append(line)
return "\n".join(processed_lines)
@dataclass
class Description:
description: str
@property
def toml_string(self):
return comment_string(self.description)
class EmptyLine(Description):
def __init__(self):
self.description = ""
class Attribute:
def __init__(
self,
name: str,
default_value: any,
description: Optional[str] = None,
):
self.name = name
self.value = self._recursive_parse_object(default_value, self.parse_simple_value)
self.description: Optional[str] = description
self.loaded_settings: dict = None
def initialize_from_config(self, loaded_settings: dict):
self.loaded_settings = loaded_settings
self.loaded_settings.__setitem__(self.name, self.value, True)
def unparse_simple_value(self, value: any) -> any:
return value
def parse_simple_value(self, value: any) -> any:
return value
def _recursive_parse_object(self, __object, callback: Callable):
__object = copy(__object)
if isinstance(__object, dict):
for key, value in __object.items():
__object[key] = self._recursive_parse_object(value, callback)
return __object
if isinstance(__object, list) or (isinstance(__object, tuple) and not isinstance(__object, ParseResult)):
for i, item in enumerate(__object):
__object[i] = self._recursive_parse_object(item, callback)
return __object
return callback(__object)
def parse(self, unparsed_value):
self.value = self._recursive_parse_object(unparsed_value, self.parse_simple_value)
return self.value
def unparse(self, parsed_value):
return self._recursive_parse_object(parsed_value, self.unparse_simple_value)
def load_toml(self, loaded_toml: dict) -> bool:
"""
returns true if succesfull
"""
if self.name not in loaded_toml:
LOGGER.warning(f"No setting by the name {self.name} found in the settings file.")
self.loaded_settings.__setitem__(self.name, self.value, True)
return
try:
self.parse(loaded_toml[self.name])
except SettingValueError as settings_error:
logging.warning(settings_error)
return False
self.loaded_settings.__setitem__(self.name, self.value, True)
return True
@property
def toml_string(self) -> str:
string = ""
if self.description is not None:
string += comment(self.description) + "\n"
string += toml.dumps({self.name: self.unparse(self.value)})
# print(string)
return string
def __str__(self):
return f"{self.description}\n{self.name}={self.value}"

View File

@@ -0,0 +1,151 @@
from pathlib import Path, PosixPath
from typing import Optional, Dict, Set
from urllib.parse import urlparse, urlunparse
import logging
from .attribute import Attribute
from ...exception.config import SettingValueError
class UrlAttribute(Attribute):
def parse_simple_value(self, value: any) -> any:
return urlparse(value)
def unparse_simple_value(self, value: any) -> any:
return urlunparse((value.scheme, value.netloc, value.path, value.params, value.query, value.fragment))
class PathAttribute(Attribute):
def parse_simple_value(self, value: any) -> Path:
if isinstance(value, Path) or isinstance(value, PosixPath):
return value
return Path(value)
def unparse_simple_value(self, value: Path) -> any:
return str(value.resolve())
class SelectAttribute(Attribute):
def __init__(self, name: str, default_value: any, options: tuple, description: Optional[str] = None, ignore_options_for_description = False):
self.options: tuple = options
new_description = ""
if description is not None:
new_description += description
new_description += "\n"
if not ignore_options_for_description:
new_description += f"{{{', '.join(self.options)}}}"
super().__init__(name, default_value, description)
def parse_simple_value(self, value: any) -> any:
if value in self.options:
return value
raise SettingValueError(
setting_name=self.name,
setting_value=value,
rule=f"has to be in the options: {{{', '.join(self.options)}}}."
)
def unparse_simple_value(self, value: any) -> any:
return value
class IntegerSelect(Attribute):
def __init__(self, name: str, default_value: any, options: Dict[int, str], description: Optional[str] = None, ignore_options_for_description = False):
self.options: Dict[str, int] = options
self.option_values: Set[int] = set(self.options.values())
new_description = ""
if description is not None:
new_description += description
description_lines = []
if description is not None:
description_lines.append(description)
description_lines.append("The values can be either an integer or one of the following values:")
for number, option in self.options.items():
description_lines.append(f"{number}: {option}")
super().__init__(name, default_value, "\n".join(description_lines))
def parse_simple_value(self, value: any) -> any:
if isinstance(value, str):
if value not in self.options:
raise SettingValueError(
setting_name=self.name,
setting_value=value,
rule=f"has to be in the options: {{{', '.join(self.options.keys())}}}, if it is a string."
)
return self.options[value]
return value
def unparse_simple_value(self, value: int) -> any:
if value in self.option_values:
for option, v in self.options.items():
if v == value:
return value
return value
ID3_2_FILE_FORMATS = frozenset((
"mp3", "mp2", "mp1", # MPEG-1 ID3.2
"wav", "wave", "rmi", # RIFF (including WAV) ID3.2
"aiff", "aif", "aifc", # AIFF ID3.2
"aac", "aacp", # Raw AAC ID3.2
"tta", # True Audio ID3.2
))
_sorted_id3_2_formats = sorted(ID3_2_FILE_FORMATS)
ID3_1_FILE_FORMATS = frozenset((
"ape", # Monkey's Audio ID3.1
"mpc", "mpp", "mp+", # MusePack ID3.1
"wv", # WavPack ID3.1
"ofr", "ofs" # OptimFrog ID3.1
))
_sorted_id3_1_formats = sorted(ID3_1_FILE_FORMATS)
class AudioFormatAttribute(Attribute):
def __init__(self, name: str, default_value: any, description: Optional[str] = None, ignore_options_for_description = False):
new_description = ""
if description is not None:
new_description += description
new_description += "\n"
new_description += f"ID3.2: {{{', '.join(ID3_2_FILE_FORMATS)}}}\n"
new_description += f"ID3.1: {{{', '.join(ID3_1_FILE_FORMATS)}}}"
super().__init__(name, default_value, description)
def parse_simple_value(self, value: any) -> any:
value = value.strip().lower()
if value in ID3_2_FILE_FORMATS:
return value
if value in ID3_1_FILE_FORMATS:
logging.debug(f"setting audio format to a format that only supports ID3.1: {v}")
return value
raise SettingValueError(
setting_name=self.name,
setting_value=value,
rule="has to be a valid audio format, supporting id3 metadata"
)
def unparse_simple_value(self, value: any) -> any:
return value
class LoggerAttribute(Attribute):
def parse_simple_value(self, value: str) -> logging.Logger:
return logging.getLogger(value)
def unparse_simple_value(self, value: logging.Logger) -> any:
return value.name

View File

@@ -0,0 +1,86 @@
from typing import Any, Tuple, Union, List
from pathlib import Path
import logging
from datetime import datetime
import toml
from .attributes.attribute import Attribute, Description, EmptyLine
class ConfigDict(dict):
def __init__(self, config_reference: "Config", *args, **kwargs):
self.config_reference: Config = config_reference
super().__init__(*args, **kwargs)
def __getitem__(self, __name: str) -> Any:
return super().__getitem__(__name)
def __setitem__(self, __key: Any, __value: Any, from_attribute: bool = False, is_parsed: bool = False) -> None:
if not from_attribute:
attribute: Attribute = self.config_reference.attribute_map[__key]
if is_parsed:
attribute.value = __value
else:
attribute.parse(__value)
self.config_reference.write()
__value = attribute.value
return super().__setitem__(__key, __value)
class Config:
def __init__(self, component_list: Tuple[Union[Attribute, Description, EmptyLine], ...], config_file: Path) -> None:
self.config_file: Path = config_file
self.component_list: List[Union[Attribute, Description, EmptyLine]] = [
Description(f"""IMPORTANT: If you modify this file, the changes for the actual setting, will be kept as is.
The changes you make to the comments, will be discarded, next time you run music-kraken. Have fun!
Latest reset: {datetime.now()}
_____
/ ____|
| | __ __ _ _ _
| | |_ | / _` || | | |
| |__| || (_| || |_| |
\_____| \__,_| \__, |
__/ |
|___/
""")]
self.component_list.extend(component_list)
self.loaded_settings: ConfigDict = ConfigDict(self)
self.attribute_map = {}
for component in self.component_list:
if not isinstance(component, Attribute):
continue
component.initialize_from_config(self.loaded_settings)
self.attribute_map[component.name] = component
@property
def toml_string(self):
return "\n".join(component.toml_string for component in self.component_list)
def write(self):
print(self.config_file)
with self.config_file.open("w", encoding="utf-8") as conf_file:
conf_file.write(self.toml_string)
def read(self):
if not self.config_file.is_file():
logging.info(f"Config file at '{self.config_file}' doesn't exist => generating")
self.write()
return
toml_data = {}
with self.config_file.open("r", encoding="utf-8") as conf_file:
toml_data = toml.load(conf_file)
for component in self.component_list:
if isinstance(component, Attribute):
component.load_toml(toml_data)

View File

@@ -0,0 +1,105 @@
from typing import TypedDict, List
from urllib.parse import ParseResult
from logging import Logger
from pathlib import Path
import logging
from ...path_manager import LOCATIONS
from ..config import Config
from ..attributes.attribute import Attribute, EmptyLine
from ..attributes.special_attributes import (
IntegerSelect,
LoggerAttribute
)
config = Config([
Attribute(name="logging_format", default_value="%(levelname)s:%(name)s:%(message)s", description="""Logging settings for the actual logging:
Reference for the logging formats: https://docs.python.org/3/library/logging.html#logrecord-attributes"""),
IntegerSelect(
name="log_level",
default_value=logging.INFO,
options={
"CRITICAL": 50,
"ERROR": 40,
"WARNING": 30,
"INFO": 20,
"DEBUG": 10,
"NOTSET": 0
}
),
LoggerAttribute(
name="download_logger",
description="The logger for downloading.",
default_value="download"
),
LoggerAttribute(
name="tagging_logger",
description="The logger for tagging id3 containers.",
default_value="tagging"
),
LoggerAttribute(
name="codex_logger",
description="The logger for streaming the audio into an uniform codex.",
default_value="codex"
),
LoggerAttribute(
name="object_logger",
description="The logger for creating Data-Objects.",
default_value="object"
),
LoggerAttribute(
name="database_logger",
description="The logger for Database operations.",
default_value="database"
),
LoggerAttribute(
name="musify_logger",
description="The logger for the musify scraper.",
default_value="musify"
),
LoggerAttribute(
name="youtube_logger",
description="The logger for the youtube scraper.",
default_value="youtube"
),
LoggerAttribute(
name="youtube_music_logger",
description="The logger for the youtube music scraper.\n(The scraper is seperate to the youtube scraper)",
default_value="youtube_music"
),
LoggerAttribute(
name="metal_archives_logger",
description="The logger for the metal archives scraper.",
default_value="metal_archives"
),
LoggerAttribute(
name="genius_logger",
description="The logger for the genius scraper",
default_value="genius"
),
LoggerAttribute(
name="bandcamp_logger",
description="The logger for the bandcamp scraper",
default_value="bandcamp"
)
], LOCATIONS.get_config_file("logging"))
class SettingsStructure(TypedDict):
# logging
logging_format: str
log_level: int
download_logger: Logger
tagging_logger: Logger
codex_logger: Logger
object_logger: Logger
database_logger: Logger
musify_logger: Logger
youtube_logger: Logger
youtube_music_logger: Logger
metal_archives_logger: Logger
genius_logger: Logger
bandcamp_logger: Logger

View File

@@ -0,0 +1,155 @@
from typing import TypedDict, List
from urllib.parse import ParseResult
from logging import Logger
from pathlib import Path
from ...path_manager import LOCATIONS
from ..config import Config
from ..attributes.attribute import Attribute, EmptyLine, Description
from ..attributes.special_attributes import (
SelectAttribute,
PathAttribute,
AudioFormatAttribute
)
config = Config((
Attribute(name="hasnt_yet_started", default_value=False, description="This will be set automatically, to look if it needs to run the scripts that run on start."),
Attribute(name="bitrate", default_value=125, description="Streams the audio with given bitrate [kB/s]. Can't stream with a higher Bitrate, than the audio source provides."),
AudioFormatAttribute(name="audio_format", default_value="mp3", description="""Music Kraken will stream the audio into this format.
You can use Audio formats which support ID3.2 and ID3.1,
but you will have cleaner Metadata using ID3.2."""),
Attribute(name="result_history", default_value=False, description="""If enabled, you can go back to the previous results.
The consequence is a higher meory consumption, because every result is saved."""),
Attribute(name="history_length", default_value=8, description="""You can choose how far back you can go in the result history.
The further you choose to be able to go back, the higher the memory usage.
'-1' removes the Limit entirely."""),
EmptyLine(),
Attribute(name="sort_by_date", default_value=True, description="If this is set to true, it will set the albumsort attribute such that,\nthe albums are sorted by date"),
Attribute(name="sort_album_by_type", default_value=True, description="""If this is set to true, it will set the albumsort attribute such that,
the albums are put into categories before being sorted.
This means for example, the Studio Albums and EP's are always in front of Singles, and Compilations are in the back."""),
Attribute(name="download_path", default_value="{genre}/{artist}/{album}", description="""There are multiple fields, you can use for the path and file name:
- genre
- label
- artist
- album
- song
- album_type
The folder music kraken should put the songs into."""),
Attribute(name="download_file", default_value="{song}.{audio_format}", description="The filename of the audio file."),
SelectAttribute(name="album_type_blacklist", default_value=[
"Compilation Album",
"Live Album",
"Mixtape"
], options=("Studio Album", "EP (Extended Play)", "Single", "Live Album", "Compilation Album", "Mixtape", "Demo", "Other"), description="""Music Kraken ignores all albums of those types.
Following album types exist in the programm:"""),
Attribute(name="refresh_after", default_value=161, description="The time in seconds, after which a song/album/artist/label is newly fetched."),
EmptyLine(),
Attribute(name="proxies", default_value=[], description="This is a dictionary."),
Attribute(name="tor", default_value=False, description="""Route ALL traffic through Tor.
If you use Tor, make sure the Tor browser is installed, and running.I can't guarantee maximum security though!"""),
Attribute(name="tor_port", default_value=9150, description="The port, tor is listening. If tor is already working, don't change it."),
Attribute(name="chunk_size", default_value=1024, description="Size of the chunks that are streamed.\nHere could be some room for improvement."),
Attribute(name="show_download_errors_threshold", default_value=0.3, description="""If the percentage of failed downloads goes over this threshold,
all the error messages are shown."""),
Attribute(
name="language",
default_value="en-US,en;q=0.6",
description="The language of the program. This will be used to translate the program in the future.\n"
"Currently it just sets the Accept-Language header.\n"
"https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Language"
),
Attribute(
name="user_agent",
default_value="Mozilla/5.0 (X11; Linux x86_64; rv:90.0) Gecko/20100101 Firefox/90.0",
description="The user agent of the program. This will be used to translate the program in the future.\n"
"Currently it just sets the User-Agent header.\n"
"https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent"
),
Attribute(
name="tries_per_proxy",
default_value=2,
description="The retries it should do. These can be overridden by the program, at certain places, and they have to be.",
),
EmptyLine(),
PathAttribute(name="music_directory", default_value=LOCATIONS.MUSIC_DIRECTORY.resolve(), description="The directory, all the music will be downloaded to."),
PathAttribute(name="temp_directory", default_value=LOCATIONS.TEMP_DIRECTORY.resolve(), description="All temporary stuff is gonna be dumped in this directory."),
PathAttribute(name="log_file", default_value=LOCATIONS.get_log_file("download_logs.log").resolve()),
PathAttribute(name="ffmpeg_binary", default_value=LOCATIONS.FFMPEG_BIN.resolve(), description="Set the path to the ffmpeg binary."),
PathAttribute(name="cache_directory", default_value=LOCATIONS.CACHE_DIRECTORY.resolve(), description="Set the path of the cache directory."),
Attribute(
name="not_a_genre_regex",
description="These regular expressions tell music-kraken, which sub-folders of the music-directory\n"
"it should ignore, and not count to genres",
default_value=[
r'^\.' # is hidden/starts with a "."
]
),
EmptyLine(),
Attribute(name="happy_messages", default_value=[
"Support the artist.",
"Star Me: https://github.com/HeIIow2/music-downloader",
"🏳️‍⚧️🏳️‍⚧️ Trans rights are human rights. 🏳️‍⚧️🏳️‍⚧️",
"🏳️‍⚧️🏳️‍⚧️ Trans women are women, trans men are men, and enbies are enbies. 🏳️‍⚧️🏳️‍⚧️",
"🏴‍☠️🏴‍☠️ Unite under one flag, fck borders. 🏴‍☠️🏴‍☠️",
"Join my Matrix Space: https://matrix.to/#/#music-kraken:matrix.org",
"BPJM does cencorship.",
"🏳️‍⚧️🏳️‍⚧️ Protect trans youth. 🏳️‍⚧️🏳️‍⚧️",
"Klassenkampf",
"Rise Proletarians!!"
], description="""Just some nice and wholesome messages.
If your mindset has traits of a [file corruption], you might not agree.
But anyways... Freedom of thought, so go ahead and change the messages."""),
Attribute(name="modify_gc", default_value=True),
Attribute(name="id_bits", default_value=64, description="I really dunno why I even made this a setting.. Modifying this is a REALLY dumb idea."),
Description("🏳️‍⚧️🏳️‍⚧️ Protect trans youth. 🏳️‍⚧️🏳️‍⚧️\n"),
), LOCATIONS.get_config_file("main"))
class SettingsStructure(TypedDict):
hasnt_yet_started: bool
result_history: bool
history_length: int
happy_messages: List[str]
modify_gc: bool
id_bits: int
refresh_after: int
# audio
bitrate: int
audio_format: str
sort_by_date: bool
sort_album_by_type: bool
download_path: str
download_file: str
album_type_blacklist: List[str]
# connection
proxies: List[dict[str, str]]
tries_per_proxy: int
tor: bool
tor_port: int
chunk_size: int
show_download_errors_threshold: float
language: str
user_agent: str
# paths
music_directory: Path
temp_directory: Path
log_file: Path
not_a_genre_regex: List[str]
ffmpeg_binary: Path
cache_directory: Path

View File

@@ -0,0 +1,113 @@
from typing import TypedDict, List
from urllib.parse import ParseResult
from logging import Logger
from pathlib import Path
from ...path_manager import LOCATIONS
from ..config import Config
from ..attributes.attribute import Attribute
from ..attributes.special_attributes import SelectAttribute, PathAttribute, UrlAttribute
config = Config((
Attribute(name="use_youtube_alongside_youtube_music", default_value=False, description="""If set to true, it will search youtube through invidious and piped,
despite a direct wrapper for the youtube music INNERTUBE api being implemented.
I my INNERTUBE api wrapper doesn't work, set this to true."""),
UrlAttribute(name="invidious_instance", default_value="https://yt.artemislena.eu", description="""This is an attribute, where you can define the invidious instances,
the youtube downloader should use.
Here is a list of active ones: https://docs.invidious.io/instances/
Instances that use cloudflare or have source code changes could cause issues.
Hidden instances (.onion) will only work, when setting 'tor=true'."""),
UrlAttribute(name="piped_instance", default_value="https://piped-api.privacy.com.de", description="""This is an attribute, where you can define the pioed instances,
the youtube downloader should use.
Here is a list of active ones: https://github.com/TeamPiped/Piped/wiki/Instances
Instances that use cloudflare or have source code changes could cause issues.
Hidden instances (.onion) will only work, when setting 'tor=true"""),
Attribute(name="sleep_after_youtube_403", default_value=30, description="The time to wait, after youtube returned 403 (in seconds)"),
Attribute(name="youtube_music_api_key", default_value="AIzaSyC9XL3ZjWddXya6X74dJoCTL-WEYFDNX30", description="""This is the API key used by YouTube-Music internally.
Dw. if it is empty, Rachel will fetch it automatically for you <333
(she will also update outdated api keys/those that don't work)"""),
Attribute(name="youtube_music_clean_data", default_value=True, description="If set to true, it exclusively fetches artists/albums/songs, not things like user channels etc."),
UrlAttribute(name="youtube_url", default_value=[
"https://www.youtube.com/",
"https://www.youtu.be/",
"https://music.youtube.com/",
], description="""This is used to detect, if an url is from youtube, or any alternativ frontend.
If any instance seems to be missing, run music kraken with the -f flag."""),
Attribute(name="use_sponsor_block", default_value=True, description="Use sponsor block to remove adds or simmilar from the youtube videos."),
Attribute(name="player_url", default_value="https://music.youtube.com/s/player/80b90bfd/player_ias.vflset/en_US/base.js", description="""
This is needed to fetch videos without invidious
"""),
Attribute(name="youtube_music_consent_cookies", default_value={
"CONSENT": "PENDING+258"
}, description="The cookie with the key CONSENT says to what stuff you agree. Per default you decline all cookies, but it honestly doesn't matter."),
Attribute(name="youtube_music_innertube_context", default_value={
"client": {
"hl": "en",
"gl": "DE",
"remoteHost": "87.123.241.77",
"deviceMake": "",
"deviceModel": "",
"visitorData": "CgtiTUxaTHpoXzk1Zyia59WlBg%3D%3D",
"userAgent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
"clientName": "WEB_REMIX",
"clientVersion": "1.20230710.01.00",
"osName": "X11",
"osVersion": "",
"originalUrl": "https://music.youtube.com/",
"platform": "DESKTOP",
"clientFormFactor": "UNKNOWN_FORM_FACTOR",
"configInfo": {
"appInstallData": "",
"coldConfigData": "",
"coldHashData": "",
"hotHashData": ""
},
"userInterfaceTheme": "USER_INTERFACE_THEME_DARK",
"timeZone": "Atlantic/Jan_Mayen",
"browserName": "Firefox",
"browserVersion": "115.0",
"acceptHeader": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"deviceExperimentId": "ChxOekkxTmpnek16UTRNVFl4TkRrek1ETTVOdz09EJrn1aUGGJrn1aUG",
"screenWidthPoints": 584,
"screenHeightPoints": 939,
"screenPixelDensity": 1,
"screenDensityFloat": 1,
"utcOffsetMinutes": 120,
"musicAppInfo": {
"pwaInstallabilityStatus": "PWA_INSTALLABILITY_STATUS_UNKNOWN",
"webDisplayMode": "WEB_DISPLAY_MODE_BROWSER",
"storeDigitalGoodsApiSupportStatus": {
"playStoreDigitalGoodsApiSupportStatus": "DIGITAL_GOODS_API_SUPPORT_STATUS_UNSUPPORTED"
}
}
},
"user": { "lockedSafetyMode": False },
"request": {
"useSsl": True,
"internalExperimentFlags": [],
"consistencyTokenJars": []
},
"adSignalsInfo": {
"params": []
}
}, description="Don't bother about this. It is something technical, but if you wanna change the innertube requests... go on."),
Attribute(name="ytcfg", description="Please... ignore it.", default_value={})
), LOCATIONS.get_config_file("youtube"))
class SettingsStructure(TypedDict):
use_youtube_alongside_youtube_music: bool
invidious_instance: ParseResult
piped_instance: ParseResult
sleep_after_youtube_403: float
youtube_music_api_key: str
youtube_music_clean_data: bool
youtube_url: List[ParseResult]
use_sponsor_block: bool
player_url: str
youtube_music_innertube_context: dict
youtube_music_consent_cookies: dict
ytcfg: dict

View File

@@ -0,0 +1,4 @@
def comment(uncommented_string: str) -> str:
_fragments = uncommented_string.split("\n")
_fragments = ["# " + frag for frag in _fragments]
return "\n".join(_fragments)

View File

@@ -0,0 +1 @@
from .source import SourcePages

View File

@@ -0,0 +1,26 @@
from enum import Enum
class AlbumStatus(Enum):
"""
Enum class representing the possible statuses of an album.
"""
UNRELEASED = "Unreleased"
RELEASED = "Released"
LEAKED = "Leaked"
OFFICIAL = "Official"
BOOTLEG = "Bootleg"
class AlbumType(Enum):
"""
Enum class representing the possible types of an album.
"""
STUDIO_ALBUM = "Studio Album"
EP = "EP (Extended Play)"
SINGLE = "Single"
LIVE_ALBUM = "Live Album"
COMPILATION_ALBUM = "Compilation Album"
MIXTAPE = "Mixtape"
DEMO = "Demo"
OTHER = "Other"

View File

@@ -0,0 +1,19 @@
from enum import Enum
class BColors(Enum):
# https://stackoverflow.com/a/287944
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
GREY = "\x1b[38;20m"
YELLOW = "\x1b[33;20m"
RED = "\x1b[31;20m"
BOLD_RED = "\x1b[31;1m"

View File

@@ -0,0 +1,7 @@
from enum import Enum
class ContactMethod(Enum):
EMAIL = "email"
PHONE = "phone"
FAX = "fax"

View File

@@ -0,0 +1,50 @@
from enum import Enum
class SourceTypes(Enum):
SONG = "song"
ALBUM = "album"
ARTIST = "artist"
LYRICS = "lyrics"
class SourcePages(Enum):
YOUTUBE = "youtube"
MUSIFY = "musify"
YOUTUBE_MUSIC = "youtube music"
GENIUS = "genius"
MUSICBRAINZ = "musicbrainz"
ENCYCLOPAEDIA_METALLUM = "encyclopaedia metallum"
BANDCAMP = "bandcamp"
DEEZER = "deezer"
SPOTIFY = "spotify"
# This has nothing to do with audio, but bands can be here
WIKIPEDIA = "wikipedia"
INSTAGRAM = "instagram"
FACEBOOK = "facebook"
TWITTER = "twitter" # I will use nitter though lol
MYSPACE = "myspace" # Yes somehow this ancient site is linked EVERYWHERE
MANUAL = "manual"
PRESET = "preset"
@classmethod
def get_homepage(cls, attribute) -> str:
homepage_map = {
cls.YOUTUBE: "https://www.youtube.com/",
cls.MUSIFY: "https://musify.club/",
cls.MUSICBRAINZ: "https://musicbrainz.org/",
cls.ENCYCLOPAEDIA_METALLUM: "https://www.metal-archives.com/",
cls.GENIUS: "https://genius.com/",
cls.BANDCAMP: "https://bandcamp.com/",
cls.DEEZER: "https://www.deezer.com/",
cls.INSTAGRAM: "https://www.instagram.com/",
cls.FACEBOOK: "https://www.facebook.com/",
cls.SPOTIFY: "https://open.spotify.com/",
cls.TWITTER: "https://twitter.com/",
cls.MYSPACE: "https://myspace.com/",
cls.WIKIPEDIA: "https://en.wikipedia.org/wiki/Main_Page"
}
return homepage_map[attribute]

View File

@@ -0,0 +1 @@
__all__ = ["config"]

View File

@@ -0,0 +1,28 @@
class SettingException(Exception):
pass
class SettingNotFound(SettingException):
def __init__(self, setting_name: str):
self.setting_name = setting_name
def __str__(self):
return f"Setting '{self.setting_name}' not found."
class SettingValueError(SettingException):
def __init__(self, setting_name: str, setting_value: str, rule: str):
"""
The rule has to be such, that the following format makes sense:
{name} {rule}, not '{value}'
:param setting_name:
:param setting_value:
:param rule:
"""
self.setting_name = setting_name
self.setting_value = setting_value
self.rule = rule
def __str__(self):
return f"{self.setting_name} {self.rule}, not '{self.setting_value}'."

View File

@@ -0,0 +1,11 @@
class DownloadException(Exception):
pass
class UrlNotFoundException(DownloadException):
def __init__(self, url: str, *args: object) -> None:
self.url = url
super().__init__(*args)
def __str__(self) -> str:
return f"Couldn't find the page of {self.url}"

View File

@@ -0,0 +1,10 @@
class ObjectException(Exception):
pass
class IsDynamicException(Exception):
"""
Gets raised, if a dynamic data object tries to perform an action,
which does not make sense for a dynamic object.
"""
pass

View File

@@ -0,0 +1,303 @@
# -*- encoding: utf-8 -*-
# merge_args v0.1.5
# Merge signatures of two functions with Advanced Hackery.
# Copyright © 2018-2023, Chris Warrick.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the author of this software nor the names of
# contributors to this software may be used to endorse or promote
# products derived from this software without specific prior written
# consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Merge signatures of two functions with Advanced Hackery. Useful for wrappers.
Usage: @merge_args(old_function)
"""
import weakref
from types import FunctionType
from functools import wraps
from typing import Dict, Set
import inspect
import itertools
import types
import functools
import sys
import typing
__version__ = '0.1.5'
__all__ = ('merge_args',)
PY38 = sys.version_info >= (3, 8)
PY310 = sys.version_info >= (3, 10)
PY311 = sys.version_info >= (3, 11)
def _blank(): # pragma: no cover
pass
def _merge(
source,
dest,
drop_args: typing.Optional[typing.List[str]] = None,
drop_kwonlyargs: typing.Optional[typing.List[str]] = None,
):
"""Merge the signatures of ``source`` and ``dest``.
``dest`` args go before ``source`` args in all three categories
(positional, keyword-maybe, keyword-only).
"""
if drop_args is None:
drop_args = []
if drop_kwonlyargs is None:
drop_kwonlyargs = []
source_spec = inspect.getfullargspec(source)
dest_spec = inspect.getfullargspec(dest)
if source_spec.varargs or source_spec.varkw:
return dest
source_all = source_spec.args
dest_all = dest_spec.args
if source_spec.defaults:
source_pos = source_all[:-len(source_spec.defaults)]
source_kw = source_all[-len(source_spec.defaults):]
else:
source_pos = source_all
source_kw = []
if dest_spec.defaults:
dest_pos = dest_all[:-len(dest_spec.defaults)]
dest_kw = dest_all[-len(dest_spec.defaults):]
else:
dest_pos = dest_all
dest_kw = []
args_merged = dest_pos
for a in source_pos:
if a not in args_merged and a not in drop_args:
args_merged.append(a)
defaults_merged = []
for a, default in itertools.chain(
zip(dest_kw, dest_spec.defaults or []),
zip(source_kw, source_spec.defaults or [])
):
if a not in args_merged and a not in drop_args:
args_merged.append(a)
defaults_merged.append(default)
kwonlyargs_merged = dest_spec.kwonlyargs
for a in source_spec.kwonlyargs:
if a not in kwonlyargs_merged and a not in drop_kwonlyargs:
kwonlyargs_merged.append(a)
args_all = tuple(args_merged + kwonlyargs_merged)
if PY38:
replace_kwargs = {
'co_argcount': len(args_merged),
'co_kwonlyargcount': len(kwonlyargs_merged),
'co_posonlyargcount': dest.__code__.co_posonlyargcount,
'co_nlocals': len(args_all),
'co_flags': source.__code__.co_flags,
'co_varnames': args_all,
'co_filename': dest.__code__.co_filename,
'co_name': dest.__code__.co_name,
'co_firstlineno': dest.__code__.co_firstlineno,
}
if PY310:
replace_kwargs['co_linetable'] = dest.__code__.co_linetable
else:
replace_kwargs['co_lnotab'] = dest.__code__.co_lnotab
if PY311:
replace_kwargs['co_exceptiontable'] = dest.__code__.co_exceptiontable
replace_kwargs['co_qualname'] = dest.__code__.co_qualname
passer_code = _blank.__code__.replace(**replace_kwargs)
else:
passer_args = [
len(args_merged),
len(kwonlyargs_merged),
_blank.__code__.co_nlocals,
_blank.__code__.co_stacksize,
source.__code__.co_flags,
_blank.__code__.co_code, (), (),
args_all, dest.__code__.co_filename,
dest.__code__.co_name,
dest.__code__.co_firstlineno,
dest.__code__.co_lnotab,
]
passer_code = types.CodeType(*passer_args)
passer = types.FunctionType(passer_code, globals())
dest.__wrapped__ = passer
# annotations
# ensure we take destinations return annotation
has_dest_ret = 'return' in dest.__annotations__
if has_dest_ret:
dest_ret = dest.__annotations__['return']
for v in ('__kwdefaults__', '__annotations__'):
out = getattr(source, v)
if out is None:
out = {}
if getattr(dest, v) is not None:
out = out.copy()
out.update(getattr(dest, v))
setattr(passer, v, out)
if has_dest_ret:
passer.__annotations__['return'] = dest_ret
dest.__annotations__ = passer.__annotations__
passer.__defaults__ = tuple(defaults_merged)
if not dest.__doc__:
dest.__doc__ = source.__doc__
return dest
def merge_args(
source,
drop_args: typing.Optional[typing.List[str]] = None,
drop_kwonlyargs: typing.Optional[typing.List[str]] = None,
):
"""Merge the signatures of two functions."""
try:
return functools.partial(
lambda x, y: _merge(x, y, drop_args, drop_kwonlyargs), source
)
except TypeError:
pass
class Lake:
def __init__(self):
self.redirects: Dict[int, int] = {}
self.id_to_object: Dict[int, object] = {}
def get_real_object(self, db_object: object) -> object:
_id = id(db_object)
while _id in self.redirects:
_id = self.redirects[_id]
try:
return self.id_to_object[_id]
except KeyError:
self.add(db_object)
return db_object
def add(self, db_object: object):
self.id_to_object[id(db_object)] = db_object
def override(self, to_override: object, new_db_object: object):
_id = id(to_override)
while _id in self.redirects:
_id = self.redirects[_id]
if id(new_db_object) in self.id_to_object:
print("!!!!!")
self.add(new_db_object)
self.redirects[_id] = id(new_db_object)
# if _id in self.id_to_object:
# del self.id_to_object[_id]
def is_same(self, __object: object, other: object) -> bool:
_self_id = id(__object)
while _self_id in self.redirects:
_self_id = self.redirects[_self_id]
_other_id = id(other)
while _other_id in self.redirects:
_other_id = self.redirects[_other_id]
return _self_id == _other_id
lake = Lake()
def wrapper(method):
@wraps(method)
def wrapped(*args, **kwargs):
return method(*(lake.get_real_object(args[0]), *args[1:]), **kwargs)
return wrapped
class BaseClass:
def __new__(cls, *args, **kwargs):
instance = cls(*args, **kwargs)
print("new")
lake.add(instance)
return instance
def __eq__(self, other):
return lake.is_same(self, other)
def _risky_merge(self, to_replace):
lake.override(to_replace, self)
class MetaClass(type):
def __new__(meta, classname, bases, classDict):
bases = (*bases, BaseClass)
newClassDict = {}
ignore_functions: Set[str] = {"__new__", "__init__"}
for attributeName, attribute in classDict.items():
if isinstance(attribute, FunctionType) and (attributeName not in ignore_functions):
"""
The funktion new and init shouldn't be accounted for because we can assume the class is
independent on initialization.
"""
attribute = wrapper(attribute)
newClassDict[attributeName] = attribute
print()
for key, value in object.__dict__.items():
# hasattr( value, '__call__' ) and
if hasattr(value, '__call__') and value not in newClassDict and key not in ("__new__", "__init__"):
newClassDict[key] = wrapper(value)
new_instance = type.__new__(meta, classname, bases, newClassDict)
lake.add(new_instance)
return new_instance

View File

@@ -0,0 +1,3 @@
from .locations import Locations
LOCATIONS = Locations()

View File

@@ -0,0 +1,7 @@
from pathlib import Path
import platformdirs
def get_config_directory(application_name: str) -> Path:
return platformdirs.user_config_path(appname=application_name)

View File

@@ -0,0 +1,88 @@
import configparser
from pathlib import Path
import os
from os.path import expandvars
import logging
from sys import platform
import tempfile
from typing import Optional
from pyffmpeg import FFmpeg
from .music_directory import get_music_directory
from .config_directory import get_config_directory
class Locations:
@staticmethod
def _get_env(key: str, default: Path, default_for_windows: bool = True) -> Optional[Path]:
res = os.environ.get(key.upper())
if res is not None:
return res
xdg_user_dirs_file = os.environ.get("XDG_CONFIG_HOME") or Path(Path.home(), ".config", "user-dirs.dirs")
xdg_user_dirs_default_file = Path("/etc/xdg/user-dirs.defaults")
def get_dir_from_xdg_file(xdg_file_path: os.PathLike) -> Optional[Path]:
nonlocal key
try:
with open(xdg_file_path, 'r') as f:
data = "[XDG_USER_DIRS]\n" + f.read()
config = configparser.ConfigParser(allow_no_value=True)
config.read_string(data)
xdg_config = config['XDG_USER_DIRS']
return Path(expandvars(xdg_config[key.lower()].strip('"')))
except (FileNotFoundError, KeyError) as e:
logging.warning(
f"Missing file or No entry found for \"{key}\" in: \"{xdg_file_path}\".\n"
)
logging.debug(str(e))
res = get_dir_from_xdg_file(xdg_user_dirs_file)
if res is not None:
return res
res = get_dir_from_xdg_file(xdg_user_dirs_default_file)
if res is not None:
return res
logging.warning(f"couldn't find a {key}, falling back to: {default}")
if not default_for_windows and platform == "linux":
return
return default
def __init__(self, application_name: os.PathLike = "music-kraken"):
self.FILE_ENCODING: str = "utf-8"
self.TEMP_DIRECTORY = Path(tempfile.gettempdir(), application_name)
self.TEMP_DIRECTORY.mkdir(exist_ok=True, parents=True)
self.MUSIC_DIRECTORY = get_music_directory()
self.CONFIG_DIRECTORY = get_config_directory(str(application_name))
self.CONFIG_DIRECTORY.mkdir(exist_ok=True, parents=True)
self.CONFIG_FILE = Path(self.CONFIG_DIRECTORY, f"{application_name}.conf")
self.LEGACY_CONFIG_FILE = Path(self.CONFIG_DIRECTORY, f"{application_name}.conf")
self.CACHE_DIRECTORY = self._get_env("XDG_CACHE_HOME", Path(Path.home(), ".cache"))
if self.CACHE_DIRECTORY is None:
logging.warning(f"Could not find a cache dir. Falling back to the temp dir: {self.TEMP_DIRECTORY}")
self.CACHE_DIRECTORY = self.TEMP_DIRECTORY
else:
self.CACHE_DIRECTORY = Path(self.CACHE_DIRECTORY, application_name)
self.CACHE_DIRECTORY.mkdir(parents=True, exist_ok=True)
self.FFMPEG_BIN = Path(FFmpeg(enable_log=False).get_ffmpeg_bin())
def get_config_file(self, config_name: str) -> Path:
return Path(self.CONFIG_DIRECTORY, f"{config_name}.toml")
def get_log_file(self, file_name: os.PathLike) -> Path:
return Path(self.TEMP_DIRECTORY, file_name)

View File

@@ -0,0 +1,58 @@
import os
from pathlib import Path
from typing import Optional
from sys import platform
import logging
from os.path import expandvars
import configparser
DEFAULT_MUSIC_DIRECTORY = Path(Path.home(), "Music")
def get_xdg_music_directory() -> Path:
"""
gets the xdg music directory, for all the linux or bsd folks!
Thanks to Distant Thunder, as well as Kevin Gruber for making that pull request:
https://github.com/HeIIow2/music-downloader/pull/6
XDG_USER_DIRS_FILE reference:
https://freedesktop.org/wiki/Software/xdg-user-dirs/
https://web.archive.org/web/20230322012953/https://freedesktop.org/wiki/Software/xdg-user-dirs/
"""
xdg_user_dirs_file = Path(os.environ.get("XDG_CONFIG_HOME") or Path(Path.home(), ".config"), "user-dirs.dirs")
xdg_user_dirs_default_file = Path("/etc/xdg/user-dirs.defaults")
def get_music_dir_from_xdg_file(xdg_file_path: os.PathLike) -> Optional[Path]:
try:
with open(xdg_file_path, 'r', encoding="utf-8") as f:
data = "[XDG_USER_DIRS]\n" + f.read()
config = configparser.ConfigParser(allow_no_value=True)
config.read_string(data)
xdg_config = config['XDG_USER_DIRS']
return Path(expandvars(xdg_config['xdg_music_dir'].strip('"')))
except (FileNotFoundError, KeyError) as e:
logging.warning(
f"Missing file or No entry found for \"xdg_music_dir\" in: \"{xdg_file_path}\".\n"
)
logging.debug(str(e))
music_dir = get_music_dir_from_xdg_file(xdg_user_dirs_file)
if music_dir is not None:
return music_dir
music_dir = get_music_dir_from_xdg_file(xdg_user_dirs_default_file)
if music_dir is not None:
return music_dir
logging.warning(f"couldn't find a XDG music dir, falling back to: {DEFAULT_MUSIC_DIRECTORY}")
return DEFAULT_MUSIC_DIRECTORY
def get_music_directory() -> Path:
if platform != "linux":
return DEFAULT_MUSIC_DIRECTORY
return get_xdg_music_directory()

View File

@@ -0,0 +1,50 @@
import random
from dotenv import load_dotenv
from pathlib import Path
import os
from .path_manager import LOCATIONS
from .config import main_settings
if not load_dotenv(Path(__file__).parent.parent.parent / ".env"):
load_dotenv(Path(__file__).parent.parent.parent / ".env.example")
__stage__ = os.getenv("STAGE", "prod")
DEBUG = (__stage__ == "dev") and True
DEBUG_LOGGING = DEBUG and False
DEBUG_TRACE = DEBUG and True
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and True
if DEBUG:
print("DEBUG ACTIVE")
def get_random_message() -> str:
return random.choice(main_settings['happy_messages'])
CONFIG_DIRECTORY = LOCATIONS.CONFIG_DIRECTORY
HIGHEST_ID = 2 ** main_settings['id_bits']
HELP_MESSAGE = """to search:
> s: {query or url}
> s: https://musify.club/release/some-random-release-183028492
> s: #a {artist} #r {release} #t {track}
to download:
> d: {option ids or direct url}
> d: 0, 3, 4
> d: 1
> d: https://musify.club/release/some-random-release-183028492
have fun :3""".strip()
# regex pattern
URL_PATTERN = r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+"
INT_PATTERN = r"^\d*$"
FLOAT_PATTERN = r"^[\d|\,|\.]*$"

View File

@@ -0,0 +1,142 @@
from typing import Tuple, Union
from pathlib import Path
import string
from transliterate.exceptions import LanguageDetectionError
from transliterate import translit
from pathvalidate import sanitize_filename
COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = (
"(official video)",
)
def unify(string: str) -> str:
"""
returns a unified str, to make comparisons easy.
a unified string has the following attributes:
- is lowercase
"""
try:
string = translit(string, reversed=True)
except LanguageDetectionError:
pass
return string.lower()
def fit_to_file_system(string: Union[str, Path]) -> Union[str, Path]:
def fit_string(string: str) -> str:
if string == "/":
return "/"
string = string.strip()
while string[0] == ".":
if len(string) == 0:
return string
string = string[1:]
string = string.replace("/", "_").replace("\\", "_")
string = sanitize_filename(string)
return string
if isinstance(string, Path):
return Path(*(fit_string(part) for part in string.parts))
else:
return fit_string(string)
def clean_song_title(raw_song_title: str, artist_name: str) -> str:
"""
This function cleans common naming "conventions" for non clean song titles, like the title of youtube videos
cleans:
- `artist - song` -> `song`
- `song (Official Video)` -> `song`
- ` song` -> `song`
- `song (prod. some producer)`
"""
raw_song_title = raw_song_title.strip()
artist_name = artist_name.strip()
# Clean official Video appendix
for dirty_appendix in COMMON_TITLE_APPENDIX_LIST:
if raw_song_title.lower().endswith(dirty_appendix):
raw_song_title = raw_song_title[:-len(dirty_appendix)].strip()
# Remove artist from the start of the title
if raw_song_title.lower().startswith(artist_name.lower()):
raw_song_title = raw_song_title[len(artist_name):].strip()
if raw_song_title.startswith("-"):
raw_song_title = raw_song_title[1:].strip()
return raw_song_title.strip()
def comment(uncommented_string: str) -> str:
_fragments = uncommented_string.split("\n")
_fragments = ["# " + frag for frag in _fragments]
return "\n".join(_fragments)
# comparisons
TITLE_THRESHOLD_LEVENSHTEIN = 1
UNIFY_TO = " "
ALLOWED_LENGTH_DISTANCE = 20
def unify_punctuation(to_unify: str) -> str:
for char in string.punctuation:
to_unify = to_unify.replace(char, UNIFY_TO)
return to_unify
def remove_feature_part_from_track(title: str) -> str:
if ")" != title[-1]:
return title
if "(" not in title:
return title
return title[:title.index("(")]
def modify_title(to_modify: str) -> str:
to_modify = to_modify.strip()
to_modify = to_modify.lower()
to_modify = remove_feature_part_from_track(to_modify)
to_modify = unify_punctuation(to_modify)
return to_modify
def match_titles(title_1: str, title_2: str):
title_1, title_2 = modify_title(title_1), modify_title(title_2)
distance = jellyfish.levenshtein_distance(title_1, title_2)
return distance > TITLE_THRESHOLD_LEVENSHTEIN, distance
def match_artists(artist_1, artist_2: str):
if type(artist_1) == list:
distances = []
for artist_1_ in artist_1:
match, distance = match_titles(artist_1_, artist_2)
if not match:
return match, distance
distances.append(distance)
return True, min(distances)
return match_titles(artist_1, artist_2)
def match_length(length_1: int | None, length_2: int | None) -> bool:
# returning true if either one is Null, because if one value is not known,
# then it shouldn't be an attribute which could reject an audio source
if length_1 is None or length_2 is None:
return True
return abs(length_1 - length_2) <= ALLOWED_LENGTH_DISTANCE

View File

@@ -0,0 +1,98 @@
from dataclasses import dataclass, field
from typing import List, Tuple
from ...utils.config import main_settings, logging_settings
from ...utils.enums.colors import BColors
from ...objects import Target
UNIT_PREFIXES: List[str] = ["", "k", "m", "g", "t"]
UNIT_DIVISOR = 1024
LOGGER = logging_settings["download_logger"]
@dataclass
class DownloadResult:
total: int = 0
fail: int = 0
sponsor_segments: int = 0
error_message: str = None
total_size = 0
found_on_disk: int = 0
_error_message_list: List[str] = field(default_factory=list)
@property
def success(self) -> int:
return self.total - self.fail
@property
def success_percentage(self) -> float:
if self.total == 0:
return 0
return self.success / self.total
@property
def failure_percentage(self) -> float:
if self.total == 0:
return 1
return self.fail / self.total
@property
def is_fatal_error(self) -> bool:
return self.error_message is not None
@property
def is_mild_failure(self) -> bool:
if self.is_fatal_error:
return True
return self.failure_percentage > main_settings["show_download_errors_threshold"]
def _size_val_unit_pref_ind(self, val: float, ind: int) -> Tuple[float, int]:
if val < UNIT_DIVISOR:
return val, ind
if ind >= len(UNIT_PREFIXES):
return val, ind
return self._size_val_unit_pref_ind(val=val / UNIT_DIVISOR, ind=ind + 1)
@property
def formated_size(self) -> str:
total_size, prefix_index = self._size_val_unit_pref_ind(self.total_size, 0)
return f"{total_size:.{2}f} {UNIT_PREFIXES[prefix_index]}B"
def add_target(self, target: Target):
self.total_size += target.size
def merge(self, other: "DownloadResult"):
if other.is_fatal_error:
LOGGER.debug(other.error_message)
self._error_message_list.append(other.error_message)
self.total += 1
self.fail += 1
else:
self.total += other.total
self.fail += other.fail
self._error_message_list.extend(other._error_message_list)
self.sponsor_segments += other.sponsor_segments
self.total_size += other.total_size
self.found_on_disk += other.found_on_disk
def __str__(self):
if self.is_fatal_error:
return self.error_message
head = f"{self.fail} from {self.total} downloads failed:\n" \
f"success-rate:\t{int(self.success_percentage * 100)}%\n" \
f"fail-rate:\t{int(self.failure_percentage * 100)}%\n" \
f"total size:\t{self.formated_size}\n" \
f"skipped segments:\t{self.sponsor_segments}\n" \
f"found on disc:\t{self.found_on_disk}"
if not self.is_mild_failure:
return head
_lines = [head]
_lines.extend(BColors.FAIL.value + s + BColors.ENDC.value for s in self._error_message_list)
return "\n".join(_lines)

View File

@@ -0,0 +1,32 @@
from typing import Optional, List
from ...objects import Artist, Album, Song, DatabaseObject
class Query:
def __init__(
self,
raw_query: str = "",
music_object: DatabaseObject = None
) -> None:
self.raw_query: str = raw_query
self.music_object: Optional[DatabaseObject] = music_object
@property
def is_raw(self) -> bool:
return self.music_object is None
@property
def default_search(self) -> List[str]:
if self.music_object is None:
return [self.raw_query]
if isinstance(self.music_object, Artist):
return [self.music_object.name]
if isinstance(self.music_object, Song):
return [f"{artist.name} - {self.music_object}" for artist in self.music_object.main_artist_collection]
if isinstance(self.music_object, Album):
return [f"{artist.name} - {self.music_object}" for artist in self.music_object.artist_collection]
return [self.raw_query]