Compare commits

..

119 Commits

Author SHA1 Message Date
810aff4163 Merge pull request 'feature/artwork_gallery' (#41) from feature/artwork_gallery into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #41
2024-07-15 09:36:21 +00:00
5ce76c758e feat: genius fixes and duplicate detection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-07-02 17:20:25 +02:00
93c9a367a2 feat: image hash implemented
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-07-01 14:59:51 +02:00
17c28722fb feat: musify ArtworkCollection simple function
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-17 14:50:17 +02:00
dd99e60afd fix: circular input
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-11 14:58:04 +02:00
274f1bce90 feat: implemented fetching of artworks on compile
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-11 14:54:36 +02:00
b1a306f3f3 fix: implemented artwork.add_data 2024-06-11 14:34:58 +02:00
4ee6fd2137 feat:a lot of nonsences
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-10 12:23:12 +02:00
2da7a48b72 feat: added compile
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-07 11:27:55 +02:00
346d273201 feat: added extend
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-07 11:17:47 +02:00
eef3ea7f07 feat: removed distracting code
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-07 11:15:23 +02:00
01dffc2443 Merge branch 'feature/artwork_gallery' of ssh://gitea.elara.ws:2222/music-kraken/music-kraken-core into feature/artwork_gallery
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-06 17:53:44 +02:00
4e50bb1fba draft implemented add_data 2024-06-06 17:53:17 +02:00
8e3ec0f4ed Merge branch 'feature/artwork_gallery' of ssh://gitea.elara.ws:2222/music-kraken/music-kraken-core into feature/artwork_gallery
All checks were successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-05 13:45:25 +02:00
d447b10380 feat: youtube music album and artist artwork 2024-06-05 13:33:18 +02:00
df98a70717 feat: renamed artwork
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-05 12:05:38 +02:00
3118140f0f feat: fix saving img in tmp
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-05 09:47:02 +02:00
7d23ecac06 feat: bandcamp artist artwork
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-05 08:34:37 +02:00
d83e40ed83 feat: config changes
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-04 11:44:48 +02:00
d51e3a56fb feat: structure changes to artwork and collection objects
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-06-04 11:04:00 +02:00
05ee09e25f feat: musify completed
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-04 10:58:21 +02:00
1ef4b27f28 feat: added album.artwork to datastructure
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-04 10:31:23 +02:00
eb8fd5e580 feat: added artist.artwork to data structure 2024-06-04 10:13:34 +02:00
49c3734526 feat: added hooks for collection on append
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-04 10:11:46 +02:00
bc19a94e7f feat: added parent artwork options 2024-06-04 10:09:17 +02:00
5d26fdbf94 Artwork gallery Musify
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-04 07:58:18 +02:00
465af49057 hotfix
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-06-03 10:19:32 +02:00
2aa0f02fa5 Merge branch 'adding_genius' into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-23 13:36:10 +02:00
7b0b830d64 feat: removed legacy key
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2024-05-23 13:24:25 +02:00
1ba6c97f5a feat: more extensive browse id 2024-05-23 13:20:34 +02:00
c8cbfc7cb9 feat: improved output of clearing the cache 2024-05-23 13:17:14 +02:00
344da0a0bf fix: converting pictures to rgb before saving
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-22 15:20:26 +02:00
49dc7093c8 fix: genius fallback
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-22 15:18:43 +02:00
90f70638b4 feat: better lyrics support
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 17:55:08 +02:00
7b4eee858a feat: parsed script json
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 17:14:58 +02:00
f61b34dd40 feat: improved feature artists by also adding writer and producer to it
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 16:52:01 +02:00
688b4fd357 feat: getting the album tracklist
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 16:47:38 +02:00
769d27dc5c feat: album details
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 16:43:52 +02:00
f5d953d9ce feat: theoretically fetching feature songs
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 16:34:04 +02:00
46b64b8f8d feat: fetched the flat artist details 2024-05-21 16:23:05 +02:00
adfce16d2a feat: fetched the flat artist details 2024-05-21 16:21:58 +02:00
e4fd9faf12 feat: detecting url type 2024-05-21 15:57:09 +02:00
f6caee41a8 feat: finished searching genious
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-21 15:52:41 +02:00
068c749c38 feat: implemented artist search 2024-05-21 15:27:10 +02:00
c131924577 Merge pull request 'fix/clean_feature_artists' (#38) from fix/clean_feature_artists into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline was successful
Reviewed-on: #38
2024-05-21 12:08:04 +00:00
8cdb5c1f99 fix: tests were a mess and didn't properly test the functionality but random things that worked with implementation
All checks were successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-21 13:52:20 +02:00
356ba658ce fix: lyrics enpoint could crash the whole program 2024-05-21 13:37:26 +02:00
000a6c0dba feat: added type identifier to option strings 2024-05-17 18:24:56 +02:00
83a3334f1a changed default value
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-17 13:55:58 +02:00
ab61ff7e9b feat: added the option to select all at options at the same time
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-17 13:37:49 +02:00
3cb35909d1 feat: added option to just fetch the objects in select 2024-05-17 13:31:46 +02:00
e87075a809 feat: changed syncing from event based to reference
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:39:12 +02:00
86e985acec fix: files don't contain id anymore
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:30:53 +02:00
a70a24d93e feat: minor adjustments
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 17:14:18 +02:00
2c1ac0f12d fix: wrong collection 2024-05-16 17:09:36 +02:00
897897dba2 feat: added recursive structure 2024-05-16 14:29:50 +02:00
adcf26b518 feat: renamed main album collection to album collection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 14:10:00 +02:00
8ccc28daf8 draft: added feature artist collection attr 2024-05-16 14:09:34 +02:00
2b3f4d82d9 feat: renamed main_artist_collection to artist_collection
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-16 14:05:33 +02:00
41a91a6afe feat: removing dependence beteen artist.album and album.artist 2024-05-16 13:41:06 +02:00
82df96a193 Merge pull request 'feature/move_download_code_to_download' (#34) from feature/move_download_code_to_download into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #34
2024-05-15 15:24:30 +00:00
80ad2727de fix: stream retry
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-15 17:14:01 +02:00
19b83ce880 fix: saving streaming progress on retry
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 15:04:00 +02:00
1bf04439f0 fix: setting the genre of the song
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 14:51:30 +02:00
bab6aeb45d fix: removed double linebreaks from formated text, plaintext
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 14:26:19 +02:00
98afe5047d fix: wrong creation of source types
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 14:21:15 +02:00
017752c4d0 feat: better download output
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 14:10:32 +02:00
ea4c73158e fix: audio format is replaced completely
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 13:58:44 +02:00
0096dfe5cb feat: copying the downloaded music into the final locations
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-15 13:17:36 +02:00
bedd0fe819 fix: runtime errors
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-15 13:16:11 +02:00
ac6c513d56 draft: post process song
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-15 12:30:54 +02:00
cc14253239 draft: streaming the audio 2024-05-15 12:18:08 +02:00
14f986a497 draft: rewrote sources 2024-05-15 11:44:39 +02:00
da8887b279 draft: rewriting soure
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-14 15:18:17 +02:00
Hellow
bb32fc7647 draft: rewriting downloading
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-14 00:28:05 +02:00
Hellow
8c369d79e4 draft: rewriting downloading
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-13 21:51:32 +02:00
Hellow
b09d6f2691 draft: rewriting downloading 2024-05-13 21:45:12 +02:00
0e6fe8187a feat: fetch_from_url
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-13 18:09:11 +02:00
0343c11a62 feat: migrated fetch details and from source
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-13 18:03:20 +02:00
9769cf4033 Merge pull request 'fix/caching_signatures' (#32) from fix/caching_signatures into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #32
2024-05-13 15:15:37 +00:00
55024bd987 fix: key error
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-13 17:15:15 +02:00
d85498869d feat: tracksort and albumsort + some other stuff
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-13 14:22:33 +02:00
c3350b016d fix: timeout for yt music stream
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-13 13:39:57 +02:00
788103a68e fix: removed invalid stuff
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-13 13:28:54 +02:00
5179c64161 Merge branch 'experimental' of ssh://gitea.elara.ws:2222/music-kraken/music-kraken-core into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-10 17:53:39 +02:00
04405f88eb Merge branch 'fix/musify_scrapes_year_as_artist' into experimental 2024-05-10 17:52:11 +02:00
acd183c90e fix: bandcamp
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-10 17:39:30 +02:00
7186f06ce6 feat: improved interface
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-10 17:33:07 +02:00
6e354af0d1 feat: added proper settings
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-10 17:06:40 +02:00
155f239c8a feat: changed ids for audio tempfiles to random id instead of increment id
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-10 15:32:14 +02:00
36db651dfa fix: cleaning the song name deleted the song if the song name was the same as the artist name
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-10 15:25:11 +02:00
8426f6e2ea fix: filtered another year
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-10 15:20:22 +02:00
75d0a83d14 fix: changed dependency
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-09 10:57:55 +02:00
Hellow
2af577c0cd fix: removed empty objects
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-08 21:06:40 +02:00
Hellow
3780f05e58 feat: added launch.json 2024-05-08 16:48:27 +02:00
Hellow
a0305a7a6e fix: don't add year as artist 2024-05-08 16:47:56 +02:00
949583225a Merge pull request 'Correct duplicate values' (#22) from issue16 into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #22
2024-05-08 12:33:34 +00:00
4e0b005170 Merge branch 'experimental' into issue16
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-08 12:33:56 +02:00
e3d7ed8837 Merge pull request 'fix/musify_artist_spam' (#27) from fix/musify_artist_spam into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #27
2024-05-08 10:31:23 +00:00
e3e7aea959 fix for lyrics
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-08 12:27:56 +02:00
9d4e3e8545 fix: bounds get respected
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-08 12:23:16 +02:00
9c63e8e55a fix: correct collections
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-08 12:09:41 +02:00
a97f8872c8 fix: refetching release title from album card
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-08 09:57:11 +02:00
a5f8057b82 feat: improved initialization of data objects 2024-05-08 09:44:18 +02:00
e3e547c232 feat: improved musify 2024-05-08 09:15:41 +02:00
12c0bf6b83 Merge pull request 'ci: make tags release to the music-kraken pypi package instead of music-kraken-stable' (#24) from ci/remove-stable-package into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #24
2024-05-07 21:17:11 +00:00
ac9a74138c ci: make tags release to the music-kraken pypi package instead of music-kraken-stable
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-07 16:07:45 +00:00
960d3b74ac feat: prevent collection albums from being fetched from musify
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-07 14:59:28 +02:00
585e8c9671 Merge pull request 'feature/add_merge_command' (#23) from feature/add_merge_command into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #23
2024-05-07 12:15:07 +00:00
4f9261505e fix: skip insterval works
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
ci/woodpecker/pull_request_closed/woodpecker Pipeline was successful
2024-05-07 13:59:29 +02:00
08b9492455 fix: am source thing
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-07 13:55:09 +02:00
9d0dcb412b feat: added m string
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-07 13:34:18 +02:00
709c5ebaa8 Correct duplicate values
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/pr/woodpecker Pipeline was successful
2024-05-07 12:34:24 +02:00
17c26c5140 feat: added links to wiki
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-07 11:17:36 +02:00
0a589d9c64 feat: added links to wiki
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-07 11:15:20 +02:00
8abb89ea48 feat: updated readme
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-07 10:37:02 +02:00
3951394ede feat: improved data structure docs
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-07 09:10:04 +02:00
73f26e121c feat: updated installing instructions
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-07 08:53:41 +02:00
3be6c71dcd Merge pull request 'fix/reindex_before_collection' (#21) from fix/reindex_before_collection into experimental
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Reviewed-on: #21
2024-05-06 17:36:27 +00:00
48 changed files with 2290 additions and 1548 deletions

22
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,22 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "Python Debugger: Download script",
"type": "debugpy",
"request": "launch",
"program": "development/actual_donwload.py",
"console": "integratedTerminal"
}
]
}

View File

@ -19,13 +19,22 @@
"albumsort",
"APIC",
"Bandcamp",
"bitrate",
"CALLSTACK",
"DEEZER",
"dotenv",
"encyclopaedia",
"ENDC",
"Gitea",
"iframe",
"isrc",
"itemprop",
"levenshtein",
"metallum",
"MUSICBRAINZ",
"musify",
"OKBLUE",
"OKGREEN",
"pathvalidate",
"Referer",
"sponsorblock",

View File

@ -11,7 +11,6 @@ steps:
build-stable:
image: python
commands:
- sed -i 's/name = "music-kraken"/name = "music-kraken-stable"/' pyproject.toml
- python -m pip install -r requirements-dev.txt
- python3 -m build
environment:

228
README.md
View File

@ -2,61 +2,43 @@
[![Woodpecker CI Status](https://ci.elara.ws/api/badges/59/status.svg)](https://ci.elara.ws/repos/59)
<img src="assets/logo.svg" width=300 alt="music kraken logo"/>
<img src="https://gitea.elara.ws/music-kraken/music-kraken-core/media/branch/experimental/assets/logo.svg" width=300 alt="music kraken logo"/>
- [Music Kraken](#music-kraken)
- [Installation](#installation)
- [From source](#from-source)
- [Notes for WSL](#notes-for-wsl)
- [Quick-Guide](#quick-guide)
- [Query](#query)
- [CONTRIBUTE](#contribute)
- [Matrix Space](#matrix-space)
- [TODO till the next release](#todo-till-the-next-release)
- [Programming Interface / Use as Library](#programming-interface--use-as-library)
- [Quick Overview](#quick-overview)
- [Data Model](#data-model)
- [Data Objects](#data-objects)
- [Creation](#creation)
- [Installation](#installation)
- [Quick-Guide](#quick-guide)
- [How to search properly](#query)
- [Matrix Space](#matrix-space)
If you want to use this a library or contribute, check out [the wiki](https://gitea.elara.ws/music-kraken/music-kraken-core/wiki) for more information.
---
## Installation
You can find and get this project from either [PyPI](https://pypi.org/project/music-kraken/) as a Python-Package,
or simply the source code from [GitHub](https://github.com/HeIIow2/music-downloader). Note that even though
everything **SHOULD** work cross-platform, I have only tested it on Ubuntu.
If you enjoy this project, feel free to give it a star on GitHub.
You can find and get this project from either [PyPI](https://pypi.org/project/music-kraken/) as a Python-Package,
or simply the source code from [Gitea](https://gitea.elara.ws/music-kraken/music-kraken-core). **
> THE PyPI PACKAGE IS OUTDATED
**NOTES**
- Even though everything **SHOULD** work cross-platform, I have only tested it on Ubuntu.
- If you enjoy this project, feel free to give it a star on GitHub.
### From source
if you use Debian or Ubuntu:
```sh
git clone https://github.com/HeIIow2/music-downloader
sudo apt install pandoc
cd music-downloader/
python3 -m pip install -r requirements.txt
git clone https://gitea.elara.ws/music-kraken/music-kraken-core.git
python3 -m pip install -e music-kraken-core/
```
then you can add to `~/.bashrc`
To update the program, if installed like this, go into the `music-kraken-core` directory and run `git pull`.
```
alias music-kraken='cd your/directory/music-downloader/src; python3 -m music_kraken'
alias 🥺='sudo'
```
### Get it running on other Systems
```sh
source ~/.bashrc
music-kraken
```
Here are the collected issues, that are related to running the program on different systems. If you have any issues, feel free to open a new one.
### Notes for WSL
#### Windows + WSL
If you choose to run it in WSL, make sure ` ~/.local/bin` is added to your `$PATH` [#2][i2]
Add ` ~/.local/bin` to your `$PATH`. [#2][i2]
## Quick-Guide
@ -87,10 +69,6 @@ The escape character is as usual `\`.
---
## CONTRIBUTE
I am happy about every pull request. To contribute look [here](contribute.md).
## Matrix Space
<img align="right" alt="music-kraken logo" src="assets/element_logo.png" width=100>
@ -99,171 +77,5 @@ I decided against creating a discord server, due to various communities get ofte
**Click [this invitation](https://matrix.to/#/#music-kraken:matrix.org) _([https://matrix.to/#/#music-kraken:matrix.org](https://matrix.to/#/#music-kraken:matrix.org))_ to join.**
## TODO till the next release
> These Points will most likely be in the changelogs.
- [x] Migrate away from pandoc, to a more lightweight alternative, that can be installed over PiPY.
- [ ] Update the Documentation of the internal structure. _(could be pushed back one release)_
---
# Programming Interface / Use as Library
This application is $100\%$ centered around Data. Thus, the most important thing for working with musik kraken is, to understand how I structured the data.
## Quick Overview
- explanation of the [Data Model](#data-model)
- how to use the [Data Objects](#data-objects)
- further Dokumentation of _hopefully_ [most relevant classes](documentation/objects.md)
- the [old implementation](documentation/old_implementation.md)
```mermaid
---
title: Quick Overview (outdated)
---
sequenceDiagram
participant pg as Page (eg. YouTube, MB, Musify, ...)
participant obj as DataObjects (eg. Song, Artist, ...)
participant db as DataBase
obj ->> db: write
db ->> obj: read
pg -> obj: find a source for any page, for object.
obj -> pg: add more detailed data from according page.
obj -> pg: if available download audio to target.
```
## Data Model
The Data Structure, that the whole programm is built on looks as follows:
```mermaid
---
title: Music Data
---
erDiagram
Target {
}
Lyrics {
}
Song {
}
Album {
}
Artist {
}
Label {
}
Source {
}
Source }o--|| Song : ""
Source }o--|| Lyrics : ""
Source }o--|| Album : ""
Source }o--|| Artist : ""
Source }o--|| Label : ""
Song }o--o{ Album : AlbumSong
Album }o--o{ Artist : ArtistAlbum
Song }o--o{ Artist : "ArtistSong (features)"
Label }o--o{ Album : LabelAlbum
Label }o--o{ Artist : LabelSong
Song ||--o{ Lyrics : ""
Song ||--o{ Target : ""
```
Ok now this **WILL** look intimidating, thus I break it down quickly.
*That is also the reason I didn't add all Attributes here.*
The most important Entities are:
- Song
- Album
- Artist
- Label
All of them *(and Lyrics)* can have multiple Sources, and every Source can only Point to one of those Element.
The `Target` Entity represents the location on the hard drive a Song has. One Song can have multiple download Locations.
The `Lyrics` Entity simply represents the Lyrics of each Song. One Song can have multiple Lyrics, e.g. Translations.
Here is the simplified Diagramm without only the main Entities.
```mermaid
---
title: simplified Music Data
---
erDiagram
Song {
}
Album {
}
Artist {
}
Label {
}
Song }o--o{ Album : AlbumSong
Album }o--o{ Artist : ArtistAlbum
Song }o--o{ Artist : "ArtistSong (features)"
Label }o--o{ Album : LabelAlbum
Label }o--o{ Artist : LabelSong
```
Looks way more manageable, doesn't it?
The reason every relation here is a `n:m` *(many to many)* relation is not, that it makes sense in the aspekt of modeling reality, but to be able to put data from many Sources in the same Data Model.
Every Service models Data a bit different, and projecting a one-to-many relationship to a many to many relationship without data loss is easy. The other way around it is basically impossible
## Data Objects
> Not 100% accurate yet and *might* change slightly
### Creation
```python
# needs to be added
```
If you just want to start implementing, then just use the code example I provided, I don't care.
For those who don't want any bugs and use it as intended *(which is recommended, cuz I am only one person so there are defs bugs)* continue reading, and read the whole documentation, which may exist in the future xD
[i10]: https://github.com/HeIIow2/music-downloader/issues/10
[i2]: https://github.com/HeIIow2/music-downloader/issues/2

View File

@ -1,13 +1,13 @@
import logging
import music_kraken
import logging
print("Setting logging-level to DEBUG")
logging.getLogger().setLevel(logging.DEBUG)
if __name__ == "__main__":
commands = [
"s: #a Psychonaut 4",
"d: 0"
"s: #a Ghost Bath",
]

View File

@ -13,7 +13,7 @@ if __name__ == "__main__":
song_2 = Song(
title = "song",
main_artist_list=[other_artist]
artist_list=[other_artist]
)
other_artist.name = "main_artist"
@ -21,5 +21,5 @@ if __name__ == "__main__":
song_1.merge(song_2)
print("#" * 120)
print("main", *song_1.main_artist_collection)
print("main", *song_1.artist_collection)
print("feat", *song_1.feature_artist_collection)

View File

@ -10,12 +10,12 @@ from ..objects import Target
LOGGER = logging_settings["codex_logger"]
def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], interval_list: List[Tuple[float, float]] = None):
def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], skip_intervals: List[Tuple[float, float]] = None):
if not target.exists:
LOGGER.warning(f"Target doesn't exist: {target.file_path}")
return
interval_list = interval_list or []
skip_intervals = skip_intervals or []
bitrate_b = int(bitrate_kb / 1024)
@ -29,7 +29,7 @@ def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], au
start = 0
next_start = 0
for end, next_start in interval_list:
for end, next_start in skip_intervals:
aselect_list.append(f"between(t,{start},{end})")
start = next_start
aselect_list.append(f"gte(t,{next_start})")
@ -47,7 +47,7 @@ def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], au
# run the ffmpeg command with a progressbar
ff = FfmpegProgress(ffmpeg_command)
with tqdm(total=100, desc=f"removing {len(interval_list)} segments") as pbar:
with tqdm(total=100, desc=f"processing") as pbar:
for progress in ff.run_command_with_progress():
pbar.update(progress-pbar.n)

View File

@ -1,20 +1,21 @@
import mutagen
from mutagen.id3 import ID3, Frame, APIC
import logging
from pathlib import Path
from typing import List
import logging
import mutagen
from mutagen.id3 import APIC, ID3, USLT, Frame
from PIL import Image
from ..utils.config import logging_settings, main_settings
from ..objects import Song, Target, Metadata
from ..connection import Connection
from ..objects import Metadata, Song, Target
from ..objects.metadata import Mapping
from ..utils.config import logging_settings, main_settings
LOGGER = logging_settings["tagging_logger"]
artwork_connection: Connection = Connection()
class AudioMetadata:
def __init__(self, file_location: str = None) -> None:
self._file_location = None
@ -66,13 +67,14 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
id3_object = AudioMetadata(file_location=target.file_path)
LOGGER.info(str(metadata))
if song.artwork.best_variant is not None:
best_variant = song.artwork.best_variant
## REWRITE COMPLETLY !!!!!!!!!!!!
if len(song.artwork._data) != 0:
variants = song.artwork._data.__getitem__(0)
best_variant = variants.variants.__getitem__(0)
r = artwork_connection.get(
url=best_variant["url"],
name=song.artwork.get_variant_name(best_variant),
url=best_variant.url,
name=best_variant.url,
)
temp_target: Target = Target.temp()
@ -92,6 +94,10 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
# resize the image to the preferred resolution
img.thumbnail((main_settings["preferred_artwork_resolution"], main_settings["preferred_artwork_resolution"]))
# https://stackoverflow.com/a/59476938/16804841
if img.mode != 'RGB':
img = img.convert('RGB')
img.save(converted_target.file_path, "JPEG")
# https://stackoverflow.com/questions/70228440/mutagen-how-can-i-correctly-embed-album-art-into-mp3-file-so-that-i-can-see-t
@ -102,11 +108,14 @@ def write_metadata_to_target(metadata: Metadata, target: Target, song: Song):
mime="image/jpeg",
type=3,
desc=u"Cover",
data=converted_target.read_bytes(),
data=converted_target.raw_content,
)
)
mutagen_file = mutagen.File(target.file_path)
id3_object.frames.delall("USLT")
uslt_val = metadata.get_id3_value(Mapping.UNSYNCED_LYRICS)
id3_object.frames.add(
USLT(encoding=3, lang=u'eng', desc=u'desc', text=uslt_val)
)
id3_object.add_metadata(metadata)
id3_object.save()

View File

@ -6,16 +6,18 @@ import re
from .utils import cli_function
from .options.first_config import initial_config
from ..utils import output, BColors
from ..utils.config import write_config, main_settings
from ..utils.shared import URL_PATTERN
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.exception import MKInvalidInputException
from ..utils.exception.download import UrlNotFoundException
from ..utils.enums.colors import BColors
from .. import console
from ..download.results import Results, Option, PageResults
from ..download.results import Results, Option, PageResults, GoToResults
from ..download.page_attributes import Pages
from ..pages import Page
from ..objects import Song, Album, Artist, DatabaseObject
@ -164,9 +166,9 @@ class Downloader:
self.genre = genre or get_genre()
self.process_metadata_anyway = process_metadata_anyway
print()
print(f"Downloading to: \"{self.genre}\"")
print()
output()
output(f"Downloading to: \"{self.genre}\"", color=BColors.HEADER)
output()
def print_current_options(self):
self.page_dict = dict()
@ -174,10 +176,8 @@ class Downloader:
print()
page_count = 0
for option in self.current_results.formated_generator(max_items_per_page=self.max_displayed_options):
for option in self.current_results.formatted_generator():
if isinstance(option, Option):
_downloadable = self.pages.is_downloadable(option.music_object)
r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}"
print(r)
else:
@ -226,7 +226,7 @@ class Downloader:
if album is not None:
song.album_collection.append(album)
if artist is not None:
song.main_artist_collection.append(artist)
song.artist_collection.append(artist)
return Query(raw_query=query, music_object=song)
if album is not None:
@ -249,7 +249,7 @@ class Downloader:
f"Recommendations and suggestions on sites to implement appreciated.\n"
f"But don't be a bitch if I don't end up implementing it.")
return
self.set_current_options(PageResults(page, data_object.options))
self.set_current_options(PageResults(page, data_object.options, max_items_per_page=self.max_displayed_options))
self.print_current_options()
return
@ -299,95 +299,128 @@ class Downloader:
self.set_current_options(self.pages.search(parsed_query))
self.print_current_options()
def goto(self, index: int):
def goto(self, data_object: DatabaseObject):
page: Type[Page]
music_object: DatabaseObject
try:
page, music_object = self.current_results.get_music_object_by_index(index)
except KeyError:
print()
print(f"The option {index} doesn't exist.")
print()
return
self.pages.fetch_details(data_object, stop_at_level=1)
self.pages.fetch_details(music_object)
print(music_object)
print(music_object.options)
self.set_current_options(PageResults(page, music_object.options))
self.set_current_options(GoToResults(data_object.options, max_items_per_page=self.max_displayed_options))
self.print_current_options()
def download(self, download_str: str, download_all: bool = False) -> bool:
to_download: List[DatabaseObject] = []
if re.match(URL_PATTERN, download_str) is not None:
_, music_objects = self.pages.fetch_url(download_str)
to_download.append(music_objects)
else:
index: str
for index in download_str.split(", "):
if not index.strip().isdigit():
print()
print(f"Every download thingie has to be an index, not {index}.")
print()
return False
for index in download_str.split(", "):
to_download.append(self.current_results.get_music_object_by_index(int(index))[1])
print()
print("Downloading:")
for download_object in to_download:
print(download_object.option_string)
print()
def download(self, data_objects: List[DatabaseObject], **kwargs) -> bool:
output()
if len(data_objects) > 1:
output(f"Downloading {len(data_objects)} objects...", *("- " + o.option_string for o in data_objects), color=BColors.BOLD, sep="\n")
_result_map: Dict[DatabaseObject, DownloadResult] = dict()
for database_object in to_download:
r = self.pages.download(music_object=database_object, genre=self.genre, download_all=download_all,
process_metadata_anyway=self.process_metadata_anyway)
for database_object in data_objects:
r = self.pages.download(
data_object=database_object,
genre=self.genre,
**kwargs
)
_result_map[database_object] = r
for music_object, result in _result_map.items():
print()
print(music_object.option_string)
print(result)
output()
output(music_object.option_string)
output(result)
return True
def process_input(self, input_str: str) -> bool:
input_str = input_str.strip()
processed_input: str = input_str.lower()
try:
input_str = input_str.strip()
processed_input: str = input_str.lower()
if processed_input in EXIT_COMMANDS:
return True
if processed_input in EXIT_COMMANDS:
return True
if processed_input == ".":
self.print_current_options()
return False
if processed_input == "..":
if self.previous_option():
if processed_input == ".":
self.print_current_options()
return False
if processed_input == "..":
if self.previous_option():
self.print_current_options()
return False
command = ""
query = processed_input
if ":" in processed_input:
_ = processed_input.split(":")
command, query = _[0], ":".join(_[1:])
do_search = "s" in command
do_fetch = "f" in command
do_download = "d" in command
do_merge = "m" in command
if do_search and (do_download or do_fetch or do_merge):
raise MKInvalidInputException(message="You can't search and do another operation at the same time.")
if do_search:
self.search(":".join(input_str.split(":")[1:]))
return False
def get_selected_objects(q: str):
if q.strip().lower() == "all":
return list(self.current_results)
indices = []
for possible_index in q.split(","):
possible_index = possible_index.strip()
if possible_index == "":
continue
i = 0
try:
i = int(possible_index)
except ValueError:
raise MKInvalidInputException(message=f"The index \"{possible_index}\" is not a number.")
if i < 0 or i >= len(self.current_results):
raise MKInvalidInputException(message=f"The index \"{i}\" is not within the bounds of 0-{len(self.current_results) - 1}.")
indices.append(i)
return [self.current_results[i] for i in indices]
selected_objects = get_selected_objects(query)
if do_merge:
old_selected_objects = selected_objects
a = old_selected_objects[0]
for b in old_selected_objects[1:]:
if type(a) != type(b):
raise MKInvalidInputException(message="You can't merge different types of objects.")
a.merge(b)
selected_objects = [a]
if do_fetch:
for data_object in selected_objects:
self.pages.fetch_details(data_object)
self.print_current_options()
return False
if do_download:
self.download(selected_objects)
return False
if len(selected_objects) != 1:
raise MKInvalidInputException(message="You can only go to one object at a time without merging.")
self.goto(selected_objects[0])
return False
except MKInvalidInputException as e:
output("\n" + e.message + "\n", color=BColors.FAIL)
help_message()
if processed_input.startswith("s: "):
self.search(input_str[3:])
return False
if processed_input.startswith("d: "):
return self.download(input_str[3:])
if processed_input.isdigit():
self.goto(int(processed_input))
return False
if processed_input != "help":
print(f"{BColors.WARNING.value}Invalid input.{BColors.ENDC.value}")
help_message()
return False
def mainloop(self):

View File

@ -6,6 +6,7 @@ from typing import List, Optional
from functools import lru_cache
import logging
from ..utils import output, BColors
from ..utils.config import main_settings
from ..utils.string_processing import fit_to_file_system
@ -136,13 +137,13 @@ class Cache:
)
self._write_attribute(cache_attribute)
cache_path = fit_to_file_system(Path(module_path, name), hidden_ok=True)
cache_path = fit_to_file_system(Path(module_path, name.replace("/", "_")), hidden_ok=True)
with cache_path.open("wb") as content_file:
self.logger.debug(f"writing cache to {cache_path}")
content_file.write(content)
def get(self, name: str) -> Optional[CacheResult]:
path = fit_to_file_system(Path(self._dir, self.module, name), hidden_ok=True)
path = fit_to_file_system(Path(self._dir, self.module, name.replace("/", "_")), hidden_ok=True)
if not path.is_file():
return None
@ -165,7 +166,7 @@ class Cache:
if ca.name == "":
continue
file = fit_to_file_system(Path(self._dir, ca.module, ca.name), hidden_ok=True)
file = fit_to_file_system(Path(self._dir, ca.module, ca.name.replace("/", "_")), hidden_ok=True)
if not ca.is_valid:
self.logger.debug(f"deleting cache {ca.id}")
@ -204,9 +205,12 @@ class Cache:
for path in self._dir.iterdir():
if path.is_dir():
for file in path.iterdir():
output(f"Deleting file {file}", color=BColors.GREY)
file.unlink()
output(f"Deleting folder {path}", color=BColors.HEADER)
path.rmdir()
else:
output(f"Deleting folder {path}", color=BColors.HEADER)
path.unlink()
self.cached_attributes.clear()

View File

@ -1,12 +1,12 @@
from __future__ import annotations
import copy
import inspect
import logging
import threading
import time
from typing import List, Dict, Optional, Set
from urllib.parse import urlparse, urlunsplit, ParseResult
import copy
import inspect
from typing import TYPE_CHECKING, Dict, List, Optional, Set
from urllib.parse import ParseResult, urlparse, urlunsplit
import requests
import responses
@ -14,12 +14,15 @@ from tqdm import tqdm
from .cache import Cache
from .rotating import RotatingProxy
from ..objects import Target
if TYPE_CHECKING:
from ..objects import Target
from ..utils import request_trace
from ..utils.string_processing import shorten_display_url
from ..utils.config import main_settings
from ..utils.support_classes.download_result import DownloadResult
from ..utils.hacking import merge_args
from ..utils.string_processing import shorten_display_url
from ..utils.support_classes.download_result import DownloadResult
class Connection:
@ -317,7 +320,7 @@ class Connection:
name = kwargs.pop("description")
if progress > 0:
headers = dict() if headers is None else headers
headers = kwargs.get("headers", dict())
headers["Range"] = f"bytes={target.size}-"
r = self.request(
@ -366,6 +369,7 @@ class Connection:
if retry:
self.LOGGER.warning(f"Retrying stream...")
accepted_response_codes.add(206)
stream_kwargs["progress"] = progress
return Connection.stream_into(**stream_kwargs)
return DownloadResult()

View File

@ -0,0 +1,21 @@
from dataclasses import dataclass, field
from typing import Set
from ..utils.config import main_settings
from ..utils.enums.album import AlbumType
@dataclass
class FetchOptions:
download_all: bool = False
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
@dataclass
class DownloadOptions:
download_all: bool = False
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
download_again_if_found: bool = False
process_audio_if_found: bool = False
process_metadata_if_found: bool = True

View File

@ -1,20 +1,45 @@
from typing import Tuple, Type, Dict, Set
from typing import Tuple, Type, Dict, Set, Optional, List
from collections import defaultdict
from pathlib import Path
import re
import logging
import subprocess
from PIL import Image
from . import FetchOptions, DownloadOptions
from .results import SearchResults
from ..objects import DatabaseObject, Source
from ..utils.config import youtube_settings
from ..utils.enums.source import SourcePages
from ..objects import (
DatabaseObject as DataObject,
Collection,
Target,
Source,
Options,
Song,
Album,
Artist,
Label,
)
from ..objects.artwork import ArtworkVariant
from ..audio import write_metadata_to_target, correct_codec
from ..utils import output, BColors
from ..utils.string_processing import fit_to_file_system
from ..utils.config import youtube_settings, main_settings
from ..utils.path_manager import LOCATIONS
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.exception import MKMissingNameException
from ..utils.exception.download import UrlNotFoundException
from ..utils.shared import DEBUG_PAGES
from ..connection import Connection
from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, INDEPENDENT_DB_OBJECTS
from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, YoutubeMusic, Bandcamp, Genius, INDEPENDENT_DB_OBJECTS
ALL_PAGES: Set[Type[Page]] = {
# EncyclopaediaMetallum,
Genius,
Musify,
YoutubeMusic,
Bandcamp
@ -34,6 +59,13 @@ SHADY_PAGES: Set[Type[Page]] = {
Musify,
}
fetch_map = {
Song: "fetch_song",
Album: "fetch_album",
Artist: "fetch_artist",
Label: "fetch_label",
}
if DEBUG_PAGES:
DEBUGGING_PAGE = Bandcamp
print(f"Only downloading from page {DEBUGGING_PAGE}.")
@ -43,91 +75,308 @@ if DEBUG_PAGES:
class Pages:
def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False) -> None:
def __init__(self, exclude_pages: Set[Type[Page]] = None, exclude_shady: bool = False, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
self.LOGGER = logging.getLogger("download")
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
# initialize all page instances
self._page_instances: Dict[Type[Page], Page] = dict()
self._source_to_page: Dict[SourcePages, Type[Page]] = dict()
self._source_to_page: Dict[SourceType, Type[Page]] = dict()
exclude_pages = exclude_pages if exclude_pages is not None else set()
if exclude_shady:
exclude_pages = exclude_pages.union(SHADY_PAGES)
if not exclude_pages.issubset(ALL_PAGES):
raise ValueError(f"The excluded pages have to be a subset of all pages: {exclude_pages} | {ALL_PAGES}")
raise ValueError(
f"The excluded pages have to be a subset of all pages: {exclude_pages} | {ALL_PAGES}")
def _set_to_tuple(page_set: Set[Type[Page]]) -> Tuple[Type[Page], ...]:
return tuple(sorted(page_set, key=lambda page: page.__name__))
self._pages_set: Set[Type[Page]] = ALL_PAGES.difference(exclude_pages)
self.pages: Tuple[Type[Page], ...] = _set_to_tuple(self._pages_set)
self._audio_pages_set: Set[Type[Page]] = self._pages_set.intersection(AUDIO_PAGES)
self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(self._audio_pages_set)
self._audio_pages_set: Set[Type[Page]
] = self._pages_set.intersection(AUDIO_PAGES)
self.audio_pages: Tuple[Type[Page], ...] = _set_to_tuple(
self._audio_pages_set)
for page_type in self.pages:
self._page_instances[page_type] = page_type()
self._page_instances[page_type] = page_type(
fetch_options=self.fetch_options, download_options=self.download_options)
self._source_to_page[page_type.SOURCE_TYPE] = page_type
def _get_page_from_enum(self, source_page: SourceType) -> Page:
if source_page not in self._source_to_page:
return None
return self._page_instances[self._source_to_page[source_page]]
def search(self, query: Query) -> SearchResults:
result = SearchResults()
for page_type in self.pages:
result.add(
page=page_type,
search_result=self._page_instances[page_type].search(query=query)
search_result=self._page_instances[page_type].search(
query=query)
)
return result
def fetch_details(self, music_object: DatabaseObject, stop_at_level: int = 1) -> DatabaseObject:
if not isinstance(music_object, INDEPENDENT_DB_OBJECTS):
return music_object
for source_page in music_object.source_collection.source_pages:
if source_page not in self._source_to_page:
continue
page_type = self._source_to_page[source_page]
if page_type in self._pages_set:
music_object.merge(self._page_instances[page_type].fetch_details(music_object=music_object, stop_at_level=stop_at_level))
return music_object
def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject:
if not isinstance(data_object, INDEPENDENT_DB_OBJECTS):
return data_object
def is_downloadable(self, music_object: DatabaseObject) -> bool:
_page_types = set(self._source_to_page)
for src in music_object.source_collection.source_pages:
if src in self._source_to_page:
_page_types.add(self._source_to_page[src])
source: Source
for source in data_object.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
}):
new_data_object = self.fetch_from_source(
source=source, stop_at_level=stop_at_level)
if new_data_object is not None:
data_object.merge(new_data_object)
audio_pages = self._audio_pages_set.intersection(_page_types)
return len(audio_pages) > 0
def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False, process_metadata_anyway: bool = False) -> DownloadResult:
if not isinstance(music_object, INDEPENDENT_DB_OBJECTS):
return DownloadResult(error_message=f"{type(music_object).__name__} can't be downloaded.")
return data_object
self.fetch_details(music_object)
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
if not source.has_page:
return None
_page_types = set(self._source_to_page)
for src in music_object.source_collection.source_pages:
if src in self._source_to_page:
_page_types.add(self._source_to_page[src])
source_type = source.page.get_source_type(source=source)
if source_type is None:
self.LOGGER.debug(f"Could not determine source type for {source}.")
return None
audio_pages = self._audio_pages_set.intersection(_page_types)
for download_page in audio_pages:
return self._page_instances[download_page].download(music_object=music_object, genre=genre, download_all=download_all, process_metadata_anyway=process_metadata_anyway)
return DownloadResult(error_message=f"No audio source has been found for {music_object}.")
func = getattr(source.page, fetch_map[source_type])
# fetching the data object and marking it as fetched
data_object: DataObject = func(source=source, **kwargs)
data_object.mark_as_fetched(source.hash_url)
return data_object
def fetch_from_url(self, url: str) -> Optional[DataObject]:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None:
return None
return self.fetch_from_source(source=source)
def _skip_object(self, data_object: DataObject) -> bool:
if isinstance(data_object, Album):
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
return True
return False
def _fetch_artist_artwork(self, artist: Artist, naming: dict):
naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["artist"].append(artist.name)
naming["label"].extend(
[l.title_value for l in artist.label_collection])
# removing duplicates from the naming, and process the strings
for key, value in naming.items():
# https://stackoverflow.com/a/17016257
naming[key] = list(dict.fromkeys(value))
artwork_collection: ArtworkCollection = artist.artwork
artwork_collection.compile()
for image_number, artwork in enumerate(artwork_collection):
for artwork_variant in artwork.variants:
naming["image_number"] = [str(image_number)]
target = Target(
relative_to_music_dir=True,
file_path=Path(self._parse_path_template(
main_settings["artist_artwork_path"], naming=naming))
)
if not target.file_path.parent.exists():
target.create_path()
subprocess.Popen(["gio", "set", target.file_path.parent, "metadata::custom-icon", "file://"+str(target.file_path)])
with Image.open(artwork_variant.target.file_path) as img:
img.save(target.file_path, main_settings["image_format"])
artwork_variant.target = Target
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
# fetch the given object
self.fetch_details(data_object)
output(
f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
# fetching all parent objects (e.g. if you only download a song)
if not kwargs.get("fetched_upwards", False):
to_fetch: List[DataObject] = [data_object]
while len(to_fetch) > 0:
new_to_fetch = []
for d in to_fetch:
if self._skip_object(d):
continue
self.fetch_details(d)
for c in d.get_parent_collections():
new_to_fetch.extend(c)
to_fetch = new_to_fetch
kwargs["fetched_upwards"] = True
naming = kwargs.get("naming", {
"genre": [genre],
"audio_format": [main_settings["audio_format"]],
"image_format": [main_settings["image_format"]]
})
# download artist artwork
if isinstance(data_object, Artist):
self._fetch_artist_artwork(artist=data_object, naming=naming)
# download all children
download_result: DownloadResult = DownloadResult()
for c in data_object.get_child_collections():
for d in c:
if self._skip_object(d):
continue
download_result.merge(self.download(d, genre, **kwargs))
# actually download if the object is a song
if isinstance(data_object, Song):
"""
TODO
add the traced artist and album to the naming.
I am able to do that, because duplicate values are removed later on.
"""
self._download_song(data_object, naming=naming)
return download_result
def _extract_fields_from_template(self, path_template: str) -> Set[str]:
return set(re.findall(r"{([^}]+)}", path_template))
def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str:
field_names: Set[str] = self._extract_fields_from_template(
path_template)
for field in field_names:
if len(naming[field]) == 0:
raise MKMissingNameException(f"Missing field for {field}.")
path_template = path_template.replace(
f"{{{field}}}", naming[field][0])
return path_template
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
"""
TODO
Search the song in the file system.
"""
r = DownloadResult(total=1)
# pre process the data recursively
song.compile()
# manage the naming
naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["song"].append(song.title_value)
naming["isrc"].append(song.isrc)
naming["album"].extend(a.title_value for a in song.album_collection)
naming["album_type"].extend(
a.album_type.value for a in song.album_collection)
naming["artist"].extend(a.name for a in song.artist_collection)
naming["artist"].extend(a.name for a in song.feature_artist_collection)
for a in song.album_collection:
naming["label"].extend([l.title_value for l in a.label_collection])
# removing duplicates from the naming, and process the strings
for key, value in naming.items():
# https://stackoverflow.com/a/17016257
naming[key] = list(dict.fromkeys(value))
song.genre = naming["genre"][0]
# manage the targets
tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
song.target_collection.append(Target(
relative_to_music_dir=True,
file_path=Path(
self._parse_path_template(
main_settings["download_path"], naming=naming),
self._parse_path_template(
main_settings["download_file"], naming=naming),
)
))
for target in song.target_collection:
if target.exists:
output(
f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
r.found_on_disk += 1
if not self.download_options.download_again_if_found:
target.copy_content(tmp)
else:
target.create_path()
output(f'{target.file_path}', color=BColors.GREY)
# this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
used_source: Optional[Source] = None
skip_intervals: List[Tuple[float, float]] = []
for source in song.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
"sort_key": lambda page: page.download_priority,
"reverse": True,
}):
if tmp.exists:
break
used_source = source
streaming_results = source.page.download_song_to_target(
source=source, target=tmp, desc="download")
skip_intervals = source.page.get_skip_intervals(
song=song, source=source)
# if something has been downloaded but it somehow failed, delete the file
if streaming_results.is_fatal_error and tmp.exists:
tmp.delete()
# if everything went right, the file should exist now
if not tmp.exists:
if used_source is None:
r.error_message = f"No source found for {song.option_string}."
else:
r.error_message = f"Something went wrong downloading {song.option_string}."
return r
# post process the audio
found_on_disk = used_source is None
if not found_on_disk or self.download_options.process_audio_if_found:
correct_codec(target=tmp, skip_intervals=skip_intervals)
r.sponsor_segments = len(skip_intervals)
if used_source is not None:
used_source.page.post_process_hook(song=song, temp_target=tmp)
if not found_on_disk or self.download_options.process_metadata_if_found:
write_metadata_to_target(
metadata=song.metadata, target=tmp, song=song)
# copy the tmp target to the final locations
for target in song.target_collection:
tmp.copy_content(target)
tmp.delete()
return r
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DatabaseObject]:
source = Source.match_url(url, SourcePages.MANUAL)
if source is None:
raise UrlNotFoundException(url=url)
_actual_page = self._source_to_page[source.page_enum]
return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level)
_actual_page = self._source_to_page[source.source_type]
return _actual_page, self._page_instances[_actual_page].fetch_object_from_source(source=source, stop_at_level=stop_at_level)

View File

@ -2,7 +2,6 @@ from typing import Tuple, Type, Dict, List, Generator, Union
from dataclasses import dataclass
from ..objects import DatabaseObject
from ..utils.enums.source import SourcePages
from ..pages import Page, EncyclopaediaMetallum, Musify
@ -13,31 +12,35 @@ class Option:
class Results:
def __init__(self) -> None:
def __init__(self, max_items_per_page: int = 10, **kwargs) -> None:
self._by_index: Dict[int, DatabaseObject] = dict()
self._page_by_index: Dict[int: Type[Page]] = dict()
self.max_items_per_page = max_items_per_page
def __iter__(self) -> Generator[DatabaseObject, None, None]:
for option in self.formated_generator():
for option in self.formatted_generator():
if isinstance(option, Option):
yield option.music_object
def formated_generator(self, max_items_per_page: int = 10) -> Generator[Union[Type[Page], Option], None, None]:
def formatted_generator(self) -> Generator[Union[Type[Page], Option], None, None]:
self._by_index = dict()
self._page_by_index = dict()
def get_music_object_by_index(self, index: int) -> Tuple[Type[Page], DatabaseObject]:
# if this throws a key error, either the formatted generator needs to be iterated, or the option doesn't exist.
return self._page_by_index[index], self._by_index[index]
def __len__(self) -> int:
return max(self._by_index.keys())
def __getitem__(self, index: int):
return self._by_index[index]
class SearchResults(Results):
def __init__(
self,
pages: Tuple[Type[Page], ...] = None
pages: Tuple[Type[Page], ...] = None,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
self.pages = pages or []
# this would initialize a list for every page, which I don't think I want
@ -54,9 +57,12 @@ class SearchResults(Results):
def get_page_results(self, page: Type[Page]) -> "PageResults":
return PageResults(page, self.results.get(page, []))
def __len__(self) -> int:
return sum(min(self.max_items_per_page, len(results)) for results in self.results.values())
def formated_generator(self, max_items_per_page: int = 10):
super().formated_generator()
def formatted_generator(self):
super().formatted_generator()
i = 0
for page in self.results:
@ -70,19 +76,37 @@ class SearchResults(Results):
i += 1
j += 1
if j >= max_items_per_page:
if j >= self.max_items_per_page:
break
class GoToResults(Results):
def __init__(self, results: List[DatabaseObject], **kwargs):
self.results: List[DatabaseObject] = results
super().__init__(**kwargs)
def __getitem__(self, index: int):
return self.results[index]
def __len__(self) -> int:
return len(self.results)
def formatted_generator(self):
yield from (Option(i, o) for i, o in enumerate(self.results))
class PageResults(Results):
def __init__(self, page: Type[Page], results: List[DatabaseObject]) -> None:
super().__init__()
def __init__(self, page: Type[Page], results: List[DatabaseObject], **kwargs) -> None:
super().__init__(**kwargs)
self.page: Type[Page] = page
self.results: List[DatabaseObject] = results
def formated_generator(self, max_items_per_page: int = 10):
super().formated_generator()
def formatted_generator(self, max_items_per_page: int = 10):
super().formatted_generator()
i = 0
yield self.page
@ -92,3 +116,6 @@ class PageResults(Results):
self._by_index[i] = option
self._page_by_index[i] = self.page
i += 1
def __len__(self) -> int:
return len(self.results)

View File

@ -1,27 +1,16 @@
from typing_extensions import TypeVar
from .option import Options
from .metadata import Metadata, Mapping as ID3Mapping, ID3Timestamp
from .source import Source, SourcePages, SourceTypes
from .song import (
Song,
Album,
Artist,
Target,
Lyrics,
Label
)
from .formatted_text import FormattedText
from .artwork import ArtworkCollection
from .collection import Collection
from .country import Country
from .contact import Contact
from .country import Country
from .formatted_text import FormattedText
from .metadata import ID3Timestamp
from .metadata import Mapping as ID3Mapping
from .metadata import Metadata
from .option import Options
from .parents import OuterProxy
from .song import Album, Artist, Label, Lyrics, Song, Target
from .source import Source, SourceType
from .artwork import Artwork
DatabaseObject = TypeVar('T', bound=OuterProxy)
DatabaseObject = OuterProxy

View File

@ -1,62 +1,243 @@
from __future__ import annotations
from typing import List, Optional, Dict, Tuple, Type, Union, TypedDict
from .collection import Collection
from .metadata import (
Mapping as id3Mapping,
ID3Timestamp,
Metadata
)
from ..utils.string_processing import unify, hash_url
from .parents import OuterProxy as Base
from copy import copy
from dataclasses import dataclass, field
from functools import cached_property
from typing import Dict, List, Optional, Set, Tuple, Type, TypedDict, Union
from ..connection import Connection
from ..utils import create_dataclass_instance, custom_hash
from ..utils.config import main_settings
from ..utils.enums import PictureType
from ..utils.string_processing import hash_url, unify
from .collection import Collection
from .metadata import ID3Timestamp
from .metadata import Mapping as id3Mapping
from .metadata import Metadata
from .parents import OuterProxy as Base
from .target import Target
from PIL import Image
import imagehash
artwork_connection: Connection = Connection(module="artwork")
class ArtworkVariant(TypedDict):
@dataclass
class ArtworkVariant:
url: str
width: int
height: int
deviation: float
width: Optional[int] = None
heigth: Optional[int] = None
image_format: Optional[str] = None
def __hash__(self) -> int:
return custom_hash(self.url)
class Artwork:
def __init__(self, *variants: List[ArtworkVariant]) -> None:
self._variant_mapping: Dict[str, ArtworkVariant] = {}
def __eq__(self, other: ArtworkVariant) -> bool:
return hash(self) == hash(other)
for variant in variants:
self.append(**variant)
def __contains__(self, other: str) -> bool:
return custom_hash(other) == hash(self.url)
@staticmethod
def _calculate_deviation(*dimensions: List[int]) -> float:
return sum(abs(d - main_settings["preferred_artwork_resolution"]) for d in dimensions) / len(dimensions)
def __merge__(self, other: ArtworkVariant) -> None:
for key, value in other.__dict__.items():
if value is None:
continue
def append(self, url: str, width: int = main_settings["preferred_artwork_resolution"], height: int = main_settings["preferred_artwork_resolution"], **kwargs) -> None:
if url is None:
if getattr(self, key) is None:
setattr(self, key, value)
@cached_property
def target(self) -> Target:
return Target.temp()
def fetch(self) -> None:
global artwork_connection
r = artwork_connection.get(self.url, name=hash_url(self.url))
if r is None:
return
self._variant_mapping[hash_url(url=url)] = {
"url": url,
"width": width,
"height": height,
"deviation": self._calculate_deviation(width, height),
}
self.target.raw_content = r.content
@dataclass
class Artwork:
variants: List[ArtworkVariant] = field(default_factory=list)
artwork_type: PictureType = PictureType.OTHER
def search_variant(self, url: str) -> Optional[ArtworkVariant]:
if url is None:
return None
for variant in self.variants:
if url in variant:
return variant
return None
def __contains__(self, other: str) -> bool:
return self.search_variant(other) is not None
def add_data(self, **kwargs) -> None:
variant = self.search_variant(kwargs.get("url"))
if variant is None:
variant, kwargs = create_dataclass_instance(ArtworkVariant, kwargs)
self.variants.append(variant)
variant.__dict__.update(kwargs)
@property
def best_variant(self) -> ArtworkVariant:
if len(self._variant_mapping.keys()) <= 0:
def url(self) -> Optional[str]:
if len(self.variants) <= 0:
return None
return min(self._variant_mapping.values(), key=lambda x: x["deviation"])
return self.variants[0].url
def get_variant_name(self, variant: ArtworkVariant) -> str:
return f"artwork_{variant['width']}x{variant['height']}_{hash_url(variant['url']).replace('/', '_')}"
def fetch(self) -> None:
for variant in self.variants:
variant.fetch()
def __merge__(self, other: Artwork, **kwargs) -> None:
for key, value in other._variant_mapping.items():
if key not in self._variant_mapping:
self._variant_mapping[key] = value
def __eq__(self, other: Artwork) -> bool:
return any(a == b for a, b in zip(self._variant_mapping.keys(), other._variant_mapping.keys()))
class ArtworkCollection:
"""
Stores all the images/artworks for one data object.
There could be duplicates before calling ArtworkCollection.compile()
_this is called before one object is downloaded automatically._
"""
artwork_type: PictureType = PictureType.OTHER
def __init__(
self,
*data: List[Artwork],
parent_artworks: Set[ArtworkCollection] = None,
crop_images: bool = True,
) -> None:
# this is used for the song artwork, to fall back to the song artwork
self.parent_artworks: Set[ArtworkCollection] = parent_artworks or set()
self.crop_images: bool = crop_images
self._data = []
self.extend(data)
def search_artwork(self, url: str) -> Optional[ArtworkVariant]:
for artwork in self._data:
if url in artwork:
return artwork
return None
def __contains__(self, other: str) -> bool:
return self.search_artwork(other) is not None
def _create_new_artwork(self, **kwargs) -> Tuple[Artwork, dict]:
kwargs["artwork_type"] = kwargs.get("artwork_type", self.artwork_type)
return create_dataclass_instance(Artwork, dict(**kwargs))
def add_data(self, url: str, **kwargs) -> Artwork:
kwargs["url"] = url
artwork = self.search_artwork(url)
if artwork is None:
artwork, kwargs = self._create_new_artwork(**kwargs)
self._data.append(artwork)
artwork.add_data(**kwargs)
return artwork
def append(self, value: Union[Artwork, ArtworkVariant, dict], **kwargs):
"""
You can append the types Artwork, ArtworkVariant or dict
the best option would be to use Artwork and avoid the other options.
"""
if isinstance(value, dict):
kwargs.update(value)
value, kwargs = create_dataclass_instance(ArtworkVariant, kwargs)
if isinstance(value, ArtworkVariant):
kwargs["variants"] = [value]
value, kwargs = create_dataclass_instance(Artwork, kwargs)
if isinstance(value, Artwork):
self._data.append(value)
return
def extend(self, values: List[Union[Artwork, ArtworkVariant, dict]], **kwargs):
for value in values:
self.append(value, **kwargs)
def compile(self, **kwargs) -> None:
"""
This will make the artworks ready for download and delete duplicates.
"""
artwork_hashes: list = list()
artwork_urls: list = list()
for artwork in self._data:
index = 0
for artwork_variant in artwork.variants:
r = artwork_connection.get(
url=artwork_variant.url,
name=artwork_variant.url,
)
if artwork_variant.url in artwork_urls:
artwork.variants.pop(index)
continue
artwork_urls.append(artwork_variant.url)
target: Target = artwork_variant.target
with target.open("wb") as f:
f.write(r.content)
with Image.open(target.file_path) as img:
# https://stackoverflow.com/a/59476938/16804841
if img.mode != 'RGB':
img = img.convert('RGB')
try:
image_hash = imagehash.crop_resistant_hash(img)
except Exception as e:
continue
if image_hash in artwork_hashes:
artwork.variants.pop(index)
target.delete()
continue
artwork_hashes.append(image_hash)
width, height = img.size
if width != height:
if width > height:
img = img.crop((width // 2 - height // 2, 0, width // 2 + height // 2, height))
else:
img = img.crop((0, height // 2 - width // 2, width, height // 2 + width // 2))
# resize the image to the preferred resolution
img.thumbnail((main_settings["preferred_artwork_resolution"], main_settings["preferred_artwork_resolution"]))
index =+ 1
def __merge__(self, other: ArtworkCollection, **kwargs) -> None:
self.parent_artworks.update(other.parent_artworks)
for other_artwork in other._data:
for other_variant in other_artwork.variants:
if self.__contains__(other_variant.url):
continue
self.append(ArtworkVariant(other_variant.url))
def __hash__(self) -> int:
return id(self)
def __iter__(self) -> Generator[Artwork, None, None]:
yield from self._data
def get_urls(self) -> Generator[str, None, None]:
yield from (artwork.url for artwork in self._data if artwork.url is not None)

View File

@ -1,14 +1,43 @@
from __future__ import annotations
import copy
from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set
from .parents import OuterProxy
from ..utils import object_trace
from ..utils import output, BColors
from dataclasses import dataclass
from typing import (Any, Callable, Dict, Generator, Generic, Iterable,
Iterator, List, Optional, Set, Tuple, TypeVar, Union)
from ..utils import BColors, object_trace, output
from .parents import InnerData, OuterProxy
T = TypeVar('T', bound=OuterProxy)
@dataclass
class AppendHookArguments:
"""
This class is used to store the arguments for the append hook.
The best explanation is with an examples:
```
album = Album()
song = Song()
album.song_collection.append(song)
```
In this case, the append hook is triggered with the following arguments:
```
AppendHookArguments(
collection=album.song_collection,
new_object=song,
collection_root_objects=[album]
)
```
"""
collection: Collection
new_object: T
collection_root_objects: Set[InnerData]
class Collection(Generic[T]):
__is_collection__ = True
@ -25,6 +54,7 @@ class Collection(Generic[T]):
sync_on_append: Dict[str, Collection] = None,
append_object_to_attribute: Dict[str, T] = None,
extend_object_to_attribute: Dict[str, Collection] = None,
append_callbacks: Set[Callable[[AppendHookArguments], None]] = None,
) -> None:
self._collection_for: dict = dict()
@ -39,6 +69,7 @@ class Collection(Generic[T]):
self.sync_on_append: Dict[str, Collection] = sync_on_append or {}
self.pull_from: List[Collection] = []
self.push_to: List[Collection] = []
self.append_callbacks: Set[Callable[[AppendHookArguments], None]] = append_callbacks or set()
# This is to cleanly unmap previously mapped items by their id
self._indexed_from_id: Dict[int, Dict[str, Any]] = defaultdict(dict)
@ -47,8 +78,15 @@ class Collection(Generic[T]):
self.extend(data)
def __hash__(self) -> int:
return id(self)
@property
def collection_names(self) -> List[str]:
return list(set(self._collection_for.values()))
def __repr__(self) -> str:
return f"Collection({' | '.join(self._collection_for.values())} {id(self)})"
return f"Collection({' | '.join(self.collection_names)} {id(self)})"
def _map_element(self, __object: T, no_unmap: bool = False, **kwargs):
if not no_unmap:
@ -104,13 +142,7 @@ class Collection(Generic[T]):
"""
self._data.append(other)
# all of the existing hooks to get the defined datastructure
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
other._inner._is_in_collection.add(self)
for attribute, a in self.sync_on_append.items():
# syncing two collections by reference
@ -131,6 +163,21 @@ class Collection(Generic[T]):
a.extend(b_data, **kwargs)
# all of the existing hooks to get the defined datastructures
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).extend(generator, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).append(new_object, **kwargs)
append_hook_args = AppendHookArguments(
collection=self,
new_object=other,
collection_root_objects=self._collection_for.keys(),
)
for callback in self.append_callbacks:
callback(append_hook_args)
def append(self, other: Optional[T], **kwargs):
"""
If an object, that represents the same entity exists in a relevant collection,
@ -143,35 +190,36 @@ class Collection(Generic[T]):
if other is None:
return
if not other._inner._has_data:
return
if other.id in self._indexed_from_id:
return
object_trace(f"Appending {other.option_string} to {self}")
for c in self.pull_from:
r = c._find_object(other)
if r is not None:
output("found pull from", r, other, self, color=BColors.RED, sep="\t")
other.merge(r, **kwargs)
c.remove(r, existing=r, **kwargs)
break
existing_object = self._find_object(other)
# switching collection in the case of push to
for c in self.push_to:
r = c._find_object(other)
if r is not None:
output("found push to", r, other, self, color=BColors.RED, sep="\t")
# output("found push to", r, other, c, self, color=BColors.RED, sep="\t")
return c.append(other, **kwargs)
for c in self.pull_from:
r = c._find_object(other)
if r is not None:
# output("found pull from", r, other, c, self, color=BColors.RED, sep="\t")
c.remove(r, existing=r, **kwargs)
if existing_object is None:
existing = self._find_object(other)
if existing is None:
self._append_new_object(other, **kwargs)
else:
existing_object.merge(other, **kwargs)
existing.merge(other, **kwargs)
def remove(self, *other_list: List[T], silent: bool = False, existing: Optional[T] = None, **kwargs):
def remove(self, *other_list: List[T], silent: bool = False, existing: Optional[T] = None, remove_from_other_collection=True, **kwargs):
other: T
for other in other_list:
existing: Optional[T] = existing or self._indexed_values["id"].get(other.id, None)
if existing is None:
@ -179,16 +227,13 @@ class Collection(Generic[T]):
raise ValueError(f"Object {other} not found in {self}")
return other
"""
for collection_attribute, generator in self.extend_object_to_attribute.items():
other.__getattribute__(collection_attribute).remove(*generator, silent=silent, **kwargs)
for attribute, new_object in self.append_object_to_attribute.items():
other.__getattribute__(attribute).remove(new_object, silent=silent, **kwargs)
"""
self._data.remove(existing)
self._unmap_element(existing)
if remove_from_other_collection:
for c in copy.copy(other._inner._is_in_collection):
c.remove(other, silent=True, remove_from_other_collection=False, **kwargs)
other._inner._is_in_collection = set()
else:
self._data.remove(existing)
self._unmap_element(existing)
def contains(self, __object: T) -> bool:
return self._find_object(__object) is not None

View File

@ -32,14 +32,27 @@ class FormattedText:
if self.is_empty and other.is_empty:
return True
return self.doc == other.doc
return self.html == other.html
@property
def markdown(self) -> str:
return md(self.html).strip()
@markdown.setter
def markdown(self, value: str) -> None:
self.html = mistune.markdown(value)
@property
def plain(self) -> str:
md = self.markdown
return md.replace("\n\n", "\n")
@plain.setter
def plain(self, value: str) -> None:
self.html = mistune.markdown(plain_to_markdown(value))
def __str__(self) -> str:
return self.markdown
plaintext = markdown
plaintext = plain

View File

@ -34,6 +34,6 @@ class Lyrics(OuterProxy):
@property
def metadata(self) -> Metadata:
return Metadata({
id3Mapping.UNSYNCED_LYRICS: [self.text.markdown]
id3Mapping.UNSYNCED_LYRICS: [self.text.plaintext]
})

View File

@ -92,7 +92,7 @@ class Mapping(Enum):
key = attribute.value
if key[0] == 'T':
# a text fiel
# a text field
return cls.get_text_instance(key, value)
if key[0] == "W":
# an url field
@ -355,7 +355,12 @@ class Metadata:
return None
list_data = self.id3_dict[field]
#correct duplications
correct_list_data = list()
for data in list_data:
if data not in correct_list_data:
correct_list_data.append(data)
list_data = correct_list_data
# convert for example the time objects to timestamps
for i, element in enumerate(list_data):
# for performances sake I don't do other checks if it is already the right type
@ -368,7 +373,7 @@ class Metadata:
if type(element) == ID3Timestamp:
list_data[i] = element.timestamp
continue
"""
Version 2.4 of the specification prescribes that all text fields (the fields that start with a T, except for TXXX) can contain multiple values separated by a null character.
Thus if above conditions are met, I concatenate the list,
@ -376,7 +381,7 @@ class Metadata:
"""
if field.value[0].upper() == "T" and field.value.upper() != "TXXX":
return self.NULL_BYTE.join(list_data)
return list_data[0]
def get_mutagen_object(self, field):
@ -395,6 +400,5 @@ class Metadata:
"""
# set the tagging timestamp to the current time
self.__setitem__(Mapping.TAGGING_TIME, [ID3Timestamp.now()])
for field in self.id3_dict:
yield self.get_mutagen_object(field)

View File

@ -8,6 +8,7 @@ from typing import Optional, Dict, Tuple, List, Type, Generic, Any, TypeVar, Set
from pathlib import Path
import inspect
from .source import SourceCollection
from .metadata import Metadata
from ..utils import get_unix_time, object_trace, generate_id
from ..utils.config import logging_settings, main_settings
@ -29,12 +30,17 @@ class InnerData:
"""
_refers_to_instances: set = None
_is_in_collection: set = None
_has_data: bool = False
"""
Attribute versions keep track, of if the attribute has been changed.
"""
def __init__(self, object_type, **kwargs):
self._refers_to_instances = set()
self._is_in_collection = set()
self._fetched_from: dict = {}
# initialize the default values
@ -45,9 +51,19 @@ class InnerData:
for key, value in kwargs.items():
if hasattr(value, "__is_collection__"):
value._collection_for[self] = key
self.__setattr__(key, value)
if self._has_data:
continue
def __setattr__(self, key: str, value):
if self._has_data or not hasattr(self, "_default_values"):
return super().__setattr__(key, value)
super().__setattr__("_has_data", not (key in self._default_values and self._default_values[key] == value))
return super().__setattr__(key, value)
def __hash__(self):
return self.id
@ -58,6 +74,7 @@ class InnerData:
"""
self._fetched_from.update(__other._fetched_from)
self._is_in_collection.update(__other._is_in_collection)
for key, value in __other.__dict__.copy().items():
if key.startswith("_"):
@ -83,7 +100,9 @@ class OuterProxy:
Wraps the inner data, and provides apis, to naturally access those values.
"""
_default_factories: dict = {}
source_collection: SourceCollection
_default_factories: dict = {"source_collection": SourceCollection}
_outer_attribute: Set[str] = {"options", "metadata", "indexing_values", "option_string"}
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = tuple()
@ -187,6 +206,7 @@ class OuterProxy:
if __other is None:
return
a_id = self.id
a = self
b = __other
@ -209,6 +229,8 @@ class OuterProxy:
a._inner.__merge__(old_inner, **kwargs)
del old_inner
self.id = a_id
def __merge__(self, __other: Optional[OuterProxy], **kwargs):
self.merge(__other, **kwargs)
@ -285,10 +307,49 @@ class OuterProxy:
return r
@property
def root_collections(self) -> List[Collection]:
if len(self.UPWARDS_COLLECTION_STRING_ATTRIBUTES) == 0:
return [self]
r = []
for collection_string_attribute in self.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
r.extend(self.__getattribute__(collection_string_attribute))
return r
def _compile(self, **kwargs):
pass
def compile(self, from_root=False, **kwargs):
# compile from the root
if not from_root:
for c in self.root_collections:
c.compile(from_root=True, **kwargs)
return
self._compile(**kwargs)
for c_attribute in self.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
for c in self.__getattribute__(c_attribute):
c.compile(from_root=True, **kwargs)
TITEL = "id"
@property
def title_string(self) -> str:
return str(self.__getattribute__(self.TITEL)) + (f" {self.id}" if DEBUG_PRINT_ID else "")
@property
def title_value(self) -> str:
return str(self.__getattribute__(self.TITEL))
def __repr__(self):
return f"{type(self).__name__}({self.title_string})"
def get_child_collections(self):
for collection_string_attribute in self.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
yield self.__getattribute__(collection_string_attribute)
def get_parent_collections(self):
for collection_string_attribute in self.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
yield self.__getattribute__(collection_string_attribute)

View File

@ -1,34 +1,32 @@
from __future__ import annotations
import copy
import random
from collections import defaultdict
from typing import List, Optional, Dict, Tuple, Type, Union
from typing import Dict, List, Optional, Tuple, Type, Union
import pycountry
from ..utils.enums.album import AlbumType, AlbumStatus
from .collection import Collection
from .formatted_text import FormattedText
from .lyrics import Lyrics
from .contact import Contact
from .artwork import Artwork
from .metadata import (
Mapping as id3Mapping,
ID3Timestamp,
Metadata
)
from .option import Options
from .parents import OuterProxy, P
from .source import Source, SourceCollection
from .target import Target
from .country import Language, Country
from ..utils.config import main_settings
from ..utils.enums.album import AlbumStatus, AlbumType
from ..utils.enums.colors import BColors
from ..utils.shared import DEBUG_PRINT_ID
from ..utils.string_processing import unify
from .artwork import ArtworkCollection
from .collection import AppendHookArguments, Collection
from .contact import Contact
from .country import Country, Language
from .formatted_text import FormattedText
from .lyrics import Lyrics
from .metadata import ID3Timestamp
from .metadata import Mapping as id3Mapping
from .metadata import Metadata
from .option import Options
from .parents import OuterProxy
from .parents import OuterProxy as Base
from ..utils.config import main_settings
from ..utils.enums.colors import BColors
from .parents import P
from .source import Source, SourceCollection
from .target import Target
"""
All Objects dependent
@ -88,13 +86,13 @@ class Song(Base):
genre: str
note: FormattedText
tracksort: int
artwork: Artwork
artwork: ArtworkCollection
source_collection: SourceCollection
target_collection: Collection[Target]
lyrics_collection: Collection[Lyrics]
main_artist_collection: Collection[Artist]
artist_collection: Collection[Artist]
feature_artist_collection: Collection[Artist]
album_collection: Collection[Album]
@ -104,13 +102,13 @@ class Song(Base):
"source_collection": SourceCollection,
"target_collection": Collection,
"lyrics_collection": Collection,
"artwork": Artwork,
"artwork": ArtworkCollection,
"main_artist_collection": Collection,
"album_collection": Collection,
"artist_collection": Collection,
"feature_artist_collection": Collection,
"title": lambda: "",
"title": lambda: None,
"unified_title": lambda: None,
"isrc": lambda: None,
"genre": lambda: None,
@ -118,34 +116,57 @@ class Song(Base):
"tracksort": lambda: 0,
}
def __init__(self, title: str = "", unified_title: str = None, isrc: str = None, length: int = None,
genre: str = None, note: FormattedText = None, source_list: List[Source] = None,
target_list: List[Target] = None, lyrics_list: List[Lyrics] = None,
main_artist_list: List[Artist] = None, feature_artist_list: List[Artist] = None,
album_list: List[Album] = None, tracksort: int = 0, artwork: Optional[Artwork] = None, **kwargs) -> None:
def __init__(
self,
title: str = None,
isrc: str = None,
length: int = None,
genre: str = None,
note: FormattedText = None,
source_list: List[Source] = None,
target_list: List[Target] = None,
lyrics_list: List[Lyrics] = None,
artist_list: List[Artist] = None,
feature_artist_list: List[Artist] = None,
album_list: List[Album] = None,
tracksort: int = 0,
artwork: Optional[ArtworkCollection] = None,
**kwargs
) -> None:
real_kwargs = copy.copy(locals())
real_kwargs.update(real_kwargs.pop("kwargs", {}))
Base.__init__(**locals())
Base.__init__(**real_kwargs)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_artist_collection", "feature_artist_collection", "album_collection")
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("artist_collection", "feature_artist_collection", "album_collection")
TITEL = "title"
@staticmethod
def register_artwork_parent(append_hook_arguments: AppendHookArguments):
album: Album = append_hook_arguments.new_object
song: Song
for song in append_hook_arguments.collection_root_objects:
song.artwork.parent_artworks.add(album.artwork)
def __init_collections__(self) -> None:
self.feature_artist_collection.push_to = [self.artist_collection]
self.artist_collection.pull_from = [self.feature_artist_collection]
self.album_collection.sync_on_append = {
"artist_collection": self.main_artist_collection,
"artist_collection": self.artist_collection,
}
self.album_collection.append_object_to_attribute = {
"song_collection": self,
}
self.main_artist_collection.extend_object_to_attribute = {
"main_album_collection": self.album_collection
self.artist_collection.extend_object_to_attribute = {
"album_collection": self.album_collection
}
self.feature_artist_collection.append_object_to_attribute = {
"feature_song_collection": self
self.feature_artist_collection.extend_object_to_attribute = {
"album_collection": self.album_collection
}
self.feature_artist_collection.push_to = [self.main_artist_collection]
self.main_artist_collection.pull_from = [self.feature_artist_collection]
self.album_collection.append_callbacks = set((Song.register_artwork_parent, ))
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song:
@ -163,6 +184,10 @@ class Song(Base):
self.album_collection.extend(object_list)
return
def _compile(self):
self.artwork.compile()
INDEX_DEPENDS_ON = ("title", "isrc", "source_collection")
@property
@ -188,14 +213,14 @@ class Song(Base):
# metadata.merge_many([s.get_song_metadata() for s in self.source_collection]) album sources have no relevant metadata for id3
metadata.merge_many([a.metadata for a in self.album_collection])
metadata.merge_many([a.metadata for a in self.main_artist_collection])
metadata.merge_many([a.metadata for a in self.artist_collection])
metadata.merge_many([a.metadata for a in self.feature_artist_collection])
metadata.merge_many([lyrics.metadata for lyrics in self.lyrics_collection])
return metadata
def get_artist_credits(self) -> str:
main_artists = ", ".join([artist.name for artist in self.main_artist_collection])
main_artists = ", ".join([artist.name for artist in self.artist_collection])
feature_artists = ", ".join([artist.name for artist in self.feature_artist_collection])
if len(feature_artists) == 0:
@ -204,20 +229,13 @@ class Song(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r = "song "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.album_collection, " from {}", ignore_titles={self.title})
r += get_collection_string(self.main_artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}")
r += get_collection_string(self.artist_collection, " by {}")
r += get_collection_string(self.feature_artist_collection, " feat. {}" if len(self.artist_collection) > 0 else " by {}")
return r
@property
def options(self) -> List[P]:
options = self.main_artist_collection.shallow_list
options.extend(self.feature_artist_collection)
options.extend(self.album_collection)
options.append(self)
return options
@property
def tracksort_str(self) -> str:
"""
@ -230,11 +248,6 @@ class Song(Base):
return f"{self.tracksort}/{len(self.album_collection[0].song_collection) or 1}"
"""
All objects dependent on Album
"""
class Album(Base):
title: str
unified_title: str
@ -246,10 +259,12 @@ class Album(Base):
albumsort: int
notes: FormattedText
artwork: ArtworkCollection
source_collection: SourceCollection
artist_collection: Collection[Artist]
song_collection: Collection[Song]
artist_collection: Collection[Artist]
feature_artist_collection: Collection[Artist]
label_collection: Collection[Label]
_default_factories = {
@ -264,43 +279,71 @@ class Album(Base):
"date": ID3Timestamp,
"notes": FormattedText,
"artwork": lambda: ArtworkCollection(crop_images=False),
"source_collection": SourceCollection,
"artist_collection": Collection,
"song_collection": Collection,
"artist_collection": Collection,
"feature_artist_collection": Collection,
"label_collection": Collection,
}
TITEL = "title"
# This is automatically generated
def __init__(self, title: str = None, unified_title: str = None, album_status: AlbumStatus = None,
album_type: AlbumType = None, language: Language = None, date: ID3Timestamp = None,
barcode: str = None, albumsort: int = None, notes: FormattedText = None,
source_list: List[Source] = None, artist_list: List[Artist] = None, song_list: List[Song] = None,
label_list: List[Label] = None, **kwargs) -> None:
super().__init__(title=title, unified_title=unified_title, album_status=album_status, album_type=album_type,
language=language, date=date, barcode=barcode, albumsort=albumsort, notes=notes,
source_list=source_list, artist_list=artist_list, song_list=song_list, label_list=label_list,
**kwargs)
def __init__(
self,
title: str = None,
unified_title: str = None,
album_status: AlbumStatus = None,
album_type: AlbumType = None,
language: Language = None,
date: ID3Timestamp = None,
barcode: str = None,
albumsort: int = None,
notes: FormattedText = None,
artwork: ArtworkCollection = None,
source_list: List[Source] = None,
artist_list: List[Artist] = None,
song_list: List[Song] = None,
label_list: List[Label] = None,
**kwargs
) -> None:
real_kwargs = copy.copy(locals())
real_kwargs.update(real_kwargs.pop("kwargs", {}))
Base.__init__(**real_kwargs)
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("song_collection",)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection", "artist_collection")
@staticmethod
def register_artwork_parent(append_hook_arguments: AppendHookArguments):
song: Song = append_hook_arguments.new_object
for root_object in append_hook_arguments.collection_root_objects:
song.artwork.parent_artworks.add(root_object.artwork)
def __init_collections__(self):
self.feature_artist_collection.push_to = [self.artist_collection]
self.artist_collection.pull_from = [self.feature_artist_collection]
self.song_collection.append_object_to_attribute = {
"album_collection": self
}
self.song_collection.sync_on_append = {
"main_artist_collection": self.artist_collection
"artist_collection": self.artist_collection
}
self.artist_collection.append_object_to_attribute = {
"main_album_collection": self
"album_collection": self
}
self.artist_collection.extend_object_to_attribute = {
"label_collection": self.label_collection
}
self.song_collection.append_callbacks = set((Album.register_artwork_parent, ))
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song:
self.song_collection.extend(object_list)
@ -349,14 +392,37 @@ class Album(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r = "album "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.artist_collection, " by {}")
if len(self.artist_collection) <= 0:
r += get_collection_string(self.feature_artist_collection, " by {}")
r += get_collection_string(self.label_collection, " under {}")
if len(self.song_collection) > 0:
r += f" with {len(self.song_collection)} songs"
return r
def _compile(self):
self.analyze_implied_album_type()
self.update_tracksort()
self.fix_artist_collection()
def analyze_implied_album_type(self):
# if the song collection has only one song, it is reasonable to assume that it is a single
if len(self.song_collection) == 1:
self.album_type = AlbumType.SINGLE
return
# if the album already has an album type, we don't need to do anything
if self.album_type is not AlbumType.OTHER:
return
# for information on EP's I looked at https://www.reddit.com/r/WeAreTheMusicMakers/comments/a354ql/whats_the_cutoff_length_between_ep_and_album/
if len(self.song_collection) < 9:
self.album_type = AlbumType.EP
return
def update_tracksort(self):
"""
This updates the tracksort attributes, of the songs in
@ -382,6 +448,16 @@ class Album(Base):
tracksort_map[i] = existing_list.pop(0)
tracksort_map[i].tracksort = i
def fix_artist_collection(self):
"""
I add artists, that could only be feature artists to the feature artist collection.
They get automatically moved to main artist collection, if a matching artist exists in the main artist collection or is appended to it later on.
If I am not sure for any artist, I try to analyze the most common artist in the song collection of one album.
"""
# move all artists that are in all feature_artist_collections, of every song, to the artist_collection
pass
@property
def copyright(self) -> str:
if self.date is None:
@ -413,14 +489,8 @@ class Album(Base):
return self.album_type.value
"""
All objects dependent on Artist
"""
class Artist(Base):
name: str
unified_name: str
country: Country
formed_in: ID3Timestamp
notes: FormattedText
@ -429,16 +499,16 @@ class Artist(Base):
general_genre: str
unformatted_location: str
artwork: ArtworkCollection
source_collection: SourceCollection
contact_collection: Collection[Contact]
feature_song_collection: Collection[Song]
main_album_collection: Collection[Album]
album_collection: Collection[Album]
label_collection: Collection[Label]
_default_factories = {
"name": str,
"unified_name": lambda: None,
"name": lambda: None,
"country": lambda: None,
"unformatted_location": lambda: None,
@ -447,9 +517,10 @@ class Artist(Base):
"lyrical_themes": list,
"general_genre": lambda: "",
"artwork": ArtworkCollection,
"source_collection": SourceCollection,
"feature_song_collection": Collection,
"main_album_collection": Collection,
"album_collection": Collection,
"contact_collection": Collection,
"label_collection": Collection,
}
@ -457,30 +528,38 @@ class Artist(Base):
TITEL = "name"
# This is automatically generated
def __init__(self, name: str = "", unified_name: str = None, country: Country = None,
formed_in: ID3Timestamp = None, notes: FormattedText = None, lyrical_themes: List[str] = None,
general_genre: str = None, unformatted_location: str = None, source_list: List[Source] = None,
contact_list: List[Contact] = None, feature_song_list: List[Song] = None,
main_album_list: List[Album] = None, label_list: List[Label] = None, **kwargs) -> None:
super().__init__(name=name, unified_name=unified_name, country=country, formed_in=formed_in, notes=notes,
lyrical_themes=lyrical_themes, general_genre=general_genre,
unformatted_location=unformatted_location, source_list=source_list, contact_list=contact_list,
feature_song_list=feature_song_list, main_album_list=main_album_list, label_list=label_list,
**kwargs)
def __init__(
self,
name: str = None,
unified_name: str = None,
country: Country = None,
formed_in: ID3Timestamp = None,
notes: FormattedText = None,
lyrical_themes: List[str] = None,
general_genre: str = None,
artwork: ArtworkCollection = None,
unformatted_location: str = None,
source_list: List[Source] = None,
contact_list: List[Contact] = None,
feature_song_list: List[Song] = None,
album_list: List[Album] = None,
label_list: List[Label] = None,
**kwargs
) -> None:
real_kwargs = copy.copy(locals())
real_kwargs.update(real_kwargs.pop("kwargs", {}))
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("main_album_collection", "feature_song_collection")
Base.__init__(**real_kwargs)
DOWNWARDS_COLLECTION_STRING_ATTRIBUTES = ("album_collection",)
UPWARDS_COLLECTION_STRING_ATTRIBUTES = ("label_collection",)
def __init_collections__(self):
self.feature_song_collection.append_object_to_attribute = {
self.album_collection.append_object_to_attribute = {
"feature_artist_collection": self
}
self.main_album_collection.append_object_to_attribute = {
"artist_collection": self
}
self.label_collection.append_object_to_attribute = {
"current_artist_collection": self
}
@ -488,33 +567,32 @@ class Artist(Base):
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song:
# this doesn't really make sense
# self.feature_song_collection.extend(object_list)
return
if object_type is Artist:
return
if object_type is Album:
self.main_album_collection.extend(object_list)
self.album_collection.extend(object_list)
return
if object_type is Label:
self.label_collection.extend(object_list)
return
def _compile(self):
self.update_albumsort()
def update_albumsort(self):
"""
This updates the albumsort attributes, of the albums in
`self.main_album_collection`, and sorts the albums, if possible.
`self.album_collection`, and sorts the albums, if possible.
It is advised to only call this function, once all the albums are
added to the artist.
:return:
"""
if len(self.main_album_collection) <= 0:
return
type_section: Dict[AlbumType, int] = defaultdict(lambda: 2, {
AlbumType.OTHER: 0, # if I don't know it, I add it to the first section
AlbumType.STUDIO_ALBUM: 0,
@ -526,7 +604,7 @@ class Artist(Base):
# order albums in the previously defined section
album: Album
for album in self.main_album_collection:
for album in self.album_collection:
sections[type_section[album.album_type]].append(album)
def sort_section(_section: List[Album], last_albumsort: int) -> int:
@ -557,7 +635,7 @@ class Artist(Base):
album_list.extend(sections[section_index])
# replace the old collection with the new one
self.main_album_collection: Collection = Collection(data=album_list, element_type=Album)
self.album_collection._data = album_list
INDEX_DEPENDS_ON = ("name", "source_collection", "contact_collection")
@property
@ -579,25 +657,19 @@ class Artist(Base):
@property
def option_string(self) -> str:
r = OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r = "artist "
r += OPTION_FOREGROUND.value + self.title_string + BColors.ENDC.value + OPTION_BACKGROUND.value
r += get_collection_string(self.label_collection, " under {}")
r += OPTION_BACKGROUND.value
if len(self.main_album_collection) > 0:
r += f" with {len(self.main_album_collection)} albums"
if len(self.album_collection) > 0:
r += f" with {len(self.album_collection)} albums"
if len(self.feature_song_collection) > 0:
r += f" featured in {len(self.feature_song_collection)} songs"
r += BColors.ENDC.value
return r
"""
Label
"""
class Label(Base):
COLLECTION_STRING_ATTRIBUTES = ("album_collection", "current_artist_collection")
@ -625,12 +697,21 @@ class Label(Base):
TITEL = "name"
def __init__(self, name: str = None, unified_name: str = None, notes: FormattedText = None,
source_list: List[Source] = None, contact_list: List[Contact] = None,
album_list: List[Album] = None, current_artist_list: List[Artist] = None, **kwargs) -> None:
super().__init__(name=name, unified_name=unified_name, notes=notes, source_list=source_list,
contact_list=contact_list, album_list=album_list, current_artist_list=current_artist_list,
**kwargs)
def __init__(
self,
name: str = None,
unified_name: str = None,
notes: FormattedText = None,
source_list: List[Source] = None,
contact_list: List[Contact] = None,
album_list: List[Album] = None,
current_artist_list: List[Artist] = None,
**kwargs
) -> None:
real_kwargs = copy.copy(locals())
real_kwargs.update(real_kwargs.pop("kwargs", {}))
Base.__init__(**real_kwargs)
def __init_collections__(self):
self.album_collection.append_object_to_attribute = {
@ -670,4 +751,4 @@ class Label(Base):
@property
def option_string(self):
return OPTION_FOREGROUND.value + self.name + BColors.ENDC.value
return "label " + OPTION_FOREGROUND.value + self.name + BColors.ENDC.value

View File

@ -2,40 +2,48 @@ from __future__ import annotations
from collections import defaultdict
from enum import Enum
from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator
from typing import (
List,
Dict,
Set,
Tuple,
Optional,
Iterable,
Generator,
TypedDict,
Callable,
Any,
TYPE_CHECKING
)
from urllib.parse import urlparse, ParseResult
from dataclasses import dataclass, field
from functools import cached_property
from ..utils import generate_id
from ..utils.enums.source import SourcePages, SourceTypes
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.config import youtube_settings
from ..utils.string_processing import hash_url, shorten_display_url
from .metadata import Mapping, Metadata
from .parents import OuterProxy
from .collection import Collection
if TYPE_CHECKING:
from ..pages.abstract import Page
@dataclass
class Source:
page_enum: SourcePages
source_type: SourceType
url: str
referrer_page: SourcePages = None
referrer_page: SourceType = None
audio_url: Optional[str] = None
additional_data: dict = field(default_factory=dict)
def __post_init__(self):
self.referrer_page = self.referrer_page or self.page_enum
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
self.referrer_page = self.referrer_page or self.source_type
@classmethod
def match_url(cls, url: str, referrer_page: SourcePages) -> Optional[Source]:
def match_url(cls, url: str, referrer_page: SourceType) -> Optional[Source]:
"""
this shouldn't be used, unless you are not certain what the source is for
the reason is that it is more inefficient
@ -44,38 +52,50 @@ class Source:
url = parsed_url.geturl()
if "musify" in parsed_url.netloc:
return cls(SourcePages.MUSIFY, url, referrer_page=referrer_page)
return cls(ALL_SOURCE_TYPES.MUSIFY, url, referrer_page=referrer_page)
if parsed_url.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]:
return cls(SourcePages.YOUTUBE, url, referrer_page=referrer_page)
return cls(ALL_SOURCE_TYPES.YOUTUBE, url, referrer_page=referrer_page)
if url.startswith("https://www.deezer"):
return cls(SourcePages.DEEZER, url, referrer_page=referrer_page)
return cls(ALL_SOURCE_TYPES.DEEZER, url, referrer_page=referrer_page)
if url.startswith("https://open.spotify.com"):
return cls(SourcePages.SPOTIFY, url, referrer_page=referrer_page)
return cls(ALL_SOURCE_TYPES.SPOTIFY, url, referrer_page=referrer_page)
if "bandcamp" in url:
return cls(SourcePages.BANDCAMP, url, referrer_page=referrer_page)
return cls(ALL_SOURCE_TYPES.BANDCAMP, url, referrer_page=referrer_page)
if "wikipedia" in parsed_url.netloc:
return cls(SourcePages.WIKIPEDIA, url, referrer_page=referrer_page)
return cls(ALL_SOURCE_TYPES.WIKIPEDIA, url, referrer_page=referrer_page)
if url.startswith("https://www.metal-archives.com/"):
return cls(SourcePages.ENCYCLOPAEDIA_METALLUM, url, referrer_page=referrer_page)
return cls(ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM, url, referrer_page=referrer_page)
# the less important once
if url.startswith("https://www.facebook"):
return cls(SourcePages.FACEBOOK, url, referrer_page=referrer_page)
return cls(ALL_SOURCE_TYPES.FACEBOOK, url, referrer_page=referrer_page)
if url.startswith("https://www.instagram"):
return cls(SourcePages.INSTAGRAM, url, referrer_page=referrer_page)
return cls(ALL_SOURCE_TYPES.INSTAGRAM, url, referrer_page=referrer_page)
if url.startswith("https://twitter"):
return cls(SourcePages.TWITTER, url, referrer_page=referrer_page)
return cls(ALL_SOURCE_TYPES.TWITTER, url, referrer_page=referrer_page)
if url.startswith("https://myspace.com"):
return cls(SourcePages.MYSPACE, url, referrer_page=referrer_page)
return cls(ALL_SOURCE_TYPES.MYSPACE, url, referrer_page=referrer_page)
@property
def has_page(self) -> bool:
return self.source_type.page is not None
@property
def page(self) -> Page:
return self.source_type.page
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
@property
def hash_url(self) -> str:
@ -89,37 +109,82 @@ class Source:
return r
def __repr__(self) -> str:
return f"Src({self.page_enum.value}: {shorten_display_url(self.url)})"
return f"Src({self.source_type.value}: {shorten_display_url(self.url)})"
def __merge__(self, other: Source, **kwargs):
if self.audio_url is None:
self.audio_url = other.audio_url
self.additional_data.update(other.additional_data)
page_str = property(fget=lambda self: self.page_enum.value)
page_str = property(fget=lambda self: self.source_type.value)
class SourceTypeSorting(TypedDict):
sort_key: Callable[[SourceType], Any]
reverse: bool
only_with_page: bool
class SourceCollection:
__change_version__ = generate_id()
_indexed_sources: Dict[str, Source]
_page_to_source_list: Dict[SourcePages, List[Source]]
_sources_by_type: Dict[SourceType, List[Source]]
def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
self._page_to_source_list = defaultdict(list)
self._sources_by_type = defaultdict(list)
self._indexed_sources = {}
self.extend(data or [])
def has_source_page(self, *source_pages: SourcePages) -> bool:
return any(source_page in self._page_to_source_list for source_page in source_pages)
def source_types(
self,
only_with_page: bool = False,
sort_key = lambda page: page.name,
reverse: bool = False
) -> Iterable[SourceType]:
"""
Returns a list of all source types contained in this source collection.
def get_sources(self, *source_pages: List[Source]) -> Generator[Source]:
if not len(source_pages):
source_pages = self.source_pages
Args:
only_with_page (bool, optional): If True, only returns source types that have a page, meaning you can download from them.
sort_key (function, optional): A function that defines the sorting key for the source types. Defaults to lambda page: page.name.
reverse (bool, optional): If True, sorts the source types in reverse order. Defaults to False.
for page in source_pages:
yield from self._page_to_source_list[page]
Returns:
Iterable[SourceType]: A list of source types.
"""
source_types: List[SourceType] = self._sources_by_type.keys()
if only_with_page:
source_types = filter(lambda st: st.has_page, source_types)
return sorted(
source_types,
key=sort_key,
reverse=reverse
)
def get_sources(self, *source_types: List[SourceType], source_type_sorting: SourceTypeSorting = None) -> Generator[Source]:
"""
Retrieves sources based on the provided source types and source type sorting.
Args:
*source_types (List[Source]): Variable number of source types to filter the sources.
source_type_sorting (SourceTypeSorting): Sorting criteria for the source types. This is only relevant if no source types are provided.
Yields:
Generator[Source]: A generator that yields the sources based on the provided filters.
Returns:
None
"""
if not len(source_types):
source_type_sorting = source_type_sorting or {}
source_types = self.source_types(**source_type_sorting)
for source_type in source_types:
yield from self._sources_by_type[source_type]
def append(self, source: Source):
if source is None:
@ -135,7 +200,7 @@ class SourceCollection:
existing_source.__merge__(source)
source = existing_source
else:
self._page_to_source_list[source.page_enum].append(source)
self._sources_by_type[source.source_type].append(source)
changed = False
for key in source.indexing_values:
@ -156,10 +221,6 @@ class SourceCollection:
def __merge__(self, other: SourceCollection, **kwargs):
self.extend(other)
@property
def source_pages(self) -> Iterable[SourcePages]:
return sorted(self._page_to_source_list.keys(), key=lambda page: page.value)
@property
def hash_url_list(self) -> List[str]:
return [hash_url(source.url) for source in self.get_sources()]
@ -170,7 +231,7 @@ class SourceCollection:
@property
def homepage_list(self) -> List[str]:
return [source.homepage for source in self.source_pages]
return [source_type.homepage for source_type in self._sources_by_type.keys()]
def indexing_values(self) -> Generator[Tuple[str, str], None, None]:
for index in self._indexed_sources:

View File

@ -1,17 +1,17 @@
from __future__ import annotations
from pathlib import Path
from typing import List, Tuple, TextIO, Union
import logging
import random
from pathlib import Path
from typing import List, Optional, TextIO, Tuple, Union
import requests
from tqdm import tqdm
from .parents import OuterProxy
from ..utils.config import logging_settings, main_settings
from ..utils.shared import HIGHEST_ID
from ..utils.config import main_settings, logging_settings
from ..utils.string_processing import fit_to_file_system
from .parents import OuterProxy
LOGGER = logging.getLogger("target")
@ -31,7 +31,11 @@ class Target(OuterProxy):
}
@classmethod
def temp(cls, name: str = str(random.randint(0, HIGHEST_ID))) -> P:
def temp(cls, name: str = None, file_extension: Optional[str] = None) -> P:
name = name or str(random.randint(0, HIGHEST_ID))
if file_extension is not None:
name = f"{name}.{file_extension}"
return cls(main_settings["temp_directory"] / name)
# This is automatically generated
@ -114,3 +118,11 @@ class Target(OuterProxy):
def read_bytes(self) -> bytes:
return self.file_path.read_bytes()
@property
def raw_content(self) -> bytes:
return self.file_path.read_bytes()
@raw_content.setter
def raw_content(self, content: bytes):
self.file_path.write_bytes(content)

View File

@ -3,5 +3,6 @@ from .musify import Musify
from .youtube import YouTube
from .youtube_music import YoutubeMusic
from .bandcamp import Bandcamp
from .genius import Genius
from .abstract import Page, INDEPENDENT_DB_OBJECTS

View File

@ -3,8 +3,9 @@ import random
import re
from copy import copy
from pathlib import Path
from typing import Optional, Union, Type, Dict, Set, List, Tuple
from typing import Optional, Union, Type, Dict, Set, List, Tuple, TypedDict
from string import Formatter
from dataclasses import dataclass, field
import requests
from bs4 import BeautifulSoup
@ -21,85 +22,45 @@ from ..objects import (
Collection,
Label,
)
from ..utils.enums.source import SourcePages
from ..utils.enums import SourceType
from ..utils.enums.album import AlbumType
from ..audio import write_metadata_to_target, correct_codec
from ..utils.config import main_settings
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import fit_to_file_system
from ..utils import trace
from ..utils import trace, output, BColors
INDEPENDENT_DB_OBJECTS = Union[Label, Album, Artist, Song]
INDEPENDENT_DB_TYPES = Union[Type[Song], Type[Album], Type[Artist], Type[Label]]
@dataclass
class FetchOptions:
download_all: bool = False
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
class NamingDict(dict):
CUSTOM_KEYS: Dict[str, str] = {
"label": "label.name",
"artist": "artist.name",
"song": "song.title",
"isrc": "song.isrc",
"album": "album.title",
"album_type": "album.album_type_string"
}
def __init__(self, values: dict, object_mappings: Dict[str, DatabaseObject] = None):
self.object_mappings: Dict[str, DatabaseObject] = object_mappings or dict()
super().__init__(values)
self["audio_format"] = main_settings["audio_format"]
def add_object(self, music_object: DatabaseObject):
self.object_mappings[type(music_object).__name__.lower()] = music_object
def copy(self) -> dict:
return type(self)(super().copy(), self.object_mappings.copy())
def __getitem__(self, key: str) -> str:
return fit_to_file_system(super().__getitem__(key))
def default_value_for_name(self, name: str) -> str:
return f'Various {name.replace("_", " ").title()}'
def __missing__(self, key: str) -> str:
if "." not in key:
if key not in self.CUSTOM_KEYS:
return self.default_value_for_name(key)
key = self.CUSTOM_KEYS[key]
frag_list = key.split(".")
object_name = frag_list[0].strip().lower()
attribute_name = frag_list[-1].strip().lower()
if object_name not in self.object_mappings:
return self.default_value_for_name(attribute_name)
music_object = self.object_mappings[object_name]
try:
value = getattr(music_object, attribute_name)
if value is None:
return self.default_value_for_name(attribute_name)
return str(value)
except AttributeError:
return self.default_value_for_name(attribute_name)
@dataclass
class DownloadOptions:
download_all: bool = False
album_type_blacklist: Set[AlbumType] = field(default_factory=lambda: set(AlbumType(a) for a in main_settings["album_type_blacklist"]))
process_audio_if_found: bool = False
process_metadata_if_found: bool = True
class Page:
"""
This is an abstract class, laying out the
functionality for every other class fetching something
"""
SOURCE_TYPE: SourceType
LOGGER: logging.Logger
SOURCE_TYPE: SourcePages
LOGGER = logging.getLogger("this shouldn't be used")
def __new__(cls, *args, **kwargs):
cls.LOGGER = logging.getLogger(cls.__name__)
# set this to true, if all song details can also be fetched by fetching album details
NO_ADDITIONAL_DATA_FROM_SONG = False
return super().__new__(cls)
def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None):
self.SOURCE_TYPE.register_page(self)
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
def _search_regex(self, pattern, string, default=None, fatal=True, flags=0, group=None):
"""
@ -172,106 +133,7 @@ class Page:
def song_search(self, song: Song) -> List[Song]:
return []
def fetch_details(
self,
music_object: DatabaseObject,
stop_at_level: int = 1,
post_process: bool = True
) -> DatabaseObject:
"""
when a music object with lacking data is passed in, it returns
the SAME object **(no copy)** with more detailed data.
If you for example put in, an album, it fetches the tracklist
:param music_object:
:param stop_at_level:
This says the depth of the level the scraper will recurse to.
If this is for example set to 2, then the levels could be:
1. Level: the album
2. Level: every song of the album + every artist of the album
If no additional requests are needed to get the data one level below the supposed stop level
this gets ignored
:return detailed_music_object: IT MODIFIES THE INPUT OBJ
"""
# creating a new object, of the same type
new_music_object: Optional[DatabaseObject] = None
fetched_from_url: List[str] = []
# only certain database objects, have a source list
if isinstance(music_object, INDEPENDENT_DB_OBJECTS):
source: Source
for source in music_object.source_collection.get_sources(self.SOURCE_TYPE):
if music_object.already_fetched_from(source.hash_url):
continue
tmp = self.fetch_object_from_source(
source=source,
enforce_type=type(music_object),
stop_at_level=stop_at_level,
post_process=False,
type_string=type(music_object).__name__,
entity_string=music_object.option_string,
)
if new_music_object is None:
new_music_object = tmp
else:
new_music_object.merge(tmp)
fetched_from_url.append(source.hash_url)
if new_music_object is not None:
music_object.merge(new_music_object)
music_object.mark_as_fetched(*fetched_from_url)
return music_object
def fetch_object_from_source(
self,
source: Source,
stop_at_level: int = 2,
enforce_type: Type[DatabaseObject] = None,
post_process: bool = True,
type_string: str = "",
entity_string: str = "",
) -> Optional[DatabaseObject]:
obj_type = self.get_source_type(source)
if obj_type is None:
return None
if enforce_type != obj_type and enforce_type is not None:
self.LOGGER.warning(f"Object type isn't type to enforce: {enforce_type}, {obj_type}")
return None
music_object: DatabaseObject = None
fetch_map = {
Song: self.fetch_song,
Album: self.fetch_album,
Artist: self.fetch_artist,
Label: self.fetch_label
}
if obj_type in fetch_map:
music_object = fetch_map[obj_type](source, stop_at_level)
else:
self.LOGGER.warning(f"Can't fetch details of type: {obj_type}")
return None
if stop_at_level > 0:
trace(f"fetching {type_string} [{entity_string}] [stop_at_level={stop_at_level}]")
collection: Collection
for collection_str in music_object.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
collection = music_object.__getattribute__(collection_str)
for sub_element in collection:
sub_element.merge(
self.fetch_details(sub_element, stop_at_level=stop_at_level - 1, post_process=False))
return music_object
# to fetch stuff
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song()
@ -284,161 +146,7 @@ class Page:
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
def download(
self,
music_object: DatabaseObject,
genre: str,
download_all: bool = False,
process_metadata_anyway: bool = True
) -> DownloadResult:
naming_dict: NamingDict = NamingDict({"genre": genre})
def fill_naming_objects(naming_music_object: DatabaseObject):
nonlocal naming_dict
for collection_name in naming_music_object.UPWARDS_COLLECTION_STRING_ATTRIBUTES:
collection: Collection = getattr(naming_music_object, collection_name)
if collection.empty:
continue
dom_ordered_music_object: DatabaseObject = collection[0]
naming_dict.add_object(dom_ordered_music_object)
return fill_naming_objects(dom_ordered_music_object)
fill_naming_objects(music_object)
return self._download(music_object, naming_dict, download_all, process_metadata_anyway=process_metadata_anyway)
def _download(
self,
music_object: DatabaseObject,
naming_dict: NamingDict,
download_all: bool = False,
skip_details: bool = False,
process_metadata_anyway: bool = True
) -> DownloadResult:
trace(f"downloading {type(music_object).__name__} [{music_object.option_string}]")
skip_next_details = skip_details
# Skips all releases, that are defined in shared.ALBUM_TYPE_BLACKLIST, if download_all is False
if isinstance(music_object, Album):
if self.NO_ADDITIONAL_DATA_FROM_SONG:
skip_next_details = True
if not download_all and music_object.album_type.value in main_settings["album_type_blacklist"]:
return DownloadResult()
if not (isinstance(music_object, Song) and self.NO_ADDITIONAL_DATA_FROM_SONG):
self.fetch_details(music_object=music_object, stop_at_level=1)
if isinstance(music_object, Album):
music_object.update_tracksort()
naming_dict.add_object(music_object)
if isinstance(music_object, Song):
return self._download_song(music_object, naming_dict, process_metadata_anyway=process_metadata_anyway)
download_result: DownloadResult = DownloadResult()
for collection_name in music_object.DOWNWARDS_COLLECTION_STRING_ATTRIBUTES:
collection: Collection = getattr(music_object, collection_name)
sub_ordered_music_object: DatabaseObject
for sub_ordered_music_object in collection:
download_result.merge(self._download(sub_ordered_music_object, naming_dict.copy(), download_all,
skip_details=skip_next_details,
process_metadata_anyway=process_metadata_anyway))
return download_result
def _download_song(self, song: Song, naming_dict: NamingDict, process_metadata_anyway: bool = True):
if "genre" not in naming_dict and song.genre is not None:
naming_dict["genre"] = song.genre
if song.genre is None:
song.genre = naming_dict["genre"]
path_parts = Formatter().parse(main_settings["download_path"])
file_parts = Formatter().parse(main_settings["download_file"])
new_target = Target(
relative_to_music_dir=True,
file_path=Path(
main_settings["download_path"].format(**{part[1]: naming_dict[part[1]] for part in path_parts}),
main_settings["download_file"].format(**{part[1]: naming_dict[part[1]] for part in file_parts})
)
)
if song.target_collection.empty:
song.target_collection.append(new_target)
if not song.source_collection.has_source_page(self.SOURCE_TYPE):
return DownloadResult(error_message=f"No {self.__class__.__name__} source found for {song.option_string}.")
sources = song.source_collection.get_sources(self.SOURCE_TYPE)
temp_target: Target = Target(
relative_to_music_dir=False,
file_path=Path(
main_settings["temp_directory"],
str(song.id)
)
)
r = DownloadResult(1)
found_on_disc = False
target: Target
for target in song.target_collection:
if target.exists:
if process_metadata_anyway:
target.copy_content(temp_target)
found_on_disc = True
r.found_on_disk += 1
r.add_target(target)
if found_on_disc and not process_metadata_anyway:
self.LOGGER.info(f"{song.option_string} already exists, thus not downloading again.")
return r
if not found_on_disc:
for source in sources:
r = self.download_song_to_target(source=source, target=temp_target, desc=song.option_string)
if not r.is_fatal_error:
break
if temp_target.exists:
r.merge(self._post_process_targets(
song=song,
temp_target=temp_target,
interval_list=[] if found_on_disc else self.get_skip_intervals(song, source)
))
return r
def _post_process_targets(self, song: Song, temp_target: Target, interval_list: List) -> DownloadResult:
correct_codec(temp_target, interval_list=interval_list)
self.post_process_hook(song, temp_target)
write_metadata_to_target(song.metadata, temp_target, song)
r = DownloadResult()
target: Target
for target in song.target_collection:
if temp_target is not target:
temp_target.copy_content(target)
r.add_target(target)
temp_target.delete()
r.sponsor_segments += len(interval_list)
return r
# to download stuff
def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
return []

View File

@ -1,31 +1,22 @@
from typing import List, Optional, Type
from urllib.parse import urlparse, urlunparse
import json
from enum import Enum
from bs4 import BeautifulSoup
import pycountry
from typing import List, Optional, Type
from urllib.parse import urlparse, urlunparse
import pycountry
from bs4 import BeautifulSoup
from ..objects import Source, DatabaseObject
from .abstract import Page
from ..objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target,
Contact,
ID3Timestamp,
Lyrics,
FormattedText,
Artwork,
)
from ..connection import Connection
from ..utils.support_classes.download_result import DownloadResult
from ..utils.string_processing import clean_song_title
from ..utils.config import main_settings, logging_settings
from ..objects import (Album, Artist, ArtworkCollection, Contact,
DatabaseObject, FormattedText, ID3Timestamp, Label,
Lyrics, Song, Source, SourceType, Target)
from ..utils import dump_to_file
from ..utils.config import logging_settings, main_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.shared import DEBUG
from ..utils.string_processing import clean_song_title
from ..utils.support_classes.download_result import DownloadResult
from .abstract import Page
if DEBUG:
from ..utils import dump_to_file
@ -48,9 +39,7 @@ class BandcampTypes(Enum):
class Bandcamp(Page):
# CHANGE
SOURCE_TYPE = SourcePages.BANDCAMP
LOGGER = logging_settings["bandcamp_logger"]
SOURCE_TYPE = ALL_SOURCE_TYPES.BANDCAMP
def __init__(self, *args, **kwargs):
self.connection: Connection = Connection(
@ -62,8 +51,7 @@ class Bandcamp(Page):
super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
parsed_url = urlparse(source.url)
path = parsed_url.path.replace("/", "")
path = source.parsed_url.path.replace("/", "")
if path == "" or path.startswith("music"):
return Artist
@ -118,7 +106,7 @@ class Bandcamp(Page):
return Song(
title=clean_song_title(name, artist_name=data["band_name"]),
source_list=source_list,
main_artist_list=[
artist_list=[
Artist(
name=data["band_name"],
source_list=[
@ -238,8 +226,13 @@ class Bandcamp(Page):
html_music_grid = soup.find("ol", {"id": "music-grid"})
if html_music_grid is not None:
for subsoup in html_music_grid.find_all("li"):
artist.main_album_collection.append(self._parse_album(soup=subsoup, initial_source=source))
artist.album_collection.append(self._parse_album(soup=subsoup, initial_source=source))
# artist artwork
artist_artwork: BeautifulSoup = soup.find("img", {"class":"band-photo"})
if artist_artwork is not None:
artist.artwork.add_data(artist_artwork.get("data-src", artist_artwork.get("src")))
for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})):
data_blob = data_blob_soup["data-blob"]
@ -247,14 +240,14 @@ class Bandcamp(Page):
dump_to_file(f"bandcamp_artist_data_blob_{i}.json", data_blob, is_json=True, exit_after_dump=False)
if data_blob is not None:
artist.main_album_collection.extend(
artist.album_collection.extend(
self._parse_artist_data_blob(json.loads(data_blob), source.url)
)
artist.source_collection.append(source)
return artist
def _parse_track_element(self, track: dict, artwork: Artwork) -> Optional[Song]:
def _parse_track_element(self, track: dict, artwork: ArtworkCollection) -> Optional[Song]:
lyrics_list: List[Lyrics] = []
_lyrics: Optional[str] = track.get("item", {}).get("recordingOf", {}).get("lyrics", {}).get("text")
@ -288,9 +281,15 @@ class Bandcamp(Page):
artist_source_list = []
if "@id" in artist_data:
artist_source_list = [Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
source_list: List[Source] = [source]
if "mainEntityOfPage" in data or "@id" in data:
source_list.append(Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"])))
album = Album(
title=data["name"].strip(),
source_list=[Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]))],
source_list=source_list,
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
artist_list=[Artist(
name=artist_data["name"].strip(),
@ -298,7 +297,7 @@ class Bandcamp(Page):
)]
)
artwork: Artwork = Artwork()
artwork: ArtworkCollection = ArtworkCollection()
def _get_artwork_url(_data: dict) -> Optional[str]:
if "image" in _data:
@ -309,15 +308,14 @@ class Bandcamp(Page):
_artwork_url = _get_artwork_url(data)
if _artwork_url is not None:
artwork.append(url=_artwork_url, width=350, height=350)
artwork.add_data(url=_artwork_url, width=350, height=350)
else:
for album_release in data.get("albumRelease", []):
_artwork_url = _get_artwork_url(album_release)
if _artwork_url is not None:
artwork.append(url=_artwork_url, width=350, height=350)
artwork.add_data(url=_artwork_url, width=350, height=350)
break
for i, track_json in enumerate(data.get("track", {}).get("itemListElement", [])):
if DEBUG:
dump_to_file(f"album_track_{i}.json", json.dumps(track_json), is_json=True, exit_after_dump=False)
@ -363,17 +361,29 @@ class Bandcamp(Page):
for key, value in other_data.get("trackinfo", [{}])[0].get("file", {"": None}).items():
mp3_url = value
source_list: List[Source] = [source]
if "mainEntityOfPage" in data or "@id" in data:
source_list.append(Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]), audio_url=mp3_url))
source_list_album: List[Source] = [source]
if "@id" in album_data:
source_list_album.append(Source(self.SOURCE_TYPE, album_data["@id"]))
source_list_artist: List[Source] = [source]
if "@id" in artist_data:
source_list_artist.append(Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"])))
song = Song(
title=clean_song_title(data["name"], artist_name=artist_data["name"]),
source_list=[source, Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]), audio_url=mp3_url)],
source_list=source_list,
album_list=[Album(
title=album_data["name"].strip(),
date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
source_list=[Source(self.SOURCE_TYPE, album_data["@id"])]
source_list=source_list_album
)],
main_artist_list=[Artist(
artist_list=[Artist(
name=artist_data["name"].strip(),
source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
source_list=source_list_artist
)],
lyrics_list=self._fetch_lyrics(soup=soup)
)

View File

@ -7,7 +7,7 @@ from urllib.parse import urlparse, urlencode
from ..connection import Connection
from ..utils.config import logging_settings
from .abstract import Page
from ..utils.enums.source import SourcePages
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.enums.album import AlbumType
from ..utils.support_classes.query import Query
from ..objects import (
@ -52,14 +52,14 @@ def _song_from_json(artist_html=None, album_html=None, release_type=None, title=
return Song(
title=title,
main_artist_list=[
artist_list=[
_artist_from_json(artist_html=artist_html)
],
album_list=[
_album_from_json(album_html=album_html, release_type=release_type, artist_html=artist_html)
],
source_list=[
Source(SourcePages.ENCYCLOPAEDIA_METALLUM, song_id)
Source(ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM, song_id)
]
)
@ -85,7 +85,7 @@ def _artist_from_json(artist_html=None, genre=None, country=None) -> Artist:
return Artist(
name=artist_name,
source_list=[
Source(SourcePages.ENCYCLOPAEDIA_METALLUM, artist_url)
Source(ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM, artist_url)
]
)
@ -105,7 +105,7 @@ def _album_from_json(album_html=None, release_type=None, artist_html=None) -> Al
title=album_name,
album_type=album_type,
source_list=[
Source(SourcePages.ENCYCLOPAEDIA_METALLUM, album_url)
Source(ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM, album_url)
],
artist_list=[
_artist_from_json(artist_html=artist_html)
@ -207,7 +207,7 @@ def create_grid(
class EncyclopaediaMetallum(Page):
SOURCE_TYPE = SourcePages.ENCYCLOPAEDIA_METALLUM
SOURCE_TYPE = ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM
LOGGER = logging_settings["metal_archives_logger"]
def __init__(self, **kwargs):
@ -266,7 +266,7 @@ class EncyclopaediaMetallum(Page):
song_title = song.title.strip()
album_titles = ["*"] if song.album_collection.empty else [album.title.strip() for album in song.album_collection]
artist_titles = ["*"] if song.main_artist_collection.empty else [artist.name.strip() for artist in song.main_artist_collection]
artist_titles = ["*"] if song.artist_collection.empty else [artist.name.strip() for artist in song.artist_collection]
search_results = []
@ -663,7 +663,7 @@ class EncyclopaediaMetallum(Page):
artist.notes = band_notes
discography: List[Album] = self._fetch_artist_discography(artist_id)
artist.main_album_collection.extend(discography)
artist.album_collection.extend(discography)
return artist
@ -832,7 +832,7 @@ class EncyclopaediaMetallum(Page):
)
def get_source_type(self, source: Source):
if self.SOURCE_TYPE != source.page_enum:
if self.SOURCE_TYPE != source.source_type:
return None
url = source.url

View File

@ -0,0 +1,288 @@
import simplejson as json
from json_unescape import escape_json, unescape_json
from enum import Enum
from typing import List, Optional, Type
from urllib.parse import urlencode, urlparse, urlunparse
import pycountry
from bs4 import BeautifulSoup
from ..connection import Connection
from ..objects import (Album, Artist, ArtworkCollection, Contact,
DatabaseObject, FormattedText, ID3Timestamp, Label,
Lyrics, Song, Source, SourceType, Target)
from ..utils import dump_to_file, traverse_json_path
from ..utils.config import logging_settings, main_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.shared import DEBUG
from ..utils.string_processing import clean_song_title
from ..utils.support_classes.download_result import DownloadResult
from .abstract import Page
if DEBUG:
from ..utils import dump_to_file
class Genius(Page):
SOURCE_TYPE = ALL_SOURCE_TYPES.GENIUS
HOST = "genius.com"
def __init__(self, *args, **kwargs):
self.connection: Connection = Connection(
host="https://genius.com/",
logger=self.LOGGER,
module="genius",
)
super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
path = source.parsed_url.path.replace("/", "")
if path.startswith("artists"):
return Artist
if path.startswith("albums"):
return Album
return Song
def add_to_artwork(self, artwork: ArtworkCollection, url: str):
if url is None:
return
url_frags = url.split(".")
if len(url_frags) < 2:
artwork.add_data(url=url)
return
dimensions = url_frags[-2].split("x")
if len(dimensions) < 2:
artwork.add_data(url=url)
return
if len(dimensions) == 3:
dimensions = dimensions[:-1]
try:
artwork.add_data(url=url, width=int(dimensions[0]), height=int(dimensions[1]))
except ValueError:
artwork.add_data(url=url)
def parse_api_object(self, data: dict) -> Optional[DatabaseObject]:
if data is None:
return None
object_type = data.get("_type")
artwork = ArtworkCollection()
self.add_to_artwork(artwork, data.get("header_image_url"))
self.add_to_artwork(artwork, data.get("image_url"))
additional_sources: List[Source] = []
source: Source = Source(self.SOURCE_TYPE, data.get("url"), additional_data={
"id": data.get("id"),
"slug": data.get("slug"),
"api_path": data.get("api_path"),
})
notes = FormattedText()
description = data.get("description") or {}
if "html" in description:
notes.html = description["html"]
elif "markdown" in description:
notes.markdown = description["markdown"]
elif "description_preview" in data:
notes.plaintext = data["description_preview"]
if source.url is None:
return None
if object_type == "artist":
if data.get("instagram_name") is not None:
additional_sources.append(Source(ALL_SOURCE_TYPES.INSTAGRAM, f"https://www.instagram.com/{data['instagram_name']}/"))
if data.get("facebook_name") is not None:
additional_sources.append(Source(ALL_SOURCE_TYPES.FACEBOOK, f"https://www.facebook.com/{data['facebook_name']}/"))
if data.get("twitter_name") is not None:
additional_sources.append(Source(ALL_SOURCE_TYPES.TWITTER, f"https://x.com/{data['twitter_name']}/"))
return Artist(
name=data["name"].strip() if data.get("name") is not None else None,
source_list=[source],
artwork=artwork,
notes=notes,
)
if object_type == "album":
self.add_to_artwork(artwork, data.get("cover_art_thumbnail_url"))
self.add_to_artwork(artwork, data.get("cover_art_url"))
for cover_art in data.get("cover_arts", []):
self.add_to_artwork(artwork, cover_art.get("image_url"))
self.add_to_artwork(artwork, cover_art.get("thumbnail_image_url"))
return Album(
title=data.get("name").strip(),
source_list=[source],
artist_list=[self.parse_api_object(data.get("artist"))],
artwork=artwork,
date=ID3Timestamp(**(data.get("release_date_components") or {})),
)
if object_type == "song":
self.add_to_artwork(artwork, data.get("song_art_image_thumbnail_url"))
self.add_to_artwork(artwork, data.get("song_art_image_url"))
main_artist_list = []
featured_artist_list = []
_artist_name = None
primary_artist = self.parse_api_object(data.get("primary_artist"))
if primary_artist is not None:
_artist_name = primary_artist.name
main_artist_list.append(primary_artist)
for feature_artist in (*(data.get("featured_artists") or []), *(data.get("producer_artists") or []), *(data.get("writer_artists") or [])):
artist = self.parse_api_object(feature_artist)
if artist is not None:
featured_artist_list.append(artist)
return Song(
title=clean_song_title(data.get("title"), artist_name=_artist_name),
source_list=[source],
artwork=artwork,
feature_artist_list=featured_artist_list,
artist_list=main_artist_list,
)
return None
def general_search(self, search_query: str, **kwargs) -> List[DatabaseObject]:
results = []
search_params = {
"q": search_query,
}
r = self.connection.get("https://genius.com/api/search/multi?" + urlencode(search_params), name=f"search_{search_query}")
if r is None:
return results
dump_to_file("search_genius.json", r.text, is_json=True, exit_after_dump=False)
data = r.json()
for elements in traverse_json_path(data, "response.sections", default=[]):
hits = elements.get("hits", [])
for hit in hits:
parsed = self.parse_api_object(hit.get("result"))
if parsed is not None:
results.append(parsed)
return results
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
artist: Artist = Artist()
# https://genius.com/api/artists/24527/albums?page=1
r = self.connection.get(source.url, name=source.url)
if r is None:
return artist
soup = self.get_soup_from_response(r)
# find the content attribute in the meta tag which is contained in the head
data_container = soup.find("meta", {"itemprop": "page_data"})
if data_container is not None:
content = data_container["content"]
dump_to_file("genius_itemprop_artist.json", content, is_json=True, exit_after_dump=False)
data = json.loads(content)
artist = self.parse_api_object(data.get("artist"))
for e in (data.get("artist_albums") or []):
r = self.parse_api_object(e)
if not isinstance(r, Album):
continue
artist.album_collection.append(r)
for e in (data.get("artist_songs") or []):
r = self.parse_api_object(e)
if not isinstance(r, Song):
continue
"""
TODO
fetch the album for these songs, because the api doesn't
return them
"""
artist.album_collection.extend(r.album_collection)
artist.source_collection.append(source)
return artist
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
album: Album = Album()
# https://genius.com/api/artists/24527/albums?page=1
r = self.connection.get(source.url, name=source.url)
if r is None:
return album
soup = self.get_soup_from_response(r)
# find the content attribute in the meta tag which is contained in the head
data_container = soup.find("meta", {"itemprop": "page_data"})
if data_container is not None:
content = data_container["content"]
dump_to_file("genius_itemprop_album.json", content, is_json=True, exit_after_dump=False)
data = json.loads(content)
album = self.parse_api_object(data.get("album"))
for e in data.get("album_appearances", []):
r = self.parse_api_object(e.get("song"))
if not isinstance(r, Song):
continue
album.song_collection.append(r)
album.source_collection.append(source)
return album
def get_json_content_from_response(self, response, start: str, end: str) -> Optional[str]:
content = response.text
start_index = content.find(start)
if start_index < 0:
return None
start_index += len(start)
end_index = content.find(end, start_index)
if end_index < 0:
return None
return content[start_index:end_index]
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
song: Song = Song()
r = self.connection.get(source.url, name=source.url)
if r is None:
return song
# get the contents that are between `JSON.parse('` and `');`
content = self.get_json_content_from_response(r, start="window.__PRELOADED_STATE__ = JSON.parse('", end="');\n window.__APP_CONFIG__ = ")
if content is not None:
#IMPLEMENT FIX FROM HAZEL
content = escape_json(content)
data = json.loads(content)
lyrics_html = traverse_json_path(data, "songPage.lyricsData.body.html", default=None)
if lyrics_html is not None:
song.lyrics_collection.append(Lyrics(FormattedText(html=lyrics_html)))
dump_to_file("genius_song_script_json.json", content, is_json=True, exit_after_dump=False)
soup = self.get_soup_from_response(r)
for lyrics in soup.find_all("div", {"data-lyrics-container": "true"}):
lyrics_object = Lyrics(FormattedText(html=lyrics.prettify()))
song.lyrics_collection.append(lyrics_object)
song.source_collection.append(source)
return song

View File

@ -1,34 +1,25 @@
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Type, Union, Generator
from typing import Any, Dict, Generator, List, Optional, Type, Union
from urllib.parse import urlparse
import pycountry
from bs4 import BeautifulSoup
from ..connection import Connection
from .abstract import Page
from ..utils.enums.source import SourcePages
from ..utils.enums.album import AlbumType, AlbumStatus
from ..objects import (
Artist,
Source,
Song,
Album,
ID3Timestamp,
FormattedText,
Label,
Target,
DatabaseObject,
Lyrics,
Artwork
)
from ..utils.config import logging_settings
from ..utils import string_processing, shared
from ..objects import (Album, Artist, DatabaseObject,
FormattedText, ID3Timestamp, Label, Lyrics, Song,
Source, Target)
from ..objects.artwork import (Artwork, ArtworkVariant, ArtworkCollection)
from ..utils import shared, string_processing
from ..utils.config import logging_settings, main_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.enums.album import AlbumStatus, AlbumType
from ..utils.string_processing import clean_song_title
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from .abstract import Page
"""
https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent
@ -111,9 +102,7 @@ def parse_url(url: str) -> MusifyUrl:
class Musify(Page):
# CHANGE
SOURCE_TYPE = SourcePages.MUSIFY
LOGGER = logging_settings["musify_logger"]
SOURCE_TYPE = ALL_SOURCE_TYPES.MUSIFY
HOST = "https://musify.club"
@ -361,7 +350,7 @@ class Musify(Page):
return Song(
title=clean_song_title(song_title, artist_name=artist_list[0].name if len(artist_list) > 0 else None),
main_artist_list=artist_list,
feature_artist_list=artist_list,
source_list=source_list
)
@ -418,6 +407,10 @@ class Musify(Page):
href = artist_soup["href"]
if href is not None:
href_parts = href.split("/")
if len(href_parts) <= 1 or href_parts[-2] != "artist":
return
artist_src_list.append(Source(self.SOURCE_TYPE, self.HOST + href))
name_elem: BeautifulSoup = artist_soup.find("span", {"itemprop": "name"})
@ -455,17 +448,17 @@ class Musify(Page):
for album_info in soup.find_all("ul", {"class": "album-info"}):
list_element: BeautifulSoup = album_info.find("li")
if list_element is not None:
artist_soup: BeautifulSoup
for artist_soup in list_element.find_all("a"):
artist_source_list = []
href = artist_soup["href"]
if href is not None:
artist_source_list = [Source(self.SOURCE_TYPE, self.HOST + href)]
artist_list.append(Artist(
name=artist_soup.text.strip(),
source_list=artist_source_list
))
if list_element is not None:
artist_soup: BeautifulSoup
for artist_soup in list_element.find_all("a"):
artist_source_list = []
href = artist_soup["href"]
if href is not None:
artist_source_list = [Source(self.SOURCE_TYPE, self.HOST + href)]
artist_list.append(Artist(
name=artist_soup.text.strip(),
source_list=artist_source_list
))
# breadcrums
breadcrumb_list_element_list: List[BeautifulSoup] = soup.find_all("ol", {"class": "breadcrumb"})
@ -483,11 +476,11 @@ class Musify(Page):
track_name = list_points[4].text.strip()
# artwork
artwork: Artwork = Artwork()
# album artwork
artwork: ArtworkCollection = ArtworkCollection()
album_image_element_list: List[BeautifulSoup] = soup.find_all("img", {"class": "album-img"})
for album_image_element in album_image_element_list:
artwork.append(url=album_image_element.get("data-src", album_image_element.get("src")))
artwork.add_data(url=album_image_element.get("data-src", album_image_element.get("src")))
# lyrics
lyrics_container: List[BeautifulSoup] = soup.find_all("div", {"id": "tabLyrics"})
@ -500,9 +493,18 @@ class Musify(Page):
for video_container in video_container_list:
iframe_list: List[BeautifulSoup] = video_container.findAll("iframe")
for iframe in iframe_list:
"""
the url could look like this
https://www.youtube.com/embed/sNObCkhzOYA?si=dNVgnZMBNVlNb0P_
"""
parsed_url = urlparse(iframe["src"])
path_parts = parsed_url.path.strip("/").split("/")
if path_parts[0] != "embed" or len(path_parts) < 2:
continue
source_list.append(Source(
SourcePages.YOUTUBE,
iframe["src"],
ALL_SOURCE_TYPES.YOUTUBE,
f"https://music.youtube.com/watch?v={path_parts[1]}",
referrer_page=self.SOURCE_TYPE
))
@ -510,7 +512,7 @@ class Musify(Page):
title=clean_song_title(track_name, artist_name=artist_list[0].name if len(artist_list) > 0 else None),
source_list=source_list,
lyrics_list=lyrics_list,
main_artist_list=artist_list,
feature_artist_list=artist_list,
album_list=album_list,
artwork=artwork,
)
@ -652,10 +654,111 @@ class Musify(Page):
return Song(
title=clean_song_title(song_name, artist_name=artist_list[0].name if len(artist_list) > 0 else None),
tracksort=tracksort,
main_artist_list=artist_list,
feature_artist_list=artist_list,
source_list=source_list
)
def _parse_album(self, soup: BeautifulSoup) -> Album:
name: str = None
source_list: List[Source] = []
artist_list: List[Artist] = []
date: ID3Timestamp = None
"""
if breadcrumb list has 4 elements, then
the -2 is the artist link,
the -1 is the album
"""
# breadcrumb
breadcrumb_soup: BeautifulSoup = soup.find("ol", {"class", "breadcrumb"})
breadcrumb_elements: List[BeautifulSoup] = breadcrumb_soup.find_all("li", {"class": "breadcrumb-item"})
if len(breadcrumb_elements) == 4:
# album
album_crumb: BeautifulSoup = breadcrumb_elements[-1]
name = album_crumb.text.strip()
# artist
artist_crumb: BeautifulSoup = breadcrumb_elements[-2]
anchor: BeautifulSoup = artist_crumb.find("a")
if anchor is not None:
href = anchor.get("href")
href_parts = href.split("/")
if not(len(href_parts) <= 1 or href_parts[-2] != "artist"):
artist_source_list: List[Source] = []
if href is not None:
artist_source_list.append(Source(self.SOURCE_TYPE, self.HOST + href.strip()))
span: BeautifulSoup = anchor.find("span")
if span is not None:
artist_list.append(Artist(
name=span.get_text(strip=True),
source_list=artist_source_list
))
else:
self.LOGGER.debug("there are not 4 breadcrumb items, which shouldn't be the case")
# meta
meta_url: BeautifulSoup = soup.find("meta", {"itemprop": "url"})
if meta_url is not None:
url = meta_url.get("content")
if url is not None:
source_list.append(Source(self.SOURCE_TYPE, self.HOST + url))
meta_name: BeautifulSoup = soup.find("meta", {"itemprop": "name"})
if meta_name is not None:
_name = meta_name.get("content")
if _name is not None:
name = _name
# album info
album_info_ul: BeautifulSoup = soup.find("ul", {"class": "album-info"})
if album_info_ul is not None:
artist_anchor: BeautifulSoup
for artist_anchor in album_info_ul.find_all("a", {"itemprop": "byArtist"}):
# line 98
artist_source_list: List[Source] = []
artist_url_meta = artist_anchor.find("meta", {"itemprop": "url"})
if artist_url_meta is not None:
artist_href = artist_url_meta.get("content")
if artist_href is not None:
artist_source_list.append(Source(self.SOURCE_TYPE, url=self.HOST + artist_href))
artist_meta_name = artist_anchor.find("meta", {"itemprop": "name"})
if artist_meta_name is not None:
artist_name = artist_meta_name.get("content")
if artist_name is not None:
artist_list.append(Artist(
name=artist_name,
source_list=artist_source_list
))
time_soup: BeautifulSoup = album_info_ul.find("time", {"itemprop": "datePublished"})
if time_soup is not None:
raw_datetime = time_soup.get("datetime")
if raw_datetime is not None:
try:
date = ID3Timestamp.strptime(raw_datetime, "%Y-%m-%d")
except ValueError:
self.LOGGER.debug(f"Raw datetime doesn't match time format %Y-%m-%d: {raw_datetime}")
# album artwork
album_artwork: ArtworkCollection = ArtworkCollection()
album_artwork_list: List[BeautifulSoup] = soup.find_all("img", {"class":"artist-img"})
for album_artwork in album_artwork_list:
album_artwork.add_data(url=album_artwork.get("data-src", album_artwork.get("src")))
return Album(
title=name,
source_list=source_list,
artist_list=artist_list,
date=date,
artwork=album_artwork
)
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
"""
fetches album from source:
@ -690,23 +793,20 @@ class Musify(Page):
new_song = self._parse_song_card(card_soup)
album.song_collection.append(new_song)
album.update_tracksort()
return album
def _get_artist_attributes(self, url: MusifyUrl) -> Artist:
def _fetch_initial_artist(self, url: MusifyUrl, source: Source, **kwargs) -> Artist:
"""
fetches the main Artist attributes from this endpoint
https://musify.club/artist/ghost-bath-280348?_pjax=#bodyContent
it needs to parse html
:param url:
:return:
"""
r = self.connection.get(f"https://musify.club/{url.source_type.value}/{url.name_with_id}?_pjax=#bodyContent", name="artist_attributes_" + url.name_with_id)
if r is None:
return Artist()
return Artist(source_list=[source])
soup = self.get_soup_from_response(r)
@ -814,14 +914,21 @@ class Musify(Page):
if note_soup is not None:
notes.html = note_soup.decode_contents()
# get artist profile artwork
main_artist_artwork: ArtworkCollection = ArtworkCollection()
artist_image_element_list: List[BeautifulSoup] = soup.find_all("img", {"class":"artist-img"})
for artist_image_element in artist_image_element_list:
main_artist_artwork.add_data(url=artist_image_element.get("data-src", artist_image_element.get("src")))
return Artist(
name=name,
country=country,
source_list=source_list,
notes=notes
notes=notes,
artwork=main_artist_artwork
)
def _parse_album_card(self, album_card: BeautifulSoup, artist_name: str = None) -> Album:
def _parse_album_card(self, album_card: BeautifulSoup, artist_name: str = None, **kwargs) -> Album:
"""
<div class="card release-thumbnail" data-type="2">
<a href="/release/ghost-bath-self-loather-2021-1554266">
@ -845,46 +952,20 @@ class Musify(Page):
</div>
"""
_id: Optional[str] = None
name: str = None
source_list: List[Source] = []
timestamp: Optional[ID3Timestamp] = None
album_status = None
def set_name(new_name: str):
nonlocal name
nonlocal artist_name
# example of just setting not working:
# https://musify.club/release/unjoy-eurythmie-psychonaut-4-tired-numb-still-alive-2012-324067
if new_name.count(" - ") != 1:
name = new_name
return
potential_artist_list, potential_name = new_name.split(" - ")
unified_artist_list = string_processing.unify(potential_artist_list)
if artist_name is not None:
if string_processing.unify(artist_name) not in unified_artist_list:
name = new_name
return
name = potential_name
return
name = new_name
album_kwargs: Dict[str, Any] = {
"source_list": [],
}
album_status_id = album_card.get("data-type")
if album_status_id.isdigit():
album_status_id = int(album_status_id)
album_type = ALBUM_TYPE_MAP[album_status_id]
album_kwargs["album_type"] = ALBUM_TYPE_MAP[album_status_id]
if album_status_id == 5:
album_status = AlbumStatus.BOOTLEG
album_kwargs["album_status"] = AlbumStatus.BOOTLEG
def parse_release_anchor(_anchor: BeautifulSoup, text_is_name=False):
nonlocal _id
nonlocal name
nonlocal source_list
nonlocal album_kwargs
if _anchor is None:
return
@ -892,20 +973,13 @@ class Musify(Page):
href = _anchor.get("href")
if href is not None:
# add url to sources
source_list.append(Source(
album_kwargs["source_list"].append(Source(
self.SOURCE_TYPE,
self.HOST + href
))
# split id from url
split_href = href.split("-")
if len(split_href) > 1:
_id = split_href[-1]
if not text_is_name:
return
set_name(_anchor.text)
if text_is_name:
album_kwargs["title"] = clean_song_title(_anchor.text, artist_name)
anchor_list = album_card.find_all("a", recursive=False)
if len(anchor_list) > 0:
@ -916,7 +990,7 @@ class Musify(Page):
if thumbnail is not None:
alt = thumbnail.get("alt")
if alt is not None:
set_name(alt)
album_kwargs["title"] = clean_song_title(alt, artist_name)
image_url = thumbnail.get("src")
else:
@ -933,7 +1007,7 @@ class Musify(Page):
13.11.2021
</small>
"""
nonlocal timestamp
nonlocal album_kwargs
italic_tagging_soup: BeautifulSoup = small_soup.find("i")
if italic_tagging_soup is None:
@ -943,7 +1017,7 @@ class Musify(Page):
return
raw_time = small_soup.text.strip()
timestamp = ID3Timestamp.strptime(raw_time, "%d.%m.%Y")
album_kwargs["date"] = ID3Timestamp.strptime(raw_time, "%d.%m.%Y")
# parse small date
card_footer_list = album_card.find_all("div", {"class": "card-footer"})
@ -956,112 +1030,18 @@ class Musify(Page):
else:
self.LOGGER.debug("there is not even 1 footer in the album card")
return Album(
title=name,
source_list=source_list,
date=timestamp,
album_type=album_type,
album_status=album_status
)
return Album(**album_kwargs)
def _parse_album(self, soup: BeautifulSoup) -> Album:
name: str = None
source_list: List[Source] = []
artist_list: List[Artist] = []
date: ID3Timestamp = None
"""
if breadcrumb list has 4 elements, then
the -2 is the artist link,
the -1 is the album
"""
# breadcrumb
breadcrumb_soup: BeautifulSoup = soup.find("ol", {"class", "breadcrumb"})
breadcrumb_elements: List[BeautifulSoup] = breadcrumb_soup.find_all("li", {"class": "breadcrumb-item"})
if len(breadcrumb_elements) == 4:
# album
album_crumb: BeautifulSoup = breadcrumb_elements[-1]
name = album_crumb.text.strip()
# artist
artist_crumb: BeautifulSoup = breadcrumb_elements[-2]
anchor: BeautifulSoup = artist_crumb.find("a")
if anchor is not None:
href = anchor.get("href")
artist_source_list: List[Source] = []
if href is not None:
artist_source_list.append(Source(self.SOURCE_TYPE, self.HOST + href.strip()))
span: BeautifulSoup = anchor.find("span")
if span is not None:
artist_list.append(Artist(
name=span.get_text(strip=True),
source_list=artist_source_list
))
else:
self.LOGGER.debug("there are not 4 breadcrumb items, which shouldn't be the case")
# meta
meta_url: BeautifulSoup = soup.find("meta", {"itemprop": "url"})
if meta_url is not None:
url = meta_url.get("content")
if url is not None:
source_list.append(Source(self.SOURCE_TYPE, self.HOST + url))
meta_name: BeautifulSoup = soup.find("meta", {"itemprop": "name"})
if meta_name is not None:
_name = meta_name.get("content")
if _name is not None:
name = _name
# album info
album_info_ul: BeautifulSoup = soup.find("ul", {"class": "album-info"})
if album_info_ul is not None:
artist_anchor: BeautifulSoup
for artist_anchor in album_info_ul.find_all("a", {"itemprop": "byArtist"}):
# line 98
artist_source_list: List[Source] = []
artist_url_meta = artist_anchor.find("meta", {"itemprop": "url"})
if artist_url_meta is not None:
artist_href = artist_url_meta.get("content")
if artist_href is not None:
artist_source_list.append(Source(self.SOURCE_TYPE, url=self.HOST + artist_href))
artist_meta_name = artist_anchor.find("meta", {"itemprop": "name"})
if artist_meta_name is not None:
artist_name = artist_meta_name.get("content")
if artist_name is not None:
artist_list.append(Artist(
name=artist_name,
source_list=artist_source_list
))
time_soup: BeautifulSoup = album_info_ul.find("time", {"itemprop": "datePublished"})
if time_soup is not None:
raw_datetime = time_soup.get("datetime")
if raw_datetime is not None:
try:
date = ID3Timestamp.strptime(raw_datetime, "%Y-%m-%d")
except ValueError:
self.LOGGER.debug(f"Raw datetime doesn't match time format %Y-%m-%d: {raw_datetime}")
return Album(
title=name,
source_list=source_list,
artist_list=artist_list,
date=date
)
def _get_discography(self, url: MusifyUrl, artist_name: str = None, stop_at_level: int = 1) -> Generator[Album, None, None]:
def _fetch_artist_discography(self, artist: Artist, url: MusifyUrl, artist_name: str = None, **kwargs):
"""
POST https://musify.club/artist/filteralbums
ArtistID: 280348
SortOrder.Property: dateCreated
SortOrder.IsAscending: false
X-Requested-With: XMLHttpRequest
ArtistID: 280348
SortOrder.Property: dateCreated
SortOrder.IsAscending: false
X-Requested-With: XMLHttpRequest
"""
_download_all = kwargs.get("download_all", False)
_album_type_blacklist = kwargs.get("album_type_blacklist", main_settings["album_type_blacklist"])
endpoint = self.HOST + "/" + url.source_type.value + "/filteralbums"
@ -1072,34 +1052,40 @@ class Musify(Page):
"X-Requested-With": "XMLHttpRequest"
}, name="discography_" + url.name_with_id)
if r is None:
return []
soup: BeautifulSoup = BeautifulSoup(r.content, features="html.parser")
return
soup: BeautifulSoup = self.get_soup_from_response(r)
for card_soup in soup.find_all("div", {"class": "card"}):
yield self._parse_album_card(card_soup, artist_name)
album = self._parse_album_card(card_soup, artist_name, **kwargs)
if not self.fetch_options.download_all and album.album_type in self.fetch_options.album_type_blacklist:
continue
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
artist.album_collection.append(album)
def _fetch_artist_artwork(self, source: str, artist: Artist, **kwargs):
# artist artwork
artwork_gallery = self.get_soup_from_response(self.connection.get(source.strip().strip("/") + "/photos"))
if artwork_gallery is not None:
gallery_body_content: BeautifulSoup = artwork_gallery.find(id="bodyContent")
gallery_image_element_list: List[BeautifulSoup] = gallery_body_content.find_all("img")
for gallery_image_element in gallery_image_element_list:
artist.artwork.append(ArtworkVariant(url=gallery_image_element.get("data-src", gallery_image_element.get("src")), width=247, heigth=247))
def fetch_artist(self, source: Source, **kwargs) -> Artist:
"""
fetches artist from source
TODO
[x] discography
[x] attributes
[] picture gallery
Args:
source (Source): the source to fetch
stop_at_level: int = 1: if it is false, every album from discograohy will be fetched. Defaults to False.
Returns:
Artist: the artist fetched
[x] picture gallery
"""
url = parse_url(source.url)
artist = self._get_artist_attributes(url)
artist.main_album_collection.extend(self._get_discography(url, artist.name))
artist = self._fetch_initial_artist(url, source=source, **kwargs)
self._fetch_artist_discography(artist, url, artist.name, **kwargs)
self._fetch_artist_artwork(url.url, artist, **kwargs)
return artist
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:

View File

@ -1,65 +0,0 @@
from typing import List, Optional, Type
from urllib.parse import urlparse
import logging
from ..objects import Source, DatabaseObject
from .abstract import Page
from ..objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target
)
from ..connection import Connection
from ..utils.support_classes.query import Query
from ..utils.support_classes.download_result import DownloadResult
class Preset(Page):
# CHANGE
SOURCE_TYPE = SourcePages.PRESET
LOGGER = logging.getLogger("preset")
def __init__(self, *args, **kwargs):
self.connection: Connection = Connection(
host="https://www.preset.cum/",
logger=self.LOGGER
)
super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
return super().get_source_type(source)
def general_search(self, search_query: str) -> List[DatabaseObject]:
return []
def label_search(self, label: Label) -> List[Label]:
return []
def artist_search(self, artist: Artist) -> List[Artist]:
return []
def album_search(self, album: Album) -> List[Album]:
return []
def song_search(self, song: Song) -> List[Song]:
return []
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song()
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
return Album()
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
return Artist()
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
return DownloadResult()

View File

@ -9,7 +9,6 @@ from .abstract import Page
from ..objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
@ -19,6 +18,7 @@ from ..objects import (
)
from ..connection import Connection
from ..utils.string_processing import clean_song_title
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.support_classes.download_result import DownloadResult
from ..utils.config import youtube_settings, main_settings, logging_settings
@ -39,10 +39,7 @@ def get_piped_url(path: str = "", params: str = "", query: str = "", fragment: s
class YouTube(SuperYouTube):
# CHANGE
SOURCE_TYPE = SourcePages.YOUTUBE
LOGGER = logging_settings["youtube_logger"]
NO_ADDITIONAL_DATA_FROM_SONG = True
SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
def __init__(self, *args, **kwargs):
self.connection: Connection = Connection(
@ -146,7 +143,7 @@ class YouTube(SuperYouTube):
self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={data['videoId']}")
)],
notes=FormattedText(html=data["descriptionHtml"] + f"\n<p>{license_str}</ p>" ),
main_artist_list=artist_list
artist_list=artist_list
), int(data["published"])
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
@ -287,7 +284,7 @@ class YouTube(SuperYouTube):
self.LOGGER.warning(f"didn't found any playlists with piped, falling back to invidious. (it is unusual)")
album_list, artist_name = self.fetch_invidious_album_list(parsed.id)
return Artist(name=artist_name, main_album_list=album_list, source_list=[source])
return Artist(name=artist_name, album_list=album_list, source_list=[source])
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
"""

View File

@ -7,7 +7,6 @@ from ..abstract import Page
from ...objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
@ -59,6 +58,19 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
song.album_collection.extend(album_list)
return [song]
if len(album_list) == 1:
album = album_list[0]
album.artist_collection.extend(artist_list)
album.song_collection.extend(song_list)
return [album]
"""
if len(artist_list) == 1:
artist = artist_list[0]
artist.main_album_collection.extend(album_list)
return [artist]
"""
return results

View File

@ -3,12 +3,13 @@ from enum import Enum
from ...utils.config import youtube_settings, logging_settings
from ...utils.string_processing import clean_song_title
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
from ...objects import Source, DatabaseObject
from ..abstract import Page
from ...objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
@ -18,7 +19,7 @@ from ...objects import (
LOGGER = logging_settings["youtube_music_logger"]
SOURCE_PAGE = SourcePages.YOUTUBE_MUSIC
SOURCE_PAGE = ALL_SOURCE_TYPES.YOUTUBE
class PageType(Enum):

View File

@ -10,7 +10,6 @@ from ..abstract import Page
from ...objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
@ -21,6 +20,7 @@ from ...objects import (
from ...connection import Connection
from ...utils.support_classes.download_result import DownloadResult
from ...utils.config import youtube_settings, logging_settings, main_settings
from ...utils.enums import SourceType, ALL_SOURCE_TYPES
def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
@ -50,7 +50,7 @@ class YouTubeUrl:
"""
def __init__(self, url: str) -> None:
self.SOURCE_TYPE = SourcePages.YOUTUBE
self.SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
"""
Raises Index exception for wrong url, and value error for not found enum type
@ -58,9 +58,6 @@ class YouTubeUrl:
self.id = ""
parsed = urlparse(url=url)
if parsed.netloc == "music.youtube.com":
self.SOURCE_TYPE = SourcePages.YOUTUBE_MUSIC
self.url_type: YouTubeUrlType
type_frag_list = parsed.path.split("/")
@ -124,8 +121,7 @@ class YouTubeUrl:
class SuperYouTube(Page):
# CHANGE
SOURCE_TYPE = SourcePages.YOUTUBE
LOGGER = logging_settings["youtube_logger"]
SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
NO_ADDITIONAL_DATA_FROM_SONG = False
@ -145,6 +141,8 @@ class SuperYouTube(Page):
_sponsorblock_connection: Connection = Connection()
self.sponsorblock = python_sponsorblock.SponsorBlock(silent=True, session=_sponsorblock_connection.session)
super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
_url_type = {
YouTubeUrlType.CHANNEL: Artist,

View File

@ -1,44 +1,33 @@
from __future__ import unicode_literals, annotations
from __future__ import annotations, unicode_literals
from typing import Dict, List, Optional, Set, Type
from urllib.parse import urlparse, urlunparse, quote, parse_qs, urlencode
import json
import logging
import random
import json
from dataclasses import dataclass
import re
from functools import lru_cache
from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
from typing import Dict, List, Optional, Set, Type
from urllib.parse import parse_qs, quote, urlencode, urlparse, urlunparse
import youtube_dl
from youtube_dl.extractor.youtube import YoutubeIE
from youtube_dl.utils import DownloadError
from ...connection import Connection
from ...objects import Album, Artist, ArtworkCollection
from ...objects import DatabaseObject as DataObject
from ...objects import (FormattedText, ID3Timestamp, Label, Lyrics, Song,
Source, Target)
from ...utils import dump_to_file, get_current_millis, traverse_json_path
from ...utils.config import logging_settings, main_settings, youtube_settings
from ...utils.enums import ALL_SOURCE_TYPES, SourceType
from ...utils.enums.album import AlbumType
from ...utils.exception.config import SettingValueError
from ...utils.config import main_settings, youtube_settings, logging_settings
from ...utils.shared import DEBUG, DEBUG_YOUTUBE_INITIALIZING
from ...utils.string_processing import clean_song_title
from ...utils import get_current_millis, traverse_json_path
from ...utils import dump_to_file
from ...objects import Source, DatabaseObject, ID3Timestamp, Artwork
from ..abstract import Page
from ...objects import (
Artist,
Source,
SourcePages,
Song,
Album,
Label,
Target,
Lyrics,
FormattedText
)
from ...connection import Connection
from ...utils.enums.album import AlbumType
from ...utils.support_classes.download_result import DownloadResult
from ..abstract import Page
from ._list_render import parse_renderer
from ._music_object_render import parse_run_element
from .super_youtube import SuperYouTube
@ -176,8 +165,7 @@ ALBUM_TYPE_MAP = {
class YoutubeMusic(SuperYouTube):
# CHANGE
SOURCE_TYPE = SourcePages.YOUTUBE_MUSIC
LOGGER = logging_settings["youtube_music_logger"]
SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
def __init__(self, *args, ydl_opts: dict = None, **kwargs):
self.yt_music_connection: YoutubeMusicConnection = YoutubeMusicConnection(
@ -193,8 +181,7 @@ class YoutubeMusic(SuperYouTube):
self.start_millis = get_current_millis()
if self.credentials.api_key == "" or DEBUG_YOUTUBE_INITIALIZING:
self._fetch_from_main_page()
self._fetch_from_main_page()
SuperYouTube.__init__(self, *args, **kwargs)
@ -215,6 +202,8 @@ class YoutubeMusic(SuperYouTube):
self.download_values_by_url: dict = {}
self.not_download: Dict[str, DownloadError] = {}
super().__init__(*args, **kwargs)
def _fetch_from_main_page(self):
"""
===API=KEY===
@ -347,10 +336,10 @@ class YoutubeMusic(SuperYouTube):
default='{}'
)) or {}
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
def get_source_type(self, source: Source) -> Optional[Type[DataObject]]:
return super().get_source_type(source)
def general_search(self, search_query: str) -> List[DatabaseObject]:
def general_search(self, search_query: str) -> List[DataObject]:
search_query = search_query.strip()
urlescaped_query: str = quote(search_query.strip().replace(" ", "+"))
@ -436,6 +425,7 @@ class YoutubeMusic(SuperYouTube):
data: dict = r.json()
header = data.get("header", {})
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
musicImmersiveHeaderRenderer = header.get("musicImmersiveHeaderRenderer", {})
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
@ -448,6 +438,11 @@ class YoutubeMusic(SuperYouTube):
renderer_list = r.json().get("contents", {}).get("singleColumnBrowseResultsRenderer", {}).get("tabs", [{}])[
0].get("tabRenderer", {}).get("content", {}).get("sectionListRenderer", {}).get("contents", [])
# fetch artist artwork
artist_thumbnails = musicImmersiveHeaderRenderer.get("thumbnail", {}).get("musicThumbnailRenderer", {}).get("thumbnail", {}).get("thumbnails", {})
for artist_thumbnail in artist_thumbnails:
artist.artwork.append(artist_thumbnail)
if DEBUG:
for i, content in enumerate(renderer_list):
dump_to_file(f"{i}-artists-renderer.json", json.dumps(content), is_json=True, exit_after_dump=False)
@ -494,7 +489,12 @@ class YoutubeMusic(SuperYouTube):
# album details
header = data.get("header", {})
musicDetailHeaderRenderer = header.get("musicDetailHeaderRenderer", {})
# album artwork
album_thumbnails = musicDetailHeaderRenderer.get("thumbnail", {}).get("croppedSquareThumbnailRenderer", {}).get("thumbnail", {}).get("thumbnails", {})
for album_thumbnail in album_thumbnails:
album.artwork.append(value=album_thumbnail)
title_runs: List[dict] = musicDetailHeaderRenderer.get("title", {}).get("runs", [])
subtitle_runs: List[dict] = musicDetailHeaderRenderer.get("subtitle", {}).get("runs", [])
@ -547,6 +547,11 @@ class YoutubeMusic(SuperYouTube):
return album
def fetch_lyrics(self, video_id: str, playlist_id: str = None) -> str:
"""
1. fetches the tabs of a song, to get the browse id
2. finds the browse id of the lyrics
3. fetches the lyrics with the browse id
"""
request_data = {
"context": {**self.credentials.context, "adSignalsInfo": {"params": []}},
"videoId": video_id,
@ -573,7 +578,8 @@ class YoutubeMusic(SuperYouTube):
pageType = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseEndpointContextSupportedConfigs.browseEndpointContextMusicConfig.pageType", default="")
if pageType in ("MUSIC_TAB_TYPE_LYRICS", "MUSIC_PAGE_TYPE_TRACK_LYRICS") or "lyrics" in pageType.lower():
browse_id = traverse_json_path(tab, "tabRenderer.endpoint.browseEndpoint.browseId", default=None)
break
if browse_id is not None:
break
if browse_id is None:
return None
@ -587,6 +593,8 @@ class YoutubeMusic(SuperYouTube):
},
name=f"fetch_song_lyrics_{video_id}.json"
)
if r is None:
return None
dump_to_file(f"fetch_song_lyrics_{video_id}.json", r.text, is_json=True, exit_after_dump=False)
@ -618,7 +626,7 @@ class YoutubeMusic(SuperYouTube):
Artist(
name=name,
source_list=[Source(
SourcePages.YOUTUBE_MUSIC,
self.SOURCE_TYPE,
f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}"
)]
) for name in artist_names]
@ -636,10 +644,10 @@ class YoutubeMusic(SuperYouTube):
note=ydl_res.get("descriptions"),
album_list=album_list,
length=int(ydl_res.get("duration", 0)) * 1000,
artwork=Artwork(*ydl_res.get("thumbnails", [])),
main_artist_list=artist_list,
artwork=ArtworkCollection(*ydl_res.get("thumbnails", [])),
artist_list=artist_list,
source_list=[Source(
SourcePages.YOUTUBE_MUSIC,
self.SOURCE_TYPE,
f"https://music.youtube.com/watch?v={ydl_res.get('id')}"
), source],
)
@ -675,7 +683,7 @@ class YoutubeMusic(SuperYouTube):
for album in song.album_list:
album.album_type = AlbumType.LIVE_ALBUM
for thumbnail in video_details.get("thumbnails", []):
song.artwork.append(**thumbnail)
song.artwork.add_data(**thumbnail)
song.lyrics_collection.append(self.fetch_lyrics(browse_id, playlist_id=request_data.get("playlistId")))
@ -717,7 +725,6 @@ class YoutubeMusic(SuperYouTube):
self.download_values_by_url[source.url] = {
"url": _best_format.get("url"),
"chunk_size": _best_format.get("downloader_options", {}).get("http_chunk_size", main_settings["chunk_size"]),
"headers": _best_format.get("http_headers", {}),
}
@ -736,8 +743,9 @@ class YoutubeMusic(SuperYouTube):
raw_headers=True,
disable_cache=True,
headers=media.get("headers", {}),
# chunk_size=media.get("chunk_size", main_settings["chunk_size"]),
chunk_size=main_settings["chunk_size"],
method="GET",
timeout=5,
)
else:
result = DownloadResult(error_message=str(media.get("error") or self.not_download[source.hash_url]))

View File

@ -1,15 +1,18 @@
from datetime import datetime
from pathlib import Path
import inspect
import json
import logging
import inspect
from typing import List, Union
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from typing import Any, List, Union
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
from .config import config, read_config, write_config
from .enums.colors import BColors
from .path_manager import LOCATIONS
from .hacking import merge_args
from .path_manager import LOCATIONS
from .shared import (DEBUG, DEBUG_DUMP, DEBUG_LOGGING, DEBUG_OBJECT_TRACE,
DEBUG_OBJECT_TRACE_CALLSTACK, DEBUG_TRACE, URL_PATTERN)
from .string_processing import hash_url, is_url, unify
"""
IO functions
@ -19,8 +22,13 @@ def _apply_color(msg: str, color: BColors) -> str:
if not isinstance(msg, str):
msg = str(msg)
endc = BColors.ENDC.value
if color is BColors.ENDC:
return msg
msg = msg.replace(BColors.ENDC.value, BColors.ENDC.value + color.value)
return color.value + msg + BColors.ENDC.value
@ -120,4 +128,35 @@ def get_current_millis() -> int:
def get_unix_time() -> int:
return int(datetime.now().timestamp())
return int(datetime.now().timestamp())
@lru_cache
def custom_hash(value: Any) -> int:
if is_url(value):
value = hash_url(value)
elif isinstance(value, str):
try:
value = int(value)
except ValueError:
value = unify(value)
return hash(value)
def create_dataclass_instance(t, data: dict):
"""Creates an instance of a dataclass with the given data.
It filters out all data key, which has no attribute in the dataclass.
Args:
t (Type): The dataclass type class
data (dict): the attribute to pass into the constructor
Returns:
Tuple[Type, dict]: The created instance and a dict, containing the data, which was not used in the creation
"""
needed_data = {k: v for k, v in data.items() if k in t.__dataclass_fields__}
removed_data = {k: v for k, v in data.items() if k not in t.__dataclass_fields__}
return t(**needed_data), removed_data

View File

@ -1,11 +1,8 @@
from typing import Tuple
from .config import Config
from .config_files import (
main_config,
logging_config,
youtube_config,
)
from .config_files import main_config, logging_config, youtube_config
_sections: Tuple[Config, ...] = (
main_config.config,

View File

@ -18,8 +18,9 @@ config = Config((
AudioFormatAttribute(name="audio_format", default_value="mp3", description="""Music Kraken will stream the audio into this format.
You can use Audio formats which support ID3.2 and ID3.1,
but you will have cleaner Metadata using ID3.2."""),
Attribute(name="image_format", default_value="jpeg", description="This Changes the format in which images are getting downloaded"),
Attribute(name="result_history", default_value=False, description="""If enabled, you can go back to the previous results.
Attribute(name="result_history", default_value=True, description="""If enabled, you can go back to the previous results.
The consequence is a higher meory consumption, because every result is saved."""),
Attribute(name="history_length", default_value=8, description="""You can choose how far back you can go in the result history.
The further you choose to be able to go back, the higher the memory usage.
@ -28,6 +29,7 @@ The further you choose to be able to go back, the higher the memory usage.
EmptyLine(),
Attribute(name="preferred_artwork_resolution", default_value=1000),
Attribute(name="download_artist_artworks", default_value=True, description="Enables the fetching of artist galleries."),
EmptyLine(),
@ -44,6 +46,7 @@ This means for example, the Studio Albums and EP's are always in front of Single
- album_type
The folder music kraken should put the songs into."""),
Attribute(name="download_file", default_value="{song}.{audio_format}", description="The filename of the audio file."),
Attribute(name="artist_artwork_path", default_value="{genre}/{artist}/{artist}_{image_number}.{image_format}", description="The Path to download artist images to."),
SelectAttribute(name="album_type_blacklist", default_value=[
"Compilation Album",
"Live Album",
@ -152,10 +155,13 @@ class SettingsStructure(TypedDict):
# artwork
preferred_artwork_resolution: int
image_format: str
download_artist_artworks: bool
# paths
music_directory: Path
temp_directory: Path
artist_artwork_path: Path
log_file: Path
not_a_genre_regex: List[str]
ffmpeg_binary: Path

View File

@ -1 +1,128 @@
from .source import SourcePages
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, Optional, Type
from mutagen.id3 import PictureType
if TYPE_CHECKING:
from ...pages.abstract import Page
@dataclass
class SourceType:
name: str
homepage: Optional[str] = None
download_priority: int = 0
page_type: Type[Page] = None
page: Page = None
def register_page(self, page: Page):
self.page = page
def __hash__(self):
return hash(self.name)
@property
def has_page(self) -> bool:
return self.page is not None
# for backwards compatibility
@property
def value(self) -> str:
return self.name
class ALL_SOURCE_TYPES:
YOUTUBE = SourceType(name="youtube", homepage="https://music.youtube.com/")
BANDCAMP = SourceType(name="bandcamp", homepage="https://bandcamp.com/", download_priority=10)
MUSIFY = SourceType(name="musify", homepage="https://musify.club/", download_priority=7)
GENIUS = SourceType(name="genius", homepage="https://genius.com/")
MUSICBRAINZ = SourceType(name="musicbrainz", homepage="https://musicbrainz.org/")
ENCYCLOPAEDIA_METALLUM = SourceType(name="encyclopaedia metallum")
DEEZER = SourceType(name="deezer", homepage="https://www.deezer.com/")
SPOTIFY = SourceType(name="spotify", homepage="https://open.spotify.com/")
# This has nothing to do with audio, but bands can be here
WIKIPEDIA = SourceType(name="wikipedia", homepage="https://en.wikipedia.org/wiki/Main_Page")
INSTAGRAM = SourceType(name="instagram", homepage="https://www.instagram.com/")
FACEBOOK = SourceType(name="facebook", homepage="https://www.facebook.com/")
TWITTER = SourceType(name="twitter", homepage="https://twitter.com/")
# Yes somehow this ancient site is linked EVERYWHERE
MYSPACE = SourceType(name="myspace", homepage="https://myspace.com/")
MANUAL = SourceType(name="manual")
PRESET = SourceType(name="preset")
class PictureType(Enum):
"""Enumeration of image types defined by the ID3 standard for the APIC
frame, but also reused in WMA/FLAC/VorbisComment.
This is copied from mutagen.id3.PictureType
"""
OTHER = 0
FILE_ICON = 1
"""32x32 pixels 'file icon' (PNG only)"""
OTHER_FILE_ICON = 2
"""Other file icon"""
COVER_FRONT = 3
"""Cover (front)"""
COVER_BACK = 4
"""Cover (back)"""
LEAFLET_PAGE = 5
"""Leaflet page"""
MEDIA = 6
"""Media (e.g. label side of CD)"""
LEAD_ARTIST = 7
"""Lead artist/lead performer/soloist"""
ARTIST = 8
"""Artist/performer"""
CONDUCTOR = 9
"""Conductor"""
BAND = 10
"""Band/Orchestra"""
COMPOSER = 11
"""Composer"""
LYRICIST = 12
"""Lyricist/text writer"""
RECORDING_LOCATION = 13
"""Recording Location"""
DURING_RECORDING = 14
"""During recording"""
DURING_PERFORMANCE = 15
"""During performance"""
SCREEN_CAPTURE = 16
"""Movie/video screen capture"""
FISH = 17
"""A bright colored fish"""
ILLUSTRATION = 18
"""Illustration"""
BAND_LOGOTYPE = 19
"""Band/artist logotype"""
PUBLISHER_LOGOTYPE = 20
"""Publisher/Studio logotype"""

View File

@ -1,40 +0,0 @@
from enum import Enum
class SourceTypes(Enum):
SONG = "song"
ALBUM = "album"
ARTIST = "artist"
LYRICS = "lyrics"
class SourcePages(Enum):
YOUTUBE = "youtube", "https://www.youtube.com/"
MUSIFY = "musify", "https://musify.club/"
YOUTUBE_MUSIC = "youtube music", "https://music.youtube.com/"
GENIUS = "genius", "https://genius.com/"
MUSICBRAINZ = "musicbrainz", "https://musicbrainz.org/"
ENCYCLOPAEDIA_METALLUM = "encyclopaedia metallum"
BANDCAMP = "bandcamp", "https://bandcamp.com/"
DEEZER = "deezer", "https://www.deezer.com/"
SPOTIFY = "spotify", "https://open.spotify.com/"
# This has nothing to do with audio, but bands can be here
WIKIPEDIA = "wikipedia", "https://en.wikipedia.org/wiki/Main_Page"
INSTAGRAM = "instagram", "https://www.instagram.com/"
FACEBOOK = "facebook", "https://www.facebook.com/"
TWITTER = "twitter", "https://twitter.com/"
MYSPACE = "myspace", "https://myspace.com/" # Yes somehow this ancient site is linked EVERYWHERE
MANUAL = "manual", ""
PRESET = "preset", ""
def __new__(cls, value, homepage = None):
member = object.__new__(cls)
member._value_ = value
member.homepage = homepage
return member

View File

@ -1 +1,23 @@
__all__ = ["config"]
class MKBaseException(Exception):
def __init__(self, message: str = None, **kwargs) -> None:
self.message = message
super().__init__(message, **kwargs)
# Downloading
class MKDownloadException(MKBaseException):
pass
class MKMissingNameException(MKDownloadException):
pass
# Frontend
class MKFrontendException(MKBaseException):
pass
class MKInvalidInputException(MKFrontendException):
pass

View File

@ -19,7 +19,7 @@ DEBUG_OBJECT_TRACE = DEBUG and False
DEBUG_OBJECT_TRACE_CALLSTACK = DEBUG_OBJECT_TRACE and False
DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
DEBUG_PAGES = DEBUG and False
DEBUG_DUMP = DEBUG and False
DEBUG_DUMP = DEBUG and True
DEBUG_PRINT_ID = DEBUG and True
if DEBUG:

View File

@ -1,13 +1,15 @@
from typing import Tuple, Union, Optional
from pathlib import Path
import re
import string
from functools import lru_cache
from pathlib import Path
from typing import Any, Optional, Tuple, Union
from urllib.parse import ParseResult, parse_qs, urlparse
from transliterate.exceptions import LanguageDetectionError
from transliterate import translit
from pathvalidate import sanitize_filename
from urllib.parse import urlparse, ParseResult, parse_qs
from transliterate import translit
from transliterate.exceptions import LanguageDetectionError
from .shared import URL_PATTERN
COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = (
"(official video)",
@ -116,10 +118,13 @@ def clean_song_title(raw_song_title: str, artist_name: Optional[str] = None) ->
# Remove artist from the start of the title
if raw_song_title.lower().startswith(artist_name.lower()):
raw_song_title = raw_song_title[len(artist_name):].strip()
if raw_song_title.startswith("-"):
raw_song_title = raw_song_title[1:].strip()
possible_new_name = raw_song_title[len(artist_name):].strip()
for char in ("-", "", ":", "|"):
if possible_new_name.startswith(char):
raw_song_title = possible_new_name[1:].strip()
break
return raw_song_title.strip()
@ -226,3 +231,13 @@ def shorten_display_url(url: str, max_length: int = 150, chars_at_end: int = 4,
return url
return url[:max_length] + shorten_string + url[-chars_at_end:]
def is_url(value: Any) -> bool:
if isinstance(value, ParseResult):
return True
if not isinstance(value, str):
return True
# value has to be a string
return re.match(URL_PATTERN, value) is not None

View File

@ -1,9 +1,13 @@
from dataclasses import dataclass, field
from typing import List, Tuple
from __future__ import annotations
from ...utils.config import main_settings, logging_settings
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, List, Tuple
if TYPE_CHECKING:
from ...objects import Target
from ...utils.config import logging_settings, main_settings
from ...utils.enums.colors import BColors
from ...objects import Target
UNIT_PREFIXES: List[str] = ["", "k", "m", "g", "t"]
UNIT_DIVISOR = 1024

View File

@ -24,7 +24,7 @@ class Query:
return [self.music_object.name]
if isinstance(self.music_object, Song):
return [f"{artist.name} - {self.music_object}" for artist in self.music_object.main_artist_collection]
return [f"{artist.name} - {self.music_object}" for artist in self.music_object.artist_collection]
if isinstance(self.music_object, Album):
return [f"{artist.name} - {self.music_object}" for artist in self.music_object.artist_collection]

View File

@ -69,7 +69,7 @@ dependencies = [
"toml~=0.10.2",
"typing_extensions~=4.7.1",
"python-sponsorblock~=0.0.0",
"python-sponsorblock~=0.1",
"youtube_dl",
]
dynamic = [

View File

@ -3,96 +3,98 @@ import unittest
from music_kraken.objects import Song, Album, Artist, Collection, Country
class TestCollection(unittest.TestCase):
@staticmethod
def complicated_object() -> Artist:
return Artist(
name="artist",
country=Country.by_alpha_2("DE"),
main_album_list=[
Album(
title="album",
song_list=[
Song(
title="song",
album_list=[
Album(title="album", albumsort=123),
],
),
Song(
title="other_song",
album_list=[
Album(title="album", albumsort=423),
],
),
]
),
Album(title="album", barcode="1234567890123"),
def test_song_contains_album(self):
"""
Tests that every song contains the album it is added to in its album_collection
"""
a_1 = Album(
title="album",
song_list= [
Song(title="song"),
]
)
a_2 = a_1.song_collection[0].album_collection[0]
self.assertTrue(a_1.id == a_2.id)
def test_song_album_relation(self):
def test_album_contains_song(self):
"""
Tests that
album = album.any_song.one_album
is the same object
Tests that every album contains the song it is added to in its song_collection
"""
s_1 = Song(
title="song",
album_list=[
Album(title="album"),
]
)
s_2 = s_1.album_collection[0].song_collection[0]
self.assertTrue(s_1.id == s_2.id)
def test_auto_add_artist_to_album_feature_artist(self):
"""
Tests that every artist is added to the album's feature_artist_collection per default
"""
a = self.complicated_object().main_album_collection[0]
b = a.song_collection[0].album_collection[0]
c = a.song_collection[1].album_collection[0]
d = b.song_collection[0].album_collection[0]
e = d.song_collection[0].album_collection[0]
f = e.song_collection[0].album_collection[0]
g = f.song_collection[0].album_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id == e.id == f.id == g.id)
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "album")
self.assertTrue(a.barcode == b.barcode == c.barcode == d.barcode == e.barcode == f.barcode == g.barcode == "1234567890123")
self.assertTrue(a.albumsort == b.albumsort == c.albumsort == d.albumsort == e.albumsort == f.albumsort == g.albumsort == 123)
d.title = "new_title"
self.assertTrue(a.title == b.title == c.title == d.title == e.title == f.title == g.title == "new_title")
def test_album_artist_relation(self):
"""
Tests that
artist = artist.any_album.any_song.one_artist
is the same object
"""
a = self.complicated_object()
b = a.main_album_collection[0].artist_collection[0]
c = b.main_album_collection[0].artist_collection[0]
d = c.main_album_collection[0].artist_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id)
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
def test_artist_artist_relation(self):
artist = Artist(
a_1 = Artist(
name="artist",
main_album_list=[
album_list=[
Album(title="album")
]
)
a_2 = a_1.album_collection[0].feature_artist_collection[0]
self.assertTrue(a_1.id == a_2.id)
def test_auto_add_artist_to_album_feature_artist_push(self):
"""
Tests that every artist is added to the album's feature_artist_collection per default but pulled into the album's artist_collection if a merge exitst
"""
a_1 = Artist(
name="artist",
album_list=[
Album(
title="album",
song_list=[
Song(title="song"),
],
artist_list=[
Artist(name="artist"),
]
)
]
)
a_2 = a_1.album_collection[0].artist_collection[0]
self.assertTrue(artist.id == artist.main_album_collection[0].song_collection[0].main_artist_collection[0].id)
self.assertTrue(a_1.id == a_2.id)
def test_artist_artist_relation(self):
"""
Tests the proper syncing between album.artist_collection and song.artist_collection
"""
album = Album(
title="album",
song_list=[
Song(title="song"),
],
artist_list=[
Artist(name="artist"),
]
)
a_1 = album.artist_collection[0]
a_2 = album.song_collection[0].artist_collection[0]
self.assertTrue(a_1.id == a_2.id)
def test_artist_collection_sync(self):
"""
tests the actual implementation of the test above
"""
album_1 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]),
Song(title="song", artist_list=[Artist(name="artist")]),
],
artist_list=[
Artist(name="artist"),
@ -102,7 +104,7 @@ class TestCollection(unittest.TestCase):
album_2 = Album(
title="album",
song_list=[
Song(title="song", main_artist_list=[Artist(name="artist")]),
Song(title="song", artist_list=[Artist(name="artist")]),
],
artist_list=[
Artist(name="artist"),
@ -111,17 +113,7 @@ class TestCollection(unittest.TestCase):
album_1.merge(album_2)
self.assertTrue(id(album_1.artist_collection) == id(album_1.artist_collection) == id(album_1.song_collection[0].main_artist_collection) == id(album_1.song_collection[0].main_artist_collection))
def test_song_artist_relations(self):
a = self.complicated_object()
b = a.main_album_collection[0].song_collection[0].main_artist_collection[0]
c = b.main_album_collection[0].song_collection[0].main_artist_collection[0]
d = c.main_album_collection[0].song_collection[0].main_artist_collection[0]
self.assertTrue(a.id == b.id == c.id == d.id)
self.assertTrue(a.name == b.name == c.name == d.name == "artist")
self.assertTrue(a.country == b.country == c.country == d.country)
self.assertTrue(id(album_1.artist_collection) == id(album_1.artist_collection) == id(album_1.song_collection[0].artist_collection) == id(album_1.song_collection[0].artist_collection))
if __name__ == "__main__":
unittest.main()