8 Commits

Author SHA1 Message Date
cacff47643 Merge pull request 'fix/bandcamp' (#12) from fix/bandcamp into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #12
2024-04-23 10:04:51 +00:00
0179246ec0 feat: dynamic objects now also have ids
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-04-23 11:52:08 +02:00
3d432cd0d7 fix: test
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-04-23 11:44:39 +02:00
0080a48e70 feat: removed legacy code 2024-04-23 11:39:25 +02:00
ea5adfbe8a feat: limited complexity of collection by removing child collections 2024-04-23 11:37:49 +02:00
fa723d7747 feat: removed redundand collection functions
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-04-23 09:19:06 +02:00
312e57d82e feat: progress
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-04-19 17:48:42 +02:00
a998e52cd9 fix: syncing
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-04-19 17:45:49 +02:00
7 changed files with 131 additions and 306 deletions

View File

@@ -2,91 +2,30 @@ import music_kraken
from music_kraken.objects import Song, Album, Artist, Collection
if __name__ == "__main__":
artist: Artist = Artist(
name="artist",
main_album_list=[
Album(
title="album",
song_list=[
Song(
title="song",
album_list=[
Album(
title="album",
albumsort=123,
main_artist=Artist(name="artist"),
),
],
),
Song(
title="other_song",
album_list=[
Album(title="album", albumsort=423),
],
),
]
),
Album(title="album", barcode="1234567890123"),
album_1 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]),
],
artist_list=[
Artist(name="artist 3"),
]
)
other_artist: Artist = Artist(
name="artist",
main_album_list=[
Album(
title="album",
song_list=[
Song(
title="song",
album_list=[
Album(
title="album",
albumsort=123,
main_artist=Artist(name="other_artist"),
),
],
),
Song(
title="other_song",
album_list=[
Album(title="album", albumsort=423),
],
),
]
),
Album(title="album", barcode="1234567890123"),
album_2 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist 2")]),
],
artist_list=[
Artist(name="artist"),
]
)
artist.merge(other_artist)
album_1.merge(album_2)
a = artist.main_album_collection[0]
b = a.song_collection[0].album_collection[0]
c = a.song_collection[1].album_collection[0]
d = b.song_collection[0].album_collection[0]
e = d.song_collection[0].album_collection[0]
f = e.song_collection[0].album_collection[0]
g = f.song_collection[0].album_collection[0]
print(a.id, a.title, a.barcode, a.albumsort)
print(b.id, b.title, b.barcode, b.albumsort)
print(c.id, c.title, c.barcode, c.albumsort)
print(d.id, d.title, d.barcode, d.albumsort)
print(e.id, e.title, e.barcode, e.albumsort)
print(f.id, f.title, f.barcode, f.albumsort)
print(g.id, g.title, g.barcode, g.albumsort)
print()
print(*(f"{a.title_string} ; {a.id}" for a in album_1.artist_collection.data), sep=" | ")
d.title = "new_title"
print(a.id, a.title, a.barcode, a.albumsort)
print(b.id, b.title, b.barcode, b.albumsort)
print(c.id, c.title, c.barcode, c.albumsort)
print(d.id, d.title, d.barcode, d.albumsort)
print(e.id, e.title, e.barcode, e.albumsort)
print(f.id, f.title, f.barcode, f.albumsort)
print(g.id, g.title, g.barcode, g.albumsort)
print()
print(artist.main_album_collection._indexed_values)
print(id(album_1.artist_collection), id(album_2.artist_collection))
print(id(album_1.song_collection[0].main_artist_collection), id(album_2.song_collection[0].main_artist_collection))

View File

@@ -1,8 +1,9 @@
from __future__ import annotations
from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any
from .parents import OuterProxy
from ..utils import object_trace
T = TypeVar('T', bound=OuterProxy)
@@ -21,186 +22,62 @@ class Collection(Generic[T]):
self,
data: Optional[Iterable[T]] = None,
sync_on_append: Dict[str, Collection] = None,
contain_given_in_attribute: Dict[str, Collection] = None,
contain_attribute_in_given: Dict[str, Collection] = None,
append_object_to_attribute: Dict[str, T] = None
append_object_to_attribute: Dict[str, T] = None,
extend_object_to_attribute: Dict[str, Collection] = None,
) -> None:
self._collection_for: dict = dict()
self._contains_ids = set()
self._data = []
self.parents: List[Collection[T]] = []
self.children: List[Collection[T]] = []
# List of collection attributes that should be modified on append
# Key: collection attribute (str) of appended element
# Value: main collection to sync to
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
self.extend_object_to_attribute: Dict[str, Collection[T]] = extend_object_to_attribute or {}
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self._id_to_index_values: Dict[int, set] = defaultdict(set)
self._indexed_values = defaultdict(lambda: None)
self._indexed_to_objects = defaultdict(lambda: None)
# This is to cleanly unmap previously mapped items by their id
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
# this is to keep track and look up the actual objects
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
self.extend(data)
def _map_element(self, __object: T, from_map: bool = False):
self._contains_ids.add(__object.id)
def __repr__(self) -> str:
return f"Collection({id(self)})"
for name, value in (*__object.indexing_values, ('id', __object.id)):
def _map_element(self, __object: T, from_map: bool = False):
self._unmap_element(__object.id)
self._indexed_from_id[__object.id]["id"] = __object.id
self._indexed_values["id"][__object.id] = __object
for name, value in __object.indexing_values:
if value is None or value == __object._inner._default_values.get(name):
continue
self._indexed_values[name] = value
self._indexed_to_objects[value] = __object
self._id_to_index_values[__object.id].add((name, value))
self._indexed_values[name][value] = __object
self._indexed_from_id[__object.id][name] = value
def _unmap_element(self, __object: Union[T, int]):
obj_id = __object.id if isinstance(__object, OuterProxy) else __object
if obj_id in self._contains_ids:
self._contains_ids.remove(obj_id)
for name, value in self._id_to_index_values[obj_id]:
if name in self._indexed_values:
del self._indexed_values[name]
if value in self._indexed_to_objects:
del self._indexed_to_objects[value]
del self._id_to_index_values[obj_id]
def _contained_in_self(self, __object: T) -> bool:
if __object.id in self._contains_ids:
return True
for name, value in __object.indexing_values:
if value is None:
continue
if value == self._indexed_values[name]:
return True
return False
def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List[Collection]:
"""
Gets the collection this object is found in, if it is found in any.
:param __object:
:param break_at_first:
:return:
"""
results = []
if self._contained_in_self(__object):
return [self]
for collection in self.children:
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
if break_at_first:
return results
return results
def _get_root_collections(self) -> List[Collection]:
if not len(self.parents):
return [self]
root_collections = []
for upper_collection in self.parents:
root_collections.extend(upper_collection._get_root_collections())
return root_collections
@property
def _is_root(self) -> bool:
return len(self.parents) <= 0
def _get_parents_of_multiple_contained_children(self, __object: T):
results = []
if len(self.children) < 2 or self._contained_in_self(__object):
return results
count = 0
for collection in self.children:
sub_results = collection._get_parents_of_multiple_contained_children(__object)
if len(sub_results) > 0:
count += 1
results.extend(sub_results)
if count >= 2:
results.append(self)
return results
def merge_into_self(self, __object: T, from_map: bool = False):
"""
1. find existing objects
2. merge into existing object
3. remap existing object
"""
if __object.id in self._contains_ids:
if obj_id not in self._indexed_from_id:
return
existing_object: T = None
for name, value in self._indexed_from_id[obj_id].items():
if value in self._indexed_values[name]:
del self._indexed_values[name][value]
del self._indexed_from_id[obj_id]
def _find_object(self, __object: T) -> Optional[T]:
for name, value in __object.indexing_values:
if value is None:
continue
if value == self._indexed_values[name]:
existing_object = self._indexed_to_objects[value]
if existing_object.id == __object.id:
return None
break
if existing_object is None:
return None
existing_object.merge(__object)
# just a check if it really worked
if existing_object.id != __object.id:
raise ValueError("This should NEVER happen. Merging doesn't work.")
self._map_element(existing_object, from_map=from_map)
def contains(self, __object: T) -> bool:
return len(self._contained_in_sub(__object)) > 0
def _find_object_in_self(self, __object: T) -> Optional[T]:
for name, value in __object.indexing_values:
if value == self._indexed_values[name]:
return self._indexed_to_objects[value]
def _find_object(self, __object: T, no_sibling: bool = False) -> Tuple[Collection[T], Optional[T]]:
other_object = self._find_object_in_self(__object)
if other_object is not None:
return self, other_object
for c in self.children:
o, other_object = c._find_object(__object)
if other_object is not None:
return o, other_object
if no_sibling:
return self, None
"""
# find in siblings and all children of siblings
for parent in self.parents:
for sibling in parent.children:
if sibling is self:
continue
o, other_object = sibling._find_object(__object, no_sibling=True)
if other_object is not None:
return o, other_object
"""
return self, None
if value in self._indexed_values[name]:
return self._indexed_values[name][value]
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
"""
@@ -217,23 +94,32 @@ class Collection(Generic[T]):
if __object is None:
return
append_to, existing_object = self._find_object(__object)
existing_object = self._find_object(__object)
if existing_object is None:
# append
append_to._data.append(__object)
append_to._map_element(__object)
self._data.append(__object)
self._map_element(__object)
# only modify collections if the object actually has been appended
for collection_attribute, child_collection in self.contain_given_in_attribute.items():
__object.__getattribute__(collection_attribute).contain_collection_inside(child_collection, __object)
for collection_attribute, child_collection in self.extend_object_to_attribute.items():
__object.__getattribute__(collection_attribute).extend(child_collection)
for attribute, new_object in self.append_object_to_attribute.items():
__object.__getattribute__(attribute).append(new_object)
for attribute, collection in self.sync_on_append.items():
collection.extend(__object.__getattribute__(attribute))
__object.__setattr__(attribute, collection)
# only modify collections if the object actually has been appended
for attribute, a in self.sync_on_append.items():
b = __object.__getattribute__(attribute)
object_trace(f"Syncing [{a}{id(a)}] = [{b}{id(b)}]")
data_to_extend = b.data
a._collection_for.update(b._collection_for)
for synced_with, key in b._collection_for.items():
synced_with.__setattr__(key, a)
a.extend(data_to_extend)
else:
# merge only if the two objects are not the same
@@ -245,9 +131,9 @@ class Collection(Generic[T]):
existing_object.merge(__object)
if existing_object.id != old_id:
append_to._unmap_element(old_id)
self._unmap_element(old_id)
append_to._map_element(existing_object)
self._map_element(existing_object)
def extend(self, __iterable: Optional[Generator[T, None, None]]):
if __iterable is None:
@@ -256,54 +142,22 @@ class Collection(Generic[T]):
for __object in __iterable:
self.append(__object)
def contain_collection_inside(self, sub_collection: Collection, _object: T):
"""
This collection will ALWAYS contain everything from the passed in collection
"""
if self is sub_collection or sub_collection in self.children:
return
_object._inner._is_collection_child[self] = sub_collection
_object._inner._is_collection_parent[sub_collection] = self
self.children.append(sub_collection)
sub_collection.parents.append(self)
@property
def data(self) -> List[T]:
return list(self.__iter__())
def __len__(self) -> int:
return len(self._data) + sum(len(collection) for collection in self.children)
return len(self._data)
@property
def empty(self) -> bool:
return self.__len__() <= 0
def __iter__(self, finished_ids: set = None) -> Iterator[T]:
_finished_ids = finished_ids or set()
for element in self._data:
if element.id in _finished_ids:
continue
_finished_ids.add(element.id)
yield element
for c in self.children:
yield from c.__iter__(finished_ids=finished_ids)
def __iter__(self) -> Iterator[T]:
yield from self._data
def __merge__(self, __other: Collection, override: bool = False):
self.extend(__other)
def __getitem__(self, item: int):
if item < len(self._data):
return self._data[item]
item = item - len(self._data)
for c in self.children:
if item < len(c):
return c.__getitem__(item)
item = item - len(c._data)
raise IndexError
return self._data[item]

View File

@@ -34,18 +34,19 @@ class InnerData:
self._refers_to_instances = set()
self._fetched_from: dict = {}
# collection : collection that is a collection of self
self._is_collection_child: Dict[Collection, Collection] = {}
self._is_collection_parent: Dict[Collection, Collection] = {}
# initialize the default values
self._default_values = {}
for name, factory in object_type._default_factories.items():
self._default_values[name] = factory()
for key, value in kwargs.items():
if hasattr(value, "__is_collection__"):
value._collection_for[self] = key
self.__setattr__(key, value)
def __hash__(self):
return self.id
def __merge__(self, __other: InnerData, override: bool = False):
"""
:param __other:
@@ -88,7 +89,7 @@ class OuterProxy:
def __init__(self, _id: int = None, dynamic: bool = False, **kwargs):
_automatic_id: bool = False
if _id is None and not dynamic:
if _id is None:
"""
generates a random integer id
the range is defined in the config
@@ -116,6 +117,7 @@ class OuterProxy:
self._inner._refers_to_instances.add(self)
object_trace(f"creating {type(self).__name__} [{self.title_string}]")
self.__init_collections__()
for name, data_list in collection_data.items():
@@ -195,18 +197,6 @@ class OuterProxy:
a, b = b, a
object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]")
for collection, child_collection in b._inner._is_collection_child.items():
try:
collection.children.remove(child_collection)
except ValueError:
pass
for collection, parent_collection in b._inner._is_collection_parent.items():
try:
collection.parents.remove(parent_collection)
except ValueError:
pass
old_inner = b._inner

View File

@@ -93,8 +93,7 @@ class Song(Base):
self.album_collection.append_object_to_attribute = {
"song_collection": self,
}
self.main_artist_collection.contain_given_in_attribute = {
self.main_artist_collection.extend_object_to_attribute = {
"main_album_collection": self.album_collection
}
self.feature_artist_collection.append_object_to_attribute = {
@@ -204,6 +203,7 @@ class Album(Base):
notes: FormattedText
source_collection: SourceCollection
artist_collection: Collection[Artist]
song_collection: Collection[Song]
label_collection: Collection[Label]
@@ -253,7 +253,7 @@ class Album(Base):
self.artist_collection.append_object_to_attribute = {
"main_album_collection": self
}
self.artist_collection.contain_given_in_attribute = {
self.artist_collection.extend_object_to_attribute = {
"label_collection": self.label_collection
}

View File

@@ -2,6 +2,7 @@ from datetime import datetime
from pathlib import Path
import json
import logging
import inspect
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
from .config import config, read_config, write_config

View File

@@ -15,8 +15,8 @@ __stage__ = os.getenv("STAGE", "prod")
DEBUG = (__stage__ == "dev") and True
DEBUG_LOGGING = DEBUG and False
DEBUG_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE = DEBUG and True
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG and False
DEBUG_OBJECT_TRACE = DEBUG and False
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and False

View File

@@ -70,7 +70,49 @@ class TestCollection(unittest.TestCase):
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
"""
def test_artist_artist_relation(self):
artist = Artist(
name="artist",
main_album_list=[
Album(
title="album",
song_list=[
Song(title="song"),
],
artist_list=[
Artist(name="artist"),
]
)
]
)
self.assertTrue(artist.id == artist.main_album_collection[0].song_collection[0].main_artist_collection[0].id)
def test_artist_collection_sync(self):
album_1 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]),
],
artist_list=[
Artist(name="artist"),
]
)
album_2 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]),
],
artist_list=[
Artist(name="artist"),
]
)
album_1.merge(album_2)
self.assertTrue(id(album_1.artist_collection) == id(album_1.artist_collection) == id(album_1.song_collection[0].main_artist_collection) == id(album_1.song_collection[0].main_artist_collection))
def test_song_artist_relations(self):
a = self.complicated_object()
b = a.main_album_collection[0].song_collection[0].main_artist_collection[0]
@@ -80,7 +122,6 @@ class TestCollection(unittest.TestCase):
self.assertTrue(a.id == b.id == c.id == d.id)
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
"""
if __name__ == "__main__":
unittest.main()