This commit is contained in:
Hellow 2023-01-30 23:54:21 +01:00
parent 06cc826a21
commit 65ccdee2cb
24 changed files with 701 additions and 1245 deletions

View File

@ -26,7 +26,7 @@ Total : 43 files, 2560 codes, 558 comments, 774 blanks, all 3892 lines
| [src/music_kraken/database/__init__.py](/src/music_kraken/database/__init__.py) | Python | 11 | 1 | 4 | 16 |
| [src/music_kraken/database/database.py](/src/music_kraken/database/database.py) | Python | 191 | 102 | 45 | 338 |
| [src/music_kraken/database/get_song.py](/src/music_kraken/database/get_song.py) | Python | 40 | 5 | 11 | 56 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/new_database.py) | Python | 172 | 78 | 55 | 305 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/database.py) | Python | 172 | 78 | 55 | 305 |
| [src/music_kraken/database/objects/__init__.py](/src/music_kraken/database/objects/__init__.py) | Python | 11 | 0 | 4 | 15 |
| [src/music_kraken/database/objects/artist.py](/src/music_kraken/database/objects/artist.py) | Python | 18 | 0 | 5 | 23 |
| [src/music_kraken/database/objects/database_object.py](/src/music_kraken/database/objects/database_object.py) | Python | 21 | 5 | 11 | 37 |

View File

@ -19,7 +19,7 @@ Total : 20 files, 700 codes, 165 comments, 162 blanks, all 1027 lines
| [src/music_kraken/database/database.py](/src/music_kraken/database/database.py) | Python | 25 | 22 | 4 | 51 |
| [src/music_kraken/database/get_song.py](/src/music_kraken/database/get_song.py) | Python | 40 | 5 | 11 | 56 |
| [src/music_kraken/database/metadata.py](/src/music_kraken/database/metadata.py) | Python | -13 | 0 | -5 | -18 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/new_database.py) | Python | 172 | 78 | 55 | 305 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/database.py) | Python | 172 | 78 | 55 | 305 |
| [src/music_kraken/database/objects/__init__.py](/src/music_kraken/database/objects/__init__.py) | Python | 11 | 0 | 4 | 15 |
| [src/music_kraken/database/objects/artist.py](/src/music_kraken/database/objects/artist.py) | Python | 18 | 0 | 5 | 23 |
| [src/music_kraken/database/objects/database_object.py](/src/music_kraken/database/objects/database_object.py) | Python | 21 | 5 | 11 | 37 |

View File

@ -26,7 +26,7 @@ Total : 45 files, 2886 codes, 594 comments, 854 blanks, all 4334 lines
| [src/music_kraken/database/__init__.py](/src/music_kraken/database/__init__.py) | Python | 11 | 1 | 4 | 16 |
| [src/music_kraken/database/database.py](/src/music_kraken/database/database.py) | Python | 191 | 102 | 45 | 338 |
| [src/music_kraken/database/get_song.py](/src/music_kraken/database/get_song.py) | Python | 40 | 5 | 11 | 56 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/new_database.py) | Python | 327 | 98 | 89 | 514 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/database.py) | Python | 327 | 98 | 89 | 514 |
| [src/music_kraken/database/objects/__init__.py](/src/music_kraken/database/objects/__init__.py) | Python | 10 | 0 | 3 | 13 |
| [src/music_kraken/database/objects/artist.py](/src/music_kraken/database/objects/artist.py) | Python | 18 | 0 | 5 | 23 |
| [src/music_kraken/database/objects/database_object.py](/src/music_kraken/database/objects/database_object.py) | Python | 28 | 7 | 13 | 48 |

View File

@ -12,7 +12,7 @@ Total : 10 files, 326 codes, 36 comments, 80 blanks, all 442 lines
| filename | language | code | comment | blank | total |
| :--- | :--- | ---: | ---: | ---: | ---: |
| [src/goof.py](/src/goof.py) | Python | 30 | -1 | 7 | 36 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/new_database.py) | Python | 155 | 20 | 34 | 209 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/database.py) | Python | 155 | 20 | 34 | 209 |
| [src/music_kraken/database/objects/__init__.py](/src/music_kraken/database/objects/__init__.py) | Python | -1 | 0 | -1 | -2 |
| [src/music_kraken/database/objects/database_object.py](/src/music_kraken/database/objects/database_object.py) | Python | 7 | 2 | 2 | 11 |
| [src/music_kraken/database/objects/song.py](/src/music_kraken/database/objects/song.py) | Python | 76 | 9 | 26 | 111 |

View File

@ -26,7 +26,7 @@ Total : 49 files, 3402 codes, 663 comments, 973 blanks, all 5038 lines
| [src/music_kraken/database/__init__.py](/src/music_kraken/database/__init__.py) | Python | 12 | 1 | 4 | 17 |
| [src/music_kraken/database/database.py](/src/music_kraken/database/database.py) | Python | 191 | 102 | 45 | 338 |
| [src/music_kraken/database/get_song.py](/src/music_kraken/database/get_song.py) | Python | 40 | 5 | 11 | 56 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/new_database.py) | Python | 401 | 109 | 107 | 617 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/database.py) | Python | 401 | 109 | 107 | 617 |
| [src/music_kraken/database/objects/__init__.py](/src/music_kraken/database/objects/__init__.py) | Python | 14 | 0 | 4 | 18 |
| [src/music_kraken/database/objects/artist.py](/src/music_kraken/database/objects/artist.py) | Python | 18 | 0 | 5 | 23 |
| [src/music_kraken/database/objects/metadata.py](/src/music_kraken/database/objects/metadata.py) | Python | 245 | 52 | 50 | 347 |

View File

@ -14,7 +14,7 @@ Total : 16 files, 516 codes, 69 comments, 119 blanks, all 704 lines
| [src/goof.py](/src/goof.py) | Python | 42 | 2 | 10 | 54 |
| [src/music_kraken/__init__.py](/src/music_kraken/__init__.py) | Python | 1 | 0 | 0 | 1 |
| [src/music_kraken/database/__init__.py](/src/music_kraken/database/__init__.py) | Python | 1 | 0 | 0 | 1 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/new_database.py) | Python | 74 | 11 | 18 | 103 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/database.py) | Python | 74 | 11 | 18 | 103 |
| [src/music_kraken/database/objects/__init__.py](/src/music_kraken/database/objects/__init__.py) | Python | 4 | 0 | 1 | 5 |
| [src/music_kraken/database/objects/database_object.py](/src/music_kraken/database/objects/database_object.py) | Python | -28 | -7 | -13 | -48 |
| [src/music_kraken/database/objects/metadata.py](/src/music_kraken/database/objects/metadata.py) | Python | 245 | 52 | 50 | 347 |

View File

@ -26,7 +26,7 @@ Total : 49 files, 3404 codes, 664 comments, 974 blanks, all 5042 lines
| [src/music_kraken/database/__init__.py](/src/music_kraken/database/__init__.py) | Python | 12 | 1 | 4 | 17 |
| [src/music_kraken/database/database.py](/src/music_kraken/database/database.py) | Python | 191 | 102 | 45 | 338 |
| [src/music_kraken/database/get_song.py](/src/music_kraken/database/get_song.py) | Python | 40 | 5 | 11 | 56 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/new_database.py) | Python | 402 | 110 | 107 | 619 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/database.py) | Python | 402 | 110 | 107 | 619 |
| [src/music_kraken/database/objects/__init__.py](/src/music_kraken/database/objects/__init__.py) | Python | 15 | 0 | 5 | 20 |
| [src/music_kraken/database/objects/artist.py](/src/music_kraken/database/objects/artist.py) | Python | 18 | 0 | 5 | 23 |
| [src/music_kraken/database/objects/metadata.py](/src/music_kraken/database/objects/metadata.py) | Python | 245 | 52 | 50 | 347 |

View File

@ -11,7 +11,7 @@ Total : 2 files, 2 codes, 1 comments, 1 blanks, all 4 lines
## Files
| filename | language | code | comment | blank | total |
| :--- | :--- | ---: | ---: | ---: | ---: |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/new_database.py) | Python | 1 | 1 | 0 | 2 |
| [src/music_kraken/database/new_database.py](/src/music_kraken/database/database.py) | Python | 1 | 1 | 0 | 2 |
| [src/music_kraken/database/objects/__init__.py](/src/music_kraken/database/objects/__init__.py) | Python | 1 | 0 | 1 | 2 |
[Summary](results.md) / [Details](details.md) / [Diff Summary](diff.md) / Diff Details

View File

@ -281,7 +281,7 @@ All the data, the functions that download stuff use, can be gotten from the temp
The cache can be simply used like this:
```python
music_kraken.cache
music_kraken.test_db
```
When fetching any song data from the cache, you will get it as Song

View File

@ -8,7 +8,7 @@ from music_kraken import (
Artist,
ID3Timestamp,
SourcePages,
SourceTypes
cache
)
from music_kraken.tagging import (
@ -17,7 +17,7 @@ from music_kraken.tagging import (
write_many_metadata
)
import music_kraken.database.new_database as db
import music_kraken.database.database as db
import pycountry
import logging
@ -29,7 +29,6 @@ def div(msg: str = ""):
print("-" * 50 + msg + "-" * 50)
cache = music_kraken.database.new_database.Database("test.db")
cache.reset()

View File

@ -1,10 +1,46 @@
from music_kraken import (
Song,
Database
)
from music_kraken.pages import (
EncyclopaediaMetallum
)
test_db = Database("test.db")
# test_db.reset()
def print_song(song_: Song):
print(str(song_.metadata))
print("----album--")
print(song_.album)
print("----src----")
print("song:")
print(song_.source_list)
print("album:")
print(song_.album.source_list)
print("\n")
# only_smile = EncyclopaediaMetallum.search_by_query("only smile")
# print(EncyclopaediaMetallum.search_by_query("#a Ghost Bath"))
# print(EncyclopaediaMetallum.search_by_query("#a Ghost Bath #r Self Loather"))
print(EncyclopaediaMetallum.search_by_query("#a Ghost Bath #r Self Loather #t hide from the sun"))
songs_in_db = test_db.pull_songs()
song: Song
if len(songs_in_db) <= 0:
print("didn't find song in db.... downloading")
song: Song = EncyclopaediaMetallum.search_by_query("#a Ghost Bath #r Self Loather #t hide from the sun")[0]
test_db.push_song(song)
else:
print("found song in database")
song = songs_in_db[0]
print_song(song)
artist = song.main_artist_list[0]
artist = EncyclopaediaMetallum.fetch_artist_details(artist)
# print(only_smile)

View File

@ -46,12 +46,14 @@ SourcePages = database.SourcePages
Target = database.Target
Lyrics = database.Lyrics
Album = database.Album
ID3Timestamp = database.ID3Timestamp
MetadataSearch = metadata.MetadataSearch
MetadataDownload = metadata.MetadataDownload
# cache = database.cache
cache = database.cache
Database = database.Database
def fetch_metadata(type_: str, id_: str):
metadata_downloader = MetadataDownload()

View File

@ -1,14 +1,2 @@
from typing import List
from ..database.song import Song as song_object
from . import (
fetch_source,
fetch_audio
)
def fetch_sources(songs: List[song_object], skip_existing_files: bool = False):
fetch_source.Download.fetch_sources(songs=songs, skip_existing_files=skip_existing_files)
def fetch_audios(songs: List[song_object], override_existing: bool = False):
fetch_audio.Download.fetch_audios(songs=songs, override_existing=override_existing)

View File

@ -1,6 +1,7 @@
from . import (
temp_database,
objects
objects,
database
)
MusicObject = objects.MusicObject
@ -12,8 +13,8 @@ Song = objects.Song
Source = objects.Source
Target = objects.Target
Lyrics = objects.Lyrics
Album = objects.Album
Artist = objects.Artist
# cache = temp_database.TempDatabase()
Database = database.Database
cache = temp_database.TempDatabase()

View File

@ -1,337 +1,647 @@
from typing import List
import sqlite3
import os
import logging
import json
from typing import List, Tuple
from pkg_resources import resource_string
import datetime
import pycountry
from .song import (
from .objects.parents import Reference
from .objects.source import Source
from .objects import (
Song,
Lyrics,
Metadata,
Target,
Artist,
Source
)
from .get_song import get_song_from_response
from ..utils.shared import (
DATABASE_LOGGER
Album,
ID3Timestamp,
SourceTypes,
SourcePages,
SourceAttribute
)
logger = DATABASE_LOGGER
logger = logging.getLogger("database")
# Due to this not being deployed on a Server **HOPEFULLY**
# I don't need to parameterize stuff like the where and
# use complicated query builder
SONG_QUERY = """
SELECT
Song.id AS song_id, Song.name AS title, Song.isrc AS isrc, Song.length AS length, Song.album_id as album_id, Song.tracksort,
Target.id AS target_id, Target.file AS file, Target.path AS path, Song.genre AS genre
FROM Song
LEFT JOIN Target ON Song.id=Target.song_id
WHERE {where};
"""
SOURCE_QUERY = """
SELECT id, type, src, url, song_id
FROM Source
WHERE {where};
"""
LYRICS_QUERY = """
SELECT id, text, language, song_id
FROM Lyrics
WHERE {where};
"""
ALBUM_QUERY_UNJOINED = """
SELECT Album.id AS album_id, title, label, album_status, language, date, country, barcode, albumsort, is_split
FROM Album
WHERE {where};
"""
ALBUM_QUERY_JOINED = """
SELECT a.id AS album_id, a.title, a.label, a.album_status, a.language, a.date, a.country, a.barcode, a.albumsort, a.is_split
FROM Song
INNER JOIN Album a ON Song.album_id=a.id
WHERE {where};
"""
ARTIST_QUERY = """
SELECT id as artist_id, name as artist_name
FROM Artist
WHERE {where};
"""
class Database:
def __init__(self, path_to_db: str, reset_anyways: bool = False):
self.path_to_db = path_to_db
def __init__(self, database_file: str):
self.database_file: str = database_file
self.connection, self.cursor = self.reset_cursor()
self.connection = sqlite3.connect(self.path_to_db)
self.cursor = self.connection.cursor()
# init database
self.init_db(reset_anyways=reset_anyways)
def reset(self):
"""
Deletes all Data from the database if it exists
and resets the schema defined in self.structure_file
"""
logger.info(f"resetting the database")
def init_db(self, reset_anyways: bool = False):
# check if db exists
exists = True
try:
query = 'SELECT * FROM track;'
self.cursor.execute(query)
_ = self.cursor.fetchall()
except sqlite3.OperationalError:
exists = False
# deleting the database
del self.connection
del self.cursor
os.remove(self.database_file)
if not exists:
logger.info("Database does not exist yet.")
# newly creating the database
self.reset_cursor()
query = resource_string("music_kraken", "static_files/new_db.sql").decode('utf-8')
if reset_anyways or not exists:
# reset the database if reset_anyways is true or if an error has been thrown previously.
logger.info(f"Reseting the database.")
query = resource_string("music_kraken", "static_files/temp_database_structure.sql").decode('utf-8')
# fill the database with the schematic
self.cursor.executescript(query)
self.connection.commit()
def add_artist(
self,
musicbrainz_artistid: str,
artist: str = None
):
query = "INSERT OR REPLACE INTO artist (id, name) VALUES (?, ?);"
values = musicbrainz_artistid, artist
def reset_cursor(self) -> Tuple[sqlite3.Connection, sqlite3.Cursor]:
self.connection = sqlite3.connect(self.database_file)
# This is necessary that fetching rows returns dicts instead of tuple
self.connection.row_factory = sqlite3.Row
self.cursor.execute(query, values)
self.connection.commit()
self.cursor = self.connection.cursor()
return self.connection, self.cursor
def add_release_group(
self,
musicbrainz_releasegroupid: str,
artist_ids: list,
albumartist: str = None,
albumsort: int = None,
musicbrainz_albumtype: str = None,
compilation: str = None,
album_artist_id: str = None
):
# add adjacency
adjacency_list = []
for artist_id in artist_ids:
adjacency_list.append((artist_id, musicbrainz_releasegroupid))
adjacency_values = tuple(adjacency_list)
adjacency_query = "INSERT OR REPLACE INTO artist_release_group (artist_id, release_group_id) VALUES (?, ?);"
self.cursor.executemany(adjacency_query, adjacency_values)
self.connection.commit()
def push_one(self, db_object: Song | Lyrics | Target | Artist | Source | Album):
if db_object.dynamic:
return
# add release group
query = "INSERT OR REPLACE INTO release_group (id, albumartist, albumsort, musicbrainz_albumtype, compilation, album_artist_id) VALUES (?, ?, ?, ?, ?, ?);"
values = musicbrainz_releasegroupid, albumartist, albumsort, musicbrainz_albumtype, compilation, album_artist_id
self.cursor.execute(query, values)
self.connection.commit()
if type(db_object) == Song:
return self.push_song(song=db_object)
def add_release(
self,
musicbrainz_albumid: str,
release_group_id: str,
title: str = None,
copyright_: str = None,
album_status: str = None,
language: str = None,
year: str = None,
date: str = None,
country: str = None,
barcode: str = None
):
query = "INSERT OR REPLACE INTO release_ (id, release_group_id, title, copyright, album_status, language, year, date, country, barcode) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
values = musicbrainz_albumid, release_group_id, title, copyright_, album_status, language, year, date, country, barcode
if type(db_object) == Lyrics:
return self.push_lyrics(lyrics=db_object)
self.cursor.execute(query, values)
self.connection.commit()
if type(db_object) == Target:
return self.push_target(target=db_object)
def add_track(
self,
musicbrainz_releasetrackid: str,
musicbrainz_albumid: str,
feature_aritsts: list,
tracknumber: str = None,
track: str = None,
isrc: str = None,
length: int = None
):
# add adjacency
adjacency_list = []
for artist_id in feature_aritsts:
adjacency_list.append((artist_id, musicbrainz_releasetrackid))
adjacency_values = tuple(adjacency_list)
adjacency_query = "INSERT OR REPLACE INTO artist_track (artist_id, track_id) VALUES (?, ?);"
self.cursor.executemany(adjacency_query, adjacency_values)
self.connection.commit()
if type(db_object) == Artist:
return self.push_artist(artist=db_object)
# add track
query = "INSERT OR REPLACE INTO track (id, release_id, track, isrc, tracknumber, length) VALUES (?, ?, ?, ?, ?, ?);"
values = musicbrainz_releasetrackid, musicbrainz_albumid, track, isrc, tracknumber, length
self.cursor.execute(query, values)
self.connection.commit()
if type(db_object) == Source:
# needs to have the property type_enum or type_str set
return self.push_source(source=db_object)
@staticmethod
def get_custom_track_query(custom_where: list) -> str:
where_args = [
"1 = 1"
]
where_args.extend(custom_where)
if type(db_object) == Album:
return self.push_album(album=db_object)
where_arg = " AND ".join(where_args)
query = f"""
SELECT DISTINCT
json_object(
'artists', json_group_array(
(
SELECT DISTINCT json_object(
'id', artist.id,
'name', artist.name
)
)
),
'source', json_group_array(
(
SELECT DISTINCT json_object(
'src', src_table.src,
'url', src_table.url,
'valid', src_table.valid
)
)
),
'lyrics', json_group_array(
(
SELECT DISTINCT json_object(
'text', lyrics_table.text
'language', lyrics_table.language
)
)
),
'target', json_group_array(
(
SELECT DISTINCT json_object(
'file', target.file
'path', target.path
)
)
),
'id', track.id,
'mb_id', track.mb_id,
'tracknumber', track.tracknumber,
'titlesort', track.tracknumber,
'musicbrainz_releasetrackid', track.id,
'musicbrainz_albumid', release_.id,
'title', track.track,
'isrc', track.isrc,
'album', release_.title,
'copyright', release_.copyright,
'album_status', release_.album_status,
'language', release_.language,
'year', release_.year,
'date', release_.date,
'country', release_.country,
'barcode', release_.barcode,
'albumartist', release_group.albumartist,
'albumsort', release_group.albumsort,
'musicbrainz_albumtype', release_group.musicbrainz_albumtype,
'compilation', release_group.compilation,
'album_artist_id', release_group.album_artist_id,
'length', track.length,
'path', track.path,
'file', track.file,
'genre', track.genre,
'url', track.url,
'src', track.src,
'lyrics', track.lyrics
)
FROM track
LEFT JOIN release_ ON track.release_id = release_.id
LEFT JOIN release_group ON release_.id = release_group.id
LEFT JOIN artist_track ON track.id = artist_track.track_id
LEFT JOIN artist ON artist_track.artist_id = artist.id
LEFT JOIN source src_table ON track.id = src_table.track_id
LEFT JOIN lyrics lyrics_table ON track.id = lyrics_table.track_id
LEFT JOIN target ON track.id = target.track_id
WHERE
{where_arg}
GROUP BY track.id;
logger.warning(f"type {type(db_object)} isn't yet supported by the db")
def push(self, db_object_list: List[Song | Lyrics | Target | Artist | Source | Album]):
"""
return query
This function is used to Write the data of any db_object to the database
def get_custom_track(self, custom_where: list) -> List[Song]:
query = Database.get_custom_track_query(custom_where=custom_where)
return [get_song_from_response(json.loads(i[0])) for i in self.cursor.execute(query)]
It syncs a whole list of db_objects to the database and is meant
as the primary method to add to the database.
def get_track_metadata(self, musicbrainz_releasetrackid: str):
# this would be vulnerable if musicbrainz_releasetrackid would be user input
resulting_tracks = self.get_custom_track([f'track.id == "{musicbrainz_releasetrackid}"'])
if len(resulting_tracks) != 1:
return -1
return resulting_tracks[0]
def get_tracks_to_download(self) -> List[Song]:
return self.get_custom_track(['track.downloaded == 0'])
def get_tracks_without_src(self) -> List[Song]:
return self.get_custom_track(["(track.url IS NULL OR track.src IS NULL)"])
def get_tracks_without_isrc(self) -> List[Song]:
return self.get_custom_track(["track.isrc IS NULL"])
def get_tracks_without_filepath(self) -> List[Song]:
return self.get_custom_track(["(track.file IS NULL OR track.path IS NULL OR track.genre IS NULL)"])
def get_tracks_for_lyrics(self) -> List[Song]:
return self.get_custom_track(["track.lyrics IS NULL"])
def add_lyrics(self, song: Song, lyrics: Lyrics):
query = f"""
UPDATE track
SET lyrics = ?
WHERE '{song.id}' == id;
:param db_object_list:
"""
self.cursor.execute(query, (str(lyrics.text),))
for db_object in db_object_list:
self.push_one(db_object)
def push_album(self, album: Album):
table = "Album"
query = f"INSERT OR REPLACE INTO {table} (id, title, label, album_status, language, date, country, barcode, albumsort, is_split) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
values = (
album.id,
album.title,
album.label,
album.album_status,
album.iso_639_2_language,
album.date.strftime("%Y-%m-%d"),
album.country,
album.barcode,
album.albumsort,
album.is_split
)
self.cursor.execute(query, values)
self.connection.commit()
def update_download_status(self, track_id: str):
query = f"UPDATE track SET downloaded = 1, WHERE '{track_id}' == id;"
for song in album.tracklist:
self.push_song(song)
for artist in album.artists:
self.push_artist_album(artist_ref=artist.reference, album_ref=album.reference)
self.push_artist(artist)
for source in album.source_list:
source.type_enum = SourceTypes.ALBUM
source.add_song(album)
self.push_source(source=source)
def push_song(self, song: Song):
if song.dynamic:
return
# ADDING THE DATA FOR THE SONG OBJECT
"""
db_field - object attribute
-------------------------------
id - id
name - title
"""
table = "Song"
values = (
song.id,
song.title,
song.isrc,
song.length,
song.get_album_id(),
song.tracksort,
song.genre
)
query = f"INSERT OR REPLACE INTO {table} (id, name, isrc, length, album_id, tracksort, genre) VALUES (?, ?, ?, ?, ?, ?, ?);"
self.cursor.execute(query, values)
self.connection.commit()
# add sources
for source in song.source_list:
source.add_song(song)
source.type_enum = SourceTypes.SONG
self.push_source(source=source)
# add lyrics
for single_lyrics in song.lyrics:
single_lyrics.add_song(song)
self.push_lyrics(lyrics=single_lyrics)
# add target
song.target.add_song(song)
self.push_target(target=song.target)
for main_artist in song.main_artist_list:
self.push_artist_song(artist_ref=Reference(main_artist.id), song_ref=Reference(song.id), is_feature=False)
self.push_artist(artist=main_artist)
for feature_artist in song.feature_artist_list:
self.push_artist_song(artist_ref=Reference(feature_artist.id), song_ref=Reference(song.id), is_feature=True)
self.push_artist(artist=feature_artist)
if song.album is not None:
self.push_album(song.album)
def push_lyrics(self, lyrics: Lyrics, ):
if lyrics.song_ref_id is None:
logger.warning("the Lyrics don't refer to a song")
table = "Lyrics"
query = f"INSERT OR REPLACE INTO {table} (id, song_id, text, language) VALUES (?, ?, ?, ?);"
values = (
lyrics.id,
lyrics.song_ref_id,
lyrics.text,
lyrics.language
)
self.cursor.execute(query, values)
self.connection.commit()
def push_source(self, source: Source):
if source.song_ref_id is None:
logger.warning(f"the Source {source} don't refer to a song")
table = "Source"
query = f"INSERT OR REPLACE INTO {table} (id, type, song_id, src, url) VALUES (?, ?, ?, ?, ?);"
values = (
source.id,
source.type_str,
source.song_ref_id,
source.page_str,
source.url
)
self.cursor.execute(query, values)
self.connection.commit()
def push_target(self, target: Target):
if target.song_ref_id is None:
logger.warning("the Target doesn't refer to a song")
table = "Target"
query = f"INSERT OR REPLACE INTO {table} (id, song_id, file, path) VALUES (?, ?, ?, ?);"
values = (
target.id,
target.song_ref_id,
target.file,
target.path
)
self.cursor.execute(query, values)
self.connection.commit()
def push_artist_song(self, artist_ref: Reference, song_ref: Reference, is_feature: bool):
table = "SongArtist"
# checking if already exists
query = f"SELECT * FROM {table} WHERE song_id=\"{song_ref.id}\" AND artist_id=\"{artist_ref.id}\""
self.cursor.execute(query)
if len(self.cursor.fetchall()) > 0:
# join already exists
return
query = f"INSERT OR REPLACE INTO {table} (song_id, artist_id, is_feature) VALUES (?, ?, ?);"
values = (
song_ref.id,
artist_ref.id,
is_feature
)
self.cursor.execute(query, values)
self.connection.commit()
def set_field_of_song(self, track_id: str, key: str, value: str):
query = f"UPDATE track SET {key} = ? WHERE '{track_id}' == id;"
self.cursor.execute(query, (value,))
def push_artist_album(self, artist_ref: Reference, album_ref: Reference):
table = "AlbumArtist"
# checking if already exists
query = f"SELECT * FROM {table} WHERE album_id=\"{album_ref.id}\" AND artist_id=\"{artist_ref.id}\""
self.cursor.execute(query)
if len(self.cursor.fetchall()) > 0:
# join already exists
return
query = f"INSERT OR REPLACE INTO {table} (album_id, artist_id) VALUES (?, ?);"
values = (
album_ref.id,
artist_ref.id
)
self.cursor.execute(query, values)
self.connection.commit()
def set_download_data(self, track_id: str, url: str, src: str):
query = f"""
UPDATE track
SET url = ?,
src = ?
WHERE '{track_id}' == id;
def push_artist(self, artist: Artist):
table = "Artist"
query = f"INSERT OR REPLACE INTO {table} (id, name) VALUES (?, ?);"
values = (
artist.id,
artist.name
)
self.cursor.execute(query, values)
self.connection.commit()
for song in artist.feature_songs:
self.push_artist_song(artist_ref=artist.reference, song_ref=song.reference, is_feature=True)
self.push_song(song=song)
for song in artist.main_songs:
self.push_artist_song(artist_ref=artist.reference, song_ref=song.reference, is_feature=False)
self.push_song(song=song)
for album in artist.main_albums:
self.push_artist_album(artist_ref=artist.reference, album_ref=album.reference)
for source in artist.source_list:
source.type_enum = SourceTypes.ARTIST
self.push_source(source)
def pull_lyrics(self, song_ref: Reference = None, lyrics_ref: Reference = None) -> List[Lyrics]:
"""
self.cursor.execute(query, (url, src))
self.connection.commit()
query = "INSERT OR REPLACE INTO source (track_id, src, url) VALUES (?, ?, ?);"
self.cursor.execute(query, (track_id, src, url))
self.connection.commit()
def set_filepath(self, track_id: str, file: str, path: str, genre: str):
query = f"""
UPDATE track
SET file = ?,
path = ?,
genre = ?
WHERE '{track_id}' == id;
Gets a list of sources. if lyrics_ref is passed in the List will most likely only
contain one Element if everything goes accordingly.
**If neither song_ref nor lyrics_ref are passed in it will return ALL lyrics**
:param song_ref:
:param lyrics_ref:
:return:
"""
self.cursor.execute(query, (file, path, genre))
self.connection.commit()
def write_target(self, song_id: str, target: Target):
query = f"UPDATE track SET file = ?, path = ? WHERE '{song_id}' == id;"
self.cursor.execute(query, (target.file, target.path))
self.connection.commit()
where = "1=1"
if song_ref is not None:
where = f"song_id=\"{song_ref.id}\""
elif lyrics_ref is not None:
where = f"id=\"{lyrics_ref.id}\""
def write_artist(self, artist: Artist, song_id: str = None, release_group_id: str = None):
artist_id = artist.id
query = LYRICS_QUERY.format(where=where)
self.cursor.execute(query)
query = "INSERT OR REPLACE INTO artist (id, mb_id, name) VALUES (?, ?, ?);"
self.cursor.execute(query, (artist_id, artist.mb_id, artist.name))
self.connection.commit()
lyrics_rows = self.cursor.fetchall()
return [Lyrics(
id_=lyrics_row['id'],
text=lyrics_row['text'],
language=lyrics_row['language']
) for lyrics_row in lyrics_rows]
if song_id is not None:
adjacency_query = "INSERT OR REPLACE INTO artist_track (artist_id, track_id) VALUES (?, ?);"
self.cursor.execute(adjacency_query, (artist_id, song_id))
self.connection.commit()
def pull_sources(self, artist_ref: Reference = None, song_ref: Reference = None, source_ref: Reference = None, album_ref: Reference = None) -> List[Source]:
"""
Gets a list of sources. if source_ref is passed in the List will most likely only
contain one Element if everything goes accordingly.
**If neither song_ref nor source_ref are passed in it will return ALL sources**
:param artist_ref:
:param song_ref:
:param source_ref:
:param type_str: the thing the source belongs to like eg. "song" or "album"
:return:
"""
if release_group_id is not None:
adjacency_query = "INSERT OR REPLACE INTO artist_release_group (artist_id, release_group_id) VALUES (?, ?);"
self.cursor.execute(adjacency_query, (artist_id, release_group_id))
self.connection.commit()
where = "1=1"
if song_ref is not None:
where = f"song_id=\"{song_ref.id}\""
elif source_ref is not None:
where = f"id=\"{source_ref.id}\" AND type=\"{SourceTypes.SONG.value}\""
elif artist_ref is not None:
where = f"song_id=\"{artist_ref.id}\" AND type=\"{SourceTypes.ARTIST.value}\""
elif album_ref is not None:
where = f"song_id=\"{album_ref.id}\" AND type=\"{SourceTypes.ALBUM.value}\""
def write_many_artists(self, song_id: str, artist_list: List[Artist]):
for artist in artist_list:
self.write_artist(song_id=song_id, artist=artist)
query = SOURCE_QUERY.format(where=where)
self.cursor.execute(query)
def write_source(self, song_id: str, source: Source):
pass
source_rows = self.cursor.fetchall()
def write_many_sources(self, song_id: str, source_list: List[Source]):
for source in source_list:
self.write_source(song_id=song_id, source=source)
return [
Source(
page_enum=SourcePages(source_row['src']),
type_enum=SourceTypes(source_row['type']),
url=source_row['url'],
id_=source_row['id']
) for source_row in source_rows
]
def write_song(self, song: Song):
song_id = song.id
def pull_artist_song(self, song_ref: Reference = None, artist_ref: Reference = None) -> List[tuple]:
table = "SongArtist"
wheres = []
if song_ref is not None:
wheres.append(f"song_id=\"{song_ref.id}\"")
if artist_ref is not None:
wheres.append(f"artist_id=\"{artist_ref.id}\"")
where_str = ""
if len(wheres) > 0:
where_str = "WHERE " + " AND ".join(wheres)
# write artists
self.write_many_artists(song_id=song_id, artist_list=song.artists)
# write sources
self.write_many_sources(song_id=song_id, source_list=song.sources)
# write target
self.write_target(song_id=song_id, target=song.target)
query = f"SELECT * FROM {table} {where_str};"
self.cursor.execute(query)
joins = self.cursor.fetchall()
def write_many_song(self, songs: List[Song]):
for song in songs:
self.write_song(song=song)
return [(
Reference(join["song_id"]),
Reference(join["artist_id"]),
bool(join["is_feature"])
) for join in joins]
def pull_artist_album(self, album_ref: Reference = None, artist_ref: Reference = None) -> List[tuple]:
table = "AlbumArtist"
wheres = []
if album_ref is not None:
wheres.append(f"album_id=\"{album_ref.id}\"")
if artist_ref is not None:
wheres.append(f"artist_id=\"{artist_ref.id}\"")
where_str = ""
if len(wheres) > 0:
where_str = "WHERE " + " AND ".join(wheres)
query = f"SELECT * FROM {table} {where_str};"
self.cursor.execute(query)
joins = self.cursor.fetchall()
return [(
Reference(join["album_id"]),
Reference(join["artist_id"])
) for join in joins]
def get_artist_from_row(self, artist_row, exclude_relations: set = None, flat: bool = False) -> Artist:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = set(exclude_relations)
new_exclude_relations.add(Artist)
artist_id = artist_row['artist_id']
artist_obj = Artist(
id_=artist_id,
name=artist_row['artist_name'],
source_list=self.pull_sources(artist_ref=Reference(id_=artist_id))
)
if flat:
return artist_obj
# fetch songs :D
for song_ref, _, is_feature in self.pull_artist_song(artist_ref=Reference(id_=artist_id)):
new_songs = self.pull_songs(song_ref=song_ref, exclude_relations=new_exclude_relations)
if len(new_songs) < 1:
continue
new_song = new_songs[0]
if is_feature:
artist_obj.feature_songs.append(new_song)
else:
artist_obj.main_songs.append(new_song)
# fetch albums
for album_ref, _ in self.pull_artist_album(artist_ref=Reference(id_=artist_id)):
new_albums = self.pull_albums(album_ref=album_ref, exclude_relations=new_exclude_relations)
if len(new_albums) < 1:
continue
artist_obj.main_albums.append(new_albums[0])
return artist_obj
def pull_artists(self, artist_ref: Reference = None, exclude_relations: set = None, flat: bool = False) -> List[Artist]:
"""
:param artist_ref:
:param exclude_relations:
:param flat: if it is true it ONLY fetches the artist data
:return:
"""
where = "1=1"
if artist_ref is not None:
where = f"Artist.id=\"{artist_ref.id}\""
query = ARTIST_QUERY.format(where=where)
self.cursor.execute(query)
artist_rows = self.cursor.fetchall()
return [(
self.get_artist_from_row(artist_row, exclude_relations=exclude_relations, flat=flat)
) for artist_row in artist_rows]
def get_song_from_row(self, song_result, exclude_relations: set = None) -> Song:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = set(exclude_relations)
new_exclude_relations.add(Song)
song_id = song_result['song_id']
# maybee fetch album
song_obj = Song(
id_=song_id,
title=song_result['title'],
isrc=song_result['isrc'],
length=song_result['length'],
tracksort=song_result['tracksort'],
genre=song_result['genre'],
target=Target(
id_=song_result['target_id'],
file=song_result['file'],
path=song_result['path']
),
source_list=self.pull_sources(song_ref=Reference(id_=song_id)),
lyrics=self.pull_lyrics(song_ref=Reference(id_=song_id)),
)
if Album not in exclude_relations and song_result['album_id'] is not None:
album_obj = self.pull_albums(album_ref=Reference(song_result['album_id']),
exclude_relations=new_exclude_relations)
if len(album_obj) > 0:
song_obj.album = album_obj[0]
flat_artist = Artist in exclude_relations
main_artists = []
feature_artists = []
for song_ref, artist_ref, is_feature in self.pull_artist_song(song_ref=Reference(song_id)):
if is_feature:
feature_artists.extend(self.pull_artists(artist_ref=artist_ref, flat=flat_artist))
else:
main_artists.extend(self.pull_artists(artist_ref=artist_ref, flat=flat_artist))
song_obj.main_artist_list = main_artists
song_obj.feature_artist_list = feature_artists
return song_obj
def pull_songs(self, song_ref: Reference = None, album_ref: Reference = None, exclude_relations: set = set()) -> \
List[Song]:
"""
This function is used to get one song (including its children like Sources etc)
from one song id (a reference object)
:param exclude_relations:
By default all relations are pulled by this funktion. If the class object of for
example the Artists is in the set it won't get fetched.
This is done to prevent an infinite recursion.
:param song_ref:
:param album_ref:
:return requested_song:
"""
where = "1=1"
if song_ref is not None:
where = f"Song.id=\"{song_ref.id}\""
elif album_ref is not None:
where = f"Song.album_id=\"{album_ref.id}\""
query = SONG_QUERY.format(where=where)
self.cursor.execute(query)
song_rows = self.cursor.fetchall()
return [self.get_song_from_row(
song_result=song_result,
exclude_relations=exclude_relations
) for song_result in song_rows]
def get_album_from_row(self, album_result, exclude_relations=None) -> Album:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = exclude_relations.copy()
new_exclude_relations.add(Album)
album_id = album_result['album_id']
language = album_result['language']
if language is not None:
language = pycountry.languages.get(alpha_3=album_result['language'])
album_obj = Album(
id_=album_id,
title=album_result['title'],
label=album_result['label'],
album_status=album_result['album_status'],
language=language,
date=ID3Timestamp.strptime(album_result['date'], "%Y-%m-%d"),
country=album_result['country'],
barcode=album_result['barcode'],
is_split=album_result['is_split'],
albumsort=album_result['albumsort'],
source_list=self.pull_sources(album_ref=Reference(id_=album_id))
)
if Song not in exclude_relations:
# getting the tracklist
tracklist: List[Song] = self.pull_songs(
album_ref=Reference(id_=album_id),
exclude_relations=new_exclude_relations
)
album_obj.set_tracklist(tracklist=tracklist)
flat_artist = Artist in exclude_relations
for _, artist_ref in self.pull_artist_album(album_ref=Reference(id_=album_id)):
artists = self.pull_artists(artist_ref, flat=flat_artist, exclude_relations=new_exclude_relations)
if len(artists) < 1:
continue
album_obj.artists.append(artists[0])
return album_obj
def pull_albums(self, album_ref: Reference = None, song_ref: Reference = None, exclude_relations: set = None) -> \
List[Album]:
"""
This function is used to get matching albums/releses
from one song id (a reference object)
:param exclude_relations:
By default all relations are pulled by this funktion. If the class object of for
example the Artists is in the set it won't get fetched.
This is done to prevent an infinite recursion.
:param album_ref:
:return requested_album_list:
"""
if exclude_relations is None:
exclude_relations = set()
query = ALBUM_QUERY_UNJOINED
where = "1=1"
if album_ref is not None:
query = ALBUM_QUERY_UNJOINED
where = f"Album.id=\"{album_ref.id}\""
elif song_ref is not None:
query = ALBUM_QUERY_JOINED
where = f"Song.id=\"{song_ref.id}\""
query = query.format(where=where)
self.cursor.execute(query)
album_rows = self.cursor.fetchall()
return [self.get_album_from_row(
album_result=album_row,
exclude_relations=exclude_relations
) for album_row in album_rows]
if __name__ == "__main__":
cache = Database("")

View File

@ -1,55 +0,0 @@
from typing import List
from .song import (
Song,
Source,
Target,
Metadata,
Artist,
LyricsContainer,
Lyrics
)
def get_song_from_response(response: dict) -> Song:
# artists
artists = [Artist(id_=a['id'], mb_id=a['id'], name=a['name']) for a in response['artists']]
# metadata
metadata = Metadata()
for key, value in response.items():
metadata[key] = value
metadata['artists'] = [a.name for a in artists]
# sources
sources: List[Source] = []
for src in response['source']:
if src['src'] is None:
continue
sources.append(Source(src=src['src'], url=src['url']))
# target
target = Target(file=response['file'], path=response['path'])
# Lyrics
lyrics_container = LyricsContainer()
lyrics_container.append(Lyrics(text=response['lyrics'], language='en'))
length = response['length']
if length is not None:
length = int(length)
song = Song(
id_=response['id'],
mb_id=response['id'],
title=response['title'],
release=response['album'],
isrc=response['isrc'],
length=length,
artists=artists,
metadata=metadata,
sources=sources,
target=target
)
return song

View File

@ -1,644 +0,0 @@
import sqlite3
import os
import logging
from typing import List, Tuple
from pkg_resources import resource_string
import datetime
import pycountry
from .objects.parents import Reference
from .objects.source import Source
from .objects import (
Song,
Lyrics,
Target,
Artist,
Album,
ID3Timestamp,
SourceTypes,
SourcePages,
SourceAttribute
)
logger = logging.getLogger("database")
# Due to this not being deployed on a Server **HOPEFULLY**
# I don't need to parameterize stuff like the where and
# use complicated query builder
SONG_QUERY = """
SELECT
Song.id AS song_id, Song.name AS title, Song.isrc AS isrc, Song.length AS length, Song.album_id as album_id, Song.tracksort,
Target.id AS target_id, Target.file AS file, Target.path AS path, Song.genre AS genre
FROM Song
LEFT JOIN Target ON Song.id=Target.song_id
WHERE {where};
"""
SOURCE_QUERY = """
SELECT id, type, src, url, song_id
FROM Source
WHERE {where};
"""
LYRICS_QUERY = """
SELECT id, text, language, song_id
FROM Lyrics
WHERE {where};
"""
ALBUM_QUERY_UNJOINED = """
SELECT Album.id AS album_id, title, label, album_status, language, date, country, barcode, albumsort, is_split
FROM Album
WHERE {where};
"""
ALBUM_QUERY_JOINED = """
SELECT a.id AS album_id, a.title, a.label, a.album_status, a.language, a.date, a.country, a.barcode, a.albumsort, a.is_split
FROM Song
INNER JOIN Album a ON Song.album_id=a.id
WHERE {where};
"""
ARTIST_QUERY = """
SELECT id as artist_id, name as artist_name
FROM Artist
WHERE {where};
"""
class Database:
def __init__(self, database_file: str):
self.database_file: str = database_file
self.connection, self.cursor = self.reset_cursor()
self.cursor = self.connection.cursor()
def reset(self):
"""
Deletes all Data from the database if it exists
and resets the schema defined in self.structure_file
"""
logger.info(f"resetting the database")
# deleting the database
del self.connection
del self.cursor
os.remove(self.database_file)
# newly creating the database
self.reset_cursor()
query = resource_string("music_kraken", "static_files/new_db.sql").decode('utf-8')
# fill the database with the schematic
self.cursor.executescript(query)
self.connection.commit()
def reset_cursor(self) -> Tuple[sqlite3.Connection, sqlite3.Cursor]:
self.connection = sqlite3.connect(self.database_file)
# This is necessary that fetching rows returns dicts instead of tuple
self.connection.row_factory = sqlite3.Row
self.cursor = self.connection.cursor()
return self.connection, self.cursor
def push_one(self, db_object: Song | Lyrics | Target | Artist | Source | Album):
if db_object.dynamic:
return
if type(db_object) == Song:
return self.push_song(song=db_object)
if type(db_object) == Lyrics:
return self.push_lyrics(lyrics=db_object)
if type(db_object) == Target:
return self.push_target(target=db_object)
if type(db_object) == Artist:
return self.push_artist(artist=db_object)
if type(db_object) == Source:
# needs to have the property type_enum or type_str set
return self.push_source(source=db_object)
if type(db_object) == Album:
return self.push_album(album=db_object)
logger.warning(f"type {type(db_object)} isn't yet supported by the db")
def push(self, db_object_list: List[Song | Lyrics | Target | Artist | Source | Album]):
"""
This function is used to Write the data of any db_object to the database
It syncs a whole list of db_objects to the database and is meant
as the primary method to add to the database.
:param db_object_list:
"""
for db_object in db_object_list:
self.push_one(db_object)
def push_album(self, album: Album):
table = "Album"
query = f"INSERT OR REPLACE INTO {table} (id, title, label, album_status, language, date, country, barcode, albumsort, is_split) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
values = (
album.id,
album.title,
album.label,
album.album_status,
album.iso_639_2_language,
album.date.strftime("%Y-%m-%d"),
album.country,
album.barcode,
album.albumsort,
album.is_split
)
self.cursor.execute(query, values)
self.connection.commit()
for song in album.tracklist:
self.push_song(song)
for artist in album.artists:
self.push_artist_album(artist_ref=artist.reference, album_ref=album.reference)
self.push_artist(artist)
for source in album.source_list:
source.type_enum = SourceTypes.ALBUM
source.add_song(album)
self.push_source(source=source)
def push_song(self, song: Song):
if song.dynamic:
return
# ADDING THE DATA FOR THE SONG OBJECT
"""
db_field - object attribute
-------------------------------
id - id
name - title
"""
table = "Song"
values = (
song.id,
song.title,
song.isrc,
song.length,
song.get_album_id(),
song.tracksort,
song.genre
)
query = f"INSERT OR REPLACE INTO {table} (id, name, isrc, length, album_id, tracksort, genre) VALUES (?, ?, ?, ?, ?, ?, ?);"
self.cursor.execute(query, values)
self.connection.commit()
# add sources
for source in song.source_list:
source.add_song(song)
source.type_enum = SourceTypes.SONG
self.push_source(source=source)
# add lyrics
for single_lyrics in song.lyrics:
single_lyrics.add_song(song)
self.push_lyrics(lyrics=single_lyrics)
# add target
song.target.add_song(song)
self.push_target(target=song.target)
for main_artist in song.main_artist_list:
self.push_artist_song(artist_ref=Reference(main_artist.id), song_ref=Reference(song.id), is_feature=False)
self.push_artist(artist=main_artist)
for feature_artist in song.feature_artist_list:
self.push_artist_song(artist_ref=Reference(feature_artist.id), song_ref=Reference(song.id), is_feature=True)
self.push_artist(artist=feature_artist)
if song.album is not None:
self.push_album(song.album)
def push_lyrics(self, lyrics: Lyrics, ):
if lyrics.song_ref_id is None:
logger.warning("the Lyrics don't refer to a song")
table = "Lyrics"
query = f"INSERT OR REPLACE INTO {table} (id, song_id, text, language) VALUES (?, ?, ?, ?);"
values = (
lyrics.id,
lyrics.song_ref_id,
lyrics.text,
lyrics.language
)
self.cursor.execute(query, values)
self.connection.commit()
def push_source(self, source: Source):
if source.song_ref_id is None:
logger.warning(f"the Source {source} don't refer to a song")
table = "Source"
query = f"INSERT OR REPLACE INTO {table} (id, type, song_id, src, url) VALUES (?, ?, ?, ?, ?);"
values = (
source.id,
source.type_str,
source.song_ref_id,
source.page_str,
source.url
)
self.cursor.execute(query, values)
self.connection.commit()
def push_target(self, target: Target):
if target.song_ref_id is None:
logger.warning("the Target doesn't refer to a song")
table = "Target"
query = f"INSERT OR REPLACE INTO {table} (id, song_id, file, path) VALUES (?, ?, ?, ?);"
values = (
target.id,
target.song_ref_id,
target.file,
target.path
)
self.cursor.execute(query, values)
self.connection.commit()
def push_artist_song(self, artist_ref: Reference, song_ref: Reference, is_feature: bool):
table = "SongArtist"
# checking if already exists
query = f"SELECT * FROM {table} WHERE song_id=\"{song_ref.id}\" AND artist_id=\"{artist_ref.id}\""
self.cursor.execute(query)
if len(self.cursor.fetchall()) > 0:
# join already exists
return
query = f"INSERT OR REPLACE INTO {table} (song_id, artist_id, is_feature) VALUES (?, ?, ?);"
values = (
song_ref.id,
artist_ref.id,
is_feature
)
self.cursor.execute(query, values)
self.connection.commit()
def push_artist_album(self, artist_ref: Reference, album_ref: Reference):
table = "AlbumArtist"
# checking if already exists
query = f"SELECT * FROM {table} WHERE album_id=\"{album_ref.id}\" AND artist_id=\"{artist_ref.id}\""
self.cursor.execute(query)
if len(self.cursor.fetchall()) > 0:
# join already exists
return
query = f"INSERT OR REPLACE INTO {table} (album_id, artist_id) VALUES (?, ?);"
values = (
album_ref.id,
artist_ref.id
)
self.cursor.execute(query, values)
self.connection.commit()
def push_artist(self, artist: Artist):
table = "Artist"
query = f"INSERT OR REPLACE INTO {table} (id, name) VALUES (?, ?);"
values = (
artist.id,
artist.name
)
self.cursor.execute(query, values)
self.connection.commit()
for song in artist.feature_songs:
self.push_artist_song(artist_ref=artist.reference, song_ref=song.reference, is_feature=True)
self.push_song(song=song)
for song in artist.main_songs:
self.push_artist_song(artist_ref=artist.reference, song_ref=song.reference, is_feature=False)
self.push_song(song=song)
for album in artist.main_albums:
self.push_artist_album(artist_ref=artist.reference, album_ref=album.reference)
for source in artist.source_list:
source.type_enum = SourceTypes.ARTIST
self.push_source(source)
def pull_lyrics(self, song_ref: Reference = None, lyrics_ref: Reference = None) -> List[Lyrics]:
"""
Gets a list of sources. if lyrics_ref is passed in the List will most likely only
contain one Element if everything goes accordingly.
**If neither song_ref nor lyrics_ref are passed in it will return ALL lyrics**
:param song_ref:
:param lyrics_ref:
:return:
"""
where = "1=1"
if song_ref is not None:
where = f"song_id=\"{song_ref.id}\""
elif lyrics_ref is not None:
where = f"id=\"{lyrics_ref.id}\""
query = LYRICS_QUERY.format(where=where)
self.cursor.execute(query)
lyrics_rows = self.cursor.fetchall()
return [Lyrics(
id_=lyrics_row['id'],
text=lyrics_row['text'],
language=lyrics_row['language']
) for lyrics_row in lyrics_rows]
def pull_sources(self, artist_ref: Reference = None, song_ref: Reference = None, source_ref: Reference = None, album_ref: Reference = None) -> List[Source]:
"""
Gets a list of sources. if source_ref is passed in the List will most likely only
contain one Element if everything goes accordingly.
**If neither song_ref nor source_ref are passed in it will return ALL sources**
:param artist_ref:
:param song_ref:
:param source_ref:
:param type_str: the thing the source belongs to like eg. "song" or "album"
:return:
"""
where = "1=1"
if song_ref is not None:
where = f"song_id=\"{song_ref.id}\""
elif source_ref is not None:
where = f"id=\"{source_ref.id}\" AND type=\"{SourceTypes.SONG.value}\""
elif artist_ref is not None:
where = f"song_id=\"{artist_ref.id}\" AND type=\"{SourceTypes.ARTIST.value}\""
elif album_ref is not None:
where = f"song_id=\"{album_ref.id}\" AND type=\"{SourceTypes.ALBUM.value}\""
query = SOURCE_QUERY.format(where=where)
self.cursor.execute(query)
source_rows = self.cursor.fetchall()
return [
Source(
page_enum=SourcePages(source_row['src']),
type_enum=SourceTypes(source_row['type']),
url=source_row['url'],
id_=source_row['id']
) for source_row in source_rows
]
def pull_artist_song(self, song_ref: Reference = None, artist_ref: Reference = None) -> List[tuple]:
table = "SongArtist"
wheres = []
if song_ref is not None:
wheres.append(f"song_id=\"{song_ref.id}\"")
if artist_ref is not None:
wheres.append(f"artist_id=\"{artist_ref.id}\"")
where_str = ""
if len(wheres) > 0:
where_str = "WHERE " + " AND ".join(wheres)
query = f"SELECT * FROM {table} {where_str};"
self.cursor.execute(query)
joins = self.cursor.fetchall()
return [(
Reference(join["song_id"]),
Reference(join["artist_id"]),
bool(join["is_feature"])
) for join in joins]
def pull_artist_album(self, album_ref: Reference = None, artist_ref: Reference = None) -> List[tuple]:
table = "AlbumArtist"
wheres = []
if album_ref is not None:
wheres.append(f"album_id=\"{album_ref.id}\"")
if artist_ref is not None:
wheres.append(f"artist_id=\"{artist_ref.id}\"")
where_str = ""
if len(wheres) > 0:
where_str = "WHERE " + " AND ".join(wheres)
query = f"SELECT * FROM {table} {where_str};"
self.cursor.execute(query)
joins = self.cursor.fetchall()
return [(
Reference(join["album_id"]),
Reference(join["artist_id"])
) for join in joins]
def get_artist_from_row(self, artist_row, exclude_relations: set = None, flat: bool = False) -> Artist:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = set(exclude_relations)
new_exclude_relations.add(Artist)
artist_id = artist_row['artist_id']
artist_obj = Artist(
id_=artist_id,
name=artist_row['artist_name'],
source_list=self.pull_sources(artist_ref=Reference(id_=artist_id))
)
if flat:
return artist_obj
# fetch songs :D
for song_ref, _, is_feature in self.pull_artist_song(artist_ref=Reference(id_=artist_id)):
new_songs = self.pull_songs(song_ref=song_ref, exclude_relations=new_exclude_relations)
if len(new_songs) < 1:
continue
new_song = new_songs[0]
if is_feature:
artist_obj.feature_songs.append(new_song)
else:
artist_obj.main_songs.append(new_song)
# fetch albums
for album_ref, _ in self.pull_artist_album(artist_ref=Reference(id_=artist_id)):
new_albums = self.pull_albums(album_ref=album_ref, exclude_relations=new_exclude_relations)
if len(new_albums) < 1:
continue
artist_obj.main_albums.append(new_albums[0])
return artist_obj
def pull_artists(self, artist_ref: Reference = None, exclude_relations: set = None, flat: bool = False) -> List[Artist]:
"""
:param artist_ref:
:param exclude_relations:
:param flat: if it is true it ONLY fetches the artist data
:return:
"""
where = "1=1"
if artist_ref is not None:
where = f"Artist.id=\"{artist_ref.id}\""
query = ARTIST_QUERY.format(where=where)
self.cursor.execute(query)
artist_rows = self.cursor.fetchall()
return [(
self.get_artist_from_row(artist_row, exclude_relations=exclude_relations, flat=flat)
) for artist_row in artist_rows]
def get_song_from_row(self, song_result, exclude_relations: set = None) -> Song:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = set(exclude_relations)
new_exclude_relations.add(Song)
song_id = song_result['song_id']
# maybee fetch album
song_obj = Song(
id_=song_id,
title=song_result['title'],
isrc=song_result['isrc'],
length=song_result['length'],
tracksort=song_result['tracksort'],
genre=song_result['genre'],
target=Target(
id_=song_result['target_id'],
file=song_result['file'],
path=song_result['path']
),
source_list=self.pull_sources(song_ref=Reference(id_=song_id)),
lyrics=self.pull_lyrics(song_ref=Reference(id_=song_id)),
)
if Album not in exclude_relations and song_result['album_id'] is not None:
album_obj = self.pull_albums(album_ref=Reference(song_result['album_id']),
exclude_relations=new_exclude_relations)
if len(album_obj) > 0:
song_obj.album = album_obj[0]
flat_artist = Artist in exclude_relations
main_artists = []
feature_artists = []
for song_ref, artist_ref, is_feature in self.pull_artist_song(song_ref=Reference(song_id)):
if is_feature:
feature_artists.extend(self.pull_artists(artist_ref=artist_ref, flat=flat_artist))
else:
main_artists.extend(self.pull_artists(artist_ref=artist_ref, flat=flat_artist))
song_obj.main_artist_list = main_artists
song_obj.feature_artist_list = feature_artists
return song_obj
def pull_songs(self, song_ref: Reference = None, album_ref: Reference = None, exclude_relations: set = set()) -> \
List[Song]:
"""
This function is used to get one song (including its children like Sources etc)
from one song id (a reference object)
:param exclude_relations:
By default all relations are pulled by this funktion. If the class object of for
example the Artists is in the set it won't get fetched.
This is done to prevent an infinite recursion.
:param song_ref:
:param album_ref:
:return requested_song:
"""
where = "1=1"
if song_ref is not None:
where = f"Song.id=\"{song_ref.id}\""
elif album_ref is not None:
where = f"Song.album_id=\"{album_ref.id}\""
query = SONG_QUERY.format(where=where)
self.cursor.execute(query)
song_rows = self.cursor.fetchall()
return [self.get_song_from_row(
song_result=song_result,
exclude_relations=exclude_relations
) for song_result in song_rows]
def get_album_from_row(self, album_result, exclude_relations=None) -> Album:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = exclude_relations.copy()
new_exclude_relations.add(Album)
album_id = album_result['album_id']
album_obj = Album(
id_=album_id,
title=album_result['title'],
label=album_result['label'],
album_status=album_result['album_status'],
language=pycountry.languages.get(alpha_3=album_result['language']),
date=ID3Timestamp.strptime(album_result['date'], "%Y-%m-%d"),
country=album_result['country'],
barcode=album_result['barcode'],
is_split=album_result['is_split'],
albumsort=album_result['albumsort'],
source_list=self.pull_sources(album_ref=Reference(id_=album_id))
)
if Song not in exclude_relations:
# getting the tracklist
tracklist: List[Song] = self.pull_songs(
album_ref=Reference(id_=album_id),
exclude_relations=new_exclude_relations
)
album_obj.set_tracklist(tracklist=tracklist)
flat_artist = Artist in exclude_relations
for _, artist_ref in self.pull_artist_album(album_ref=Reference(id_=album_id)):
artists = self.pull_artists(artist_ref, flat=flat_artist, exclude_relations=new_exclude_relations)
if len(artists) < 1:
continue
album_obj.artists.append(artists[0])
return album_obj
def pull_albums(self, album_ref: Reference = None, song_ref: Reference = None, exclude_relations: set = None) -> \
List[Album]:
"""
This function is used to get matching albums/releses
from one song id (a reference object)
:param exclude_relations:
By default all relations are pulled by this funktion. If the class object of for
example the Artists is in the set it won't get fetched.
This is done to prevent an infinite recursion.
:param album_ref:
:return requested_album_list:
"""
if exclude_relations is None:
exclude_relations = set()
query = ALBUM_QUERY_UNJOINED
where = "1=1"
if album_ref is not None:
query = ALBUM_QUERY_UNJOINED
where = f"Album.id=\"{album_ref.id}\""
elif song_ref is not None:
query = ALBUM_QUERY_JOINED
where = f"Song.id=\"{song_ref.id}\""
query = query.format(where=where)
self.cursor.execute(query)
album_rows = self.cursor.fetchall()
return [self.get_album_from_row(
album_result=album_row,
exclude_relations=exclude_relations
) for album_row in album_rows]
if __name__ == "__main__":
cache = Database("")

View File

@ -257,19 +257,16 @@ class MetadataAttribute:
# the mutagen object for each frame will be generated dynamically
id3_dict: Dict[any, list]
def __init__(self, id3_dict: Dict[any, list] = None) -> None:
self.id3_dict = dict()
if id3_dict is not None:
self.add_metadata_dict(id3_dict)
def __setitem__(self, frame, value_list: list, override_existing: bool = True):
if len(value_list) == 0:
return
if type(value_list) != list:
raise ValueError(f"can only set attribute to list, not {type(value_list)}")
new_val = [i for i in value_list if i is not None]
new_val = [i for i in value_list if i not in {None, ''}]
if len(new_val) == 0:
return
@ -288,10 +285,9 @@ class MetadataAttribute:
return None
return self.id3_dict[key]
def delete_field(self, key: str):
if key in self.id3_attributes:
return self.id3_attributes.pop(key)
if key in self.id3_dict:
return self.id3_dict.pop(key)
def add_metadata_dict(self, metadata_dict: dict, override_existing: bool = True):
for field_enum, value in metadata_dict.items():
@ -314,7 +310,6 @@ class MetadataAttribute:
for other in many_other:
self.merge(other)
def get_id3_value(self, field):
if field not in self.id3_dict:
return None
@ -327,6 +322,9 @@ class MetadataAttribute:
if type(element) == str:
continue
if type(element) in {int}:
list_data[i] = str(element)
if type(element) == ID3Timestamp:
list_data[i] = element.timestamp
continue
@ -350,7 +348,6 @@ class MetadataAttribute:
rows.append(f"{key} - {str(value)}")
return "\n".join(rows)
def __iter__(self):
"""
returns a generator, you can iterate through,

View File

@ -200,7 +200,7 @@ class Song(DatabaseObject, SourceAttribute, MetadataAttribute):
metadata = MetadataAttribute.Metadata({
id3Mapping.TITLE: [self.title],
id3Mapping.ISRC: [self.isrc],
id3Mapping.LENGTH: [str(self.length)],
id3Mapping.LENGTH: [self.length],
id3Mapping.GENRE: [self.genre],
id3Mapping.TRACKNUMBER: [self.tracksort_str]
})
@ -321,6 +321,8 @@ class Album(DatabaseObject, SourceAttribute, MetadataAttribute):
})
def get_copyright(self) -> str:
if self.date is None:
return None
if self.date.year == 1 or self.label is None:
return None

View File

@ -1,189 +0,0 @@
from typing import List
import uuid
import os
from mutagen.easyid3 import EasyID3
from ..utils.shared import (
MUSIC_DIR,
SONG_LOGGER as logger
)
from .objects.parents import DatabaseObject
class Metadata:
def __init__(self) -> None:
self.data = {}
def get_all_metadata(self):
return list(self.data.items())
def __setitem__(self, item, value):
if item in EasyID3.valid_keys.keys():
self.data[item] = value
def __getitem__(self, item):
if item not in self.data:
return None
return self.data[item]
class Source(DatabaseObject):
def __init__(self, id_: str = None, src: str = None, url: str = None) -> None:
super().__init__(id_=id_)
self.src = src
self.url = url
class Target(DatabaseObject):
def __init__(self, id_:str = None, file: str = None, path: str = None) -> None:
super().__init__(id_=id_)
self._file = file
self._path = path
def set_file(self, _file: str):
self._file = _file
def get_file(self) -> str | None:
if self._file is None:
return None
return os.path.join(MUSIC_DIR, self._file)
def set_path(self, _path: str):
self._path = _path
def get_path(self) -> str | None:
if self._path is None:
return None
return os.path.join(MUSIC_DIR, self._path)
def get_exists_on_disc(self) -> bool:
"""
returns True when file can be found on disc
returns False when file can't be found on disc or no filepath is set
"""
if not self.is_set():
return False
return os.path.exists(self.file)
def is_set(self) -> bool:
return not (self._file is None or self._path is None)
file = property(fget=get_file, fset=set_file)
path = property(fget=get_path, fset=set_path)
exists_on_disc = property(fget=get_exists_on_disc)
class Artist(DatabaseObject):
def __init__(self, id_: str = None, mb_id: str = None, name: str = None) -> None:
super().__init__(id_=id_)
self.mb_id = mb_id
self.name = name
def __eq__(self, __o: object) -> bool:
if type(__o) != type(self):
return False
return self.id == __o.id
def __str__(self) -> str:
return self.name
class Lyrics(DatabaseObject):
def __init__(self, text: str, language: str, id_: str = None) -> None:
super().__init__(id_=id_)
self.text = text
self.language = language
class LyricsContainer:
def __init__(self):
self.lyrics_list: List[Lyrics] = []
def append(self, lyrics: Lyrics):
# due to my db not supporting multiple Lyrics yet, I just use for doing stuff with the lyrics
# the first element. I know this implementation is junk, but take it or leave it, it is going
# soon anyway
if len(self.lyrics_list) >= 1:
return
self.lyrics_list.append(lyrics)
# unfortunately can't do this here directly, because of circular imports. If anyone
# took the time to get familiar with this codebase... thank you, and if you have any
# suggestion of resolving this, please open an issue.
# cache.add_lyrics(track_id=self.parent.id, lyrics=lyrics.text)
def extend(self, lyrics_list: List[Lyrics]):
for lyrics in lyrics_list:
self.append(lyrics)
is_empty = property(fget=lambda self: len(self.lyrics_list) <= 0)
class Song(DatabaseObject):
def __init__(
self,
id_: str = None,
mb_id: str = None,
title: str = None,
release: str = None,
isrc: str = None,
length: int = None,
artists: List[Artist] = None,
metadata: Metadata = None,
sources: List[Source] = None,
target: Target = None,
lyrics: LyricsContainer = None
) -> None:
"""
id: is not NECESARRILY the musicbrainz id, but is DISTINCT for every song
mb_id: is the musicbrainz_id
target: Each Song can have exactly one target which can be either full or empty
lyrics: There can be multiple lyrics. Each Lyrics object can me added to multiple lyrics
"""
super().__init__(id_=id_)
# attributes
# self.id_: str | None = id_
self.mb_id: str | None = mb_id
self.title: str | None = title
self.release: str | None = release
self.isrc: str | None = isrc
self.length: int | None = length
if metadata is None:
metadata = Metadata()
self.metadata: Metadata = metadata
# joins
if artists is None:
artists = []
self.artists: List[Artist] = artists
if sources is None:
sources = []
self.sources: List[Source] = sources
if target is None:
target = Target()
self.target: Target = target
if lyrics is None:
lyrics = LyricsContainer()
self.lyrics: LyricsContainer = lyrics
def __str__(self) -> str:
return f"\"{self.title}\" by {', '.join([str(a) for a in self.artists])}"
def __repr__(self) -> str:
return self.__str__()
def get_metadata(self):
return self.metadata.get_all_metadata()
def has_isrc(self) -> bool:
return self.isrc is not None
def get_artist_names(self) -> List[str]:
return [a.name for a in self.artists]

View File

@ -1,4 +1,4 @@
from .new_database import Database
from .database import Database
from ..utils.shared import (
TEMP_DATABASE_PATH,

View File

@ -6,7 +6,6 @@ from ..database import (
Song,
Source,
Album,
Metadata,
Artist,
Lyrics,
Target,

View File

@ -46,7 +46,10 @@ class EncyclopaediaMetallum(Page):
@classmethod
def search_for_song(cls, query: Page.Query) -> List[Song]:
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/songs/?songTitle={song}&bandName={artist}&releaseTitle={album}&lyrics=&genre=&sEcho=1&iColumns=5&sColumns=&iDisplayStart=0&iDisplayLength=200&mDataProp_0=0&mDataProp_1=1&mDataProp_2=2&mDataProp_3=3&mDataProp_4=4&_=1674550595663"
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/songs/?songTitle={song}&bandName={" \
"artist}&releaseTitle={album}&lyrics=&genre=&sEcho=1&iColumns=5&sColumns=&iDisplayStart=0" \
"&iDisplayLength=200&mDataProp_0=0&mDataProp_1=1&mDataProp_2=2&mDataProp_3=3&mDataProp_4=4&_" \
"=1674550595663"
r = cls.API_SESSION.get(endpoint.format(song=query.song_str, artist=query.artist_str, album=query.album_str))
if r.status_code != 200:
@ -64,7 +67,11 @@ class EncyclopaediaMetallum(Page):
@classmethod
def search_for_album(cls, query: Page.Query) -> List[Album]:
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/albums/?bandName={artist}&releaseTitle={album}&releaseYearFrom=&releaseMonthFrom=&releaseYearTo=&releaseMonthTo=&country=&location=&releaseLabelName=&releaseCatalogNumber=&releaseIdentifiers=&releaseRecordingInfo=&releaseDescription=&releaseNotes=&genre=&sEcho=1&iColumns=3&sColumns=&iDisplayStart=0&iDisplayLength=200&mDataProp_0=0&mDataProp_1=1&mDataProp_2=2&_=1674563943747"
endpoint = "https://www.metal-archives.com/search/ajax-advanced/searching/albums/?bandName={" \
"artist}&releaseTitle={album}&releaseYearFrom=&releaseMonthFrom=&releaseYearTo=&releaseMonthTo" \
"=&country=&location=&releaseLabelName=&releaseCatalogNumber=&releaseIdentifiers" \
"=&releaseRecordingInfo=&releaseDescription=&releaseNotes=&genre=&sEcho=1&iColumns=3&sColumns" \
"=&iDisplayStart=0&iDisplayLength=200&mDataProp_0=0&mDataProp_1=1&mDataProp_2=2&_=1674563943747"
r = cls.API_SESSION.get(endpoint.format(artist=query.artist_str, album=query.album_str))
if r.status_code != 200:
@ -88,7 +95,7 @@ class EncyclopaediaMetallum(Page):
return []
return [
cls.get_artist_from_json(html=raw_artist[0], genre=raw_artist[1], country=raw_artist[2])
cls.get_artist_from_json(artist_html=raw_artist[0], genre=raw_artist[1], country=raw_artist[2])
for raw_artist in r.json()['aaData']
]
@ -106,22 +113,22 @@ class EncyclopaediaMetallum(Page):
return []
return [
cls.get_artist_from_json(html=raw_artist[0], genre=raw_artist[1], country=raw_artist[2])
cls.get_artist_from_json(artist_html=raw_artist[0], genre=raw_artist[1], country=raw_artist[2])
for raw_artist in r.json()['aaData']
]
@classmethod
def get_artist_from_json(cls, html=None, genre=None, country=None) -> Artist:
def get_artist_from_json(cls, artist_html=None, genre=None, country=None) -> Artist:
"""
TODO parse the country to a standart
"""
# parse the html
# parse the html for the band name and link on metal-archives
soup = BeautifulSoup(html, 'html.parser')
soup = BeautifulSoup(artist_html, 'html.parser')
anchor = soup.find('a')
artist_name = anchor.text
artist_url = anchor.get('href')
artist_id = int(artist_url.split("/")[-1])
artist_id = artist_url.split("/")[-1]
notes = f"{artist_name} is a {genre} band from {country}"
@ -136,7 +143,7 @@ class EncyclopaediaMetallum(Page):
return Artist(
id_=artist_id,
name=artist_name,
sources=[
source_list=[
Source(SourcePages.ENCYCLOPAEDIA_METALLUM, artist_url)
],
notes=notes
@ -150,17 +157,19 @@ class EncyclopaediaMetallum(Page):
anchor = soup.find('a')
album_name = anchor.text
album_url = anchor.get('href')
album_id = int(album_url.split("/")[-1])
album_id = album_url.split("/")[-1]
"""
TODO implement release type
TODO add artist argument to
"""
return Album(
id_=album_id,
title=album_name,
sources=[
source_list=[
Source(SourcePages.ENCYCLOPAEDIA_METALLUM, album_url)
],
artists=[
cls.get_artist_from_json(artist_html=artist_html)
]
)
@ -169,7 +178,6 @@ class EncyclopaediaMetallum(Page):
lyrics_html=None) -> Song:
song_id = None
if lyrics_html is not None:
# <a href="javascript:;" id="lyricsLink_5948443" title="Toggle lyrics display" class="viewLyrics iconContainer ui-state-default"><span class="ui-icon ui-icon-script">Edit song lyrics</span></a>
soup = BeautifulSoup(lyrics_html, 'html.parser')
anchor = soup.find('a')
raw_song_id = anchor.get('id')
@ -179,20 +187,22 @@ class EncyclopaediaMetallum(Page):
id_=song_id,
title=title,
main_artist_list=[
cls.get_artist_from_json(html=artist_html)
cls.get_artist_from_json(artist_html=artist_html)
],
album=cls.get_album_from_json(album_html=album_html, release_type=release_type, artist_html=artist_html)
album=cls.get_album_from_json(album_html=album_html, release_type=release_type, artist_html=artist_html),
source_list=[
Source(SourcePages.ENCYCLOPAEDIA_METALLUM, song_id)
]
)
@classmethod
def fetch_artist_details(cls, artist: Artist) -> Artist:
relevant_source = None
for source in artist.sources:
if source.page_enum == cls.SOURCE_TYPE:
relevant_source = source
break
if relevant_source is None:
source_list = artist.get_sources_from_page(cls.SOURCE_TYPE)
if len(source_list) == 0:
return artist
print(relevant_source.url)
# taking the fist source, cuz I only need one and multiple sources don't make that much sense
source = source_list[0]
print(source)
return artist

Binary file not shown.