fix/collections #7
							
								
								
									
										6
									
								
								.vscode/settings.json
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										6
									
								
								.vscode/settings.json
									
									
									
									
										vendored
									
									
								
							@@ -19,9 +19,13 @@
 | 
			
		||||
        "APIC",
 | 
			
		||||
        "Bandcamp",
 | 
			
		||||
        "dotenv",
 | 
			
		||||
        "encyclopaedia",
 | 
			
		||||
        "levenshtein",
 | 
			
		||||
        "metallum",
 | 
			
		||||
        "musify",
 | 
			
		||||
        "OKBLUE",
 | 
			
		||||
        "Referer",
 | 
			
		||||
        "tracksort"
 | 
			
		||||
        "tracksort",
 | 
			
		||||
        "youtube"
 | 
			
		||||
    ]
 | 
			
		||||
}
 | 
			
		||||
@@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG)
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    commands = [
 | 
			
		||||
        "s: #a Toxoplasma",
 | 
			
		||||
        "d: 16",
 | 
			
		||||
        "s: #a And End...",
 | 
			
		||||
        "d: 10",
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										92
									
								
								development/objects_collection.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										92
									
								
								development/objects_collection.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,92 @@
 | 
			
		||||
import music_kraken
 | 
			
		||||
from music_kraken.objects import Song, Album, Artist, Collection
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    artist: Artist = Artist(
 | 
			
		||||
        name="artist",
 | 
			
		||||
        main_album_list=[
 | 
			
		||||
            Album(
 | 
			
		||||
                title="album",
 | 
			
		||||
                song_list=[
 | 
			
		||||
                    Song(
 | 
			
		||||
                        title="song",
 | 
			
		||||
                        album_list=[
 | 
			
		||||
                            Album(
 | 
			
		||||
                                title="album", 
 | 
			
		||||
                                albumsort=123,
 | 
			
		||||
                                main_artist=Artist(name="artist"),
 | 
			
		||||
                            ),
 | 
			
		||||
                        ],
 | 
			
		||||
                    ),
 | 
			
		||||
                    Song(
 | 
			
		||||
                        title="other_song",
 | 
			
		||||
                        album_list=[
 | 
			
		||||
                            Album(title="album", albumsort=423),
 | 
			
		||||
                        ],
 | 
			
		||||
                    ),
 | 
			
		||||
                ]
 | 
			
		||||
            ),
 | 
			
		||||
            Album(title="album", barcode="1234567890123"),
 | 
			
		||||
        ]
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    other_artist: Artist = Artist(
 | 
			
		||||
        name="artist",
 | 
			
		||||
        main_album_list=[
 | 
			
		||||
            Album(
 | 
			
		||||
                title="album",
 | 
			
		||||
                song_list=[
 | 
			
		||||
                    Song(
 | 
			
		||||
                        title="song",
 | 
			
		||||
                        album_list=[
 | 
			
		||||
                            Album(
 | 
			
		||||
                                title="album", 
 | 
			
		||||
                                albumsort=123,
 | 
			
		||||
                                main_artist=Artist(name="other_artist"),
 | 
			
		||||
                            ),
 | 
			
		||||
                        ],
 | 
			
		||||
                    ),
 | 
			
		||||
                    Song(
 | 
			
		||||
                        title="other_song",
 | 
			
		||||
                        album_list=[
 | 
			
		||||
                            Album(title="album", albumsort=423),
 | 
			
		||||
                        ],
 | 
			
		||||
                    ),
 | 
			
		||||
                ]
 | 
			
		||||
            ),
 | 
			
		||||
            Album(title="album", barcode="1234567890123"),
 | 
			
		||||
        ]
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    artist.merge(other_artist)
 | 
			
		||||
 | 
			
		||||
    a = artist.main_album_collection[0]
 | 
			
		||||
    b = a.song_collection[0].album_collection[0]
 | 
			
		||||
    c = a.song_collection[1].album_collection[0]
 | 
			
		||||
    d = b.song_collection[0].album_collection[0]
 | 
			
		||||
    e = d.song_collection[0].album_collection[0]
 | 
			
		||||
    f = e.song_collection[0].album_collection[0]
 | 
			
		||||
    g = f.song_collection[0].album_collection[0]
 | 
			
		||||
 | 
			
		||||
    print(a.id, a.title, a.barcode, a.albumsort)
 | 
			
		||||
    print(b.id, b.title, b.barcode, b.albumsort)
 | 
			
		||||
    print(c.id, c.title, c.barcode, c.albumsort)
 | 
			
		||||
    print(d.id, d.title, d.barcode, d.albumsort)
 | 
			
		||||
    print(e.id, e.title, e.barcode, e.albumsort)
 | 
			
		||||
    print(f.id, f.title, f.barcode, f.albumsort)
 | 
			
		||||
    print(g.id, g.title, g.barcode, g.albumsort)
 | 
			
		||||
    print()
 | 
			
		||||
 | 
			
		||||
    d.title = "new_title"
 | 
			
		||||
 | 
			
		||||
    print(a.id, a.title, a.barcode, a.albumsort)
 | 
			
		||||
    print(b.id, b.title, b.barcode, b.albumsort)
 | 
			
		||||
    print(c.id, c.title, c.barcode, c.albumsort)
 | 
			
		||||
    print(d.id, d.title, d.barcode, d.albumsort)
 | 
			
		||||
    print(e.id, e.title, e.barcode, e.albumsort)
 | 
			
		||||
    print(f.id, f.title, f.barcode, f.albumsort)
 | 
			
		||||
    print(g.id, g.title, g.barcode, g.albumsort)
 | 
			
		||||
    print()
 | 
			
		||||
 | 
			
		||||
    print(artist.main_album_collection._indexed_values)
 | 
			
		||||
@@ -46,7 +46,7 @@ init_logging()
 | 
			
		||||
from . import cli
 | 
			
		||||
 | 
			
		||||
if DEBUG:
 | 
			
		||||
    sys.setrecursionlimit(100)
 | 
			
		||||
    sys.setrecursionlimit(500)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if main_settings['modify_gc']:
 | 
			
		||||
 
 | 
			
		||||
@@ -29,6 +29,8 @@ class AudioMetadata:
 | 
			
		||||
            """
 | 
			
		||||
            https://www.programcreek.com/python/example/84797/mutagen.id3.ID3
 | 
			
		||||
            """
 | 
			
		||||
            if value is None:
 | 
			
		||||
                continue
 | 
			
		||||
            self.frames.add(value)
 | 
			
		||||
 | 
			
		||||
    def add_song_metadata(self, song: Song):
 | 
			
		||||
 
 | 
			
		||||
@@ -110,7 +110,7 @@ class Collection(Generic[T]):
 | 
			
		||||
        if self._contained_in_self(__object):
 | 
			
		||||
            return [self]
 | 
			
		||||
 | 
			
		||||
        for collection in self.children:
 | 
			
		||||
        for collection in (*self.children, *self.parents):
 | 
			
		||||
            results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
 | 
			
		||||
 | 
			
		||||
            if break_at_first:
 | 
			
		||||
 
 | 
			
		||||
@@ -34,6 +34,6 @@ class Lyrics(OuterProxy):
 | 
			
		||||
    @property
 | 
			
		||||
    def metadata(self) -> Metadata:
 | 
			
		||||
        return Metadata({
 | 
			
		||||
            id3Mapping.UNSYNCED_LYRICS: self.text.html
 | 
			
		||||
            id3Mapping.UNSYNCED_LYRICS: [self.text.html]
 | 
			
		||||
        })
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,257 +0,0 @@
 | 
			
		||||
from __future__ import annotations
 | 
			
		||||
 | 
			
		||||
from collections import defaultdict
 | 
			
		||||
from typing import TypeVar, Generic, Dict, Optional, Iterable, List
 | 
			
		||||
from .parents import OuterProxy
 | 
			
		||||
 | 
			
		||||
T = TypeVar('T', bound=OuterProxy)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Collection(Generic[T]):
 | 
			
		||||
    _data: List[T]
 | 
			
		||||
 | 
			
		||||
    _indexed_values: Dict[str, set]
 | 
			
		||||
    _indexed_to_objects: Dict[any, list]
 | 
			
		||||
 | 
			
		||||
    shallow_list = property(fget=lambda self: self.data)
 | 
			
		||||
 | 
			
		||||
    def __init__(
 | 
			
		||||
            self,
 | 
			
		||||
            data: Optional[Iterable[T]] = None,
 | 
			
		||||
            sync_on_append: Dict[str, "Collection"] = None,
 | 
			
		||||
            contain_given_in_attribute: Dict[str, "Collection"] = None,
 | 
			
		||||
            contain_attribute_in_given: Dict[str, "Collection"] = None,
 | 
			
		||||
            append_object_to_attribute: Dict[str, T] = None
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        self._contains_ids = set()
 | 
			
		||||
        self._data = []
 | 
			
		||||
        self.upper_collections: List[Collection[T]] = []
 | 
			
		||||
        self.contained_collections: List[Collection[T]] = []
 | 
			
		||||
 | 
			
		||||
        # List of collection attributes that should be modified on append
 | 
			
		||||
        # Key: collection attribute (str) of appended element
 | 
			
		||||
        # Value: main collection to sync to
 | 
			
		||||
        self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
 | 
			
		||||
        self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
 | 
			
		||||
        self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
 | 
			
		||||
        self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
 | 
			
		||||
 | 
			
		||||
        self.contain_self_on_append: List[str] = []
 | 
			
		||||
 | 
			
		||||
        self._indexed_values = defaultdict(set)
 | 
			
		||||
        self._indexed_to_objects = defaultdict(list)
 | 
			
		||||
 | 
			
		||||
        self.extend(data)
 | 
			
		||||
 | 
			
		||||
    def _map_element(self, __object: T, from_map: bool = False):
 | 
			
		||||
        self._contains_ids.add(__object.id)
 | 
			
		||||
 | 
			
		||||
        for name, value in __object.indexing_values:
 | 
			
		||||
            if value is None:
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            self._indexed_values[name].add(value)
 | 
			
		||||
            self._indexed_to_objects[value].append(__object)
 | 
			
		||||
 | 
			
		||||
        if not from_map:
 | 
			
		||||
            for attribute, new_object in self.contain_given_in_attribute.items():
 | 
			
		||||
                __object.__getattribute__(attribute).contain_collection_inside(new_object)
 | 
			
		||||
 | 
			
		||||
            for attribute, new_object in self.contain_given_in_attribute.items():
 | 
			
		||||
                new_object.contain_collection_inside(__object.__getattribute__(attribute))
 | 
			
		||||
 | 
			
		||||
            for attribute, new_object in self.append_object_to_attribute.items():
 | 
			
		||||
                __object.__getattribute__(attribute).append(new_object, from_map=True)
 | 
			
		||||
 | 
			
		||||
    def _unmap_element(self, __object: T):
 | 
			
		||||
        self._contains_ids.remove(__object.id)
 | 
			
		||||
 | 
			
		||||
        for name, value in __object.indexing_values:
 | 
			
		||||
            if value is None:
 | 
			
		||||
                continue
 | 
			
		||||
            if value not in self._indexed_values[name]:
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                self._indexed_to_objects[value].remove(__object)
 | 
			
		||||
            except ValueError:
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            if not len(self._indexed_to_objects[value]):
 | 
			
		||||
                self._indexed_values[name].remove(value)
 | 
			
		||||
 | 
			
		||||
    def _contained_in_self(self, __object: T) -> bool:
 | 
			
		||||
        if __object.id in self._contains_ids:
 | 
			
		||||
            return True
 | 
			
		||||
 | 
			
		||||
        for name, value in __object.indexing_values:
 | 
			
		||||
            if value is None:
 | 
			
		||||
                continue
 | 
			
		||||
            if value in self._indexed_values[name]:
 | 
			
		||||
                return True
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    def _get_root_collections(self) -> List["Collection"]:
 | 
			
		||||
        if not len(self.upper_collections):
 | 
			
		||||
            return [self]
 | 
			
		||||
 | 
			
		||||
        root_collections = []
 | 
			
		||||
        for upper_collection in self.upper_collections:
 | 
			
		||||
            root_collections.extend(upper_collection._get_root_collections())
 | 
			
		||||
        return root_collections
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def _is_root(self) -> bool:
 | 
			
		||||
        return len(self.upper_collections) <= 0
 | 
			
		||||
 | 
			
		||||
    def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
 | 
			
		||||
        results = []
 | 
			
		||||
 | 
			
		||||
        if self._contained_in_self(__object):
 | 
			
		||||
            return [self]
 | 
			
		||||
 | 
			
		||||
        for collection in self.contained_collections:
 | 
			
		||||
            results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
 | 
			
		||||
            if break_at_first:
 | 
			
		||||
                return results
 | 
			
		||||
 | 
			
		||||
        return results
 | 
			
		||||
 | 
			
		||||
    def _get_parents_of_multiple_contained_children(self, __object: T):
 | 
			
		||||
        results = []
 | 
			
		||||
        if len(self.contained_collections) < 2 or self._contained_in_self(__object):
 | 
			
		||||
            return results
 | 
			
		||||
 | 
			
		||||
        count = 0
 | 
			
		||||
 | 
			
		||||
        for collection in self.contained_collections:
 | 
			
		||||
            sub_results = collection._get_parents_of_multiple_contained_children(__object)
 | 
			
		||||
 | 
			
		||||
            if len(sub_results) > 0:
 | 
			
		||||
                count += 1
 | 
			
		||||
                results.extend(sub_results)
 | 
			
		||||
 | 
			
		||||
        if count >= 2:
 | 
			
		||||
            results.append(self)
 | 
			
		||||
 | 
			
		||||
        return results
 | 
			
		||||
 | 
			
		||||
    def _merge_in_self(self, __object: T, from_map: bool = False):
 | 
			
		||||
        """
 | 
			
		||||
        1. find existing objects
 | 
			
		||||
        2. merge into existing object
 | 
			
		||||
        3. remap existing object
 | 
			
		||||
        """
 | 
			
		||||
        if __object.id in self._contains_ids:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        existing_object: DatabaseObject = None
 | 
			
		||||
 | 
			
		||||
        for name, value in __object.indexing_values:
 | 
			
		||||
            if value is None:
 | 
			
		||||
                continue
 | 
			
		||||
            if value in self._indexed_values[name]:
 | 
			
		||||
                existing_object = self._indexed_to_objects[value][0]
 | 
			
		||||
                if existing_object.id == __object.id:
 | 
			
		||||
                    return None
 | 
			
		||||
 | 
			
		||||
                break
 | 
			
		||||
 | 
			
		||||
        if existing_object is None:
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
        existing_object.merge(__object, replace_all_refs=True)
 | 
			
		||||
 | 
			
		||||
        # just a check if it really worked
 | 
			
		||||
        if existing_object.id != __object.id:
 | 
			
		||||
            raise ValueError("This should NEVER happen. Merging doesn't work.")
 | 
			
		||||
 | 
			
		||||
        self._map_element(existing_object, from_map=from_map)
 | 
			
		||||
 | 
			
		||||
    def contains(self, __object: T) -> bool:
 | 
			
		||||
        return len(self._contained_in_sub(__object)) > 0
 | 
			
		||||
 | 
			
		||||
    def _append(self, __object: T, from_map: bool = False):
 | 
			
		||||
        for attribute, to_sync_with in self.sync_on_append.items():
 | 
			
		||||
            pass
 | 
			
		||||
            to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))
 | 
			
		||||
 | 
			
		||||
        self._map_element(__object, from_map=from_map)
 | 
			
		||||
        self._data.append(__object)
 | 
			
		||||
 | 
			
		||||
    def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
 | 
			
		||||
        if __object is None:
 | 
			
		||||
            return
 | 
			
		||||
        if __object.id in self._contains_ids:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        exists_in_collection = self._contained_in_sub(__object)
 | 
			
		||||
        if len(exists_in_collection) and self is exists_in_collection[0]:
 | 
			
		||||
            # assuming that the object already is contained in the correct collections
 | 
			
		||||
            if not already_is_parent:
 | 
			
		||||
                self._merge_in_self(__object, from_map=from_map)
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        if not len(exists_in_collection):
 | 
			
		||||
            self._append(__object, from_map=from_map)
 | 
			
		||||
        else:
 | 
			
		||||
            pass
 | 
			
		||||
            exists_in_collection[0]._merge_in_self(__object, from_map=from_map)
 | 
			
		||||
 | 
			
		||||
        if not already_is_parent or not self._is_root:
 | 
			
		||||
            for parent_collection in self._get_parents_of_multiple_contained_children(__object):
 | 
			
		||||
                pass
 | 
			
		||||
                parent_collection.append(__object, already_is_parent=True, from_map=from_map)
 | 
			
		||||
 | 
			
		||||
    def extend(self, __iterable: Optional[Iterable[T]]):
 | 
			
		||||
        if __iterable is None:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        for __object in __iterable:
 | 
			
		||||
            self.append(__object)
 | 
			
		||||
 | 
			
		||||
    def sync_with_other_collection(self, equal_collection: "Collection"):
 | 
			
		||||
        """
 | 
			
		||||
        If two collections always need to have the same values, this can be used.
 | 
			
		||||
 | 
			
		||||
        Internally:
 | 
			
		||||
        1. import the data from other to self
 | 
			
		||||
            - _data
 | 
			
		||||
            - contained_collections
 | 
			
		||||
        2. replace all refs from the other object, with refs from this object
 | 
			
		||||
        """
 | 
			
		||||
        if equal_collection is self:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # don't add the elements from the subelements from the other collection.
 | 
			
		||||
        # this will be done in the next step.
 | 
			
		||||
        self.extend(equal_collection._data)
 | 
			
		||||
        # add all submodules
 | 
			
		||||
        for equal_sub_collection in equal_collection.contained_collections:
 | 
			
		||||
            self.contain_collection_inside(equal_sub_collection)
 | 
			
		||||
 | 
			
		||||
        # now the ugly part
 | 
			
		||||
        # replace all refs of the other element with this one
 | 
			
		||||
        self._risky_merge(equal_collection)
 | 
			
		||||
 | 
			
		||||
    def contain_collection_inside(self, sub_collection: "Collection"):
 | 
			
		||||
        """
 | 
			
		||||
        This collection will ALWAYS contain everything from the passed in collection
 | 
			
		||||
        """
 | 
			
		||||
        if sub_collection in self.contained_collections:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        self.contained_collections.append(sub_collection)
 | 
			
		||||
        sub_collection.upper_collections.append(self)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def data(self) -> List[T]:
 | 
			
		||||
        return [*self._data,
 | 
			
		||||
                *(__object for collection in self.contained_collections for __object in collection.shallow_list)]
 | 
			
		||||
 | 
			
		||||
    def __len__(self) -> int:
 | 
			
		||||
        return len(self._data) + sum(len(collection) for collection in self.contained_collections)
 | 
			
		||||
 | 
			
		||||
    def __iter__(self) -> Iterator[T]:
 | 
			
		||||
        for element in self._data:
 | 
			
		||||
            yield element
 | 
			
		||||
@@ -1,256 +0,0 @@
 | 
			
		||||
from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type
 | 
			
		||||
from collections import defaultdict
 | 
			
		||||
 | 
			
		||||
from .parents import DatabaseObject
 | 
			
		||||
from ..utils.support_classes.hacking import MetaClass
 | 
			
		||||
 | 
			
		||||
T = TypeVar('T', bound=DatabaseObject)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Collection(Generic[T]):
 | 
			
		||||
    _data: List[T]
 | 
			
		||||
 | 
			
		||||
    _indexed_values: Dict[str, set]
 | 
			
		||||
    _indexed_to_objects: Dict[any, list]
 | 
			
		||||
 | 
			
		||||
    shallow_list = property(fget=lambda self: self.data)
 | 
			
		||||
 | 
			
		||||
    def __init__(
 | 
			
		||||
            self, data: Optional[Iterable[T]] = None,
 | 
			
		||||
            sync_on_append: Dict[str, "Collection"] = None,
 | 
			
		||||
            contain_given_in_attribute: Dict[str, "Collection"] = None,
 | 
			
		||||
            contain_attribute_in_given: Dict[str, "Collection"] = None,
 | 
			
		||||
            append_object_to_attribute: Dict[str, DatabaseObject] = None
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        self._contains_ids = set()
 | 
			
		||||
        self._data = []
 | 
			
		||||
        self.upper_collections: List[Collection[T]] = []
 | 
			
		||||
        self.contained_collections: List[Collection[T]] = []
 | 
			
		||||
 | 
			
		||||
        # List of collection attributes that should be modified on append
 | 
			
		||||
        # Key: collection attribute (str) of appended element
 | 
			
		||||
        # Value: main collection to sync to
 | 
			
		||||
        self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
 | 
			
		||||
        self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
 | 
			
		||||
        self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
 | 
			
		||||
        self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {}
 | 
			
		||||
 | 
			
		||||
        self.contain_self_on_append: List[str] = []
 | 
			
		||||
 | 
			
		||||
        self._indexed_values = defaultdict(set)
 | 
			
		||||
        self._indexed_to_objects = defaultdict(list)
 | 
			
		||||
 | 
			
		||||
        self.extend(data)
 | 
			
		||||
 | 
			
		||||
    def _map_element(self, __object: T, from_map: bool = False):
 | 
			
		||||
        self._contains_ids.add(__object.id)
 | 
			
		||||
 | 
			
		||||
        for name, value in __object.indexing_values:
 | 
			
		||||
            if value is None:
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            self._indexed_values[name].add(value)
 | 
			
		||||
            self._indexed_to_objects[value].append(__object)
 | 
			
		||||
 | 
			
		||||
        if not from_map:
 | 
			
		||||
            for attribute, new_object in self.contain_given_in_attribute.items():
 | 
			
		||||
                __object.__getattribute__(attribute).contain_collection_inside(new_object)
 | 
			
		||||
 | 
			
		||||
            for attribute, new_object in self.contain_given_in_attribute.items():
 | 
			
		||||
                new_object.contain_collection_inside(__object.__getattribute__(attribute))
 | 
			
		||||
 | 
			
		||||
            for attribute, new_object in self.append_object_to_attribute.items():
 | 
			
		||||
                __object.__getattribute__(attribute).append(new_object, from_map=True)
 | 
			
		||||
 | 
			
		||||
    def _unmap_element(self, __object: T):
 | 
			
		||||
        self._contains_ids.remove(__object.id)
 | 
			
		||||
 | 
			
		||||
        for name, value in __object.indexing_values:
 | 
			
		||||
            if value is None:
 | 
			
		||||
                continue
 | 
			
		||||
            if value not in self._indexed_values[name]:
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                self._indexed_to_objects[value].remove(__object)
 | 
			
		||||
            except ValueError:
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            if not len(self._indexed_to_objects[value]):
 | 
			
		||||
                self._indexed_values[name].remove(value)
 | 
			
		||||
 | 
			
		||||
    def _contained_in_self(self, __object: T) -> bool:
 | 
			
		||||
        if __object.id in self._contains_ids:
 | 
			
		||||
            return True
 | 
			
		||||
 | 
			
		||||
        for name, value in __object.indexing_values:
 | 
			
		||||
            if value is None:
 | 
			
		||||
                continue
 | 
			
		||||
            if value in self._indexed_values[name]:
 | 
			
		||||
                return True
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    def _get_root_collections(self) -> List["Collection"]:
 | 
			
		||||
        if not len(self.upper_collections):
 | 
			
		||||
            return [self]
 | 
			
		||||
 | 
			
		||||
        root_collections = []
 | 
			
		||||
        for upper_collection in self.upper_collections:
 | 
			
		||||
            root_collections.extend(upper_collection._get_root_collections())
 | 
			
		||||
        return root_collections
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def _is_root(self) -> bool:
 | 
			
		||||
        return len(self.upper_collections) <= 0
 | 
			
		||||
 | 
			
		||||
    def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List["Collection"]:
 | 
			
		||||
        results = []
 | 
			
		||||
 | 
			
		||||
        if self._contained_in_self(__object):
 | 
			
		||||
            return [self]
 | 
			
		||||
 | 
			
		||||
        for collection in self.contained_collections:
 | 
			
		||||
            results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
 | 
			
		||||
            if break_at_first:
 | 
			
		||||
                return results
 | 
			
		||||
 | 
			
		||||
        return results
 | 
			
		||||
 | 
			
		||||
    def _get_parents_of_multiple_contained_children(self, __object: T):
 | 
			
		||||
        results = []
 | 
			
		||||
        if len(self.contained_collections) < 2 or self._contained_in_self(__object):
 | 
			
		||||
            return results
 | 
			
		||||
 | 
			
		||||
        count = 0
 | 
			
		||||
 | 
			
		||||
        for collection in self.contained_collections:
 | 
			
		||||
            sub_results = collection._get_parents_of_multiple_contained_children(__object)
 | 
			
		||||
 | 
			
		||||
            if len(sub_results) > 0:
 | 
			
		||||
                count += 1
 | 
			
		||||
                results.extend(sub_results)
 | 
			
		||||
 | 
			
		||||
        if count >= 2:
 | 
			
		||||
            results.append(self)
 | 
			
		||||
 | 
			
		||||
        return results
 | 
			
		||||
 | 
			
		||||
    def _merge_in_self(self, __object: T, from_map: bool = False):
 | 
			
		||||
        """
 | 
			
		||||
        1. find existing objects
 | 
			
		||||
        2. merge into existing object
 | 
			
		||||
        3. remap existing object
 | 
			
		||||
        """
 | 
			
		||||
        if __object.id in self._contains_ids:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        existing_object: DatabaseObject = None
 | 
			
		||||
 | 
			
		||||
        for name, value in __object.indexing_values:
 | 
			
		||||
            if value is None:
 | 
			
		||||
                continue
 | 
			
		||||
            if value in self._indexed_values[name]:
 | 
			
		||||
                existing_object = self._indexed_to_objects[value][0]
 | 
			
		||||
                if existing_object.id == __object.id:
 | 
			
		||||
                    return None
 | 
			
		||||
 | 
			
		||||
                break
 | 
			
		||||
 | 
			
		||||
        if existing_object is None:
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
        existing_object.merge(__object, replace_all_refs=True)
 | 
			
		||||
 | 
			
		||||
        # just a check if it really worked
 | 
			
		||||
        if existing_object.id != __object.id:
 | 
			
		||||
            raise ValueError("This should NEVER happen. Merging doesn't work.")
 | 
			
		||||
 | 
			
		||||
        self._map_element(existing_object, from_map=from_map)
 | 
			
		||||
 | 
			
		||||
    def contains(self, __object: T) -> bool:
 | 
			
		||||
        return len(self._contained_in_sub(__object)) > 0
 | 
			
		||||
 | 
			
		||||
    def _append(self, __object: T, from_map: bool = False):
 | 
			
		||||
        for attribute, to_sync_with in self.sync_on_append.items():
 | 
			
		||||
            pass
 | 
			
		||||
            to_sync_with.sync_with_other_collection(__object.__getattribute__(attribute))
 | 
			
		||||
 | 
			
		||||
        self._map_element(__object, from_map=from_map)
 | 
			
		||||
        self._data.append(__object)
 | 
			
		||||
 | 
			
		||||
    def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
 | 
			
		||||
        if __object is None:
 | 
			
		||||
            return
 | 
			
		||||
        if __object.id in self._contains_ids:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        exists_in_collection = self._contained_in_sub(__object)
 | 
			
		||||
        if len(exists_in_collection) and self is exists_in_collection[0]:
 | 
			
		||||
            # assuming that the object already is contained in the correct collections
 | 
			
		||||
            if not already_is_parent:
 | 
			
		||||
                self._merge_in_self(__object, from_map=from_map)
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        if not len(exists_in_collection):
 | 
			
		||||
            self._append(__object, from_map=from_map)
 | 
			
		||||
        else:
 | 
			
		||||
            pass
 | 
			
		||||
            exists_in_collection[0]._merge_in_self(__object, from_map=from_map)
 | 
			
		||||
 | 
			
		||||
        if not already_is_parent or not self._is_root:
 | 
			
		||||
            for parent_collection in self._get_parents_of_multiple_contained_children(__object):
 | 
			
		||||
                pass
 | 
			
		||||
                parent_collection.append(__object, already_is_parent=True, from_map=from_map)
 | 
			
		||||
 | 
			
		||||
    def extend(self, __iterable: Optional[Iterable[T]]):
 | 
			
		||||
        if __iterable is None:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        for __object in __iterable:
 | 
			
		||||
            self.append(__object)
 | 
			
		||||
 | 
			
		||||
    def sync_with_other_collection(self, equal_collection: "Collection"):
 | 
			
		||||
        """
 | 
			
		||||
        If two collections always need to have the same values, this can be used.
 | 
			
		||||
 | 
			
		||||
        Internally:
 | 
			
		||||
        1. import the data from other to self
 | 
			
		||||
            - _data
 | 
			
		||||
            - contained_collections
 | 
			
		||||
        2. replace all refs from the other object, with refs from this object
 | 
			
		||||
        """
 | 
			
		||||
        if equal_collection is self:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # don't add the elements from the subelements from the other collection.
 | 
			
		||||
        # this will be done in the next step.
 | 
			
		||||
        self.extend(equal_collection._data)
 | 
			
		||||
        # add all submodules
 | 
			
		||||
        for equal_sub_collection in equal_collection.contained_collections:
 | 
			
		||||
            self.contain_collection_inside(equal_sub_collection)
 | 
			
		||||
 | 
			
		||||
        # now the ugly part
 | 
			
		||||
        # replace all refs of the other element with this one
 | 
			
		||||
        self._risky_merge(equal_collection)
 | 
			
		||||
 | 
			
		||||
    def contain_collection_inside(self, sub_collection: "Collection"):
 | 
			
		||||
        """
 | 
			
		||||
        This collection will ALWAYS contain everything from the passed in collection
 | 
			
		||||
        """
 | 
			
		||||
        if sub_collection in self.contained_collections:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        self.contained_collections.append(sub_collection)
 | 
			
		||||
        sub_collection.upper_collections.append(self)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def data(self) -> List[T]:
 | 
			
		||||
        return [*self._data,
 | 
			
		||||
                *(__object for collection in self.contained_collections for __object in collection.shallow_list)]
 | 
			
		||||
 | 
			
		||||
    def __len__(self) -> int:
 | 
			
		||||
        return len(self._data) + sum(len(collection) for collection in self.contained_collections)
 | 
			
		||||
 | 
			
		||||
    def __iter__(self) -> Iterator[T]:
 | 
			
		||||
        for element in self._data:
 | 
			
		||||
            yield element
 | 
			
		||||
@@ -7,7 +7,7 @@ from functools import lru_cache
 | 
			
		||||
from typing import Optional, Dict, Tuple, List, Type, Generic, Any, TypeVar, Set
 | 
			
		||||
 | 
			
		||||
from .metadata import Metadata
 | 
			
		||||
from ..utils import get_unix_time
 | 
			
		||||
from ..utils import get_unix_time, object_trace
 | 
			
		||||
from ..utils.config import logging_settings, main_settings
 | 
			
		||||
from ..utils.shared import HIGHEST_ID
 | 
			
		||||
from ..utils.hacking import MetaClass
 | 
			
		||||
@@ -26,7 +26,11 @@ class InnerData:
 | 
			
		||||
    If the data in the wrapper class has to be merged, then this class is just replaced and garbage collected.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    _refers_to_instances: set = None
 | 
			
		||||
 | 
			
		||||
    def __init__(self, object_type, **kwargs):
 | 
			
		||||
        self._refers_to_instances =set()
 | 
			
		||||
        
 | 
			
		||||
        # initialize the default values
 | 
			
		||||
        self.__default_values = {}
 | 
			
		||||
        for name, factory in object_type._default_factories.items():
 | 
			
		||||
@@ -101,6 +105,9 @@ class OuterProxy:
 | 
			
		||||
 | 
			
		||||
        self._fetched_from: dict = {}
 | 
			
		||||
        self._inner: InnerData = InnerData(type(self), **kwargs)
 | 
			
		||||
        self._inner._refers_to_instances.add(self)
 | 
			
		||||
 | 
			
		||||
        object_trace(f"creating {type(self).__name__} [{self.title_string}]")
 | 
			
		||||
        self.__init_collections__()
 | 
			
		||||
 | 
			
		||||
        for name, data_list in collection_data.items():
 | 
			
		||||
@@ -174,11 +181,25 @@ class OuterProxy:
 | 
			
		||||
        :return:
 | 
			
		||||
        """
 | 
			
		||||
        if __other is None:
 | 
			
		||||
            _ = "debug"
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        self._inner.__merge__(__other._inner, override=override)
 | 
			
		||||
        __other._inner = self._inner
 | 
			
		||||
        object_trace(f"merging {type(self).__name__} [{self.title_string}] with {type(__other).__name__} [{__other.title_string}]")
 | 
			
		||||
 | 
			
		||||
        a = self
 | 
			
		||||
        b = __other
 | 
			
		||||
 | 
			
		||||
        if a._inner is b._inner:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # switch instances if more efficient
 | 
			
		||||
        if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
 | 
			
		||||
            a, b = b, a
 | 
			
		||||
 | 
			
		||||
        a._inner.__merge__(b._inner, override=override)
 | 
			
		||||
        a._inner._refers_to_instances.update(b._inner._refers_to_instances)
 | 
			
		||||
 | 
			
		||||
        for instance in b._inner._refers_to_instances:
 | 
			
		||||
            instance._inner = a._inner
 | 
			
		||||
 | 
			
		||||
    def mark_as_fetched(self, *url_hash_list: List[str]):
 | 
			
		||||
        for url_hash in url_hash_list:
 | 
			
		||||
 
 | 
			
		||||
@@ -119,7 +119,7 @@ class Song(Base):
 | 
			
		||||
    def indexing_values(self) -> List[Tuple[str, object]]:
 | 
			
		||||
        return [
 | 
			
		||||
            ('id', self.id),
 | 
			
		||||
            ('title', self.unified_title),
 | 
			
		||||
            ('title', unify(self.unified_title)),
 | 
			
		||||
            ('isrc', self.isrc),
 | 
			
		||||
            *[('url', source.url) for source in self.source_collection]
 | 
			
		||||
        ]
 | 
			
		||||
@@ -244,6 +244,9 @@ class Album(Base):
 | 
			
		||||
        self.song_collection.contain_attribute_in_given = {
 | 
			
		||||
            "main_artist_collection": self.artist_collection
 | 
			
		||||
        }
 | 
			
		||||
        self.song_collection.append_object_to_attribute = {
 | 
			
		||||
            "album_collection": self
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
 | 
			
		||||
        if object_type is Song:
 | 
			
		||||
@@ -265,7 +268,7 @@ class Album(Base):
 | 
			
		||||
    def indexing_values(self) -> List[Tuple[str, object]]:
 | 
			
		||||
        return [
 | 
			
		||||
            ('id', self.id),
 | 
			
		||||
            ('title', self.unified_title),
 | 
			
		||||
            ('title', unify(self.title)),
 | 
			
		||||
            ('barcode', self.barcode),
 | 
			
		||||
            *[('url', source.url) for source in self.source_collection]
 | 
			
		||||
        ]
 | 
			
		||||
@@ -530,7 +533,7 @@ class Artist(Base):
 | 
			
		||||
    def indexing_values(self) -> List[Tuple[str, object]]:
 | 
			
		||||
        return [
 | 
			
		||||
            ('id', self.id),
 | 
			
		||||
            ('name', self.unified_name),
 | 
			
		||||
            ('name', unify(self.name)),
 | 
			
		||||
            *[('url', source.url) for source in self.source_collection],
 | 
			
		||||
            *[('contact', contact.value) for contact in self.contact_collection]
 | 
			
		||||
        ]
 | 
			
		||||
@@ -643,7 +646,7 @@ class Label(Base):
 | 
			
		||||
    def indexing_values(self) -> List[Tuple[str, object]]:
 | 
			
		||||
        return [
 | 
			
		||||
            ('id', self.id),
 | 
			
		||||
            ('name', self.unified_name),
 | 
			
		||||
            ('name', unify(self.name)),
 | 
			
		||||
            *[('url', source.url) for source in self.source_collection]
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,3 +1,5 @@
 | 
			
		||||
from __future__ import annotations
 | 
			
		||||
 | 
			
		||||
from collections import defaultdict
 | 
			
		||||
from enum import Enum
 | 
			
		||||
from typing import List, Dict, Set, Tuple, Optional, Iterable
 | 
			
		||||
@@ -103,12 +105,23 @@ class Source(OuterProxy):
 | 
			
		||||
            ('audio_url', self.audio_url),
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
    def __merge__(self, __other: Source, override: bool = False):
 | 
			
		||||
        if override:
 | 
			
		||||
            self.audio_url = __other.audio_url
 | 
			
		||||
 | 
			
		||||
        if self.audio_url is None or (override and __other.audio_url is not None):
 | 
			
		||||
            self.audio_url = __other.audio_url
 | 
			
		||||
 | 
			
		||||
    def __str__(self):
 | 
			
		||||
        return self.__repr__()
 | 
			
		||||
 | 
			
		||||
    def __repr__(self) -> str:
 | 
			
		||||
        return f"Src({self.page_enum.value}: {self.url}, {self.audio_url})"
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def title_string(self) -> str:
 | 
			
		||||
        return self.url
 | 
			
		||||
 | 
			
		||||
    page_str = property(fget=lambda self: self.page_enum.value)
 | 
			
		||||
    type_str = property(fget=lambda self: self.type_enum.value)
 | 
			
		||||
    homepage = property(fget=lambda self: SourcePages.get_homepage(self.page_enum))
 | 
			
		||||
 
 | 
			
		||||
@@ -11,6 +11,7 @@ from functools import lru_cache
 | 
			
		||||
 | 
			
		||||
import youtube_dl
 | 
			
		||||
from youtube_dl.extractor.youtube import YoutubeIE
 | 
			
		||||
from youtube_dl.utils import DownloadError
 | 
			
		||||
 | 
			
		||||
from ...utils.exception.config import SettingValueError
 | 
			
		||||
from ...utils.config import main_settings, youtube_settings, logging_settings
 | 
			
		||||
@@ -201,6 +202,7 @@ class YoutubeMusic(SuperYouTube):
 | 
			
		||||
        self.yt_ie = MusicKrakenYoutubeIE(downloader=self.ydl, main_instance=self)
 | 
			
		||||
 | 
			
		||||
        self.download_values_by_url: dict = {}
 | 
			
		||||
        self.not_download: Dict[str, DownloadError] = {}
 | 
			
		||||
 | 
			
		||||
    def _fetch_from_main_page(self):
 | 
			
		||||
        """
 | 
			
		||||
@@ -483,7 +485,13 @@ class YoutubeMusic(SuperYouTube):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
 | 
			
		||||
        ydl_res: dict = self.ydl.extract_info(url=source.url, download=False)
 | 
			
		||||
        ydl_res: dict = {}
 | 
			
		||||
        try:
 | 
			
		||||
            ydl_res: dict = self.ydl.extract_info(url=source.url, download=False)
 | 
			
		||||
        except DownloadError as e:
 | 
			
		||||
            self.not_download[source.hash_url] = e
 | 
			
		||||
            self.LOGGER.error(f"Couldn't fetch song from {source.url}. {e}")
 | 
			
		||||
            return Song()
 | 
			
		||||
 | 
			
		||||
        self.fetch_media_url(source=source, ydl_res=ydl_res)
 | 
			
		||||
 | 
			
		||||
@@ -556,17 +564,20 @@ class YoutubeMusic(SuperYouTube):
 | 
			
		||||
    def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
 | 
			
		||||
        media = self.fetch_media_url(source)
 | 
			
		||||
 | 
			
		||||
        result = self.download_connection.stream_into(
 | 
			
		||||
            media["url"], 
 | 
			
		||||
            target, 
 | 
			
		||||
            name=desc, 
 | 
			
		||||
            raw_url=True, 
 | 
			
		||||
            raw_headers=True,
 | 
			
		||||
            disable_cache=True,
 | 
			
		||||
            headers=media.get("headers", {}),
 | 
			
		||||
            # chunk_size=media.get("chunk_size", main_settings["chunk_size"]),
 | 
			
		||||
            method="GET",
 | 
			
		||||
        )
 | 
			
		||||
        if source.hash_url not in self.not_download:
 | 
			
		||||
            result = self.download_connection.stream_into(
 | 
			
		||||
                media["url"], 
 | 
			
		||||
                target, 
 | 
			
		||||
                name=desc, 
 | 
			
		||||
                raw_url=True, 
 | 
			
		||||
                raw_headers=True,
 | 
			
		||||
                disable_cache=True,
 | 
			
		||||
                headers=media.get("headers", {}),
 | 
			
		||||
                # chunk_size=media.get("chunk_size", main_settings["chunk_size"]),
 | 
			
		||||
                method="GET",
 | 
			
		||||
            )
 | 
			
		||||
        else:
 | 
			
		||||
            result = DownloadResult(error_message=str(self.not_download[source.hash_url]))
 | 
			
		||||
 | 
			
		||||
        if result.is_fatal_error:
 | 
			
		||||
            result.merge(super().download_song_to_target(source=source, target=target, desc=desc))
 | 
			
		||||
 
 | 
			
		||||
@@ -3,7 +3,7 @@ from pathlib import Path
 | 
			
		||||
import json
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE
 | 
			
		||||
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE
 | 
			
		||||
from .config import config, read_config, write_config
 | 
			
		||||
from .enums.colors import BColors
 | 
			
		||||
from .path_manager import LOCATIONS
 | 
			
		||||
@@ -52,6 +52,12 @@ def trace(msg: str):
 | 
			
		||||
 | 
			
		||||
    output("trace: " + msg, BColors.OKBLUE)
 | 
			
		||||
 | 
			
		||||
def object_trace(obj):
 | 
			
		||||
    if not DEBUG_OBJECT_TRACE:
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    output("object: " + str(obj), BColors.GREY)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
misc functions
 | 
			
		||||
 
 | 
			
		||||
@@ -15,6 +15,7 @@ __stage__ = os.getenv("STAGE", "prod")
 | 
			
		||||
DEBUG = (__stage__ == "dev") and True
 | 
			
		||||
DEBUG_LOGGING = DEBUG and False
 | 
			
		||||
DEBUG_TRACE = DEBUG and True
 | 
			
		||||
DEBUG_OBJECT_TRACE = DEBUG and False
 | 
			
		||||
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
 | 
			
		||||
DEBUG_PAGES = DEBUG and False
 | 
			
		||||
DEBUG_DUMP = DEBUG and True
 | 
			
		||||
 
 | 
			
		||||
@@ -16,9 +16,12 @@ def unify(string: str) -> str:
 | 
			
		||||
    """
 | 
			
		||||
    returns a unified str, to make comparisons easy.
 | 
			
		||||
    a unified string has the following attributes:
 | 
			
		||||
     - is lowercase
 | 
			
		||||
    - is lowercase
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    if string is None:
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        string = translit(string, reversed=True)
 | 
			
		||||
    except LanguageDetectionError:
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										86
									
								
								tests/test_collection.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										86
									
								
								tests/test_collection.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,86 @@
 | 
			
		||||
import unittest
 | 
			
		||||
 | 
			
		||||
from music_kraken.objects import Song, Album, Artist, Collection, Country
 | 
			
		||||
 | 
			
		||||
class TestCollection(unittest.TestCase):
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def complicated_object() -> Artist:
 | 
			
		||||
        return Artist(
 | 
			
		||||
            name="artist",
 | 
			
		||||
            country=Country.by_alpha_2("DE"),
 | 
			
		||||
            main_album_list=[
 | 
			
		||||
                Album(
 | 
			
		||||
                    title="album",
 | 
			
		||||
                    song_list=[
 | 
			
		||||
                        Song(
 | 
			
		||||
                            title="song",
 | 
			
		||||
                            album_list=[
 | 
			
		||||
                                Album(title="album", albumsort=123),
 | 
			
		||||
                            ],
 | 
			
		||||
                        ),
 | 
			
		||||
                        Song(
 | 
			
		||||
                            title="other_song",
 | 
			
		||||
                            album_list=[
 | 
			
		||||
                                Album(title="album", albumsort=423),
 | 
			
		||||
                            ],
 | 
			
		||||
                        ),
 | 
			
		||||
                    ]
 | 
			
		||||
                ),
 | 
			
		||||
                Album(title="album", barcode="1234567890123"),
 | 
			
		||||
            ]
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def test_song_album_relation(self):
 | 
			
		||||
        """
 | 
			
		||||
        Tests that
 | 
			
		||||
        album = album.any_song.one_album
 | 
			
		||||
        is the same object
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        a = self.complicated_object().main_album_collection[0]
 | 
			
		||||
        b = a.song_collection[0].album_collection[0]
 | 
			
		||||
        c = a.song_collection[1].album_collection[0]
 | 
			
		||||
        d = b.song_collection[0].album_collection[0]
 | 
			
		||||
        e = d.song_collection[0].album_collection[0]
 | 
			
		||||
        f = e.song_collection[0].album_collection[0]
 | 
			
		||||
        g = f.song_collection[0].album_collection[0]
 | 
			
		||||
 | 
			
		||||
        self.assertTrue(a.id == b.id == c.id == d.id == e.id == f.id == g.id)
 | 
			
		||||
        self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "album")
 | 
			
		||||
        self.assertTrue(a.barcode == b.barcode == c.barcode == d.barcode == e.barcode == f.barcode == g.barcode == "1234567890123")
 | 
			
		||||
        self.assertTrue(a.albumsort == b.albumsort == c.albumsort == d.albumsort == e.albumsort == f.albumsort == g.albumsort == 123)
 | 
			
		||||
 | 
			
		||||
        d.title = "new_title"
 | 
			
		||||
 | 
			
		||||
        self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "new_title")
 | 
			
		||||
 | 
			
		||||
    def test_album_artist_relation(self):
 | 
			
		||||
        """
 | 
			
		||||
        Tests that
 | 
			
		||||
        artist = artist.any_album.any_song.one_artist
 | 
			
		||||
        is the same object
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        a = self.complicated_object()
 | 
			
		||||
        b = a.main_album_collection[0].artist_collection[0]
 | 
			
		||||
        c = b.main_album_collection[0].artist_collection[0]
 | 
			
		||||
        d = c.main_album_collection[0].artist_collection[0]
 | 
			
		||||
 | 
			
		||||
        self.assertTrue(a.id == b.id == c.id == d.id)
 | 
			
		||||
        self.assertTrue(a.name == b.name == c.name == d.name == "artist")
 | 
			
		||||
        self.assertTrue(a.country == b.country == c.country == d.country)
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    def test_song_artist_relations(self):
 | 
			
		||||
        a = self.complicated_object()
 | 
			
		||||
        b = a.main_album_collection[0].song_collection[0].main_artist_collection[0]
 | 
			
		||||
        c = b.main_album_collection[0].song_collection[0].main_artist_collection[0]
 | 
			
		||||
        d = c.main_album_collection[0].song_collection[0].main_artist_collection[0]
 | 
			
		||||
 | 
			
		||||
        self.assertTrue(a.id == b.id == c.id == d.id)
 | 
			
		||||
        self.assertTrue(a.name == b.name == c.name == d.name == "artist")
 | 
			
		||||
        self.assertTrue(a.country == b.country == c.country == d.country)
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    unittest.main()
 | 
			
		||||
		Reference in New Issue
	
	Block a user