Merge pull request 'fix/reindex_before_collection' (#21) from fix/reindex_before_collection into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

Reviewed-on: #21
This commit is contained in:
Hazel 2024-05-06 17:36:27 +00:00
commit 3be6c71dcd
23 changed files with 712 additions and 430 deletions

View File

@ -16,6 +16,7 @@
},
"python.formatting.provider": "none",
"cSpell.words": [
"albumsort",
"APIC",
"Bandcamp",
"dotenv",
@ -28,9 +29,11 @@
"pathvalidate",
"Referer",
"sponsorblock",
"tracklist",
"tracksort",
"translit",
"unmap",
"youtube"
"youtube",
"youtubei"
]
}

View File

@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__":
commands = [
"s: #a Crystal F",
"d: 20",
"s: #a Psychonaut 4",
"d: 0"
]

View File

@ -2,30 +2,24 @@ import music_kraken
from music_kraken.objects import Song, Album, Artist, Collection
if __name__ == "__main__":
album_1 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]),
],
artist_list=[
Artist(name="artist 3"),
]
song_1 = Song(
title="song",
feature_artist_list=[Artist(
name="main_artist"
)]
)
album_2 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist 2")]),
],
artist_list=[
Artist(name="artist"),
]
other_artist = Artist(name="other_artist")
song_2 = Song(
title = "song",
main_artist_list=[other_artist]
)
album_1.merge(album_2)
other_artist.name = "main_artist"
print()
print(*(f"{a.title_string} ; {a.id}" for a in album_1.artist_collection.data), sep=" | ")
song_1.merge(song_2)
print(id(album_1.artist_collection), id(album_2.artist_collection))
print(id(album_1.song_collection[0].main_artist_collection), id(album_2.song_collection[0].main_artist_collection))
print("#" * 120)
print("main", *song_1.main_artist_collection)
print("feat", *song_1.feature_artist_collection)

View File

@ -79,7 +79,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
with temp_target.open("wb") as f:
f.write(r.content)
converted_target: Target = Target.temp(name=f"{song.title}.jpeg")
converted_target: Target = Target.temp(name=f"{song.title.replace('/', '_')}")
with Image.open(temp_target.file_path) as img:
# crop the image if it isn't square in the middle with minimum data loss
width, height = img.size

View File

@ -53,9 +53,9 @@ class Artwork:
def get_variant_name(self, variant: ArtworkVariant) -> str:
return f"artwork_{variant['width']}x{variant['height']}_{hash_url(variant['url']).replace('/', '_')}"
def __merge__(self, other: Artwork, override: bool = False) -> None:
def __merge__(self, other: Artwork, **kwargs) -> None:
for key, value in other._variant_mapping.items():
if key not in self._variant_mapping or override:
if key not in self._variant_mapping:
self._variant_mapping[key] = value
def __eq__(self, other: Artwork) -> bool:

View File

@ -1,9 +1,10 @@
from __future__ import annotations
from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set
from .parents import OuterProxy
from ..utils import object_trace
from ..utils import output, BColors
T = TypeVar('T', bound=OuterProxy)
@ -13,8 +14,8 @@ class Collection(Generic[T]):
_data: List[T]
_indexed_values: Dict[str, set]
_indexed_to_objects: Dict[any, list]
_indexed_from_id: Dict[int, Dict[str, Any]]
_indexed_values: Dict[str, Dict[Any, T]]
shallow_list = property(fget=lambda self: self.data)
@ -36,8 +37,8 @@ class Collection(Generic[T]):
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
self.extend_object_to_attribute: Dict[str, Collection[T]] = extend_object_to_attribute or {}
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self._id_to_index_values: Dict[int, set] = defaultdict(set)
self.pull_from: List[Collection] = []
self.push_to: List[Collection] = []
# This is to cleanly unmap previously mapped items by their id
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
@ -47,9 +48,10 @@ class Collection(Generic[T]):
self.extend(data)
def __repr__(self) -> str:
return f"Collection({id(self)})"
return f"Collection({' | '.join(self._collection_for.values())} {id(self)})"
def _map_element(self, __object: T, from_map: bool = False):
def _map_element(self, __object: T, no_unmap: bool = False, **kwargs):
if not no_unmap:
self._unmap_element(__object.id)
self._indexed_from_id[__object.id]["id"] = __object.id
@ -74,73 +76,129 @@ class Collection(Generic[T]):
del self._indexed_from_id[obj_id]
def _find_object(self, __object: T) -> Optional[T]:
def _remap(self):
# reinitialize the mapping to clean it without time consuming operations
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
for e in self._data:
self._map_element(e, no_unmap=True)
def _find_object(self, __object: T, **kwargs) -> Optional[T]:
self._remap()
if __object.id in self._indexed_from_id:
return self._indexed_values["id"][__object.id]
for name, value in __object.indexing_values:
if value in self._indexed_values[name]:
return self._indexed_values[name][value]
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
return None
def _append_new_object(self, other: T, **kwargs):
"""
This function appends the other object to the current collection.
This only works if not another object, which represents the same real life object exists in the collection.
"""
self._data.append(other)
# all of the existing hooks to get the defined datastructure
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
for attribute, a in self.sync_on_append.items():
# syncing two collections by reference
b = other.__getattribute__(attribute)
if a is b:
continue
object_trace(f"Syncing [{a}] = [{b}]")
b_data = b.data.copy()
b_collection_for = b._collection_for.copy()
del b
for synced_with, key in b_collection_for.items():
synced_with.__setattr__(key, a)
a._collection_for[synced_with] = key
a.extend(b_data, **kwargs)
def append(self, other: Optional[T], **kwargs):
"""
If an object, that represents the same entity exists in a relevant collection,
merge into this object. (and remap)
Else append to this collection.
:param __object:
:param already_is_parent:
:param from_map:
:param other:
:return:
"""
if __object is None:
if other is None:
return
if other.id in self._indexed_from_id:
return
existing_object = self._find_object(__object)
object_trace(f"Appending {other.option_string} to {self}")
for c in self.pull_from:
r = c._find_object(other)
if r is not None:
output("found pull from", r, other, self, color=BColors.RED, sep="\t")
other.merge(r, **kwargs)
c.remove(r, existing=r, **kwargs)
break
existing_object = self._find_object(other)
# switching collection in the case of push to
for c in self.push_to:
r = c._find_object(other)
if r is not None:
output("found push to", r, other, self, color=BColors.RED, sep="\t")
return c.append(other, **kwargs)
if existing_object is None:
# append
self._data.append(__object)
self._map_element(__object)
self._append_new_object(other, **kwargs)
else:
existing_object.merge(other, **kwargs)
for collection_attribute, child_collection in self.extend_object_to_attribute.items():
__object.__getattribute__(collection_attribute).extend(child_collection)
def remove(self, *other_list: List[T], silent: bool = False, existing: Optional[T] = None, **kwargs):
for other in other_list:
existing: Optional[T] = existing or self._indexed_values["id"].get(other.id, None)
if existing is None:
if not silent:
raise ValueError(f"Object {other} not found in {self}")
return other
"""
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).remove(*generator, silent=silent, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
__object.__getattribute__(attribute).append(new_object)
other.__getattribute__(attribute).remove(new_object, silent=silent, **kwargs)
"""
# only modify collections if the object actually has been appended
for attribute, a in self.sync_on_append.items():
b = __object.__getattribute__(attribute)
object_trace(f"Syncing [{a}{id(a)}] = [{b}{id(b)}]")
self._data.remove(existing)
self._unmap_element(existing)
data_to_extend = b.data
def contains(self, __object: T) -> bool:
return self._find_object(__object) is not None
a._collection_for.update(b._collection_for)
for synced_with, key in b._collection_for.items():
synced_with.__setattr__(key, a)
a.extend(data_to_extend)
else:
# merge only if the two objects are not the same
if existing_object.id == __object.id:
def extend(self, other_collections: Optional[Generator[T, None, None]], **kwargs):
if other_collections is None:
return
old_id = existing_object.id
existing_object.merge(__object)
if existing_object.id != old_id:
self._unmap_element(old_id)
self._map_element(existing_object)
def extend(self, __iterable: Optional[Generator[T, None, None]]):
if __iterable is None:
return
for __object in __iterable:
self.append(__object)
for other_object in other_collections:
self.append(other_object, **kwargs)
@property
def data(self) -> List[T]:
@ -156,8 +214,9 @@ class Collection(Generic[T]):
def __iter__(self) -> Iterator[T]:
yield from self._data
def __merge__(self, __other: Collection, override: bool = False):
self.extend(__other)
def __merge__(self, other: Collection, **kwargs):
object_trace(f"merging {str(self)} | {str(other)}")
self.extend(other, **kwargs)
def __getitem__(self, item: int):
return self._data[item]
@ -166,3 +225,9 @@ class Collection(Generic[T]):
if item >= len(self._data):
return default
return self._data[item]
def __eq__(self, other: Collection) -> bool:
if self.empty and other.empty:
return True
return self._data == other._data

View File

@ -9,9 +9,9 @@ from pathlib import Path
import inspect
from .metadata import Metadata
from ..utils import get_unix_time, object_trace
from ..utils import get_unix_time, object_trace, generate_id
from ..utils.config import logging_settings, main_settings
from ..utils.shared import HIGHEST_ID
from ..utils.shared import HIGHEST_ID, DEBUG_PRINT_ID
from ..utils.hacking import MetaClass
LOGGER = logging_settings["object_logger"]
@ -29,6 +29,9 @@ class InnerData:
"""
_refers_to_instances: set = None
"""
Attribute versions keep track, of if the attribute has been changed.
"""
def __init__(self, object_type, **kwargs):
self._refers_to_instances = set()
@ -42,21 +45,28 @@ class InnerData:
for key, value in kwargs.items():
if hasattr(value, "__is_collection__"):
value._collection_for[self] = key
self.__setattr__(key, value)
def __hash__(self):
return self.id
def __merge__(self, __other: InnerData, override: bool = False):
def __merge__(self, __other: InnerData, **kwargs):
"""
:param __other:
:param override:
:return:
"""
self._fetched_from.update(__other._fetched_from)
for key, value in __other.__dict__.copy().items():
if key.startswith("_"):
continue
if hasattr(value, "__is_collection__") and key in self.__dict__:
self.__getattribute__(key).__merge__(value, **kwargs)
continue
# just set the other value if self doesn't already have it
if key not in self.__dict__ or (key in self.__dict__ and self.__dict__[key] == self._default_values.get(key)):
self.__setattr__(key, value)
@ -64,13 +74,8 @@ class InnerData:
# if the object of value implemented __merge__, it merges
existing = self.__getattribute__(key)
if hasattr(type(existing), "__merge__"):
existing.__merge__(value, override)
continue
# override the existing value if requested
if override:
self.__setattr__(key, value)
if hasattr(existing, "__merge__"):
existing.__merge__(value, **kwargs)
class OuterProxy:
@ -84,8 +89,6 @@ class OuterProxy:
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = tuple()
UPWARDS_COLLECTION_STRING_ATTRIBUTES = tuple()
TITEL = "id"
def __init__(self, _id: int = None, dynamic: bool = False, **kwargs):
_automatic_id: bool = False
@ -94,7 +97,7 @@ class OuterProxy:
generates a random integer id
the range is defined in the config
"""
_id = random.randint(0, HIGHEST_ID)
_id = generate_id()
_automatic_id = True
kwargs["automatic_id"] = _automatic_id
@ -116,7 +119,7 @@ class OuterProxy:
self._inner: InnerData = InnerData(type(self), **kwargs)
self._inner._refers_to_instances.add(self)
object_trace(f"creating {type(self).__name__} [{self.title_string}]")
object_trace(f"creating {type(self).__name__} [{self.option_string}]")
self.__init_collections__()
@ -173,13 +176,12 @@ class OuterProxy:
def __eq__(self, other: Any):
return self.__hash__() == other.__hash__()
def merge(self, __other: Optional[OuterProxy], override: bool = False):
def merge(self, __other: Optional[OuterProxy], **kwargs):
"""
1. merges the data of __other in self
2. replaces the data of __other with the data of self
:param __other:
:param override:
:return:
"""
if __other is None:
@ -196,7 +198,7 @@ class OuterProxy:
if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
a, b = b, a
object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]")
object_trace(f"merging {a.option_string} | {b.option_string}")
old_inner = b._inner
@ -204,11 +206,11 @@ class OuterProxy:
instance._inner = a._inner
a._inner._refers_to_instances.add(instance)
a._inner.__merge__(old_inner, override=override)
a._inner.__merge__(old_inner, **kwargs)
del old_inner
def __merge__(self, __other: Optional[OuterProxy], override: bool = False):
self.merge(__other, override)
def __merge__(self, __other: Optional[OuterProxy], **kwargs):
self.merge(__other, **kwargs)
def mark_as_fetched(self, *url_hash_list: List[str]):
for url_hash in url_hash_list:
@ -235,7 +237,23 @@ class OuterProxy:
@property
def options(self) -> List[P]:
return [self]
r = []
for collection_string_attribute in self.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
r.extend(self.__getattribute__(collection_string_attribute))
r.append(self)
for collection_string_attribute in self.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
r.extend(self.__getattribute__(collection_string_attribute))
return r
@property
def option_string(self) -> str:
return self.title_string
INDEX_DEPENDS_ON: List[str] = []
@property
def indexing_values(self) -> List[Tuple[str, object]]:
@ -267,9 +285,10 @@ class OuterProxy:
return r
TITEL = "id"
@property
def title_string(self) -> str:
return str(self.__getattribute__(self.TITEL))
return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "")
def __repr__(self):
return f"{type(self).__name__}({self.title_string})"

View File

@ -22,6 +22,7 @@ from .parents import OuterProxy, P
from .source import Source, SourceCollection
from .target import Target
from .country import Language, Country
from ..utils.shared import DEBUG_PRINT_ID
from ..utils.string_processing import unify
from .parents import OuterProxy as Base
@ -43,7 +44,8 @@ def get_collection_string(
template: str,
ignore_titles: Set[str] = None,
background: BColors = OPTION_BACKGROUND,
foreground: BColors = OPTION_FOREGROUND
foreground: BColors = OPTION_FOREGROUND,
add_id: bool = DEBUG_PRINT_ID,
) -> str:
if collection.empty:
return ""
@ -55,8 +57,15 @@ def get_collection_string(
r = background
def get_element_str(element) -> str:
nonlocal add_id
r = element.title_string.strip()
if add_id and False:
r += " " + str(element.id)
return r
element: Base
titel_list: List[str] = [element.title_string.strip() for element in collection if element.title_string not in ignore_titles]
titel_list: List[str] = [get_element_str(element) for element in collection if element.title_string not in ignore_titles]
for i, titel in enumerate(titel_list):
delimiter = ", "
@ -117,7 +126,7 @@ class Song(Base):
Base.__init__(**locals())
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("album_collection", "main_artist_collection", "feature_artist_collection")
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_artist_collection", "feature_artist_collection", "album_collection")
TITEL = "title"
def __init_collections__(self) -> None:
@ -135,6 +144,9 @@ class Song(Base):
"feature_song_collection": self
}
self.feature_artist_collection.push_to = [self.main_artist_collection]
self.main_artist_collection.pull_from = [self.feature_artist_collection]
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song:
return
@ -144,20 +156,21 @@ class Song(Base):
return
if isinstance(object_list, Artist):
self.main_artist_collection.extend(object_list)
self.feature_artist_collection.extend(object_list)
return
if isinstance(object_list, Album):
self.album_collection.extend(object_list)
return
INDEX_DEPENDS_ON = ("title", "isrc", "source_collection")
@property
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('title', unify(self.title)),
('isrc', self.isrc),
*[('url', source.url) for source in self.source_collection]
*self.source_collection.indexing_values(),
]
@property
@ -169,6 +182,8 @@ class Song(Base):
id3Mapping.GENRE: [self.genre],
id3Mapping.TRACKNUMBER: [self.tracksort_str],
id3Mapping.COMMENT: [self.note.markdown],
id3Mapping.FILE_WEBPAGE_URL: self.source_collection.url_list,
id3Mapping.SOURCE_WEBPAGE_URL: self.source_collection.homepage_list,
})
# metadata.merge_many([s.get_song_metadata() for s in self.source_collection]) album sources have no relevant metadata for id3
@ -189,7 +204,7 @@ class Song(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
r += get_collection_string(self.main_artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}")
@ -269,7 +284,7 @@ class Album(Base):
**kwargs)
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection",)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "label_collection")
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection")
def __init_collections__(self):
self.song_collection.append_object_to_attribute = {
@ -302,13 +317,14 @@ class Album(Base):
self.label_collection.extend(object_list)
return
INDEX_DEPENDS_ON = ("title", "barcode", "source_collection")
@property
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('title', unify(self.title)),
('barcode', self.barcode),
*[('url', source.url) for source in self.source_collection]
*self.source_collection.indexing_values(),
]
@property
@ -333,7 +349,7 @@ class Album(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.artist_collection, " by {}")
r += get_collection_string(self.label_collection, " under {}")
@ -341,12 +357,6 @@ class Album(Base):
r += f" with {len(self.song_collection)} songs"
return r
@property
def options(self) -> List[P]:
options = [*self.artist_collection, self, *self.song_collection]
return options
def update_tracksort(self):
"""
This updates the tracksort attributes, of the songs in
@ -372,18 +382,6 @@ class Album(Base):
tracksort_map[i] = existing_list.pop(0)
tracksort_map[i].tracksort = i
def compile(self, merge_into: bool = False):
"""
compiles the recursive structures,
and does depending on the object some other stuff.
no need to override if only the recursive structure should be built.
override self.build_recursive_structures() instead
"""
self.update_tracksort()
self._build_recursive_structures(build_version=random.randint(0, 99999), merge=merge_into)
@property
def copyright(self) -> str:
if self.date is None:
@ -429,7 +427,7 @@ class Artist(Base):
lyrical_themes: List[str]
general_genre: str
unformated_location: str
unformatted_location: str
source_collection: SourceCollection
contact_collection: Collection[Contact]
@ -442,7 +440,7 @@ class Artist(Base):
"name": str,
"unified_name": lambda: None,
"country": lambda: None,
"unformated_location": lambda: None,
"unformatted_location": lambda: None,
"formed_in": ID3Timestamp,
"notes": FormattedText,
@ -461,17 +459,17 @@ class Artist(Base):
# This is automatically generated
def __init__(self, name: str = "", unified_name: str = None, country: Country = None,
formed_in: ID3Timestamp = None, notes: FormattedText = None, lyrical_themes: List[str] = None,
general_genre: str = None, unformated_location: str = None, source_list: List[Source] = None,
general_genre: str = None, unformatted_location: str = None, source_list: List[Source] = None,
contact_list: List[Contact] = None, feature_song_list: List[Song] = None,
main_album_list: List[Album] = None, label_list: List[Label] = None, **kwargs) -> None:
super().__init__(name=name, unified_name=unified_name, country=country, formed_in=formed_in, notes=notes,
lyrical_themes=lyrical_themes, general_genre=general_genre,
unformated_location=unformated_location, source_list=source_list, contact_list=contact_list,
unformatted_location=unformatted_location, source_list=source_list, contact_list=contact_list,
feature_song_list=feature_song_list, main_album_list=main_album_list, label_list=label_list,
**kwargs)
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("feature_song_collection", "main_album_collection")
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_album_collection", "feature_song_collection")
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",)
def __init_collections__(self):
@ -504,12 +502,6 @@ class Artist(Base):
self.label_collection.extend(object_list)
return
@property
def options(self) -> List[P]:
options = [self, *self.main_album_collection.shallow_list, *self.feature_album]
print(options)
return options
def update_albumsort(self):
"""
This updates the albumsort attributes, of the albums in
@ -567,40 +559,27 @@ class Artist(Base):
# replace the old collection with the new one
self.main_album_collection: Collection = Collection(data=album_list, element_type=Album)
INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection")
@property
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('name', unify(self.name)),
*[('url', source.url) for source in self.source_collection],
*[('contact', contact.value) for contact in self.contact_collection]
*[('contact', contact.value) for contact in self.contact_collection],
*self.source_collection.indexing_values(),
]
@property
def metadata(self) -> Metadata:
metadata = Metadata({
id3Mapping.ARTIST: [self.name]
id3Mapping.ARTIST: [self.name],
id3Mapping.ARTIST_WEBPAGE_URL: self.source_collection.url_list,
})
metadata.merge_many([s.get_artist_metadata() for s in self.source_collection])
return metadata
"""
def __str__(self, include_notes: bool = False):
string = self.name or ""
if include_notes:
plaintext_notes = self.notes.get_plaintext()
if plaintext_notes is not None:
string += "\n" + plaintext_notes
return string
"""
def __repr__(self):
return f"Artist(\"{self.name}\")"
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.name + BColors.ENDC.value + OPTION_BACKGROUND.value
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.label_collection, " under {}")
r += OPTION_BACKGROUND.value
@ -613,43 +592,6 @@ class Artist(Base):
return r
@property
def options(self) -> List[P]:
options = [self]
options.extend(self.main_album_collection)
options.extend(self.feature_song_collection)
return options
@property
def feature_album(self) -> Album:
return Album(
title="features",
album_status=AlbumStatus.UNRELEASED,
album_type=AlbumType.COMPILATION_ALBUM,
is_split=True,
albumsort=666,
dynamic=True,
song_list=self.feature_song_collection.shallow_list
)
def get_all_songs(self) -> List[Song]:
"""
returns a list of all Songs.
probably not that useful, because it is unsorted
"""
collection = self.feature_song_collection.copy()
for album in self.discography:
collection.extend(album.song_collection)
return collection
@property
def discography(self) -> List[Album]:
flat_copy_discography = self.main_album_collection.copy()
flat_copy_discography.append(self.feature_album)
return flat_copy_discography
"""
Label
@ -702,7 +644,6 @@ class Label(Base):
@property
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('name', unify(self.name)),
*[('url', source.url) for source in self.source_collection]
]

View File

@ -2,142 +2,176 @@ from __future__ import annotations
from collections import defaultdict
from enum import Enum
from typing import List, Dict, Set, Tuple, Optional, Iterable
from urllib.parse import urlparse
from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator
from urllib.parse import urlparse, ParseResult
from dataclasses import dataclass, field
from functools import cached_property
from ..utils import generate_id
from ..utils.enums.source import SourcePages, SourceTypes
from ..utils.config import youtube_settings
from ..utils.string_processing import hash_url
from ..utils.string_processing import hash_url, shorten_display_url
from .metadata import Mapping, Metadata
from .parents import OuterProxy
from .collection import Collection
class Source(OuterProxy):
url: str
@dataclass
class Source:
page_enum: SourcePages
referer_page: SourcePages
url: str
referrer_page: SourcePages = None
audio_url: Optional[str] = None
audio_url: str
additional_data: dict = field(default_factory=dict)
_default_factories = {
"audio_url": lambda: None,
}
def __post_init__(self):
self.referrer_page = self.referrer_page or self.page_enum
# This is automatically generated
def __init__(self, page_enum: SourcePages, url: str, referer_page: SourcePages = None, audio_url: str = None,
**kwargs) -> None:
if referer_page is None:
referer_page = page_enum
super().__init__(url=url, page_enum=page_enum, referer_page=referer_page, audio_url=audio_url, **kwargs)
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
@classmethod
def match_url(cls, url: str, referer_page: SourcePages) -> Optional["Source"]:
def match_url(cls, url: str, referrer_page: SourcePages) -> Optional[Source]:
"""
this shouldn't be used, unlesse you are not certain what the source is for
this shouldn't be used, unless you are not certain what the source is for
the reason is that it is more inefficient
"""
parsed = urlparse(url)
url = parsed.geturl()
parsed_url = urlparse(url)
url = parsed_url.geturl()
if "musify" in parsed.netloc:
return cls(SourcePages.MUSIFY, url, referer_page=referer_page)
if "musify" in parsed_url.netloc:
return cls(SourcePages.MUSIFY, url, referrer_page=referrer_page)
if parsed.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]:
return cls(SourcePages.YOUTUBE, url, referer_page=referer_page)
if parsed_url.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]:
return cls(SourcePages.YOUTUBE, url, referrer_page=referrer_page)
if url.startswith("https://www.deezer"):
return cls(SourcePages.DEEZER, url, referer_page=referer_page)
return cls(SourcePages.DEEZER, url, referrer_page=referrer_page)
if url.startswith("https://open.spotify.com"):
return cls(SourcePages.SPOTIFY, url, referer_page=referer_page)
return cls(SourcePages.SPOTIFY, url, referrer_page=referrer_page)
if "bandcamp" in url:
return cls(SourcePages.BANDCAMP, url, referer_page=referer_page)
return cls(SourcePages.BANDCAMP, url, referrer_page=referrer_page)
if "wikipedia" in parsed.netloc:
return cls(SourcePages.WIKIPEDIA, url, referer_page=referer_page)
if "wikipedia" in parsed_url.netloc:
return cls(SourcePages.WIKIPEDIA, url, referrer_page=referrer_page)
if url.startswith("https://www.metal-archives.com/"):
return cls(SourcePages.ENCYCLOPAEDIA_METALLUM, url, referer_page=referer_page)
return cls(SourcePages.ENCYCLOPAEDIA_METALLUM, url, referrer_page=referrer_page)
# the less important once
if url.startswith("https://www.facebook"):
return cls(SourcePages.FACEBOOK, url, referer_page=referer_page)
return cls(SourcePages.FACEBOOK, url, referrer_page=referrer_page)
if url.startswith("https://www.instagram"):
return cls(SourcePages.INSTAGRAM, url, referer_page=referer_page)
return cls(SourcePages.INSTAGRAM, url, referrer_page=referrer_page)
if url.startswith("https://twitter"):
return cls(SourcePages.TWITTER, url, referer_page=referer_page)
return cls(SourcePages.TWITTER, url, referrer_page=referrer_page)
if url.startswith("https://myspace.com"):
return cls(SourcePages.MYSPACE, url, referer_page=referer_page)
def get_song_metadata(self) -> Metadata:
return Metadata({
Mapping.FILE_WEBPAGE_URL: [self.url],
Mapping.SOURCE_WEBPAGE_URL: [self.homepage]
})
def get_artist_metadata(self) -> Metadata:
return Metadata({
Mapping.ARTIST_WEBPAGE_URL: [self.url]
})
return cls(SourcePages.MYSPACE, url, referrer_page=referrer_page)
@property
def hash_url(self) -> str:
return hash_url(self.url)
@property
def metadata(self) -> Metadata:
return self.get_song_metadata()
@property
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('url', self.url),
('audio_url', self.audio_url),
]
def __str__(self):
return self.__repr__()
def indexing_values(self) -> list:
r = [hash_url(self.url)]
if self.audio_url:
r.append(hash_url(self.audio_url))
return r
def __repr__(self) -> str:
return f"Src({self.page_enum.value}: {self.url}, {self.audio_url})"
return f"Src({self.page_enum.value}: {shorten_display_url(self.url)})"
@property
def title_string(self) -> str:
return self.url
def __merge__(self, other: Source, **kwargs):
if self.audio_url is None:
self.audio_url = other.audio_url
self.additional_data.update(other.additional_data)
page_str = property(fget=lambda self: self.page_enum.value)
type_str = property(fget=lambda self: self.type_enum.value)
homepage = property(fget=lambda self: SourcePages.get_homepage(self.page_enum))
class SourceCollection(Collection):
class SourceCollection:
__change_version__ = generate_id()
_indexed_sources: Dict[str, Source]
_page_to_source_list: Dict[SourcePages, List[Source]]
def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
self._page_to_source_list: Dict[SourcePages, List[Source]] = defaultdict(list)
self._page_to_source_list = defaultdict(list)
self._indexed_sources = {}
super().__init__(data=data, **kwargs)
self.extend(data or [])
def _map_element(self, __object: Source, **kwargs):
super()._map_element(__object, **kwargs)
def has_source_page(self, *source_pages: SourcePages) -> bool:
return any(source_page in self._page_to_source_list for source_page in source_pages)
self._page_to_source_list[__object.page_enum].append(__object)
def get_sources(self, *source_pages: List[Source]) -> Generator[Source]:
if not len(source_pages):
source_pages = self.source_pages
for page in source_pages:
yield from self._page_to_source_list[page]
def append(self, source: Source):
if source is None:
return
existing_source = None
for key in source.indexing_values:
if key in self._indexed_sources:
existing_source = self._indexed_sources[key]
break
if existing_source is not None:
existing_source.__merge__(source)
source = existing_source
else:
self._page_to_source_list[source.page_enum].append(source)
changed = False
for key in source.indexing_values:
if key not in self._indexed_sources:
changed = True
self._indexed_sources[key] = source
if changed:
self.__change_version__ = generate_id()
def extend(self, sources: Iterable[Source]):
for source in sources:
self.append(source)
def __iter__(self):
yield from self.get_sources()
def __merge__(self, other: SourceCollection, **kwargs):
self.extend(other)
@property
def source_pages(self) -> Set[SourcePages]:
return set(source.page_enum for source in self._data)
def source_pages(self) -> Iterable[SourcePages]:
return sorted(self._page_to_source_list.keys(), key=lambda page: page.value)
def get_sources_from_page(self, source_page: SourcePages) -> List[Source]:
"""
getting the sources for a specific page like
YouTube or musify
"""
return self._page_to_source_list[source_page].copy()
@property
def hash_url_list(self) -> List[str]:
return [hash_url(source.url) for source in self.get_sources()]
@property
def url_list(self) -> List[str]:
return [source.url for source in self.get_sources()]
@property
def homepage_list(self) -> List[str]:
return [source.homepage for source in self.source_pages]
def indexing_values(self) -> Generator[Tuple[str, str], None, None]:
for index in self._indexed_sources:
yield "url", index

View File

@ -89,52 +89,6 @@ class NamingDict(dict):
return self.default_value_for_name(attribute_name)
def _clean_music_object(music_object: INDEPENDENT_DB_OBJECTS, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
if type(music_object) == Label:
return _clean_label(label=music_object, collections=collections)
if type(music_object) == Artist:
return _clean_artist(artist=music_object, collections=collections)
if type(music_object) == Album:
return _clean_album(album=music_object, collections=collections)
if type(music_object) == Song:
return _clean_song(song=music_object, collections=collections)
def _clean_collection(collection: Collection, collection_dict: Dict[INDEPENDENT_DB_TYPES, Collection]):
if collection.element_type not in collection_dict:
return
for i, element in enumerate(collection):
r = collection_dict[collection.element_type].append(element, merge_into_existing=True)
collection[i] = r.current_element
if not r.was_the_same:
_clean_music_object(r.current_element, collection_dict)
def _clean_label(label: Label, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
_clean_collection(label.current_artist_collection, collections)
_clean_collection(label.album_collection, collections)
def _clean_artist(artist: Artist, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
_clean_collection(artist.main_album_collection, collections)
_clean_collection(artist.feature_song_collection, collections)
_clean_collection(artist.label_collection, collections)
def _clean_album(album: Album, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
_clean_collection(album.label_collection, collections)
_clean_collection(album.song_collection, collections)
_clean_collection(album.artist_collection, collections)
def _clean_song(song: Song, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
_clean_collection(song.album_collection, collections)
_clean_collection(song.feature_artist_collection, collections)
_clean_collection(song.main_artist_collection, collections)
class Page:
"""
This is an abstract class, laying out the
@ -246,7 +200,7 @@ class Page:
# only certain database objects, have a source list
if isinstance(music_object, INDEPENDENT_DB_OBJECTS):
source: Source
for source in music_object.source_collection.get_sources_from_page(self.SOURCE_TYPE):
for source in music_object.source_collection.get_sources(self.SOURCE_TYPE):
if music_object.already_fetched_from(source.hash_url):
continue
@ -419,9 +373,10 @@ class Page:
if song.target_collection.empty:
song.target_collection.append(new_target)
sources = song.source_collection.get_sources_from_page(self.SOURCE_TYPE)
if len(sources) == 0:
return DownloadResult(error_message=f"No source found for {song.title} as {self.__class__.__name__}.")
if not song.source_collection.has_source_page(self.SOURCE_TYPE):
return DownloadResult(error_message=f"No {self.__class__.__name__} source found for {song.option_string}.")
sources = song.source_collection.get_sources(self.SOURCE_TYPE)
temp_target: Target = Target(
relative_to_music_dir=False,
@ -448,14 +403,19 @@ class Page:
self.LOGGER.info(f"{song.option_string} already exists, thus not downloading again.")
return r
source = sources[0]
if not found_on_disc:
for source in sources:
r = self.download_song_to_target(source=source, target=temp_target, desc=song.option_string)
if not r.is_fatal_error:
r.merge(self._post_process_targets(song, temp_target,
[] if found_on_disc else self.get_skip_intervals(song, source)))
break
if temp_target.exists:
r.merge(self._post_process_targets(
song=song,
temp_target=temp_target,
interval_list=[] if found_on_disc else self.get_skip_intervals(song, source)
))
return r

View File

@ -185,7 +185,7 @@ class Bandcamp(Page):
if li is None and li['href'] is not None:
continue
source_list.append(Source.match_url(_parse_artist_url(li['href']), referer_page=self.SOURCE_TYPE))
source_list.append(Source.match_url(_parse_artist_url(li['href']), referrer_page=self.SOURCE_TYPE))
return Artist(
name=name,

View File

@ -486,7 +486,7 @@ class EncyclopaediaMetallum(Page):
href = anchor["href"]
if href is not None:
source_list.append(Source.match_url(href, referer_page=self.SOURCE_TYPE))
source_list.append(Source.match_url(href, referrer_page=self.SOURCE_TYPE))
# The following code is only legacy code, which I just kep because it doesn't harm.
# The way ma returns sources changed.
@ -504,7 +504,7 @@ class EncyclopaediaMetallum(Page):
if url is None:
continue
source_list.append(Source.match_url(url, referer_page=self.SOURCE_TYPE))
source_list.append(Source.match_url(url, referrer_page=self.SOURCE_TYPE))
return source_list

View File

@ -503,7 +503,7 @@ class Musify(Page):
source_list.append(Source(
SourcePages.YOUTUBE,
iframe["src"],
referer_page=self.SOURCE_TYPE
referrer_page=self.SOURCE_TYPE
))
return Song(
@ -690,13 +690,6 @@ class Musify(Page):
new_song = self._parse_song_card(card_soup)
album.song_collection.append(new_song)
if stop_at_level > 1:
song: Song
for song in album.song_collection:
sources = song.source_collection.get_sources_from_page(self.SOURCE_TYPE)
for source in sources:
song.merge(self.fetch_song(source=source))
album.update_tracksort()
return album
@ -812,7 +805,7 @@ class Musify(Page):
href = additional_source.get("href")
if href is None:
continue
new_src = Source.match_url(href, referer_page=self.SOURCE_TYPE)
new_src = Source.match_url(href, referrer_page=self.SOURCE_TYPE)
if new_src is None:
continue
source_list.append(new_src)

View File

@ -25,7 +25,6 @@ def music_card_shelf_renderer(renderer: dict) -> List[DatabaseObject]:
results.extend(parse_renderer(sub_renderer))
return results
def music_responsive_list_item_flex_column_renderer(renderer: dict) -> List[DatabaseObject]:
return parse_run_list(renderer.get("text", {}).get("runs", []))
@ -54,19 +53,11 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
for result in results:
_map[type(result)].append(result)
for song in song_list:
if len(song_list) == 1:
song = song_list[0]
song.feature_artist_collection.extend(artist_list)
song.album_collection.extend(album_list)
song.main_artist_collection.extend(artist_list)
for album in album_list:
album.artist_collection.extend(artist_list)
if len(song_list) > 0:
return song_list
if len(album_list) > 0:
return album_list
if len(artist_list) > 0:
return artist_list
return [song]
return results

View File

@ -40,7 +40,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
_temp_nav = run_element.get("navigationEndpoint", {})
is_video = "watchEndpoint" in _temp_nav
navigation_endpoint = _temp_nav.get("watchEndpoint" if is_video else "browseEndpoint", {})
navigation_endpoint = _temp_nav.get("watchEndpoint", _temp_nav.get("browseEndpoint", {}))
element_type = PageType.SONG
page_type_string = navigation_endpoint.get("watchEndpointMusicSupportedConfigs", {}).get("watchEndpointMusicConfig", {}).get("musicVideoType", "")
@ -51,7 +51,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
except ValueError:
return
element_id = navigation_endpoint.get("videoId" if is_video else "browseId")
element_id = navigation_endpoint.get("videoId", navigation_endpoint.get("browseId"))
element_text = run_element.get("text")
if element_id is None or element_text is None:
@ -60,7 +60,11 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
if element_type == PageType.SONG or (element_type == PageType.VIDEO and not youtube_settings["youtube_music_clean_data"]) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}")
return Song(title=clean_song_title(element_text), source_list=[source])
return Song(
title=clean_song_title(element_text),
source_list=[source]
)
if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}")

View File

@ -8,6 +8,7 @@ import json
from dataclasses import dataclass
import re
from functools import lru_cache
from collections import defaultdict
import youtube_dl
from youtube_dl.extractor.youtube import YoutubeIE
@ -17,7 +18,7 @@ from ...utils.exception.config import SettingValueError
from ...utils.config import main_settings, youtube_settings, logging_settings
from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING
from ...utils.string_processing import clean_song_title
from ...utils import get_current_millis
from ...utils import get_current_millis, traverse_json_path
from ...utils import dump_to_file
@ -30,12 +31,16 @@ from ...objects import (
Song,
Album,
Label,
Target
Target,
Lyrics,
FormattedText
)
from ...connection import Connection
from ...utils.enums.album import AlbumType
from ...utils.support_classes.download_result import DownloadResult
from ._list_render import parse_renderer
from ._music_object_render import parse_run_element
from .super_youtube import SuperYouTube
@ -162,6 +167,12 @@ class MusicKrakenYoutubeIE(YoutubeIE):
ALBUM_TYPE_MAP = {
"Single": AlbumType.SINGLE,
"Album": AlbumType.STUDIO_ALBUM,
"EP": AlbumType.EP,
}
class YoutubeMusic(SuperYouTube):
# CHANGE
@ -401,7 +412,7 @@ class YoutubeMusic(SuperYouTube):
return results
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
artist = Artist()
artist = Artist(source_list=[source])
# construct the request
url = urlparse(source.url)
@ -421,6 +432,19 @@ class YoutubeMusic(SuperYouTube):
if DEBUG:
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
# artist details
data: dict = r.json()
header = data.get("header", {})
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
if len(title_runs) > 0:
artist.name = title_runs[0].get("text", artist.name)
# fetch discography
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
@ -465,6 +489,46 @@ class YoutubeMusic(SuperYouTube):
if DEBUG:
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
data = r.json()
# album details
header = data.get("header", {})
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
if len(title_runs) > 0:
album.title = title_runs[0].get("text", album.title)
def other_parse_run(run: dict) -> str:
nonlocal album
if "text" not in run:
return
text = run["text"]
is_text_field = len(run.keys()) == 1
# regex that text is a year
if is_text_field and re.match(r"\d{4}", text):
album.date = ID3Timestamp.strptime(text, "%Y")
return
if text in ALBUM_TYPE_MAP:
album.album_type = ALBUM_TYPE_MAP[text]
return
if not is_text_field:
r = parse_run_element(run)
if r is not None:
album.add_list_of_other_objects([r])
return
for _run in subtitle_runs:
other_parse_run(_run)
# tracklist
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
@ -472,20 +536,67 @@ class YoutubeMusic(SuperYouTube):
for i, content in enumerate(renderer_list):
dump_to_file(f"{i}-album-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
results = []
"""
cant use fixed indices, because if something has no entries, the list dissappears
instead I have to try parse everything, and just reject community playlists and profiles.
"""
for renderer in renderer_list:
results.extend(parse_renderer(renderer))
album.add_list_of_other_objects(parse_renderer(renderer))
album.add_list_of_other_objects(results)
for song in album.song_collection:
for song_source in song.source_collection:
song_source.additional_data["playlist_id"] = browse_id
return album
def fetch_lyrics(self, video_id: str, playlist_id: str = None) -> str:
request_data = {
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
"videoId": video_id,
}
if playlist_id is not None:
request_data["playlistId"] = playlist_id
tab_request = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/next", query=f"prettyPrint=false"),
json=request_data,
name=f"fetch_song_tabs_{video_id}.json",
)
if tab_request is None:
return None
dump_to_file(f"fetch_song_tabs_{video_id}.json", tab_request.text, is_json=True, exit_after_dump=False)
tab_data: dict = tab_request.json()
tabs = traverse_json_path(tab_data, "contents.singleColumnMusicWatchNextResultsRenderer.tabbedRenderer.watchNextTabbedResultsRenderer.tabs", default=[])
browse_id = None
for tab in tabs:
pageType = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseEndpointContextSupportedConfigs.browseEndpointContextMusicConfig.pageType", default="")
if pageType in ("MUSIC_TAB_TYPE_LYRICS", "MUSIC_PAGE_TYPE_TRACK_LYRICS") or "lyrics" in pageType.lower():
browse_id = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseId", default=None)
break
if browse_id is None:
return None
r = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/browse", query=f"prettyPrint=false"),
json={
"browseId": browse_id,
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}}
},
name=f"fetch_song_lyrics_{video_id}.json"
)
dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False)
data = r.json()
lyrics_text = traverse_json_path(data, "contents.sectionListRenderer.contents[0].musicDescriptionShelfRenderer.description.runs[0].text", default=None)
if lyrics_text is None:
return None
return Lyrics(FormattedText(plain=lyrics_text))
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
ydl_res: dict = {}
@ -498,7 +609,19 @@ class YoutubeMusic(SuperYouTube):
self.fetch_media_url(source=source, ydl_res=ydl_res)
artist_name = ydl_res.get("artist", ydl_res.get("uploader", "")).rstrip(" - Topic")
artist_names = []
uploader = ydl_res.get("uploader", "")
if uploader.endswith(" - Topic"):
artist_names = [uploader.rstrip(" - Topic")]
artist_list = [
Artist(
name=name,
source_list=[Source(
SourcePages.YOUTUBE_MUSIC,
f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}"
)]
) for name in artist_names]
album_list = []
if "album" in ydl_res:
@ -507,25 +630,57 @@ class YoutubeMusic(SuperYouTube):
date=ID3Timestamp.strptime(ydl_res.get("upload_date"), "%Y%m%d"),
))
return Song(
artist_name = artist_names[0] if len(artist_names) > 0 else None
song = Song(
title=ydl_res.get("track", clean_song_title(ydl_res.get("title"), artist_name=artist_name)),
note=ydl_res.get("descriptions"),
album_list=album_list,
length=int(ydl_res.get("duration", 0)) * 1000,
artwork=Artwork(*ydl_res.get("thumbnails", [])),
main_artist_list=[Artist(
name=artist_name,
source_list=[Source(
SourcePages.YOUTUBE_MUSIC,
f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}"
)]
)],
main_artist_list=artist_list,
source_list=[Source(
SourcePages.YOUTUBE_MUSIC,
f"https://music.youtube.com/watch?v={ydl_res.get('id')}"
), source],
)
# other song details
parsed_url = urlparse(source.url)
browse_id = parse_qs(parsed_url.query)['v'][0]
request_data = {
"captionParams": {},
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
"videoId": browse_id,
}
if "playlist_id" in source.additional_data:
request_data["playlistId"] = source.additional_data["playlist_id"]
initial_details = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/player", query=f"prettyPrint=false"),
json=request_data,
name=f"fetch_song_{browse_id}.json",
)
if initial_details is None:
return song
dump_to_file(f"fetch_song_{browse_id}.json", initial_details.text, is_json=True, exit_after_dump=False)
data = initial_details.json()
video_details = data.get("videoDetails", {})
browse_id = video_details.get("videoId", browse_id)
song.title = video_details.get("title", song.title)
if video_details.get("isLiveContent", False):
for album in song.album_list:
album.album_type = AlbumType.LIVE_ALBUM
for thumbnail in video_details.get("thumbnails", []):
song.artwork.append(**thumbnail)
song.lyrics_collection.append(self.fetch_lyrics(browse_id, playlist_id=request_data.get("playlistId")))
return song
def fetch_media_url(self, source: Source, ydl_res: dict = None) -> dict:
def _get_best_format(format_list: List[Dict]) -> dict:

View File

@ -3,24 +3,30 @@ from pathlib import Path
import json
import logging
import inspect
from typing import List, Union
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
from .config import config, read_config, write_config
from .enums.colors import BColors
from .path_manager import LOCATIONS
from .hacking import merge_args
"""
IO functions
"""
def _apply_color(msg: str, color: BColors) -> str:
if not isinstance(msg, str):
msg = str(msg)
if color is BColors.ENDC:
return msg
return color.value + msg + BColors.ENDC.value
def output(msg: str, color: BColors = BColors.ENDC):
print(_apply_color(msg, color))
@merge_args(print)
def output(*msg: List[str], color: BColors = BColors.ENDC, **kwargs):
print(*(_apply_color(s, color) for s in msg), **kwargs)
def user_input(msg: str, color: BColors = BColors.ENDC):
@ -71,6 +77,43 @@ def object_trace(obj):
misc functions
"""
def traverse_json_path(data, path: Union[str, List[str]], default=None):
"""
Path parts are concatenated with . or wrapped with [""] for object keys and wrapped in [] for array indices.
"""
if isinstance(path, str):
path = path.replace('["', '.').replace('"]', '.').replace("[", ".").replace("]", ".")
path = [p for p in path.split(".") if len(p) > 0]
if len(path) <= 0:
return data
current = path[0]
path = path[1:]
new_data = None
if isinstance(data, dict):
new_data = data.get(current)
elif isinstance(data, list):
try:
new_data = data[int(current)]
except (IndexError, ValueError):
pass
if new_data is None:
return default
return traverse_json_path(data=new_data, path=path, default=default)
_auto_increment = 0
def generate_id() -> int:
global _auto_increment
_auto_increment += 1
return _auto_increment
def get_current_millis() -> int:
dt = datetime.now()
return int(dt.microsecond / 1_000)

View File

@ -9,42 +9,32 @@ class SourceTypes(Enum):
class SourcePages(Enum):
YOUTUBE = "youtube"
MUSIFY = "musify"
YOUTUBE_MUSIC = "youtube music"
GENIUS = "genius"
MUSICBRAINZ = "musicbrainz"
YOUTUBE = "youtube", "https://www.youtube.com/"
MUSIFY = "musify", "https://musify.club/"
YOUTUBE_MUSIC = "youtube music", "https://music.youtube.com/"
GENIUS = "genius", "https://genius.com/"
MUSICBRAINZ = "musicbrainz", "https://musicbrainz.org/"
ENCYCLOPAEDIA_METALLUM = "encyclopaedia metallum"
BANDCAMP = "bandcamp"
DEEZER = "deezer"
SPOTIFY = "spotify"
BANDCAMP = "bandcamp", "https://bandcamp.com/"
DEEZER = "deezer", "https://www.deezer.com/"
SPOTIFY = "spotify", "https://open.spotify.com/"
# This has nothing to do with audio, but bands can be here
WIKIPEDIA = "wikipedia"
INSTAGRAM = "instagram"
FACEBOOK = "facebook"
TWITTER = "twitter" # I will use nitter though lol
MYSPACE = "myspace" # Yes somehow this ancient site is linked EVERYWHERE
WIKIPEDIA = "wikipedia", "https://en.wikipedia.org/wiki/Main_Page"
INSTAGRAM = "instagram", "https://www.instagram.com/"
FACEBOOK = "facebook", "https://www.facebook.com/"
TWITTER = "twitter", "https://twitter.com/"
MYSPACE = "myspace", "https://myspace.com/" # Yes somehow this ancient site is linked EVERYWHERE
MANUAL = "manual"
MANUAL = "manual", ""
PRESET = "preset"
PRESET = "preset", ""
def __new__(cls, value, homepage = None):
member = object.__new__(cls)
member._value_ = value
member.homepage = homepage
return member
@classmethod
def get_homepage(cls, attribute) -> str:
homepage_map = {
cls.YOUTUBE: "https://www.youtube.com/",
cls.MUSIFY: "https://musify.club/",
cls.MUSICBRAINZ: "https://musicbrainz.org/",
cls.ENCYCLOPAEDIA_METALLUM: "https://www.metal-archives.com/",
cls.GENIUS: "https://genius.com/",
cls.BANDCAMP: "https://bandcamp.com/",
cls.DEEZER: "https://www.deezer.com/",
cls.INSTAGRAM: "https://www.instagram.com/",
cls.FACEBOOK: "https://www.facebook.com/",
cls.SPOTIFY: "https://open.spotify.com/",
cls.TWITTER: "https://twitter.com/",
cls.MYSPACE: "https://myspace.com/",
cls.WIKIPEDIA: "https://en.wikipedia.org/wiki/Main_Page"
}
return homepage_map[attribute]

View File

@ -78,7 +78,14 @@ def _merge(
drop_args = []
if drop_kwonlyargs is None:
drop_kwonlyargs = []
is_builtin = False
try:
source_spec = inspect.getfullargspec(source)
except TypeError:
is_builtin = True
source_spec = inspect.FullArgSpec(type(source).__name__, [], [], [], [], [], [])
dest_spec = inspect.getfullargspec(dest)
if source_spec.varargs or source_spec.varkw:
@ -128,13 +135,15 @@ def _merge(
'co_kwonlyargcount': len(kwonlyargs_merged),
'co_posonlyargcount': dest.__code__.co_posonlyargcount,
'co_nlocals': len(args_all),
'co_flags': source.__code__.co_flags,
'co_varnames': args_all,
'co_filename': dest.__code__.co_filename,
'co_name': dest.__code__.co_name,
'co_firstlineno': dest.__code__.co_firstlineno,
}
if hasattr(source, "__code__"):
replace_kwargs['co_flags'] = source.__code__.co_flags
if PY310:
replace_kwargs['co_linetable'] = dest.__code__.co_linetable
else:
@ -151,7 +160,7 @@ def _merge(
len(kwonlyargs_merged),
_blank.__code__.co_nlocals,
_blank.__code__.co_stacksize,
source.__code__.co_flags,
source.__code__.co_flags if hasattr(source, "__code__") else dest.__code__.co_flags,
_blank.__code__.co_code, (), (),
args_all, dest.__code__.co_filename,
dest.__code__.co_name,
@ -171,6 +180,9 @@ def _merge(
dest_ret = dest.__annotations__['return']
for v in ('__kwdefaults__', '__annotations__'):
if not hasattr(source, v):
continue
out = getattr(source, v)
if out is None:
out = {}

View File

@ -20,6 +20,7 @@ DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and False
DEBUG_PRINT_ID = DEBUG and True
if DEBUG:
print("DEBUG ACTIVE")

View File

@ -6,6 +6,7 @@ from functools import lru_cache
from transliterate.exceptions import LanguageDetectionError
from transliterate import translit
from pathvalidate import sanitize_filename
from urllib.parse import urlparse, ParseResult, parse_qs
COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = (
@ -21,6 +22,7 @@ def unify(string: str) -> str:
returns a unified str, to make comparisons easy.
a unified string has the following attributes:
- is lowercase
- is transliterated to Latin characters from e.g. Cyrillic
"""
if string is None:
@ -31,7 +33,8 @@ def unify(string: str) -> str:
except LanguageDetectionError:
pass
return string.lower()
string = unify_punctuation(string)
return string.lower().strip()
def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Union[str, Path]:
@ -49,7 +52,14 @@ def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Uni
string = string[1:]
string = string.replace("/", "_").replace("\\", "_")
try:
string = translit(string, reversed=True)
except LanguageDetectionError:
pass
string = sanitize_filename(string)
return string
if isinstance(string, Path):
@ -127,13 +137,45 @@ UNIFY_TO = " "
ALLOWED_LENGTH_DISTANCE = 20
def unify_punctuation(to_unify: str) -> str:
def unify_punctuation(to_unify: str, unify_to: str = UNIFY_TO) -> str:
for char in string.punctuation:
to_unify = to_unify.replace(char, UNIFY_TO)
to_unify = to_unify.replace(char, unify_to)
return to_unify
def hash_url(url: str) -> int:
return url.strip().lower().lstrip("https://").lstrip("http://")
@lru_cache(maxsize=128)
def hash_url(url: Union[str, ParseResult]) -> str:
if isinstance(url, str):
url = urlparse(url)
unify_to = "-"
def unify_part(part: str) -> str:
nonlocal unify_to
return unify_punctuation(part.lower(), unify_to=unify_to).strip(unify_to)
# netloc
netloc = unify_part(url.netloc)
if netloc.startswith("www" + unify_to):
netloc = netloc[3 + len(unify_to):]
# query
query = url.query
query_dict: Optional[dict] = None
try:
query_dict: dict = parse_qs(url.query, strict_parsing=True)
except ValueError:
# the query couldn't be parsed
pass
if isinstance(query_dict, dict):
# sort keys alphabetically
query = ""
for key, value in sorted(query_dict.items(), key=lambda i: i[0]):
query += f"{key.strip()}-{''.join(i.strip() for i in value)}"
r = f"{netloc}_{unify_part(url.path)}_{unify_part(query)}"
r = r.lower().strip()
return r
def remove_feature_part_from_track(title: str) -> str:

0
tests/__init__.py Normal file
View File

35
tests/test_hash_url.py Normal file
View File

@ -0,0 +1,35 @@
import unittest
from music_kraken.utils.string_processing import hash_url
class TestCollection(unittest.TestCase):
def test_remove_schema(self):
self.assertFalse(hash_url("https://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("https"))
self.assertFalse(hash_url("ftp://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("https"))
self.assertFalse(hash_url("sftp://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("https"))
self.assertFalse(hash_url("http://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("https"))
def test_no_punctuation(self):
self.assertNotIn(hash_url("https://www.you_tube.com/watch?v=3jZ_D3ELwOQ"), "you_tube")
self.assertNotIn(hash_url("https://docs.gitea.com/next/install.ation/comparison"), ".")
def test_three_parts(self):
"""
The url is parsed into three parts [netloc; path; query]
Which are then appended to each other with an underscore between.
"""
self.assertTrue(hash_url("https://duckduckgo.com/?t=h_&q=dfasf&ia=web").count("_") == 2)
def test_sort_query(self):
"""
The query is sorted alphabetically
"""
hashed = hash_url("https://duckduckgo.com/?t=h_&q=dfasf&ia=web")
sorted_keys = ["ia-", "q-", "t-"]
self.assertTrue(hashed.index(sorted_keys[0]) < hashed.index(sorted_keys[1]) < hashed.index(sorted_keys[2]))
if __name__ == "__main__":
unittest.main()