Compare commits
10 Commits
92495f73fd
...
4473ff08d3
Author | SHA1 | Date | |
---|---|---|---|
|
4473ff08d3 | ||
|
bdbaeceda8 | ||
9366eb46fd | |||
7af8bacfab | |||
60dc5b2558 | |||
70b86b5c47 | |||
6936c9da9d | |||
28e1350b01 | |||
3aef267608 | |||
29e2b4616e |
6
.vscode/settings.json
vendored
6
.vscode/settings.json
vendored
@ -19,9 +19,13 @@
|
|||||||
"APIC",
|
"APIC",
|
||||||
"Bandcamp",
|
"Bandcamp",
|
||||||
"dotenv",
|
"dotenv",
|
||||||
|
"encyclopaedia",
|
||||||
"levenshtein",
|
"levenshtein",
|
||||||
|
"metallum",
|
||||||
|
"musify",
|
||||||
"OKBLUE",
|
"OKBLUE",
|
||||||
"Referer",
|
"Referer",
|
||||||
"tracksort"
|
"tracksort",
|
||||||
|
"youtube"
|
||||||
]
|
]
|
||||||
}
|
}
|
57
development/objects_collection.py
Normal file
57
development/objects_collection.py
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
import music_kraken
|
||||||
|
from music_kraken.objects import Song, Album, Artist, Collection
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
artist: Artist = Artist(
|
||||||
|
name="artist",
|
||||||
|
main_album_list=[
|
||||||
|
Album(
|
||||||
|
title="album",
|
||||||
|
song_list=[
|
||||||
|
Song(
|
||||||
|
title="song",
|
||||||
|
album_list=[
|
||||||
|
Album(title="album", albumsort=123),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
Song(
|
||||||
|
title="other_song",
|
||||||
|
album_list=[
|
||||||
|
Album(title="album", albumsort=423),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
]
|
||||||
|
),
|
||||||
|
Album(title="album", barcode="1234567890123"),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
a = artist.main_album_collection[0]
|
||||||
|
b = a.song_collection[0].album_collection[0]
|
||||||
|
c = a.song_collection[1].album_collection[0]
|
||||||
|
d = b.song_collection[0].album_collection[0]
|
||||||
|
e = d.song_collection[0].album_collection[0]
|
||||||
|
f = e.song_collection[0].album_collection[0]
|
||||||
|
g = f.song_collection[0].album_collection[0]
|
||||||
|
|
||||||
|
print(a.id, a.title, a.barcode, a.albumsort)
|
||||||
|
print(b.id, b.title, b.barcode, b.albumsort)
|
||||||
|
print(c.id, c.title, c.barcode, c.albumsort)
|
||||||
|
print(d.id, d.title, d.barcode, d.albumsort)
|
||||||
|
print(e.id, e.title, e.barcode, e.albumsort)
|
||||||
|
print(f.id, f.title, f.barcode, f.albumsort)
|
||||||
|
print(g.id, g.title, g.barcode, g.albumsort)
|
||||||
|
print()
|
||||||
|
|
||||||
|
d.title = "new_title"
|
||||||
|
|
||||||
|
print(a.id, a.title, a.barcode, a.albumsort)
|
||||||
|
print(b.id, b.title, b.barcode, b.albumsort)
|
||||||
|
print(c.id, c.title, c.barcode, c.albumsort)
|
||||||
|
print(d.id, d.title, d.barcode, d.albumsort)
|
||||||
|
print(e.id, e.title, e.barcode, e.albumsort)
|
||||||
|
print(f.id, f.title, f.barcode, f.albumsort)
|
||||||
|
print(g.id, g.title, g.barcode, g.albumsort)
|
||||||
|
print()
|
||||||
|
|
||||||
|
print(artist.main_album_collection._indexed_values)
|
@ -110,7 +110,7 @@ class Collection(Generic[T]):
|
|||||||
if self._contained_in_self(__object):
|
if self._contained_in_self(__object):
|
||||||
return [self]
|
return [self]
|
||||||
|
|
||||||
for collection in self.children:
|
for collection in (*self.children, *self.parents):
|
||||||
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
|
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
|
||||||
|
|
||||||
if break_at_first:
|
if break_at_first:
|
||||||
|
@ -1,257 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from collections import defaultdict
|
|
||||||
from typing import TypeVar, Generic, Dict, Optional, Iterable, List
|
|
||||||
from .parents import OuterProxy
|
|
||||||
|
|
||||||
T = TypeVar('T', bound=OuterProxy)
|
|
||||||
|
|
||||||
|
|
||||||
class Collection(Generic[T]):
|
|
||||||
_data: List[T]
|
|
||||||
|
|
||||||
_indexed_values: Dict[str, set]
|
|
||||||
_indexed_to_objects: Dict[any, list]
|
|
||||||
|
|
||||||
shallow_list = property(fget=lambda self: self.data)
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
data: Optional[Iterable[T]] = None,
|
|
||||||
sync_on_append: Dict[str, "Collection"] = None,
|
|
||||||
contain_given_in_attribute: Dict[str, "Collection"] = None,
|
|
||||||
contain_attribute_in_given: Dict[str, "Collection"] = None,
|
|
||||||
append_object_to_attribute: Dict[str, T] = None
|
|
||||||
) -> None:
|
|
||||||
self._contains_ids = set()
|
|
||||||
self._data = []
|
|
||||||
self.upper_collections: List[Collection[T]] = []
|
|
||||||
self.contained_collections: List[Collection[T]] = []
|
|
||||||
|
|
||||||
# List of collection attributes that should be modified on append
|
|
||||||
# Key: collection attribute (str) of appended element
|
|
||||||
# Value: main collection to sync to
|
|
||||||
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
|
|
||||||
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
|
|
||||||
self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
|
|
||||||
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
|
|
||||||
|
|
||||||
self.contain_self_on_append: List[str] = []
|
|
||||||
|
|
||||||
self._indexed_values = defaultdict(set)
|
|
||||||
self._indexed_to_objects = defaultdict(list)
|
|
||||||
|
|
||||||
self.extend(data)
|
|
||||||
|
|
||||||
def _map_element(self, __object: T, from_map: bool = False):
|
|
||||||
self._contains_ids.add(__object.id)
|
|
||||||
|
|
||||||
for name, value in __object.indexing_values:
|
|
||||||
if value is None:
|
|
||||||
continue
|
|
||||||
|
|
||||||
self._indexed_values[name].add(value)
|
|
||||||
self._indexed_to_objects[value].append(__object)
|
|
||||||
|
|
||||||
if not from_map:
|
|
||||||
for attribute, new_object in self.contain_given_in_attribute.items():
|
|
||||||
__object.__getattribute__(attribute).contain_collection_inside(new_object)
|
|
||||||
|
|
||||||
for attribute, new_object in self.contain_given_in_attribute.items():
|
|
||||||
new_object.contain_collection_inside(__object.__getattribute__(attribute))
|
|
||||||
|
|
||||||
for attribute, new_object in self.append_object_to_attribute.items():
|
|
||||||
__object.__getattribute__(attribute).append(new_object, from_map=True)
|
|
||||||
|
|
||||||
def _unmap_element(self, __object: T):
|
|
||||||
self._contains_ids.remove(__object.id)
|
|
||||||
|
|
||||||
for name, value in __object.indexing_values:
|
|
||||||
if value is None:
|
|
||||||
continue
|
|
||||||
if value not in self._indexed_values[name]:
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
|
||||||
self._indexed_to_objects[value].remove(__object)
|
|
||||||
except ValueError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not len(self._indexed_to_objects[value]):
|
|
||||||
self._indexed_values[name].remove(value)
|
|
||||||
|
|
||||||
def _contained_in_self(self, __object: T) -> bool:
|
|
||||||
if __object.id in self._contains_ids:
|
|
||||||
return True
|
|
||||||
|
|
||||||
for name, value in __object.indexing_values:
|
|
||||||
if value is None:
|
|
||||||
continue
|
|
||||||
if value in self._indexed_values[name]:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _get_root_collections(self) -> List["Collection"]:
|
|
||||||
if not len(self.upper_collections):
|
|
||||||
return [self]
|
|
||||||
|
|
||||||
root_collections = []
|
|
||||||
for upper_collection in self.upper_collections:
|
|
||||||
root_collections.extend(upper_collection._get_root_collections())
|
|
||||||
return root_collections
|
|
||||||
|
|
||||||
@property
|
|
||||||
def _is_root(self) -> bool:
|
|
||||||
return len(self.upper_collections) <= 0
|
|
||||||
|
|
||||||
def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
|
|
||||||
results = []
|
|
||||||
|
|
||||||
if self._contained_in_self(__object):
|
|
||||||
return [self]
|
|
||||||
|
|
||||||
for collection in self.contained_collections:
|
|
||||||
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
|
|
||||||
if break_at_first:
|
|
||||||
return results
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
def _get_parents_of_multiple_contained_children(self, __object: T):
|
|
||||||
results = []
|
|
||||||
if len(self.contained_collections) < 2 or self._contained_in_self(__object):
|
|
||||||
return results
|
|
||||||
|
|
||||||
count = 0
|
|
||||||
|
|
||||||
for collection in self.contained_collections:
|
|
||||||
sub_results = collection._get_parents_of_multiple_contained_children(__object)
|
|
||||||
|
|
||||||
if len(sub_results) > 0:
|
|
||||||
count += 1
|
|
||||||
results.extend(sub_results)
|
|
||||||
|
|
||||||
if count >= 2:
|
|
||||||
results.append(self)
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
def _merge_in_self(self, __object: T, from_map: bool = False):
|
|
||||||
"""
|
|
||||||
1. find existing objects
|
|
||||||
2. merge into existing object
|
|
||||||
3. remap existing object
|
|
||||||
"""
|
|
||||||
if __object.id in self._contains_ids:
|
|
||||||
return
|
|
||||||
|
|
||||||
existing_object: DatabaseObject = None
|
|
||||||
|
|
||||||
for name, value in __object.indexing_values:
|
|
||||||
if value is None:
|
|
||||||
continue
|
|
||||||
if value in self._indexed_values[name]:
|
|
||||||
existing_object = self._indexed_to_objects[value][0]
|
|
||||||
if existing_object.id == __object.id:
|
|
||||||
return None
|
|
||||||
|
|
||||||
break
|
|
||||||
|
|
||||||
if existing_object is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
existing_object.merge(__object, replace_all_refs=True)
|
|
||||||
|
|
||||||
# just a check if it really worked
|
|
||||||
if existing_object.id != __object.id:
|
|
||||||
raise ValueError("This should NEVER happen. Merging doesn't work.")
|
|
||||||
|
|
||||||
self._map_element(existing_object, from_map=from_map)
|
|
||||||
|
|
||||||
def contains(self, __object: T) -> bool:
|
|
||||||
return len(self._contained_in_sub(__object)) > 0
|
|
||||||
|
|
||||||
def _append(self, __object: T, from_map: bool = False):
|
|
||||||
for attribute, to_sync_with in self.sync_on_append.items():
|
|
||||||
pass
|
|
||||||
to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))
|
|
||||||
|
|
||||||
self._map_element(__object, from_map=from_map)
|
|
||||||
self._data.append(__object)
|
|
||||||
|
|
||||||
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
|
|
||||||
if __object is None:
|
|
||||||
return
|
|
||||||
if __object.id in self._contains_ids:
|
|
||||||
return
|
|
||||||
|
|
||||||
exists_in_collection = self._contained_in_sub(__object)
|
|
||||||
if len(exists_in_collection) and self is exists_in_collection[0]:
|
|
||||||
# assuming that the object already is contained in the correct collections
|
|
||||||
if not already_is_parent:
|
|
||||||
self._merge_in_self(__object, from_map=from_map)
|
|
||||||
return
|
|
||||||
|
|
||||||
if not len(exists_in_collection):
|
|
||||||
self._append(__object, from_map=from_map)
|
|
||||||
else:
|
|
||||||
pass
|
|
||||||
exists_in_collection[0]._merge_in_self(__object, from_map=from_map)
|
|
||||||
|
|
||||||
if not already_is_parent or not self._is_root:
|
|
||||||
for parent_collection in self._get_parents_of_multiple_contained_children(__object):
|
|
||||||
pass
|
|
||||||
parent_collection.append(__object, already_is_parent=True, from_map=from_map)
|
|
||||||
|
|
||||||
def extend(self, __iterable: Optional[Iterable[T]]):
|
|
||||||
if __iterable is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
for __object in __iterable:
|
|
||||||
self.append(__object)
|
|
||||||
|
|
||||||
def sync_with_other_collection(self, equal_collection: "Collection"):
|
|
||||||
"""
|
|
||||||
If two collections always need to have the same values, this can be used.
|
|
||||||
|
|
||||||
Internally:
|
|
||||||
1. import the data from other to self
|
|
||||||
- _data
|
|
||||||
- contained_collections
|
|
||||||
2. replace all refs from the other object, with refs from this object
|
|
||||||
"""
|
|
||||||
if equal_collection is self:
|
|
||||||
return
|
|
||||||
|
|
||||||
# don't add the elements from the subelements from the other collection.
|
|
||||||
# this will be done in the next step.
|
|
||||||
self.extend(equal_collection._data)
|
|
||||||
# add all submodules
|
|
||||||
for equal_sub_collection in equal_collection.contained_collections:
|
|
||||||
self.contain_collection_inside(equal_sub_collection)
|
|
||||||
|
|
||||||
# now the ugly part
|
|
||||||
# replace all refs of the other element with this one
|
|
||||||
self._risky_merge(equal_collection)
|
|
||||||
|
|
||||||
def contain_collection_inside(self, sub_collection: "Collection"):
|
|
||||||
"""
|
|
||||||
This collection will ALWAYS contain everything from the passed in collection
|
|
||||||
"""
|
|
||||||
if sub_collection in self.contained_collections:
|
|
||||||
return
|
|
||||||
|
|
||||||
self.contained_collections.append(sub_collection)
|
|
||||||
sub_collection.upper_collections.append(self)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def data(self) -> List[T]:
|
|
||||||
return [*self._data,
|
|
||||||
*(__object for collection in self.contained_collections for __object in collection.shallow_list)]
|
|
||||||
|
|
||||||
def __len__(self) -> int:
|
|
||||||
return len(self._data) + sum(len(collection) for collection in self.contained_collections)
|
|
||||||
|
|
||||||
def __iter__(self) -> Iterator[T]:
|
|
||||||
for element in self._data:
|
|
||||||
yield element
|
|
@ -1,256 +0,0 @@
|
|||||||
from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type
|
|
||||||
from collections import defaultdict
|
|
||||||
|
|
||||||
from .parents import DatabaseObject
|
|
||||||
from ..utils.support_classes.hacking import MetaClass
|
|
||||||
|
|
||||||
T = TypeVar('T', bound=DatabaseObject)
|
|
||||||
|
|
||||||
|
|
||||||
class Collection(Generic[T]):
|
|
||||||
_data: List[T]
|
|
||||||
|
|
||||||
_indexed_values: Dict[str, set]
|
|
||||||
_indexed_to_objects: Dict[any, list]
|
|
||||||
|
|
||||||
shallow_list = property(fget=lambda self: self.data)
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self, data: Optional[Iterable[T]] = None,
|
|
||||||
sync_on_append: Dict[str, "Collection"] = None,
|
|
||||||
contain_given_in_attribute: Dict[str, "Collection"] = None,
|
|
||||||
contain_attribute_in_given: Dict[str, "Collection"] = None,
|
|
||||||
append_object_to_attribute: Dict[str, DatabaseObject] = None
|
|
||||||
) -> None:
|
|
||||||
self._contains_ids = set()
|
|
||||||
self._data = []
|
|
||||||
self.upper_collections: List[Collection[T]] = []
|
|
||||||
self.contained_collections: List[Collection[T]] = []
|
|
||||||
|
|
||||||
# List of collection attributes that should be modified on append
|
|
||||||
# Key: collection attribute (str) of appended element
|
|
||||||
# Value: main collection to sync to
|
|
||||||
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
|
|
||||||
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
|
|
||||||
self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
|
|
||||||
self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {}
|
|
||||||
|
|
||||||
self.contain_self_on_append: List[str] = []
|
|
||||||
|
|
||||||
self._indexed_values = defaultdict(set)
|
|
||||||
self._indexed_to_objects = defaultdict(list)
|
|
||||||
|
|
||||||
self.extend(data)
|
|
||||||
|
|
||||||
def _map_element(self, __object: T, from_map: bool = False):
|
|
||||||
self._contains_ids.add(__object.id)
|
|
||||||
|
|
||||||
for name, value in __object.indexing_values:
|
|
||||||
if value is None:
|
|
||||||
continue
|
|
||||||
|
|
||||||
self._indexed_values[name].add(value)
|
|
||||||
self._indexed_to_objects[value].append(__object)
|
|
||||||
|
|
||||||
if not from_map:
|
|
||||||
for attribute, new_object in self.contain_given_in_attribute.items():
|
|
||||||
__object.__getattribute__(attribute).contain_collection_inside(new_object)
|
|
||||||
|
|
||||||
for attribute, new_object in self.contain_given_in_attribute.items():
|
|
||||||
new_object.contain_collection_inside(__object.__getattribute__(attribute))
|
|
||||||
|
|
||||||
for attribute, new_object in self.append_object_to_attribute.items():
|
|
||||||
__object.__getattribute__(attribute).append(new_object, from_map=True)
|
|
||||||
|
|
||||||
def _unmap_element(self, __object: T):
|
|
||||||
self._contains_ids.remove(__object.id)
|
|
||||||
|
|
||||||
for name, value in __object.indexing_values:
|
|
||||||
if value is None:
|
|
||||||
continue
|
|
||||||
if value not in self._indexed_values[name]:
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
|
||||||
self._indexed_to_objects[value].remove(__object)
|
|
||||||
except ValueError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not len(self._indexed_to_objects[value]):
|
|
||||||
self._indexed_values[name].remove(value)
|
|
||||||
|
|
||||||
def _contained_in_self(self, __object: T) -> bool:
|
|
||||||
if __object.id in self._contains_ids:
|
|
||||||
return True
|
|
||||||
|
|
||||||
for name, value in __object.indexing_values:
|
|
||||||
if value is None:
|
|
||||||
continue
|
|
||||||
if value in self._indexed_values[name]:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _get_root_collections(self) -> List["Collection"]:
|
|
||||||
if not len(self.upper_collections):
|
|
||||||
return [self]
|
|
||||||
|
|
||||||
root_collections = []
|
|
||||||
for upper_collection in self.upper_collections:
|
|
||||||
root_collections.extend(upper_collection._get_root_collections())
|
|
||||||
return root_collections
|
|
||||||
|
|
||||||
@property
|
|
||||||
def _is_root(self) -> bool:
|
|
||||||
return len(self.upper_collections) <= 0
|
|
||||||
|
|
||||||
def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
|
|
||||||
results = []
|
|
||||||
|
|
||||||
if self._contained_in_self(__object):
|
|
||||||
return [self]
|
|
||||||
|
|
||||||
for collection in self.contained_collections:
|
|
||||||
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
|
|
||||||
if break_at_first:
|
|
||||||
return results
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
def _get_parents_of_multiple_contained_children(self, __object: T):
|
|
||||||
results = []
|
|
||||||
if len(self.contained_collections) < 2 or self._contained_in_self(__object):
|
|
||||||
return results
|
|
||||||
|
|
||||||
count = 0
|
|
||||||
|
|
||||||
for collection in self.contained_collections:
|
|
||||||
sub_results = collection._get_parents_of_multiple_contained_children(__object)
|
|
||||||
|
|
||||||
if len(sub_results) > 0:
|
|
||||||
count += 1
|
|
||||||
results.extend(sub_results)
|
|
||||||
|
|
||||||
if count >= 2:
|
|
||||||
results.append(self)
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
def _merge_in_self(self, __object: T, from_map: bool = False):
|
|
||||||
"""
|
|
||||||
1. find existing objects
|
|
||||||
2. merge into existing object
|
|
||||||
3. remap existing object
|
|
||||||
"""
|
|
||||||
if __object.id in self._contains_ids:
|
|
||||||
return
|
|
||||||
|
|
||||||
existing_object: DatabaseObject = None
|
|
||||||
|
|
||||||
for name, value in __object.indexing_values:
|
|
||||||
if value is None:
|
|
||||||
continue
|
|
||||||
if value in self._indexed_values[name]:
|
|
||||||
existing_object = self._indexed_to_objects[value][0]
|
|
||||||
if existing_object.id == __object.id:
|
|
||||||
return None
|
|
||||||
|
|
||||||
break
|
|
||||||
|
|
||||||
if existing_object is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
existing_object.merge(__object, replace_all_refs=True)
|
|
||||||
|
|
||||||
# just a check if it really worked
|
|
||||||
if existing_object.id != __object.id:
|
|
||||||
raise ValueError("This should NEVER happen. Merging doesn't work.")
|
|
||||||
|
|
||||||
self._map_element(existing_object, from_map=from_map)
|
|
||||||
|
|
||||||
def contains(self, __object: T) -> bool:
|
|
||||||
return len(self._contained_in_sub(__object)) > 0
|
|
||||||
|
|
||||||
def _append(self, __object: T, from_map: bool = False):
|
|
||||||
for attribute, to_sync_with in self.sync_on_append.items():
|
|
||||||
pass
|
|
||||||
to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))
|
|
||||||
|
|
||||||
self._map_element(__object, from_map=from_map)
|
|
||||||
self._data.append(__object)
|
|
||||||
|
|
||||||
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
|
|
||||||
if __object is None:
|
|
||||||
return
|
|
||||||
if __object.id in self._contains_ids:
|
|
||||||
return
|
|
||||||
|
|
||||||
exists_in_collection = self._contained_in_sub(__object)
|
|
||||||
if len(exists_in_collection) and self is exists_in_collection[0]:
|
|
||||||
# assuming that the object already is contained in the correct collections
|
|
||||||
if not already_is_parent:
|
|
||||||
self._merge_in_self(__object, from_map=from_map)
|
|
||||||
return
|
|
||||||
|
|
||||||
if not len(exists_in_collection):
|
|
||||||
self._append(__object, from_map=from_map)
|
|
||||||
else:
|
|
||||||
pass
|
|
||||||
exists_in_collection[0]._merge_in_self(__object, from_map=from_map)
|
|
||||||
|
|
||||||
if not already_is_parent or not self._is_root:
|
|
||||||
for parent_collection in self._get_parents_of_multiple_contained_children(__object):
|
|
||||||
pass
|
|
||||||
parent_collection.append(__object, already_is_parent=True, from_map=from_map)
|
|
||||||
|
|
||||||
def extend(self, __iterable: Optional[Iterable[T]]):
|
|
||||||
if __iterable is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
for __object in __iterable:
|
|
||||||
self.append(__object)
|
|
||||||
|
|
||||||
def sync_with_other_collection(self, equal_collection: "Collection"):
|
|
||||||
"""
|
|
||||||
If two collections always need to have the same values, this can be used.
|
|
||||||
|
|
||||||
Internally:
|
|
||||||
1. import the data from other to self
|
|
||||||
- _data
|
|
||||||
- contained_collections
|
|
||||||
2. replace all refs from the other object, with refs from this object
|
|
||||||
"""
|
|
||||||
if equal_collection is self:
|
|
||||||
return
|
|
||||||
|
|
||||||
# don't add the elements from the subelements from the other collection.
|
|
||||||
# this will be done in the next step.
|
|
||||||
self.extend(equal_collection._data)
|
|
||||||
# add all submodules
|
|
||||||
for equal_sub_collection in equal_collection.contained_collections:
|
|
||||||
self.contain_collection_inside(equal_sub_collection)
|
|
||||||
|
|
||||||
# now the ugly part
|
|
||||||
# replace all refs of the other element with this one
|
|
||||||
self._risky_merge(equal_collection)
|
|
||||||
|
|
||||||
def contain_collection_inside(self, sub_collection: "Collection"):
|
|
||||||
"""
|
|
||||||
This collection will ALWAYS contain everything from the passed in collection
|
|
||||||
"""
|
|
||||||
if sub_collection in self.contained_collections:
|
|
||||||
return
|
|
||||||
|
|
||||||
self.contained_collections.append(sub_collection)
|
|
||||||
sub_collection.upper_collections.append(self)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def data(self) -> List[T]:
|
|
||||||
return [*self._data,
|
|
||||||
*(__object for collection in self.contained_collections for __object in collection.shallow_list)]
|
|
||||||
|
|
||||||
def __len__(self) -> int:
|
|
||||||
return len(self._data) + sum(len(collection) for collection in self.contained_collections)
|
|
||||||
|
|
||||||
def __iter__(self) -> Iterator[T]:
|
|
||||||
for element in self._data:
|
|
||||||
yield element
|
|
@ -26,6 +26,8 @@ class InnerData:
|
|||||||
If the data in the wrapper class has to be merged, then this class is just replaced and garbage collected.
|
If the data in the wrapper class has to be merged, then this class is just replaced and garbage collected.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
_multiple_instances = False
|
||||||
|
|
||||||
def __init__(self, object_type, **kwargs):
|
def __init__(self, object_type, **kwargs):
|
||||||
# initialize the default values
|
# initialize the default values
|
||||||
self.__default_values = {}
|
self.__default_values = {}
|
||||||
@ -177,8 +179,20 @@ class OuterProxy:
|
|||||||
_ = "debug"
|
_ = "debug"
|
||||||
return
|
return
|
||||||
|
|
||||||
self._inner.__merge__(__other._inner, override=override)
|
a = self
|
||||||
__other._inner = self._inner
|
b = __other
|
||||||
|
|
||||||
|
if a._inner._multiple_instances and b._inner._multiple_instances:
|
||||||
|
LOGGER.warning(f"Both instances data obj are shared over multiple objects. This will lead so them being unsynchronized at some point. {a} {b}")
|
||||||
|
|
||||||
|
if b._inner._multiple_instances:
|
||||||
|
a, b = b, a
|
||||||
|
|
||||||
|
a._inner.__merge__(b._inner, override=override)
|
||||||
|
b._inner = a._inner
|
||||||
|
|
||||||
|
|
||||||
|
b._inner._multiple_instances = True
|
||||||
|
|
||||||
def mark_as_fetched(self, *url_hash_list: List[str]):
|
def mark_as_fetched(self, *url_hash_list: List[str]):
|
||||||
for url_hash in url_hash_list:
|
for url_hash in url_hash_list:
|
||||||
|
@ -119,7 +119,7 @@ class Song(Base):
|
|||||||
def indexing_values(self) -> List[Tuple[str, object]]:
|
def indexing_values(self) -> List[Tuple[str, object]]:
|
||||||
return [
|
return [
|
||||||
('id', self.id),
|
('id', self.id),
|
||||||
('title', self.unified_title),
|
('title', unify(self.unified_title)),
|
||||||
('isrc', self.isrc),
|
('isrc', self.isrc),
|
||||||
*[('url', source.url) for source in self.source_collection]
|
*[('url', source.url) for source in self.source_collection]
|
||||||
]
|
]
|
||||||
@ -244,6 +244,9 @@ class Album(Base):
|
|||||||
self.song_collection.contain_attribute_in_given = {
|
self.song_collection.contain_attribute_in_given = {
|
||||||
"main_artist_collection": self.artist_collection
|
"main_artist_collection": self.artist_collection
|
||||||
}
|
}
|
||||||
|
self.song_collection.append_object_to_attribute = {
|
||||||
|
"album_collection": self
|
||||||
|
}
|
||||||
|
|
||||||
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
|
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
|
||||||
if object_type is Song:
|
if object_type is Song:
|
||||||
@ -265,7 +268,7 @@ class Album(Base):
|
|||||||
def indexing_values(self) -> List[Tuple[str, object]]:
|
def indexing_values(self) -> List[Tuple[str, object]]:
|
||||||
return [
|
return [
|
||||||
('id', self.id),
|
('id', self.id),
|
||||||
('title', self.unified_title),
|
('title', unify(self.title)),
|
||||||
('barcode', self.barcode),
|
('barcode', self.barcode),
|
||||||
*[('url', source.url) for source in self.source_collection]
|
*[('url', source.url) for source in self.source_collection]
|
||||||
]
|
]
|
||||||
@ -530,7 +533,7 @@ class Artist(Base):
|
|||||||
def indexing_values(self) -> List[Tuple[str, object]]:
|
def indexing_values(self) -> List[Tuple[str, object]]:
|
||||||
return [
|
return [
|
||||||
('id', self.id),
|
('id', self.id),
|
||||||
('name', self.unified_name),
|
('name', unify(self.name)),
|
||||||
*[('url', source.url) for source in self.source_collection],
|
*[('url', source.url) for source in self.source_collection],
|
||||||
*[('contact', contact.value) for contact in self.contact_collection]
|
*[('contact', contact.value) for contact in self.contact_collection]
|
||||||
]
|
]
|
||||||
@ -643,7 +646,7 @@ class Label(Base):
|
|||||||
def indexing_values(self) -> List[Tuple[str, object]]:
|
def indexing_values(self) -> List[Tuple[str, object]]:
|
||||||
return [
|
return [
|
||||||
('id', self.id),
|
('id', self.id),
|
||||||
('name', self.unified_name),
|
('name', unify(self.name)),
|
||||||
*[('url', source.url) for source in self.source_collection]
|
*[('url', source.url) for source in self.source_collection]
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -11,6 +11,7 @@ from functools import lru_cache
|
|||||||
|
|
||||||
import youtube_dl
|
import youtube_dl
|
||||||
from youtube_dl.extractor.youtube import YoutubeIE
|
from youtube_dl.extractor.youtube import YoutubeIE
|
||||||
|
from youtube_dl.utils import DownloadError
|
||||||
|
|
||||||
from ...utils.exception.config import SettingValueError
|
from ...utils.exception.config import SettingValueError
|
||||||
from ...utils.config import main_settings, youtube_settings, logging_settings
|
from ...utils.config import main_settings, youtube_settings, logging_settings
|
||||||
@ -201,6 +202,7 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
self.yt_ie = MusicKrakenYoutubeIE(downloader=self.ydl, main_instance=self)
|
self.yt_ie = MusicKrakenYoutubeIE(downloader=self.ydl, main_instance=self)
|
||||||
|
|
||||||
self.download_values_by_url: dict = {}
|
self.download_values_by_url: dict = {}
|
||||||
|
self.not_download: Dict[str, DownloadError] = {}
|
||||||
|
|
||||||
def _fetch_from_main_page(self):
|
def _fetch_from_main_page(self):
|
||||||
"""
|
"""
|
||||||
@ -483,7 +485,13 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
|
|
||||||
|
|
||||||
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
|
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
|
||||||
|
ydl_res: dict = {}
|
||||||
|
try:
|
||||||
ydl_res: dict = self.ydl.extract_info(url=source.url, download=False)
|
ydl_res: dict = self.ydl.extract_info(url=source.url, download=False)
|
||||||
|
except DownloadError as e:
|
||||||
|
self.not_download[source.hash_url] = e
|
||||||
|
self.LOGGER.error(f"Couldn't fetch song from {source.url}. {e}")
|
||||||
|
return Song()
|
||||||
|
|
||||||
self.fetch_media_url(source=source, ydl_res=ydl_res)
|
self.fetch_media_url(source=source, ydl_res=ydl_res)
|
||||||
|
|
||||||
@ -556,6 +564,7 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
|
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
|
||||||
media = self.fetch_media_url(source)
|
media = self.fetch_media_url(source)
|
||||||
|
|
||||||
|
if source.hash_url not in self.not_download:
|
||||||
result = self.download_connection.stream_into(
|
result = self.download_connection.stream_into(
|
||||||
media["url"],
|
media["url"],
|
||||||
target,
|
target,
|
||||||
@ -567,6 +576,8 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
# chunk_size=media.get("chunk_size", main_settings["chunk_size"]),
|
# chunk_size=media.get("chunk_size", main_settings["chunk_size"]),
|
||||||
method="GET",
|
method="GET",
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
result = DownloadResult(error_message=str(self.not_download[source.hash_url]))
|
||||||
|
|
||||||
if result.is_fatal_error:
|
if result.is_fatal_error:
|
||||||
result.merge(super().download_song_to_target(source=source, target=target, desc=desc))
|
result.merge(super().download_song_to_target(source=source, target=target, desc=desc))
|
||||||
|
@ -19,6 +19,9 @@ def unify(string: str) -> str:
|
|||||||
- is lowercase
|
- is lowercase
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if string is None:
|
||||||
|
return None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
string = translit(string, reversed=True)
|
string = translit(string, reversed=True)
|
||||||
except LanguageDetectionError:
|
except LanguageDetectionError:
|
||||||
|
90
tests/collection.py
Normal file
90
tests/collection.py
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
import unittest
|
||||||
|
|
||||||
|
from music_kraken.objects import Song, Album, Artist, Collection, Country
|
||||||
|
|
||||||
|
class TestCollection(unittest.TestCase):
|
||||||
|
@staticmethod
|
||||||
|
def complicated_object() -> Artist:
|
||||||
|
return Artist(
|
||||||
|
name="artist",
|
||||||
|
country=Country.by_alpha_2("DE"),
|
||||||
|
main_album_list=[
|
||||||
|
Album(
|
||||||
|
title="album",
|
||||||
|
song_list=[
|
||||||
|
Song(
|
||||||
|
title="song",
|
||||||
|
album_list=[
|
||||||
|
Album(title="album", albumsort=123),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
Song(
|
||||||
|
title="other_song",
|
||||||
|
album_list=[
|
||||||
|
Album(title="album", albumsort=423),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
]
|
||||||
|
),
|
||||||
|
Album(title="album", barcode="1234567890123"),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_song_album_relation(self):
|
||||||
|
"""
|
||||||
|
Tests that
|
||||||
|
album = album.any_song.one_album
|
||||||
|
is the same object
|
||||||
|
"""
|
||||||
|
|
||||||
|
a = self.complicated_object().main_album_collection[0]
|
||||||
|
b = a.song_collection[0].album_collection[0]
|
||||||
|
c = a.song_collection[1].album_collection[0]
|
||||||
|
d = b.song_collection[0].album_collection[0]
|
||||||
|
e = d.song_collection[0].album_collection[0]
|
||||||
|
f = e.song_collection[0].album_collection[0]
|
||||||
|
g = f.song_collection[0].album_collection[0]
|
||||||
|
|
||||||
|
self.assertTrue(a.id == b.id == c.id == d.id == e.id == f.id == g.id)
|
||||||
|
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "album")
|
||||||
|
self.assertTrue(a.barcode == b.barcode == c.barcode == d.barcode == e.barcode == f.barcode == g.barcode == "1234567890123")
|
||||||
|
self.assertTrue(a.albumsort == b.albumsort == c.albumsort == d.albumsort == e.albumsort == f.albumsort == g.albumsort == 123)
|
||||||
|
|
||||||
|
d.title = "new_title"
|
||||||
|
|
||||||
|
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "new_title")
|
||||||
|
|
||||||
|
def test_album_artist_relation(self):
|
||||||
|
"""
|
||||||
|
Tests that
|
||||||
|
artist = artist.any_album.any_song.one_artist
|
||||||
|
is the same object
|
||||||
|
"""
|
||||||
|
|
||||||
|
a = self.complicated_object()
|
||||||
|
b = a.main_album_collection[0].artist_collection[0]
|
||||||
|
c = b.main_album_collection[0].artist_collection[0]
|
||||||
|
d = c.main_album_collection[0].artist_collection[0]
|
||||||
|
|
||||||
|
self.assertTrue(a.id == b.id == c.id == d.id)
|
||||||
|
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
|
||||||
|
self.assertTrue(a.country == b.country == c.country == d.country)
|
||||||
|
|
||||||
|
def test_song_artist_relations(self):
|
||||||
|
"""
|
||||||
|
Tests that
|
||||||
|
artist = artist.any_album.any_song.one_artist
|
||||||
|
is the same object
|
||||||
|
"""
|
||||||
|
|
||||||
|
a = self.complicated_object()
|
||||||
|
b = a.main_album_collection[0].song_collection[0].main_artist_collection[0]
|
||||||
|
c = b.main_album_collection[0].song_collection[0].main_artist_collection[0]
|
||||||
|
d = c.main_album_collection[0].song_collection[0].main_artist_collection[0]
|
||||||
|
|
||||||
|
self.assertTrue(a.id == b.id == c.id == d.id)
|
||||||
|
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
|
||||||
|
self.assertTrue(a.country == b.country == c.country == d.country)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
Loading…
Reference in New Issue
Block a user