music-kraken-core/src/music_kraken/objects/collection.py

125 lines
3.9 KiB
Python
Raw Normal View History

from typing import List, Iterable, Dict
2023-03-09 17:35:56 +00:00
from collections import defaultdict
2023-02-07 18:26:14 +00:00
2023-03-09 17:19:49 +00:00
from .parents import DatabaseObject
2023-02-07 18:26:14 +00:00
2023-02-23 22:52:41 +00:00
2023-02-07 18:26:14 +00:00
class Collection:
    """
    Container for iterables of database objects, like a tracklist or a discography.

    Elements are kept in insertion order in ``_data``. In addition, every
    ``(name, value)`` pair yielded by an element's ``indexing_values`` is
    recorded in a lookup table, so that an element which shares an indexing
    value with an already stored one can be detected (and merged) on insertion.

    Elements are expected to provide ``indexing_values`` (an iterable of
    ``(name, value)`` pairs), ``id`` and ``merge(other)``.
    """

    _data: List["DatabaseObject"]

    # NOTE(review): these two are only declared here; nothing in this class
    # assigns or reads them.
    _by_url: dict
    _by_attribute: dict

    def __init__(self, data: Iterable["DatabaseObject"] = None, element_type=None, *args, **kwargs) -> None:
        """
        :param data: optional initial elements; they are added through ``extend``
            with ``merge_on_conflict=True``. ``None`` means "start empty".
        :param element_type: optional type (or tuple of types) every element
            passed to ``append`` has to be an instance of.
        """
        # type every appended element is checked against (``None`` disables the check)
        self.element_type = element_type

        self._data: List["DatabaseObject"] = list()

        # Maps attribute name -> attribute value -> element. The elements are
        # references to the very same objects that are stored in ``_data``:
        #   {
        #       'id': {323: song_1, 563: song_2, 666: song_3},
        #       'url': {'www.song_2.com': song_2}
        #   }
        self._attribute_to_object_map: Dict[str, Dict[object, "DatabaseObject"]] = defaultdict(dict)
        # ids of all elements that have been mapped so far
        self._used_ids: set = set()

        if data is not None:
            self.extend(data, merge_on_conflict=True)

    def sort(self, reverse: bool = False, **kwargs):
        """
        Sort the stored elements in place.

        :param reverse: sort in descending order if ``True``
        :param kwargs: passed on to ``list.sort`` (e.g. ``key``)
        """
        self._data.sort(reverse=reverse, **kwargs)

    def map_element(self, element: "DatabaseObject"):
        """
        Register ``element`` in the lookup table under each of its indexing
        values and remember its id. Indexing values that are ``None`` are skipped.

        :param element: the element to index
        """
        for name, value in element.indexing_values:
            if value is None:
                continue

            self._attribute_to_object_map[name][value] = element

        # NOTE(review): placed after the loop when the flattened source was
        # re-indented - confirm against the repository.
        self._used_ids.add(element.id)

    def append(self, element: "DatabaseObject", merge_on_conflict: bool = True) -> "DatabaseObject":
        """
        Add ``element`` unless an already stored element shares one of its
        indexing values.

        :param element: the element to add
        :param merge_on_conflict: if a conflicting element exists, merge
            ``element`` into it (``existing.merge(element)``) and re-index it
        :return: the object that represents ``element`` inside the collection:
            the already existing object on conflict, else ``element`` itself
        :raises TypeError: if ``element_type`` was set and ``element`` is not
            an instance of it
        """
        # if the element type has been defined in the initializer it checks if the type matches
        if self.element_type is not None and not isinstance(element, self.element_type):
            raise TypeError(f"{type(element)} is not the set type {self.element_type}")

        for name, value in element.indexing_values:
            # ``None`` is never used as a key (see map_element); skipping it
            # also avoids creating empty buckets in the defaultdict
            if value is None:
                continue

            known_values = self._attribute_to_object_map[name]
            if value not in known_values:
                continue

            existing_object = known_values[value]
            if merge_on_conflict:
                # the object does already exist, thus merging
                # and not adding the new one afterwards
                existing_object.merge(element)
                # in case any relevant data has been added (e.g. it remaps the old object)
                self.map_element(existing_object)

            # NOTE(review): returning on every conflict (not only when merging)
            # was chosen when the flattened source was re-indented - confirm.
            return existing_object

        self._data.append(element)
        self.map_element(element)

        return element

    def extend(self, element_list: Iterable["DatabaseObject"], merge_on_conflict: bool = True):
        """
        Call ``append`` for every element of ``element_list``.

        :param element_list: the elements to add
        :param merge_on_conflict: passed on to ``append``
        """
        for element in element_list:
            self.append(element, merge_on_conflict=merge_on_conflict)

    def __iter__(self):
        yield from self._data

    def __str__(self) -> str:
        return "\n".join([f"{str(j).zfill(2)}: {repr(i)}" for j, i in enumerate(self._data)])

    def __len__(self) -> int:
        return len(self._data)

    def __getitem__(self, item):
        """
        :param item: integer index into the stored elements
        :return: the element at position ``item``
        :raises ValueError: if ``item`` is not an integer
        :raises IndexError: if ``item`` is out of range
        """
        if not isinstance(item, int):
            # raise the error; returning it would hand the exception object
            # to the caller as if it were an element
            raise ValueError("key needs to be an integer")

        return self._data[item]

    @property
    def shallow_list(self) -> List["DatabaseObject"]:
        """
        returns a shallow copy of the data list
        """
        return self._data.copy()

    def insecure_append(self, element: "DatabaseObject"):
        """
        Add ``element`` checking only its id: no type check, no lookup by
        indexing values and no merging.

        :param element: the element to add
        :return: ``False`` if the id of ``element`` is already in use
            (nothing is added), else ``True``
        """
        if element.id in self._used_ids:
            return False

        self._used_ids.add(element.id)
        self._data.append(element)
        self.map_element(element)
        return True

    def insecure_extend(self, element_list: Iterable["DatabaseObject"]):
        """
        Call ``insecure_append`` for every element of ``element_list``.

        :param element_list: the elements to add
        :return: ``True`` if at least one element has been added
        """
        success = False
        # deliberately a plain loop: every element has to be tried, so no
        # short-circuiting ``any(...)`` over a generator here
        for element in element_list:
            if self.insecure_append(element):
                success = True

        return success