Compare commits

..

No commits in common. "4e0b005170a17c8cb2ed5c97aa6a65477a326bc9" and "e3e7aea959111049753714d59414fb3a7d85b39e" have entirely different histories.

28 changed files with 953 additions and 1110 deletions

View File

@ -16,13 +16,11 @@
}, },
"python.formatting.provider": "none", "python.formatting.provider": "none",
"cSpell.words": [ "cSpell.words": [
"albumsort",
"APIC", "APIC",
"Bandcamp", "Bandcamp",
"dotenv", "dotenv",
"encyclopaedia", "encyclopaedia",
"ENDC", "ENDC",
"Gitea",
"levenshtein", "levenshtein",
"metallum", "metallum",
"musify", "musify",
@ -30,11 +28,9 @@
"pathvalidate", "pathvalidate",
"Referer", "Referer",
"sponsorblock", "sponsorblock",
"tracklist",
"tracksort", "tracksort",
"translit", "translit",
"unmap", "unmap",
"youtube", "youtube"
"youtubei"
] ]
} }

View File

@ -11,6 +11,7 @@ steps:
build-stable: build-stable:
image: python image: python
commands: commands:
- sed -i 's/name = "music-kraken"/name = "music-kraken-stable"/' pyproject.toml
- python -m pip install -r requirements-dev.txt - python -m pip install -r requirements-dev.txt
- python3 -m build - python3 -m build
environment: environment:

226
README.md
View File

@ -2,43 +2,61 @@
[![Woodpecker CI Status](https://ci.elara.ws/api/badges/59/status.svg)](https://ci.elara.ws/repos/59) [![Woodpecker CI Status](https://ci.elara.ws/api/badges/59/status.svg)](https://ci.elara.ws/repos/59)
<img src="https://gitea.elara.ws/music-kraken/music-kraken-core/media/branch/experimental/assets/logo.svg" width=300 alt="music kraken logo"/> <img src="assets/logo.svg" width=300 alt="music kraken logo"/>
- [Installation](#installation) - [Music Kraken](#music-kraken)
- [Quick-Guide](#quick-guide) - [Installation](#installation)
- [How to search properly](#query) - [From source](#from-source)
- [Matrix Space](#matrix-space) - [Notes for WSL](#notes-for-wsl)
- [Quick-Guide](#quick-guide)
If you want to use this a library or contribute, check out [the wiki](https://gitea.elara.ws/music-kraken/music-kraken-core/wiki) for more information. - [Query](#query)
- [CONTRIBUTE](#contribute)
- [Matrix Space](#matrix-space)
- [TODO till the next release](#todo-till-the-next-release)
- [Programming Interface / Use as Library](#programming-interface--use-as-library)
- [Quick Overview](#quick-overview)
- [Data Model](#data-model)
- [Data Objects](#data-objects)
- [Creation](#creation)
--- ---
## Installation ## Installation
You can find and get this project from either [PyPI](https://pypi.org/project/music-kraken/) as a Python-Package, You can find and get this project from either [PyPI](https://pypi.org/project/music-kraken/) as a Python-Package,
or simply the source code from [Gitea](https://gitea.elara.ws/music-kraken/music-kraken-core). ** or simply the source code from [GitHub](https://github.com/HeIIow2/music-downloader). Note that even though
everything **SHOULD** work cross-platform, I have only tested it on Ubuntu.
If you enjoy this project, feel free to give it a star on GitHub.
**NOTES** > THE PyPI PACKAGE IS OUTDATED
- Even though everything **SHOULD** work cross-platform, I have only tested it on Ubuntu.
- If you enjoy this project, feel free to give it a star on GitHub.
### From source ### From source
if you use Debian or Ubuntu:
```sh ```sh
git clone https://gitea.elara.ws/music-kraken/music-kraken-core.git git clone https://github.com/HeIIow2/music-downloader
python3 -m pip install -e music-kraken-core/ sudo apt install pandoc
cd music-downloader/
python3 -m pip install -r requirements.txt
``` ```
To update the program, if installed like this, go into the `music-kraken-core` directory and run `git pull`. then you can add to `~/.bashrc`
### Get it running on other Systems ```
alias music-kraken='cd your/directory/music-downloader/src; python3 -m music_kraken'
alias 🥺='sudo'
```
Here are the collected issues, that are related to running the program on different systems. If you have any issues, feel free to open a new one. ```sh
source ~/.bashrc
music-kraken
```
#### Windows + WSL ### Notes for WSL
Add ` ~/.local/bin` to your `$PATH`. [#2][i2] If you choose to run it in WSL, make sure ` ~/.local/bin` is added to your `$PATH` [#2][i2]
## Quick-Guide ## Quick-Guide
@ -69,6 +87,10 @@ The escape character is as usual `\`.
--- ---
## CONTRIBUTE
I am happy about every pull request. To contribute look [here](contribute.md).
## Matrix Space ## Matrix Space
<img align="right" alt="music-kraken logo" src="assets/element_logo.png" width=100> <img align="right" alt="music-kraken logo" src="assets/element_logo.png" width=100>
@ -77,5 +99,171 @@ I decided against creating a discord server, due to various communities get ofte
**Click [this invitation](https://matrix.to/#/#music-kraken:matrix.org) _([https://matrix.to/#/#music-kraken:matrix.org](https://matrix.to/#/#music-kraken:matrix.org))_ to join.** **Click [this invitation](https://matrix.to/#/#music-kraken:matrix.org) _([https://matrix.to/#/#music-kraken:matrix.org](https://matrix.to/#/#music-kraken:matrix.org))_ to join.**
## TODO till the next release
> These Points will most likely be in the changelogs.
- [x] Migrate away from pandoc, to a more lightweight alternative, that can be installed over PiPY.
- [ ] Update the Documentation of the internal structure. _(could be pushed back one release)_
---
# Programming Interface / Use as Library
This application is $100\%$ centered around Data. Thus, the most important thing for working with musik kraken is, to understand how I structured the data.
## Quick Overview
- explanation of the [Data Model](#data-model)
- how to use the [Data Objects](#data-objects)
- further Dokumentation of _hopefully_ [most relevant classes](documentation/objects.md)
- the [old implementation](documentation/old_implementation.md)
```mermaid
---
title: Quick Overview (outdated)
---
sequenceDiagram
participant pg as Page (eg. YouTube, MB, Musify, ...)
participant obj as DataObjects (eg. Song, Artist, ...)
participant db as DataBase
obj ->> db: write
db ->> obj: read
pg -> obj: find a source for any page, for object.
obj -> pg: add more detailed data from according page.
obj -> pg: if available download audio to target.
```
## Data Model
The Data Structure, that the whole programm is built on looks as follows:
```mermaid
---
title: Music Data
---
erDiagram
Target {
}
Lyrics {
}
Song {
}
Album {
}
Artist {
}
Label {
}
Source {
}
Source }o--|| Song : ""
Source }o--|| Lyrics : ""
Source }o--|| Album : ""
Source }o--|| Artist : ""
Source }o--|| Label : ""
Song }o--o{ Album : AlbumSong
Album }o--o{ Artist : ArtistAlbum
Song }o--o{ Artist : "ArtistSong (features)"
Label }o--o{ Album : LabelAlbum
Label }o--o{ Artist : LabelSong
Song ||--o{ Lyrics : ""
Song ||--o{ Target : ""
```
Ok now this **WILL** look intimidating, thus I break it down quickly.
*That is also the reason I didn't add all Attributes here.*
The most important Entities are:
- Song
- Album
- Artist
- Label
All of them *(and Lyrics)* can have multiple Sources, and every Source can only Point to one of those Element.
The `Target` Entity represents the location on the hard drive a Song has. One Song can have multiple download Locations.
The `Lyrics` Entity simply represents the Lyrics of each Song. One Song can have multiple Lyrics, e.g. Translations.
Here is the simplified Diagramm without only the main Entities.
```mermaid
---
title: simplified Music Data
---
erDiagram
Song {
}
Album {
}
Artist {
}
Label {
}
Song }o--o{ Album : AlbumSong
Album }o--o{ Artist : ArtistAlbum
Song }o--o{ Artist : "ArtistSong (features)"
Label }o--o{ Album : LabelAlbum
Label }o--o{ Artist : LabelSong
```
Looks way more manageable, doesn't it?
The reason every relation here is a `n:m` *(many to many)* relation is not, that it makes sense in the aspekt of modeling reality, but to be able to put data from many Sources in the same Data Model.
Every Service models Data a bit different, and projecting a one-to-many relationship to a many to many relationship without data loss is easy. The other way around it is basically impossible
## Data Objects
> Not 100% accurate yet and *might* change slightly
### Creation
```python
# needs to be added
```
If you just want to start implementing, then just use the code example I provided, I don't care.
For those who don't want any bugs and use it as intended *(which is recommended, cuz I am only one person so there are defs bugs)* continue reading, and read the whole documentation, which may exist in the future xD
[i10]: https://github.com/HeIIow2/music-downloader/issues/10 [i10]: https://github.com/HeIIow2/music-downloader/issues/10
[i2]: https://github.com/HeIIow2/music-downloader/issues/2 [i2]: https://github.com/HeIIow2/music-downloader/issues/2

View File

@ -7,8 +7,7 @@ logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__": if __name__ == "__main__":
commands = [ commands = [
"s: #a Crystal F", "s: #a Crystal F",
"10", "d: 20",
"2",
] ]

View File

@ -2,24 +2,30 @@ import music_kraken
from music_kraken.objects import Song, Album, Artist, Collection from music_kraken.objects import Song, Album, Artist, Collection
if __name__ == "__main__": if __name__ == "__main__":
song_1 = Song( album_1 = Album(
title="song", title="album",
feature_artist_list=[Artist( song_list=[
name="main_artist" Song(title="song", main_artist_list=[Artist(name="artist")]),
)] ],
artist_list=[
Artist(name="artist 3"),
]
) )
other_artist = Artist(name="other_artist") album_2 = Album(
title="album",
song_2 = Song( song_list=[
title = "song", Song(title="song", main_artist_list=[Artist(name="artist 2")]),
main_artist_list=[other_artist] ],
artist_list=[
Artist(name="artist"),
]
) )
other_artist.name = "main_artist" album_1.merge(album_2)
song_1.merge(song_2) print()
print(*(f"{a.title_string} ; {a.id}" for a in album_1.artist_collection.data), sep=" | ")
print("#" * 120) print(id(album_1.artist_collection), id(album_2.artist_collection))
print("main", *song_1.main_artist_collection) print(id(album_1.song_collection[0].main_artist_collection), id(album_2.song_collection[0].main_artist_collection))
print("feat", *song_1.feature_artist_collection)

View File

@ -80,7 +80,7 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
with temp_target.open("wb") as f: with temp_target.open("wb") as f:
f.write(r.content) f.write(r.content)
converted_target: Target = Target.temp(name=f"{song.title.replace('/', '_')}") converted_target: Target = Target.temp(name=f"{song.title}.jpeg")
with Image.open(temp_target.file_path) as img: with Image.open(temp_target.file_path) as img:
# crop the image if it isn't square in the middle with minimum data loss # crop the image if it isn't square in the middle with minimum data loss
width, height = img.size width, height = img.size

View File

@ -6,18 +6,16 @@ import re
from .utils import cli_function from .utils import cli_function
from .options.first_config import initial_config from .options.first_config import initial_config
from ..utils import output, BColors
from ..utils.config import write_config, main_settings from ..utils.config import write_config, main_settings
from ..utils.shared import URL_PATTERN from ..utils.shared import URL_PATTERN
from ..utils.string_processing import fit_to_file_system from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.query import Query from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult from ..utils.support_classes.download_result import DownloadResult
from ..utils.exception import MKInvalidInputException
from ..utils.exception.download import UrlNotFoundException from ..utils.exception.download import UrlNotFoundException
from ..utils.enums.colors import BColors from ..utils.enums.colors import BColors
from .. import console from .. import console
from ..download.results import Results, Option, PageResults, GoToResults from ..download.results import Results, Option, PageResults
from ..download.page_attributes import Pages from ..download.page_attributes import Pages
from ..pages import Page from ..pages import Page
from ..objects import Song, Album, Artist, DatabaseObject from ..objects import Song, Album, Artist, DatabaseObject
@ -176,7 +174,7 @@ class Downloader:
print() print()
page_count = 0 page_count = 0
for option in self.current_results.formatted_generator(): for option in self.current_results.formated_generator(max_items_per_page=self.max_displayed_options):
if isinstance(option, Option): if isinstance(option, Option):
_downloadable = self.pages.is_downloadable(option.music_object) _downloadable = self.pages.is_downloadable(option.music_object)
@ -251,7 +249,7 @@ class Downloader:
f"Recommendations and suggestions on sites to implement appreciated.\n" f"Recommendations and suggestions on sites to implement appreciated.\n"
f"But don't be a bitch if I don't end up implementing it.") f"But don't be a bitch if I don't end up implementing it.")
return return
self.set_current_options(PageResults(page, data_object.options, max_items_per_page=self.max_displayed_options)) self.set_current_options(PageResults(page, data_object.options))
self.print_current_options() self.print_current_options()
return return
@ -301,41 +299,66 @@ class Downloader:
self.set_current_options(self.pages.search(parsed_query)) self.set_current_options(self.pages.search(parsed_query))
self.print_current_options() self.print_current_options()
def goto(self, data_object: DatabaseObject): def goto(self, index: int):
page: Type[Page] page: Type[Page]
music_object: DatabaseObject
self.pages.fetch_details(data_object, stop_at_level=1) try:
page, music_object = self.current_results.get_music_object_by_index(index)
except KeyError:
print()
print(f"The option {index} doesn't exist.")
print()
return
self.set_current_options(GoToResults(data_object.options, max_items_per_page=self.max_displayed_options)) self.pages.fetch_details(music_object)
print(music_object)
print(music_object.options)
self.set_current_options(PageResults(page, music_object.options))
self.print_current_options() self.print_current_options()
def download(self, data_objects: List[DatabaseObject], **kwargs) -> bool: def download(self, download_str: str, download_all: bool = False) -> bool:
output() to_download: List[DatabaseObject] = []
if len(data_objects) == 1:
output(f"Downloading {data_objects[0].option_string}...", color=BColors.BOLD) if re.match(URL_PATTERN, download_str) is not None:
_, music_objects = self.pages.fetch_url(download_str)
to_download.append(music_objects)
else: else:
output(f"Downloading {len(data_objects)} objects...", *("- " + o.option_string for o in data_objects), color=BColors.BOLD, sep="\n") index: str
for index in download_str.split(", "):
if not index.strip().isdigit():
print()
print(f"Every download thingie has to be an index, not {index}.")
print()
return False
for index in download_str.split(", "):
to_download.append(self.current_results.get_music_object_by_index(int(index))[1])
print()
print("Downloading:")
for download_object in to_download:
print(download_object.option_string)
print()
_result_map: Dict[DatabaseObject, DownloadResult] = dict() _result_map: Dict[DatabaseObject, DownloadResult] = dict()
for database_object in data_objects: for database_object in to_download:
r = self.pages.download( r = self.pages.download(music_object=database_object, genre=self.genre, download_all=download_all,
music_object=database_object, process_metadata_anyway=self.process_metadata_anyway)
genre=self.genre,
**kwargs
)
_result_map[database_object] = r _result_map[database_object] = r
for music_object, result in _result_map.items(): for music_object, result in _result_map.items():
output() print()
output(music_object.option_string) print(music_object.option_string)
output(result) print(result)
return True return True
def process_input(self, input_str: str) -> bool: def process_input(self, input_str: str) -> bool:
try:
input_str = input_str.strip() input_str = input_str.strip()
processed_input: str = input_str.lower() processed_input: str = input_str.lower()
@ -351,69 +374,20 @@ class Downloader:
self.print_current_options() self.print_current_options()
return False return False
command = "" if processed_input.startswith("s: "):
query = processed_input self.search(input_str[3:])
if ":" in processed_input:
_ = processed_input.split(":")
command, query = _[0], ":".join(_[1:])
do_search = "s" in command
do_download = "d" in command
do_merge = "m" in command
if do_search and do_download:
raise MKInvalidInputException(message="You can't search and download at the same time.")
if do_search and do_merge:
raise MKInvalidInputException(message="You can't search and merge at the same time.")
if do_search:
self.search(":".join(input_str.split(":")[1:]))
return False return False
indices = [] if processed_input.startswith("d: "):
for possible_index in query.split(","): return self.download(input_str[3:])
possible_index = possible_index.strip()
if possible_index == "":
continue
i = 0 if processed_input.isdigit():
try: self.goto(int(processed_input))
i = int(possible_index)
except ValueError:
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.")
if i < 0 or i >= len(self.current_results):
raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results) - 1}.")
indices.append(i)
selected_objects = [self.current_results[i] for i in indices]
if do_merge:
old_selected_objects = selected_objects
a = old_selected_objects[0]
for b in old_selected_objects[1:]:
if type(a) != type(b):
raise MKInvalidInputException(message="You can't merge different types of objects.")
a.merge(b)
selected_objects = [a]
if do_download:
self.download(selected_objects)
return False return False
if len(selected_objects) != 1: if processed_input != "help":
raise MKInvalidInputException(message="You can only go to one object at a time without merging.") print(f"{BColors.WARNING.value}Invalid input.{BColors.ENDC.value}")
self.goto(selected_objects[0])
return False
except MKInvalidInputException as e:
output("\n" + e.message + "\n", color=BColors.FAIL)
help_message() help_message()
return False return False
def mainloop(self): def mainloop(self):

View File

@ -13,35 +13,31 @@ class Option:
class Results: class Results:
def __init__(self, max_items_per_page: int = 10, **kwargs) -> None: def __init__(self) -> None:
self._by_index: Dict[int, DatabaseObject] = dict() self._by_index: Dict[int, DatabaseObject] = dict()
self._page_by_index: Dict[int: Type[Page]] = dict() self._page_by_index: Dict[int: Type[Page]] = dict()
self.max_items_per_page = max_items_per_page
def __iter__(self) -> Generator[DatabaseObject, None, None]: def __iter__(self) -> Generator[DatabaseObject, None, None]:
for option in self.formatted_generator(): for option in self.formated_generator():
if isinstance(option, Option): if isinstance(option, Option):
yield option.music_object yield option.music_object
def formatted_generator(self) -> Generator[Union[Type[Page], Option], None, None]: def formated_generator(self, max_items_per_page: int = 10) -> Generator[Union[Type[Page], Option], None, None]:
self._by_index = dict() self._by_index = dict()
self._page_by_index = dict() self._page_by_index = dict()
def __len__(self) -> int: def get_music_object_by_index(self, index: int) -> Tuple[Type[Page], DatabaseObject]:
return max(self._by_index.keys()) # if this throws a key error, either the formatted generator needs to be iterated, or the option doesn't exist.
return self._page_by_index[index], self._by_index[index]
def __getitem__(self, index: int):
return self._by_index[index]
class SearchResults(Results): class SearchResults(Results):
def __init__( def __init__(
self, self,
pages: Tuple[Type[Page], ...] = None, pages: Tuple[Type[Page], ...] = None
**kwargs,
) -> None: ) -> None:
super().__init__(**kwargs) super().__init__()
self.pages = pages or [] self.pages = pages or []
# this would initialize a list for every page, which I don't think I want # this would initialize a list for every page, which I don't think I want
@ -59,11 +55,8 @@ class SearchResults(Results):
def get_page_results(self, page: Type[Page]) -> "PageResults": def get_page_results(self, page: Type[Page]) -> "PageResults":
return PageResults(page, self.results.get(page, [])) return PageResults(page, self.results.get(page, []))
def __len__(self) -> int: def formated_generator(self, max_items_per_page: int = 10):
return sum(min(self.max_items_per_page, len(results)) for results in self.results.values()) super().formated_generator()
def formatted_generator(self):
super().formatted_generator()
i = 0 i = 0
for page in self.results: for page in self.results:
@ -77,37 +70,19 @@ class SearchResults(Results):
i += 1 i += 1
j += 1 j += 1
if j >= self.max_items_per_page: if j >= max_items_per_page:
break break
class GoToResults(Results):
def __init__(self, results: List[DatabaseObject], **kwargs):
self.results: List[DatabaseObject] = results
super().__init__(**kwargs)
def __getitem__(self, index: int):
return self.results[index]
def __len__(self) -> int:
return len(self.results)
def formatted_generator(self):
yield from (Option(i, o) for i, o in enumerate(self.results))
class PageResults(Results): class PageResults(Results):
def __init__(self, page: Type[Page], results: List[DatabaseObject], **kwargs) -> None: def __init__(self, page: Type[Page], results: List[DatabaseObject]) -> None:
super().__init__(**kwargs) super().__init__()
self.page: Type[Page] = page self.page: Type[Page] = page
self.results: List[DatabaseObject] = results self.results: List[DatabaseObject] = results
def formated_generator(self, max_items_per_page: int = 10):
def formatted_generator(self, max_items_per_page: int = 10): super().formated_generator()
super().formatted_generator()
i = 0 i = 0
yield self.page yield self.page
@ -117,6 +92,3 @@ class PageResults(Results):
self._by_index[i] = option self._by_index[i] = option
self._page_by_index[i] = self.page self._page_by_index[i] = self.page
i += 1 i += 1
def __len__(self) -> int:
return len(self.results)

View File

@ -53,9 +53,9 @@ class Artwork:
def get_variant_name(self, variant: ArtworkVariant) -> str: def get_variant_name(self, variant: ArtworkVariant) -> str:
return f"artwork_{variant['width']}x{variant['height']}_{hash_url(variant['url']).replace('/', '_')}" return f"artwork_{variant['width']}x{variant['height']}_{hash_url(variant['url']).replace('/', '_')}"
def __merge__(self, other: Artwork, **kwargs) -> None: def __merge__(self, other: Artwork, override: bool = False) -> None:
for key, value in other._variant_mapping.items(): for key, value in other._variant_mapping.items():
if key not in self._variant_mapping: if key not in self._variant_mapping or override:
self._variant_mapping[key] = value self._variant_mapping[key] = value
def __eq__(self, other: Artwork) -> bool: def __eq__(self, other: Artwork) -> bool:

View File

@ -1,12 +1,9 @@
from __future__ import annotations from __future__ import annotations
from collections import defaultdict from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any
import copy
from .parents import OuterProxy from .parents import OuterProxy
from ..utils import object_trace from ..utils import object_trace
from ..utils import output, BColors
T = TypeVar('T', bound=OuterProxy) T = TypeVar('T', bound=OuterProxy)
@ -16,8 +13,8 @@ class Collection(Generic[T]):
_data: List[T] _data: List[T]
_indexed_from_id: Dict[int, Dict[str, Any]] _indexed_values: Dict[str, set]
_indexed_values: Dict[str, Dict[Any, T]] _indexed_to_objects: Dict[any, list]
shallow_list = property(fget=lambda self: self.data) shallow_list = property(fget=lambda self: self.data)
@ -39,8 +36,8 @@ class Collection(Generic[T]):
self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {} self.append_object_to_attribute: Dict[str, T] = append_object_to_attribute or {}
self.extend_object_to_attribute: Dict[str, Collection[T]] = extend_object_to_attribute or {} self.extend_object_to_attribute: Dict[str, Collection[T]] = extend_object_to_attribute or {}
self.sync_on_append: Dict[str, Collection] = sync_on_append or {} self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self.pull_from: List[Collection] = []
self.push_to: List[Collection] = [] self._id_to_index_values: Dict[int, set] = defaultdict(set)
# This is to cleanly unmap previously mapped items by their id # This is to cleanly unmap previously mapped items by their id
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict) self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
@ -49,18 +46,10 @@ class Collection(Generic[T]):
self.extend(data) self.extend(data)
def __hash__(self) -> int:
return id(self)
@property
def collection_names(self) -> List[str]:
return list(set(self._collection_for.values()))
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Collection({' | '.join(self.collection_names)} {id(self)})" return f"Collection({id(self)})"
def _map_element(self, __object: T, no_unmap: bool = False, **kwargs): def _map_element(self, __object: T, from_map: bool = False):
if not no_unmap:
self._unmap_element(__object.id) self._unmap_element(__object.id)
self._indexed_from_id[__object.id]["id"] = __object.id self._indexed_from_id[__object.id]["id"] = __object.id
@ -85,125 +74,73 @@ class Collection(Generic[T]):
del self._indexed_from_id[obj_id] del self._indexed_from_id[obj_id]
def _remap(self): def _find_object(self, __object: T) -> Optional[T]:
# reinitialize the mapping to clean it without time consuming operations
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
self._indexed_values: Dict[str, Dict[Any, T]] = defaultdict(dict)
for e in self._data:
self._map_element(e, no_unmap=True)
def _find_object(self, __object: T, **kwargs) -> Optional[T]:
self._remap()
if __object.id in self._indexed_from_id:
return self._indexed_values["id"][__object.id]
for name, value in __object.indexing_values: for name, value in __object.indexing_values:
if value in self._indexed_values[name]: if value in self._indexed_values[name]:
return self._indexed_values[name][value] return self._indexed_values[name][value]
return None def append(self, __object: Optional[T], already_is_parent: bool = False, from_map: bool = False):
def _append_new_object(self, other: T, **kwargs):
"""
This function appends the other object to the current collection.
This only works if not another object, which represents the same real life object exists in the collection.
"""
self._data.append(other)
other._inner._is_in_collection.add(self)
# all of the existing hooks to get the defined datastructures
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
for attribute, a in self.sync_on_append.items():
# syncing two collections by reference
b = other.__getattribute__(attribute)
if a is b:
continue
object_trace(f"Syncing [{a}] = [{b}]")
b_data = b.data.copy()
b_collection_for = b._collection_for.copy()
del b
for synced_with, key in b_collection_for.items():
synced_with.__setattr__(key, a)
a._collection_for[synced_with] = key
a.extend(b_data, **kwargs)
def append(self, other: Optional[T], **kwargs):
""" """
If an object, that represents the same entity exists in a relevant collection, If an object, that represents the same entity exists in a relevant collection,
merge into this object. (and remap) merge into this object. (and remap)
Else append to this collection. Else append to this collection.
:param other: :param __object:
:param already_is_parent:
:param from_map:
:return: :return:
""" """
if other is None: if __object is None:
return
if other.id in self._indexed_from_id:
return return
object_trace(f"Appending {other.option_string} to {self}") existing_object = self._find_object(__object)
# switching collection in the case of push to if existing_object is None:
for c in self.push_to: # append
r = c._find_object(other) self._data.append(__object)
if r is not None: self._map_element(__object)
# output("found push to", r, other, c, self, color=BColors.RED, sep="\t")
return c.append(other, **kwargs)
for c in self.pull_from: for collection_attribute, child_collection in self.extend_object_to_attribute.items():
r = c._find_object(other) __object.__getattribute__(collection_attribute).extend(child_collection)
if r is not None:
# output("found pull from", r, other, c, self, color=BColors.RED, sep="\t") for attribute, new_object in self.append_object_to_attribute.items():
c.remove(r, existing=r, **kwargs) __object.__getattribute__(attribute).append(new_object)
# only modify collections if the object actually has been appended
for attribute, a in self.sync_on_append.items():
b = __object.__getattribute__(attribute)
object_trace(f"Syncing [{a}{id(a)}] = [{b}{id(b)}]")
data_to_extend = b.data
a._collection_for.update(b._collection_for)
for synced_with, key in b._collection_for.items():
synced_with.__setattr__(key, a)
a.extend(data_to_extend)
existing = self._find_object(other)
if existing is None:
self._append_new_object(other, **kwargs)
else: else:
existing.merge(other, **kwargs) # merge only if the two objects are not the same
if existing_object.id == __object.id:
def remove(self, *other_list: List[T], silent: bool = False, existing: Optional[T] = None, remove_from_other_collection=True, **kwargs):
other: T
for other in other_list:
existing: Optional[T] = existing or self._indexed_values["id"].get(other.id, None)
if existing is None:
if not silent:
raise ValueError(f"Object {other} not found in {self}")
return other
if remove_from_other_collection:
for c in copy.copy(other._inner._is_in_collection):
c.remove(other, silent=True, remove_from_other_collection=False, **kwargs)
other._inner._is_in_collection = set()
else:
self._data.remove(existing)
self._unmap_element(existing)
def contains(self, __object: T) -> bool:
return self._find_object(__object) is not None
def extend(self, other_collections: Optional[Generator[T, None, None]], **kwargs):
if other_collections is None:
return return
for other_object in other_collections: old_id = existing_object.id
self.append(other_object, **kwargs)
existing_object.merge(__object)
if existing_object.id != old_id:
self._unmap_element(old_id)
self._map_element(existing_object)
def extend(self, __iterable: Optional[Generator[T, None, None]]):
if __iterable is None:
return
for __object in __iterable:
self.append(__object)
@property @property
def data(self) -> List[T]: def data(self) -> List[T]:
@ -219,9 +156,8 @@ class Collection(Generic[T]):
def __iter__(self) -> Iterator[T]: def __iter__(self) -> Iterator[T]:
yield from self._data yield from self._data
def __merge__(self, other: Collection, **kwargs): def __merge__(self, __other: Collection, override: bool = False):
object_trace(f"merging {str(self)} | {str(other)}") self.extend(__other)
self.extend(other, **kwargs)
def __getitem__(self, item: int): def __getitem__(self, item: int):
return self._data[item] return self._data[item]
@ -230,9 +166,3 @@ class Collection(Generic[T]):
if item >= len(self._data): if item >= len(self._data):
return default return default
return self._data[item] return self._data[item]
def __eq__(self, other: Collection) -> bool:
if self.empty and other.empty:
return True
return self._data == other._data

View File

@ -9,9 +9,9 @@ from pathlib import Path
import inspect import inspect
from .metadata import Metadata from .metadata import Metadata
from ..utils import get_unix_time, object_trace, generate_id from ..utils import get_unix_time, object_trace
from ..utils.config import logging_settings, main_settings from ..utils.config import logging_settings, main_settings
from ..utils.shared import HIGHEST_ID, DEBUG_PRINT_ID from ..utils.shared import HIGHEST_ID
from ..utils.hacking import MetaClass from ..utils.hacking import MetaClass
LOGGER = logging_settings["object_logger"] LOGGER = logging_settings["object_logger"]
@ -29,15 +29,9 @@ class InnerData:
""" """
_refers_to_instances: set = None _refers_to_instances: set = None
_is_in_collection: set = None
"""
Attribute versions keep track, of if the attribute has been changed.
"""
def __init__(self, object_type, **kwargs): def __init__(self, object_type, **kwargs):
self._refers_to_instances = set() self._refers_to_instances = set()
self._is_in_collection = set()
self._fetched_from: dict = {} self._fetched_from: dict = {}
# initialize the default values # initialize the default values
@ -48,29 +42,21 @@ class InnerData:
for key, value in kwargs.items(): for key, value in kwargs.items():
if hasattr(value, "__is_collection__"): if hasattr(value, "__is_collection__"):
value._collection_for[self] = key value._collection_for[self] = key
self.__setattr__(key, value) self.__setattr__(key, value)
def __hash__(self): def __hash__(self):
return self.id return self.id
def __merge__(self, __other: InnerData, **kwargs): def __merge__(self, __other: InnerData, override: bool = False):
""" """
:param __other: :param __other:
:param override:
:return: :return:
""" """
self._fetched_from.update(__other._fetched_from) self._fetched_from.update(__other._fetched_from)
self._is_in_collection.update(__other._is_in_collection)
for key, value in __other.__dict__.copy().items(): for key, value in __other.__dict__.copy().items():
if key.startswith("_"):
continue
if hasattr(value, "__is_collection__") and key in self.__dict__:
self.__getattribute__(key).__merge__(value, **kwargs)
continue
# just set the other value if self doesn't already have it # just set the other value if self doesn't already have it
if key not in self.__dict__ or (key in self.__dict__ and self.__dict__[key] == self._default_values.get(key)): if key not in self.__dict__ or (key in self.__dict__ and self.__dict__[key] == self._default_values.get(key)):
self.__setattr__(key, value) self.__setattr__(key, value)
@ -78,8 +64,13 @@ class InnerData:
# if the object of value implemented __merge__, it merges # if the object of value implemented __merge__, it merges
existing = self.__getattribute__(key) existing = self.__getattribute__(key)
if hasattr(existing, "__merge__"): if hasattr(type(existing), "__merge__"):
existing.__merge__(value, **kwargs) existing.__merge__(value, override)
continue
# override the existing value if requested
if override:
self.__setattr__(key, value)
class OuterProxy: class OuterProxy:
@ -93,6 +84,8 @@ class OuterProxy:
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = tuple() DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = tuple()
UPWARDS_COLLECTION_STRING_ATTRIBUTES = tuple() UPWARDS_COLLECTION_STRING_ATTRIBUTES = tuple()
TITEL = "id"
def __init__(self, _id: int = None, dynamic: bool = False, **kwargs): def __init__(self, _id: int = None, dynamic: bool = False, **kwargs):
_automatic_id: bool = False _automatic_id: bool = False
@ -101,7 +94,7 @@ class OuterProxy:
generates a random integer id generates a random integer id
the range is defined in the config the range is defined in the config
""" """
_id = generate_id() _id = random.randint(0, HIGHEST_ID)
_automatic_id = True _automatic_id = True
kwargs["automatic_id"] = _automatic_id kwargs["automatic_id"] = _automatic_id
@ -123,7 +116,7 @@ class OuterProxy:
self._inner: InnerData = InnerData(type(self), **kwargs) self._inner: InnerData = InnerData(type(self), **kwargs)
self._inner._refers_to_instances.add(self) self._inner._refers_to_instances.add(self)
object_trace(f"creating {type(self).__name__} [{self.option_string}]") object_trace(f"creating {type(self).__name__} [{self.title_string}]")
self.__init_collections__() self.__init_collections__()
@ -180,12 +173,13 @@ class OuterProxy:
def __eq__(self, other: Any): def __eq__(self, other: Any):
return self.__hash__() == other.__hash__() return self.__hash__() == other.__hash__()
def merge(self, __other: Optional[OuterProxy], **kwargs): def merge(self, __other: Optional[OuterProxy], override: bool = False):
""" """
1. merges the data of __other in self 1. merges the data of __other in self
2. replaces the data of __other with the data of self 2. replaces the data of __other with the data of self
:param __other: :param __other:
:param override:
:return: :return:
""" """
if __other is None: if __other is None:
@ -202,7 +196,7 @@ class OuterProxy:
if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances): if len(b._inner._refers_to_instances) > len(a._inner._refers_to_instances):
a, b = b, a a, b = b, a
object_trace(f"merging {a.option_string} | {b.option_string}") object_trace(f"merging {type(a).__name__} [{a.title_string} | {a.id}] with {type(b).__name__} [{b.title_string} | {b.id}]")
old_inner = b._inner old_inner = b._inner
@ -210,11 +204,11 @@ class OuterProxy:
instance._inner = a._inner instance._inner = a._inner
a._inner._refers_to_instances.add(instance) a._inner._refers_to_instances.add(instance)
a._inner.__merge__(old_inner, **kwargs) a._inner.__merge__(old_inner, override=override)
del old_inner del old_inner
def __merge__(self, __other: Optional[OuterProxy], **kwargs): def __merge__(self, __other: Optional[OuterProxy], override: bool = False):
self.merge(__other, **kwargs) self.merge(__other, override)
def mark_as_fetched(self, *url_hash_list: List[str]): def mark_as_fetched(self, *url_hash_list: List[str]):
for url_hash in url_hash_list: for url_hash in url_hash_list:
@ -241,23 +235,7 @@ class OuterProxy:
@property @property
def options(self) -> List[P]: def options(self) -> List[P]:
r = [] return [self]
for collection_string_attribute in self.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
r.extend(self.__getattribute__(collection_string_attribute))
r.append(self)
for collection_string_attribute in self.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
r.extend(self.__getattribute__(collection_string_attribute))
return r
@property
def option_string(self) -> str:
return self.title_string
INDEX_DEPENDS_ON: List[str] = []
@property @property
def indexing_values(self) -> List[Tuple[str, object]]: def indexing_values(self) -> List[Tuple[str, object]]:
@ -289,10 +267,9 @@ class OuterProxy:
return r return r
TITEL = "id"
@property @property
def title_string(self) -> str: def title_string(self) -> str:
return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "") return str(self.__getattribute__(self.TITEL))
def __repr__(self): def __repr__(self):
return f"{type(self).__name__}({self.title_string})" return f"{type(self).__name__}({self.title_string})"

View File

@ -3,7 +3,6 @@ from __future__ import annotations
import random import random
from collections import defaultdict from collections import defaultdict
from typing import List, Optional, Dict, Tuple, Type, Union from typing import List, Optional, Dict, Tuple, Type, Union
import copy
import pycountry import pycountry
@ -23,7 +22,6 @@ from .parents import OuterProxy, P
from .source import Source, SourceCollection from .source import Source, SourceCollection
from .target import Target from .target import Target
from .country import Language, Country from .country import Language, Country
from ..utils.shared import DEBUG_PRINT_ID
from ..utils.string_processing import unify from ..utils.string_processing import unify
from .parents import OuterProxy as Base from .parents import OuterProxy as Base
@ -45,8 +43,7 @@ def get_collection_string(
template: str, template: str,
ignore_titles: Set[str] = None, ignore_titles: Set[str] = None,
background: BColors = OPTION_BACKGROUND, background: BColors = OPTION_BACKGROUND,
foreground: BColors = OPTION_FOREGROUND, foreground: BColors = OPTION_FOREGROUND
add_id: bool = DEBUG_PRINT_ID,
) -> str: ) -> str:
if collection.empty: if collection.empty:
return "" return ""
@ -58,15 +55,8 @@ def get_collection_string(
r = background r = background
def get_element_str(element) -> str:
nonlocal add_id
r = element.title_string.strip()
if add_id and False:
r += " " + str(element.id)
return r
element: Base element: Base
titel_list: List[str] = [get_element_str(element) for element in collection if element.title_string not in ignore_titles] titel_list: List[str] = [element.title_string.strip() for element in collection if element.title_string not in ignore_titles]
for i, titel in enumerate(titel_list): for i, titel in enumerate(titel_list):
delimiter = ", " delimiter = ", "
@ -119,29 +109,15 @@ class Song(Base):
"tracksort": lambda: 0, "tracksort": lambda: 0,
} }
def __init__( def __init__(self, title: str = "", unified_title: str = None, isrc: str = None, length: int = None,
self, genre: str = None, note: FormattedText = None, source_list: List[Source] = None,
title: str = None, target_list: List[Target] = None, lyrics_list: List[Lyrics] = None,
isrc: str = None, main_artist_list: List[Artist] = None, feature_artist_list: List[Artist] = None,
length: int = None, album_list: List[Album] = None, tracksort: int = 0, artwork: Optional[Artwork] = None, **kwargs) -> None:
genre: str = None,
note: FormattedText = None,
source_list: List[Source] = None,
target_list: List[Target] = None,
lyrics_list: List[Lyrics] = None,
main_artist_list: List[Artist] = None,
feature_artist_list: List[Artist] = None,
album_list: List[Album] = None,
tracksort: int = 0,
artwork: Optional[Artwork] = None,
**kwargs
) -> None:
real_kwargs = copy.copy(locals())
real_kwargs.update(real_kwargs.pop("kwargs", {}))
Base.__init__(**real_kwargs) Base.__init__(**locals())
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_artist_collection", "feature_artist_collection", "album_collection") UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("album_collection", "main_artist_collection", "feature_artist_collection")
TITEL = "title" TITEL = "title"
def __init_collections__(self) -> None: def __init_collections__(self) -> None:
@ -159,9 +135,6 @@ class Song(Base):
"feature_song_collection": self "feature_song_collection": self
} }
self.feature_artist_collection.push_to = [self.main_artist_collection]
self.main_artist_collection.pull_from = [self.feature_artist_collection]
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song: if object_type is Song:
return return
@ -171,21 +144,20 @@ class Song(Base):
return return
if isinstance(object_list, Artist): if isinstance(object_list, Artist):
self.feature_artist_collection.extend(object_list) self.main_artist_collection.extend(object_list)
return return
if isinstance(object_list, Album): if isinstance(object_list, Album):
self.album_collection.extend(object_list) self.album_collection.extend(object_list)
return return
INDEX_DEPENDS_ON = ("title", "isrc", "source_collection")
@property @property
def indexing_values(self) -> List[Tuple[str, object]]: def indexing_values(self) -> List[Tuple[str, object]]:
return [ return [
('id', self.id),
('title', unify(self.title)), ('title', unify(self.title)),
('isrc', self.isrc), ('isrc', self.isrc),
*self.source_collection.indexing_values(), *[('url', source.url) for source in self.source_collection]
] ]
@property @property
@ -197,8 +169,6 @@ class Song(Base):
id3Mapping.GENRE: [self.genre], id3Mapping.GENRE: [self.genre],
id3Mapping.TRACKNUMBER: [self.tracksort_str], id3Mapping.TRACKNUMBER: [self.tracksort_str],
id3Mapping.COMMENT: [self.note.markdown], id3Mapping.COMMENT: [self.note.markdown],
id3Mapping.FILE_WEBPAGE_URL: self.source_collection.url_list,
id3Mapping.SOURCE_WEBPAGE_URL: self.source_collection.homepage_list,
}) })
# metadata.merge_many([s.get_song_metadata() for s in self.source_collection]) album sources have no relevant metadata for id3 # metadata.merge_many([s.get_song_metadata() for s in self.source_collection]) album sources have no relevant metadata for id3
@ -219,12 +189,20 @@ class Song(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title}) r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
r += get_collection_string(self.main_artist_collection, " by {}") r += get_collection_string(self.main_artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}") r += get_collection_string(self.feature_artist_collection, " feat. {}")
return r return r
@property
def options(self) -> List[P]:
options = self.main_artist_collection.shallow_list
options.extend(self.feature_artist_collection)
options.extend(self.album_collection)
options.append(self)
return options
@property @property
def tracksort_str(self) -> str: def tracksort_str(self) -> str:
""" """
@ -280,30 +258,18 @@ class Album(Base):
TITEL = "title" TITEL = "title"
# This is automatically generated # This is automatically generated
def __init__( def __init__(self, title: str = None, unified_title: str = None, album_status: AlbumStatus = None,
self, album_type: AlbumType = None, language: Language = None, date: ID3Timestamp = None,
title: str = None, barcode: str = None, albumsort: int = None, notes: FormattedText = None,
unified_title: str = None, source_list: List[Source] = None, artist_list: List[Artist] = None, song_list: List[Song] = None,
album_status: AlbumStatus = None, label_list: List[Label] = None, **kwargs) -> None:
album_type: AlbumType = None, super().__init__(title=title, unified_title=unified_title, album_status=album_status, album_type=album_type,
language: Language = None, language=language, date=date, barcode=barcode, albumsort=albumsort, notes=notes,
date: ID3Timestamp = None, source_list=source_list, artist_list=artist_list, song_list=song_list, label_list=label_list,
barcode: str = None, **kwargs)
albumsort: int = None,
notes: FormattedText = None,
source_list: List[Source] = None,
artist_list: List[Artist] = None,
song_list: List[Song] = None,
label_list: List[Label] = None,
**kwargs
) -> None:
real_kwargs = copy.copy(locals())
real_kwargs.update(real_kwargs.pop("kwargs", {}))
Base.__init__(**real_kwargs)
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection",) DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection",)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection") UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "label_collection")
def __init_collections__(self): def __init_collections__(self):
self.song_collection.append_object_to_attribute = { self.song_collection.append_object_to_attribute = {
@ -336,14 +302,13 @@ class Album(Base):
self.label_collection.extend(object_list) self.label_collection.extend(object_list)
return return
INDEX_DEPENDS_ON = ("title", "barcode", "source_collection")
@property @property
def indexing_values(self) -> List[Tuple[str, object]]: def indexing_values(self) -> List[Tuple[str, object]]:
return [ return [
('id', self.id),
('title', unify(self.title)), ('title', unify(self.title)),
('barcode', self.barcode), ('barcode', self.barcode),
*self.source_collection.indexing_values(), *[('url', source.url) for source in self.source_collection]
] ]
@property @property
@ -368,7 +333,7 @@ class Album(Base):
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = OPTION_FOREGROUND.value + self.title + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.artist_collection, " by {}") r += get_collection_string(self.artist_collection, " by {}")
r += get_collection_string(self.label_collection, " under {}") r += get_collection_string(self.label_collection, " under {}")
@ -376,6 +341,12 @@ class Album(Base):
r += f" with {len(self.song_collection)} songs" r += f" with {len(self.song_collection)} songs"
return r return r
@property
def options(self) -> List[P]:
options = [*self.artist_collection, self, *self.song_collection]
return options
def update_tracksort(self): def update_tracksort(self):
""" """
This updates the tracksort attributes, of the songs in This updates the tracksort attributes, of the songs in
@ -401,6 +372,18 @@ class Album(Base):
tracksort_map[i] = existing_list.pop(0) tracksort_map[i] = existing_list.pop(0)
tracksort_map[i].tracksort = i tracksort_map[i].tracksort = i
def compile(self, merge_into: bool = False):
"""
compiles the recursive structures,
and does depending on the object some other stuff.
no need to override if only the recursive structure should be built.
override self.build_recursive_structures() instead
"""
self.update_tracksort()
self._build_recursive_structures(build_version=random.randint(0, 99999), merge=merge_into)
@property @property
def copyright(self) -> str: def copyright(self) -> str:
if self.date is None: if self.date is None:
@ -432,15 +415,21 @@ class Album(Base):
return self.album_type.value return self.album_type.value
"""
All objects dependent on Artist
"""
class Artist(Base): class Artist(Base):
name: str name: str
unified_name: str
country: Country country: Country
formed_in: ID3Timestamp formed_in: ID3Timestamp
notes: FormattedText notes: FormattedText
lyrical_themes: List[str] lyrical_themes: List[str]
general_genre: str general_genre: str
unformatted_location: str unformated_location: str
source_collection: SourceCollection source_collection: SourceCollection
contact_collection: Collection[Contact] contact_collection: Collection[Contact]
@ -450,9 +439,10 @@ class Artist(Base):
label_collection: Collection[Label] label_collection: Collection[Label]
_default_factories = { _default_factories = {
"name": lambda: None, "name": str,
"unified_name": lambda: None,
"country": lambda: None, "country": lambda: None,
"unformatted_location": lambda: None, "unformated_location": lambda: None,
"formed_in": ID3Timestamp, "formed_in": ID3Timestamp,
"notes": FormattedText, "notes": FormattedText,
@ -469,30 +459,19 @@ class Artist(Base):
TITEL = "name" TITEL = "name"
# This is automatically generated # This is automatically generated
def __init__( def __init__(self, name: str = "", unified_name: str = None, country: Country = None,
self, formed_in: ID3Timestamp = None, notes: FormattedText = None, lyrical_themes: List[str] = None,
name: str = None, general_genre: str = None, unformated_location: str = None, source_list: List[Source] = None,
unified_name: str = None, contact_list: List[Contact] = None, feature_song_list: List[Song] = None,
country: Country = None, main_album_list: List[Album] = None, label_list: List[Label] = None, **kwargs) -> None:
formed_in: ID3Timestamp = None,
notes: FormattedText = None,
lyrical_themes: List[str] = None,
general_genre: str = None,
unformatted_location: str = None,
source_list: List[Source] = None,
contact_list: List[Contact] = None,
feature_song_list: List[Song] = None,
main_album_list: List[Album] = None,
label_list: List[Label] = None,
**kwargs
) -> None:
real_kwargs = copy.copy(locals())
real_kwargs.update(real_kwargs.pop("kwargs", {}))
Base.__init__(**real_kwargs) super().__init__(name=name, unified_name=unified_name, country=country, formed_in=formed_in, notes=notes,
lyrical_themes=lyrical_themes, general_genre=general_genre,
unformated_location=unformated_location, source_list=source_list, contact_list=contact_list,
feature_song_list=feature_song_list, main_album_list=main_album_list, label_list=label_list,
**kwargs)
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("feature_song_collection", "main_album_collection")
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_album_collection", "feature_song_collection")
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",) UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",)
def __init_collections__(self): def __init_collections__(self):
@ -525,6 +504,12 @@ class Artist(Base):
self.label_collection.extend(object_list) self.label_collection.extend(object_list)
return return
@property
def options(self) -> List[P]:
options = [self, *self.main_album_collection.shallow_list, *self.feature_album]
print(options)
return options
def update_albumsort(self): def update_albumsort(self):
""" """
This updates the albumsort attributes, of the albums in This updates the albumsort attributes, of the albums in
@ -582,27 +567,40 @@ class Artist(Base):
# replace the old collection with the new one # replace the old collection with the new one
self.main_album_collection: Collection = Collection(data=album_list, element_type=Album) self.main_album_collection: Collection = Collection(data=album_list, element_type=Album)
INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection")
@property @property
def indexing_values(self) -> List[Tuple[str, object]]: def indexing_values(self) -> List[Tuple[str, object]]:
return [ return [
('id', self.id),
('name', unify(self.name)), ('name', unify(self.name)),
*[('contact', contact.value) for contact in self.contact_collection], *[('url', source.url) for source in self.source_collection],
*self.source_collection.indexing_values(), *[('contact', contact.value) for contact in self.contact_collection]
] ]
@property @property
def metadata(self) -> Metadata: def metadata(self) -> Metadata:
metadata = Metadata({ metadata = Metadata({
id3Mapping.ARTIST: [self.name], id3Mapping.ARTIST: [self.name]
id3Mapping.ARTIST_WEBPAGE_URL: self.source_collection.url_list,
}) })
metadata.merge_many([s.get_artist_metadata() for s in self.source_collection])
return metadata return metadata
"""
def __str__(self, include_notes: bool = False):
string = self.name or ""
if include_notes:
plaintext_notes = self.notes.get_plaintext()
if plaintext_notes is not None:
string += "\n" + plaintext_notes
return string
"""
def __repr__(self):
return f"Artist(\"{self.name}\")"
@property @property
def option_string(self) -> str: def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value r = OPTION_FOREGROUND.value + self.name + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.label_collection, " under {}") r += get_collection_string(self.label_collection, " under {}")
r += OPTION_BACKGROUND.value r += OPTION_BACKGROUND.value
@ -615,6 +613,48 @@ class Artist(Base):
return r return r
@property
def options(self) -> List[P]:
options = [self]
options.extend(self.main_album_collection)
options.extend(self.feature_song_collection)
return options
@property
def feature_album(self) -> Album:
return Album(
title="features",
album_status=AlbumStatus.UNRELEASED,
album_type=AlbumType.COMPILATION_ALBUM,
is_split=True,
albumsort=666,
dynamic=True,
song_list=self.feature_song_collection.shallow_list
)
def get_all_songs(self) -> List[Song]:
"""
returns a list of all Songs.
probably not that useful, because it is unsorted
"""
collection = self.feature_song_collection.copy()
for album in self.discography:
collection.extend(album.song_collection)
return collection
@property
def discography(self) -> List[Album]:
flat_copy_discography = self.main_album_collection.copy()
flat_copy_discography.append(self.feature_album)
return flat_copy_discography
"""
Label
"""
class Label(Base): class Label(Base):
COLLECTION_STRING_ATTRIBUTES = ("album_collection", "current_artist_collection") COLLECTION_STRING_ATTRIBUTES = ("album_collection", "current_artist_collection")
@ -643,21 +683,12 @@ class Label(Base):
TITEL = "name" TITEL = "name"
def __init__( def __init__(self, name: str = None, unified_name: str = None, notes: FormattedText = None,
self, source_list: List[Source] = None, contact_list: List[Contact] = None,
name: str = None, album_list: List[Album] = None, current_artist_list: List[Artist] = None, **kwargs) -> None:
unified_name: str = None, super().__init__(name=name, unified_name=unified_name, notes=notes, source_list=source_list,
notes: FormattedText = None, contact_list=contact_list, album_list=album_list, current_artist_list=current_artist_list,
source_list: List[Source] = None, **kwargs)
contact_list: List[Contact] = None,
album_list: List[Album] = None,
current_artist_list: List[Artist] = None,
**kwargs
) -> None:
real_kwargs = copy.copy(locals())
real_kwargs.update(real_kwargs.pop("kwargs", {}))
Base.__init__(**real_kwargs)
def __init_collections__(self): def __init_collections__(self):
self.album_collection.append_object_to_attribute = { self.album_collection.append_object_to_attribute = {
@ -671,6 +702,7 @@ class Label(Base):
@property @property
def indexing_values(self) -> List[Tuple[str, object]]: def indexing_values(self) -> List[Tuple[str, object]]:
return [ return [
('id', self.id),
('name', unify(self.name)), ('name', unify(self.name)),
*[('url', source.url) for source in self.source_collection] *[('url', source.url) for source in self.source_collection]
] ]

View File

@ -2,176 +2,142 @@ from __future__ import annotations
from collections import defaultdict from collections import defaultdict
from enum import Enum from enum import Enum
from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator from typing import List, Dict, Set, Tuple, Optional, Iterable
from urllib.parse import urlparse, ParseResult from urllib.parse import urlparse
from dataclasses import dataclass, field
from functools import cached_property
from ..utils import generate_id
from ..utils.enums.source import SourcePages, SourceTypes from ..utils.enums.source import SourcePages, SourceTypes
from ..utils.config import youtube_settings from ..utils.config import youtube_settings
from ..utils.string_processing import hash_url, shorten_display_url from ..utils.string_processing import hash_url
from .metadata import Mapping, Metadata from .metadata import Mapping, Metadata
from .parents import OuterProxy from .parents import OuterProxy
from .collection import Collection from .collection import Collection
class Source(OuterProxy):
@dataclass
class Source:
page_enum: SourcePages
url: str url: str
referrer_page: SourcePages = None
audio_url: Optional[str] = None
additional_data: dict = field(default_factory=dict) page_enum: SourcePages
referer_page: SourcePages
def __post_init__(self): audio_url: str
self.referrer_page = self.referrer_page or self.page_enum
@property _default_factories = {
def parsed_url(self) -> ParseResult: "audio_url": lambda: None,
return urlparse(self.url) }
# This is automatically generated
def __init__(self, page_enum: SourcePages, url: str, referer_page: SourcePages = None, audio_url: str = None,
**kwargs) -> None:
if referer_page is None:
referer_page = page_enum
super().__init__(url=url, page_enum=page_enum, referer_page=referer_page, audio_url=audio_url, **kwargs)
@classmethod @classmethod
def match_url(cls, url: str, referrer_page: SourcePages) -> Optional[Source]: def match_url(cls, url: str, referer_page: SourcePages) -> Optional["Source"]:
""" """
this shouldn't be used, unless you are not certain what the source is for this shouldn't be used, unlesse you are not certain what the source is for
the reason is that it is more inefficient the reason is that it is more inefficient
""" """
parsed_url = urlparse(url) parsed = urlparse(url)
url = parsed_url.geturl() url = parsed.geturl()
if "musify" in parsed_url.netloc: if "musify" in parsed.netloc:
return cls(SourcePages.MUSIFY, url, referrer_page=referrer_page) return cls(SourcePages.MUSIFY, url, referer_page=referer_page)
if parsed_url.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]: if parsed.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]:
return cls(SourcePages.YOUTUBE, url, referrer_page=referrer_page) return cls(SourcePages.YOUTUBE, url, referer_page=referer_page)
if url.startswith("https://www.deezer"): if url.startswith("https://www.deezer"):
return cls(SourcePages.DEEZER, url, referrer_page=referrer_page) return cls(SourcePages.DEEZER, url, referer_page=referer_page)
if url.startswith("https://open.spotify.com"): if url.startswith("https://open.spotify.com"):
return cls(SourcePages.SPOTIFY, url, referrer_page=referrer_page) return cls(SourcePages.SPOTIFY, url, referer_page=referer_page)
if "bandcamp" in url: if "bandcamp" in url:
return cls(SourcePages.BANDCAMP, url, referrer_page=referrer_page) return cls(SourcePages.BANDCAMP, url, referer_page=referer_page)
if "wikipedia" in parsed_url.netloc: if "wikipedia" in parsed.netloc:
return cls(SourcePages.WIKIPEDIA, url, referrer_page=referrer_page) return cls(SourcePages.WIKIPEDIA, url, referer_page=referer_page)
if url.startswith("https://www.metal-archives.com/"): if url.startswith("https://www.metal-archives.com/"):
return cls(SourcePages.ENCYCLOPAEDIA_METALLUM, url, referrer_page=referrer_page) return cls(SourcePages.ENCYCLOPAEDIA_METALLUM, url, referer_page=referer_page)
# the less important once # the less important once
if url.startswith("https://www.facebook"): if url.startswith("https://www.facebook"):
return cls(SourcePages.FACEBOOK, url, referrer_page=referrer_page) return cls(SourcePages.FACEBOOK, url, referer_page=referer_page)
if url.startswith("https://www.instagram"): if url.startswith("https://www.instagram"):
return cls(SourcePages.INSTAGRAM, url, referrer_page=referrer_page) return cls(SourcePages.INSTAGRAM, url, referer_page=referer_page)
if url.startswith("https://twitter"): if url.startswith("https://twitter"):
return cls(SourcePages.TWITTER, url, referrer_page=referrer_page) return cls(SourcePages.TWITTER, url, referer_page=referer_page)
if url.startswith("https://myspace.com"): if url.startswith("https://myspace.com"):
return cls(SourcePages.MYSPACE, url, referrer_page=referrer_page) return cls(SourcePages.MYSPACE, url, referer_page=referer_page)
def get_song_metadata(self) -> Metadata:
return Metadata({
Mapping.FILE_WEBPAGE_URL: [self.url],
Mapping.SOURCE_WEBPAGE_URL: [self.homepage]
})
def get_artist_metadata(self) -> Metadata:
return Metadata({
Mapping.ARTIST_WEBPAGE_URL: [self.url]
})
@property @property
def hash_url(self) -> str: def hash_url(self) -> str:
return hash_url(self.url) return hash_url(self.url)
@property @property
def indexing_values(self) -> list: def metadata(self) -> Metadata:
r = [hash_url(self.url)] return self.get_song_metadata()
if self.audio_url:
r.append(hash_url(self.audio_url)) @property
return r def indexing_values(self) -> List[Tuple[str, object]]:
return [
('id', self.id),
('url', self.url),
('audio_url', self.audio_url),
]
def __str__(self):
return self.__repr__()
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Src({self.page_enum.value}: {shorten_display_url(self.url)})" return f"Src({self.page_enum.value}: {self.url}, {self.audio_url})"
def __merge__(self, other: Source, **kwargs): @property
if self.audio_url is None: def title_string(self) -> str:
self.audio_url = other.audio_url return self.url
self.additional_data.update(other.additional_data)
page_str = property(fget=lambda self: self.page_enum.value) page_str = property(fget=lambda self: self.page_enum.value)
type_str = property(fget=lambda self: self.type_enum.value)
homepage = property(fget=lambda self: SourcePages.get_homepage(self.page_enum))
class SourceCollection: class SourceCollection(Collection):
__change_version__ = generate_id()
_indexed_sources: Dict[str, Source]
_page_to_source_list: Dict[SourcePages, List[Source]]
def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs): def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
self._page_to_source_list = defaultdict(list) self._page_to_source_list: Dict[SourcePages, List[Source]] = defaultdict(list)
self._indexed_sources = {}
self.extend(data or []) super().__init__(data=data, **kwargs)
def has_source_page(self, *source_pages: SourcePages) -> bool: def _map_element(self, __object: Source, **kwargs):
return any(source_page in self._page_to_source_list for source_page in source_pages) super()._map_element(__object, **kwargs)
def get_sources(self, *source_pages: List[Source]) -> Generator[Source]: self._page_to_source_list[__object.page_enum].append(__object)
if not len(source_pages):
source_pages = self.source_pages
for page in source_pages:
yield from self._page_to_source_list[page]
def append(self, source: Source):
if source is None:
return
existing_source = None
for key in source.indexing_values:
if key in self._indexed_sources:
existing_source = self._indexed_sources[key]
break
if existing_source is not None:
existing_source.__merge__(source)
source = existing_source
else:
self._page_to_source_list[source.page_enum].append(source)
changed = False
for key in source.indexing_values:
if key not in self._indexed_sources:
changed = True
self._indexed_sources[key] = source
if changed:
self.__change_version__ = generate_id()
def extend(self, sources: Iterable[Source]):
for source in sources:
self.append(source)
def __iter__(self):
yield from self.get_sources()
def __merge__(self, other: SourceCollection, **kwargs):
self.extend(other)
@property @property
def source_pages(self) -> Iterable[SourcePages]: def source_pages(self) -> Set[SourcePages]:
return sorted(self._page_to_source_list.keys(), key=lambda page: page.value) return set(source.page_enum for source in self._data)
@property def get_sources_from_page(self, source_page: SourcePages) -> List[Source]:
def hash_url_list(self) -> List[str]: """
return [hash_url(source.url) for source in self.get_sources()] getting the sources for a specific page like
YouTube or musify
@property """
def url_list(self) -> List[str]: return self._page_to_source_list[source_page].copy()
return [source.url for source in self.get_sources()]
@property
def homepage_list(self) -> List[str]:
return [source.homepage for source in self.source_pages]
def indexing_values(self) -> Generator[Tuple[str, str], None, None]:
for index in self._indexed_sources:
yield "url", index

View File

@ -89,6 +89,52 @@ class NamingDict(dict):
return self.default_value_for_name(attribute_name) return self.default_value_for_name(attribute_name)
def _clean_music_object(music_object: INDEPENDENT_DB_OBJECTS, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
if type(music_object) == Label:
return _clean_label(label=music_object, collections=collections)
if type(music_object) == Artist:
return _clean_artist(artist=music_object, collections=collections)
if type(music_object) == Album:
return _clean_album(album=music_object, collections=collections)
if type(music_object) == Song:
return _clean_song(song=music_object, collections=collections)
def _clean_collection(collection: Collection, collection_dict: Dict[INDEPENDENT_DB_TYPES, Collection]):
if collection.element_type not in collection_dict:
return
for i, element in enumerate(collection):
r = collection_dict[collection.element_type].append(element, merge_into_existing=True)
collection[i] = r.current_element
if not r.was_the_same:
_clean_music_object(r.current_element, collection_dict)
def _clean_label(label: Label, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
_clean_collection(label.current_artist_collection, collections)
_clean_collection(label.album_collection, collections)
def _clean_artist(artist: Artist, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
_clean_collection(artist.main_album_collection, collections)
_clean_collection(artist.feature_song_collection, collections)
_clean_collection(artist.label_collection, collections)
def _clean_album(album: Album, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
_clean_collection(album.label_collection, collections)
_clean_collection(album.song_collection, collections)
_clean_collection(album.artist_collection, collections)
def _clean_song(song: Song, collections: Dict[INDEPENDENT_DB_TYPES, Collection]):
_clean_collection(song.album_collection, collections)
_clean_collection(song.feature_artist_collection, collections)
_clean_collection(song.main_artist_collection, collections)
class Page: class Page:
""" """
This is an abstract class, laying out the This is an abstract class, laying out the
@ -200,7 +246,7 @@ class Page:
# only certain database objects, have a source list # only certain database objects, have a source list
if isinstance(music_object, INDEPENDENT_DB_OBJECTS): if isinstance(music_object, INDEPENDENT_DB_OBJECTS):
source: Source source: Source
for source in music_object.source_collection.get_sources(self.SOURCE_TYPE): for source in music_object.source_collection.get_sources_from_page(self.SOURCE_TYPE):
if music_object.already_fetched_from(source.hash_url): if music_object.already_fetched_from(source.hash_url):
continue continue
@ -254,7 +300,7 @@ class Page:
} }
if obj_type in fetch_map: if obj_type in fetch_map:
music_object = fetch_map[obj_type](source, stop_at_level=stop_at_level) music_object = fetch_map[obj_type](source, stop_at_level)
else: else:
self.LOGGER.warning(f"Can't fetch details of type: {obj_type}") self.LOGGER.warning(f"Can't fetch details of type: {obj_type}")
return None return None
@ -373,10 +419,9 @@ class Page:
if song.target_collection.empty: if song.target_collection.empty:
song.target_collection.append(new_target) song.target_collection.append(new_target)
if not song.source_collection.has_source_page(self.SOURCE_TYPE): sources = song.source_collection.get_sources_from_page(self.SOURCE_TYPE)
return DownloadResult(error_message=f"No {self.__class__.__name__} source found for {song.option_string}.") if len(sources) == 0:
return DownloadResult(error_message=f"No source found for {song.title} as {self.__class__.__name__}.")
sources = song.source_collection.get_sources(self.SOURCE_TYPE)
temp_target: Target = Target( temp_target: Target = Target(
relative_to_music_dir=False, relative_to_music_dir=False,
@ -403,21 +448,14 @@ class Page:
self.LOGGER.info(f"{song.option_string} already exists, thus not downloading again.") self.LOGGER.info(f"{song.option_string} already exists, thus not downloading again.")
return r return r
skip_intervals = [] source = sources[0]
if not found_on_disc: if not found_on_disc:
for source in sources:
r = self.download_song_to_target(source=source, target=temp_target, desc=song.option_string) r = self.download_song_to_target(source=source, target=temp_target, desc=song.option_string)
if not r.is_fatal_error: if not r.is_fatal_error:
skip_intervals = self.get_skip_intervals(song, source) r.merge(self._post_process_targets(song, temp_target,
break [] if found_on_disc else self.get_skip_intervals(song, source)))
if temp_target.exists:
r.merge(self._post_process_targets(
song=song,
temp_target=temp_target,
interval_list=skip_intervals,
))
return r return r

View File

@ -185,7 +185,7 @@ class Bandcamp(Page):
if li is None and li['href'] is not None: if li is None and li['href'] is not None:
continue continue
source_list.append(Source.match_url(_parse_artist_url(li['href']), referrer_page=self.SOURCE_TYPE)) source_list.append(Source.match_url(_parse_artist_url(li['href']), referer_page=self.SOURCE_TYPE))
return Artist( return Artist(
name=name, name=name,

View File

@ -486,7 +486,7 @@ class EncyclopaediaMetallum(Page):
href = anchor["href"] href = anchor["href"]
if href is not None: if href is not None:
source_list.append(Source.match_url(href, referrer_page=self.SOURCE_TYPE)) source_list.append(Source.match_url(href, referer_page=self.SOURCE_TYPE))
# The following code is only legacy code, which I just kep because it doesn't harm. # The following code is only legacy code, which I just kep because it doesn't harm.
# The way ma returns sources changed. # The way ma returns sources changed.
@ -504,7 +504,7 @@ class EncyclopaediaMetallum(Page):
if url is None: if url is None:
continue continue
source_list.append(Source.match_url(url, referrer_page=self.SOURCE_TYPE)) source_list.append(Source.match_url(url, referer_page=self.SOURCE_TYPE))
return source_list return source_list

View File

@ -1,7 +1,7 @@
from collections import defaultdict from collections import defaultdict
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
from typing import List, Optional, Type, Union, Generator, Dict, Any from typing import List, Optional, Type, Union, Generator
from urllib.parse import urlparse from urllib.parse import urlparse
import pycountry import pycountry
@ -24,7 +24,7 @@ from ..objects import (
Lyrics, Lyrics,
Artwork Artwork
) )
from ..utils.config import logging_settings, main_settings from ..utils.config import logging_settings
from ..utils import string_processing, shared from ..utils import string_processing, shared
from ..utils.string_processing import clean_song_title from ..utils.string_processing import clean_song_title
from ..utils.support_classes.query import Query from ..utils.support_classes.query import Query
@ -361,7 +361,7 @@ class Musify(Page):
return Song( return Song(
title=clean_song_title(song_title, artist_name=artist_list[0].name if len(artist_list) > 0 else None), title=clean_song_title(song_title, artist_name=artist_list[0].name if len(artist_list) > 0 else None),
feature_artist_list=artist_list, main_artist_list=artist_list,
source_list=source_list source_list=source_list
) )
@ -503,14 +503,14 @@ class Musify(Page):
source_list.append(Source( source_list.append(Source(
SourcePages.YOUTUBE, SourcePages.YOUTUBE,
iframe["src"], iframe["src"],
referrer_page=self.SOURCE_TYPE referer_page=self.SOURCE_TYPE
)) ))
return Song( return Song(
title=clean_song_title(track_name, artist_name=artist_list[0].name if len(artist_list) > 0 else None), title=clean_song_title(track_name, artist_name=artist_list[0].name if len(artist_list) > 0 else None),
source_list=source_list, source_list=source_list,
lyrics_list=lyrics_list, lyrics_list=lyrics_list,
feature_artist_list=artist_list, main_artist_list=artist_list,
album_list=album_list, album_list=album_list,
artwork=artwork, artwork=artwork,
) )
@ -652,101 +652,10 @@ class Musify(Page):
return Song( return Song(
title=clean_song_title(song_name, artist_name=artist_list[0].name if len(artist_list) > 0 else None), title=clean_song_title(song_name, artist_name=artist_list[0].name if len(artist_list) > 0 else None),
tracksort=tracksort, tracksort=tracksort,
feature_artist_list=artist_list, main_artist_list=artist_list,
source_list=source_list source_list=source_list
) )
def _parse_album(self, soup: BeautifulSoup) -> Album:
name: str = None
source_list: List[Source] = []
artist_list: List[Artist] = []
date: ID3Timestamp = None
"""
if breadcrumb list has 4 elements, then
the -2 is the artist link,
the -1 is the album
"""
# breadcrumb
breadcrumb_soup: BeautifulSoup = soup.find("ol", {"class", "breadcrumb"})
breadcrumb_elements: List[BeautifulSoup] = breadcrumb_soup.find_all("li", {"class": "breadcrumb-item"})
if len(breadcrumb_elements) == 4:
# album
album_crumb: BeautifulSoup = breadcrumb_elements[-1]
name = album_crumb.text.strip()
# artist
artist_crumb: BeautifulSoup = breadcrumb_elements[-2]
anchor: BeautifulSoup = artist_crumb.find("a")
if anchor is not None:
href = anchor.get("href")
artist_source_list: List[Source] = []
if href is not None:
artist_source_list.append(Source(self.SOURCE_TYPE, self.HOST + href.strip()))
span: BeautifulSoup = anchor.find("span")
if span is not None:
artist_list.append(Artist(
name=span.get_text(strip=True),
source_list=artist_source_list
))
else:
self.LOGGER.debug("there are not 4 breadcrumb items, which shouldn't be the case")
# meta
meta_url: BeautifulSoup = soup.find("meta", {"itemprop": "url"})
if meta_url is not None:
url = meta_url.get("content")
if url is not None:
source_list.append(Source(self.SOURCE_TYPE, self.HOST + url))
meta_name: BeautifulSoup = soup.find("meta", {"itemprop": "name"})
if meta_name is not None:
_name = meta_name.get("content")
if _name is not None:
name = _name
# album info
album_info_ul: BeautifulSoup = soup.find("ul", {"class": "album-info"})
if album_info_ul is not None:
artist_anchor: BeautifulSoup
for artist_anchor in album_info_ul.find_all("a", {"itemprop": "byArtist"}):
# line 98
artist_source_list: List[Source] = []
artist_url_meta = artist_anchor.find("meta", {"itemprop": "url"})
if artist_url_meta is not None:
artist_href = artist_url_meta.get("content")
if artist_href is not None:
artist_source_list.append(Source(self.SOURCE_TYPE, url=self.HOST + artist_href))
artist_meta_name = artist_anchor.find("meta", {"itemprop": "name"})
if artist_meta_name is not None:
artist_name = artist_meta_name.get("content")
if artist_name is not None:
artist_list.append(Artist(
name=artist_name,
source_list=artist_source_list
))
time_soup: BeautifulSoup = album_info_ul.find("time", {"itemprop": "datePublished"})
if time_soup is not None:
raw_datetime = time_soup.get("datetime")
if raw_datetime is not None:
try:
date = ID3Timestamp.strptime(raw_datetime, "%Y-%m-%d")
except ValueError:
self.LOGGER.debug(f"Raw datetime doesn't match time format %Y-%m-%d: {raw_datetime}")
return Album(
title=name,
source_list=source_list,
artist_list=artist_list,
date=date
)
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album: def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
""" """
fetches album from source: fetches album from source:
@ -781,18 +690,30 @@ class Musify(Page):
new_song = self._parse_song_card(card_soup) new_song = self._parse_song_card(card_soup)
album.song_collection.append(new_song) album.song_collection.append(new_song)
if stop_at_level > 1:
song: Song
for song in album.song_collection:
sources = song.source_collection.get_sources_from_page(self.SOURCE_TYPE)
for source in sources:
song.merge(self.fetch_song(source=source))
album.update_tracksort() album.update_tracksort()
return album return album
def _fetch_initial_artist(self, url: MusifyUrl, source: Source, **kwargs) -> Artist: def _get_artist_attributes(self, url: MusifyUrl) -> Artist:
""" """
fetches the main Artist attributes from this endpoint
https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent
it needs to parse html
:param url:
:return:
""" """
r = self.connection.get(f"https://musify.club/{url.source_type.value}/{url.name_with_id}?_pjax=#bodyContent", name="artist_attributes_" + url.name_with_id) r = self.connection.get(f"https://musify.club/{url.source_type.value}/{url.name_with_id}?_pjax=#bodyContent", name="artist_attributes_" + url.name_with_id)
if r is None: if r is None:
return Artist(source_list=[source]) return Artist()
soup = self.get_soup_from_response(r) soup = self.get_soup_from_response(r)
@ -891,7 +812,7 @@ class Musify(Page):
href = additional_source.get("href") href = additional_source.get("href")
if href is None: if href is None:
continue continue
new_src = Source.match_url(href, referrer_page=self.SOURCE_TYPE) new_src = Source.match_url(href, referer_page=self.SOURCE_TYPE)
if new_src is None: if new_src is None:
continue continue
source_list.append(new_src) source_list.append(new_src)
@ -907,7 +828,7 @@ class Musify(Page):
notes=notes notes=notes
) )
def _parse_album_card(self, album_card: BeautifulSoup, artist_name: str = None, **kwargs) -> Album: def _parse_album_card(self, album_card: BeautifulSoup, artist_name: str = None) -> Album:
""" """
<div class="card release-thumbnail" data-type="2"> <div class="card release-thumbnail" data-type="2">
<a href="/release/ghost-bath-self-loather-2021-1554266"> <a href="/release/ghost-bath-self-loather-2021-1554266">
@ -931,9 +852,33 @@ class Musify(Page):
</div> </div>
""" """
album_kwargs: Dict[str, Any] = { _id: Optional[str] = None
"source_list": [], name: str = None
} source_list: List[Source] = []
timestamp: Optional[ID3Timestamp] = None
album_status = None
def set_name(new_name: str):
nonlocal name
nonlocal artist_name
# example of just setting not working:
# https://musify.club/release/unjoy-eurythmie-psychonaut-4-tired-numb-still-alive-2012-324067
if new_name.count(" - ") != 1:
name = new_name
return
potential_artist_list, potential_name = new_name.split(" - ")
unified_artist_list = string_processing.unify(potential_artist_list)
if artist_name is not None:
if string_processing.unify(artist_name) not in unified_artist_list:
name = new_name
return
name = potential_name
return
name = new_name
album_status_id = album_card.get("data-type") album_status_id = album_card.get("data-type")
if album_status_id.isdigit(): if album_status_id.isdigit():
@ -944,7 +889,9 @@ class Musify(Page):
album_status = AlbumStatus.BOOTLEG album_status = AlbumStatus.BOOTLEG
def parse_release_anchor(_anchor: BeautifulSoup, text_is_name=False): def parse_release_anchor(_anchor: BeautifulSoup, text_is_name=False):
nonlocal album_kwargs nonlocal _id
nonlocal name
nonlocal source_list
if _anchor is None: if _anchor is None:
return return
@ -952,13 +899,20 @@ class Musify(Page):
href = _anchor.get("href") href = _anchor.get("href")
if href is not None: if href is not None:
# add url to sources # add url to sources
album_kwargs["source_list"].append(Source( source_list.append(Source(
self.SOURCE_TYPE, self.SOURCE_TYPE,
self.HOST + href self.HOST + href
)) ))
if text_is_name: # split id from url
album_kwargs["title"] = clean_song_title(_anchor.text, artist_name) split_href = href.split("-")
if len(split_href) > 1:
_id = split_href[-1]
if not text_is_name:
return
set_name(_anchor.text)
anchor_list = album_card.find_all("a", recursive=False) anchor_list = album_card.find_all("a", recursive=False)
if len(anchor_list) > 0: if len(anchor_list) > 0:
@ -969,7 +923,7 @@ class Musify(Page):
if thumbnail is not None: if thumbnail is not None:
alt = thumbnail.get("alt") alt = thumbnail.get("alt")
if alt is not None: if alt is not None:
album_kwargs["title"] = clean_song_title(alt, artist_name) set_name(alt)
image_url = thumbnail.get("src") image_url = thumbnail.get("src")
else: else:
@ -986,7 +940,7 @@ class Musify(Page):
13.11.2021 13.11.2021
</small> </small>
""" """
nonlocal album_kwargs nonlocal timestamp
italic_tagging_soup: BeautifulSoup = small_soup.find("i") italic_tagging_soup: BeautifulSoup = small_soup.find("i")
if italic_tagging_soup is None: if italic_tagging_soup is None:
@ -996,7 +950,7 @@ class Musify(Page):
return return
raw_time = small_soup.text.strip() raw_time = small_soup.text.strip()
album_kwargs["date"] = ID3Timestamp.strptime(raw_time, "%d.%m.%Y") timestamp = ID3Timestamp.strptime(raw_time, "%d.%m.%Y")
# parse small date # parse small date
card_footer_list = album_card.find_all("div", {"class": "card-footer"}) card_footer_list = album_card.find_all("div", {"class": "card-footer"})
@ -1009,9 +963,105 @@ class Musify(Page):
else: else:
self.LOGGER.debug("there is not even 1 footer in the album card") self.LOGGER.debug("there is not even 1 footer in the album card")
return Album(**album_kwargs) return Album(
title=name,
source_list=source_list,
date=timestamp,
album_type=album_type,
album_status=album_status
)
def _fetch_artist_discography(self, artist: Artist, url: MusifyUrl, artist_name: str = None, **kwargs): def _parse_album(self, soup: BeautifulSoup) -> Album:
name: str = None
source_list: List[Source] = []
artist_list: List[Artist] = []
date: ID3Timestamp = None
"""
if breadcrumb list has 4 elements, then
the -2 is the artist link,
the -1 is the album
"""
# breadcrumb
breadcrumb_soup: BeautifulSoup = soup.find("ol", {"class", "breadcrumb"})
breadcrumb_elements: List[BeautifulSoup] = breadcrumb_soup.find_all("li", {"class": "breadcrumb-item"})
if len(breadcrumb_elements) == 4:
# album
album_crumb: BeautifulSoup = breadcrumb_elements[-1]
name = album_crumb.text.strip()
# artist
artist_crumb: BeautifulSoup = breadcrumb_elements[-2]
anchor: BeautifulSoup = artist_crumb.find("a")
if anchor is not None:
href = anchor.get("href")
artist_source_list: List[Source] = []
if href is not None:
artist_source_list.append(Source(self.SOURCE_TYPE, self.HOST + href.strip()))
span: BeautifulSoup = anchor.find("span")
if span is not None:
artist_list.append(Artist(
name=span.get_text(strip=True),
source_list=artist_source_list
))
else:
self.LOGGER.debug("there are not 4 breadcrumb items, which shouldn't be the case")
# meta
meta_url: BeautifulSoup = soup.find("meta", {"itemprop": "url"})
if meta_url is not None:
url = meta_url.get("content")
if url is not None:
source_list.append(Source(self.SOURCE_TYPE, self.HOST + url))
meta_name: BeautifulSoup = soup.find("meta", {"itemprop": "name"})
if meta_name is not None:
_name = meta_name.get("content")
if _name is not None:
name = _name
# album info
album_info_ul: BeautifulSoup = soup.find("ul", {"class": "album-info"})
if album_info_ul is not None:
artist_anchor: BeautifulSoup
for artist_anchor in album_info_ul.find_all("a", {"itemprop": "byArtist"}):
# line 98
artist_source_list: List[Source] = []
artist_url_meta = artist_anchor.find("meta", {"itemprop": "url"})
if artist_url_meta is not None:
artist_href = artist_url_meta.get("content")
if artist_href is not None:
artist_source_list.append(Source(self.SOURCE_TYPE, url=self.HOST + artist_href))
artist_meta_name = artist_anchor.find("meta", {"itemprop": "name"})
if artist_meta_name is not None:
artist_name = artist_meta_name.get("content")
if artist_name is not None:
artist_list.append(Artist(
name=artist_name,
source_list=artist_source_list
))
time_soup: BeautifulSoup = album_info_ul.find("time", {"itemprop": "datePublished"})
if time_soup is not None:
raw_datetime = time_soup.get("datetime")
if raw_datetime is not None:
try:
date = ID3Timestamp.strptime(raw_datetime, "%Y-%m-%d")
except ValueError:
self.LOGGER.debug(f"Raw datetime doesn't match time format %Y-%m-%d: {raw_datetime}")
return Album(
title=name,
source_list=source_list,
artist_list=artist_list,
date=date
)
def _get_discography(self, url: MusifyUrl, artist_name: str = None, stop_at_level: int = 1) -> Generator[Album, None, None]:
""" """
POST https://musify.club/artist/filteralbums POST https://musify.club/artist/filteralbums
ArtistID: 280348 ArtistID: 280348
@ -1019,8 +1069,6 @@ class Musify(Page):
SortOrder.IsAscending: false SortOrder.IsAscending: false
X-Requested-With: XMLHttpRequest X-Requested-With: XMLHttpRequest
""" """
_download_all = kwargs.get("download_all", False)
_album_type_blacklist = kwargs.get("album_type_blacklist", main_settings["album_type_blacklist"])
endpoint = self.HOST + "/" + url.source_type.value + "/filteralbums" endpoint = self.HOST + "/" + url.source_type.value + "/filteralbums"
@ -1031,29 +1079,33 @@ class Musify(Page):
"X-Requested-With": "XMLHttpRequest" "X-Requested-With": "XMLHttpRequest"
}, name="discography_" + url.name_with_id) }, name="discography_" + url.name_with_id)
if r is None: if r is None:
return return []
soup: BeautifulSoup = BeautifulSoup(r.content, features="html.parser")
soup: BeautifulSoup = self.get_soup_from_response(r)
for card_soup in soup.find_all("div", {"class": "card"}): for card_soup in soup.find_all("div", {"class": "card"}):
album = self._parse_album_card(card_soup, artist_name, **kwargs) yield self._parse_album_card(card_soup, artist_name)
if album.album_type in _album_type_blacklist:
continue
artist.main_album_collection.append(album) def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
def fetch_artist(self, source: Source, **kwargs) -> Artist:
""" """
TODO fetches artist from source
[x] discography [x] discography
[x] attributes [x] attributes
[] picture gallery [] picture gallery
Args:
source (Source): the source to fetch
stop_at_level: int = 1: if it is false, every album from discograohy will be fetched. Defaults to False.
Returns:
Artist: the artist fetched
""" """
url = parse_url(source.url) url = parse_url(source.url)
artist = self._fetch_initial_artist(url, source=source, **kwargs) artist = self._get_artist_attributes(url)
self._fetch_artist_discography(artist, url, artist.name, **kwargs)
artist.main_album_collection.extend(self._get_discography(url, artist.name))
return artist return artist

View File

@ -25,6 +25,7 @@ def music_card_shelf_renderer(renderer: dict) -> List[DatabaseObject]:
results.extend(parse_renderer(sub_renderer)) results.extend(parse_renderer(sub_renderer))
return results return results
def music_responsive_list_item_flex_column_renderer(renderer: dict) -> List[DatabaseObject]: def music_responsive_list_item_flex_column_renderer(renderer: dict) -> List[DatabaseObject]:
return parse_run_list(renderer.get("text", {}).get("runs", [])) return parse_run_list(renderer.get("text", {}).get("runs", []))
@ -53,11 +54,19 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
for result in results: for result in results:
_map[type(result)].append(result) _map[type(result)].append(result)
if len(song_list) == 1: for song in song_list:
song = song_list[0]
song.feature_artist_collection.extend(artist_list)
song.album_collection.extend(album_list) song.album_collection.extend(album_list)
return [song] song.main_artist_collection.extend(artist_list)
for album in album_list:
album.artist_collection.extend(artist_list)
if len(song_list) > 0:
return song_list
if len(album_list) > 0:
return album_list
if len(artist_list) > 0:
return artist_list
return results return results

View File

@ -40,7 +40,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
_temp_nav = run_element.get("navigationEndpoint", {}) _temp_nav = run_element.get("navigationEndpoint", {})
is_video = "watchEndpoint" in _temp_nav is_video = "watchEndpoint" in _temp_nav
navigation_endpoint = _temp_nav.get("watchEndpoint", _temp_nav.get("browseEndpoint", {})) navigation_endpoint = _temp_nav.get("watchEndpoint" if is_video else "browseEndpoint", {})
element_type = PageType.SONG element_type = PageType.SONG
page_type_string = navigation_endpoint.get("watchEndpointMusicSupportedConfigs", {}).get("watchEndpointMusicConfig", {}).get("musicVideoType", "") page_type_string = navigation_endpoint.get("watchEndpointMusicSupportedConfigs", {}).get("watchEndpointMusicConfig", {}).get("musicVideoType", "")
@ -51,7 +51,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
except ValueError: except ValueError:
return return
element_id = navigation_endpoint.get("videoId", navigation_endpoint.get("browseId")) element_id = navigation_endpoint.get("videoId" if is_video else "browseId")
element_text = run_element.get("text") element_text = run_element.get("text")
if element_id is None or element_text is None: if element_id is None or element_text is None:
@ -60,11 +60,7 @@ def parse_run_element(run_element: dict) -> Optional[DatabaseObject]:
if element_type == PageType.SONG or (element_type == PageType.VIDEO and not youtube_settings["youtube_music_clean_data"]) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not youtube_settings["youtube_music_clean_data"]): if element_type == PageType.SONG or (element_type == PageType.VIDEO and not youtube_settings["youtube_music_clean_data"]) or (element_type == PageType.OFFICIAL_MUSIC_VIDEO and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}") source = Source(SOURCE_PAGE, f"https://music.youtube.com/watch?v={element_id}")
return Song(title=clean_song_title(element_text), source_list=[source])
return Song(
title=clean_song_title(element_text),
source_list=[source]
)
if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not youtube_settings["youtube_music_clean_data"]): if element_type == PageType.ARTIST or (element_type == PageType.CHANNEL and not youtube_settings["youtube_music_clean_data"]):
source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}") source = Source(SOURCE_PAGE, f"https://music.youtube.com/channel/{element_id}")

View File

@ -8,7 +8,6 @@ import json
from dataclasses import dataclass from dataclasses import dataclass
import re import re
from functools import lru_cache from functools import lru_cache
from collections import defaultdict
import youtube_dl import youtube_dl
from youtube_dl.extractor.youtube import YoutubeIE from youtube_dl.extractor.youtube import YoutubeIE
@ -18,7 +17,7 @@ from ...utils.exception.config import SettingValueError
from ...utils.config import main_settings, youtube_settings, logging_settings from ...utils.config import main_settings, youtube_settings, logging_settings
from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING
from ...utils.string_processing import clean_song_title from ...utils.string_processing import clean_song_title
from ...utils import get_current_millis, traverse_json_path from ...utils import get_current_millis
from ...utils import dump_to_file from ...utils import dump_to_file
@ -31,16 +30,12 @@ from ...objects import (
Song, Song,
Album, Album,
Label, Label,
Target, Target
Lyrics,
FormattedText
) )
from ...connection import Connection from ...connection import Connection
from ...utils.enums.album import AlbumType
from ...utils.support_classes.download_result import DownloadResult from ...utils.support_classes.download_result import DownloadResult
from ._list_render import parse_renderer from ._list_render import parse_renderer
from ._music_object_render import parse_run_element
from .super_youtube import SuperYouTube from .super_youtube import SuperYouTube
@ -167,12 +162,6 @@ class MusicKrakenYoutubeIE(YoutubeIE):
ALBUM_TYPE_MAP = {
"Single": AlbumType.SINGLE,
"Album": AlbumType.STUDIO_ALBUM,
"EP": AlbumType.EP,
}
class YoutubeMusic(SuperYouTube): class YoutubeMusic(SuperYouTube):
# CHANGE # CHANGE
@ -412,7 +401,7 @@ class YoutubeMusic(SuperYouTube):
return results return results
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist: def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
artist = Artist(source_list=[source]) artist = Artist()
# construct the request # construct the request
url = urlparse(source.url) url = urlparse(source.url)
@ -432,19 +421,6 @@ class YoutubeMusic(SuperYouTube):
if DEBUG: if DEBUG:
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False) dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
# artist details
data: dict = r.json()
header = data.get("header", {})
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
if len(title_runs) > 0:
artist.name = title_runs[0].get("text", artist.name)
# fetch discography
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[ renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", []) 0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
@ -489,46 +465,6 @@ class YoutubeMusic(SuperYouTube):
if DEBUG: if DEBUG:
dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False) dump_to_file(f"{browse_id}.json", r.text, is_json=True, exit_after_dump=False)
data = r.json()
# album details
header = data.get("header", {})
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
if len(title_runs) > 0:
album.title = title_runs[0].get("text", album.title)
def other_parse_run(run: dict) -> str:
nonlocal album
if "text" not in run:
return
text = run["text"]
is_text_field = len(run.keys()) == 1
# regex that text is a year
if is_text_field and re.match(r"\d{4}", text):
album.date = ID3Timestamp.strptime(text, "%Y")
return
if text in ALBUM_TYPE_MAP:
album.album_type = ALBUM_TYPE_MAP[text]
return
if not is_text_field:
r = parse_run_element(run)
if r is not None:
album.add_list_of_other_objects([r])
return
for _run in subtitle_runs:
other_parse_run(_run)
# tracklist
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[ renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", []) 0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
@ -536,67 +472,20 @@ class YoutubeMusic(SuperYouTube):
for i, content in enumerate(renderer_list): for i, content in enumerate(renderer_list):
dump_to_file(f"{i}-album-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False) dump_to_file(f"{i}-album-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
results = []
"""
cant use fixed indices, because if something has no entries, the list dissappears
instead I have to try parse everything, and just reject community playlists and profiles.
"""
for renderer in renderer_list: for renderer in renderer_list:
album.add_list_of_other_objects(parse_renderer(renderer)) results.extend(parse_renderer(renderer))
for song in album.song_collection: album.add_list_of_other_objects(results)
for song_source in song.source_collection:
song_source.additional_data["playlist_id"] = browse_id
return album return album
def fetch_lyrics(self, video_id: str, playlist_id: str = None) -> str:
request_data = {
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
"videoId": video_id,
}
if playlist_id is not None:
request_data["playlistId"] = playlist_id
tab_request = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/next", query=f"prettyPrint=false"),
json=request_data,
name=f"fetch_song_tabs_{video_id}.json",
)
if tab_request is None:
return None
dump_to_file(f"fetch_song_tabs_{video_id}.json", tab_request.text, is_json=True, exit_after_dump=False)
tab_data: dict = tab_request.json()
tabs = traverse_json_path(tab_data, "contents.singleColumnMusicWatchNextResultsRenderer.tabbedRenderer.watchNextTabbedResultsRenderer.tabs", default=[])
browse_id = None
for tab in tabs:
pageType = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseEndpointContextSupportedConfigs.browseEndpointContextMusicConfig.pageType", default="")
if pageType in ("MUSIC_TAB_TYPE_LYRICS", "MUSIC_PAGE_TYPE_TRACK_LYRICS") or "lyrics" in pageType.lower():
browse_id = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseId", default=None)
break
if browse_id is None:
return None
r = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/browse", query=f"prettyPrint=false"),
json={
"browseId": browse_id,
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}}
},
name=f"fetch_song_lyrics_{video_id}.json"
)
dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False)
data = r.json()
lyrics_text = traverse_json_path(data, "contents.sectionListRenderer.contents[0].musicDescriptionShelfRenderer.description.runs[0].text", default=None)
if lyrics_text is None:
return None
return Lyrics(FormattedText(plain=lyrics_text))
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
ydl_res: dict = {} ydl_res: dict = {}
@ -609,19 +498,7 @@ class YoutubeMusic(SuperYouTube):
self.fetch_media_url(source=source, ydl_res=ydl_res) self.fetch_media_url(source=source, ydl_res=ydl_res)
artist_names = [] artist_name = ydl_res.get("artist", ydl_res.get("uploader", "")).rstrip(" - Topic")
uploader = ydl_res.get("uploader", "")
if uploader.endswith(" - Topic"):
artist_names = [uploader.rstrip(" - Topic")]
artist_list = [
Artist(
name=name,
source_list=[Source(
SourcePages.YOUTUBE_MUSIC,
f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}"
)]
) for name in artist_names]
album_list = [] album_list = []
if "album" in ydl_res: if "album" in ydl_res:
@ -630,57 +507,25 @@ class YoutubeMusic(SuperYouTube):
date=ID3Timestamp.strptime(ydl_res.get("upload_date"), "%Y%m%d"), date=ID3Timestamp.strptime(ydl_res.get("upload_date"), "%Y%m%d"),
)) ))
artist_name = artist_names[0] if len(artist_names) > 0 else None return Song(
song = Song(
title=ydl_res.get("track", clean_song_title(ydl_res.get("title"), artist_name=artist_name)), title=ydl_res.get("track", clean_song_title(ydl_res.get("title"), artist_name=artist_name)),
note=ydl_res.get("descriptions"), note=ydl_res.get("descriptions"),
album_list=album_list, album_list=album_list,
length=int(ydl_res.get("duration", 0)) * 1000, length=int(ydl_res.get("duration", 0)) * 1000,
artwork=Artwork(*ydl_res.get("thumbnails", [])), artwork=Artwork(*ydl_res.get("thumbnails", [])),
main_artist_list=artist_list, main_artist_list=[Artist(
name=artist_name,
source_list=[Source(
SourcePages.YOUTUBE_MUSIC,
f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}"
)]
)],
source_list=[Source( source_list=[Source(
SourcePages.YOUTUBE_MUSIC, SourcePages.YOUTUBE_MUSIC,
f"https://music.youtube.com/watch?v={ydl_res.get('id')}" f"https://music.youtube.com/watch?v={ydl_res.get('id')}"
), source], ), source],
) )
# other song details
parsed_url = urlparse(source.url)
browse_id = parse_qs(parsed_url.query)['v'][0]
request_data = {
"captionParams": {},
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
"videoId": browse_id,
}
if "playlist_id" in source.additional_data:
request_data["playlistId"] = source.additional_data["playlist_id"]
initial_details = self.yt_music_connection.post(
url=get_youtube_url(path="/youtubei/v1/player", query=f"prettyPrint=false"),
json=request_data,
name=f"fetch_song_{browse_id}.json",
)
if initial_details is None:
return song
dump_to_file(f"fetch_song_{browse_id}.json", initial_details.text, is_json=True, exit_after_dump=False)
data = initial_details.json()
video_details = data.get("videoDetails", {})
browse_id = video_details.get("videoId", browse_id)
song.title = video_details.get("title", song.title)
if video_details.get("isLiveContent", False):
for album in song.album_list:
album.album_type = AlbumType.LIVE_ALBUM
for thumbnail in video_details.get("thumbnails", []):
song.artwork.append(**thumbnail)
song.lyrics_collection.append(self.fetch_lyrics(browse_id, playlist_id=request_data.get("playlistId")))
return song
def fetch_media_url(self, source: Source, ydl_res: dict = None) -> dict: def fetch_media_url(self, source: Source, ydl_res: dict = None) -> dict:
def _get_best_format(format_list: List[Dict]) -> dict: def _get_best_format(format_list: List[Dict]) -> dict:

View File

@ -3,35 +3,24 @@ from pathlib import Path
import json import json
import logging import logging
import inspect import inspect
from typing import List, Union
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
from .config import config, read_config, write_config from .config import config, read_config, write_config
from .enums.colors import BColors from .enums.colors import BColors
from .path_manager import LOCATIONS from .path_manager import LOCATIONS
from .hacking import merge_args
""" """
IO functions IO functions
""" """
def _apply_color(msg: str, color: BColors) -> str: def _apply_color(msg: str, color: BColors) -> str:
if not isinstance(msg, str):
msg = str(msg)
endc = BColors.ENDC.value
if color is BColors.ENDC: if color is BColors.ENDC:
return msg return msg
msg = msg.replace(BColors.ENDC.value, BColors.ENDC.value + color.value)
return color.value + msg + BColors.ENDC.value return color.value + msg + BColors.ENDC.value
@merge_args(print) def output(msg: str, color: BColors = BColors.ENDC):
def output(*msg: List[str], color: BColors = BColors.ENDC, **kwargs): print(_apply_color(msg, color))
print(*(_apply_color(s, color) for s in msg), **kwargs)
def user_input(msg: str, color: BColors = BColors.ENDC): def user_input(msg: str, color: BColors = BColors.ENDC):
@ -82,43 +71,6 @@ def object_trace(obj):
misc functions misc functions
""" """
def traverse_json_path(data, path: Union[str, List[str]], default=None):
"""
Path parts are concatenated with . or wrapped with [""] for object keys and wrapped in [] for array indices.
"""
if isinstance(path, str):
path = path.replace('["', '.').replace('"]', '.').replace("[", ".").replace("]", ".")
path = [p for p in path.split(".") if len(p) > 0]
if len(path) <= 0:
return data
current = path[0]
path = path[1:]
new_data = None
if isinstance(data, dict):
new_data = data.get(current)
elif isinstance(data, list):
try:
new_data = data[int(current)]
except (IndexError, ValueError):
pass
if new_data is None:
return default
return traverse_json_path(data=new_data, path=path, default=default)
_auto_increment = 0
def generate_id() -> int:
global _auto_increment
_auto_increment += 1
return _auto_increment
def get_current_millis() -> int: def get_current_millis() -> int:
dt = datetime.now() dt = datetime.now()
return int(dt.microsecond / 1_000) return int(dt.microsecond / 1_000)

View File

@ -9,32 +9,42 @@ class SourceTypes(Enum):
class SourcePages(Enum): class SourcePages(Enum):
YOUTUBE = "youtube", "https://www.youtube.com/" YOUTUBE = "youtube"
MUSIFY = "musify", "https://musify.club/" MUSIFY = "musify"
YOUTUBE_MUSIC = "youtube music", "https://music.youtube.com/" YOUTUBE_MUSIC = "youtube music"
GENIUS = "genius", "https://genius.com/" GENIUS = "genius"
MUSICBRAINZ = "musicbrainz", "https://musicbrainz.org/" MUSICBRAINZ = "musicbrainz"
ENCYCLOPAEDIA_METALLUM = "encyclopaedia metallum" ENCYCLOPAEDIA_METALLUM = "encyclopaedia metallum"
BANDCAMP = "bandcamp", "https://bandcamp.com/" BANDCAMP = "bandcamp"
DEEZER = "deezer", "https://www.deezer.com/" DEEZER = "deezer"
SPOTIFY = "spotify", "https://open.spotify.com/" SPOTIFY = "spotify"
# This has nothing to do with audio, but bands can be here # This has nothing to do with audio, but bands can be here
WIKIPEDIA = "wikipedia", "https://en.wikipedia.org/wiki/Main_Page" WIKIPEDIA = "wikipedia"
INSTAGRAM = "instagram", "https://www.instagram.com/" INSTAGRAM = "instagram"
FACEBOOK = "facebook", "https://www.facebook.com/" FACEBOOK = "facebook"
TWITTER = "twitter", "https://twitter.com/" TWITTER = "twitter" # I will use nitter though lol
MYSPACE = "myspace", "https://myspace.com/" # Yes somehow this ancient site is linked EVERYWHERE MYSPACE = "myspace" # Yes somehow this ancient site is linked EVERYWHERE
MANUAL = "manual", "" MANUAL = "manual"
PRESET = "preset", "" PRESET = "preset"
def __new__(cls, value, homepage = None):
member = object.__new__(cls)
member._value_ = value
member.homepage = homepage
return member
@classmethod
def get_homepage(cls, attribute) -> str:
homepage_map = {
cls.YOUTUBE: "https://www.youtube.com/",
cls.MUSIFY: "https://musify.club/",
cls.MUSICBRAINZ: "https://musicbrainz.org/",
cls.ENCYCLOPAEDIA_METALLUM: "https://www.metal-archives.com/",
cls.GENIUS: "https://genius.com/",
cls.BANDCAMP: "https://bandcamp.com/",
cls.DEEZER: "https://www.deezer.com/",
cls.INSTAGRAM: "https://www.instagram.com/",
cls.FACEBOOK: "https://www.facebook.com/",
cls.SPOTIFY: "https://open.spotify.com/",
cls.TWITTER: "https://twitter.com/",
cls.MYSPACE: "https://myspace.com/",
cls.WIKIPEDIA: "https://en.wikipedia.org/wiki/Main_Page"
}
return homepage_map[attribute]

View File

@ -1,11 +1 @@
class MKBaseException(Exception): __all__ = ["config"]
def __init__(self, message: str = None, **kwargs) -> None:
self.message = message
super().__init__(message, **kwargs)
class MKFrontendException(MKBaseException):
pass
class MKInvalidInputException(MKFrontendException):
pass

View File

@ -78,14 +78,7 @@ def _merge(
drop_args = [] drop_args = []
if drop_kwonlyargs is None: if drop_kwonlyargs is None:
drop_kwonlyargs = [] drop_kwonlyargs = []
is_builtin = False
try:
source_spec = inspect.getfullargspec(source) source_spec = inspect.getfullargspec(source)
except TypeError:
is_builtin = True
source_spec = inspect.FullArgSpec(type(source).__name__, [], [], [], [], [], [])
dest_spec = inspect.getfullargspec(dest) dest_spec = inspect.getfullargspec(dest)
if source_spec.varargs or source_spec.varkw: if source_spec.varargs or source_spec.varkw:
@ -135,15 +128,13 @@ def _merge(
'co_kwonlyargcount': len(kwonlyargs_merged), 'co_kwonlyargcount': len(kwonlyargs_merged),
'co_posonlyargcount': dest.__code__.co_posonlyargcount, 'co_posonlyargcount': dest.__code__.co_posonlyargcount,
'co_nlocals': len(args_all), 'co_nlocals': len(args_all),
'co_flags': source.__code__.co_flags,
'co_varnames': args_all, 'co_varnames': args_all,
'co_filename': dest.__code__.co_filename, 'co_filename': dest.__code__.co_filename,
'co_name': dest.__code__.co_name, 'co_name': dest.__code__.co_name,
'co_firstlineno': dest.__code__.co_firstlineno, 'co_firstlineno': dest.__code__.co_firstlineno,
} }
if hasattr(source, "__code__"):
replace_kwargs['co_flags'] = source.__code__.co_flags
if PY310: if PY310:
replace_kwargs['co_linetable'] = dest.__code__.co_linetable replace_kwargs['co_linetable'] = dest.__code__.co_linetable
else: else:
@ -160,7 +151,7 @@ def _merge(
len(kwonlyargs_merged), len(kwonlyargs_merged),
_blank.__code__.co_nlocals, _blank.__code__.co_nlocals,
_blank.__code__.co_stacksize, _blank.__code__.co_stacksize,
source.__code__.co_flags if hasattr(source, "__code__") else dest.__code__.co_flags, source.__code__.co_flags,
_blank.__code__.co_code, (), (), _blank.__code__.co_code, (), (),
args_all, dest.__code__.co_filename, args_all, dest.__code__.co_filename,
dest.__code__.co_name, dest.__code__.co_name,
@ -180,9 +171,6 @@ def _merge(
dest_ret = dest.__annotations__['return'] dest_ret = dest.__annotations__['return']
for v in ('__kwdefaults__', '__annotations__'): for v in ('__kwdefaults__', '__annotations__'):
if not hasattr(source, v):
continue
out = getattr(source, v) out = getattr(source, v)
if out is None: if out is None:
out = {} out = {}

View File

@ -20,7 +20,6 @@ DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and False DEBUG_DUMP = DEBUG and False
DEBUG_PRINT_ID = DEBUG and True
if DEBUG: if DEBUG:
print("DEBUG ACTIVE") print("DEBUG ACTIVE")

View File

@ -6,7 +6,6 @@ from functools import lru_cache
from transliterate.exceptions import LanguageDetectionError from transliterate.exceptions import LanguageDetectionError
from transliterate import translit from transliterate import translit
from pathvalidate import sanitize_filename from pathvalidate import sanitize_filename
from urllib.parse import urlparse, ParseResult, parse_qs
COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = ( COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = (
@ -22,7 +21,6 @@ def unify(string: str) -> str:
returns a unified str, to make comparisons easy. returns a unified str, to make comparisons easy.
a unified string has the following attributes: a unified string has the following attributes:
- is lowercase - is lowercase
- is transliterated to Latin characters from e.g. Cyrillic
""" """
if string is None: if string is None:
@ -33,8 +31,7 @@ def unify(string: str) -> str:
except LanguageDetectionError: except LanguageDetectionError:
pass pass
string = unify_punctuation(string) return string.lower()
return string.lower().strip()
def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Union[str, Path]: def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Union[str, Path]:
@ -52,14 +49,7 @@ def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Uni
string = string[1:] string = string[1:]
string = string.replace("/", "_").replace("\\", "_") string = string.replace("/", "_").replace("\\", "_")
try:
string = translit(string, reversed=True)
except LanguageDetectionError:
pass
string = sanitize_filename(string) string = sanitize_filename(string)
return string return string
if isinstance(string, Path): if isinstance(string, Path):
@ -137,45 +127,13 @@ UNIFY_TO = " "
ALLOWED_LENGTH_DISTANCE = 20 ALLOWED_LENGTH_DISTANCE = 20
def unify_punctuation(to_unify: str, unify_to: str = UNIFY_TO) -> str: def unify_punctuation(to_unify: str) -> str:
for char in string.punctuation: for char in string.punctuation:
to_unify = to_unify.replace(char, unify_to) to_unify = to_unify.replace(char, UNIFY_TO)
return to_unify return to_unify
@lru_cache(maxsize=128) def hash_url(url: str) -> int:
def hash_url(url: Union[str, ParseResult]) -> str: return url.strip().lower().lstrip("https://").lstrip("http://")
if isinstance(url, str):
url = urlparse(url)
unify_to = "-"
def unify_part(part: str) -> str:
nonlocal unify_to
return unify_punctuation(part.lower(), unify_to=unify_to).strip(unify_to)
# netloc
netloc = unify_part(url.netloc)
if netloc.startswith("www" + unify_to):
netloc = netloc[3 + len(unify_to):]
# query
query = url.query
query_dict: Optional[dict] = None
try:
query_dict: dict = parse_qs(url.query, strict_parsing=True)
except ValueError:
# the query couldn't be parsed
pass
if isinstance(query_dict, dict):
# sort keys alphabetically
query = ""
for key, value in sorted(query_dict.items(), key=lambda i: i[0]):
query += f"{key.strip()}-{''.join(i.strip() for i in value)}"
r = f"{netloc}_{unify_part(url.path)}_{unify_part(query)}"
r = r.lower().strip()
return r
def remove_feature_part_from_track(title: str) -> str: def remove_feature_part_from_track(title: str) -> str:

View File

View File

@ -1,35 +0,0 @@
import unittest
from music_kraken.utils.string_processing import hash_url
class TestCollection(unittest.TestCase):
def test_remove_schema(self):
self.assertFalse(hash_url("https://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("https"))
self.assertFalse(hash_url("ftp://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("https"))
self.assertFalse(hash_url("sftp://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("https"))
self.assertFalse(hash_url("http://www.youtube.com/watch?v=3jZ_D3ELwOQ").startswith("https"))
def test_no_punctuation(self):
self.assertNotIn(hash_url("https://www.you_tube.com/watch?v=3jZ_D3ELwOQ"), "you_tube")
self.assertNotIn(hash_url("https://docs.gitea.com/next/install.ation/comparison"), ".")
def test_three_parts(self):
"""
The url is parsed into three parts [netloc; path; query]
Which are then appended to each other with an underscore between.
"""
self.assertTrue(hash_url("https://duckduckgo.com/?t=h_&q=dfasf&ia=web").count("_") == 2)
def test_sort_query(self):
"""
The query is sorted alphabetically
"""
hashed = hash_url("https://duckduckgo.com/?t=h_&q=dfasf&ia=web")
sorted_keys = ["ia-", "q-", "t-"]
self.assertTrue(hashed.index(sorted_keys[0]) < hashed.index(sorted_keys[1]) < hashed.index(sorted_keys[2]))
if __name__ == "__main__":
unittest.main()