Compare commits
46 Commits
b15d0839ef
...
fix/bandca
Author | SHA1 | Date | |
---|---|---|---|
0179246ec0 | |||
3d432cd0d7 | |||
0080a48e70 | |||
ea5adfbe8a | |||
fa723d7747 | |||
312e57d82e | |||
a998e52cd9 | |||
b4c73d56a7 | |||
1735ff4e1d | |||
be09562632 | |||
29770825a4 | |||
81708ba100 | |||
301ff82bcf | |||
06ffae06a6 | |||
919a99885c | |||
e20b14a9df | |||
b933c6ac14 | |||
3c5bbc19af | |||
06acf22abb | |||
1e62d371cd | |||
24a90f1cdf | |||
d9c711a2f8 | |||
d374ca324d | |||
81eb43c8ef | |||
ba94e38a2d | |||
3532fea36c | |||
3cd9daf512 | |||
662f207529 | |||
85923e2a79 | |||
f000ad4484 | |||
7e4ba0b1a0 | |||
56101d4a31 | |||
cde3b3dbbb | |||
e47e22428d | |||
64c95aabfa | |||
329aa39271 | |||
4367c2274d | |||
5e04c480bd | |||
eab19304e1 | |||
d0fe0c3f86 | |||
9dbe34de88 | |||
9783e3f24e | |||
eb94a328c7 | |||
2f4a9f9801 | |||
92a03355c3 | |||
2f29ad2415 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -18,3 +18,6 @@ venv
|
|||||||
windows
|
windows
|
||||||
|
|
||||||
.env
|
.env
|
||||||
|
|
||||||
|
# setuptools_scm
|
||||||
|
_version.py
|
||||||
|
57
.woodpecker.yml
Normal file
57
.woodpecker.yml
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
labels:
|
||||||
|
platform: linux/amd64
|
||||||
|
|
||||||
|
clone:
|
||||||
|
git:
|
||||||
|
image: woodpeckerci/plugin-git
|
||||||
|
settings:
|
||||||
|
tags: true
|
||||||
|
|
||||||
|
steps:
|
||||||
|
build-stable:
|
||||||
|
image: python
|
||||||
|
commands:
|
||||||
|
- sed -i 's/name = "music-kraken"/name = "music-kraken-stable"/' pyproject.toml
|
||||||
|
- python -m pip install -r requirements-dev.txt
|
||||||
|
- python3 -m build
|
||||||
|
environment:
|
||||||
|
- SETUPTOOLS_SCM_PRETEND_VERSION=${CI_COMMIT_TAG}
|
||||||
|
when:
|
||||||
|
- event: tag
|
||||||
|
|
||||||
|
build-dev:
|
||||||
|
image: python
|
||||||
|
commands:
|
||||||
|
- python -m pip install -r requirements-dev.txt
|
||||||
|
- python3 -m build
|
||||||
|
when:
|
||||||
|
- event: manual
|
||||||
|
- event: push
|
||||||
|
branch: experimental
|
||||||
|
|
||||||
|
publish-gitea:
|
||||||
|
image: gitea.elara.ws/music-kraken/plugin-twine
|
||||||
|
settings:
|
||||||
|
repository_url: "https://gitea.elara.ws/api/packages/music-kraken/pypi"
|
||||||
|
username:
|
||||||
|
from_secret: gitea_username
|
||||||
|
password:
|
||||||
|
from_secret: gitea_password
|
||||||
|
when:
|
||||||
|
- event: manual
|
||||||
|
- event: tag
|
||||||
|
- event: push
|
||||||
|
branch: experimental
|
||||||
|
|
||||||
|
publish-pypi:
|
||||||
|
image: gitea.elara.ws/music-kraken/plugin-twine
|
||||||
|
settings:
|
||||||
|
username:
|
||||||
|
from_secret: pypi_username
|
||||||
|
password:
|
||||||
|
from_secret: pypi_password
|
||||||
|
when:
|
||||||
|
- event: manual
|
||||||
|
- event: tag
|
||||||
|
- event: push
|
||||||
|
branch: experimental
|
@@ -1,5 +1,7 @@
|
|||||||
# Music Kraken
|
# Music Kraken
|
||||||
|
|
||||||
|
[](https://ci.elara.ws/repos/59)
|
||||||
|
|
||||||
<img src="assets/logo.svg" width=300 alt="music kraken logo"/>
|
<img src="assets/logo.svg" width=300 alt="music kraken logo"/>
|
||||||
|
|
||||||
- [Music Kraken](#music-kraken)
|
- [Music Kraken](#music-kraken)
|
||||||
|
@@ -7,7 +7,8 @@ logging.getLogger().setLevel(logging.DEBUG)
|
|||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
commands = [
|
commands = [
|
||||||
"s: #a Ghost Bath",
|
"s: #a Ghost Bath",
|
||||||
"4",
|
"0",
|
||||||
|
"d: 1",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@@ -2,91 +2,30 @@ import music_kraken
|
|||||||
from music_kraken.objects import Song, Album, Artist, Collection
|
from music_kraken.objects import Song, Album, Artist, Collection
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
artist: Artist = Artist(
|
album_1 = Album(
|
||||||
name="artist",
|
title="album",
|
||||||
main_album_list=[
|
song_list=[
|
||||||
Album(
|
Song(title="song", main_artist_list=[Artist(name="artist")]),
|
||||||
title="album",
|
],
|
||||||
song_list=[
|
artist_list=[
|
||||||
Song(
|
Artist(name="artist 3"),
|
||||||
title="song",
|
|
||||||
album_list=[
|
|
||||||
Album(
|
|
||||||
title="album",
|
|
||||||
albumsort=123,
|
|
||||||
main_artist=Artist(name="artist"),
|
|
||||||
),
|
|
||||||
],
|
|
||||||
),
|
|
||||||
Song(
|
|
||||||
title="other_song",
|
|
||||||
album_list=[
|
|
||||||
Album(title="album", albumsort=423),
|
|
||||||
],
|
|
||||||
),
|
|
||||||
]
|
|
||||||
),
|
|
||||||
Album(title="album", barcode="1234567890123"),
|
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
album_2 = Album(
|
||||||
other_artist: Artist = Artist(
|
title="album",
|
||||||
name="artist",
|
song_list=[
|
||||||
main_album_list=[
|
Song(title="song", main_artist_list=[Artist(name="artist 2")]),
|
||||||
Album(
|
],
|
||||||
title="album",
|
artist_list=[
|
||||||
song_list=[
|
Artist(name="artist"),
|
||||||
Song(
|
|
||||||
title="song",
|
|
||||||
album_list=[
|
|
||||||
Album(
|
|
||||||
title="album",
|
|
||||||
albumsort=123,
|
|
||||||
main_artist=Artist(name="other_artist"),
|
|
||||||
),
|
|
||||||
],
|
|
||||||
),
|
|
||||||
Song(
|
|
||||||
title="other_song",
|
|
||||||
album_list=[
|
|
||||||
Album(title="album", albumsort=423),
|
|
||||||
],
|
|
||||||
),
|
|
||||||
]
|
|
||||||
),
|
|
||||||
Album(title="album", barcode="1234567890123"),
|
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
artist.merge(other_artist)
|
album_1.merge(album_2)
|
||||||
|
|
||||||
a = artist.main_album_collection[0]
|
|
||||||
b = a.song_collection[0].album_collection[0]
|
|
||||||
c = a.song_collection[1].album_collection[0]
|
|
||||||
d = b.song_collection[0].album_collection[0]
|
|
||||||
e = d.song_collection[0].album_collection[0]
|
|
||||||
f = e.song_collection[0].album_collection[0]
|
|
||||||
g = f.song_collection[0].album_collection[0]
|
|
||||||
|
|
||||||
print(a.id, a.title, a.barcode, a.albumsort)
|
|
||||||
print(b.id, b.title, b.barcode, b.albumsort)
|
|
||||||
print(c.id, c.title, c.barcode, c.albumsort)
|
|
||||||
print(d.id, d.title, d.barcode, d.albumsort)
|
|
||||||
print(e.id, e.title, e.barcode, e.albumsort)
|
|
||||||
print(f.id, f.title, f.barcode, f.albumsort)
|
|
||||||
print(g.id, g.title, g.barcode, g.albumsort)
|
|
||||||
print()
|
print()
|
||||||
|
print(*(f"{a.title_string} ; {a.id}" for a in album_1.artist_collection.data), sep=" | ")
|
||||||
|
|
||||||
d.title = "new_title"
|
print(id(album_1.artist_collection), id(album_2.artist_collection))
|
||||||
|
print(id(album_1.song_collection[0].main_artist_collection), id(album_2.song_collection[0].main_artist_collection))
|
||||||
print(a.id, a.title, a.barcode, a.albumsort)
|
|
||||||
print(b.id, b.title, b.barcode, b.albumsort)
|
|
||||||
print(c.id, c.title, c.barcode, c.albumsort)
|
|
||||||
print(d.id, d.title, d.barcode, d.albumsort)
|
|
||||||
print(e.id, e.title, e.barcode, e.albumsort)
|
|
||||||
print(f.id, f.title, f.barcode, f.albumsort)
|
|
||||||
print(g.id, g.title, g.barcode, g.albumsort)
|
|
||||||
print()
|
|
||||||
|
|
||||||
print(artist.main_album_collection._indexed_values)
|
|
@@ -9,8 +9,6 @@ from rich.console import Console
|
|||||||
from .utils.shared import DEBUG, DEBUG_LOGGING
|
from .utils.shared import DEBUG, DEBUG_LOGGING
|
||||||
from .utils.config import logging_settings, main_settings, read_config
|
from .utils.config import logging_settings, main_settings, read_config
|
||||||
|
|
||||||
__version__ = "1.15.0"
|
|
||||||
|
|
||||||
read_config()
|
read_config()
|
||||||
|
|
||||||
console: Console = Console()
|
console: Console = Console()
|
||||||
|
@@ -14,7 +14,7 @@ from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic,
|
|||||||
|
|
||||||
|
|
||||||
ALL_PAGES: Set[Type[Page]] = {
|
ALL_PAGES: Set[Type[Page]] = {
|
||||||
EncyclopaediaMetallum,
|
# EncyclopaediaMetallum,
|
||||||
Musify,
|
Musify,
|
||||||
YoutubeMusic,
|
YoutubeMusic,
|
||||||
Bandcamp
|
Bandcamp
|
||||||
|
@@ -1,8 +1,9 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator
|
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any
|
||||||
from .parents import OuterProxy
|
from .parents import OuterProxy
|
||||||
|
from ..utils import object_trace
|
||||||
|
|
||||||
T = TypeVar('T', bound=OuterProxy)
|
T = TypeVar('T', bound=OuterProxy)
|
||||||
|
|
||||||
@@ -21,187 +22,62 @@ class Collection(Generic[T]):
|
|||||||
self,
|
self,
|
||||||
data: Optional[Iterable[T]] = None,
|
data: Optional[Iterable[T]] = None,
|
||||||
sync_on_append: Dict[str, Collection] = None,
|
sync_on_append: Dict[str, Collection] = None,
|
||||||
contain_given_in_attribute: Dict[str, Collection] = None,
|
append_object_to_attribute: Dict[str, T] = None,
|
||||||
contain_attribute_in_given: Dict[str, Collection] = None,
|
extend_object_to_attribute: Dict[str, Collection] = None,
|
||||||
append_object_to_attribute: Dict[str, T] = None
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
self._collection_for: dict = dict()
|
||||||
|
|
||||||
self._contains_ids = set()
|
self._contains_ids = set()
|
||||||
self._data = []
|
self._data = []
|
||||||
|
|
||||||
self.parents: List[Collection[T]] = []
|
|
||||||
self.children: List[Collection[T]] = []
|
|
||||||
|
|
||||||
# List of collection attributes that should be modified on append
|
# List of collection attributes that should be modified on append
|
||||||
# Key: collection attribute (str) of appended element
|
# Key: collection attribute (str) of appended element
|
||||||
# Value: main collection to sync to
|
# Value: main collection to sync to
|
||||||
self.contain_given_in_attribute: Dict[str, Collection] = contain_given_in_attribute or {}
|
|
||||||
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
|
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
|
||||||
|
self.extend_object_to_attribute: Dict[str, Collection[T]] = extend_object_to_attribute or {}
|
||||||
|
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
|
||||||
|
|
||||||
self._indexed_values = defaultdict(set)
|
self._id_to_index_values: Dict[int, set] = defaultdict(set)
|
||||||
self._indexed_to_objects = defaultdict(list)
|
|
||||||
|
# This is to cleanly unmap previously mapped items by their id
|
||||||
|
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
|
||||||
|
# this is to keep track and look up the actual objects
|
||||||
|
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
|
||||||
|
|
||||||
self.extend(data)
|
self.extend(data)
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return f"Collection({id(self)})"
|
||||||
|
|
||||||
def _map_element(self, __object: T, from_map: bool = False):
|
def _map_element(self, __object: T, from_map: bool = False):
|
||||||
__object._inner._mapped_in_collection.add(self)
|
self._unmap_element(__object.id)
|
||||||
self._contains_ids.add(__object.id)
|
|
||||||
|
self._indexed_from_id[__object.id]["id"] = __object.id
|
||||||
|
self._indexed_values["id"][__object.id] = __object
|
||||||
|
|
||||||
for name, value in __object.indexing_values:
|
for name, value in __object.indexing_values:
|
||||||
if value is None or value == __object._inner._default_values.get(name):
|
if value is None or value == __object._inner._default_values.get(name):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
self._indexed_values[name].add(value)
|
self._indexed_values[name][value] = __object
|
||||||
self._indexed_to_objects[value].append(__object)
|
self._indexed_from_id[__object.id][name] = value
|
||||||
|
|
||||||
def _unmap_element(self, __object: T):
|
def _unmap_element(self, __object: Union[T, int]):
|
||||||
if __object.id in self._contains_ids:
|
obj_id = __object.id if isinstance(__object, OuterProxy) else __object
|
||||||
self._contains_ids.remove(__object.id)
|
|
||||||
|
|
||||||
for name, value in __object.indexing_values:
|
if obj_id not in self._indexed_from_id:
|
||||||
if value is None:
|
|
||||||
continue
|
|
||||||
if value not in self._indexed_values[name]:
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
|
||||||
self._indexed_to_objects[value].remove(__object)
|
|
||||||
except ValueError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not len(self._indexed_to_objects[value]):
|
|
||||||
self._indexed_values[name].remove(value)
|
|
||||||
|
|
||||||
def _contained_in_self(self, __object: T) -> bool:
|
|
||||||
if __object.id in self._contains_ids:
|
|
||||||
return True
|
|
||||||
|
|
||||||
for name, value in __object.indexing_values:
|
|
||||||
if value is None:
|
|
||||||
continue
|
|
||||||
if value in self._indexed_values[name]:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _contained_in_sub(self, __object: T, break_at_first: bool = True) -> List[Collection]:
|
|
||||||
"""
|
|
||||||
Gets the collection this object is found in, if it is found in any.
|
|
||||||
|
|
||||||
:param __object:
|
|
||||||
:param break_at_first:
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
results = []
|
|
||||||
|
|
||||||
if self._contained_in_self(__object):
|
|
||||||
return [self]
|
|
||||||
|
|
||||||
for collection in self.children:
|
|
||||||
results.extend(collection._contained_in_sub(__object, break_at_first=break_at_first))
|
|
||||||
|
|
||||||
if break_at_first:
|
|
||||||
return results
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
def _get_root_collections(self) -> List[Collection]:
|
|
||||||
if not len(self.parents):
|
|
||||||
return [self]
|
|
||||||
|
|
||||||
root_collections = []
|
|
||||||
for upper_collection in self.parents:
|
|
||||||
root_collections.extend(upper_collection._get_root_collections())
|
|
||||||
return root_collections
|
|
||||||
|
|
||||||
@property
|
|
||||||
def _is_root(self) -> bool:
|
|
||||||
return len(self.parents) <= 0
|
|
||||||
|
|
||||||
def _get_parents_of_multiple_contained_children(self, __object: T):
|
|
||||||
results = []
|
|
||||||
if len(self.children) < 2 or self._contained_in_self(__object):
|
|
||||||
return results
|
|
||||||
|
|
||||||
count = 0
|
|
||||||
|
|
||||||
for collection in self.children:
|
|
||||||
sub_results = collection._get_parents_of_multiple_contained_children(__object)
|
|
||||||
|
|
||||||
if len(sub_results) > 0:
|
|
||||||
count += 1
|
|
||||||
results.extend(sub_results)
|
|
||||||
|
|
||||||
if count >= 2:
|
|
||||||
results.append(self)
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
def merge_into_self(self, __object: T, from_map: bool = False):
|
|
||||||
"""
|
|
||||||
1. find existing objects
|
|
||||||
2. merge into existing object
|
|
||||||
3. remap existing object
|
|
||||||
"""
|
|
||||||
if __object.id in self._contains_ids:
|
|
||||||
return
|
return
|
||||||
|
|
||||||
existing_object: T = None
|
for name, value in self._indexed_from_id[obj_id].items():
|
||||||
|
|
||||||
for name, value in __object.indexing_values:
|
|
||||||
if value is None:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if value in self._indexed_values[name]:
|
if value in self._indexed_values[name]:
|
||||||
existing_object = self._indexed_to_objects[value][0]
|
del self._indexed_values[name][value]
|
||||||
if existing_object.id == __object.id:
|
|
||||||
return None
|
|
||||||
|
|
||||||
break
|
del self._indexed_from_id[obj_id]
|
||||||
|
|
||||||
if existing_object is None:
|
def _find_object(self, __object: T) -> Optional[T]:
|
||||||
return None
|
|
||||||
|
|
||||||
existing_object.merge(__object)
|
|
||||||
|
|
||||||
# just a check if it really worked
|
|
||||||
if existing_object.id != __object.id:
|
|
||||||
raise ValueError("This should NEVER happen. Merging doesn't work.")
|
|
||||||
|
|
||||||
self._map_element(existing_object, from_map=from_map)
|
|
||||||
|
|
||||||
def contains(self, __object: T) -> bool:
|
|
||||||
return len(self._contained_in_sub(__object)) > 0
|
|
||||||
|
|
||||||
def _find_object_in_self(self, __object: T) -> Optional[T]:
|
|
||||||
for name, value in __object.indexing_values:
|
for name, value in __object.indexing_values:
|
||||||
if value in self._indexed_values[name]:
|
if value in self._indexed_values[name]:
|
||||||
return self._indexed_to_objects[value][0]
|
return self._indexed_values[name][value]
|
||||||
|
|
||||||
def _find_object(self, __object: T, no_sibling: bool = False) -> Tuple[Collection[T], Optional[T]]:
|
|
||||||
other_object = self._find_object_in_self(__object)
|
|
||||||
if other_object is not None:
|
|
||||||
return self, other_object
|
|
||||||
|
|
||||||
for c in self.children:
|
|
||||||
o, other_object = c._find_object(__object)
|
|
||||||
if other_object is not None:
|
|
||||||
return o, other_object
|
|
||||||
|
|
||||||
if no_sibling:
|
|
||||||
return self, None
|
|
||||||
|
|
||||||
"""
|
|
||||||
# find in siblings and all children of siblings
|
|
||||||
for parent in self.parents:
|
|
||||||
for sibling in parent.children:
|
|
||||||
if sibling is self:
|
|
||||||
continue
|
|
||||||
|
|
||||||
o, other_object = sibling._find_object(__object, no_sibling=True)
|
|
||||||
if other_object is not None:
|
|
||||||
return o, other_object
|
|
||||||
"""
|
|
||||||
|
|
||||||
return self, None
|
|
||||||
|
|
||||||
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
|
def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
|
||||||
"""
|
"""
|
||||||
@@ -218,28 +94,46 @@ class Collection(Generic[T]):
|
|||||||
if __object is None:
|
if __object is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
append_to, existing_object = self._find_object(__object)
|
existing_object = self._find_object(__object)
|
||||||
|
|
||||||
if existing_object is None:
|
if existing_object is None:
|
||||||
# append
|
# append
|
||||||
append_to._data.append(__object)
|
self._data.append(__object)
|
||||||
append_to._map_element(__object)
|
self._map_element(__object)
|
||||||
|
|
||||||
# only modify collections if the object actually has been appended
|
for collection_attribute, child_collection in self.extend_object_to_attribute.items():
|
||||||
for collection_attribute, child_collection in self.contain_given_in_attribute.items():
|
__object.__getattribute__(collection_attribute).extend(child_collection)
|
||||||
__object.__getattribute__(collection_attribute).contain_collection_inside(child_collection, __object)
|
|
||||||
|
|
||||||
for attribute, new_object in self.append_object_to_attribute.items():
|
for attribute, new_object in self.append_object_to_attribute.items():
|
||||||
__object.__getattribute__(attribute).append(new_object)
|
__object.__getattribute__(attribute).append(new_object)
|
||||||
|
|
||||||
|
# only modify collections if the object actually has been appended
|
||||||
|
for attribute, a in self.sync_on_append.items():
|
||||||
|
b = __object.__getattribute__(attribute)
|
||||||
|
object_trace(f"Syncing [{a}{id(a)}] = [{b}{id(b)}]")
|
||||||
|
|
||||||
|
data_to_extend = b.data
|
||||||
|
|
||||||
|
a._collection_for.update(b._collection_for)
|
||||||
|
for synced_with, key in b._collection_for.items():
|
||||||
|
synced_with.__setattr__(key, a)
|
||||||
|
|
||||||
|
a.extend(data_to_extend)
|
||||||
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# merge only if the two objects are not the same
|
# merge only if the two objects are not the same
|
||||||
if existing_object.id == __object.id:
|
if existing_object.id == __object.id:
|
||||||
return
|
return
|
||||||
|
|
||||||
append_to._unmap_element(existing_object)
|
old_id = existing_object.id
|
||||||
existing_object.merge(__object)
|
|
||||||
append_to._map_element(existing_object)
|
|
||||||
|
|
||||||
|
existing_object.merge(__object)
|
||||||
|
|
||||||
|
if existing_object.id != old_id:
|
||||||
|
self._unmap_element(old_id)
|
||||||
|
|
||||||
|
self._map_element(existing_object)
|
||||||
|
|
||||||
def extend(self, __iterable: Optional[Generator[T, None, None]]):
|
def extend(self, __iterable: Optional[Generator[T, None, None]]):
|
||||||
if __iterable is None:
|
if __iterable is None:
|
||||||
@@ -248,54 +142,22 @@ class Collection(Generic[T]):
|
|||||||
for __object in __iterable:
|
for __object in __iterable:
|
||||||
self.append(__object)
|
self.append(__object)
|
||||||
|
|
||||||
def contain_collection_inside(self, sub_collection: Collection, _object: T):
|
|
||||||
"""
|
|
||||||
This collection will ALWAYS contain everything from the passed in collection
|
|
||||||
"""
|
|
||||||
if self is sub_collection or sub_collection in self.children:
|
|
||||||
return
|
|
||||||
|
|
||||||
_object._inner._is_collection_child[self] = sub_collection
|
|
||||||
_object._inner._is_collection_parent[sub_collection] = self
|
|
||||||
|
|
||||||
self.children.append(sub_collection)
|
|
||||||
sub_collection.parents.append(self)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def data(self) -> List[T]:
|
def data(self) -> List[T]:
|
||||||
return list(self.__iter__())
|
return list(self.__iter__())
|
||||||
|
|
||||||
def __len__(self) -> int:
|
def __len__(self) -> int:
|
||||||
return len(self._data) + sum(len(collection) for collection in self.children)
|
return len(self._data)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def empty(self) -> bool:
|
def empty(self) -> bool:
|
||||||
return self.__len__() <= 0
|
return self.__len__() <= 0
|
||||||
|
|
||||||
def __iter__(self, finished_ids: set = None) -> Iterator[T]:
|
def __iter__(self) -> Iterator[T]:
|
||||||
_finished_ids = finished_ids or set()
|
yield from self._data
|
||||||
|
|
||||||
for element in self._data:
|
|
||||||
if element.id in _finished_ids:
|
|
||||||
continue
|
|
||||||
_finished_ids.add(element.id)
|
|
||||||
yield element
|
|
||||||
|
|
||||||
for c in self.children:
|
|
||||||
yield from c.__iter__(finished_ids=finished_ids)
|
|
||||||
|
|
||||||
def __merge__(self, __other: Collection, override: bool = False):
|
def __merge__(self, __other: Collection, override: bool = False):
|
||||||
self.extend(__other.__iter__())
|
self.extend(__other)
|
||||||
|
|
||||||
def __getitem__(self, item: int):
|
def __getitem__(self, item: int):
|
||||||
if item < len(self._data):
|
return self._data[item]
|
||||||
return self._data[item]
|
|
||||||
|
|
||||||
item = item - len(self._data)
|
|
||||||
|
|
||||||
for c in self.children:
|
|
||||||
if item < len(c):
|
|
||||||
return c.__getitem__(item)
|
|
||||||
item = item - len(c._data)
|
|
||||||
|
|
||||||
raise IndexError
|
|
||||||
|
@@ -1,5 +1,10 @@
|
|||||||
import mistune
|
import mistune
|
||||||
import html2markdown
|
from markdownify import markdownify as md
|
||||||
|
|
||||||
|
|
||||||
|
def plain_to_markdown(plain: str) -> str:
|
||||||
|
return plain.replace("\n", " \n")
|
||||||
|
|
||||||
|
|
||||||
class FormattedText:
|
class FormattedText:
|
||||||
html = ""
|
html = ""
|
||||||
@@ -7,12 +12,15 @@ class FormattedText:
|
|||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
markdown: str = None,
|
markdown: str = None,
|
||||||
html: str = None
|
html: str = None,
|
||||||
|
plain: str = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
if html is not None:
|
if html is not None:
|
||||||
self.html = html
|
self.html = html
|
||||||
elif markdown is not None:
|
elif markdown is not None:
|
||||||
self.html = mistune.markdown(markdown)
|
self.html = mistune.markdown(markdown)
|
||||||
|
elif plain is not None:
|
||||||
|
self.html = mistune.markdown(plain_to_markdown(plain))
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_empty(self) -> bool:
|
def is_empty(self) -> bool:
|
||||||
@@ -28,7 +36,7 @@ class FormattedText:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def markdown(self) -> str:
|
def markdown(self) -> str:
|
||||||
return html2markdown.convert(self.html)
|
return md(self.html).strip()
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
return self.markdown
|
return self.markdown
|
||||||
|
@@ -34,6 +34,6 @@ class Lyrics(OuterProxy):
|
|||||||
@property
|
@property
|
||||||
def metadata(self) -> Metadata:
|
def metadata(self) -> Metadata:
|
||||||
return Metadata({
|
return Metadata({
|
||||||
id3Mapping.UNSYNCED_LYRICS: [self.text.html]
|
id3Mapping.UNSYNCED_LYRICS: [self.text.markdown]
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@@ -3,9 +3,11 @@ from __future__ import annotations
|
|||||||
import random
|
import random
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
|
|
||||||
from typing import Optional, Dict, Tuple, List, Type, Generic, Any, TypeVar, Set
|
from typing import Optional, Dict, Tuple, List, Type, Generic, Any, TypeVar, Set
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
import inspect
|
||||||
|
|
||||||
from .metadata import Metadata
|
from .metadata import Metadata
|
||||||
from ..utils import get_unix_time, object_trace
|
from ..utils import get_unix_time, object_trace
|
||||||
from ..utils.config import logging_settings, main_settings
|
from ..utils.config import logging_settings, main_settings
|
||||||
@@ -30,10 +32,7 @@ class InnerData:
|
|||||||
|
|
||||||
def __init__(self, object_type, **kwargs):
|
def __init__(self, object_type, **kwargs):
|
||||||
self._refers_to_instances = set()
|
self._refers_to_instances = set()
|
||||||
|
self._fetched_from: dict = {}
|
||||||
# collection : collection that is a collection of self
|
|
||||||
self._is_collection_child: Dict[Collection, Collection] = {}
|
|
||||||
self._is_collection_parent: Dict[Collection, Collection] = {}
|
|
||||||
|
|
||||||
# initialize the default values
|
# initialize the default values
|
||||||
self._default_values = {}
|
self._default_values = {}
|
||||||
@@ -41,8 +40,13 @@ class InnerData:
|
|||||||
self._default_values[name] = factory()
|
self._default_values[name] = factory()
|
||||||
|
|
||||||
for key, value in kwargs.items():
|
for key, value in kwargs.items():
|
||||||
|
if hasattr(value, "__is_collection__"):
|
||||||
|
value._collection_for[self] = key
|
||||||
self.__setattr__(key, value)
|
self.__setattr__(key, value)
|
||||||
|
|
||||||
|
def __hash__(self):
|
||||||
|
return self.id
|
||||||
|
|
||||||
def __merge__(self, __other: InnerData, override: bool = False):
|
def __merge__(self, __other: InnerData, override: bool = False):
|
||||||
"""
|
"""
|
||||||
:param __other:
|
:param __other:
|
||||||
@@ -50,6 +54,8 @@ class InnerData:
|
|||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
self._fetched_from.update(__other._fetched_from)
|
||||||
|
|
||||||
for key, value in __other.__dict__.copy().items():
|
for key, value in __other.__dict__.copy().items():
|
||||||
# just set the other value if self doesn't already have it
|
# just set the other value if self doesn't already have it
|
||||||
if key not in self.__dict__ or (key in self.__dict__ and self.__dict__[key] == self._default_values.get(key)):
|
if key not in self.__dict__ or (key in self.__dict__ and self.__dict__[key] == self._default_values.get(key)):
|
||||||
@@ -83,7 +89,7 @@ class OuterProxy:
|
|||||||
def __init__(self, _id: int = None, dynamic: bool = False, **kwargs):
|
def __init__(self, _id: int = None, dynamic: bool = False, **kwargs):
|
||||||
_automatic_id: bool = False
|
_automatic_id: bool = False
|
||||||
|
|
||||||
if _id is None and not dynamic:
|
if _id is None:
|
||||||
"""
|
"""
|
||||||
generates a random integer id
|
generates a random integer id
|
||||||
the range is defined in the config
|
the range is defined in the config
|
||||||
@@ -107,11 +113,11 @@ class OuterProxy:
|
|||||||
|
|
||||||
del kwargs[name]
|
del kwargs[name]
|
||||||
|
|
||||||
self._fetched_from: dict = {}
|
|
||||||
self._inner: InnerData = InnerData(type(self), **kwargs)
|
self._inner: InnerData = InnerData(type(self), **kwargs)
|
||||||
self._inner._refers_to_instances.add(self)
|
self._inner._refers_to_instances.add(self)
|
||||||
|
|
||||||
object_trace(f"creating {type(self).__name__} [{self.title_string}]")
|
object_trace(f"creating {type(self).__name__} [{self.title_string}]")
|
||||||
|
|
||||||
self.__init_collections__()
|
self.__init_collections__()
|
||||||
|
|
||||||
for name, data_list in collection_data.items():
|
for name, data_list in collection_data.items():
|
||||||
@@ -162,15 +168,7 @@ class OuterProxy:
|
|||||||
self._add_other_db_objects(key, value)
|
self._add_other_db_objects(key, value)
|
||||||
|
|
||||||
def __hash__(self):
|
def __hash__(self):
|
||||||
"""
|
return id(self)
|
||||||
:raise: IsDynamicException
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
|
|
||||||
if self.dynamic:
|
|
||||||
return id(self._inner)
|
|
||||||
|
|
||||||
return self.id
|
|
||||||
|
|
||||||
def __eq__(self, other: Any):
|
def __eq__(self, other: Any):
|
||||||
return self.__hash__() == other.__hash__()
|
return self.__hash__() == other.__hash__()
|
||||||
@@ -187,42 +185,40 @@ class OuterProxy:
|
|||||||
if __other is None:
|
if __other is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
object_trace(f"merging {type(self).__name__} [{self.title_string} | {self.id}] with {type(__other).__name__} [{__other.title_string} | {__other.id}]")
|
|
||||||
|
|
||||||
a = self
|
a = self
|
||||||
b = __other
|
b = __other
|
||||||
|
|
||||||
if a._inner is b._inner:
|
if a.id == b.id:
|
||||||
return
|
return
|
||||||
|
|
||||||
# switch instances if more efficient
|
# switch instances if more efficient
|
||||||
if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
|
if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
|
||||||
a, b = b, a
|
a, b = b, a
|
||||||
|
|
||||||
a._inner.__merge__(b._inner, override=override)
|
object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]")
|
||||||
for collection, child_collection in b._inner._is_collection_child.items():
|
|
||||||
collection.children.remove(child_collection)
|
|
||||||
|
|
||||||
for collection, parent_collection in b._inner._is_collection_parent.items():
|
old_inner = b._inner
|
||||||
collection.parents.remove(parent_collection)
|
|
||||||
|
|
||||||
a._inner._refers_to_instances.update(b._inner._refers_to_instances)
|
for instance in b._inner._refers_to_instances.copy():
|
||||||
|
|
||||||
for instance in b._inner._refers_to_instances:
|
|
||||||
instance._inner = a._inner
|
instance._inner = a._inner
|
||||||
|
a._inner._refers_to_instances.add(instance)
|
||||||
|
|
||||||
|
a._inner.__merge__(old_inner, override=override)
|
||||||
|
del old_inner
|
||||||
|
|
||||||
def __merge__(self, __other: Optional[OuterProxy], override: bool = False):
|
def __merge__(self, __other: Optional[OuterProxy], override: bool = False):
|
||||||
self.merge(__other, override)
|
self.merge(__other, override)
|
||||||
|
|
||||||
def mark_as_fetched(self, *url_hash_list: List[str]):
|
def mark_as_fetched(self, *url_hash_list: List[str]):
|
||||||
for url_hash in url_hash_list:
|
for url_hash in url_hash_list:
|
||||||
self._fetched_from[url_hash] = {
|
self._inner._fetched_from[url_hash] = {
|
||||||
"time": get_unix_time(),
|
"time": get_unix_time(),
|
||||||
"url": url_hash,
|
"url": url_hash,
|
||||||
}
|
}
|
||||||
|
|
||||||
def already_fetched_from(self, url_hash: str) -> bool:
|
def already_fetched_from(self, url_hash: str) -> bool:
|
||||||
res = self._fetched_from.get(url_hash, None)
|
res = self._inner._fetched_from.get(url_hash, None)
|
||||||
|
|
||||||
if res is None:
|
if res is None:
|
||||||
return False
|
return False
|
||||||
|
@@ -86,14 +86,14 @@ class Song(Base):
|
|||||||
TITEL = "title"
|
TITEL = "title"
|
||||||
|
|
||||||
def __init_collections__(self) -> None:
|
def __init_collections__(self) -> None:
|
||||||
self.album_collection.contain_given_in_attribute = {
|
self.album_collection.sync_on_append = {
|
||||||
"artist_collection": self.main_artist_collection,
|
"artist_collection": self.main_artist_collection,
|
||||||
}
|
}
|
||||||
|
|
||||||
self.album_collection.append_object_to_attribute = {
|
self.album_collection.append_object_to_attribute = {
|
||||||
"song_collection": self,
|
"song_collection": self,
|
||||||
}
|
}
|
||||||
|
self.main_artist_collection.extend_object_to_attribute = {
|
||||||
self.main_artist_collection.contain_given_in_attribute = {
|
|
||||||
"main_album_collection": self.album_collection
|
"main_album_collection": self.album_collection
|
||||||
}
|
}
|
||||||
self.feature_artist_collection.append_object_to_attribute = {
|
self.feature_artist_collection.append_object_to_attribute = {
|
||||||
@@ -120,7 +120,7 @@ class Song(Base):
|
|||||||
def indexing_values(self) -> List[Tuple[str, object]]:
|
def indexing_values(self) -> List[Tuple[str, object]]:
|
||||||
return [
|
return [
|
||||||
('id', self.id),
|
('id', self.id),
|
||||||
('title', unify(self.unified_title)),
|
('title', unify(self.title)),
|
||||||
('isrc', self.isrc),
|
('isrc', self.isrc),
|
||||||
*[('url', source.url) for source in self.source_collection]
|
*[('url', source.url) for source in self.source_collection]
|
||||||
]
|
]
|
||||||
@@ -203,6 +203,7 @@ class Album(Base):
|
|||||||
notes: FormattedText
|
notes: FormattedText
|
||||||
|
|
||||||
source_collection: SourceCollection
|
source_collection: SourceCollection
|
||||||
|
|
||||||
artist_collection: Collection[Artist]
|
artist_collection: Collection[Artist]
|
||||||
song_collection: Collection[Song]
|
song_collection: Collection[Song]
|
||||||
label_collection: Collection[Label]
|
label_collection: Collection[Label]
|
||||||
@@ -245,11 +246,14 @@ class Album(Base):
|
|||||||
self.song_collection.append_object_to_attribute = {
|
self.song_collection.append_object_to_attribute = {
|
||||||
"album_collection": self
|
"album_collection": self
|
||||||
}
|
}
|
||||||
|
self.song_collection.sync_on_append = {
|
||||||
|
"main_artist_collection": self.artist_collection
|
||||||
|
}
|
||||||
|
|
||||||
self.artist_collection.append_object_to_attribute = {
|
self.artist_collection.append_object_to_attribute = {
|
||||||
"main_album_collection": self
|
"main_album_collection": self
|
||||||
}
|
}
|
||||||
self.artist_collection.contain_given_in_attribute = {
|
self.artist_collection.extend_object_to_attribute = {
|
||||||
"label_collection": self.label_collection
|
"label_collection": self.label_collection
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -338,7 +342,6 @@ class Album(Base):
|
|||||||
tracksort_map[i] = existing_list.pop(0)
|
tracksort_map[i] = existing_list.pop(0)
|
||||||
tracksort_map[i].tracksort = i
|
tracksort_map[i].tracksort = i
|
||||||
|
|
||||||
|
|
||||||
def compile(self, merge_into: bool = False):
|
def compile(self, merge_into: bool = False):
|
||||||
"""
|
"""
|
||||||
compiles the recursive structures,
|
compiles the recursive structures,
|
||||||
|
@@ -105,13 +105,6 @@ class Source(OuterProxy):
|
|||||||
('audio_url', self.audio_url),
|
('audio_url', self.audio_url),
|
||||||
]
|
]
|
||||||
|
|
||||||
def __merge__(self, __other: Source, override: bool = False):
|
|
||||||
if override:
|
|
||||||
self.audio_url = __other.audio_url
|
|
||||||
|
|
||||||
if self.audio_url is None or (override and __other.audio_url is not None):
|
|
||||||
self.audio_url = __other.audio_url
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.__repr__()
|
return self.__repr__()
|
||||||
|
|
||||||
|
@@ -18,10 +18,12 @@ from ..objects import (
|
|||||||
Contact,
|
Contact,
|
||||||
ID3Timestamp,
|
ID3Timestamp,
|
||||||
Lyrics,
|
Lyrics,
|
||||||
FormattedText
|
FormattedText,
|
||||||
|
Artwork,
|
||||||
)
|
)
|
||||||
from ..connection import Connection
|
from ..connection import Connection
|
||||||
from ..utils.support_classes.download_result import DownloadResult
|
from ..utils.support_classes.download_result import DownloadResult
|
||||||
|
from ..utils.string_processing import clean_song_title
|
||||||
from ..utils.config import main_settings, logging_settings
|
from ..utils.config import main_settings, logging_settings
|
||||||
from ..utils.shared import DEBUG
|
from ..utils.shared import DEBUG
|
||||||
|
|
||||||
@@ -114,7 +116,7 @@ class Bandcamp(Page):
|
|||||||
|
|
||||||
if object_type is BandcampTypes.SONG:
|
if object_type is BandcampTypes.SONG:
|
||||||
return Song(
|
return Song(
|
||||||
title=name.strip(),
|
title=clean_song_title(name, artist_name=data["band_name"]),
|
||||||
source_list=source_list,
|
source_list=source_list,
|
||||||
main_artist_list=[
|
main_artist_list=[
|
||||||
Artist(
|
Artist(
|
||||||
@@ -252,11 +254,18 @@ class Bandcamp(Page):
|
|||||||
artist.source_collection.append(source)
|
artist.source_collection.append(source)
|
||||||
return artist
|
return artist
|
||||||
|
|
||||||
def _parse_track_element(self, track: dict) -> Optional[Song]:
|
def _parse_track_element(self, track: dict, artwork: Artwork) -> Optional[Song]:
|
||||||
|
lyrics_list: List[Lyrics] = []
|
||||||
|
|
||||||
|
_lyrics: Optional[str] = track.get("item", {}).get("recordingOf", {}).get("lyrics", {}).get("text")
|
||||||
|
if _lyrics is not None:
|
||||||
|
lyrics_list.append(Lyrics(text=FormattedText(plain=_lyrics)))
|
||||||
|
|
||||||
return Song(
|
return Song(
|
||||||
title=track["item"]["name"].strip(),
|
title=clean_song_title(track["item"]["name"]),
|
||||||
source_list=[Source(self.SOURCE_TYPE, track["item"]["mainEntityOfPage"])],
|
source_list=[Source(self.SOURCE_TYPE, track["item"]["mainEntityOfPage"])],
|
||||||
tracksort=int(track["position"])
|
tracksort=int(track["position"]),
|
||||||
|
artwork=artwork,
|
||||||
)
|
)
|
||||||
|
|
||||||
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
|
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
|
||||||
@@ -289,12 +298,32 @@ class Bandcamp(Page):
|
|||||||
)]
|
)]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
artwork: Artwork = Artwork()
|
||||||
|
|
||||||
|
def _get_artwork_url(_data: dict) -> Optional[str]:
|
||||||
|
if "image" in _data:
|
||||||
|
return _data["image"]
|
||||||
|
for _property in _data.get("additionalProperty", []):
|
||||||
|
if _property.get("name") == "art_id":
|
||||||
|
return f"https://f4.bcbits.com/img/a{_property.get('value')}_2.jpg"
|
||||||
|
|
||||||
|
_artwork_url = _get_artwork_url(data)
|
||||||
|
if _artwork_url is not None:
|
||||||
|
artwork.append(url=_artwork_url, width=350, height=350)
|
||||||
|
else:
|
||||||
|
for album_release in data.get("albumRelease", []):
|
||||||
|
_artwork_url = _get_artwork_url(album_release)
|
||||||
|
if _artwork_url is not None:
|
||||||
|
artwork.append(url=_artwork_url, width=350, height=350)
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
for i, track_json in enumerate(data.get("track", {}).get("itemListElement", [])):
|
for i, track_json in enumerate(data.get("track", {}).get("itemListElement", [])):
|
||||||
if DEBUG:
|
if DEBUG:
|
||||||
dump_to_file(f"album_track_{i}.json", json.dumps(track_json), is_json=True, exit_after_dump=False)
|
dump_to_file(f"album_track_{i}.json", json.dumps(track_json), is_json=True, exit_after_dump=False)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
album.song_collection.append(self._parse_track_element(track_json))
|
album.song_collection.append(self._parse_track_element(track_json, artwork=artwork))
|
||||||
except KeyError:
|
except KeyError:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@@ -304,7 +333,6 @@ class Bandcamp(Page):
|
|||||||
def _fetch_lyrics(self, soup: BeautifulSoup) -> List[Lyrics]:
|
def _fetch_lyrics(self, soup: BeautifulSoup) -> List[Lyrics]:
|
||||||
track_lyrics = soup.find("div", {"class": "lyricsText"})
|
track_lyrics = soup.find("div", {"class": "lyricsText"})
|
||||||
if track_lyrics:
|
if track_lyrics:
|
||||||
self.LOGGER.debug(" Lyrics retrieved..")
|
|
||||||
return [Lyrics(text=FormattedText(html=track_lyrics.prettify()))]
|
return [Lyrics(text=FormattedText(html=track_lyrics.prettify()))]
|
||||||
|
|
||||||
return []
|
return []
|
||||||
@@ -323,10 +351,9 @@ class Bandcamp(Page):
|
|||||||
if len(other_data_list) > 0:
|
if len(other_data_list) > 0:
|
||||||
other_data = json.loads(other_data_list[0]["data-tralbum"])
|
other_data = json.loads(other_data_list[0]["data-tralbum"])
|
||||||
|
|
||||||
if DEBUG:
|
dump_to_file("bandcamp_song_data.json", data_container.text, is_json=True, exit_after_dump=False)
|
||||||
dump_to_file("bandcamp_song_data.json", data_container.text, is_json=True, exit_after_dump=False)
|
dump_to_file("bandcamp_song_data_other.json", json.dumps(other_data), is_json=True, exit_after_dump=False)
|
||||||
dump_to_file("bandcamp_song_data_other.json", json.dumps(other_data), is_json=True, exit_after_dump=False)
|
dump_to_file("bandcamp_song_page.html", r.text, exit_after_dump=False)
|
||||||
dump_to_file("bandcamp_song_page.html", r.text, exit_after_dump=False)
|
|
||||||
|
|
||||||
data = json.loads(data_container.text)
|
data = json.loads(data_container.text)
|
||||||
album_data = data["inAlbum"]
|
album_data = data["inAlbum"]
|
||||||
@@ -337,8 +364,8 @@ class Bandcamp(Page):
|
|||||||
mp3_url = value
|
mp3_url = value
|
||||||
|
|
||||||
song = Song(
|
song = Song(
|
||||||
title=data["name"].strip(),
|
title=clean_song_title(data["name"], artist_name=artist_data["name"]),
|
||||||
source_list=[Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]), audio_url=mp3_url)],
|
source_list=[source, Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]), audio_url=mp3_url)],
|
||||||
album_list=[Album(
|
album_list=[Album(
|
||||||
title=album_data["name"].strip(),
|
title=album_data["name"].strip(),
|
||||||
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
|
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
|
||||||
@@ -351,8 +378,6 @@ class Bandcamp(Page):
|
|||||||
lyrics_list=self._fetch_lyrics(soup=soup)
|
lyrics_list=self._fetch_lyrics(soup=soup)
|
||||||
)
|
)
|
||||||
|
|
||||||
song.source_collection.append(source)
|
|
||||||
|
|
||||||
return song
|
return song
|
||||||
|
|
||||||
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
|
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
|
||||||
|
@@ -2,8 +2,9 @@ from datetime import datetime
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import inspect
|
||||||
|
|
||||||
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE
|
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
|
||||||
from .config import config, read_config, write_config
|
from .config import config, read_config, write_config
|
||||||
from .enums.colors import BColors
|
from .enums.colors import BColors
|
||||||
from .path_manager import LOCATIONS
|
from .path_manager import LOCATIONS
|
||||||
@@ -56,7 +57,8 @@ def object_trace(obj):
|
|||||||
if not DEBUG_OBJECT_TRACE:
|
if not DEBUG_OBJECT_TRACE:
|
||||||
return
|
return
|
||||||
|
|
||||||
output("object: " + str(obj), BColors.GREY)
|
appendix = f" called by [{' | '.join(f'{s.function} {Path(s.filename).name}:{str(s.lineno)}' for s in inspect.stack()[1:5])}]" if DEBUG_OBJECT_TRACE_CALLSTACK else ""
|
||||||
|
output("object: " + str(obj) + appendix, BColors.GREY)
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
@@ -13,12 +13,13 @@ if not load_dotenv(Path(__file__).parent.parent.parent / ".env"):
|
|||||||
__stage__ = os.getenv("STAGE", "prod")
|
__stage__ = os.getenv("STAGE", "prod")
|
||||||
|
|
||||||
DEBUG = (__stage__ == "dev") and True
|
DEBUG = (__stage__ == "dev") and True
|
||||||
DEBUG_LOGGING = DEBUG and True
|
DEBUG_LOGGING = DEBUG and False
|
||||||
DEBUG_TRACE = DEBUG and True
|
DEBUG_TRACE = DEBUG and True
|
||||||
DEBUG_OBJECT_TRACE = DEBUG and True
|
DEBUG_OBJECT_TRACE = DEBUG and False
|
||||||
|
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
|
||||||
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
|
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
|
||||||
DEBUG_PAGES = DEBUG and False
|
DEBUG_PAGES = DEBUG and False
|
||||||
DEBUG_DUMP = DEBUG and True
|
DEBUG_DUMP = DEBUG and False
|
||||||
|
|
||||||
if DEBUG:
|
if DEBUG:
|
||||||
print("DEBUG ACTIVE")
|
print("DEBUG ACTIVE")
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
from typing import Tuple, Union
|
from typing import Tuple, Union, Optional
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import string
|
import string
|
||||||
|
from functools import lru_cache
|
||||||
|
|
||||||
from transliterate.exceptions import LanguageDetectionError
|
from transliterate.exceptions import LanguageDetectionError
|
||||||
from transliterate import translit
|
from transliterate import translit
|
||||||
@@ -10,8 +11,11 @@ from pathvalidate import sanitize_filename
|
|||||||
COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = (
|
COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = (
|
||||||
"(official video)",
|
"(official video)",
|
||||||
)
|
)
|
||||||
|
OPEN_BRACKETS = "(["
|
||||||
|
CLOSE_BRACKETS = ")]"
|
||||||
|
DISALLOWED_SUBSTRING_IN_BRACKETS = ("official", "video", "audio", "lyrics", "prod", "remix", "ft", "feat", "ft.", "feat.")
|
||||||
|
|
||||||
|
@lru_cache
|
||||||
def unify(string: str) -> str:
|
def unify(string: str) -> str:
|
||||||
"""
|
"""
|
||||||
returns a unified str, to make comparisons easy.
|
returns a unified str, to make comparisons easy.
|
||||||
@@ -52,7 +56,8 @@ def fit_to_file_system(string: Union[str, Path]) -> Union[str, Path]:
|
|||||||
return fit_string(string)
|
return fit_string(string)
|
||||||
|
|
||||||
|
|
||||||
def clean_song_title(raw_song_title: str, artist_name: str) -> str:
|
@lru_cache(maxsize=128)
|
||||||
|
def clean_song_title(raw_song_title: str, artist_name: Optional[str] = None) -> str:
|
||||||
"""
|
"""
|
||||||
This function cleans common naming "conventions" for non clean song titles, like the title of youtube videos
|
This function cleans common naming "conventions" for non clean song titles, like the title of youtube videos
|
||||||
|
|
||||||
@@ -64,19 +69,45 @@ def clean_song_title(raw_song_title: str, artist_name: str) -> str:
|
|||||||
- `song (prod. some producer)`
|
- `song (prod. some producer)`
|
||||||
"""
|
"""
|
||||||
raw_song_title = raw_song_title.strip()
|
raw_song_title = raw_song_title.strip()
|
||||||
artist_name = artist_name.strip()
|
|
||||||
|
|
||||||
# Clean official Video appendix
|
# Clean official Video appendix
|
||||||
for dirty_appendix in COMMON_TITLE_APPENDIX_LIST:
|
for dirty_appendix in COMMON_TITLE_APPENDIX_LIST:
|
||||||
if raw_song_title.lower().endswith(dirty_appendix):
|
if raw_song_title.lower().endswith(dirty_appendix):
|
||||||
raw_song_title = raw_song_title[:-len(dirty_appendix)].strip()
|
raw_song_title = raw_song_title[:-len(dirty_appendix)].strip()
|
||||||
|
|
||||||
# Remove artist from the start of the title
|
# remove brackets and their content if they contain disallowed substrings
|
||||||
if raw_song_title.lower().startswith(artist_name.lower()):
|
for open_bracket, close_bracket in zip(OPEN_BRACKETS, CLOSE_BRACKETS):
|
||||||
raw_song_title = raw_song_title[len(artist_name):].strip()
|
if open_bracket not in raw_song_title or close_bracket not in raw_song_title:
|
||||||
|
continue
|
||||||
|
|
||||||
if raw_song_title.startswith("-"):
|
start = 0
|
||||||
raw_song_title = raw_song_title[1:].strip()
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
open_bracket_index = raw_song_title.index(open_bracket, start)
|
||||||
|
except ValueError:
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
close_bracket_index = raw_song_title.index(close_bracket, open_bracket_index + 1)
|
||||||
|
except ValueError:
|
||||||
|
break
|
||||||
|
|
||||||
|
substring = raw_song_title[open_bracket_index + 1:close_bracket_index]
|
||||||
|
if any(disallowed_substring in substring for disallowed_substring in DISALLOWED_SUBSTRING_IN_BRACKETS):
|
||||||
|
raw_song_title = raw_song_title[:open_bracket_index] + raw_song_title[close_bracket_index + 1:]
|
||||||
|
else:
|
||||||
|
start = close_bracket_index + 1
|
||||||
|
|
||||||
|
# everything that requires the artist name
|
||||||
|
if artist_name is not None:
|
||||||
|
artist_name = artist_name.strip()
|
||||||
|
|
||||||
|
# Remove artist from the start of the title
|
||||||
|
if raw_song_title.lower().startswith(artist_name.lower()):
|
||||||
|
raw_song_title = raw_song_title[len(artist_name):].strip()
|
||||||
|
|
||||||
|
if raw_song_title.startswith("-"):
|
||||||
|
raw_song_title = raw_song_title[1:].strip()
|
||||||
|
|
||||||
return raw_song_title.strip()
|
return raw_song_title.strip()
|
||||||
|
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
[build-system]
|
[build-system]
|
||||||
requires = ["hatchling", "hatch-requirements-txt" ]
|
requires = ["hatchling", "hatch-requirements-txt", "hatch-vcs"]
|
||||||
build-backend = "hatchling.build"
|
build-backend = "hatchling.build"
|
||||||
|
|
||||||
[tool.hatch.build]
|
[tool.hatch.build]
|
||||||
@@ -15,7 +15,15 @@ packages = ["music_kraken"]
|
|||||||
music-kraken = "music_kraken.__main__:cli"
|
music-kraken = "music_kraken.__main__:cli"
|
||||||
|
|
||||||
[tool.hatch.version]
|
[tool.hatch.version]
|
||||||
path = "music_kraken/__init__.py"
|
source = "vcs"
|
||||||
|
path = "music_kraken/_version.py"
|
||||||
|
fallback-version = "0.0.0"
|
||||||
|
|
||||||
|
[tool.hatch.version.raw-options]
|
||||||
|
local_scheme = "no-local-version"
|
||||||
|
|
||||||
|
[tool.hatch.build.hooks.vcs]
|
||||||
|
version-file = "music_kraken/_version.py"
|
||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "music-kraken"
|
name = "music-kraken"
|
||||||
@@ -48,6 +56,7 @@ dependencies = [
|
|||||||
|
|
||||||
"rich~=13.7.1",
|
"rich~=13.7.1",
|
||||||
"mistune~=3.0.2",
|
"mistune~=3.0.2",
|
||||||
|
"markdownify~=0.12.1",
|
||||||
"html2markdown~=0.1.7",
|
"html2markdown~=0.1.7",
|
||||||
"jellyfish~=0.9.0",
|
"jellyfish~=0.9.0",
|
||||||
"transliterate~=1.10.2",
|
"transliterate~=1.10.2",
|
||||||
|
@@ -70,7 +70,49 @@ class TestCollection(unittest.TestCase):
|
|||||||
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
|
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
|
||||||
self.assertTrue(a.country == b.country == c.country == d.country)
|
self.assertTrue(a.country == b.country == c.country == d.country)
|
||||||
|
|
||||||
"""
|
def test_artist_artist_relation(self):
|
||||||
|
artist = Artist(
|
||||||
|
name="artist",
|
||||||
|
main_album_list=[
|
||||||
|
Album(
|
||||||
|
title="album",
|
||||||
|
song_list=[
|
||||||
|
Song(title="song"),
|
||||||
|
],
|
||||||
|
artist_list=[
|
||||||
|
Artist(name="artist"),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertTrue(artist.id == artist.main_album_collection[0].song_collection[0].main_artist_collection[0].id)
|
||||||
|
|
||||||
|
def test_artist_collection_sync(self):
|
||||||
|
album_1 = Album(
|
||||||
|
title="album",
|
||||||
|
song_list=[
|
||||||
|
Song(title="song", main_artist_list=[Artist(name="artist")]),
|
||||||
|
],
|
||||||
|
artist_list=[
|
||||||
|
Artist(name="artist"),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
album_2 = Album(
|
||||||
|
title="album",
|
||||||
|
song_list=[
|
||||||
|
Song(title="song", main_artist_list=[Artist(name="artist")]),
|
||||||
|
],
|
||||||
|
artist_list=[
|
||||||
|
Artist(name="artist"),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
album_1.merge(album_2)
|
||||||
|
|
||||||
|
self.assertTrue(id(album_1.artist_collection) == id(album_1.artist_collection) == id(album_1.song_collection[0].main_artist_collection) == id(album_1.song_collection[0].main_artist_collection))
|
||||||
|
|
||||||
def test_song_artist_relations(self):
|
def test_song_artist_relations(self):
|
||||||
a = self.complicated_object()
|
a = self.complicated_object()
|
||||||
b = a.main_album_collection[0].song_collection[0].main_artist_collection[0]
|
b = a.main_album_collection[0].song_collection[0].main_artist_collection[0]
|
||||||
@@ -80,7 +122,6 @@ class TestCollection(unittest.TestCase):
|
|||||||
self.assertTrue(a.id == b.id == c.id == d.id)
|
self.assertTrue(a.id == b.id == c.id == d.id)
|
||||||
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
|
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
|
||||||
self.assertTrue(a.country == b.country == c.country == d.country)
|
self.assertTrue(a.country == b.country == c.country == d.country)
|
||||||
"""
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Reference in New Issue
Block a user