Compare commits

..

2 Commits

Author SHA1 Message Date
9c369b421d feat: oh no
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-03 14:52:12 +02:00
be843f2c10 draft: improved debug even more 2024-04-30 17:43:00 +02:00
7 changed files with 156 additions and 107 deletions

View File

@ -7,7 +7,7 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__":
commands = [
"s: #a Crystal F",
"d: 20",
"d: 20"
]

View File

@ -2,30 +2,27 @@ import music_kraken
from music_kraken.objects import Song, Album, Artist, Collection
if __name__ == "__main__":
album_1 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]),
],
artist_list=[
Artist(name="artist 3"),
]
song_1 = Song(
title="song",
main_artist_list=[Artist(
name="main_artist"
)],
feature_artist_list=[Artist(
name="main_artist"
)]
)
album_2 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist 2")]),
],
artist_list=[
Artist(name="artist"),
]
other_artist = Artist(name="other_artist")
song_2 = Song(
title = "song",
main_artist_list=[other_artist]
)
album_1.merge(album_2)
other_artist.name = "main_artist"
print()
print(*(f"{a.title_string} ; {a.id}" for a in album_1.artist_collection.data), sep=" | ")
song_1.merge(song_2)
print(id(album_1.artist_collection), id(album_2.artist_collection))
print(id(album_1.song_collection[0].main_artist_collection), id(album_2.song_collection[0].main_artist_collection))
print("#" * 120)
print("main", *song_1.main_artist_collection)
print("feat", *song_1.feature_artist_collection)

View File

@ -40,8 +40,6 @@ class Collection(Generic[T]):
self.pull_from: List[Collection] = []
self.push_to: List[Collection] = []
self._id_to_index_values: Dict[int, set] = defaultdict(set)
# This is to cleanly unmap previously mapped items by their id
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
# this is to keep track and look up the actual objects
@ -50,9 +48,10 @@ class Collection(Generic[T]):
self.extend(data)
def __repr__(self) -> str:
return f"Collection({id(self)})"
return f"Collection({' | '.join(self._collection_for.values())} {id(self)})"
def _map_element(self, __object: T, from_map: bool = False):
def _map_element(self, __object: T, no_unmap: bool = False, **kwargs):
if not no_unmap:
self._unmap_element(__object.id)
self._indexed_from_id[__object.id]["id"] = __object.id
@ -78,106 +77,153 @@ class Collection(Generic[T]):
del self._indexed_from_id[obj_id]
def _remap(self):
for e in self:
self._map_element(e)
# reinitialize the mapping to clean it without time consuming operations
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
def _find_object(self, __object: T, no_push_to: bool = False) -> Optional[T]:
if not no_push_to or True:
for c in self.push_to:
found, found_in = c._find_object(__object, no_push_to=True)
if found is not None:
output("push to", found, __object, color=BColors.RED)
return found, found_in
for e in self._data:
self._map_element(e, no_unmap=True)
def _find_object(self, __object: T, **kwargs) -> Optional[T]:
self._remap()
for name, value in __object.indexing_values:
if value in self._indexed_values[name]:
return self._indexed_values[name][value], self
return self._indexed_values[name][value]
return None, self
return None
def append(self, __object: Optional[T], **kwargs):
def _merge_into_contained_object(self, existing: T, other: T, **kwargs):
"""
If an object, that represents the same entity exists in a relevant collection,
merge into this object. (and remap)
Else append to this collection.
:param __object:
:return:
This function merges the other object into the existing object, which is contained in the current collection.
This also modifies the correct mapping.
"""
if __object is None:
if existing.id == other.id:
return
existing_object, map_to = self._find_object(__object, no_push_to=kwargs.get("no_push_to", False))
self._map_element(existing)
existing.merge(other, **kwargs)
if map_to is self:
for other, contained in (c._find_object(__object, no_push_to=True) for c in self.pull_from):
if other is None:
continue
def _append_new_object(self, other: T, **kwargs):
"""
This function appends the other object to the current collection.
This only works if not another object, which represents the same real life object exists in the collection.
"""
output("pull from", other, __object, color=BColors.RED)
__object.__merge__(other, no_push_to=False, **kwargs)
contained.remove(other)
self._data.append(other)
self._map_element(other)
if existing_object is None:
# append
self._data.append(__object)
self._map_element(__object)
for collection_attribute, child_collection in self.extend_object_to_attribute.items():
__object.__getattribute__(collection_attribute).extend(child_collection, **kwargs)
# all of the existing hooks to get the defined datastructure
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
__object.__getattribute__(attribute).append(new_object, **kwargs)
other.__getattribute__(attribute).append(new_object, **kwargs)
# only modify collections if the object actually has been appended
for attribute, a in self.sync_on_append.items():
b = __object.__getattribute__(attribute)
# syncing two collections by reference
b = other.__getattribute__(attribute)
if a is b:
continue
"""
no_sync_collection: Set[Collection] = kwargs.get("no_sync_collection", set())
object_trace(f"Syncing [{a}] = [{b}]; {no_sync_collection}")
if id(b) in no_sync_collection:
continue
"""
object_trace(f"Syncing [{a}] = [{b}]")
b_data = b.data.copy()
b_collection_for = b._collection_for.copy()
no_sync_collection.add(id(b))
# kwargs["no_sync_collection"] = no_sync_collection
del b
# no_sync_collection.add(id(b))
a.extend(b_data, **kwargs)
del b
for synced_with, key in b_collection_for.items():
synced_with.__setattr__(key, a)
a._collection_for[synced_with] = key
a.extend(b_data, **kwargs)
def append(self, other: Optional[T], **kwargs):
"""
If an object, that represents the same entity exists in a relevant collection,
merge into this object. (and remap)
Else append to this collection.
:param other:
:return:
"""
if other is None:
return
if other.id in self._indexed_from_id:
return
object_trace(f"Appending {other.option_string} to {self}")
push_to: Optional[Tuple[Collection, T]] = None
for c in self.push_to:
r = c._find_object(other)
if r is not None:
push_to_collection = (c, r)
output("found push to", found, other, self, color=BColors.RED, sep="\t")
break
pull_from: Optional[Tuple[Collection, T]] = None
for c in self.pull_from:
r = c._find_object(other)
if r is not None:
pull_from_collection = (c, r)
output("found pull from", found, other, self, color=BColors.RED, sep="\t")
break
if pull_from is not None:
pull_from[0].remove(pull_from[1])
existing_object = self._find_object(other, no_push_to=kwargs.get("no_push_to", False))
if existing_object is None:
if push_to is None:
self._append_new_object(other, **kwargs)
else:
# merge only if the two objects are not the same
if existing_object.id == __object.id:
push_to[0]._merge_into_contained_object(push_to[1], other, **kwargs)
if pull_from is not None:
self._merge_into_contained_object(other if push_to is None else push_to[1], pull_from[1], **kwargs)
else:
self._merge_into_contained_object(existing_object, other, **kwargs)
if pull_from is not None:
self._merge_into_contained_object(existing_object, pull_from[1], **kwargs)
def remove(self, *other_list: List[T], silent: bool = False):
for other in other_list:
existing: Optional[T] = self._indexed_values["id"].get(other.id, None)
if existing is None:
if not silent:
raise ValueError(f"Object {other} not found in {self}")
return other
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).remove(*generator, silent=silent, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).remove(new_object, silent=silent, **kwargs)
self._data.remove(existing)
self._unmap_element(existing)
def contains(self, other: T) -> bool:
return self._find_object(other) is not None
def extend(self, other_collections: Optional[Generator[T, None, None]], **kwargs):
if other_collections is None:
return
existing_object.merge(__object, **kwargs)
map_to._map_element(existing_object)
def remove(self, __object: T) -> T:
self._data.remove(__object)
self._unmap_element(__object)
return __object
def contains(self, __object: T) -> bool:
return self._find_object(__object) is not None
def extend(self, __iterable: Optional[Generator[T, None, None]], **kwargs):
if __iterable is None:
return
for __object in __iterable:
for __object in other_collections:
self.append(__object, **kwargs)
@property
@ -194,8 +240,8 @@ class Collection(Generic[T]):
def __iter__(self) -> Iterator[T]:
yield from self._data
def __merge__(self, __other: Collection, **kwargs):
self.extend(__other, **kwargs)
def __merge__(self, other: Collection, **kwargs):
self.extend(other, **kwargs)
def __getitem__(self, item: int):
return self._data[item]

View File

@ -11,7 +11,7 @@ import inspect
from .metadata import Metadata
from ..utils import get_unix_time, object_trace, generate_id
from ..utils.config import logging_settings, main_settings
from ..utils.shared import HIGHEST_ID
from ..utils.shared import HIGHEST_ID, DEBUG_PRINT_ID
from ..utils.hacking import MetaClass
LOGGER = logging_settings["object_logger"]
@ -113,7 +113,7 @@ class OuterProxy:
self._inner: InnerData = InnerData(type(self), **kwargs)
self._inner._refers_to_instances.add(self)
object_trace(f"creating {type(self).__name__} [{self.title_string}]")
object_trace(f"creating {type(self).__name__} [{self.option_string}]")
self.__init_collections__()
@ -192,7 +192,7 @@ class OuterProxy:
if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
a, b = b, a
object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]")
object_trace(f"merging {a.option_string} | {b.option_string}")
old_inner = b._inner
@ -243,6 +243,10 @@ class OuterProxy:
return r
@property
def option_string(self) -> str:
return self.title_string
INDEX_DEPENDS_ON: List[str] = []
@property
@ -278,7 +282,7 @@ class OuterProxy:
TITEL = "id"
@property
def title_string(self) -> str:
return str(self.__getattribute__(self.TITEL))
return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "")
def __repr__(self):
return f"{type(self).__name__}({self.title_string})"

View File

@ -22,6 +22,7 @@ from .parents import OuterProxy, P
from .source import Source, SourceCollection
from .target import Target
from .country import Language, Country
from ..utils.shared import DEBUG_PRINT_ID
from ..utils.string_processing import unify
from .parents import OuterProxy as Base
@ -44,7 +45,7 @@ def get_collection_string(
ignore_titles: Set[str] = None,
background: BColors = OPTION_BACKGROUND,
foreground: BColors = OPTION_FOREGROUND,
add_id: bool = True,
add_id: bool = DEBUG_PRINT_ID,
) -> str:
if collection.empty:
return ""
@ -203,7 +204,7 @@ class Song(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
r += get_collection_string(self.main_artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}")
@ -348,7 +349,7 @@ class Album(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.artist_collection, " by {}")
r += get_collection_string(self.label_collection, " under {}")
@ -578,7 +579,7 @@ class Artist(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.name + BColors.ENDC.value + OPTION_BACKGROUND.value
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.label_collection, " under {}")
r += OPTION_BACKGROUND.value

View File

@ -30,7 +30,7 @@ class Source:
def __post_init__(self):
self.referrer_page = self.referrer_page or self.page_enum
@cached_property
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)

View File

@ -20,6 +20,7 @@ DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and False
DEBUG_PRINT_ID = DEBUG and True
if DEBUG:
print("DEBUG ACTIVE")