Compare commits
No commits in common. "4473ff08d340d4f252f29270ba50af029a459ee3" and "92495f73fd12f272866a3079ee329a1e0731e2ae" have entirely different histories.
4473ff08d3
...
92495f73fd
6
.vscode/settings.json
vendored
6
.vscode/settings.json
vendored
@ -19,13 +19,9 @@
|
|||||||
"APIC",
|
"APIC",
|
||||||
"Bandcamp",
|
"Bandcamp",
|
||||||
"dotenv",
|
"dotenv",
|
||||||
"encyclopaedia",
|
|
||||||
"levenshtein",
|
"levenshtein",
|
||||||
"metallum",
|
|
||||||
"musify",
|
|
||||||
"OKBLUE",
|
"OKBLUE",
|
||||||
"Referer",
|
"Referer",
|
||||||
"tracksort",
|
"tracksort"
|
||||||
"youtube"
|
|
||||||
]
|
]
|
||||||
}
|
}
|
0
development/__init__.py
Normal file
0
development/__init__.py
Normal file
@ -1,57 +0,0 @@
|
|||||||
import music_kraken
|
|
||||||
from music_kraken.objects import Song, Album, Artist, Collection
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
artist: Artist = Artist(
|
|
||||||
name="artist",
|
|
||||||
main_album_list=[
|
|
||||||
Album(
|
|
||||||
title="album",
|
|
||||||
song_list=[
|
|
||||||
Song(
|
|
||||||
title="song",
|
|
||||||
album_list=[
|
|
||||||
Album(title="album", albumsort=123),
|
|
||||||
],
|
|
||||||
),
|
|
||||||
Song(
|
|
||||||
title="other_song",
|
|
||||||
album_list=[
|
|
||||||
Album(title="album", albumsort=423),
|
|
||||||
],
|
|
||||||
),
|
|
||||||
]
|
|
||||||
),
|
|
||||||
Album(title="album", barcode="1234567890123"),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
a = artist.main_album_collection[0]
|
|
||||||
b = a.song_collection[0].album_collection[0]
|
|
||||||
c = a.song_collection[1].album_collection[0]
|
|
||||||
d = b.song_collection[0].album_collection[0]
|
|
||||||
e = d.song_collection[0].album_collection[0]
|
|
||||||
f = e.song_collection[0].album_collection[0]
|
|
||||||
g = f.song_collection[0].album_collection[0]
|
|
||||||
|
|
||||||
print(a.id, a.title, a.barcode, a.albumsort)
|
|
||||||
print(b.id, b.title, b.barcode, b.albumsort)
|
|
||||||
print(c.id, c.title, c.barcode, c.albumsort)
|
|
||||||
print(d.id, d.title, d.barcode, d.albumsort)
|
|
||||||
print(e.id, e.title, e.barcode, e.albumsort)
|
|
||||||
print(f.id, f.title, f.barcode, f.albumsort)
|
|
||||||
print(g.id, g.title, g.barcode, g.albumsort)
|
|
||||||
print()
|
|
||||||
|
|
||||||
d.title = "new_title"
|
|
||||||
|
|
||||||
print(a.id, a.title, a.barcode, a.albumsort)
|
|
||||||
print(b.id, b.title, b.barcode, b.albumsort)
|
|
||||||
print(c.id, c.title, c.barcode, c.albumsort)
|
|
||||||
print(d.id, d.title, d.barcode, d.albumsort)
|
|
||||||
print(e.id, e.title, e.barcode, e.albumsort)
|
|
||||||
print(f.id, f.title, f.barcode, f.albumsort)
|
|
||||||
print(g.id, g.title, g.barcode, g.albumsort)
|
|
||||||
print()
|
|
||||||
|
|
||||||
print(artist.main_album_collection._indexed_values)
|
|
@ -110,7 +110,7 @@ class Collection(Generic[T]):
|
|||||||
if self._contained_in_self(__object):
|
if self._contained_in_self(__object):
|
||||||
return [self]
|
return [self]
|
||||||
|
|
||||||
for collection in (*self.children, *self.parents):
|
for collection in self.children:
|
||||||
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
|
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
|
||||||
|
|
||||||
if break_at_first:
|
if break_at_first:
|
||||||
|
257
music_kraken/objects/new_collection.py
Normal file
257
music_kraken/objects/new_collection.py
Normal file
@ -0,0 +1,257 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections import defaultdict
|
||||||
|
from typing import TypeVar, Generic, Dict, Optional, Iterable, List
|
||||||
|
from .parents import OuterProxy
|
||||||
|
|
||||||
|
T = TypeVar('T', bound=OuterProxy)
|
||||||
|
|
||||||
|
|
||||||
|
class Collection(Generic[T]):
|
||||||
|
_data: List[T]
|
||||||
|
|
||||||
|
_indexed_values: Dict[str, set]
|
||||||
|
_indexed_to_objects: Dict[any, list]
|
||||||
|
|
||||||
|
shallow_list = property(fget=lambda self: self.data)
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
data: Optional[Iterable[T]] = None,
|
||||||
|
sync_on_append: Dict[str, "Collection"] = None,
|
||||||
|
contain_given_in_attribute: Dict[str, "Collection"] = None,
|
||||||
|
contain_attribute_in_given: Dict[str, "Collection"] = None,
|
||||||
|
append_object_to_attribute: Dict[str, T] = None
|
||||||
|
) -> None:
|
||||||
|
self._contains_ids = set()
|
||||||
|
self._data = []
|
||||||
|
self.upper_collections: List[Collection[T]] = []
|
||||||
|
self.contained_collections: List[Collection[T]] = []
|
||||||
|
|
||||||
|
# List of collection attributes that should be modified on append
|
||||||
|
# Key: collection attribute (str) of appended element
|
||||||
|
# Value: main collection to sync to
|
||||||
|
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
|
||||||
|
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
|
||||||
|
self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
|
||||||
|
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
|
||||||
|
|
||||||
|
self.contain_self_on_append: List[str] = []
|
||||||
|
|
||||||
|
self._indexed_values = defaultdict(set)
|
||||||
|
self._indexed_to_objects = defaultdict(list)
|
||||||
|
|
||||||
|
self.extend(data)
|
||||||
|
|
||||||
|
def _map_element(self, __object: T, from_map: bool = False):
|
||||||
|
self._contains_ids.add(__object.id)
|
||||||
|
|
||||||
|
for name, value in __object.indexing_values:
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
self._indexed_values[name].add(value)
|
||||||
|
self._indexed_to_objects[value].append(__object)
|
||||||
|
|
||||||
|
if not from_map:
|
||||||
|
for attribute, new_object in self.contain_given_in_attribute.items():
|
||||||
|
__object.__getattribute__(attribute).contain_collection_inside(new_object)
|
||||||
|
|
||||||
|
for attribute, new_object in self.contain_given_in_attribute.items():
|
||||||
|
new_object.contain_collection_inside(__object.__getattribute__(attribute))
|
||||||
|
|
||||||
|
for attribute, new_object in self.append_object_to_attribute.items():
|
||||||
|
__object.__getattribute__(attribute).append(new_object, from_map=True)
|
||||||
|
|
||||||
|
def _unmap_element(self, __object: T):
|
||||||
|
self._contains_ids.remove(__object.id)
|
||||||
|
|
||||||
|
for name, value in __object.indexing_values:
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
if value not in self._indexed_values[name]:
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._indexed_to_objects[value].remove(__object)
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not len(self._indexed_to_objects[value]):
|
||||||
|
self._indexed_values[name].remove(value)
|
||||||
|
|
||||||
|
def _contained_in_self(self, __object: T) -> bool:
|
||||||
|
if __object.id in self._contains_ids:
|
||||||
|
return True
|
||||||
|
|
||||||
|
for name, value in __object.indexing_values:
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
if value in self._indexed_values[name]:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _get_root_collections(self) -> List["Collection"]:
|
||||||
|
if not len(self.upper_collections):
|
||||||
|
return [self]
|
||||||
|
|
||||||
|
root_collections = []
|
||||||
|
for upper_collection in self.upper_collections:
|
||||||
|
root_collections.extend(upper_collection._get_root_collections())
|
||||||
|
return root_collections
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _is_root(self) -> bool:
|
||||||
|
return len(self.upper_collections) <= 0
|
||||||
|
|
||||||
|
def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
|
||||||
|
results = []
|
||||||
|
|
||||||
|
if self._contained_in_self(__object):
|
||||||
|
return [self]
|
||||||
|
|
||||||
|
for collection in self.contained_collections:
|
||||||
|
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
|
||||||
|
if break_at_first:
|
||||||
|
return results
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def _get_parents_of_multiple_contained_children(self, __object: T):
|
||||||
|
results = []
|
||||||
|
if len(self.contained_collections) < 2 or self._contained_in_self(__object):
|
||||||
|
return results
|
||||||
|
|
||||||
|
count = 0
|
||||||
|
|
||||||
|
for collection in self.contained_collections:
|
||||||
|
sub_results = collection._get_parents_of_multiple_contained_children(__object)
|
||||||
|
|
||||||
|
if len(sub_results) > 0:
|
||||||
|
count += 1
|
||||||
|
results.extend(sub_results)
|
||||||
|
|
||||||
|
if count >= 2:
|
||||||
|
results.append(self)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def _merge_in_self(self, __object: T, from_map: bool = False):
|
||||||
|
"""
|
||||||
|
1. find existing objects
|
||||||
|
2. merge into existing object
|
||||||
|
3. remap existing object
|
||||||
|
"""
|
||||||
|
if __object.id in self._contains_ids:
|
||||||
|
return
|
||||||
|
|
||||||
|
existing_object: DatabaseObject = None
|
||||||
|
|
||||||
|
for name, value in __object.indexing_values:
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
if value in self._indexed_values[name]:
|
||||||
|
existing_object = self._indexed_to_objects[value][0]
|
||||||
|
if existing_object.id == __object.id:
|
||||||
|
return None
|
||||||
|
|
||||||
|
break
|
||||||
|
|
||||||
|
if existing_object is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
existing_object.merge(__object, replace_all_refs=True)
|
||||||
|
|
||||||
|
# just a check if it really worked
|
||||||
|
if existing_object.id != __object.id:
|
||||||
|
raise ValueError("This should NEVER happen. Merging doesn't work.")
|
||||||
|
|
||||||
|
self._map_element(existing_object, from_map=from_map)
|
||||||
|
|
||||||
|
def contains(self, __object: T) -> bool:
|
||||||
|
return len(self._contained_in_sub(__object)) > 0
|
||||||
|
|
||||||
|
def _append(self, __object: T, from_map: bool = False):
|
||||||
|
for attribute, to_sync_with in self.sync_on_append.items():
|
||||||
|
pass
|
||||||
|
to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))
|
||||||
|
|
||||||
|
self._map_element(__object, from_map=from_map)
|
||||||
|
self._data.append(__object)
|
||||||
|
|
||||||
|
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
|
||||||
|
if __object is None:
|
||||||
|
return
|
||||||
|
if __object.id in self._contains_ids:
|
||||||
|
return
|
||||||
|
|
||||||
|
exists_in_collection = self._contained_in_sub(__object)
|
||||||
|
if len(exists_in_collection) and self is exists_in_collection[0]:
|
||||||
|
# assuming that the object already is contained in the correct collections
|
||||||
|
if not already_is_parent:
|
||||||
|
self._merge_in_self(__object, from_map=from_map)
|
||||||
|
return
|
||||||
|
|
||||||
|
if not len(exists_in_collection):
|
||||||
|
self._append(__object, from_map=from_map)
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
exists_in_collection[0]._merge_in_self(__object, from_map=from_map)
|
||||||
|
|
||||||
|
if not already_is_parent or not self._is_root:
|
||||||
|
for parent_collection in self._get_parents_of_multiple_contained_children(__object):
|
||||||
|
pass
|
||||||
|
parent_collection.append(__object, already_is_parent=True, from_map=from_map)
|
||||||
|
|
||||||
|
def extend(self, __iterable: Optional[Iterable[T]]):
|
||||||
|
if __iterable is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
for __object in __iterable:
|
||||||
|
self.append(__object)
|
||||||
|
|
||||||
|
def sync_with_other_collection(self, equal_collection: "Collection"):
|
||||||
|
"""
|
||||||
|
If two collections always need to have the same values, this can be used.
|
||||||
|
|
||||||
|
Internally:
|
||||||
|
1. import the data from other to self
|
||||||
|
- _data
|
||||||
|
- contained_collections
|
||||||
|
2. replace all refs from the other object, with refs from this object
|
||||||
|
"""
|
||||||
|
if equal_collection is self:
|
||||||
|
return
|
||||||
|
|
||||||
|
# don't add the elements from the subelements from the other collection.
|
||||||
|
# this will be done in the next step.
|
||||||
|
self.extend(equal_collection._data)
|
||||||
|
# add all submodules
|
||||||
|
for equal_sub_collection in equal_collection.contained_collections:
|
||||||
|
self.contain_collection_inside(equal_sub_collection)
|
||||||
|
|
||||||
|
# now the ugly part
|
||||||
|
# replace all refs of the other element with this one
|
||||||
|
self._risky_merge(equal_collection)
|
||||||
|
|
||||||
|
def contain_collection_inside(self, sub_collection: "Collection"):
|
||||||
|
"""
|
||||||
|
This collection will ALWAYS contain everything from the passed in collection
|
||||||
|
"""
|
||||||
|
if sub_collection in self.contained_collections:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.contained_collections.append(sub_collection)
|
||||||
|
sub_collection.upper_collections.append(self)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def data(self) -> List[T]:
|
||||||
|
return [*self._data,
|
||||||
|
*(__object for collection in self.contained_collections for __object in collection.shallow_list)]
|
||||||
|
|
||||||
|
def __len__(self) -> int:
|
||||||
|
return len(self._data) + sum(len(collection) for collection in self.contained_collections)
|
||||||
|
|
||||||
|
def __iter__(self) -> Iterator[T]:
|
||||||
|
for element in self._data:
|
||||||
|
yield element
|
256
music_kraken/objects/old_collection.py
Normal file
256
music_kraken/objects/old_collection.py
Normal file
@ -0,0 +1,256 @@
|
|||||||
|
from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
from .parents import DatabaseObject
|
||||||
|
from ..utils.support_classes.hacking import MetaClass
|
||||||
|
|
||||||
|
T = TypeVar('T', bound=DatabaseObject)
|
||||||
|
|
||||||
|
|
||||||
|
class Collection(Generic[T]):
|
||||||
|
_data: List[T]
|
||||||
|
|
||||||
|
_indexed_values: Dict[str, set]
|
||||||
|
_indexed_to_objects: Dict[any, list]
|
||||||
|
|
||||||
|
shallow_list = property(fget=lambda self: self.data)
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, data: Optional[Iterable[T]] = None,
|
||||||
|
sync_on_append: Dict[str, "Collection"] = None,
|
||||||
|
contain_given_in_attribute: Dict[str, "Collection"] = None,
|
||||||
|
contain_attribute_in_given: Dict[str, "Collection"] = None,
|
||||||
|
append_object_to_attribute: Dict[str, DatabaseObject] = None
|
||||||
|
) -> None:
|
||||||
|
self._contains_ids = set()
|
||||||
|
self._data = []
|
||||||
|
self.upper_collections: List[Collection[T]] = []
|
||||||
|
self.contained_collections: List[Collection[T]] = []
|
||||||
|
|
||||||
|
# List of collection attributes that should be modified on append
|
||||||
|
# Key: collection attribute (str) of appended element
|
||||||
|
# Value: main collection to sync to
|
||||||
|
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
|
||||||
|
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
|
||||||
|
self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
|
||||||
|
self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {}
|
||||||
|
|
||||||
|
self.contain_self_on_append: List[str] = []
|
||||||
|
|
||||||
|
self._indexed_values = defaultdict(set)
|
||||||
|
self._indexed_to_objects = defaultdict(list)
|
||||||
|
|
||||||
|
self.extend(data)
|
||||||
|
|
||||||
|
def _map_element(self, __object: T, from_map: bool = False):
|
||||||
|
self._contains_ids.add(__object.id)
|
||||||
|
|
||||||
|
for name, value in __object.indexing_values:
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
self._indexed_values[name].add(value)
|
||||||
|
self._indexed_to_objects[value].append(__object)
|
||||||
|
|
||||||
|
if not from_map:
|
||||||
|
for attribute, new_object in self.contain_given_in_attribute.items():
|
||||||
|
__object.__getattribute__(attribute).contain_collection_inside(new_object)
|
||||||
|
|
||||||
|
for attribute, new_object in self.contain_given_in_attribute.items():
|
||||||
|
new_object.contain_collection_inside(__object.__getattribute__(attribute))
|
||||||
|
|
||||||
|
for attribute, new_object in self.append_object_to_attribute.items():
|
||||||
|
__object.__getattribute__(attribute).append(new_object, from_map=True)
|
||||||
|
|
||||||
|
def _unmap_element(self, __object: T):
|
||||||
|
self._contains_ids.remove(__object.id)
|
||||||
|
|
||||||
|
for name, value in __object.indexing_values:
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
if value not in self._indexed_values[name]:
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._indexed_to_objects[value].remove(__object)
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not len(self._indexed_to_objects[value]):
|
||||||
|
self._indexed_values[name].remove(value)
|
||||||
|
|
||||||
|
def _contained_in_self(self, __object: T) -> bool:
|
||||||
|
if __object.id in self._contains_ids:
|
||||||
|
return True
|
||||||
|
|
||||||
|
for name, value in __object.indexing_values:
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
if value in self._indexed_values[name]:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _get_root_collections(self) -> List["Collection"]:
|
||||||
|
if not len(self.upper_collections):
|
||||||
|
return [self]
|
||||||
|
|
||||||
|
root_collections = []
|
||||||
|
for upper_collection in self.upper_collections:
|
||||||
|
root_collections.extend(upper_collection._get_root_collections())
|
||||||
|
return root_collections
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _is_root(self) -> bool:
|
||||||
|
return len(self.upper_collections) <= 0
|
||||||
|
|
||||||
|
def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
|
||||||
|
results = []
|
||||||
|
|
||||||
|
if self._contained_in_self(__object):
|
||||||
|
return [self]
|
||||||
|
|
||||||
|
for collection in self.contained_collections:
|
||||||
|
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
|
||||||
|
if break_at_first:
|
||||||
|
return results
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def _get_parents_of_multiple_contained_children(self, __object: T):
|
||||||
|
results = []
|
||||||
|
if len(self.contained_collections) < 2 or self._contained_in_self(__object):
|
||||||
|
return results
|
||||||
|
|
||||||
|
count = 0
|
||||||
|
|
||||||
|
for collection in self.contained_collections:
|
||||||
|
sub_results = collection._get_parents_of_multiple_contained_children(__object)
|
||||||
|
|
||||||
|
if len(sub_results) > 0:
|
||||||
|
count += 1
|
||||||
|
results.extend(sub_results)
|
||||||
|
|
||||||
|
if count >= 2:
|
||||||
|
results.append(self)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def _merge_in_self(self, __object: T, from_map: bool = False):
|
||||||
|
"""
|
||||||
|
1. find existing objects
|
||||||
|
2. merge into existing object
|
||||||
|
3. remap existing object
|
||||||
|
"""
|
||||||
|
if __object.id in self._contains_ids:
|
||||||
|
return
|
||||||
|
|
||||||
|
existing_object: DatabaseObject = None
|
||||||
|
|
||||||
|
for name, value in __object.indexing_values:
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
if value in self._indexed_values[name]:
|
||||||
|
existing_object = self._indexed_to_objects[value][0]
|
||||||
|
if existing_object.id == __object.id:
|
||||||
|
return None
|
||||||
|
|
||||||
|
break
|
||||||
|
|
||||||
|
if existing_object is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
existing_object.merge(__object, replace_all_refs=True)
|
||||||
|
|
||||||
|
# just a check if it really worked
|
||||||
|
if existing_object.id != __object.id:
|
||||||
|
raise ValueError("This should NEVER happen. Merging doesn't work.")
|
||||||
|
|
||||||
|
self._map_element(existing_object, from_map=from_map)
|
||||||
|
|
||||||
|
def contains(self, __object: T) -> bool:
|
||||||
|
return len(self._contained_in_sub(__object)) > 0
|
||||||
|
|
||||||
|
def _append(self, __object: T, from_map: bool = False):
|
||||||
|
for attribute, to_sync_with in self.sync_on_append.items():
|
||||||
|
pass
|
||||||
|
to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))
|
||||||
|
|
||||||
|
self._map_element(__object, from_map=from_map)
|
||||||
|
self._data.append(__object)
|
||||||
|
|
||||||
|
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
|
||||||
|
if __object is None:
|
||||||
|
return
|
||||||
|
if __object.id in self._contains_ids:
|
||||||
|
return
|
||||||
|
|
||||||
|
exists_in_collection = self._contained_in_sub(__object)
|
||||||
|
if len(exists_in_collection) and self is exists_in_collection[0]:
|
||||||
|
# assuming that the object already is contained in the correct collections
|
||||||
|
if not already_is_parent:
|
||||||
|
self._merge_in_self(__object, from_map=from_map)
|
||||||
|
return
|
||||||
|
|
||||||
|
if not len(exists_in_collection):
|
||||||
|
self._append(__object, from_map=from_map)
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
exists_in_collection[0]._merge_in_self(__object, from_map=from_map)
|
||||||
|
|
||||||
|
if not already_is_parent or not self._is_root:
|
||||||
|
for parent_collection in self._get_parents_of_multiple_contained_children(__object):
|
||||||
|
pass
|
||||||
|
parent_collection.append(__object, already_is_parent=True, from_map=from_map)
|
||||||
|
|
||||||
|
def extend(self, __iterable: Optional[Iterable[T]]):
|
||||||
|
if __iterable is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
for __object in __iterable:
|
||||||
|
self.append(__object)
|
||||||
|
|
||||||
|
def sync_with_other_collection(self, equal_collection: "Collection"):
|
||||||
|
"""
|
||||||
|
If two collections always need to have the same values, this can be used.
|
||||||
|
|
||||||
|
Internally:
|
||||||
|
1. import the data from other to self
|
||||||
|
- _data
|
||||||
|
- contained_collections
|
||||||
|
2. replace all refs from the other object, with refs from this object
|
||||||
|
"""
|
||||||
|
if equal_collection is self:
|
||||||
|
return
|
||||||
|
|
||||||
|
# don't add the elements from the subelements from the other collection.
|
||||||
|
# this will be done in the next step.
|
||||||
|
self.extend(equal_collection._data)
|
||||||
|
# add all submodules
|
||||||
|
for equal_sub_collection in equal_collection.contained_collections:
|
||||||
|
self.contain_collection_inside(equal_sub_collection)
|
||||||
|
|
||||||
|
# now the ugly part
|
||||||
|
# replace all refs of the other element with this one
|
||||||
|
self._risky_merge(equal_collection)
|
||||||
|
|
||||||
|
def contain_collection_inside(self, sub_collection: "Collection"):
|
||||||
|
"""
|
||||||
|
This collection will ALWAYS contain everything from the passed in collection
|
||||||
|
"""
|
||||||
|
if sub_collection in self.contained_collections:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.contained_collections.append(sub_collection)
|
||||||
|
sub_collection.upper_collections.append(self)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def data(self) -> List[T]:
|
||||||
|
return [*self._data,
|
||||||
|
*(__object for collection in self.contained_collections for __object in collection.shallow_list)]
|
||||||
|
|
||||||
|
def __len__(self) -> int:
|
||||||
|
return len(self._data) + sum(len(collection) for collection in self.contained_collections)
|
||||||
|
|
||||||
|
def __iter__(self) -> Iterator[T]:
|
||||||
|
for element in self._data:
|
||||||
|
yield element
|
@ -26,8 +26,6 @@ class InnerData:
|
|||||||
If the data in the wrapper class has to be merged, then this class is just replaced and garbage collected.
|
If the data in the wrapper class has to be merged, then this class is just replaced and garbage collected.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
_multiple_instances = False
|
|
||||||
|
|
||||||
def __init__(self, object_type, **kwargs):
|
def __init__(self, object_type, **kwargs):
|
||||||
# initialize the default values
|
# initialize the default values
|
||||||
self.__default_values = {}
|
self.__default_values = {}
|
||||||
@ -179,20 +177,8 @@ class OuterProxy:
|
|||||||
_ = "debug"
|
_ = "debug"
|
||||||
return
|
return
|
||||||
|
|
||||||
a = self
|
self._inner.__merge__(__other._inner, override=override)
|
||||||
b = __other
|
__other._inner = self._inner
|
||||||
|
|
||||||
if a._inner._multiple_instances and b._inner._multiple_instances:
|
|
||||||
LOGGER.warning(f"Both instances data obj are shared over multiple objects. This will lead so them being unsynchronized at some point. {a} {b}")
|
|
||||||
|
|
||||||
if b._inner._multiple_instances:
|
|
||||||
a, b = b, a
|
|
||||||
|
|
||||||
a._inner.__merge__(b._inner, override=override)
|
|
||||||
b._inner = a._inner
|
|
||||||
|
|
||||||
|
|
||||||
b._inner._multiple_instances = True
|
|
||||||
|
|
||||||
def mark_as_fetched(self, *url_hash_list: List[str]):
|
def mark_as_fetched(self, *url_hash_list: List[str]):
|
||||||
for url_hash in url_hash_list:
|
for url_hash in url_hash_list:
|
||||||
|
@ -119,7 +119,7 @@ class Song(Base):
|
|||||||
def indexing_values(self) -> List[Tuple[str, object]]:
|
def indexing_values(self) -> List[Tuple[str, object]]:
|
||||||
return [
|
return [
|
||||||
('id', self.id),
|
('id', self.id),
|
||||||
('title', unify(self.unified_title)),
|
('title', self.unified_title),
|
||||||
('isrc', self.isrc),
|
('isrc', self.isrc),
|
||||||
*[('url', source.url) for source in self.source_collection]
|
*[('url', source.url) for source in self.source_collection]
|
||||||
]
|
]
|
||||||
@ -244,9 +244,6 @@ class Album(Base):
|
|||||||
self.song_collection.contain_attribute_in_given = {
|
self.song_collection.contain_attribute_in_given = {
|
||||||
"main_artist_collection": self.artist_collection
|
"main_artist_collection": self.artist_collection
|
||||||
}
|
}
|
||||||
self.song_collection.append_object_to_attribute = {
|
|
||||||
"album_collection": self
|
|
||||||
}
|
|
||||||
|
|
||||||
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
|
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
|
||||||
if object_type is Song:
|
if object_type is Song:
|
||||||
@ -268,7 +265,7 @@ class Album(Base):
|
|||||||
def indexing_values(self) -> List[Tuple[str, object]]:
|
def indexing_values(self) -> List[Tuple[str, object]]:
|
||||||
return [
|
return [
|
||||||
('id', self.id),
|
('id', self.id),
|
||||||
('title', unify(self.title)),
|
('title', self.unified_title),
|
||||||
('barcode', self.barcode),
|
('barcode', self.barcode),
|
||||||
*[('url', source.url) for source in self.source_collection]
|
*[('url', source.url) for source in self.source_collection]
|
||||||
]
|
]
|
||||||
@ -533,7 +530,7 @@ class Artist(Base):
|
|||||||
def indexing_values(self) -> List[Tuple[str, object]]:
|
def indexing_values(self) -> List[Tuple[str, object]]:
|
||||||
return [
|
return [
|
||||||
('id', self.id),
|
('id', self.id),
|
||||||
('name', unify(self.name)),
|
('name', self.unified_name),
|
||||||
*[('url', source.url) for source in self.source_collection],
|
*[('url', source.url) for source in self.source_collection],
|
||||||
*[('contact', contact.value) for contact in self.contact_collection]
|
*[('contact', contact.value) for contact in self.contact_collection]
|
||||||
]
|
]
|
||||||
@ -646,7 +643,7 @@ class Label(Base):
|
|||||||
def indexing_values(self) -> List[Tuple[str, object]]:
|
def indexing_values(self) -> List[Tuple[str, object]]:
|
||||||
return [
|
return [
|
||||||
('id', self.id),
|
('id', self.id),
|
||||||
('name', unify(self.name)),
|
('name', self.unified_name),
|
||||||
*[('url', source.url) for source in self.source_collection]
|
*[('url', source.url) for source in self.source_collection]
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -11,7 +11,6 @@ from functools import lru_cache
|
|||||||
|
|
||||||
import youtube_dl
|
import youtube_dl
|
||||||
from youtube_dl.extractor.youtube import YoutubeIE
|
from youtube_dl.extractor.youtube import YoutubeIE
|
||||||
from youtube_dl.utils import DownloadError
|
|
||||||
|
|
||||||
from ...utils.exception.config import SettingValueError
|
from ...utils.exception.config import SettingValueError
|
||||||
from ...utils.config import main_settings, youtube_settings, logging_settings
|
from ...utils.config import main_settings, youtube_settings, logging_settings
|
||||||
@ -202,7 +201,6 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
self.yt_ie = MusicKrakenYoutubeIE(downloader=self.ydl, main_instance=self)
|
self.yt_ie = MusicKrakenYoutubeIE(downloader=self.ydl, main_instance=self)
|
||||||
|
|
||||||
self.download_values_by_url: dict = {}
|
self.download_values_by_url: dict = {}
|
||||||
self.not_download: Dict[str, DownloadError] = {}
|
|
||||||
|
|
||||||
def _fetch_from_main_page(self):
|
def _fetch_from_main_page(self):
|
||||||
"""
|
"""
|
||||||
@ -485,13 +483,7 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
|
|
||||||
|
|
||||||
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
|
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
|
||||||
ydl_res: dict = {}
|
ydl_res: dict = self.ydl.extract_info(url=source.url, download=False)
|
||||||
try:
|
|
||||||
ydl_res: dict = self.ydl.extract_info(url=source.url, download=False)
|
|
||||||
except DownloadError as e:
|
|
||||||
self.not_download[source.hash_url] = e
|
|
||||||
self.LOGGER.error(f"Couldn't fetch song from {source.url}. {e}")
|
|
||||||
return Song()
|
|
||||||
|
|
||||||
self.fetch_media_url(source=source, ydl_res=ydl_res)
|
self.fetch_media_url(source=source, ydl_res=ydl_res)
|
||||||
|
|
||||||
@ -564,20 +556,17 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
|
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
|
||||||
media = self.fetch_media_url(source)
|
media = self.fetch_media_url(source)
|
||||||
|
|
||||||
if source.hash_url not in self.not_download:
|
result = self.download_connection.stream_into(
|
||||||
result = self.download_connection.stream_into(
|
media["url"],
|
||||||
media["url"],
|
target,
|
||||||
target,
|
name=desc,
|
||||||
name=desc,
|
raw_url=True,
|
||||||
raw_url=True,
|
raw_headers=True,
|
||||||
raw_headers=True,
|
disable_cache=True,
|
||||||
disable_cache=True,
|
headers=media.get("headers", {}),
|
||||||
headers=media.get("headers", {}),
|
# chunk_size=media.get("chunk_size", main_settings["chunk_size"]),
|
||||||
# chunk_size=media.get("chunk_size", main_settings["chunk_size"]),
|
method="GET",
|
||||||
method="GET",
|
)
|
||||||
)
|
|
||||||
else:
|
|
||||||
result = DownloadResult(error_message=str(self.not_download[source.hash_url]))
|
|
||||||
|
|
||||||
if result.is_fatal_error:
|
if result.is_fatal_error:
|
||||||
result.merge(super().download_song_to_target(source=source, target=target, desc=desc))
|
result.merge(super().download_song_to_target(source=source, target=target, desc=desc))
|
||||||
|
@ -16,12 +16,9 @@ def unify(string: str) -> str:
|
|||||||
"""
|
"""
|
||||||
returns a unified str, to make comparisons easy.
|
returns a unified str, to make comparisons easy.
|
||||||
a unified string has the following attributes:
|
a unified string has the following attributes:
|
||||||
- is lowercase
|
- is lowercase
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if string is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
string = translit(string, reversed=True)
|
string = translit(string, reversed=True)
|
||||||
except LanguageDetectionError:
|
except LanguageDetectionError:
|
||||||
|
@ -1,90 +0,0 @@
|
|||||||
import unittest
|
|
||||||
|
|
||||||
from music_kraken.objects import Song, Album, Artist, Collection, Country
|
|
||||||
|
|
||||||
class TestCollection(unittest.TestCase):
|
|
||||||
@staticmethod
|
|
||||||
def complicated_object() -> Artist:
|
|
||||||
return Artist(
|
|
||||||
name="artist",
|
|
||||||
country=Country.by_alpha_2("DE"),
|
|
||||||
main_album_list=[
|
|
||||||
Album(
|
|
||||||
title="album",
|
|
||||||
song_list=[
|
|
||||||
Song(
|
|
||||||
title="song",
|
|
||||||
album_list=[
|
|
||||||
Album(title="album", albumsort=123),
|
|
||||||
],
|
|
||||||
),
|
|
||||||
Song(
|
|
||||||
title="other_song",
|
|
||||||
album_list=[
|
|
||||||
Album(title="album", albumsort=423),
|
|
||||||
],
|
|
||||||
),
|
|
||||||
]
|
|
||||||
),
|
|
||||||
Album(title="album", barcode="1234567890123"),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_song_album_relation(self):
|
|
||||||
"""
|
|
||||||
Tests that
|
|
||||||
album = album.any_song.one_album
|
|
||||||
is the same object
|
|
||||||
"""
|
|
||||||
|
|
||||||
a = self.complicated_object().main_album_collection[0]
|
|
||||||
b = a.song_collection[0].album_collection[0]
|
|
||||||
c = a.song_collection[1].album_collection[0]
|
|
||||||
d = b.song_collection[0].album_collection[0]
|
|
||||||
e = d.song_collection[0].album_collection[0]
|
|
||||||
f = e.song_collection[0].album_collection[0]
|
|
||||||
g = f.song_collection[0].album_collection[0]
|
|
||||||
|
|
||||||
self.assertTrue(a.id == b.id == c.id == d.id == e.id == f.id == g.id)
|
|
||||||
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "album")
|
|
||||||
self.assertTrue(a.barcode == b.barcode == c.barcode == d.barcode == e.barcode == f.barcode == g.barcode == "1234567890123")
|
|
||||||
self.assertTrue(a.albumsort == b.albumsort == c.albumsort == d.albumsort == e.albumsort == f.albumsort == g.albumsort == 123)
|
|
||||||
|
|
||||||
d.title = "new_title"
|
|
||||||
|
|
||||||
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "new_title")
|
|
||||||
|
|
||||||
def test_album_artist_relation(self):
|
|
||||||
"""
|
|
||||||
Tests that
|
|
||||||
artist = artist.any_album.any_song.one_artist
|
|
||||||
is the same object
|
|
||||||
"""
|
|
||||||
|
|
||||||
a = self.complicated_object()
|
|
||||||
b = a.main_album_collection[0].artist_collection[0]
|
|
||||||
c = b.main_album_collection[0].artist_collection[0]
|
|
||||||
d = c.main_album_collection[0].artist_collection[0]
|
|
||||||
|
|
||||||
self.assertTrue(a.id == b.id == c.id == d.id)
|
|
||||||
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
|
|
||||||
self.assertTrue(a.country == b.country == c.country == d.country)
|
|
||||||
|
|
||||||
def test_song_artist_relations(self):
|
|
||||||
"""
|
|
||||||
Tests that
|
|
||||||
artist = artist.any_album.any_song.one_artist
|
|
||||||
is the same object
|
|
||||||
"""
|
|
||||||
|
|
||||||
a = self.complicated_object()
|
|
||||||
b = a.main_album_collection[0].song_collection[0].main_artist_collection[0]
|
|
||||||
c = b.main_album_collection[0].song_collection[0].main_artist_collection[0]
|
|
||||||
d = c.main_album_collection[0].song_collection[0].main_artist_collection[0]
|
|
||||||
|
|
||||||
self.assertTrue(a.id == b.id == c.id == d.id)
|
|
||||||
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
|
|
||||||
self.assertTrue(a.country == b.country == c.country == d.country)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
Loading…
Reference in New Issue
Block a user