Merge pull request 'fix/metal_archives' (#10) from fix/metal_archives into experimental
Reviewed-on: #10
This commit is contained in:
commit
d374ca324d
1
.vscode/settings.json
vendored
1
.vscode/settings.json
vendored
@ -26,6 +26,7 @@
|
||||
"OKBLUE",
|
||||
"Referer",
|
||||
"tracksort",
|
||||
"unmap",
|
||||
"youtube"
|
||||
]
|
||||
}
|
@ -6,8 +6,8 @@ logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
if __name__ == "__main__":
|
||||
commands = [
|
||||
"s: #a And End...",
|
||||
"d: 10",
|
||||
"s: #a Ghost Bath",
|
||||
"4",
|
||||
]
|
||||
|
||||
|
||||
|
@ -44,7 +44,7 @@ init_logging()
|
||||
from . import cli
|
||||
|
||||
if DEBUG:
|
||||
sys.setrecursionlimit(500)
|
||||
sys.setrecursionlimit(300)
|
||||
|
||||
|
||||
if main_settings['modify_gc']:
|
||||
|
@ -101,7 +101,7 @@ class Connection:
|
||||
}
|
||||
|
||||
if self.HOST is not None:
|
||||
headers["Host"] = self.HOST.netloc
|
||||
# headers["Host"] = self.HOST.netloc
|
||||
headers["Referer"] = self.base_url(url=self.HOST)
|
||||
|
||||
headers.update(header_values)
|
||||
@ -215,10 +215,6 @@ class Connection:
|
||||
self.save(r, name, **kwargs)
|
||||
return r
|
||||
|
||||
if self.SEMANTIC_NOT_FOUND and r.status_code == 404:
|
||||
self.LOGGER.warning(f"Couldn't find url (404): {request_url}")
|
||||
return None
|
||||
|
||||
# the server rejected the request, or the internet is lacking
|
||||
except requests.exceptions.Timeout:
|
||||
self.LOGGER.warning(f"Request timed out at \"{request_url}\": ({try_count}-{self.TRIES})")
|
||||
@ -231,15 +227,20 @@ class Connection:
|
||||
finally:
|
||||
self.lock = False
|
||||
|
||||
if not connection_failed:
|
||||
if r is None:
|
||||
self.LOGGER.warning(f"{self.HOST.netloc} didn't respond at {url}. ({try_count}-{self.TRIES})")
|
||||
self.LOGGER.debug("request headers:\n\t"+ "\n\t".join(f"{k}\t=\t{v}" for k, v in headers.items()))
|
||||
else:
|
||||
self.LOGGER.warning(f"{self.HOST.netloc} responded wit {r.status_code} at {url}. ({try_count}-{self.TRIES})")
|
||||
if r is not None:
|
||||
self.LOGGER.debug("request headers:\n\t"+ "\n\t".join(f"{k}\t=\t{v}" for k, v in r.request.headers.items()))
|
||||
self.LOGGER.debug("response headers:\n\t"+ "\n\t".join(f"{k}\t=\t{v}" for k, v in r.headers.items()))
|
||||
self.LOGGER.debug(r.content)
|
||||
|
||||
if name != "":
|
||||
self.save(r, name, error=True, **kwargs)
|
||||
self.LOGGER.debug("request headers:\n\t"+ "\n\t".join(f"{k}\t=\t{v}" for k, v in r.request.headers.items()))
|
||||
self.LOGGER.debug("response headers:\n\t"+ "\n\t".join(f"{k}\t=\t{v}" for k, v in r.headers.items()))
|
||||
self.LOGGER.debug(r.content)
|
||||
|
||||
if name != "":
|
||||
self.save(r, name, error=True, **kwargs)
|
||||
|
||||
if self.SEMANTIC_NOT_FOUND and r.status_code == 404:
|
||||
return None
|
||||
|
||||
if sleep_after_404 != 0:
|
||||
self.LOGGER.warning(f"Waiting for {sleep_after_404} seconds.")
|
||||
|
@ -1,7 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple
|
||||
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union
|
||||
from .parents import OuterProxy
|
||||
|
||||
T = TypeVar('T', bound=OuterProxy)
|
||||
@ -35,56 +35,40 @@ class Collection(Generic[T]):
|
||||
# Key: collection attribute (str) of appended element
|
||||
# Value: main collection to sync to
|
||||
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
|
||||
self.contain_attribute_in_given: Dict[str, Collection] = contain_attribute_in_given or {}
|
||||
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
|
||||
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
|
||||
|
||||
self.contain_self_on_append: List[str] = []
|
||||
|
||||
self._indexed_values = defaultdict(set)
|
||||
self._indexed_to_objects = defaultdict(list)
|
||||
self._id_to_index_values: Dict[int, set] = defaultdict(set)
|
||||
self._indexed_values = defaultdict(lambda: None)
|
||||
self._indexed_to_objects = defaultdict(lambda: None)
|
||||
|
||||
self.extend(data)
|
||||
|
||||
def _map_element(self, __object: T, from_map: bool = False):
|
||||
if __object.id in self._contains_ids:
|
||||
return
|
||||
|
||||
self._contains_ids.add(__object.id)
|
||||
|
||||
for name, value in __object.indexing_values:
|
||||
if value is None:
|
||||
for name, value in (*__object.indexing_values, ('id', __object.id)):
|
||||
if value is None or value == __object._inner._default_values.get(name):
|
||||
continue
|
||||
|
||||
self._indexed_values[name].add(value)
|
||||
self._indexed_to_objects[value].append(__object)
|
||||
self._indexed_values[name] = value
|
||||
self._indexed_to_objects[value] = __object
|
||||
|
||||
if not from_map:
|
||||
for attribute, new_object in self.contain_given_in_attribute.items():
|
||||
__object.__getattribute__(attribute).contain_collection_inside(new_object)
|
||||
self._id_to_index_values[__object.id].add((name, value))
|
||||
|
||||
for attribute, new_object in self.contain_attribute_in_given.items():
|
||||
new_object.contain_collection_inside(__object.__getattribute__(attribute))
|
||||
def _unmap_element(self, __object: Union[T, int]):
|
||||
obj_id = __object.id if isinstance(__object, OuterProxy) else __object
|
||||
|
||||
for attribute, new_object in self.append_object_to_attribute.items():
|
||||
__object.__getattribute__(attribute).append(new_object)
|
||||
if obj_id in self._contains_ids:
|
||||
self._contains_ids.remove(obj_id)
|
||||
|
||||
def _unmap_element(self, __object: T):
|
||||
if __object.id in self._contains_ids:
|
||||
self._contains_ids.remove(__object.id)
|
||||
for name, value in self._id_to_index_values[obj_id]:
|
||||
if name in self._indexed_values:
|
||||
del self._indexed_values[name]
|
||||
if value in self._indexed_to_objects:
|
||||
del self._indexed_to_objects[value]
|
||||
|
||||
for name, value in __object.indexing_values:
|
||||
if value is None:
|
||||
continue
|
||||
if value not in self._indexed_values[name]:
|
||||
continue
|
||||
|
||||
try:
|
||||
self._indexed_to_objects[value].remove(__object)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
if not len(self._indexed_to_objects[value]):
|
||||
self._indexed_values[name].remove(value)
|
||||
del self._id_to_index_values[obj_id]
|
||||
|
||||
def _contained_in_self(self, __object: T) -> bool:
|
||||
if __object.id in self._contains_ids:
|
||||
@ -93,7 +77,7 @@ class Collection(Generic[T]):
|
||||
for name, value in __object.indexing_values:
|
||||
if value is None:
|
||||
continue
|
||||
if value in self._indexed_values[name]:
|
||||
if value == self._indexed_values[name]:
|
||||
return True
|
||||
return False
|
||||
|
||||
@ -110,7 +94,7 @@ class Collection(Generic[T]):
|
||||
if self._contained_in_self(__object):
|
||||
return [self]
|
||||
|
||||
for collection in (*self.children, *self.parents):
|
||||
for collection in self.children:
|
||||
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
|
||||
|
||||
if break_at_first:
|
||||
@ -165,8 +149,8 @@ class Collection(Generic[T]):
|
||||
if value is None:
|
||||
continue
|
||||
|
||||
if value in self._indexed_values[name]:
|
||||
existing_object = self._indexed_to_objects[value][0]
|
||||
if value == self._indexed_values[name]:
|
||||
existing_object = self._indexed_to_objects[value]
|
||||
if existing_object.id == __object.id:
|
||||
return None
|
||||
|
||||
@ -186,19 +170,12 @@ class Collection(Generic[T]):
|
||||
def contains(self, __object: T) -> bool:
|
||||
return len(self._contained_in_sub(__object)) > 0
|
||||
|
||||
def _append(self, __object: T, from_map: bool = False):
|
||||
print(self, __object)
|
||||
self._map_element(__object, from_map=from_map)
|
||||
self._data.append(__object)
|
||||
|
||||
def _find_object_in_self(self, __object: T) -> Optional[T]:
|
||||
for name, value in __object.indexing_values:
|
||||
if value is None or value == __object._default_factories.get(name, lambda: None)():
|
||||
continue
|
||||
if value in self._indexed_values[name]:
|
||||
return self._indexed_to_objects[value][0]
|
||||
if value == self._indexed_values[name]:
|
||||
return self._indexed_to_objects[value]
|
||||
|
||||
def _find_object(self, __object: T) -> Tuple[Collection[T], Optional[T]]:
|
||||
def _find_object(self, __object: T, no_sibling: bool = False) -> Tuple[Collection[T], Optional[T]]:
|
||||
other_object = self._find_object_in_self(__object)
|
||||
if other_object is not None:
|
||||
return self, other_object
|
||||
@ -208,6 +185,21 @@ class Collection(Generic[T]):
|
||||
if other_object is not None:
|
||||
return o, other_object
|
||||
|
||||
if no_sibling:
|
||||
return self, None
|
||||
|
||||
"""
|
||||
# find in siblings and all children of siblings
|
||||
for parent in self.parents:
|
||||
for sibling in parent.children:
|
||||
if sibling is self:
|
||||
continue
|
||||
|
||||
o, other_object = sibling._find_object(__object, no_sibling=True)
|
||||
if other_object is not None:
|
||||
return o, other_object
|
||||
"""
|
||||
|
||||
return self, None
|
||||
|
||||
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
|
||||
@ -222,82 +214,64 @@ class Collection(Generic[T]):
|
||||
:return:
|
||||
"""
|
||||
|
||||
if __object is None or __object.id in self._contains_ids:
|
||||
if __object is None:
|
||||
return
|
||||
|
||||
append_to, existing_object = self._find_object(__object)
|
||||
|
||||
if existing_object is None:
|
||||
# append
|
||||
# print("appending", existing_object, __object)
|
||||
append_to._data.append(__object)
|
||||
append_to._map_element(__object)
|
||||
|
||||
# only modify collections if the object actually has been appended
|
||||
for collection_attribute, child_collection in self.contain_given_in_attribute.items():
|
||||
__object.__getattribute__(collection_attribute).contain_collection_inside(child_collection, __object)
|
||||
|
||||
for attribute, new_object in self.append_object_to_attribute.items():
|
||||
__object.__getattribute__(attribute).append(new_object)
|
||||
|
||||
for attribute, collection in self.sync_on_append.items():
|
||||
collection.extend(__object.__getattribute__(attribute))
|
||||
__object.__setattr__(attribute, collection)
|
||||
|
||||
else:
|
||||
# merge
|
||||
append_to._unmap_element(existing_object)
|
||||
# merge only if the two objects are not the same
|
||||
if existing_object.id == __object.id:
|
||||
return
|
||||
|
||||
old_id = existing_object.id
|
||||
|
||||
existing_object.merge(__object)
|
||||
|
||||
append_to._map_element(__object, from_map=from_map)
|
||||
if existing_object.id != old_id:
|
||||
append_to._unmap_element(old_id)
|
||||
|
||||
"""
|
||||
exists_in_collection = self._contained_in_sub(__object)
|
||||
if len(exists_in_collection) and self is exists_in_collection[0]:
|
||||
# assuming that the object already is contained in the correct collections
|
||||
if not already_is_parent:
|
||||
self.merge_into_self(__object, from_map=from_map)
|
||||
return
|
||||
append_to._map_element(existing_object)
|
||||
|
||||
if not len(exists_in_collection):
|
||||
self._append(__object, from_map=from_map)
|
||||
else:
|
||||
exists_in_collection[0].merge_into_self(__object, from_map=from_map)
|
||||
|
||||
if not already_is_parent or not self._is_root:
|
||||
for parent_collection in self._get_parents_of_multiple_contained_children(__object):
|
||||
pass
|
||||
parent_collection.append(__object, already_is_parent=True, from_map=from_map)
|
||||
"""
|
||||
|
||||
def extend(self, __iterable: Optional[Iterable[T]], from_map: bool = False):
|
||||
def extend(self, __iterable: Optional[Generator[T, None, None]]):
|
||||
if __iterable is None:
|
||||
return
|
||||
|
||||
for __object in __iterable:
|
||||
self.append(__object, from_map=from_map)
|
||||
self.append(__object)
|
||||
|
||||
def sync_with_other_collection(self, equal_collection: Collection):
|
||||
"""
|
||||
If two collections always need to have the same values, this can be used.
|
||||
|
||||
Internally:
|
||||
1. import the data from other to self
|
||||
- _data
|
||||
- contained_collections
|
||||
2. replace all refs from the other object, with refs from this object
|
||||
"""
|
||||
if equal_collection is self:
|
||||
return
|
||||
|
||||
# don't add the elements from the subelements from the other collection.
|
||||
# this will be done in the next step.
|
||||
self.extend(equal_collection._data)
|
||||
# add all submodules
|
||||
for equal_sub_collection in equal_collection.children:
|
||||
self.contain_collection_inside(equal_sub_collection)
|
||||
|
||||
def contain_collection_inside(self, sub_collection: Collection):
|
||||
def contain_collection_inside(self, sub_collection: Collection, _object: T):
|
||||
"""
|
||||
This collection will ALWAYS contain everything from the passed in collection
|
||||
"""
|
||||
if self is sub_collection or sub_collection in self.children:
|
||||
return
|
||||
|
||||
_object._inner._is_collection_child[self] = sub_collection
|
||||
_object._inner._is_collection_parent[sub_collection] = self
|
||||
|
||||
self.children.append(sub_collection)
|
||||
sub_collection.parents.append(self)
|
||||
|
||||
@property
|
||||
def data(self) -> List[T]:
|
||||
return [*self._data,
|
||||
*(__object for collection in self.children for __object in collection.shallow_list)]
|
||||
return list(self.__iter__())
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._data) + sum(len(collection) for collection in self.children)
|
||||
@ -306,16 +280,20 @@ class Collection(Generic[T]):
|
||||
def empty(self) -> bool:
|
||||
return self.__len__() <= 0
|
||||
|
||||
def __iter__(self) -> Iterator[T]:
|
||||
def __iter__(self, finished_ids: set = None) -> Iterator[T]:
|
||||
_finished_ids = finished_ids or set()
|
||||
|
||||
for element in self._data:
|
||||
if element.id in _finished_ids:
|
||||
continue
|
||||
_finished_ids.add(element.id)
|
||||
yield element
|
||||
|
||||
for c in self.children:
|
||||
for element in c:
|
||||
yield element
|
||||
yield from c.__iter__(finished_ids=finished_ids)
|
||||
|
||||
def __merge__(self, __other: Collection, override: bool = False):
|
||||
self.extend(__other._data, from_map=True)
|
||||
self.extend(__other)
|
||||
|
||||
def __getitem__(self, item: int):
|
||||
if item < len(self._data):
|
||||
|
@ -3,9 +3,11 @@ from __future__ import annotations
|
||||
import random
|
||||
from collections import defaultdict
|
||||
from functools import lru_cache
|
||||
|
||||
from typing import Optional, Dict, Tuple, List, Type, Generic, Any, TypeVar, Set
|
||||
|
||||
from pathlib import Path
|
||||
import inspect
|
||||
|
||||
from .metadata import Metadata
|
||||
from ..utils import get_unix_time, object_trace
|
||||
from ..utils.config import logging_settings, main_settings
|
||||
@ -29,12 +31,16 @@ class InnerData:
|
||||
_refers_to_instances: set = None
|
||||
|
||||
def __init__(self, object_type, **kwargs):
|
||||
self._refers_to_instances =set()
|
||||
self._refers_to_instances = set()
|
||||
|
||||
# collection : collection that is a collection of self
|
||||
self._is_collection_child: Dict[Collection, Collection] = {}
|
||||
self._is_collection_parent: Dict[Collection, Collection] = {}
|
||||
|
||||
# initialize the default values
|
||||
self.__default_values = {}
|
||||
self._default_values = {}
|
||||
for name, factory in object_type._default_factories.items():
|
||||
self.__default_values[name] = factory()
|
||||
self._default_values[name] = factory()
|
||||
|
||||
for key, value in kwargs.items():
|
||||
self.__setattr__(key, value)
|
||||
@ -48,7 +54,7 @@ class InnerData:
|
||||
|
||||
for key, value in __other.__dict__.copy().items():
|
||||
# just set the other value if self doesn't already have it
|
||||
if key not in self.__dict__ or (key in self.__dict__ and self.__dict__[key] == self.__default_values.get(key)):
|
||||
if key not in self.__dict__ or (key in self.__dict__ and self.__dict__[key] == self._default_values.get(key)):
|
||||
self.__setattr__(key, value)
|
||||
continue
|
||||
|
||||
@ -158,15 +164,7 @@ class OuterProxy:
|
||||
self._add_other_db_objects(key, value)
|
||||
|
||||
def __hash__(self):
|
||||
"""
|
||||
:raise: IsDynamicException
|
||||
:return:
|
||||
"""
|
||||
|
||||
if self.dynamic:
|
||||
return id(self._inner)
|
||||
|
||||
return self.id
|
||||
return id(self)
|
||||
|
||||
def __eq__(self, other: Any):
|
||||
return self.__hash__() == other.__hash__()
|
||||
@ -183,23 +181,42 @@ class OuterProxy:
|
||||
if __other is None:
|
||||
return
|
||||
|
||||
object_trace(f"merging {type(self).__name__} [{self.title_string}] with {type(__other).__name__} [{__other.title_string}]")
|
||||
|
||||
a = self
|
||||
b = __other
|
||||
|
||||
if a._inner is b._inner:
|
||||
if a.id == b.id:
|
||||
return
|
||||
|
||||
|
||||
# switch instances if more efficient
|
||||
if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
|
||||
a, b = b, a
|
||||
|
||||
a._inner.__merge__(b._inner, override=override)
|
||||
a._inner._refers_to_instances.update(b._inner._refers_to_instances)
|
||||
object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}] called by [{' | '.join(f'{s.function} {Path(s.filename).name}:{str(s.lineno)}' for s in inspect.stack()[1:5])}]")
|
||||
|
||||
for collection, child_collection in b._inner._is_collection_child.items():
|
||||
try:
|
||||
collection.children.remove(child_collection)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
for collection, parent_collection in b._inner._is_collection_parent.items():
|
||||
try:
|
||||
collection.parents.remove(parent_collection)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
for instance in b._inner._refers_to_instances:
|
||||
old_inner = b._inner
|
||||
|
||||
for instance in b._inner._refers_to_instances.copy():
|
||||
instance._inner = a._inner
|
||||
a._inner._refers_to_instances.add(instance)
|
||||
|
||||
a._inner.__merge__(old_inner, override=override)
|
||||
del old_inner
|
||||
|
||||
def __merge__(self, __other: Optional[OuterProxy], override: bool = False):
|
||||
self.merge(__other, override)
|
||||
|
||||
def mark_as_fetched(self, *url_hash_list: List[str]):
|
||||
for url_hash in url_hash_list:
|
||||
|
@ -49,6 +49,7 @@ class Song(Base):
|
||||
source_collection: SourceCollection
|
||||
target_collection: Collection[Target]
|
||||
lyrics_collection: Collection[Lyrics]
|
||||
|
||||
main_artist_collection: Collection[Artist]
|
||||
feature_artist_collection: Collection[Artist]
|
||||
album_collection: Collection[Album]
|
||||
@ -85,9 +86,15 @@ class Song(Base):
|
||||
TITEL = "title"
|
||||
|
||||
def __init_collections__(self) -> None:
|
||||
"""
|
||||
self.album_collection.contain_given_in_attribute = {
|
||||
"artist_collection": self.main_artist_collection,
|
||||
}
|
||||
"""
|
||||
self.album_collection.sync_on_append = {
|
||||
"artist_collection": self.main_artist_collection,
|
||||
}
|
||||
|
||||
self.album_collection.append_object_to_attribute = {
|
||||
"song_collection": self,
|
||||
}
|
||||
@ -241,12 +248,19 @@ class Album(Base):
|
||||
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "label_collection")
|
||||
|
||||
def __init_collections__(self):
|
||||
self.song_collection.contain_attribute_in_given = {
|
||||
"main_artist_collection": self.artist_collection
|
||||
}
|
||||
self.song_collection.append_object_to_attribute = {
|
||||
"album_collection": self
|
||||
}
|
||||
self.song_collection.sync_on_append = {
|
||||
"main_artist_collection": self.artist_collection
|
||||
}
|
||||
|
||||
self.artist_collection.append_object_to_attribute = {
|
||||
"main_album_collection": self
|
||||
}
|
||||
self.artist_collection.contain_given_in_attribute = {
|
||||
"label_collection": self.label_collection
|
||||
}
|
||||
|
||||
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
|
||||
if object_type is Song:
|
||||
@ -299,7 +313,7 @@ class Album(Base):
|
||||
@property
|
||||
def option_string(self) -> str:
|
||||
return f"{self.__repr__()} " \
|
||||
f"by Artist({OPTION_STRING_DELIMITER.join([artist.name for artist in self.artist_collection])}) " \
|
||||
f"by Artist({OPTION_STRING_DELIMITER.join([artist.name + str(artist.id) for artist in self.artist_collection])}) " \
|
||||
f"under Label({OPTION_STRING_DELIMITER.join([label.name for label in self.label_collection])})"
|
||||
|
||||
@property
|
||||
@ -642,6 +656,15 @@ class Label(Base):
|
||||
contact_list=contact_list, album_list=album_list, current_artist_list=current_artist_list,
|
||||
**kwargs)
|
||||
|
||||
def __init_collections__(self):
|
||||
self.album_collection.append_object_to_attribute = {
|
||||
"label_collection": self
|
||||
}
|
||||
|
||||
self.current_artist_collection.append_object_to_attribute = {
|
||||
"label_collection": self
|
||||
}
|
||||
|
||||
@property
|
||||
def indexing_values(self) -> List[Tuple[str, object]]:
|
||||
return [
|
||||
|
@ -104,14 +104,7 @@ class Source(OuterProxy):
|
||||
('url', self.url),
|
||||
('audio_url', self.audio_url),
|
||||
]
|
||||
|
||||
def __merge__(self, __other: Source, override: bool = False):
|
||||
if override:
|
||||
self.audio_url = __other.audio_url
|
||||
|
||||
if self.audio_url is None or (override and __other.audio_url is not None):
|
||||
self.audio_url = __other.audio_url
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import List, Optional, Type, Union
|
||||
from typing import List, Optional, Type, Union, Generator
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import pycountry
|
||||
@ -1056,7 +1056,7 @@ class Musify(Page):
|
||||
date=date
|
||||
)
|
||||
|
||||
def _get_discography(self, url: MusifyUrl, artist_name: str = None, stop_at_level: int = 1) -> List[Album]:
|
||||
def _get_discography(self, url: MusifyUrl, artist_name: str = None, stop_at_level: int = 1) -> Generator[Album, None, None]:
|
||||
"""
|
||||
POST https://musify.club/artist/filteralbums
|
||||
ArtistID: 280348
|
||||
@ -1077,18 +1077,8 @@ class Musify(Page):
|
||||
return []
|
||||
soup: BeautifulSoup = BeautifulSoup(r.content, features="html.parser")
|
||||
|
||||
discography: List[Album] = []
|
||||
for card_soup in soup.find_all("div", {"class": "card"}):
|
||||
new_album: Album = self._parse_album_card(card_soup, artist_name)
|
||||
album_source: Source
|
||||
|
||||
if stop_at_level > 1:
|
||||
for album_source in new_album.source_collection.get_sources_from_page(self.SOURCE_TYPE):
|
||||
new_album.merge(self.fetch_album(album_source, stop_at_level=stop_at_level-1))
|
||||
|
||||
discography.append(new_album)
|
||||
|
||||
return discography
|
||||
yield self._parse_album_card(card_soup, artist_name)
|
||||
|
||||
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
|
||||
"""
|
||||
@ -1110,8 +1100,7 @@ class Musify(Page):
|
||||
|
||||
artist = self._get_artist_attributes(url)
|
||||
|
||||
discography: List[Album] = self._get_discography(url, artist.name)
|
||||
artist.main_album_collection.extend(discography)
|
||||
artist.main_album_collection.extend(self._get_discography(url, artist.name))
|
||||
|
||||
return artist
|
||||
|
||||
|
@ -13,7 +13,7 @@ if not load_dotenv(Path(__file__).parent.parent.parent / ".env"):
|
||||
__stage__ = os.getenv("STAGE", "prod")
|
||||
|
||||
DEBUG = (__stage__ == "dev") and True
|
||||
DEBUG_LOGGING = DEBUG and False
|
||||
DEBUG_LOGGING = DEBUG and True
|
||||
DEBUG_TRACE = DEBUG and True
|
||||
DEBUG_OBJECT_TRACE = DEBUG and False
|
||||
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
|
||||
|
Loading…
Reference in New Issue
Block a user