feat: merge
This commit is contained in:
commit
df351c6b2e
@ -8,7 +8,8 @@ from .options.first_config import initial_config
|
|||||||
from ..utils.config import write_config, main_settings
|
from ..utils.config import write_config, main_settings
|
||||||
from ..utils.regex import URL_PATTERN
|
from ..utils.regex import URL_PATTERN
|
||||||
from ..utils.string_processing import fit_to_file_system
|
from ..utils.string_processing import fit_to_file_system
|
||||||
from ..utils.support_classes import Query, DownloadResult
|
from ..utils.support_classes.query import Query
|
||||||
|
from ..utils.support_classes.download_result import DownloadResult
|
||||||
from ..utils.exception.download import UrlNotFoundException
|
from ..utils.exception.download import UrlNotFoundException
|
||||||
from ..download.results import Results, Option, PageResults
|
from ..download.results import Results, Option, PageResults
|
||||||
from ..download.page_attributes import Pages
|
from ..download.page_attributes import Pages
|
||||||
|
@ -9,7 +9,7 @@ from tqdm import tqdm
|
|||||||
|
|
||||||
from .rotating import RotatingProxy
|
from .rotating import RotatingProxy
|
||||||
from ..utils.config import main_settings
|
from ..utils.config import main_settings
|
||||||
from ..utils.support_classes import DownloadResult
|
from ..utils.support_classes.download_result import DownloadResult
|
||||||
from ..objects import Target
|
from ..objects import Target
|
||||||
|
|
||||||
|
|
||||||
|
@ -5,7 +5,8 @@ from ..objects import DatabaseObject, Source
|
|||||||
|
|
||||||
from ..utils.config import youtube_settings
|
from ..utils.config import youtube_settings
|
||||||
from ..utils.enums.source import SourcePages
|
from ..utils.enums.source import SourcePages
|
||||||
from ..utils.support_classes import Query, DownloadResult
|
from ..utils.support_classes.download_result import DownloadResult
|
||||||
|
from ..utils.support_classes.query import Query
|
||||||
from ..utils.exception.download import UrlNotFoundException
|
from ..utils.exception.download import UrlNotFoundException
|
||||||
from ..utils.shared import DEBUG_PAGES
|
from ..utils.shared import DEBUG_PAGES
|
||||||
|
|
||||||
|
@ -1,221 +1,151 @@
|
|||||||
from typing import List, Iterable, Dict, TypeVar, Generic, Iterator
|
from typing import List, Iterable, Iterator, Optional, TypeVar, Generic, Dict, Type
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from dataclasses import dataclass
|
|
||||||
|
|
||||||
from .parents import DatabaseObject
|
from .parents import DatabaseObject
|
||||||
from ..utils.hooks import HookEventTypes, Hooks, Event
|
from ..utils.support_classes.hacking import MetaClass
|
||||||
|
|
||||||
|
|
||||||
class CollectionHooks(HookEventTypes):
|
|
||||||
APPEND_NEW = "append_new"
|
|
||||||
|
|
||||||
|
|
||||||
T = TypeVar('T', bound=DatabaseObject)
|
T = TypeVar('T', bound=DatabaseObject)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
class Collection(Generic[T], metaclass=MetaClass):
|
||||||
class AppendResult:
|
|
||||||
was_in_collection: bool
|
|
||||||
current_element: DatabaseObject
|
|
||||||
was_the_same: bool
|
|
||||||
|
|
||||||
|
|
||||||
class Collection(Generic[T]):
|
|
||||||
"""
|
|
||||||
This a class for the iterables
|
|
||||||
like tracklist or discography
|
|
||||||
"""
|
|
||||||
_data: List[T]
|
_data: List[T]
|
||||||
|
|
||||||
_by_url: dict
|
_indexed_values: Dict[str, set]
|
||||||
_by_attribute: dict
|
_indexed_to_objects: Dict[any, list]
|
||||||
|
|
||||||
def __init__(self, data: List[T] = None, element_type=None, *args, **kwargs) -> None:
|
shallow_list = property(fget=lambda self: self.data)
|
||||||
# Attribute needs to point to
|
|
||||||
self.element_type = element_type
|
|
||||||
|
|
||||||
self._data: List[T] = list()
|
def __init__(
|
||||||
|
self, data: Optional[Iterable[T]],
|
||||||
|
sync_on_append: Dict[str, Collection] = None,
|
||||||
|
contain_given_in_attribute: Dict[str, Collection] = None,
|
||||||
|
contain_attribute_in_given: Dict[str, Collection] = None,
|
||||||
|
append_object_to_attribute: Dict[str, DatabaseObject] = None
|
||||||
|
) -> None:
|
||||||
|
self._data = []
|
||||||
|
self.upper_collections: List[Collection[T]] = []
|
||||||
|
self.contained_collections: List[Collection[T]] = []
|
||||||
|
|
||||||
"""
|
# List of collection attributes that should be modified on append
|
||||||
example of attribute_to_object_map
|
# Key: collection attribute (str) of appended element
|
||||||
the song objects are references pointing to objects
|
# Value: main collection to sync to
|
||||||
in _data
|
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
|
||||||
|
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
|
||||||
|
self.contain_attribute_in_given: Dict[str, Collection] = contain_given_in_attribute or {}
|
||||||
|
self.append_object_to_attribute: Dict[str, DatabaseObject] = append_object_to_attribute or {}
|
||||||
|
|
||||||
|
self.contain_self_on_append: List[str] = []
|
||||||
|
|
||||||
|
self._indexed_values = defaultdict(set)
|
||||||
|
self._indexed_to_objects = defaultdict(list)
|
||||||
|
|
||||||
```python
|
self.extend(data)
|
||||||
{
|
|
||||||
'id': {323: song_1, 563: song_2, 666: song_3},
|
|
||||||
'url': {'www.song_2.com': song_2}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
"""
|
|
||||||
self._attribute_to_object_map: Dict[str, Dict[object, T]] = defaultdict(dict)
|
|
||||||
self._used_ids: set = set()
|
|
||||||
|
|
||||||
self.hooks: Hooks = Hooks(self)
|
def _map_element(self, __object: T):
|
||||||
|
for name, value in __object.indexing_values:
|
||||||
if data is not None:
|
|
||||||
self.extend(data, merge_on_conflict=True)
|
|
||||||
|
|
||||||
def sort(self, reverse: bool = False, **kwargs):
|
|
||||||
self._data.sort(reverse=reverse, **kwargs)
|
|
||||||
|
|
||||||
def map_element(self, element: T):
|
|
||||||
for name, value in element.indexing_values:
|
|
||||||
if value is None:
|
if value is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
self._attribute_to_object_map[name][value] = element
|
self._indexed_values[name].add(value)
|
||||||
|
self._indexed_to_objects[value].append(__object)
|
||||||
|
|
||||||
self._used_ids.add(element.id)
|
def _unmap_element(self, __object: T):
|
||||||
|
for name, value in __object.indexing_values:
|
||||||
def unmap_element(self, element: T):
|
|
||||||
for name, value in element.indexing_values:
|
|
||||||
if value is None:
|
if value is None:
|
||||||
continue
|
continue
|
||||||
|
if value not in self._indexed_values[name]:
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._indexed_to_objects[value].remove(__object)
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
|
||||||
if value in self._attribute_to_object_map[name]:
|
if not len(self._indexed_to_objects[value]):
|
||||||
if element is self._attribute_to_object_map[name][value]:
|
self._indexed_values[name].remove(value)
|
||||||
try:
|
|
||||||
self._attribute_to_object_map[name].pop(value)
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def append(self, element: T, merge_on_conflict: bool = True,
|
def _contained_in_self(self, __object: T) -> bool:
|
||||||
merge_into_existing: bool = True, no_hook: bool = False) -> AppendResult:
|
for name, value in __object.indexing_values:
|
||||||
"""
|
if value is None:
|
||||||
:param element:
|
continue
|
||||||
:param merge_on_conflict:
|
if value in self._indexed_values[name]:
|
||||||
:param merge_into_existing:
|
return True
|
||||||
:return did_not_exist:
|
return False
|
||||||
"""
|
|
||||||
if element is None:
|
|
||||||
return AppendResult(False, None, False)
|
|
||||||
|
|
||||||
for existing_element in self._data:
|
def _contained_in(self, __object: T) -> Optional["Collection"]:
|
||||||
if element is existing_element:
|
if self._contained_in_self(__object):
|
||||||
return AppendResult(False, None, False)
|
return self
|
||||||
|
|
||||||
# if the element type has been defined in the initializer it checks if the type matches
|
|
||||||
if self.element_type is not None and not isinstance(element, self.element_type):
|
|
||||||
raise TypeError(f"{type(element)} is not the set type {self.element_type}")
|
|
||||||
|
|
||||||
# return if the same instance of the object is in the list
|
for collection in self.contained_collections:
|
||||||
for existing in self._data:
|
if collection._contained_in_self(__object):
|
||||||
if element is existing:
|
return collection
|
||||||
return AppendResult(True, element, True)
|
|
||||||
|
return None
|
||||||
for name, value in element.indexing_values:
|
|
||||||
if value in self._attribute_to_object_map[name]:
|
|
||||||
existing_object = self._attribute_to_object_map[name][value]
|
|
||||||
|
|
||||||
if not merge_on_conflict:
|
|
||||||
return AppendResult(True, existing_object, False)
|
|
||||||
|
|
||||||
# if the object does already exist
|
|
||||||
# thus merging and don't add it afterward
|
|
||||||
if merge_into_existing:
|
|
||||||
existing_object.merge(element)
|
|
||||||
# in case any relevant data has been added (e.g. it remaps the old object)
|
|
||||||
self.map_element(existing_object)
|
|
||||||
return AppendResult(True, existing_object, False)
|
|
||||||
|
|
||||||
element.merge(existing_object)
|
|
||||||
|
|
||||||
exists_at = self._data.index(existing_object)
|
|
||||||
self._data[exists_at] = element
|
|
||||||
|
|
||||||
self.unmap_element(existing_object)
|
|
||||||
self.map_element(element)
|
|
||||||
return AppendResult(True, existing_object, False)
|
|
||||||
|
|
||||||
if not no_hook:
|
|
||||||
self.hooks.trigger_event(CollectionHooks.APPEND_NEW, new_object=element)
|
|
||||||
self._data.append(element)
|
|
||||||
self.map_element(element)
|
|
||||||
|
|
||||||
return AppendResult(False, element, False)
|
|
||||||
|
|
||||||
def extend(self, element_list: Iterable[T], merge_on_conflict: bool = True,
|
|
||||||
merge_into_existing: bool = True, no_hook: bool = False):
|
|
||||||
if element_list is None:
|
|
||||||
return
|
|
||||||
if len(element_list) <= 0:
|
|
||||||
return
|
|
||||||
if element_list is self:
|
|
||||||
return
|
|
||||||
for element in element_list:
|
|
||||||
self.append(element, merge_on_conflict=merge_on_conflict, merge_into_existing=merge_into_existing, no_hook=no_hook)
|
|
||||||
|
|
||||||
def sync_collection(self, collection_attribute: str):
|
|
||||||
def on_append(event: Event, new_object: T, *args, **kwargs):
|
|
||||||
new_collection = new_object.__getattribute__(collection_attribute)
|
|
||||||
if self is new_collection:
|
|
||||||
return
|
|
||||||
|
|
||||||
self.extend(new_object.__getattribute__(collection_attribute), no_hook=True)
|
|
||||||
new_object.__setattr__(collection_attribute, self)
|
|
||||||
|
|
||||||
self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
|
|
||||||
|
|
||||||
def sync_main_collection(self, main_collection: "Collection", collection_attribute: str):
|
|
||||||
def on_append(event: Event, new_object: T, *args, **kwargs):
|
|
||||||
new_collection = new_object.__getattribute__(collection_attribute)
|
|
||||||
if main_collection is new_collection:
|
|
||||||
return
|
|
||||||
|
|
||||||
main_collection.extend(new_object.__getattribute__(collection_attribute), no_hook=True)
|
def contains(self, __object: T) -> bool:
|
||||||
new_object.__setattr__(collection_attribute, main_collection)
|
return self._contained_in(__object) is not None
|
||||||
|
|
||||||
self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
|
|
||||||
|
|
||||||
"""
|
def _append(self, __object: T):
|
||||||
def on_append(event: Event, new_object: T, *args, **kwargs):
|
self._map_element(__object)
|
||||||
new_collection: Collection = new_object.__getattribute__(collection_attribute)
|
self._data.append(__object)
|
||||||
if self is new_collection:
|
|
||||||
return
|
def append(self, __object: Optional[T]):
|
||||||
|
if __object is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._append(__object)
|
||||||
|
|
||||||
|
def extend(self, __iterable: Optional[Iterable[T]]):
|
||||||
|
if __iterable is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
for __object in __iterable:
|
||||||
|
self.append(__object)
|
||||||
|
|
||||||
|
def sync_with_other_collection(self, equal_collection: "Collection"):
|
||||||
|
"""
|
||||||
|
If two collections always need to have the same values, this can be used.
|
||||||
|
|
||||||
|
Internally:
|
||||||
|
1. import the data from other to self
|
||||||
|
- _data
|
||||||
|
- contained_collections
|
||||||
|
2. replace all refs from the other object, with refs from this object
|
||||||
|
"""
|
||||||
|
if equal_collection is self:
|
||||||
|
return
|
||||||
|
|
||||||
|
# don't add the elements from the subelements from the other collection.
|
||||||
|
# this will be done in the next step.
|
||||||
|
self.extend(equal_collection._data)
|
||||||
|
# add all submodules
|
||||||
|
for equal_sub_collection in equal_collection.contained_collections:
|
||||||
|
self.contain_collection_inside(equal_sub_collection)
|
||||||
|
|
||||||
|
# now the ugly part
|
||||||
|
# replace all refs of the other element with this one
|
||||||
|
self.merge(equal_collection)
|
||||||
|
|
||||||
|
|
||||||
|
def contain_collection_inside(self, sub_collection: "Collection"):
|
||||||
|
"""
|
||||||
|
This collection will ALWAYS contain everything from the passed in collection
|
||||||
|
"""
|
||||||
|
if sub_collection in self.contained_collections:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.contained_collections.append(sub_collection)
|
||||||
|
sub_collection.upper_collections.append(self)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def data(self) -> List[T]:
|
||||||
|
return [*self._data, *(__object for collection in self.contained_collections for __object in collection.shallow_list)]
|
||||||
|
|
||||||
self.extend(new_collection.shallow_list, no_hook=False)
|
def __len__(self) -> int:
|
||||||
new_object.__setattr__(collection_attribute, self)
|
return len(self._data) + sum(len(collection) for collection in self.contained_collections)
|
||||||
|
|
||||||
self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __iter__(self) -> Iterator[T]:
|
def __iter__(self) -> Iterator[T]:
|
||||||
for element in self._data:
|
for element in self._data:
|
||||||
yield element
|
yield element
|
||||||
|
|
||||||
def __str__(self) -> str:
|
|
||||||
return "\n".join([f"{str(j).zfill(2)}: {i.__repr__()}" for j, i in enumerate(self._data)])
|
|
||||||
|
|
||||||
def __len__(self) -> int:
|
|
||||||
return len(self._data)
|
|
||||||
|
|
||||||
def __getitem__(self, key) -> T:
|
|
||||||
if type(key) != int:
|
|
||||||
return ValueError("key needs to be an integer")
|
|
||||||
|
|
||||||
return self._data[key]
|
|
||||||
|
|
||||||
def __setitem__(self, key, value: T):
|
|
||||||
if type(key) != int:
|
|
||||||
return ValueError("key needs to be an integer")
|
|
||||||
|
|
||||||
old_item = self._data[key]
|
|
||||||
self.unmap_element(old_item)
|
|
||||||
self.map_element(value)
|
|
||||||
|
|
||||||
self._data[key] = value
|
|
||||||
|
|
||||||
@property
|
|
||||||
def shallow_list(self) -> List[T]:
|
|
||||||
"""
|
|
||||||
returns a shallow copy of the data list
|
|
||||||
"""
|
|
||||||
return self._data.copy()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def empty(self) -> bool:
|
|
||||||
return len(self._data) == 0
|
|
||||||
|
|
||||||
def clear(self):
|
|
||||||
self.__init__(element_type=self.element_type)
|
|
221
src/music_kraken/objects/old_collection.py
Normal file
221
src/music_kraken/objects/old_collection.py
Normal file
@ -0,0 +1,221 @@
|
|||||||
|
from typing import List, Iterable, Dict, TypeVar, Generic, Iterator
|
||||||
|
from collections import defaultdict
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from .parents import DatabaseObject
|
||||||
|
from ..utils.hooks import HookEventTypes, Hooks, Event
|
||||||
|
|
||||||
|
|
||||||
|
class CollectionHooks(HookEventTypes):
|
||||||
|
APPEND_NEW = "append_new"
|
||||||
|
|
||||||
|
|
||||||
|
T = TypeVar('T', bound=DatabaseObject)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AppendResult:
|
||||||
|
was_in_collection: bool
|
||||||
|
current_element: DatabaseObject
|
||||||
|
was_the_same: bool
|
||||||
|
|
||||||
|
|
||||||
|
class Collection(Generic[T]):
|
||||||
|
"""
|
||||||
|
This a class for the iterables
|
||||||
|
like tracklist or discography
|
||||||
|
"""
|
||||||
|
_data: List[T]
|
||||||
|
|
||||||
|
_by_url: dict
|
||||||
|
_by_attribute: dict
|
||||||
|
|
||||||
|
def __init__(self, data: List[T] = None, element_type=None, *args, **kwargs) -> None:
|
||||||
|
# Attribute needs to point to
|
||||||
|
self.element_type = element_type
|
||||||
|
|
||||||
|
self._data: List[T] = list()
|
||||||
|
|
||||||
|
"""
|
||||||
|
example of attribute_to_object_map
|
||||||
|
the song objects are references pointing to objects
|
||||||
|
in _data
|
||||||
|
|
||||||
|
```python
|
||||||
|
{
|
||||||
|
'id': {323: song_1, 563: song_2, 666: song_3},
|
||||||
|
'url': {'www.song_2.com': song_2}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
"""
|
||||||
|
self._attribute_to_object_map: Dict[str, Dict[object, T]] = defaultdict(dict)
|
||||||
|
self._used_ids: set = set()
|
||||||
|
|
||||||
|
self.hooks: Hooks = Hooks(self)
|
||||||
|
|
||||||
|
if data is not None:
|
||||||
|
self.extend(data, merge_on_conflict=True)
|
||||||
|
|
||||||
|
def sort(self, reverse: bool = False, **kwargs):
|
||||||
|
self._data.sort(reverse=reverse, **kwargs)
|
||||||
|
|
||||||
|
def map_element(self, element: T):
|
||||||
|
for name, value in element.indexing_values:
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
self._attribute_to_object_map[name][value] = element
|
||||||
|
|
||||||
|
self._used_ids.add(element.id)
|
||||||
|
|
||||||
|
def unmap_element(self, element: T):
|
||||||
|
for name, value in element.indexing_values:
|
||||||
|
if value is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if value in self._attribute_to_object_map[name]:
|
||||||
|
if element is self._attribute_to_object_map[name][value]:
|
||||||
|
try:
|
||||||
|
self._attribute_to_object_map[name].pop(value)
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def append(self, element: T, merge_on_conflict: bool = True,
|
||||||
|
merge_into_existing: bool = True, no_hook: bool = False) -> AppendResult:
|
||||||
|
"""
|
||||||
|
:param element:
|
||||||
|
:param merge_on_conflict:
|
||||||
|
:param merge_into_existing:
|
||||||
|
:return did_not_exist:
|
||||||
|
"""
|
||||||
|
if element is None:
|
||||||
|
return AppendResult(False, None, False)
|
||||||
|
|
||||||
|
for existing_element in self._data:
|
||||||
|
if element is existing_element:
|
||||||
|
return AppendResult(False, None, False)
|
||||||
|
|
||||||
|
# if the element type has been defined in the initializer it checks if the type matches
|
||||||
|
if self.element_type is not None and not isinstance(element, self.element_type):
|
||||||
|
raise TypeError(f"{type(element)} is not the set type {self.element_type}")
|
||||||
|
|
||||||
|
# return if the same instance of the object is in the list
|
||||||
|
for existing in self._data:
|
||||||
|
if element is existing:
|
||||||
|
return AppendResult(True, element, True)
|
||||||
|
|
||||||
|
for name, value in element.indexing_values:
|
||||||
|
if value in self._attribute_to_object_map[name]:
|
||||||
|
existing_object = self._attribute_to_object_map[name][value]
|
||||||
|
|
||||||
|
if not merge_on_conflict:
|
||||||
|
return AppendResult(True, existing_object, False)
|
||||||
|
|
||||||
|
# if the object does already exist
|
||||||
|
# thus merging and don't add it afterward
|
||||||
|
if merge_into_existing:
|
||||||
|
existing_object.merge(element)
|
||||||
|
# in case any relevant data has been added (e.g. it remaps the old object)
|
||||||
|
self.map_element(existing_object)
|
||||||
|
return AppendResult(True, existing_object, False)
|
||||||
|
|
||||||
|
element.merge(existing_object)
|
||||||
|
|
||||||
|
exists_at = self._data.index(existing_object)
|
||||||
|
self._data[exists_at] = element
|
||||||
|
|
||||||
|
self.unmap_element(existing_object)
|
||||||
|
self.map_element(element)
|
||||||
|
return AppendResult(True, existing_object, False)
|
||||||
|
|
||||||
|
if not no_hook:
|
||||||
|
self.hooks.trigger_event(CollectionHooks.APPEND_NEW, new_object=element)
|
||||||
|
self._data.append(element)
|
||||||
|
self.map_element(element)
|
||||||
|
|
||||||
|
return AppendResult(False, element, False)
|
||||||
|
|
||||||
|
def extend(self, element_list: Iterable[T], merge_on_conflict: bool = True,
|
||||||
|
merge_into_existing: bool = True, no_hook: bool = False):
|
||||||
|
if element_list is None:
|
||||||
|
return
|
||||||
|
if len(element_list) <= 0:
|
||||||
|
return
|
||||||
|
if element_list is self:
|
||||||
|
return
|
||||||
|
for element in element_list:
|
||||||
|
self.append(element, merge_on_conflict=merge_on_conflict, merge_into_existing=merge_into_existing, no_hook=no_hook)
|
||||||
|
|
||||||
|
def sync_collection(self, collection_attribute: str):
|
||||||
|
def on_append(event: Event, new_object: T, *args, **kwargs):
|
||||||
|
new_collection = new_object.__getattribute__(collection_attribute)
|
||||||
|
if self is new_collection:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.extend(new_object.__getattribute__(collection_attribute), no_hook=True)
|
||||||
|
new_object.__setattr__(collection_attribute, self)
|
||||||
|
|
||||||
|
self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
|
||||||
|
|
||||||
|
def sync_main_collection(self, main_collection: "Collection", collection_attribute: str):
|
||||||
|
def on_append(event: Event, new_object: T, *args, **kwargs):
|
||||||
|
new_collection = new_object.__getattribute__(collection_attribute)
|
||||||
|
if main_collection is new_collection:
|
||||||
|
return
|
||||||
|
|
||||||
|
main_collection.extend(new_object.__getattribute__(collection_attribute), no_hook=True)
|
||||||
|
new_object.__setattr__(collection_attribute, main_collection)
|
||||||
|
|
||||||
|
self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
|
||||||
|
|
||||||
|
"""
|
||||||
|
def on_append(event: Event, new_object: T, *args, **kwargs):
|
||||||
|
new_collection: Collection = new_object.__getattribute__(collection_attribute)
|
||||||
|
if self is new_collection:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.extend(new_collection.shallow_list, no_hook=False)
|
||||||
|
new_object.__setattr__(collection_attribute, self)
|
||||||
|
|
||||||
|
self.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_append)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __iter__(self) -> Iterator[T]:
|
||||||
|
for element in self._data:
|
||||||
|
yield element
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return "\n".join([f"{str(j).zfill(2)}: {i.__repr__()}" for j, i in enumerate(self._data)])
|
||||||
|
|
||||||
|
def __len__(self) -> int:
|
||||||
|
return len(self._data)
|
||||||
|
|
||||||
|
def __getitem__(self, key) -> T:
|
||||||
|
if type(key) != int:
|
||||||
|
return ValueError("key needs to be an integer")
|
||||||
|
|
||||||
|
return self._data[key]
|
||||||
|
|
||||||
|
def __setitem__(self, key, value: T):
|
||||||
|
if type(key) != int:
|
||||||
|
return ValueError("key needs to be an integer")
|
||||||
|
|
||||||
|
old_item = self._data[key]
|
||||||
|
self.unmap_element(old_item)
|
||||||
|
self.map_element(value)
|
||||||
|
|
||||||
|
self._data[key] = value
|
||||||
|
|
||||||
|
@property
|
||||||
|
def shallow_list(self) -> List[T]:
|
||||||
|
"""
|
||||||
|
returns a shallow copy of the data list
|
||||||
|
"""
|
||||||
|
return self._data.copy()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def empty(self) -> bool:
|
||||||
|
return len(self._data) == 0
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
self.__init__(element_type=self.element_type)
|
@ -7,6 +7,7 @@ from .metadata import Metadata
|
|||||||
from .option import Options
|
from .option import Options
|
||||||
from ..utils.shared import HIGHEST_ID
|
from ..utils.shared import HIGHEST_ID
|
||||||
from ..utils.config import main_settings, logging_settings
|
from ..utils.config import main_settings, logging_settings
|
||||||
|
from ..utils.support_classes.hacking import MetaClass
|
||||||
|
|
||||||
|
|
||||||
LOGGER = logging_settings["object_logger"]
|
LOGGER = logging_settings["object_logger"]
|
||||||
@ -42,7 +43,7 @@ class Attribute(Generic[P]):
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
class DatabaseObject:
|
class DatabaseObject(metaclass=MetaClass):
|
||||||
COLLECTION_STRING_ATTRIBUTES: tuple = tuple()
|
COLLECTION_STRING_ATTRIBUTES: tuple = tuple()
|
||||||
SIMPLE_STRING_ATTRIBUTES: dict = dict()
|
SIMPLE_STRING_ATTRIBUTES: dict = dict()
|
||||||
|
|
||||||
@ -145,7 +146,9 @@ class DatabaseObject:
|
|||||||
|
|
||||||
return list()
|
return list()
|
||||||
|
|
||||||
def merge(self, other, override: bool = False):
|
def merge(self, other, override: bool = False, replace_all_refs: bool = False):
|
||||||
|
print("merge")
|
||||||
|
|
||||||
if other is None:
|
if other is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -168,6 +171,9 @@ class DatabaseObject:
|
|||||||
if override or getattr(self, simple_attribute) == default_value:
|
if override or getattr(self, simple_attribute) == default_value:
|
||||||
setattr(self, simple_attribute, getattr(other, simple_attribute))
|
setattr(self, simple_attribute, getattr(other, simple_attribute))
|
||||||
|
|
||||||
|
if replace_all_refs:
|
||||||
|
self.merge(other)
|
||||||
|
|
||||||
def strip_details(self):
|
def strip_details(self):
|
||||||
for collection in type(self).DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
|
for collection in type(self).DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
|
||||||
getattr(self, collection).clear()
|
getattr(self, collection).clear()
|
||||||
|
@ -5,7 +5,7 @@ from typing import List, Optional, Dict, Tuple, Type
|
|||||||
import pycountry
|
import pycountry
|
||||||
|
|
||||||
from ..utils.enums.album import AlbumType, AlbumStatus
|
from ..utils.enums.album import AlbumType, AlbumStatus
|
||||||
from .collection import Collection, CollectionHooks
|
from .collection import Collection
|
||||||
from .formatted_text import FormattedText
|
from .formatted_text import FormattedText
|
||||||
from .lyrics import Lyrics
|
from .lyrics import Lyrics
|
||||||
from .contact import Contact
|
from .contact import Contact
|
||||||
@ -100,25 +100,29 @@ class Song(MainObject):
|
|||||||
self.notes: FormattedText = notes or FormattedText()
|
self.notes: FormattedText = notes or FormattedText()
|
||||||
|
|
||||||
self.source_collection: SourceCollection = SourceCollection(source_list)
|
self.source_collection: SourceCollection = SourceCollection(source_list)
|
||||||
self.target_collection: Collection[Target] = Collection(data=target_list, element_type=Target)
|
self.target_collection: Collection[Target] = Collection(data=target_list)
|
||||||
self.lyrics_collection: Collection[Lyrics] = Collection(data=lyrics_list, element_type=Lyrics)
|
self.lyrics_collection: Collection[Lyrics] = Collection(data=lyrics_list)
|
||||||
|
|
||||||
# main_artist_collection = album.artist collection
|
# main_artist_collection = album.artist collection
|
||||||
self.main_artist_collection: Collection[Artist] = Collection(data=main_artist_list, element_type=Artist)
|
self.main_artist_collection: Collection[Artist] = Collection(data=[])
|
||||||
|
|
||||||
# this album_collection equals no collection
|
# this album_collection equals no collection
|
||||||
self.album_collection: Collection[Album] = Collection(data=[], element_type=Album)
|
self.album_collection: Collection[Album] = Collection(data=album_list,
|
||||||
self.album_collection.sync_main_collection(self.main_artist_collection, "artist_collection")
|
contain_given_in_attribute={
|
||||||
self.album_collection.extend(album_list)
|
"artist_collection": self.main_artist_collection
|
||||||
# self.album_collection.sync_collection("song_collection")
|
}, append_object_to_attribute={
|
||||||
# self.album_collection.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_album_append)
|
"song_collection": self
|
||||||
|
})
|
||||||
|
|
||||||
# on feature_artist_collection append, append self to artist self
|
self.main_artist_collection.contain_given_in_attribute = {"main_album_collection": self.album_collection}
|
||||||
self.feature_artist_collection: Collection[Artist] = Collection(data=[], element_type=Artist)
|
self.main_artist_collection.extend(main_artist_list)
|
||||||
def on_feature_artist_append(event, new_object: Artist, *args, **kwargs):
|
|
||||||
new_object.feature_song_collection.append(self, no_hook=True)
|
self.feature_artist_collection: Collection[Artist] = Collection(
|
||||||
self.feature_artist_collection.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_feature_artist_append)
|
data=feature_artist_list,
|
||||||
self.feature_artist_collection.extend(feature_artist_list)
|
append_object_to_attribute={
|
||||||
|
"feature_song_collection": self
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
def _build_recursive_structures(self, build_version: int, merge: bool):
|
def _build_recursive_structures(self, build_version: int, merge: bool):
|
||||||
if build_version == self.build_version:
|
if build_version == self.build_version:
|
||||||
@ -322,13 +326,16 @@ class Album(MainObject):
|
|||||||
|
|
||||||
self.source_collection: SourceCollection = SourceCollection(source_list)
|
self.source_collection: SourceCollection = SourceCollection(source_list)
|
||||||
|
|
||||||
self.artist_collection: Collection[Artist] = Collection(data=artist_list, element_type=Artist)
|
self.artist_collection: Collection[Artist] = Collection(data=artist_list)
|
||||||
|
|
||||||
self.song_collection: Collection[Song] = Collection(data=[], element_type=Song)
|
self.song_collection: Collection[Song] = Collection(
|
||||||
self.song_collection.sync_main_collection(self.artist_collection, "main_artist_collection")
|
data=song_list,
|
||||||
self.song_collection.extend(song_list)
|
contain_attribute_in_given={
|
||||||
|
"main_artist_collection": self.artist_collection
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
self.label_collection: Collection[Label] = Collection(data=label_list, element_type=Label)
|
self.label_collection: Collection[Label] = Collection(data=label_list)
|
||||||
|
|
||||||
def _build_recursive_structures(self, build_version: int, merge: bool):
|
def _build_recursive_structures(self, build_version: int, merge: bool):
|
||||||
if build_version == self.build_version:
|
if build_version == self.build_version:
|
||||||
@ -589,22 +596,26 @@ class Artist(MainObject):
|
|||||||
self.source_collection: SourceCollection = SourceCollection(source_list)
|
self.source_collection: SourceCollection = SourceCollection(source_list)
|
||||||
self.contact_collection: Collection[Label] = Collection(data=contact_list, element_type=Contact)
|
self.contact_collection: Collection[Label] = Collection(data=contact_list, element_type=Contact)
|
||||||
|
|
||||||
self.feature_song_collection: Collection[Song] = Collection(data=[], element_type=Song)
|
self.feature_song_collection: Collection[Song] = Collection(
|
||||||
def on_feature_song_append(event, new_object: Song, *args, **kwargs):
|
data=feature_song_list,
|
||||||
new_object.feature_artist_collection.append(self, no_hook=True)
|
append_object_to_attribute={
|
||||||
self.feature_song_collection.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_feature_song_append)
|
"feature_artist_collection": self
|
||||||
self.feature_song_collection.extend(feature_song_list)
|
}
|
||||||
|
)
|
||||||
|
|
||||||
self.main_album_collection: Collection[Album] = Collection(data=[], element_type=Album)
|
self.main_album_collection: Collection[Album] = Collection(
|
||||||
def on_album_append(event, new_object: Album, *args, **kwargs):
|
data=main_album_list,
|
||||||
new_object.artist_collection.append(self, no_hook=True)
|
append_object_to_attribute={
|
||||||
self.main_album_collection.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_album_append)
|
"artist_collection": self
|
||||||
self.main_album_collection.extend(main_album_list)
|
}
|
||||||
|
)
|
||||||
|
|
||||||
self.label_collection: Collection[Label] = Collection(data=label_list, element_type=Label)
|
self.label_collection: Collection[Label] = Collection(
|
||||||
def on_label_append(event, new_object: Label, *args, **kwargs):
|
data=label_list,
|
||||||
new_object.current_artist_collection.append(self, no_hook=True)
|
append_object_to_attribute={
|
||||||
self.label_collection.hooks.add_event_listener(CollectionHooks.APPEND_NEW, on_label_append)
|
"current_artist_collection": self
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _add_other_db_objects(self, object_type: Type["DatabaseObject"], object_list: List["DatabaseObject"]):
|
def _add_other_db_objects(self, object_type: Type["DatabaseObject"], object_list: List["DatabaseObject"]):
|
||||||
|
@ -23,7 +23,8 @@ from ..utils.enums.source import SourcePages
|
|||||||
from ..utils.enums.album import AlbumType
|
from ..utils.enums.album import AlbumType
|
||||||
from ..audio import write_metadata_to_target, correct_codec
|
from ..audio import write_metadata_to_target, correct_codec
|
||||||
from ..utils.config import main_settings
|
from ..utils.config import main_settings
|
||||||
from ..utils.support_classes import Query, DownloadResult
|
from ..utils.support_classes.query import Query
|
||||||
|
from ..utils.support_classes.download_result import DownloadResult
|
||||||
from ..utils.string_processing import fit_to_file_system
|
from ..utils.string_processing import fit_to_file_system
|
||||||
|
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ from ..objects import (
|
|||||||
FormattedText
|
FormattedText
|
||||||
)
|
)
|
||||||
from ..connection import Connection
|
from ..connection import Connection
|
||||||
from ..utils.support_classes import DownloadResult
|
from ..utils.support_classes.download_result import DownloadResult
|
||||||
from ..utils.config import main_settings, logging_settings
|
from ..utils.config import main_settings, logging_settings
|
||||||
from ..utils.shared import DEBUG
|
from ..utils.shared import DEBUG
|
||||||
if DEBUG:
|
if DEBUG:
|
||||||
|
@ -9,7 +9,7 @@ from ..utils.config import logging_settings
|
|||||||
from .abstract import Page
|
from .abstract import Page
|
||||||
from ..utils.enums.source import SourcePages
|
from ..utils.enums.source import SourcePages
|
||||||
from ..utils.enums.album import AlbumType
|
from ..utils.enums.album import AlbumType
|
||||||
from ..utils.support_classes import Query
|
from ..utils.support_classes.query import Query
|
||||||
from ..objects import (
|
from ..objects import (
|
||||||
Lyrics,
|
Lyrics,
|
||||||
Artist,
|
Artist,
|
||||||
|
@ -25,7 +25,8 @@ from ..objects import (
|
|||||||
)
|
)
|
||||||
from ..utils.config import logging_settings
|
from ..utils.config import logging_settings
|
||||||
from ..utils import string_processing, shared
|
from ..utils import string_processing, shared
|
||||||
from ..utils.support_classes import DownloadResult, Query
|
from ..utils.support_classes.query import Query
|
||||||
|
from ..utils.support_classes.download_result import DownloadResult
|
||||||
|
|
||||||
"""
|
"""
|
||||||
https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent
|
https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent
|
||||||
|
@ -15,7 +15,8 @@ from ..objects import (
|
|||||||
Target
|
Target
|
||||||
)
|
)
|
||||||
from ..connection import Connection
|
from ..connection import Connection
|
||||||
from ..utils.support_classes import DownloadResult
|
from ..utils.support_classes.query import Query
|
||||||
|
from ..utils.support_classes.download_result import DownloadResult
|
||||||
|
|
||||||
class Preset(Page):
|
class Preset(Page):
|
||||||
# CHANGE
|
# CHANGE
|
||||||
|
@ -20,7 +20,7 @@ from ..objects import (
|
|||||||
)
|
)
|
||||||
from ..connection import Connection
|
from ..connection import Connection
|
||||||
from ..utils.string_processing import clean_song_title
|
from ..utils.string_processing import clean_song_title
|
||||||
from ..utils.support_classes import DownloadResult
|
from ..utils.support_classes.download_result import DownloadResult
|
||||||
from ..utils.config import youtube_settings, main_settings, logging_settings
|
from ..utils.config import youtube_settings, main_settings, logging_settings
|
||||||
|
|
||||||
from .youtube_music.super_youtube import SuperYouTube, YouTubeUrl, get_invidious_url, YouTubeUrlType
|
from .youtube_music.super_youtube import SuperYouTube, YouTubeUrl, get_invidious_url, YouTubeUrlType
|
||||||
|
@ -19,7 +19,7 @@ from ...objects import (
|
|||||||
ID3Timestamp
|
ID3Timestamp
|
||||||
)
|
)
|
||||||
from ...connection import Connection
|
from ...connection import Connection
|
||||||
from ...utils.support_classes import DownloadResult
|
from ...utils.support_classes.download_result import DownloadResult
|
||||||
from ...utils.config import youtube_settings, logging_settings, main_settings
|
from ...utils.config import youtube_settings, logging_settings, main_settings
|
||||||
|
|
||||||
|
|
||||||
|
@ -25,7 +25,7 @@ from ...objects import (
|
|||||||
Target
|
Target
|
||||||
)
|
)
|
||||||
from ...connection import Connection
|
from ...connection import Connection
|
||||||
from ...utils.support_classes import DownloadResult
|
from ...utils.support_classes.download_result import DownloadResult
|
||||||
|
|
||||||
from ._list_render import parse_renderer
|
from ._list_render import parse_renderer
|
||||||
from .super_youtube import SuperYouTube
|
from .super_youtube import SuperYouTube
|
||||||
|
@ -1,25 +1,5 @@
|
|||||||
import os
|
import os
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import guppy
|
|
||||||
from guppy.heapy import Path
|
|
||||||
|
|
||||||
|
|
||||||
hp = guppy.hpy()
|
|
||||||
|
|
||||||
def replace_all_refs(replace_with, replace):
|
|
||||||
"""
|
|
||||||
NO
|
|
||||||
I have a very good reason to use this here
|
|
||||||
DONT use this anywhere else...
|
|
||||||
|
|
||||||
This replaces **ALL** references to replace with a reference to replace_with.
|
|
||||||
|
|
||||||
https://benkurtovic.com/2015/01/28/python-object-replacement.html
|
|
||||||
"""
|
|
||||||
for path in hp.iso(replace).pathsin:
|
|
||||||
relation = path.path[1]
|
|
||||||
if isinstance(relation, Path.R_INDEXVAL):
|
|
||||||
path.src.theone[relation.r] = replace_with
|
|
||||||
|
|
||||||
|
|
||||||
def clear_console():
|
def clear_console():
|
||||||
|
@ -1,3 +1 @@
|
|||||||
from .download_result import DownloadResult
|
|
||||||
from .query import Query
|
|
||||||
from .thread_classes import EndThread, FinishedSearch
|
from .thread_classes import EndThread, FinishedSearch
|
||||||
|
67
src/music_kraken/utils/support_classes/hacking.py
Normal file
67
src/music_kraken/utils/support_classes/hacking.py
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
from types import FunctionType
|
||||||
|
from functools import wraps
|
||||||
|
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
class Lake:
|
||||||
|
def __init__(self):
|
||||||
|
self.redirects: Dict[int, int] = {}
|
||||||
|
self.id_to_object: Dict[int, object] = {}
|
||||||
|
|
||||||
|
def get_real_object(self, db_object: object) -> object:
|
||||||
|
def _get_real_id(_id: int) -> int:
|
||||||
|
return self.redirects.get(_id, _id)
|
||||||
|
|
||||||
|
_id = _get_real_id(id(db_object))
|
||||||
|
if _id not in self.id_to_object:
|
||||||
|
self.add(db_object)
|
||||||
|
|
||||||
|
return self.id_to_object[_id]
|
||||||
|
|
||||||
|
def add(self, db_object: object):
|
||||||
|
self.id_to_object[id(db_object)] = db_object
|
||||||
|
|
||||||
|
def override(self, to_override: object, new_db_object: object):
|
||||||
|
self.redirects[id(to_override)] = id(new_db_object)
|
||||||
|
del self.id_to_object[id(to_override)]
|
||||||
|
|
||||||
|
|
||||||
|
lake = Lake()
|
||||||
|
|
||||||
|
|
||||||
|
def wrapper(method):
|
||||||
|
@wraps(method)
|
||||||
|
def wrapped(*args, **kwargs):
|
||||||
|
if len(args) >= 0 and method.__name__ != "__init__":
|
||||||
|
_self = lake.get_real_object(args[0])
|
||||||
|
args = (_self, *args[1:])
|
||||||
|
|
||||||
|
return method(*args, **kwargs)
|
||||||
|
return wrapped
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class BaseClass:
|
||||||
|
def merge(self, to_replace):
|
||||||
|
lake.override(to_replace, self)
|
||||||
|
|
||||||
|
|
||||||
|
class MetaClass(type):
|
||||||
|
def __new__(meta, classname, bases, classDict):
|
||||||
|
bases = (*bases, BaseClass)
|
||||||
|
newClassDict = {}
|
||||||
|
|
||||||
|
for attributeName, attribute in classDict.items():
|
||||||
|
if isinstance(attribute, FunctionType) and attributeName not in ("__new__", "__init__"):
|
||||||
|
attribute = wrapper(attribute)
|
||||||
|
newClassDict[attributeName] = attribute
|
||||||
|
|
||||||
|
for key, value in object.__dict__.items( ):
|
||||||
|
if hasattr( value, '__call__' ) and value not in newClassDict and key not in ("__new__", "__repr__", "__init__"):
|
||||||
|
newClassDict[key] = wrapper(value)
|
||||||
|
|
||||||
|
new_instance = type.__new__(meta, classname, bases, newClassDict)
|
||||||
|
|
||||||
|
lake.add(new_instance)
|
||||||
|
|
||||||
|
return new_instance
|
Loading…
Reference in New Issue
Block a user