Merge branch 'experimental' into issue16
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful

This commit is contained in:
2024-05-08 12:33:56 +02:00
28 changed files with 1108 additions and 951 deletions

View File

@@ -53,9 +53,9 @@ class Artwork:
def get_variant_name(self, variant: ArtworkVariant) -> str:
return f"artwork_{variant['width']}x{variant['height']}_{hash_url(variant['url']).replace('/', '_')}"
def __merge__(self, other: Artwork, override: bool = False) -> None:
def __merge__(self, other: Artwork, **kwargs) -> None:
for key, value in other._variant_mapping.items():
if key not in self._variant_mapping or override:
if key not in self._variant_mapping:
self._variant_mapping[key] = value
def __eq__(self, other: Artwork) -> bool:

View File

@@ -1,9 +1,12 @@
from __future__ import annotations
from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set
import copy
from .parents import OuterProxy
from ..utils import object_trace
from ..utils import output, BColors
T = TypeVar('T', bound=OuterProxy)
@@ -13,8 +16,8 @@ class Collection(Generic[T]):
_data: List[T]
_indexed_values: Dict[str, set]
_indexed_to_objects: Dict[any, list]
_indexed_from_id: Dict[int, Dict[str, Any]]
_indexed_values: Dict[str, Dict[Any, T]]
shallow_list = property(fget=lambda self: self.data)
@@ -36,9 +39,9 @@ class Collection(Generic[T]):
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
self.extend_object_to_attribute: Dict[str, Collection[T]] = extend_object_to_attribute or {}
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self.pull_from: List[Collection] = []
self.push_to: List[Collection] = []
self._id_to_index_values: Dict[int, set] = defaultdict(set)
# This is to cleanly unmap previously mapped items by their id
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
# this is to keep track and look up the actual objects
@@ -46,11 +49,19 @@ class Collection(Generic[T]):
self.extend(data)
def __repr__(self) -> str:
return f"Collection({id(self)})"
def __hash__(self) -> int:
return id(self)
def _map_element(self, __object: T, from_map: bool = False):
self._unmap_element(__object.id)
@property
def collection_names(self) -> List[str]:
return list(set(self._collection_for.values()))
def __repr__(self) -> str:
return f"Collection({' | '.join(self.collection_names)} {id(self)})"
def _map_element(self, __object: T, no_unmap: bool = False, **kwargs):
if not no_unmap:
self._unmap_element(__object.id)
self._indexed_from_id[__object.id]["id"] = __object.id
self._indexed_values["id"][__object.id] = __object
@@ -74,73 +85,125 @@ class Collection(Generic[T]):
del self._indexed_from_id[obj_id]
def _find_object(self, __object: T) -> Optional[T]:
def _remap(self):
# reinitialize the mapping to clean it without time consuming operations
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
for e in self._data:
self._map_element(e, no_unmap=True)
def _find_object(self, __object: T, **kwargs) -> Optional[T]:
self._remap()
if __object.id in self._indexed_from_id:
return self._indexed_values["id"][__object.id]
for name, value in __object.indexing_values:
if value in self._indexed_values[name]:
return self._indexed_values[name][value]
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
return None
def _append_new_object(self, other: T, **kwargs):
"""
This function appends the other object to the current collection.
This only works if not another object, which represents the same real life object exists in the collection.
"""
self._data.append(other)
other._inner._is_in_collection.add(self)
# all of the existing hooks to get the defined datastructures
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
for attribute, a in self.sync_on_append.items():
# syncing two collections by reference
b = other.__getattribute__(attribute)
if a is b:
continue
object_trace(f"Syncing [{a}] = [{b}]")
b_data = b.data.copy()
b_collection_for = b._collection_for.copy()
del b
for synced_with, key in b_collection_for.items():
synced_with.__setattr__(key, a)
a._collection_for[synced_with] = key
a.extend(b_data, **kwargs)
def append(self, other: Optional[T], **kwargs):
"""
If an object, that represents the same entity exists in a relevant collection,
merge into this object. (and remap)
Else append to this collection.
:param __object:
:param already_is_parent:
:param from_map:
:param other:
:return:
"""
if __object is None:
if other is None:
return
if other.id in self._indexed_from_id:
return
existing_object = self._find_object(__object)
object_trace(f"Appending {other.option_string} to {self}")
if existing_object is None:
# append
self._data.append(__object)
self._map_element(__object)
for collection_attribute, child_collection in self.extend_object_to_attribute.items():
__object.__getattribute__(collection_attribute).extend(child_collection)
for attribute, new_object in self.append_object_to_attribute.items():
__object.__getattribute__(attribute).append(new_object)
# only modify collections if the object actually has been appended
for attribute, a in self.sync_on_append.items():
b = __object.__getattribute__(attribute)
object_trace(f"Syncing [{a}{id(a)}] = [{b}{id(b)}]")
data_to_extend = b.data
a._collection_for.update(b._collection_for)
for synced_with, key in b._collection_for.items():
synced_with.__setattr__(key, a)
a.extend(data_to_extend)
# switching collection in the case of push to
for c in self.push_to:
r = c._find_object(other)
if r is not None:
# output("found push to", r, other, c, self, color=BColors.RED, sep="\t")
return c.append(other, **kwargs)
for c in self.pull_from:
r = c._find_object(other)
if r is not None:
# output("found pull from", r, other, c, self, color=BColors.RED, sep="\t")
c.remove(r, existing=r, **kwargs)
existing = self._find_object(other)
if existing is None:
self._append_new_object(other, **kwargs)
else:
# merge only if the two objects are not the same
if existing_object.id == __object.id:
return
existing.merge(other, **kwargs)
old_id = existing_object.id
def remove(self, *other_list: List[T], silent: bool = False, existing: Optional[T] = None, remove_from_other_collection=True, **kwargs):
other: T
for other in other_list:
existing: Optional[T] = existing or self._indexed_values["id"].get(other.id, None)
if existing is None:
if not silent:
raise ValueError(f"Object {other} not found in {self}")
return other
existing_object.merge(__object)
if remove_from_other_collection:
for c in copy.copy(other._inner._is_in_collection):
c.remove(other, silent=True, remove_from_other_collection=False, **kwargs)
other._inner._is_in_collection = set()
else:
self._data.remove(existing)
self._unmap_element(existing)
if existing_object.id != old_id:
self._unmap_element(old_id)
def contains(self, __object: T) -> bool:
return self._find_object(__object) is not None
self._map_element(existing_object)
def extend(self, __iterable: Optional[Generator[T, None, None]]):
if __iterable is None:
def extend(self, other_collections: Optional[Generator[T, None, None]], **kwargs):
if other_collections is None:
return
for __object in __iterable:
self.append(__object)
for other_object in other_collections:
self.append(other_object, **kwargs)
@property
def data(self) -> List[T]:
@@ -156,8 +219,9 @@ class Collection(Generic[T]):
def __iter__(self) -> Iterator[T]:
yield from self._data
def __merge__(self, __other: Collection, override: bool = False):
self.extend(__other)
def __merge__(self, other: Collection, **kwargs):
object_trace(f"merging {str(self)} | {str(other)}")
self.extend(other, **kwargs)
def __getitem__(self, item: int):
return self._data[item]
@@ -166,3 +230,9 @@ class Collection(Generic[T]):
if item >= len(self._data):
return default
return self._data[item]
def __eq__(self, other: Collection) -> bool:
if self.empty and other.empty:
return True
return self._data == other._data

View File

@@ -9,9 +9,9 @@ from pathlib import Path
import inspect
from .metadata import Metadata
from ..utils import get_unix_time, object_trace
from ..utils import get_unix_time, object_trace, generate_id
from ..utils.config import logging_settings, main_settings
from ..utils.shared import HIGHEST_ID
from ..utils.shared import HIGHEST_ID, DEBUG_PRINT_ID
from ..utils.hacking import MetaClass
LOGGER = logging_settings["object_logger"]
@@ -29,9 +29,15 @@ class InnerData:
"""
_refers_to_instances: set = None
_is_in_collection: set = None
"""
Attribute versions keep track, of if the attribute has been changed.
"""
def __init__(self, object_type, **kwargs):
self._refers_to_instances = set()
self._is_in_collection = set()
self._fetched_from: dict = {}
# initialize the default values
@@ -42,21 +48,29 @@ class InnerData:
for key, value in kwargs.items():
if hasattr(value, "__is_collection__"):
value._collection_for[self] = key
self.__setattr__(key, value)
def __hash__(self):
return self.id
def __merge__(self, __other: InnerData, override: bool = False):
def __merge__(self, __other: InnerData, **kwargs):
"""
:param __other:
:param override:
:return:
"""
self._fetched_from.update(__other._fetched_from)
self._is_in_collection.update(__other._is_in_collection)
for key, value in __other.__dict__.copy().items():
if key.startswith("_"):
continue
if hasattr(value, "__is_collection__") and key in self.__dict__:
self.__getattribute__(key).__merge__(value, **kwargs)
continue
# just set the other value if self doesn't already have it
if key not in self.__dict__ or (key in self.__dict__ and self.__dict__[key] == self._default_values.get(key)):
self.__setattr__(key, value)
@@ -64,13 +78,8 @@ class InnerData:
# if the object of value implemented __merge__, it merges
existing = self.__getattribute__(key)
if hasattr(type(existing), "__merge__"):
existing.__merge__(value, override)
continue
# override the existing value if requested
if override:
self.__setattr__(key, value)
if hasattr(existing, "__merge__"):
existing.__merge__(value, **kwargs)
class OuterProxy:
@@ -84,8 +93,6 @@ class OuterProxy:
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = tuple()
UPWARDS_COLLECTION_STRING_ATTRIBUTES = tuple()
TITEL = "id"
def __init__(self, _id: int = None, dynamic: bool = False, **kwargs):
_automatic_id: bool = False
@@ -94,7 +101,7 @@ class OuterProxy:
generates a random integer id
the range is defined in the config
"""
_id = random.randint(0, HIGHEST_ID)
_id = generate_id()
_automatic_id = True
kwargs["automatic_id"] = _automatic_id
@@ -116,7 +123,7 @@ class OuterProxy:
self._inner: InnerData = InnerData(type(self), **kwargs)
self._inner._refers_to_instances.add(self)
object_trace(f"creating {type(self).__name__} [{self.title_string}]")
object_trace(f"creating {type(self).__name__} [{self.option_string}]")
self.__init_collections__()
@@ -173,13 +180,12 @@ class OuterProxy:
def __eq__(self, other: Any):
return self.__hash__() == other.__hash__()
def merge(self, __other: Optional[OuterProxy], override: bool = False):
def merge(self, __other: Optional[OuterProxy], **kwargs):
"""
1. merges the data of __other in self
2. replaces the data of __other with the data of self
:param __other:
:param override:
:return:
"""
if __other is None:
@@ -196,7 +202,7 @@ class OuterProxy:
if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
a, b = b, a
object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]")
object_trace(f"merging {a.option_string} | {b.option_string}")
old_inner = b._inner
@@ -204,11 +210,11 @@ class OuterProxy:
instance._inner = a._inner
a._inner._refers_to_instances.add(instance)
a._inner.__merge__(old_inner, override=override)
a._inner.__merge__(old_inner, **kwargs)
del old_inner
def __merge__(self, __other: Optional[OuterProxy], override: bool = False):
self.merge(__other, override)
def __merge__(self, __other: Optional[OuterProxy], **kwargs):
self.merge(__other, **kwargs)
def mark_as_fetched(self, *url_hash_list: List[str]):
for url_hash in url_hash_list:
@@ -235,7 +241,23 @@ class OuterProxy:
@property
def options(self) -> List[P]:
return [self]
r = []
for collection_string_attribute in self.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
r.extend(self.__getattribute__(collection_string_attribute))
r.append(self)
for collection_string_attribute in self.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
r.extend(self.__getattribute__(collection_string_attribute))
return r
@property
def option_string(self) -> str:
return self.title_string
INDEX_DEPENDS_ON: List[str] = []
@property
def indexing_values(self) -> List[Tuple[str, object]]:
@@ -267,9 +289,10 @@ class OuterProxy:
return r
TITEL = "id"
@property
def title_string(self) -> str:
return str(self.__getattribute__(self.TITEL))
return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "")
def __repr__(self):
return f"{type(self).__name__}({self.title_string})"

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import random
from collections import defaultdict
from typing import List, Optional, Dict, Tuple, Type, Union
import copy
import pycountry
@@ -22,6 +23,7 @@ from .parents import OuterProxy, P
from .source import Source, SourceCollection
from .target import Target
from .country import Language, Country
from ..utils.shared import DEBUG_PRINT_ID
from ..utils.string_processing import unify
from .parents import OuterProxy as Base
@@ -43,7 +45,8 @@ def get_collection_string(
template: str,
ignore_titles: Set[str] = None,
background: BColors = OPTION_BACKGROUND,
foreground: BColors = OPTION_FOREGROUND
foreground: BColors = OPTION_FOREGROUND,
add_id: bool = DEBUG_PRINT_ID,
) -> str:
if collection.empty:
return ""
@@ -55,8 +58,15 @@ def get_collection_string(
r = background
def get_element_str(element) -> str:
nonlocal add_id
r = element.title_string.strip()
if add_id and False:
r += " " + str(element.id)
return r
element: Base
titel_list: List[str] = [element.title_string.strip() for element in collection if element.title_string not in ignore_titles]
titel_list: List[str] = [get_element_str(element) for element in collection if element.title_string not in ignore_titles]
for i, titel in enumerate(titel_list):
delimiter = ", "
@@ -109,15 +119,29 @@ class Song(Base):
"tracksort": lambda: 0,
}
def __init__(self, title: str = "", unified_title: str = None, isrc: str = None, length: int = None,
genre: str = None, note: FormattedText = None, source_list: List[Source] = None,
target_list: List[Target] = None, lyrics_list: List[Lyrics] = None,
main_artist_list: List[Artist] = None, feature_artist_list: List[Artist] = None,
album_list: List[Album] = None, tracksort: int = 0, artwork: Optional[Artwork] = None, **kwargs) -> None:
def __init__(
self,
title: str = None,
isrc: str = None,
length: int = None,
genre: str = None,
note: FormattedText = None,
source_list: List[Source] = None,
target_list: List[Target] = None,
lyrics_list: List[Lyrics] = None,
main_artist_list: List[Artist] = None,
feature_artist_list: List[Artist] = None,
album_list: List[Album] = None,
tracksort: int = 0,
artwork: Optional[Artwork] = None,
**kwargs
) -> None:
real_kwargs = copy.copy(locals())
real_kwargs.update(real_kwargs.pop("kwargs", {}))
Base.__init__(**locals())
Base.__init__(**real_kwargs)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("album_collection", "main_artist_collection", "feature_artist_collection")
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_artist_collection", "feature_artist_collection", "album_collection")
TITEL = "title"
def __init_collections__(self) -> None:
@@ -135,6 +159,9 @@ class Song(Base):
"feature_song_collection": self
}
self.feature_artist_collection.push_to = [self.main_artist_collection]
self.main_artist_collection.pull_from = [self.feature_artist_collection]
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song:
return
@@ -144,20 +171,21 @@ class Song(Base):
return
if isinstance(object_list, Artist):
self.main_artist_collection.extend(object_list)
self.feature_artist_collection.extend(object_list)
return
if isinstance(object_list, Album):
self.album_collection.extend(object_list)
return
INDEX_DEPENDS_ON = ("title", "isrc", "source_collection")
@property
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('title', unify(self.title)),
('isrc', self.isrc),
*[('url', source.url) for source in self.source_collection]
*self.source_collection.indexing_values(),
]
@property
@@ -169,6 +197,8 @@ class Song(Base):
id3Mapping.GENRE: [self.genre],
id3Mapping.TRACKNUMBER: [self.tracksort_str],
id3Mapping.COMMENT: [self.note.markdown],
id3Mapping.FILE_WEBPAGE_URL: self.source_collection.url_list,
id3Mapping.SOURCE_WEBPAGE_URL: self.source_collection.homepage_list,
})
# metadata.merge_many([s.get_song_metadata() for s in self.source_collection]) album sources have no relevant metadata for id3
@@ -189,20 +219,12 @@ class Song(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
r += get_collection_string(self.main_artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}")
return r
@property
def options(self) -> List[P]:
options = self.main_artist_collection.shallow_list
options.extend(self.feature_artist_collection)
options.extend(self.album_collection)
options.append(self)
return options
@property
def tracksort_str(self) -> str:
"""
@@ -258,18 +280,30 @@ class Album(Base):
TITEL = "title"
# This is automatically generated
def __init__(self, title: str = None, unified_title: str = None, album_status: AlbumStatus = None,
album_type: AlbumType = None, language: Language = None, date: ID3Timestamp = None,
barcode: str = None, albumsort: int = None, notes: FormattedText = None,
source_list: List[Source] = None, artist_list: List[Artist] = None, song_list: List[Song] = None,
label_list: List[Label] = None, **kwargs) -> None:
super().__init__(title=title, unified_title=unified_title, album_status=album_status, album_type=album_type,
language=language, date=date, barcode=barcode, albumsort=albumsort, notes=notes,
source_list=source_list, artist_list=artist_list, song_list=song_list, label_list=label_list,
**kwargs)
def __init__(
self,
title: str = None,
unified_title: str = None,
album_status: AlbumStatus = None,
album_type: AlbumType = None,
language: Language = None,
date: ID3Timestamp = None,
barcode: str = None,
albumsort: int = None,
notes: FormattedText = None,
source_list: List[Source] = None,
artist_list: List[Artist] = None,
song_list: List[Song] = None,
label_list: List[Label] = None,
**kwargs
) -> None:
real_kwargs = copy.copy(locals())
real_kwargs.update(real_kwargs.pop("kwargs", {}))
Base.__init__(**real_kwargs)
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection",)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "label_collection")
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection")
def __init_collections__(self):
self.song_collection.append_object_to_attribute = {
@@ -302,13 +336,14 @@ class Album(Base):
self.label_collection.extend(object_list)
return
INDEX_DEPENDS_ON = ("title", "barcode", "source_collection")
@property
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('title', unify(self.title)),
('barcode', self.barcode),
*[('url', source.url) for source in self.source_collection]
*self.source_collection.indexing_values(),
]
@property
@@ -333,19 +368,13 @@ class Album(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.artist_collection, " by {}")
r += get_collection_string(self.label_collection, " under {}")
if len(self.song_collection) > 0:
r += f" with {len(self.song_collection)} songs"
return r
@property
def options(self) -> List[P]:
options = [*self.artist_collection, self, *self.song_collection]
return options
def update_tracksort(self):
"""
@@ -372,18 +401,6 @@ class Album(Base):
tracksort_map[i] = existing_list.pop(0)
tracksort_map[i].tracksort = i
def compile(self, merge_into: bool = False):
"""
compiles the recursive structures,
and does depending on the object some other stuff.
no need to override if only the recursive structure should be built.
override self.build_recursive_structures() instead
"""
self.update_tracksort()
self._build_recursive_structures(build_version=random.randint(0, 99999), merge=merge_into)
@property
def copyright(self) -> str:
if self.date is None:
@@ -415,21 +432,15 @@ class Album(Base):
return self.album_type.value
"""
All objects dependent on Artist
"""
class Artist(Base):
name: str
unified_name: str
country: Country
formed_in: ID3Timestamp
notes: FormattedText
lyrical_themes: List[str]
general_genre: str
unformated_location: str
unformatted_location: str
source_collection: SourceCollection
contact_collection: Collection[Contact]
@@ -439,10 +450,9 @@ class Artist(Base):
label_collection: Collection[Label]
_default_factories = {
"name": str,
"unified_name": lambda: None,
"name": lambda: None,
"country": lambda: None,
"unformated_location": lambda: None,
"unformatted_location": lambda: None,
"formed_in": ID3Timestamp,
"notes": FormattedText,
@@ -459,19 +469,30 @@ class Artist(Base):
TITEL = "name"
# This is automatically generated
def __init__(self, name: str = "", unified_name: str = None, country: Country = None,
formed_in: ID3Timestamp = None, notes: FormattedText = None, lyrical_themes: List[str] = None,
general_genre: str = None, unformated_location: str = None, source_list: List[Source] = None,
contact_list: List[Contact] = None, feature_song_list: List[Song] = None,
main_album_list: List[Album] = None, label_list: List[Label] = None, **kwargs) -> None:
def __init__(
self,
name: str = None,
unified_name: str = None,
country: Country = None,
formed_in: ID3Timestamp = None,
notes: FormattedText = None,
lyrical_themes: List[str] = None,
general_genre: str = None,
unformatted_location: str = None,
source_list: List[Source] = None,
contact_list: List[Contact] = None,
feature_song_list: List[Song] = None,
main_album_list: List[Album] = None,
label_list: List[Label] = None,
**kwargs
) -> None:
real_kwargs = copy.copy(locals())
real_kwargs.update(real_kwargs.pop("kwargs", {}))
super().__init__(name=name, unified_name=unified_name, country=country, formed_in=formed_in, notes=notes,
lyrical_themes=lyrical_themes, general_genre=general_genre,
unformated_location=unformated_location, source_list=source_list, contact_list=contact_list,
feature_song_list=feature_song_list, main_album_list=main_album_list, label_list=label_list,
**kwargs)
Base.__init__(**real_kwargs)
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("feature_song_collection", "main_album_collection")
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_album_collection", "feature_song_collection")
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",)
def __init_collections__(self):
@@ -504,12 +525,6 @@ class Artist(Base):
self.label_collection.extend(object_list)
return
@property
def options(self) -> List[P]:
options = [self, *self.main_album_collection.shallow_list, *self.feature_album]
print(options)
return options
def update_albumsort(self):
"""
This updates the albumsort attributes, of the albums in
@@ -567,40 +582,27 @@ class Artist(Base):
# replace the old collection with the new one
self.main_album_collection: Collection = Collection(data=album_list, element_type=Album)
INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection")
@property
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('name', unify(self.name)),
*[('url', source.url) for source in self.source_collection],
*[('contact', contact.value) for contact in self.contact_collection]
*[('contact', contact.value) for contact in self.contact_collection],
*self.source_collection.indexing_values(),
]
@property
def metadata(self) -> Metadata:
metadata = Metadata({
id3Mapping.ARTIST: [self.name]
id3Mapping.ARTIST: [self.name],
id3Mapping.ARTIST_WEBPAGE_URL: self.source_collection.url_list,
})
metadata.merge_many([s.get_artist_metadata() for s in self.source_collection])
return metadata
"""
def __str__(self, include_notes: bool = False):
string = self.name or ""
if include_notes:
plaintext_notes = self.notes.get_plaintext()
if plaintext_notes is not None:
string += "\n" + plaintext_notes
return string
"""
def __repr__(self):
return f"Artist(\"{self.name}\")"
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.name + BColors.ENDC.value + OPTION_BACKGROUND.value
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.label_collection, " under {}")
r += OPTION_BACKGROUND.value
@@ -613,48 +615,6 @@ class Artist(Base):
return r
@property
def options(self) -> List[P]:
options = [self]
options.extend(self.main_album_collection)
options.extend(self.feature_song_collection)
return options
@property
def feature_album(self) -> Album:
return Album(
title="features",
album_status=AlbumStatus.UNRELEASED,
album_type=AlbumType.COMPILATION_ALBUM,
is_split=True,
albumsort=666,
dynamic=True,
song_list=self.feature_song_collection.shallow_list
)
def get_all_songs(self) -> List[Song]:
"""
returns a list of all Songs.
probably not that useful, because it is unsorted
"""
collection = self.feature_song_collection.copy()
for album in self.discography:
collection.extend(album.song_collection)
return collection
@property
def discography(self) -> List[Album]:
flat_copy_discography = self.main_album_collection.copy()
flat_copy_discography.append(self.feature_album)
return flat_copy_discography
"""
Label
"""
class Label(Base):
COLLECTION_STRING_ATTRIBUTES = ("album_collection", "current_artist_collection")
@@ -683,12 +643,21 @@ class Label(Base):
TITEL = "name"
def __init__(self, name: str = None, unified_name: str = None, notes: FormattedText = None,
source_list: List[Source] = None, contact_list: List[Contact] = None,
album_list: List[Album] = None, current_artist_list: List[Artist] = None, **kwargs) -> None:
super().__init__(name=name, unified_name=unified_name, notes=notes, source_list=source_list,
contact_list=contact_list, album_list=album_list, current_artist_list=current_artist_list,
**kwargs)
def __init__(
self,
name: str = None,
unified_name: str = None,
notes: FormattedText = None,
source_list: List[Source] = None,
contact_list: List[Contact] = None,
album_list: List[Album] = None,
current_artist_list: List[Artist] = None,
**kwargs
) -> None:
real_kwargs = copy.copy(locals())
real_kwargs.update(real_kwargs.pop("kwargs", {}))
Base.__init__(**real_kwargs)
def __init_collections__(self):
self.album_collection.append_object_to_attribute = {
@@ -702,7 +671,6 @@ class Label(Base):
@property
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('name', unify(self.name)),
*[('url', source.url) for source in self.source_collection]
]

View File

@@ -2,142 +2,176 @@ from __future__ import annotations
from collections import defaultdict
from enum import Enum
from typing import List, Dict, Set, Tuple, Optional, Iterable
from urllib.parse import urlparse
from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator
from urllib.parse import urlparse, ParseResult
from dataclasses import dataclass, field
from functools import cached_property
from ..utils import generate_id
from ..utils.enums.source import SourcePages, SourceTypes
from ..utils.config import youtube_settings
from ..utils.string_processing import hash_url
from ..utils.string_processing import hash_url, shorten_display_url
from .metadata import Mapping, Metadata
from .parents import OuterProxy
from .collection import Collection
class Source(OuterProxy):
url: str
@dataclass
class Source:
page_enum: SourcePages
referer_page: SourcePages
url: str
referrer_page: SourcePages = None
audio_url: Optional[str] = None
audio_url: str
additional_data: dict = field(default_factory=dict)
_default_factories = {
"audio_url": lambda: None,
}
# This is automatically generated
def __init__(self, page_enum: SourcePages, url: str, referer_page: SourcePages = None, audio_url: str = None,
**kwargs) -> None:
if referer_page is None:
referer_page = page_enum
super().__init__(url=url, page_enum=page_enum, referer_page=referer_page, audio_url=audio_url, **kwargs)
def __post_init__(self):
self.referrer_page = self.referrer_page or self.page_enum
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
@classmethod
def match_url(cls, url: str, referer_page: SourcePages) -> Optional["Source"]:
def match_url(cls, url: str, referrer_page: SourcePages) -> Optional[Source]:
"""
this shouldn't be used, unlesse you are not certain what the source is for
this shouldn't be used, unless you are not certain what the source is for
the reason is that it is more inefficient
"""
parsed = urlparse(url)
url = parsed.geturl()
parsed_url = urlparse(url)
url = parsed_url.geturl()
if "musify" in parsed.netloc:
return cls(SourcePages.MUSIFY, url, referer_page=referer_page)
if "musify" in parsed_url.netloc:
return cls(SourcePages.MUSIFY, url, referrer_page=referrer_page)
if parsed.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]:
return cls(SourcePages.YOUTUBE, url, referer_page=referer_page)
if parsed_url.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]:
return cls(SourcePages.YOUTUBE, url, referrer_page=referrer_page)
if url.startswith("https://www.deezer"):
return cls(SourcePages.DEEZER, url, referer_page=referer_page)
return cls(SourcePages.DEEZER, url, referrer_page=referrer_page)
if url.startswith("https://open.spotify.com"):
return cls(SourcePages.SPOTIFY, url, referer_page=referer_page)
return cls(SourcePages.SPOTIFY, url, referrer_page=referrer_page)
if "bandcamp" in url:
return cls(SourcePages.BANDCAMP, url, referer_page=referer_page)
return cls(SourcePages.BANDCAMP, url, referrer_page=referrer_page)
if "wikipedia" in parsed.netloc:
return cls(SourcePages.WIKIPEDIA, url, referer_page=referer_page)
if "wikipedia" in parsed_url.netloc:
return cls(SourcePages.WIKIPEDIA, url, referrer_page=referrer_page)
if url.startswith("https://www.metal-archives.com/"):
return cls(SourcePages.ENCYCLOPAEDIA_METALLUM, url, referer_page=referer_page)
return cls(SourcePages.ENCYCLOPAEDIA_METALLUM, url, referrer_page=referrer_page)
# the less important once
if url.startswith("https://www.facebook"):
return cls(SourcePages.FACEBOOK, url, referer_page=referer_page)
return cls(SourcePages.FACEBOOK, url, referrer_page=referrer_page)
if url.startswith("https://www.instagram"):
return cls(SourcePages.INSTAGRAM, url, referer_page=referer_page)
return cls(SourcePages.INSTAGRAM, url, referrer_page=referrer_page)
if url.startswith("https://twitter"):
return cls(SourcePages.TWITTER, url, referer_page=referer_page)
return cls(SourcePages.TWITTER, url, referrer_page=referrer_page)
if url.startswith("https://myspace.com"):
return cls(SourcePages.MYSPACE, url, referer_page=referer_page)
def get_song_metadata(self) -> Metadata:
return Metadata({
Mapping.FILE_WEBPAGE_URL: [self.url],
Mapping.SOURCE_WEBPAGE_URL: [self.homepage]
})
def get_artist_metadata(self) -> Metadata:
return Metadata({
Mapping.ARTIST_WEBPAGE_URL: [self.url]
})
return cls(SourcePages.MYSPACE, url, referrer_page=referrer_page)
@property
def hash_url(self) -> str:
return hash_url(self.url)
@property
def metadata(self) -> Metadata:
return self.get_song_metadata()
@property
def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('url', self.url),
('audio_url', self.audio_url),
]
def __str__(self):
return self.__repr__()
def indexing_values(self) -> list:
r = [hash_url(self.url)]
if self.audio_url:
r.append(hash_url(self.audio_url))
return r
def __repr__(self) -> str:
return f"Src({self.page_enum.value}: {self.url}, {self.audio_url})"
return f"Src({self.page_enum.value}: {shorten_display_url(self.url)})"
@property
def title_string(self) -> str:
return self.url
def __merge__(self, other: Source, **kwargs):
if self.audio_url is None:
self.audio_url = other.audio_url
self.additional_data.update(other.additional_data)
page_str = property(fget=lambda self: self.page_enum.value)
type_str = property(fget=lambda self: self.type_enum.value)
homepage = property(fget=lambda self: SourcePages.get_homepage(self.page_enum))
class SourceCollection(Collection):
class SourceCollection:
__change_version__ = generate_id()
_indexed_sources: Dict[str, Source]
_page_to_source_list: Dict[SourcePages, List[Source]]
def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
self._page_to_source_list: Dict[SourcePages, List[Source]] = defaultdict(list)
self._page_to_source_list = defaultdict(list)
self._indexed_sources = {}
super().__init__(data=data, **kwargs)
self.extend(data or [])
def _map_element(self, __object: Source, **kwargs):
super()._map_element(__object, **kwargs)
def has_source_page(self, *source_pages: SourcePages) -> bool:
return any(source_page in self._page_to_source_list for source_page in source_pages)
self._page_to_source_list[__object.page_enum].append(__object)
def get_sources(self, *source_pages: List[Source]) -> Generator[Source]:
if not len(source_pages):
source_pages = self.source_pages
for page in source_pages:
yield from self._page_to_source_list[page]
def append(self, source: Source):
if source is None:
return
existing_source = None
for key in source.indexing_values:
if key in self._indexed_sources:
existing_source = self._indexed_sources[key]
break
if existing_source is not None:
existing_source.__merge__(source)
source = existing_source
else:
self._page_to_source_list[source.page_enum].append(source)
changed = False
for key in source.indexing_values:
if key not in self._indexed_sources:
changed = True
self._indexed_sources[key] = source
if changed:
self.__change_version__ = generate_id()
def extend(self, sources: Iterable[Source]):
for source in sources:
self.append(source)
def __iter__(self):
yield from self.get_sources()
def __merge__(self, other: SourceCollection, **kwargs):
self.extend(other)
@property
def source_pages(self) -> Set[SourcePages]:
return set(source.page_enum for source in self._data)
def source_pages(self) -> Iterable[SourcePages]:
return sorted(self._page_to_source_list.keys(), key=lambda page: page.value)
def get_sources_from_page(self, source_page: SourcePages) -> List[Source]:
"""
getting the sources for a specific page like
YouTube or musify
"""
return self._page_to_source_list[source_page].copy()
@property
def hash_url_list(self) -> List[str]:
return [hash_url(source.url) for source in self.get_sources()]
@property
def url_list(self) -> List[str]:
return [source.url for source in self.get_sources()]
@property
def homepage_list(self) -> List[str]:
return [source.homepage for source in self.source_pages]
def indexing_values(self) -> Generator[Tuple[str, str], None, None]:
for index in self._indexed_sources:
yield "url", index