Compare commits
2 Commits
4510520db6
...
9c369b421d
Author | SHA1 | Date | |
---|---|---|---|
9c369b421d | |||
be843f2c10 |
@ -7,7 +7,7 @@ logging.getLogger().setLevel(logging.DEBUG)
|
|||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
commands = [
|
commands = [
|
||||||
"s: #a Crystal F",
|
"s: #a Crystal F",
|
||||||
"d: 20",
|
"d: 20"
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -2,30 +2,27 @@ import music_kraken
|
|||||||
from music_kraken.objects import Song, Album, Artist, Collection
|
from music_kraken.objects import Song, Album, Artist, Collection
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
album_1 = Album(
|
song_1 = Song(
|
||||||
title="album",
|
title="song",
|
||||||
song_list=[
|
main_artist_list=[Artist(
|
||||||
Song(title="song", main_artist_list=[Artist(name="artist")]),
|
name="main_artist"
|
||||||
],
|
)],
|
||||||
artist_list=[
|
feature_artist_list=[Artist(
|
||||||
Artist(name="artist 3"),
|
name="main_artist"
|
||||||
]
|
)]
|
||||||
)
|
)
|
||||||
|
|
||||||
album_2 = Album(
|
other_artist = Artist(name="other_artist")
|
||||||
title="album",
|
|
||||||
song_list=[
|
song_2 = Song(
|
||||||
Song(title="song", main_artist_list=[Artist(name="artist 2")]),
|
title = "song",
|
||||||
],
|
main_artist_list=[other_artist]
|
||||||
artist_list=[
|
|
||||||
Artist(name="artist"),
|
|
||||||
]
|
|
||||||
)
|
)
|
||||||
|
|
||||||
album_1.merge(album_2)
|
other_artist.name = "main_artist"
|
||||||
|
|
||||||
print()
|
song_1.merge(song_2)
|
||||||
print(*(f"{a.title_string} ; {a.id}" for a in album_1.artist_collection.data), sep=" | ")
|
|
||||||
|
|
||||||
print(id(album_1.artist_collection), id(album_2.artist_collection))
|
print("#" * 120)
|
||||||
print(id(album_1.song_collection[0].main_artist_collection), id(album_2.song_collection[0].main_artist_collection))
|
print("main", *song_1.main_artist_collection)
|
||||||
|
print("feat", *song_1.feature_artist_collection)
|
||||||
|
@ -40,8 +40,6 @@ class Collection(Generic[T]):
|
|||||||
self.pull_from: List[Collection] = []
|
self.pull_from: List[Collection] = []
|
||||||
self.push_to: List[Collection] = []
|
self.push_to: List[Collection] = []
|
||||||
|
|
||||||
self._id_to_index_values: Dict[int, set] = defaultdict(set)
|
|
||||||
|
|
||||||
# This is to cleanly unmap previously mapped items by their id
|
# This is to cleanly unmap previously mapped items by their id
|
||||||
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
|
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
|
||||||
# this is to keep track and look up the actual objects
|
# this is to keep track and look up the actual objects
|
||||||
@ -50,10 +48,11 @@ class Collection(Generic[T]):
|
|||||||
self.extend(data)
|
self.extend(data)
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
return f"Collection({id(self)})"
|
return f"Collection({' | '.join(self._collection_for.values())} {id(self)})"
|
||||||
|
|
||||||
def _map_element(self, __object: T, from_map: bool = False):
|
def _map_element(self, __object: T, no_unmap: bool = False, **kwargs):
|
||||||
self._unmap_element(__object.id)
|
if not no_unmap:
|
||||||
|
self._unmap_element(__object.id)
|
||||||
|
|
||||||
self._indexed_from_id[__object.id]["id"] = __object.id
|
self._indexed_from_id[__object.id]["id"] = __object.id
|
||||||
self._indexed_values["id"][__object.id] = __object
|
self._indexed_values["id"][__object.id] = __object
|
||||||
@ -78,106 +77,153 @@ class Collection(Generic[T]):
|
|||||||
del self._indexed_from_id[obj_id]
|
del self._indexed_from_id[obj_id]
|
||||||
|
|
||||||
def _remap(self):
|
def _remap(self):
|
||||||
for e in self:
|
# reinitialize the mapping to clean it without time consuming operations
|
||||||
self._map_element(e)
|
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
|
||||||
|
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
|
||||||
|
|
||||||
def _find_object(self, __object: T, no_push_to: bool = False) -> Optional[T]:
|
for e in self._data:
|
||||||
if not no_push_to or True:
|
self._map_element(e, no_unmap=True)
|
||||||
for c in self.push_to:
|
|
||||||
found, found_in = c._find_object(__object, no_push_to=True)
|
|
||||||
if found is not None:
|
|
||||||
output("push to", found, __object, color=BColors.RED)
|
|
||||||
return found, found_in
|
|
||||||
|
|
||||||
|
def _find_object(self, __object: T, **kwargs) -> Optional[T]:
|
||||||
self._remap()
|
self._remap()
|
||||||
|
|
||||||
for name, value in __object.indexing_values:
|
for name, value in __object.indexing_values:
|
||||||
if value in self._indexed_values[name]:
|
if value in self._indexed_values[name]:
|
||||||
return self._indexed_values[name][value], self
|
return self._indexed_values[name][value]
|
||||||
|
|
||||||
return None, self
|
return None
|
||||||
|
|
||||||
def append(self, __object: Optional[T], **kwargs):
|
def _merge_into_contained_object(self, existing: T, other: T, **kwargs):
|
||||||
|
"""
|
||||||
|
This function merges the other object into the existing object, which is contained in the current collection.
|
||||||
|
This also modifies the correct mapping.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if existing.id == other.id:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._map_element(existing)
|
||||||
|
existing.merge(other, **kwargs)
|
||||||
|
|
||||||
|
def _append_new_object(self, other: T, **kwargs):
|
||||||
|
"""
|
||||||
|
This function appends the other object to the current collection.
|
||||||
|
This only works if not another object, which represents the same real life object exists in the collection.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self._data.append(other)
|
||||||
|
self._map_element(other)
|
||||||
|
|
||||||
|
# all of the existing hooks to get the defined datastructure
|
||||||
|
for collection_attribute, generator in self.extend_object_to_attribute.items():
|
||||||
|
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
|
||||||
|
|
||||||
|
for attribute, new_object in self.append_object_to_attribute.items():
|
||||||
|
other.__getattribute__(attribute).append(new_object, **kwargs)
|
||||||
|
|
||||||
|
for attribute, a in self.sync_on_append.items():
|
||||||
|
# syncing two collections by reference
|
||||||
|
b = other.__getattribute__(attribute)
|
||||||
|
if a is b:
|
||||||
|
continue
|
||||||
|
|
||||||
|
"""
|
||||||
|
no_sync_collection: Set[Collection] = kwargs.get("no_sync_collection", set())
|
||||||
|
if id(b) in no_sync_collection:
|
||||||
|
continue
|
||||||
|
"""
|
||||||
|
object_trace(f"Syncing [{a}] = [{b}]")
|
||||||
|
|
||||||
|
|
||||||
|
b_data = b.data.copy()
|
||||||
|
b_collection_for = b._collection_for.copy()
|
||||||
|
# no_sync_collection.add(id(b))
|
||||||
|
|
||||||
|
del b
|
||||||
|
|
||||||
|
for synced_with, key in b_collection_for.items():
|
||||||
|
synced_with.__setattr__(key, a)
|
||||||
|
a._collection_for[synced_with] = key
|
||||||
|
|
||||||
|
a.extend(b_data, **kwargs)
|
||||||
|
|
||||||
|
def append(self, other: Optional[T], **kwargs):
|
||||||
"""
|
"""
|
||||||
If an object, that represents the same entity exists in a relevant collection,
|
If an object, that represents the same entity exists in a relevant collection,
|
||||||
merge into this object. (and remap)
|
merge into this object. (and remap)
|
||||||
Else append to this collection.
|
Else append to this collection.
|
||||||
|
|
||||||
:param __object:
|
:param other:
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
if __object is None:
|
if other is None:
|
||||||
|
return
|
||||||
|
if other.id in self._indexed_from_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
existing_object, map_to = self._find_object(__object, no_push_to=kwargs.get("no_push_to", False))
|
object_trace(f"Appending {other.option_string} to {self}")
|
||||||
|
|
||||||
if map_to is self:
|
push_to: Optional[Tuple[Collection, T]] = None
|
||||||
for other, contained in (c._find_object(__object, no_push_to=True) for c in self.pull_from):
|
for c in self.push_to:
|
||||||
if other is None:
|
r = c._find_object(other)
|
||||||
continue
|
if r is not None:
|
||||||
|
push_to_collection = (c, r)
|
||||||
|
output("found push to", found, other, self, color=BColors.RED, sep="\t")
|
||||||
|
break
|
||||||
|
|
||||||
output("pull from", other, __object, color=BColors.RED)
|
pull_from: Optional[Tuple[Collection, T]] = None
|
||||||
__object.__merge__(other, no_push_to=False, **kwargs)
|
for c in self.pull_from:
|
||||||
contained.remove(other)
|
r = c._find_object(other)
|
||||||
|
if r is not None:
|
||||||
|
pull_from_collection = (c, r)
|
||||||
|
output("found pull from", found, other, self, color=BColors.RED, sep="\t")
|
||||||
|
break
|
||||||
|
|
||||||
|
if pull_from is not None:
|
||||||
|
pull_from[0].remove(pull_from[1])
|
||||||
|
|
||||||
|
existing_object = self._find_object(other, no_push_to=kwargs.get("no_push_to", False))
|
||||||
|
|
||||||
if existing_object is None:
|
if existing_object is None:
|
||||||
# append
|
if push_to is None:
|
||||||
self._data.append(__object)
|
self._append_new_object(other, **kwargs)
|
||||||
self._map_element(__object)
|
else:
|
||||||
|
push_to[0]._merge_into_contained_object(push_to[1], other, **kwargs)
|
||||||
|
|
||||||
for collection_attribute, child_collection in self.extend_object_to_attribute.items():
|
if pull_from is not None:
|
||||||
__object.__getattribute__(collection_attribute).extend(child_collection, **kwargs)
|
self._merge_into_contained_object(other if push_to is None else push_to[1], pull_from[1], **kwargs)
|
||||||
|
else:
|
||||||
|
self._merge_into_contained_object(existing_object, other, **kwargs)
|
||||||
|
if pull_from is not None:
|
||||||
|
self._merge_into_contained_object(existing_object, pull_from[1], **kwargs)
|
||||||
|
|
||||||
|
def remove(self, *other_list: List[T], silent: bool = False):
|
||||||
|
for other in other_list:
|
||||||
|
existing: Optional[T] = self._indexed_values["id"].get(other.id, None)
|
||||||
|
if existing is None:
|
||||||
|
if not silent:
|
||||||
|
raise ValueError(f"Object {other} not found in {self}")
|
||||||
|
return other
|
||||||
|
|
||||||
|
for collection_attribute, generator in self.extend_object_to_attribute.items():
|
||||||
|
other.__getattribute__(collection_attribute).remove(*generator, silent=silent, **kwargs)
|
||||||
|
|
||||||
for attribute, new_object in self.append_object_to_attribute.items():
|
for attribute, new_object in self.append_object_to_attribute.items():
|
||||||
__object.__getattribute__(attribute).append(new_object, **kwargs)
|
other.__getattribute__(attribute).remove(new_object, silent=silent, **kwargs)
|
||||||
|
|
||||||
# only modify collections if the object actually has been appended
|
self._data.remove(existing)
|
||||||
for attribute, a in self.sync_on_append.items():
|
self._unmap_element(existing)
|
||||||
b = __object.__getattribute__(attribute)
|
|
||||||
if a is b:
|
|
||||||
continue
|
|
||||||
|
|
||||||
no_sync_collection: Set[Collection] = kwargs.get("no_sync_collection", set())
|
def contains(self, other: T) -> bool:
|
||||||
object_trace(f"Syncing [{a}] = [{b}]; {no_sync_collection}")
|
return self._find_object(other) is not None
|
||||||
if id(b) in no_sync_collection:
|
|
||||||
continue
|
|
||||||
|
|
||||||
|
def extend(self, other_collections: Optional[Generator[T, None, None]], **kwargs):
|
||||||
b_data = b.data.copy()
|
if other_collections is None:
|
||||||
b_collection_for = b._collection_for.copy()
|
|
||||||
no_sync_collection.add(id(b))
|
|
||||||
# kwargs["no_sync_collection"] = no_sync_collection
|
|
||||||
del b
|
|
||||||
|
|
||||||
a.extend(b_data, **kwargs)
|
|
||||||
|
|
||||||
for synced_with, key in b_collection_for.items():
|
|
||||||
synced_with.__setattr__(key, a)
|
|
||||||
a._collection_for[synced_with] = key
|
|
||||||
|
|
||||||
else:
|
|
||||||
# merge only if the two objects are not the same
|
|
||||||
if existing_object.id == __object.id:
|
|
||||||
return
|
|
||||||
|
|
||||||
existing_object.merge(__object, **kwargs)
|
|
||||||
map_to._map_element(existing_object)
|
|
||||||
|
|
||||||
def remove(self, __object: T) -> T:
|
|
||||||
self._data.remove(__object)
|
|
||||||
self._unmap_element(__object)
|
|
||||||
return __object
|
|
||||||
|
|
||||||
def contains(self, __object: T) -> bool:
|
|
||||||
return self._find_object(__object) is not None
|
|
||||||
|
|
||||||
def extend(self, __iterable: Optional[Generator[T, None, None]], **kwargs):
|
|
||||||
if __iterable is None:
|
|
||||||
return
|
return
|
||||||
|
|
||||||
for __object in __iterable:
|
for __object in other_collections:
|
||||||
self.append(__object, **kwargs)
|
self.append(__object, **kwargs)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@ -194,8 +240,8 @@ class Collection(Generic[T]):
|
|||||||
def __iter__(self) -> Iterator[T]:
|
def __iter__(self) -> Iterator[T]:
|
||||||
yield from self._data
|
yield from self._data
|
||||||
|
|
||||||
def __merge__(self, __other: Collection, **kwargs):
|
def __merge__(self, other: Collection, **kwargs):
|
||||||
self.extend(__other, **kwargs)
|
self.extend(other, **kwargs)
|
||||||
|
|
||||||
def __getitem__(self, item: int):
|
def __getitem__(self, item: int):
|
||||||
return self._data[item]
|
return self._data[item]
|
||||||
|
@ -11,7 +11,7 @@ import inspect
|
|||||||
from .metadata import Metadata
|
from .metadata import Metadata
|
||||||
from ..utils import get_unix_time, object_trace, generate_id
|
from ..utils import get_unix_time, object_trace, generate_id
|
||||||
from ..utils.config import logging_settings, main_settings
|
from ..utils.config import logging_settings, main_settings
|
||||||
from ..utils.shared import HIGHEST_ID
|
from ..utils.shared import HIGHEST_ID, DEBUG_PRINT_ID
|
||||||
from ..utils.hacking import MetaClass
|
from ..utils.hacking import MetaClass
|
||||||
|
|
||||||
LOGGER = logging_settings["object_logger"]
|
LOGGER = logging_settings["object_logger"]
|
||||||
@ -113,7 +113,7 @@ class OuterProxy:
|
|||||||
self._inner: InnerData = InnerData(type(self), **kwargs)
|
self._inner: InnerData = InnerData(type(self), **kwargs)
|
||||||
self._inner._refers_to_instances.add(self)
|
self._inner._refers_to_instances.add(self)
|
||||||
|
|
||||||
object_trace(f"creating {type(self).__name__} [{self.title_string}]")
|
object_trace(f"creating {type(self).__name__} [{self.option_string}]")
|
||||||
|
|
||||||
self.__init_collections__()
|
self.__init_collections__()
|
||||||
|
|
||||||
@ -192,7 +192,7 @@ class OuterProxy:
|
|||||||
if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
|
if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
|
||||||
a, b = b, a
|
a, b = b, a
|
||||||
|
|
||||||
object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]")
|
object_trace(f"merging {a.option_string} | {b.option_string}")
|
||||||
|
|
||||||
old_inner = b._inner
|
old_inner = b._inner
|
||||||
|
|
||||||
@ -243,6 +243,10 @@ class OuterProxy:
|
|||||||
|
|
||||||
return r
|
return r
|
||||||
|
|
||||||
|
@property
|
||||||
|
def option_string(self) -> str:
|
||||||
|
return self.title_string
|
||||||
|
|
||||||
INDEX_DEPENDS_ON: List[str] = []
|
INDEX_DEPENDS_ON: List[str] = []
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@ -278,7 +282,7 @@ class OuterProxy:
|
|||||||
TITEL = "id"
|
TITEL = "id"
|
||||||
@property
|
@property
|
||||||
def title_string(self) -> str:
|
def title_string(self) -> str:
|
||||||
return str(self.__getattribute__(self.TITEL))
|
return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "")
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return f"{type(self).__name__}({self.title_string})"
|
return f"{type(self).__name__}({self.title_string})"
|
||||||
|
@ -22,6 +22,7 @@ from .parents import OuterProxy, P
|
|||||||
from .source import Source, SourceCollection
|
from .source import Source, SourceCollection
|
||||||
from .target import Target
|
from .target import Target
|
||||||
from .country import Language, Country
|
from .country import Language, Country
|
||||||
|
from ..utils.shared import DEBUG_PRINT_ID
|
||||||
from ..utils.string_processing import unify
|
from ..utils.string_processing import unify
|
||||||
|
|
||||||
from .parents import OuterProxy as Base
|
from .parents import OuterProxy as Base
|
||||||
@ -44,7 +45,7 @@ def get_collection_string(
|
|||||||
ignore_titles: Set[str] = None,
|
ignore_titles: Set[str] = None,
|
||||||
background: BColors = OPTION_BACKGROUND,
|
background: BColors = OPTION_BACKGROUND,
|
||||||
foreground: BColors = OPTION_FOREGROUND,
|
foreground: BColors = OPTION_FOREGROUND,
|
||||||
add_id: bool = True,
|
add_id: bool = DEBUG_PRINT_ID,
|
||||||
) -> str:
|
) -> str:
|
||||||
if collection.empty:
|
if collection.empty:
|
||||||
return ""
|
return ""
|
||||||
@ -203,7 +204,7 @@ class Song(Base):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def option_string(self) -> str:
|
def option_string(self) -> str:
|
||||||
r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
|
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
|
||||||
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
|
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
|
||||||
r += get_collection_string(self.main_artist_collection, " by {}")
|
r += get_collection_string(self.main_artist_collection, " by {}")
|
||||||
r += get_collection_string(self.feature_artist_collection, " feat. {}")
|
r += get_collection_string(self.feature_artist_collection, " feat. {}")
|
||||||
@ -348,7 +349,7 @@ class Album(Base):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def option_string(self) -> str:
|
def option_string(self) -> str:
|
||||||
r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
|
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
|
||||||
r += get_collection_string(self.artist_collection, " by {}")
|
r += get_collection_string(self.artist_collection, " by {}")
|
||||||
r += get_collection_string(self.label_collection, " under {}")
|
r += get_collection_string(self.label_collection, " under {}")
|
||||||
|
|
||||||
@ -578,7 +579,7 @@ class Artist(Base):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def option_string(self) -> str:
|
def option_string(self) -> str:
|
||||||
r = OPTION_FOREGROUND.value + self.name + BColors.ENDC.value + OPTION_BACKGROUND.value
|
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
|
||||||
r += get_collection_string(self.label_collection, " under {}")
|
r += get_collection_string(self.label_collection, " under {}")
|
||||||
|
|
||||||
r += OPTION_BACKGROUND.value
|
r += OPTION_BACKGROUND.value
|
||||||
|
@ -30,7 +30,7 @@ class Source:
|
|||||||
def __post_init__(self):
|
def __post_init__(self):
|
||||||
self.referrer_page = self.referrer_page or self.page_enum
|
self.referrer_page = self.referrer_page or self.page_enum
|
||||||
|
|
||||||
@cached_property
|
@property
|
||||||
def parsed_url(self) -> ParseResult:
|
def parsed_url(self) -> ParseResult:
|
||||||
return urlparse(self.url)
|
return urlparse(self.url)
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@ DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
|
|||||||
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
|
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
|
||||||
DEBUG_PAGES = DEBUG and False
|
DEBUG_PAGES = DEBUG and False
|
||||||
DEBUG_DUMP = DEBUG and False
|
DEBUG_DUMP = DEBUG and False
|
||||||
|
DEBUG_PRINT_ID = DEBUG and True
|
||||||
|
|
||||||
if DEBUG:
|
if DEBUG:
|
||||||
print("DEBUG ACTIVE")
|
print("DEBUG ACTIVE")
|
||||||
|
Loading…
Reference in New Issue
Block a user