Compare commits

...

2 Commits

Author SHA1 Message Date
9c369b421d feat: oh no
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-03 14:52:12 +02:00
be843f2c10 draft: improved debug even more 2024-04-30 17:43:00 +02:00
7 changed files with 156 additions and 107 deletions

View File

@ -7,7 +7,7 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__": if __name__ == "__main__":
commands = [ commands = [
"s: #a Crystal F", "s: #a Crystal F",
"d: 20", "d: 20"
] ]

View File

@ -2,30 +2,27 @@ import music_kraken
from music_kraken.objects import Song, Album, Artist, Collection from music_kraken.objects import Song, Album, Artist, Collection
if __name__ == "__main__": if __name__ == "__main__":
album_1 = Album( song_1 = Song(
title="album", title="song",
song_list=[ main_artist_list=[Artist(
Song(title="song", main_artist_list=[Artist(name="artist")]), name="main_artist"
], )],
artist_list=[ feature_artist_list=[Artist(
Artist(name="artist 3"), name="main_artist"
] )]
) )
album_2 = Album( other_artist = Artist(name="other_artist")
title="album",
song_list=[ song_2 = Song(
Song(title="song", main_artist_list=[Artist(name="artist 2")]), title = "song",
], main_artist_list=[other_artist]
artist_list=[
Artist(name="artist"),
]
) )
album_1.merge(album_2) other_artist.name = "main_artist"
print() song_1.merge(song_2)
print(*(f"{a.title_string} ; {a.id}" for a in album_1.artist_collection.data), sep=" | ")
print(id(album_1.artist_collection), id(album_2.artist_collection)) print("#" * 120)
print(id(album_1.song_collection[0].main_artist_collection), id(album_2.song_collection[0].main_artist_collection)) print("main", *song_1.main_artist_collection)
print("feat", *song_1.feature_artist_collection)

View File

@ -40,8 +40,6 @@ class Collection(Generic[T]):
self.pull_from: List[Collection] = [] self.pull_from: List[Collection] = []
self.push_to: List[Collection] = [] self.push_to: List[Collection] = []
self._id_to_index_values: Dict[int, set] = defaultdict(set)
# This is to cleanly unmap previously mapped items by their id # This is to cleanly unmap previously mapped items by their id
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict) self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
# this is to keep track and look up the actual objects # this is to keep track and look up the actual objects
@ -50,10 +48,11 @@ class Collection(Generic[T]):
self.extend(data) self.extend(data)
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Collection({id(self)})" return f"Collection({' | '.join(self._collection_for.values())} {id(self)})"
def _map_element(self, __object: T, from_map: bool = False): def _map_element(self, __object: T, no_unmap: bool = False, **kwargs):
self._unmap_element(__object.id) if not no_unmap:
self._unmap_element(__object.id)
self._indexed_from_id[__object.id]["id"] = __object.id self._indexed_from_id[__object.id]["id"] = __object.id
self._indexed_values["id"][__object.id] = __object self._indexed_values["id"][__object.id] = __object
@ -78,106 +77,153 @@ class Collection(Generic[T]):
del self._indexed_from_id[obj_id] del self._indexed_from_id[obj_id]
def _remap(self): def _remap(self):
for e in self: # reinitialize the mapping to clean it without time consuming operations
self._map_element(e) self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
def _find_object(self, __object: T, no_push_to: bool = False) -> Optional[T]: for e in self._data:
if not no_push_to or True: self._map_element(e, no_unmap=True)
for c in self.push_to:
found, found_in = c._find_object(__object, no_push_to=True)
if found is not None:
output("push to", found, __object, color=BColors.RED)
return found, found_in
def _find_object(self, __object: T, **kwargs) -> Optional[T]:
self._remap() self._remap()
for name, value in __object.indexing_values: for name, value in __object.indexing_values:
if value in self._indexed_values[name]: if value in self._indexed_values[name]:
return self._indexed_values[name][value], self return self._indexed_values[name][value]
return None, self return None
def append(self, __object: Optional[T], **kwargs): def _merge_into_contained_object(self, existing: T, other: T, **kwargs):
"""
This function merges the other object into the existing object, which is contained in the current collection.
This also modifies the correct mapping.
"""
if existing.id == other.id:
return
self._map_element(existing)
existing.merge(other, **kwargs)
def _append_new_object(self, other: T, **kwargs):
"""
This function appends the other object to the current collection.
This only works if not another object, which represents the same real life object exists in the collection.
"""
self._data.append(other)
self._map_element(other)
# all of the existing hooks to get the defined datastructure
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
for attribute, a in self.sync_on_append.items():
# syncing two collections by reference
b = other.__getattribute__(attribute)
if a is b:
continue
"""
no_sync_collection: Set[Collection] = kwargs.get("no_sync_collection", set())
if id(b) in no_sync_collection:
continue
"""
object_trace(f"Syncing [{a}] = [{b}]")
b_data = b.data.copy()
b_collection_for = b._collection_for.copy()
# no_sync_collection.add(id(b))
del b
for synced_with, key in b_collection_for.items():
synced_with.__setattr__(key, a)
a._collection_for[synced_with] = key
a.extend(b_data, **kwargs)
def append(self, other: Optional[T], **kwargs):
""" """
If an object, that represents the same entity exists in a relevant collection, If an object, that represents the same entity exists in a relevant collection,
merge into this object. (and remap) merge into this object. (and remap)
Else append to this collection. Else append to this collection.
:param __object: :param other:
:return: :return:
""" """
if __object is None: if other is None:
return
if other.id in self._indexed_from_id:
return return
existing_object, map_to = self._find_object(__object, no_push_to=kwargs.get("no_push_to", False)) object_trace(f"Appending {other.option_string} to {self}")
if map_to is self: push_to: Optional[Tuple[Collection, T]] = None
for other, contained in (c._find_object(__object, no_push_to=True) for c in self.pull_from): for c in self.push_to:
if other is None: r = c._find_object(other)
continue if r is not None:
push_to_collection = (c, r)
output("found push to", found, other, self, color=BColors.RED, sep="\t")
break
output("pull from", other, __object, color=BColors.RED) pull_from: Optional[Tuple[Collection, T]] = None
__object.__merge__(other, no_push_to=False, **kwargs) for c in self.pull_from:
contained.remove(other) r = c._find_object(other)
if r is not None:
pull_from_collection = (c, r)
output("found pull from", found, other, self, color=BColors.RED, sep="\t")
break
if pull_from is not None:
pull_from[0].remove(pull_from[1])
existing_object = self._find_object(other, no_push_to=kwargs.get("no_push_to", False))
if existing_object is None: if existing_object is None:
# append if push_to is None:
self._data.append(__object) self._append_new_object(other, **kwargs)
self._map_element(__object) else:
push_to[0]._merge_into_contained_object(push_to[1], other, **kwargs)
for collection_attribute, child_collection in self.extend_object_to_attribute.items(): if pull_from is not None:
__object.__getattribute__(collection_attribute).extend(child_collection, **kwargs) self._merge_into_contained_object(other if push_to is None else push_to[1], pull_from[1], **kwargs)
else:
self._merge_into_contained_object(existing_object, other, **kwargs)
if pull_from is not None:
self._merge_into_contained_object(existing_object, pull_from[1], **kwargs)
def remove(self, *other_list: List[T], silent: bool = False):
for other in other_list:
existing: Optional[T] = self._indexed_values["id"].get(other.id, None)
if existing is None:
if not silent:
raise ValueError(f"Object {other} not found in {self}")
return other
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).remove(*generator, silent=silent, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items(): for attribute, new_object in self.append_object_to_attribute.items():
__object.__getattribute__(attribute).append(new_object, **kwargs) other.__getattribute__(attribute).remove(new_object, silent=silent, **kwargs)
# only modify collections if the object actually has been appended self._data.remove(existing)
for attribute, a in self.sync_on_append.items(): self._unmap_element(existing)
b = __object.__getattribute__(attribute)
if a is b:
continue
no_sync_collection: Set[Collection] = kwargs.get("no_sync_collection", set()) def contains(self, other: T) -> bool:
object_trace(f"Syncing [{a}] = [{b}]; {no_sync_collection}") return self._find_object(other) is not None
if id(b) in no_sync_collection:
continue
def extend(self, other_collections: Optional[Generator[T, None, None]], **kwargs):
b_data = b.data.copy() if other_collections is None:
b_collection_for = b._collection_for.copy()
no_sync_collection.add(id(b))
# kwargs["no_sync_collection"] = no_sync_collection
del b
a.extend(b_data, **kwargs)
for synced_with, key in b_collection_for.items():
synced_with.__setattr__(key, a)
a._collection_for[synced_with] = key
else:
# merge only if the two objects are not the same
if existing_object.id == __object.id:
return
existing_object.merge(__object, **kwargs)
map_to._map_element(existing_object)
def remove(self, __object: T) -> T:
self._data.remove(__object)
self._unmap_element(__object)
return __object
def contains(self, __object: T) -> bool:
return self._find_object(__object) is not None
def extend(self, __iterable: Optional[Generator[T, None, None]], **kwargs):
if __iterable is None:
return return
for __object in __iterable: for __object in other_collections:
self.append(__object, **kwargs) self.append(__object, **kwargs)
@property @property
@ -194,8 +240,8 @@ class Collection(Generic[T]):
def __iter__(self) -> Iterator[T]: def __iter__(self) -> Iterator[T]:
yield from self._data yield from self._data
def __merge__(self, __other: Collection, **kwargs): def __merge__(self, other: Collection, **kwargs):
self.extend(__other, **kwargs) self.extend(other, **kwargs)
def __getitem__(self, item: int): def __getitem__(self, item: int):
return self._data[item] return self._data[item]

View File

@ -11,7 +11,7 @@ import inspect
from .metadata import Metadata from .metadata import Metadata
from ..utils import get_unix_time, object_trace, generate_id from ..utils import get_unix_time, object_trace, generate_id
from ..utils.config import logging_settings, main_settings from ..utils.config import logging_settings, main_settings
from ..utils.shared import HIGHEST_ID from ..utils.shared import HIGHEST_ID, DEBUG_PRINT_ID
from ..utils.hacking import MetaClass from ..utils.hacking import MetaClass
LOGGER = logging_settings["object_logger"] LOGGER = logging_settings["object_logger"]
@ -113,7 +113,7 @@ class OuterProxy:
self._inner: InnerData = InnerData(type(self), **kwargs) self._inner: InnerData = InnerData(type(self), **kwargs)
self._inner._refers_to_instances.add(self) self._inner._refers_to_instances.add(self)
object_trace(f"creating {type(self).__name__} [{self.title_string}]") object_trace(f"creating {type(self).__name__} [{self.option_string}]")
self.__init_collections__() self.__init_collections__()
@ -192,7 +192,7 @@ class OuterProxy:
if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances): if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
a, b = b, a a, b = b, a
object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]") object_trace(f"merging {a.option_string} | {b.option_string}")
old_inner = b._inner old_inner = b._inner
@ -243,6 +243,10 @@ class OuterProxy:
return r return r
@property
def option_string(self) -> str:
return self.title_string
INDEX_DEPENDS_ON: List[str] = [] INDEX_DEPENDS_ON: List[str] = []
@property @property
@ -278,7 +282,7 @@ class OuterProxy:
TITEL = "id" TITEL = "id"
@property @property
def title_string(self) -> str: def title_string(self) -> str:
return str(self.__getattribute__(self.TITEL)) return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "")
def __repr__(self): def __repr__(self):
return f"{type(self).__name__}({self.title_string})" return f"{type(self).__name__}({self.title_string})"

View File

@ -22,6 +22,7 @@ from .parents import OuterProxy, P
from .source import Source, SourceCollection from .source import Source, SourceCollection
from .target import Target from .target import Target
from .country import Language, Country from .country import Language, Country
from ..utils.shared import DEBUG_PRINT_ID
from ..utils.string_processing import unify from ..utils.string_processing import unify
from .parents import OuterProxy as Base from .parents import OuterProxy as Base
@ -44,7 +45,7 @@ def get_collection_string(
ignore_titles: Set[str] = None, ignore_titles: Set[str] = None,
background: BColors = OPTION_BACKGROUND, background: BColors = OPTION_BACKGROUND,
foreground: BColors = OPTION_FOREGROUND, foreground: BColors = OPTION_FOREGROUND,
add_id: bool = True, add_id: bool = DEBUG_PRINT_ID,
) -> str: ) -> str:
if collection.empty: if collection.empty:
return "" return ""
@ -203,7 +204,7 @@ class Song(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title}) r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
r += get_collection_string(self.main_artist_collection, " by {}") r += get_collection_string(self.main_artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}") r += get_collection_string(self.feature_artist_collection, " feat. {}")
@ -348,7 +349,7 @@ class Album(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.artist_collection, " by {}") r += get_collection_string(self.artist_collection, " by {}")
r += get_collection_string(self.label_collection, " under {}") r += get_collection_string(self.label_collection, " under {}")
@ -578,7 +579,7 @@ class Artist(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.name + BColors.ENDC.value + OPTION_BACKGROUND.value r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.label_collection, " under {}") r += get_collection_string(self.label_collection, " under {}")
r += OPTION_BACKGROUND.value r += OPTION_BACKGROUND.value

View File

@ -30,7 +30,7 @@ class Source:
def __post_init__(self): def __post_init__(self):
self.referrer_page = self.referrer_page or self.page_enum self.referrer_page = self.referrer_page or self.page_enum
@cached_property @property
def parsed_url(self) -> ParseResult: def parsed_url(self) -> ParseResult:
return urlparse(self.url) return urlparse(self.url)

View File

@ -20,6 +20,7 @@ DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and False DEBUG_DUMP = DEBUG and False
DEBUG_PRINT_ID = DEBUG and True
if DEBUG: if DEBUG:
print("DEBUG ACTIVE") print("DEBUG ACTIVE")