Compare commits

..

No commits in common. "9c369b421d9b92dea9ef0bf16746bb754ef41145" and "4510520db6e2457c301935c6daa0849b5128de75" have entirely different histories.

7 changed files with 107 additions and 156 deletions

View File

@ -7,7 +7,7 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__": if __name__ == "__main__":
commands = [ commands = [
"s: #a Crystal F", "s: #a Crystal F",
"d: 20" "d: 20",
] ]

View File

@ -2,27 +2,30 @@ import music_kraken
from music_kraken.objects import Song, Album, Artist, Collection from music_kraken.objects import Song, Album, Artist, Collection
if __name__ == "__main__": if __name__ == "__main__":
song_1 = Song( album_1 = Album(
title="song", title="album",
main_artist_list=[Artist( song_list=[
name="main_artist" Song(title="song", main_artist_list=[Artist(name="artist")]),
)], ],
feature_artist_list=[Artist( artist_list=[
name="main_artist" Artist(name="artist 3"),
)] ]
) )
other_artist = Artist(name="other_artist") album_2 = Album(
title="album",
song_2 = Song( song_list=[
title = "song", Song(title="song", main_artist_list=[Artist(name="artist 2")]),
main_artist_list=[other_artist] ],
artist_list=[
Artist(name="artist"),
]
) )
other_artist.name = "main_artist" album_1.merge(album_2)
song_1.merge(song_2) print()
print(*(f"{a.title_string} ; {a.id}" for a in album_1.artist_collection.data), sep=" | ")
print("#" * 120) print(id(album_1.artist_collection), id(album_2.artist_collection))
print("main", *song_1.main_artist_collection) print(id(album_1.song_collection[0].main_artist_collection), id(album_2.song_collection[0].main_artist_collection))
print("feat", *song_1.feature_artist_collection)

View File

@ -40,6 +40,8 @@ class Collection(Generic[T]):
self.pull_from: List[Collection] = [] self.pull_from: List[Collection] = []
self.push_to: List[Collection] = [] self.push_to: List[Collection] = []
self._id_to_index_values: Dict[int, set] = defaultdict(set)
# This is to cleanly unmap previously mapped items by their id # This is to cleanly unmap previously mapped items by their id
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict) self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
# this is to keep track and look up the actual objects # this is to keep track and look up the actual objects
@ -48,10 +50,9 @@ class Collection(Generic[T]):
self.extend(data) self.extend(data)
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Collection({' | '.join(self._collection_for.values())} {id(self)})" return f"Collection({id(self)})"
def _map_element(self, __object: T, no_unmap: bool = False, **kwargs): def _map_element(self, __object: T, from_map: bool = False):
if not no_unmap:
self._unmap_element(__object.id) self._unmap_element(__object.id)
self._indexed_from_id[__object.id]["id"] = __object.id self._indexed_from_id[__object.id]["id"] = __object.id
@ -77,153 +78,106 @@ class Collection(Generic[T]):
del self._indexed_from_id[obj_id] del self._indexed_from_id[obj_id]
def _remap(self): def _remap(self):
# reinitialize the mapping to clean it without time consuming operations for e in self:
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict) self._map_element(e)
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
for e in self._data: def _find_object(self, __object: T, no_push_to: bool = False) -> Optional[T]:
self._map_element(e, no_unmap=True) if not no_push_to or True:
for c in self.push_to:
found, found_in = c._find_object(__object, no_push_to=True)
if found is not None:
output("push to", found, __object, color=BColors.RED)
return found, found_in
def _find_object(self, __object: T, **kwargs) -> Optional[T]:
self._remap() self._remap()
for name, value in __object.indexing_values: for name, value in __object.indexing_values:
if value in self._indexed_values[name]: if value in self._indexed_values[name]:
return self._indexed_values[name][value] return self._indexed_values[name][value], self
return None return None, self
def _merge_into_contained_object(self, existing: T, other: T, **kwargs): def append(self, __object: Optional[T], **kwargs):
"""
This function merges the other object into the existing object, which is contained in the current collection.
This also modifies the correct mapping.
"""
if existing.id == other.id:
return
self._map_element(existing)
existing.merge(other, **kwargs)
def _append_new_object(self, other: T, **kwargs):
"""
This function appends the other object to the current collection.
This only works if not another object, which represents the same real life object exists in the collection.
"""
self._data.append(other)
self._map_element(other)
# all of the existing hooks to get the defined datastructure
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
for attribute, a in self.sync_on_append.items():
# syncing two collections by reference
b = other.__getattribute__(attribute)
if a is b:
continue
"""
no_sync_collection: Set[Collection] = kwargs.get("no_sync_collection", set())
if id(b) in no_sync_collection:
continue
"""
object_trace(f"Syncing [{a}] = [{b}]")
b_data = b.data.copy()
b_collection_for = b._collection_for.copy()
# no_sync_collection.add(id(b))
del b
for synced_with, key in b_collection_for.items():
synced_with.__setattr__(key, a)
a._collection_for[synced_with] = key
a.extend(b_data, **kwargs)
def append(self, other: Optional[T], **kwargs):
""" """
If an object, that represents the same entity exists in a relevant collection, If an object, that represents the same entity exists in a relevant collection,
merge into this object. (and remap) merge into this object. (and remap)
Else append to this collection. Else append to this collection.
:param other: :param __object:
:return: :return:
""" """
if __object is None:
return
existing_object, map_to = self._find_object(__object, no_push_to=kwargs.get("no_push_to", False))
if map_to is self:
for other, contained in (c._find_object(__object, no_push_to=True) for c in self.pull_from):
if other is None: if other is None:
return continue
if other.id in self._indexed_from_id:
return
object_trace(f"Appending {other.option_string} to {self}") output("pull from", other, __object, color=BColors.RED)
__object.__merge__(other, no_push_to=False, **kwargs)
push_to: Optional[Tuple[Collection, T]] = None contained.remove(other)
for c in self.push_to:
r = c._find_object(other)
if r is not None:
push_to_collection = (c, r)
output("found push to", found, other, self, color=BColors.RED, sep="\t")
break
pull_from: Optional[Tuple[Collection, T]] = None
for c in self.pull_from:
r = c._find_object(other)
if r is not None:
pull_from_collection = (c, r)
output("found pull from", found, other, self, color=BColors.RED, sep="\t")
break
if pull_from is not None:
pull_from[0].remove(pull_from[1])
existing_object = self._find_object(other, no_push_to=kwargs.get("no_push_to", False))
if existing_object is None: if existing_object is None:
if push_to is None: # append
self._append_new_object(other, **kwargs) self._data.append(__object)
else: self._map_element(__object)
push_to[0]._merge_into_contained_object(push_to[1], other, **kwargs)
if pull_from is not None: for collection_attribute, child_collection in self.extend_object_to_attribute.items():
self._merge_into_contained_object(other if push_to is None else push_to[1], pull_from[1], **kwargs) __object.__getattribute__(collection_attribute).extend(child_collection, **kwargs)
else:
self._merge_into_contained_object(existing_object, other, **kwargs)
if pull_from is not None:
self._merge_into_contained_object(existing_object, pull_from[1], **kwargs)
def remove(self, *other_list: List[T], silent: bool = False):
for other in other_list:
existing: Optional[T] = self._indexed_values["id"].get(other.id, None)
if existing is None:
if not silent:
raise ValueError(f"Object {other} not found in {self}")
return other
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).remove(*generator, silent=silent, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items(): for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).remove(new_object, silent=silent, **kwargs) __object.__getattribute__(attribute).append(new_object, **kwargs)
self._data.remove(existing) # only modify collections if the object actually has been appended
self._unmap_element(existing) for attribute, a in self.sync_on_append.items():
b = __object.__getattribute__(attribute)
if a is b:
continue
def contains(self, other: T) -> bool: no_sync_collection: Set[Collection] = kwargs.get("no_sync_collection", set())
return self._find_object(other) is not None object_trace(f"Syncing [{a}] = [{b}]; {no_sync_collection}")
if id(b) in no_sync_collection:
continue
def extend(self, other_collections: Optional[Generator[T, None, None]], **kwargs):
if other_collections is None: b_data = b.data.copy()
b_collection_for = b._collection_for.copy()
no_sync_collection.add(id(b))
# kwargs["no_sync_collection"] = no_sync_collection
del b
a.extend(b_data, **kwargs)
for synced_with, key in b_collection_for.items():
synced_with.__setattr__(key, a)
a._collection_for[synced_with] = key
else:
# merge only if the two objects are not the same
if existing_object.id == __object.id:
return return
for __object in other_collections: existing_object.merge(__object, **kwargs)
map_to._map_element(existing_object)
def remove(self, __object: T) -> T:
self._data.remove(__object)
self._unmap_element(__object)
return __object
def contains(self, __object: T) -> bool:
return self._find_object(__object) is not None
def extend(self, __iterable: Optional[Generator[T, None, None]], **kwargs):
if __iterable is None:
return
for __object in __iterable:
self.append(__object, **kwargs) self.append(__object, **kwargs)
@property @property
@ -240,8 +194,8 @@ class Collection(Generic[T]):
def __iter__(self) -> Iterator[T]: def __iter__(self) -> Iterator[T]:
yield from self._data yield from self._data
def __merge__(self, other: Collection, **kwargs): def __merge__(self, __other: Collection, **kwargs):
self.extend(other, **kwargs) self.extend(__other, **kwargs)
def __getitem__(self, item: int): def __getitem__(self, item: int):
return self._data[item] return self._data[item]

View File

@ -11,7 +11,7 @@ import inspect
from .metadata import Metadata from .metadata import Metadata
from ..utils import get_unix_time, object_trace, generate_id from ..utils import get_unix_time, object_trace, generate_id
from ..utils.config import logging_settings, main_settings from ..utils.config import logging_settings, main_settings
from ..utils.shared import HIGHEST_ID, DEBUG_PRINT_ID from ..utils.shared import HIGHEST_ID
from ..utils.hacking import MetaClass from ..utils.hacking import MetaClass
LOGGER = logging_settings["object_logger"] LOGGER = logging_settings["object_logger"]
@ -113,7 +113,7 @@ class OuterProxy:
self._inner: InnerData = InnerData(type(self), **kwargs) self._inner: InnerData = InnerData(type(self), **kwargs)
self._inner._refers_to_instances.add(self) self._inner._refers_to_instances.add(self)
object_trace(f"creating {type(self).__name__} [{self.option_string}]") object_trace(f"creating {type(self).__name__} [{self.title_string}]")
self.__init_collections__() self.__init_collections__()
@ -192,7 +192,7 @@ class OuterProxy:
if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances): if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
a, b = b, a a, b = b, a
object_trace(f"merging {a.option_string} | {b.option_string}") object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]")
old_inner = b._inner old_inner = b._inner
@ -243,10 +243,6 @@ class OuterProxy:
return r return r
@property
def option_string(self) -> str:
return self.title_string
INDEX_DEPENDS_ON: List[str] = [] INDEX_DEPENDS_ON: List[str] = []
@property @property
@ -282,7 +278,7 @@ class OuterProxy:
TITEL = "id" TITEL = "id"
@property @property
def title_string(self) -> str: def title_string(self) -> str:
return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "") return str(self.__getattribute__(self.TITEL))
def __repr__(self): def __repr__(self):
return f"{type(self).__name__}({self.title_string})" return f"{type(self).__name__}({self.title_string})"

View File

@ -22,7 +22,6 @@ from .parents import OuterProxy, P
from .source import Source, SourceCollection from .source import Source, SourceCollection
from .target import Target from .target import Target
from .country import Language, Country from .country import Language, Country
from ..utils.shared import DEBUG_PRINT_ID
from ..utils.string_processing import unify from ..utils.string_processing import unify
from .parents import OuterProxy as Base from .parents import OuterProxy as Base
@ -45,7 +44,7 @@ def get_collection_string(
ignore_titles: Set[str] = None, ignore_titles: Set[str] = None,
background: BColors = OPTION_BACKGROUND, background: BColors = OPTION_BACKGROUND,
foreground: BColors = OPTION_FOREGROUND, foreground: BColors = OPTION_FOREGROUND,
add_id: bool = DEBUG_PRINT_ID, add_id: bool = True,
) -> str: ) -> str:
if collection.empty: if collection.empty:
return "" return ""
@ -204,7 +203,7 @@ class Song(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title}) r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
r += get_collection_string(self.main_artist_collection, " by {}") r += get_collection_string(self.main_artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}") r += get_collection_string(self.feature_artist_collection, " feat. {}")
@ -349,7 +348,7 @@ class Album(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.artist_collection, " by {}") r += get_collection_string(self.artist_collection, " by {}")
r += get_collection_string(self.label_collection, " under {}") r += get_collection_string(self.label_collection, " under {}")
@ -579,7 +578,7 @@ class Artist(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = OPTION_FOREGROUND.value + self.name + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.label_collection, " under {}") r += get_collection_string(self.label_collection, " under {}")
r += OPTION_BACKGROUND.value r += OPTION_BACKGROUND.value

View File

@ -30,7 +30,7 @@ class Source:
def __post_init__(self): def __post_init__(self):
self.referrer_page = self.referrer_page or self.page_enum self.referrer_page = self.referrer_page or self.page_enum
@property @cached_property
def parsed_url(self) -> ParseResult: def parsed_url(self) -> ParseResult:
return urlparse(self.url) return urlparse(self.url)

View File

@ -20,7 +20,6 @@ DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and False DEBUG_DUMP = DEBUG and False
DEBUG_PRINT_ID = DEBUG and True
if DEBUG: if DEBUG:
print("DEBUG ACTIVE") print("DEBUG ACTIVE")