fixed the path naming

This commit is contained in:
Hellow2 2023-06-15 11:28:35 +02:00
parent 805017fea5
commit 1d0e10a022
4 changed files with 81 additions and 43 deletions

View File

@ -18,4 +18,4 @@ if __name__ == "__main__":
"8" "8"
] ]
music_kraken.cli(genre="test", command_list=youtube_search) music_kraken.cli(genre="test", command_list=direct_download)

View File

@ -404,6 +404,10 @@ class Album(MainObject):
""" """
return len(self.artist_collection) > 1 return len(self.artist_collection) > 1
@property
def album_type_string(self) -> str:
return self.album_type.value
""" """
All objects dependent on Artist All objects dependent on Artist

View File

@ -2,8 +2,7 @@ import logging
import random import random
from copy import copy from copy import copy
from typing import Optional, Union, Type, Dict, Set, List, Tuple from typing import Optional, Union, Type, Dict, Set, List, Tuple
import threading from string import Formatter
from queue import Queue
import requests import requests
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
@ -24,7 +23,7 @@ from ..utils.enums.source import SourcePages
from ..utils.enums.album import AlbumType from ..utils.enums.album import AlbumType
from ..audio import write_metadata_to_target, correct_codec from ..audio import write_metadata_to_target, correct_codec
from ..utils import shared from ..utils import shared
from ..utils.shared import DEFAULT_VALUES, DOWNLOAD_PATH, DOWNLOAD_FILE, THREADED from ..utils.shared import DEFAULT_VALUES, DOWNLOAD_PATH, DOWNLOAD_FILE, THREADED, AUDIO_FORMAT
from ..utils.support_classes import Query, DownloadResult, DefaultTarget, EndThread, FinishedSearch from ..utils.support_classes import Query, DownloadResult, DefaultTarget, EndThread, FinishedSearch
@ -32,6 +31,66 @@ INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song]
INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]] INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]]
class NamingDict(dict):
CUSTOM_KEYS: Dict[str, str] = {
"label": "label.name",
"artist": "artist.name",
"song": "song.title",
"isrc": "song.isrc",
"album": "album.title",
"album_type": "album.album_type_string"
}
def __init__(self, values: dict, object_mappings: Dict[str, DatabaseObject] = None):
self.object_mappings: Dict[str, DatabaseObject] = object_mappings or dict()
super().__init__(values)
self["audio_format"] = AUDIO_FORMAT
def add_object(self, music_object: DatabaseObject):
self.object_mappings[type(music_object).__name__.lower()] = music_object
def copy(self) -> dict:
return type(self)(super().copy(), self.object_mappings.copy())
def __getitem__(self, key: str) -> str:
return super().__getitem__(key)
def default_value_for_name(self, name: str) -> str:
return f'Various {name.replace("_", " ").title()}'
def __missing__(self, key: str) -> str:
"""
TODO
add proper logging
"""
if "." not in key:
if key not in self.CUSTOM_KEYS:
return self.default_value_for_name(key)
key = self.CUSTOM_KEYS[key]
frag_list = key.split(".")
object_name = frag_list[0].strip().lower()
attribute_name = frag_list[-1].strip().lower()
if object_name not in self.object_mappings:
return self.default_value_for_name(attribute_name)
music_object = self.object_mappings[object_name]
try:
value = getattr(music_object, attribute_name)
if value is None:
return self.default_value_for_name(attribute_name)
return str(value)
except AttributeError:
return self.default_value_for_name(attribute_name)
def _clean_music_object(music_object: INDEPENDENT_DB_OBJECTS, collections: Dict[INDEPENDENT_DB_TYPES, Collection]): def _clean_music_object(music_object: INDEPENDENT_DB_OBJECTS, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
if type(music_object) == Label: if type(music_object) == Label:
return _clean_label(label=music_object, collections=collections) return _clean_label(label=music_object, collections=collections)
@ -262,38 +321,37 @@ class Page:
return Label() return Label()
def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False) -> DownloadResult: def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False) -> DownloadResult:
naming_objects = {"genre": genre} naming_dict: NamingDict = NamingDict({"genre": genre})
def fill_naming_objects(naming_music_object: DatabaseObject): def fill_naming_objects(naming_music_object: DatabaseObject):
nonlocal naming_objects nonlocal naming_dict
for collection_name in naming_music_object.UPWARDS_COLLECTION_ATTRIBUTES: for collection_name in naming_music_object.UPWARDS_COLLECTION_ATTRIBUTES:
collection: Collection = getattr(naming_music_object, collection_name) collection: Collection = getattr(naming_music_object, collection_name)
if collection.empty: if collection.empty:
continue continue
if collection.element_type in naming_objects:
continue
dom_ordered_music_object: DatabaseObject = collection[0] dom_ordered_music_object: DatabaseObject = collection[0]
naming_dict.add_object(dom_ordered_music_object)
return fill_naming_objects(dom_ordered_music_object) return fill_naming_objects(dom_ordered_music_object)
fill_naming_objects(music_object) fill_naming_objects(music_object)
return self._download(music_object, naming_objects, download_all) return self._download(music_object, naming_dict, download_all)
def _download(self, music_object: DatabaseObject, naming_objects: Dict[Union[Type[DatabaseObject], str], DatabaseObject], download_all: bool = False) -> DownloadResult: def _download(self, music_object: DatabaseObject, naming_dict: NamingDict, download_all: bool = False) -> DownloadResult:
# Skips all releases, that are defined in shared.ALBUM_TYPE_BLACKLIST, if download_all is False # Skips all releases, that are defined in shared.ALBUM_TYPE_BLACKLIST, if download_all is False
if isinstance(music_object, Album): if isinstance(music_object, Album):
if not download_all and music_object.album_type in shared.ALBUM_TYPE_BLACKLIST: if not download_all and music_object.album_type in shared.ALBUM_TYPE_BLACKLIST:
return DownloadResult() return DownloadResult()
self.fetch_details(music_object=music_object, stop_at_level=2) self.fetch_details(music_object=music_object, stop_at_level=2)
naming_objects[type(music_object)] = music_object naming_dict.add_object(music_object)
if isinstance(music_object, Song): if isinstance(music_object, Song):
return self._download_song(music_object, naming_objects) return self._download_song(music_object, naming_dict)
download_result: DownloadResult = DownloadResult() download_result: DownloadResult = DownloadResult()
@ -302,35 +360,17 @@ class Page:
sub_ordered_music_object: DatabaseObject sub_ordered_music_object: DatabaseObject
for sub_ordered_music_object in collection: for sub_ordered_music_object in collection:
download_result.merge(self._download(sub_ordered_music_object, naming_objects.copy(), download_all)) download_result.merge(self._download(sub_ordered_music_object, naming_dict.copy(), download_all))
return download_result return download_result
def _download_song(self, song: Song, naming_objects: Dict[Type[DatabaseObject], Union[DatabaseObject, str]]): def _download_song(self, song: Song, naming_dict: NamingDict):
name_attribute = DEFAULT_VALUES.copy() path_parts = Formatter().parse(DOWNLOAD_PATH)
file_parts = Formatter().parse(DOWNLOAD_FILE)
# song
if "genre" in naming_objects:
name_attribute["genre"] = naming_objects["genre"]
name_attribute["song"] = song.title
if Album in naming_objects:
album: Album = naming_objects[Album]
name_attribute["album"] = album.title
name_attribute["album_type"] = album.album_type.value
if Artist in naming_objects:
artist: Artist = naming_objects[Artist]
naming_objects["artist"] = artist.name
if Label in naming_objects:
label: Label = naming_objects[Label]
naming_objects["label"] = label.name
new_target = Target( new_target = Target(
relative_to_music_dir=True, relative_to_music_dir=True,
path=DOWNLOAD_PATH.format(**name_attribute), path=DOWNLOAD_PATH.format(**{part[1]: naming_dict[part[1]] for part in path_parts}),
file=DOWNLOAD_FILE.format(**name_attribute) file=DOWNLOAD_FILE.format(**{part[1]: naming_dict[part[1]] for part in file_parts})
) )
if song.target_collection.empty: if song.target_collection.empty:

View File

@ -192,12 +192,6 @@ class YouTube(Page):
return artist_list return artist_list
def album_search(self, album: Album) -> List[Album]:
return []
def song_search(self, song: Song) -> List[Song]:
return []
def _fetch_song_from_id(self, youtube_id: str) -> Tuple[Song, Optional[int]]: def _fetch_song_from_id(self, youtube_id: str) -> Tuple[Song, Optional[int]]:
# https://yt.artemislena.eu/api/v1/videos/SULFl39UjgY # https://yt.artemislena.eu/api/v1/videos/SULFl39UjgY
r = self.connection.get(get_invidious_url(path=f"/api/v1/videos/{youtube_id}")) r = self.connection.get(get_invidious_url(path=f"/api/v1/videos/{youtube_id}"))