STARTED IMPLEMENTING DB

STARTED IMPLEMENTING DB
This commit is contained in:
Hellow
2023-02-14 23:07:16 +01:00
parent 021f8a6905
commit 5a699c3937
27 changed files with 990 additions and 755 deletions

View File

@@ -1,8 +1,8 @@
from . import (
temp_database,
objects,
database
old_database
)
from .. import objects
MusicObject = objects.MusicObject

View File

@@ -0,0 +1,113 @@
from typing import List, Union, Type
from peewee import (
SqliteDatabase,
PostgresqlDatabase,
MySQLDatabase,
Model,
CharField,
IntegerField,
BooleanField,
ForeignKeyField,
TextField
)
class Album(Model):
"""A class representing an album in the music database."""
title: str = CharField()
label: str = CharField()
album_status: str = CharField()
language: str = CharField()
date: str = CharField()
date_format: str = CharField()
country: str = CharField()
barcode: str = CharField()
albumsort: int = IntegerField()
is_split: bool = BooleanField(default=False)
class Artist(Model):
"""A class representing an artist in the music database."""
name: str = CharField()
class Song(Model):
"""A class representing a song in the music database."""
name: str = CharField()
isrc: str = CharField()
length: int = IntegerField()
tracksort: int = IntegerField()
genre: str = CharField()
album: ForeignKeyField = ForeignKeyField(Album, backref='songs')
class Source(Model):
"""A class representing a source of a song in the music database."""
type: str = CharField()
src: str = CharField()
url: str = CharField()
content_type: str = CharField()
content_id: int = IntegerField()
content: ForeignKeyField = ForeignKeyField('self', backref='content_items', null=True)
@property
def content_object(self) -> Union[Song, Album, Artist, None]:
"""Get the content associated with the source as an object."""
if self.content_type == 'Song':
return Song.get(Song.id == self.content_id)
elif self.content_type == 'Album':
return Album.get(Album.id == self.content_id)
elif self.content_type == 'Artist':
return Artist.get(Artist.id == self.content_id)
else:
return None
@content_object.setter
def content_object(self, value: Union[Song, Album, Artist]) -> None:
"""Set the content associated with the source as an object."""
self.content_type = value.__class__.__name__
self.content_id = value.id
class Target(Model):
"""A class representing a target of a song in the music database."""
file: str = CharField()
path: str = CharField()
song = ForeignKeyField(Song, backref='targets')
class Lyrics(Model):
"""A class representing lyrics of a song in the music database."""
text: str = TextField()
language: str = CharField()
song = ForeignKeyField(Song, backref='lyrics')
class SongArtist(Model):
"""A class representing the relationship between a song and an artist."""
song: ForeignKeyField = ForeignKeyField(Song, backref='song_artists')
artist: ForeignKeyField = ForeignKeyField(Artist, backref='song_artists')
is_feature: bool = BooleanField(default=False)
class AlbumArtist(Model):
"""A class representing the relationship between an album and an artist."""
album: ForeignKeyField = ForeignKeyField(Album, backref='album_artists')
artist: ForeignKeyField = ForeignKeyField(Artist, backref='album_artists')
class Models:
def __init__(self, database: Union[SqliteDatabase, PostgresqlDatabase, MySQLDatabase]):
self.database: Union[SqliteDatabase, PostgresqlDatabase, MySQLDatabase] = database
def get_obj(self, _model: Model):
_model._meta.database = self.database

View File

@@ -1,702 +1,79 @@
import sqlite3
import os
import logging
from typing import List, Tuple
from pkg_resources import resource_string
import datetime
import pycountry
from .objects.parents import Reference
from .objects.source import Source
from .objects import (
Song,
Lyrics,
Target,
Artist,
Album,
ID3Timestamp,
SourceTypes,
SourcePages,
SourceAttribute
from typing import Optional, Union
from enum import Enum
from peewee import (
SqliteDatabase,
MySQLDatabase,
PostgresqlDatabase,
)
"""
import peewee
from . import data_models
db = peewee.SqliteDatabase('music.db')
class BaseModel(peewee.Model):
class Meta:
database = db
class Artist(BaseModel):
name = peewee.CharField()
class Song(BaseModel):
title = peewee.CharField()
artist = peewee.ManyToManyField(Artist, backref='songs')
db.connect()
db.create_tables([Artist, Song, Song.artist.get_through_model()], safe=True)
# Adding a song and its artists
beatles = Artist.create(name='The Beatles')
rolling_stones = Artist.create(name='The Rolling Stones')
song = Song.create(title='Hey Jude')
song.artist.add(beatles, rolling_stones)
# Querying songs by artist
songs = Song.select().join(Song.artist).where(Artist.name == 'The Beatles')
for song in songs:
print(song.title)
"""
logger = logging.getLogger("database")
# Due to this not being deployed on a Server **HOPEFULLY**
# I don't need to parameterize stuff like the where and
# use complicated query builder
SONG_QUERY = """
SELECT
Song.id AS song_id, Song.name AS title, Song.isrc AS isrc, Song.length AS length, Song.album_id as album_id, Song.tracksort,
Target.id AS target_id, Target.file AS file, Target.path AS path, Song.genre AS genre
FROM Song
LEFT JOIN Target ON Song.id=Target.song_id
WHERE {where};
"""
SOURCE_QUERY = """
SELECT id, type, src, url, song_id
FROM Source
WHERE {where};
"""
LYRICS_QUERY = """
SELECT id, text, language, song_id
FROM Lyrics
WHERE {where};
"""
ALBUM_QUERY_UNJOINED = """
SELECT Album.id AS album_id, title, label, album_status, language, date, date_format, country, barcode, albumsort, is_split
FROM Album
WHERE {where};
"""
ALBUM_QUERY_JOINED = """
SELECT a.id AS album_id, a.title, a.label, a.album_status, a.language, a.date, a.date_format, a.country, a.barcode, a.albumsort, a.is_split
FROM Song
INNER JOIN Album a ON Song.album_id=a.id
WHERE {where};
"""
ARTIST_QUERY = """
SELECT id as artist_id, name as artist_name
FROM Artist
WHERE {where};
"""
class DatabaseType(Enum):
SQLITE = "sqlite"
POSTGRESQL = "postgresql"
MYSQL = "mysql"
class Database:
def __init__(self, database_file: str):
self.database_file: str = database_file
self.connection, self.cursor = self.reset_cursor()
database: Union[SqliteDatabase, PostgresqlDatabase, MySQLDatabase]
self.cursor = self.connection.cursor()
def __init__(
self,
db_type: DatabaseType,
db_name: str,
db_user: Optional[str] = None,
db_password: Optional[str] = None,
db_host: Optional[str] = None,
db_port: Optional[int] = None
):
self.db_type = db_type
self.db_name = db_name
self.db_user = db_user
self.db_password = db_password
self.db_host = db_host
self.db_port = db_port
def reset(self):
"""
Deletes all Data from the database if it exists
and resets the schema defined in self.structure_file
"""
logger.info(f"resetting the database")
self.initialize_database()
# deleting the database
del self.connection
del self.cursor
os.remove(self.database_file)
def create_database(self) -> Union[SqliteDatabase, PostgresqlDatabase, MySQLDatabase]:
"""Create a database instance based on the configured database type and parameters.
# newly creating the database
self.reset_cursor()
query = resource_string("music_kraken", "static_files/new_db.sql").decode('utf-8')
# fill the database with the schematic
self.cursor.executescript(query)
self.connection.commit()
def reset_cursor(self) -> Tuple[sqlite3.Connection, sqlite3.Cursor]:
self.connection = sqlite3.connect(self.database_file)
# This is necessary that fetching rows returns dicts instead of tuple
self.connection.row_factory = sqlite3.Row
self.cursor = self.connection.cursor()
return self.connection, self.cursor
def push_one(self, db_object: Song | Lyrics | Target | Artist | Source | Album):
if db_object.dynamic:
return
if type(db_object) == Song:
return self.push_song(song=db_object, pushed=set())
"""
if type(db_object) == Lyrics:
return self.push_lyrics(lyrics=db_object)
if type(db_object) == Target:
return self.push_target(target=db_object)
Returns:
The created database instance, or None if an invalid database type was specified.
"""
if type(db_object) == Artist:
return self.push_artist(artist=db_object, pushed=set())
"""
if type(db_object) == Source:
# needs to have the property type_enum or type_str set
return self.push_source(source=db_object)
"""
if type(db_object) == Album:
return self.push_album(album=db_object, pushed=set())
logger.warning(f"type {type(db_object)} isn't yet supported by the db")
def push(self, db_object_list: List[Song | Lyrics | Target | Artist | Source | Album]):
"""
This function is used to Write the data of any db_object to the database
It syncs a whole list of db_objects to the database and is meant
as the primary method to add to the database.
:param db_object_list:
"""
for db_object in db_object_list:
self.push_one(db_object)
def push_album(self, album: Album, pushed: set):
table = "Album"
query = f"INSERT OR REPLACE INTO {table} (id, title, label, album_status, language, date, date_format, country, barcode, albumsort, is_split) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
if album.id in pushed:
return
pushed.add(album.id)
date_format, date = album.date.get_timestamp_w_format()
values = (
album.id,
album.title,
album.label,
album.album_status,
album.iso_639_2_language,
date,
date_format,
album.country,
album.barcode,
album.albumsort,
album.is_split
)
self.cursor.execute(query, values)
self.connection.commit()
for song in album.tracklist:
self.push_song(song, pushed=pushed)
for artist in album.artists:
self.push_artist_album(artist_ref=artist.reference, album_ref=album.reference)
self.push_artist(artist, pushed=pushed)
for source in album.source_list:
source.type_enum = SourceTypes.ALBUM
source.add_song(album)
self.push_source(source=source)
def push_song(self, song: Song, pushed: set):
if song.dynamic:
return
if song.id in pushed:
return
pushed.add(song.id)
# ADDING THE DATA FOR THE SONG OBJECT
"""
db_field - object attribute
-------------------------------
id - id
name - title
"""
table = "Song"
values = (
song.id,
song.title,
song.isrc,
song.length,
song.get_album_id(),
song.tracksort,
song.genre
)
query = f"INSERT OR REPLACE INTO {table} (id, name, isrc, length, album_id, tracksort, genre) VALUES (?, ?, ?, ?, ?, ?, ?);"
self.cursor.execute(query, values)
self.connection.commit()
# add sources
for source in song.source_list:
source.add_song(song)
source.type_enum = SourceTypes.SONG
self.push_source(source=source)
# add lyrics
for single_lyrics in song.lyrics:
single_lyrics.add_song(song)
self.push_lyrics(lyrics=single_lyrics)
# add target
song.target.add_song(song)
self.push_target(target=song.target)
for main_artist in song.main_artist_list:
self.push_artist_song(artist_ref=Reference(main_artist.id), song_ref=Reference(song.id), is_feature=False)
self.push_artist(artist=main_artist, pushed=pushed)
for feature_artist in song.feature_artist_list:
self.push_artist_song(artist_ref=Reference(feature_artist.id), song_ref=Reference(song.id), is_feature=True)
self.push_artist(artist=feature_artist, pushed=pushed)
if song.album is not None:
self.push_album(song.album, pushed=pushed)
def push_lyrics(self, lyrics: Lyrics):
if lyrics.dynamic:
return
table = "Lyrics"
query = f"INSERT OR REPLACE INTO {table} (id, song_id, text, language) VALUES (?, ?, ?, ?);"
values = (
lyrics.id,
lyrics.song_ref_id,
lyrics.text,
lyrics.language
)
self.cursor.execute(query, values)
self.connection.commit()
def push_source(self, source: Source):
if source.dynamic:
return
table = "Source"
query = f"INSERT OR REPLACE INTO {table} (id, type, song_id, src, url) VALUES (?, ?, ?, ?, ?);"
values = (
source.id,
source.type_str,
source.song_ref_id,
source.page_str,
source.url
)
self.cursor.execute(query, values)
self.connection.commit()
def push_target(self, target: Target):
if target.dynamic:
return
table = "Target"
query = f"INSERT OR REPLACE INTO {table} (id, song_id, file, path) VALUES (?, ?, ?, ?);"
values = (
target.id,
target.song_ref_id,
target.file,
target.path
)
self.cursor.execute(query, values)
self.connection.commit()
def push_artist_song(self, artist_ref: Reference, song_ref: Reference, is_feature: bool):
table = "SongArtist"
# checking if already exists
query = f"SELECT * FROM {table} WHERE song_id=\"{song_ref.id}\" AND artist_id=\"{artist_ref.id}\""
self.cursor.execute(query)
if len(self.cursor.fetchall()) > 0:
# join already exists
return
query = f"INSERT OR REPLACE INTO {table} (song_id, artist_id, is_feature) VALUES (?, ?, ?);"
values = (
song_ref.id,
artist_ref.id,
is_feature
)
self.cursor.execute(query, values)
self.connection.commit()
def push_artist_album(self, artist_ref: Reference, album_ref: Reference):
table = "AlbumArtist"
# checking if already exists
query = f"SELECT * FROM {table} WHERE album_id=\"{album_ref.id}\" AND artist_id=\"{artist_ref.id}\""
self.cursor.execute(query)
if len(self.cursor.fetchall()) > 0:
# join already exists
return
query = f"INSERT OR REPLACE INTO {table} (album_id, artist_id) VALUES (?, ?);"
values = (
album_ref.id,
artist_ref.id
)
self.cursor.execute(query, values)
self.connection.commit()
def push_artist(self, artist: Artist, pushed: set):
if artist.dynamic:
return
if artist.id in pushed:
return
pushed.add(artist.id)
table = "Artist"
query = f"INSERT OR REPLACE INTO {table} (id, name) VALUES (?, ?);"
values = (
artist.id,
artist.name
)
self.cursor.execute(query, values)
self.connection.commit()
for song in artist.feature_songs:
self.push_artist_song(artist_ref=artist.reference, song_ref=song.reference, is_feature=True)
self.push_song(song=song, pushed=pushed)
for song in artist.main_songs:
self.push_artist_song(artist_ref=artist.reference, song_ref=song.reference, is_feature=False)
self.push_song(song=song, pushed=pushed)
for album in artist.main_albums:
self.push_artist_album(artist_ref=artist.reference, album_ref=album.reference)
for source in artist.source_list:
source.type_enum = SourceTypes.ARTIST
source.add_song(artist)
self.push_source(source)
def pull_lyrics(self, song_ref: Reference = None, lyrics_ref: Reference = None) -> List[Lyrics]:
"""
Gets a list of sources. if lyrics_ref is passed in the List will most likely only
contain one Element if everything goes accordingly.
**If neither song_ref nor lyrics_ref are passed in it will return ALL lyrics**
:param song_ref:
:param lyrics_ref:
:return:
"""
where = "1=1"
if song_ref is not None:
where = f"song_id=\"{song_ref.id}\""
elif lyrics_ref is not None:
where = f"id=\"{lyrics_ref.id}\""
query = LYRICS_QUERY.format(where=where)
self.cursor.execute(query)
lyrics_rows = self.cursor.fetchall()
return [Lyrics(
id_=lyrics_row['id'],
text=lyrics_row['text'],
language=lyrics_row['language']
) for lyrics_row in lyrics_rows]
def pull_sources(self, artist_ref: Reference = None, song_ref: Reference = None, source_ref: Reference = None, album_ref: Reference = None) -> List[Source]:
"""
Gets a list of sources. if source_ref is passed in the List will most likely only
contain one Element if everything goes accordingly.
**If neither song_ref nor source_ref are passed in it will return ALL sources**
:param artist_ref:
:param song_ref:
:param source_ref:
:param type_str: the thing the source belongs to like eg. "song" or "album"
:return:
"""
where = "1=1"
if song_ref is not None:
where = f"song_id=\"{song_ref.id}\""
elif source_ref is not None:
where = f"id=\"{source_ref.id}\" AND type=\"{SourceTypes.SONG.value}\""
elif artist_ref is not None:
where = f"song_id=\"{artist_ref.id}\" AND type=\"{SourceTypes.ARTIST.value}\""
elif album_ref is not None:
where = f"song_id=\"{album_ref.id}\" AND type=\"{SourceTypes.ALBUM.value}\""
query = SOURCE_QUERY.format(where=where)
self.cursor.execute(query)
source_rows = self.cursor.fetchall()
return [
Source(
page_enum=SourcePages(source_row['src']),
type_enum=SourceTypes(source_row['type']),
url=source_row['url'],
id_=source_row['id']
) for source_row in source_rows
]
def pull_artist_song(self, song_ref: Reference = None, artist_ref: Reference = None) -> List[tuple]:
table = "SongArtist"
wheres = []
if song_ref is not None:
wheres.append(f"song_id=\"{song_ref.id}\"")
if artist_ref is not None:
wheres.append(f"artist_id=\"{artist_ref.id}\"")
where_str = ""
if len(wheres) > 0:
where_str = "WHERE " + " AND ".join(wheres)
query = f"SELECT * FROM {table} {where_str};"
self.cursor.execute(query)
joins = self.cursor.fetchall()
return [(
Reference(join["song_id"]),
Reference(join["artist_id"]),
bool(join["is_feature"])
) for join in joins]
def pull_artist_album(self, album_ref: Reference = None, artist_ref: Reference = None) -> List[tuple]:
table = "AlbumArtist"
wheres = []
if album_ref is not None:
wheres.append(f"album_id=\"{album_ref.id}\"")
if artist_ref is not None:
wheres.append(f"artist_id=\"{artist_ref.id}\"")
where_str = ""
if len(wheres) > 0:
where_str = "WHERE " + " AND ".join(wheres)
query = f"SELECT * FROM {table} {where_str};"
self.cursor.execute(query)
joins = self.cursor.fetchall()
return [(
Reference(join["album_id"]),
Reference(join["artist_id"])
) for join in joins]
def get_artist_from_row(self, artist_row, exclude_relations: set = None, flat: bool = False) -> Artist:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = set(exclude_relations)
new_exclude_relations.add(Artist)
artist_id = artist_row['artist_id']
artist_obj = Artist(
id_=artist_id,
name=artist_row['artist_name'],
source_list=self.pull_sources(artist_ref=Reference(id_=artist_id))
)
if flat:
return artist_obj
# fetch songs :D
for song_ref, _, is_feature in self.pull_artist_song(artist_ref=Reference(id_=artist_id)):
new_songs = self.pull_songs(song_ref=song_ref, exclude_relations=new_exclude_relations)
if len(new_songs) < 1:
continue
new_song = new_songs[0]
if is_feature:
artist_obj.feature_songs.append(new_song)
else:
artist_obj.main_songs.append(new_song)
# fetch albums
for album_ref, _ in self.pull_artist_album(artist_ref=Reference(id_=artist_id)):
new_albums = self.pull_albums(album_ref=album_ref, exclude_relations=new_exclude_relations)
if len(new_albums) < 1:
continue
artist_obj.main_albums.append(new_albums[0])
return artist_obj
def pull_artists(self, artist_ref: Reference = None, exclude_relations: set = None, flat: bool = False) -> List[Artist]:
"""
:param artist_ref:
:param exclude_relations:
:param flat: if it is true it ONLY fetches the artist data
:return:
"""
where = "1=1"
if artist_ref is not None:
where = f"Artist.id=\"{artist_ref.id}\""
query = ARTIST_QUERY.format(where=where)
self.cursor.execute(query)
artist_rows = self.cursor.fetchall()
return [(
self.get_artist_from_row(artist_row, exclude_relations=exclude_relations, flat=flat)
) for artist_row in artist_rows]
def get_song_from_row(self, song_result, exclude_relations: set = None) -> Song:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = set(exclude_relations)
new_exclude_relations.add(Song)
song_id = song_result['song_id']
# maybee fetch album
song_obj = Song(
id_=song_id,
title=song_result['title'],
isrc=song_result['isrc'],
length=song_result['length'],
tracksort=song_result['tracksort'],
genre=song_result['genre'],
target=Target(
id_=song_result['target_id'],
file=song_result['file'],
path=song_result['path']
),
source_list=self.pull_sources(song_ref=Reference(id_=song_id)),
lyrics=self.pull_lyrics(song_ref=Reference(id_=song_id)),
)
if Album not in exclude_relations and song_result['album_id'] is not None:
album_obj = self.pull_albums(album_ref=Reference(song_result['album_id']),
exclude_relations=new_exclude_relations)
if len(album_obj) > 0:
song_obj.album = album_obj[0]
flat_artist = Artist in exclude_relations
main_artists = []
feature_artists = []
for song_ref, artist_ref, is_feature in self.pull_artist_song(song_ref=Reference(song_id)):
if is_feature:
feature_artists.extend(self.pull_artists(artist_ref=artist_ref, flat=flat_artist))
else:
main_artists.extend(self.pull_artists(artist_ref=artist_ref, flat=flat_artist))
song_obj.main_artist_list = main_artists
song_obj.feature_artist_list = feature_artists
return song_obj
def pull_songs(self, song_ref: Reference = None, album_ref: Reference = None, exclude_relations: set = set()) -> \
List[Song]:
"""
This function is used to get one song (including its children like Sources etc)
from one song id (a reference object)
:param exclude_relations:
By default all relations are pulled by this funktion. If the class object of for
example the Artists is in the set it won't get fetched.
This is done to prevent an infinite recursion.
:param song_ref:
:param album_ref:
:return requested_song:
"""
where = "1=1"
if song_ref is not None:
where = f"Song.id=\"{song_ref.id}\""
elif album_ref is not None:
where = f"Song.album_id=\"{album_ref.id}\""
query = SONG_QUERY.format(where=where)
self.cursor.execute(query)
song_rows = self.cursor.fetchall()
return [self.get_song_from_row(
song_result=song_result,
exclude_relations=exclude_relations
) for song_result in song_rows]
def get_album_from_row(self, album_result, exclude_relations=None) -> Album:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = exclude_relations.copy()
new_exclude_relations.add(Album)
album_id = album_result['album_id']
language = album_result['language']
if language is not None:
language = pycountry.languages.get(alpha_3=album_result['language'])
album_obj = Album(
id_=album_id,
title=album_result['title'],
label=album_result['label'],
album_status=album_result['album_status'],
language=language,
date=ID3Timestamp.strptime(album_result['date'], album_result['date_format']),
country=album_result['country'],
barcode=album_result['barcode'],
is_split=album_result['is_split'],
albumsort=album_result['albumsort'],
source_list=self.pull_sources(album_ref=Reference(id_=album_id))
)
if Song not in exclude_relations:
# getting the tracklist
tracklist: List[Song] = self.pull_songs(
album_ref=Reference(id_=album_id),
exclude_relations=new_exclude_relations
# SQLITE
if self.db_type == DatabaseType.SQLITE:
return SqliteDatabase(self.db_name)
# POSTGRES
if self.db_type == DatabaseType.POSTGRESQL:
return PostgresqlDatabase(
self.db_name,
user=self.db_user,
password=self.db_password,
host=self.db_host,
port=self.db_port,
)
album_obj.set_tracklist(tracklist=tracklist)
flat_artist = Artist in exclude_relations
for _, artist_ref in self.pull_artist_album(album_ref=Reference(id_=album_id)):
artists = self.pull_artists(artist_ref, flat=flat_artist, exclude_relations=new_exclude_relations)
if len(artists) < 1:
continue
album_obj.artists.append(artists[0])
# MYSQL
if self.db_type == DatabaseType.MYSQL:
return MySQLDatabase(
self.db_name,
user=self.db_user,
password=self.db_password,
host=self.db_host,
port=self.db_port,
)
return album_obj
raise ValueError("define a Valid database type")
def pull_albums(self, album_ref: Reference = None, song_ref: Reference = None, exclude_relations: set = None) -> \
List[Album]:
def initialize_database(self):
"""
This function is used to get matching albums/releses
from one song id (a reference object)
:param exclude_relations:
By default all relations are pulled by this funktion. If the class object of for
example the Artists is in the set it won't get fetched.
This is done to prevent an infinite recursion.
:param album_ref:
:return requested_album_list:
Connect to the database
initialize the previously defined databases
create tables if they don't exist.
"""
if exclude_relations is None:
exclude_relations = set()
self.database = self.create_database()
query = ALBUM_QUERY_UNJOINED
where = "1=1"
if album_ref is not None:
query = ALBUM_QUERY_UNJOINED
where = f"Album.id=\"{album_ref.id}\""
elif song_ref is not None:
query = ALBUM_QUERY_JOINED
where = f"Song.id=\"{song_ref.id}\""
query = query.format(where=where)
self.cursor.execute(query)
album_rows = self.cursor.fetchall()
return [self.get_album_from_row(
album_result=album_row,
exclude_relations=exclude_relations
) for album_row in album_rows]
if __name__ == "__main__":
cache = Database("")
self.database.connect()

View File

@@ -2,7 +2,7 @@ from collections import defaultdict
from typing import Dict, List, Optional
import weakref
from .objects import MusicObject
from src.music_kraken.objects import MusicObject
"""
This is a cache for the objects, that et pulled out of the database.

View File

@@ -1,26 +0,0 @@
from . import (
song,
metadata,
source,
parents,
formatted_text
)
MusicObject = parents.DatabaseObject
ID3Mapping = metadata.Mapping
ID3Timestamp = metadata.ID3Timestamp
SourceTypes = source.SourceTypes
SourcePages = source.SourcePages
SourceAttribute = source.SourceAttribute
Song = song.Song
Artist = song.Artist
Source = source.Source
Target = song.Target
Lyrics = song.Lyrics
Album = song.Album
FormattedText = formatted_text.FormattedText

View File

@@ -1,22 +0,0 @@
from ...utils.shared import (
DATABASE_LOGGER as logger
)
from .parents import (
DatabaseObject,
Reference
)
class Artist(DatabaseObject):
def __init__(self, id_: str = None, mb_id: str = None, name: str = None) -> None:
super().__init__(id_=id_)
self.mb_id = mb_id
self.name = name
def __eq__(self, __o: object) -> bool:
if type(__o) != type(self):
return False
return self.id == __o.id
def __str__(self) -> str:
return self.name

View File

@@ -1,84 +0,0 @@
from typing import List
from .source import SourceAttribute
from ...utils import string_processing
class Collection:
"""
This an class for the iterables
like tracklist or discography
"""
_data: List[SourceAttribute]
_by_url: dict
_by_attribute: dict
def __init__(self, data: list = None, map_attributes: list = None, element_type=None) -> None:
"""
Attribute needs to point to
"""
self._by_url = dict()
self.map_attributes = map_attributes or []
self.element_type = element_type
self._by_attribute = {attr: dict() for attr in map_attributes}
self._data = data or []
for element in self._data:
self.map_element(element=element)
def map_element(self, element: SourceAttribute):
for source_url in element.source_url_map:
self._by_url[source_url] = element
for attr in self.map_attributes:
value = element.__getattribute__(attr)
if type(value) != str:
# this also throws out all none values
continue
self._by_attribute[attr][string_processing.unify(value)] = element
def get_object_with_source(self, url: str) -> any:
"""
Returns either None, or the object, that has a source
matching the url.
"""
if url in self._by_url:
return self._by_url[url]
def get_object_with_attribute(self, name: str, value: str):
if name not in self.map_attributes:
raise ValueError(f"didn't map the attribute {name}")
unified = string_processing.unify(value)
if unified in self._by_attribute[name]:
return self._by_attribute[name][unified]
def append(self, element: SourceAttribute):
if type(element) is not self.element_type and self.element_type is not None:
raise TypeError(f"{type(element)} is not the set type {self.element_type}")
self._data.append(element)
self.map_element(element)
def __iter__(self):
for element in self._data:
yield element
def __str__(self) -> str:
return "\n".join([f"{str(j).zfill(2)}: {i}" for j, i in enumerate(self._data)])
def __len__(self) -> int:
return len(self._data)
def copy(self) -> List:
"""
returns a shallow copy of the data list
"""
return self._data.copy()

View File

@@ -1,123 +0,0 @@
import pandoc
"""
TODO
implement in setup.py a skript to install pandocs
https://pandoc.org/installing.html
!!!!!!!!!!!!!!!!!!IMPORTANT!!!!!!!!!!!!!!!!!!
"""
class FormattedText:
doc = None
def __init__(
self,
plaintext: str = None,
markdown: str = None,
html: str = None
) -> None:
self.set_plaintext(plaintext)
self.set_markdown(markdown)
self.set_html(html)
def set_plaintext(self, plaintext: str):
if plaintext is None:
return
self.doc = pandoc.read(plaintext)
def set_markdown(self, markdown: str):
if markdown is None:
return
self.doc = pandoc.read(markdown, format="markdown")
def set_html(self, html: str):
if html is None:
return
self.doc = pandoc.read(html, format="html")
def get_markdown(self) -> str:
if self.doc is None:
return None
return pandoc.write(self.doc, format="markdown").strip()
def get_html(self) -> str:
if self.doc is None:
return None
return pandoc.write(self.doc, format="html").strip()
def get_plaintext(self) -> str:
if self.doc is None:
return None
return pandoc.write(self.doc, format="plain").strip()
plaintext = property(fget=get_plaintext, fset=set_plaintext)
markdown = property(fget=get_markdown, fset=set_markdown)
html = property(fget=get_html, fset=set_html)
class NotesAttributes:
def __init__(self) -> None:
pass
if __name__ == "__main__":
_plaintext = """
World of Work
1. The right to help out society, and being paied for it
2. The right to get paied, so you can get along well.
3. The right for every individual to sell their products to provide for
themselfes or for others
4. The right of fair competitions, meaning eg. no monopoles.
5. The right for a home.
6. The right to good healthcare
7. The right of protections against tragedies, be it personal ones, or
global ones.
8. The right to be educated in a way that enables you to work.
3 most important ones
1. The right to get paied, so you can get along well.
2. The right for a home.
3. The right for a good healthcare.
"""
_markdown = """
# World of Work
1. The right to help out society, and being paied for it
2. **The right to get paied, so you can get along well.**
3. The right for every individual to sell their products to provide for themselfes or for others
4. The right of fair competitions, meaning eg. no monopoles.
5. **The right for a home.**
6. **The right to good healthcare**
7. The right of protections against tragedies, be it personal ones, or global ones.
8. The right to be educated in a way that enables you to work.
## 3 most important ones
1. The right to get paied, so you can get along well.
2. The right for a home.
3. The right for a good healthcare.
"""
_html = """
<b>Contact:</b> <a href="mailto:ghostbath@live.com">ghostbath@live.com</a><br />
<br />
Although the band originally claimed that they were from Chongqing, China, it has been revealed in a 2015 interview with <b>Noisey</b> that they're an American band based in Minot, North Dakota.<br />
<br />
According to the band, "Ghost Bath" refers to "the act of committing suicide by submerging in a body of water."<br />
<br />
<b>Compilation appearance(s):</b><br />
- "Luminescence" on <i>Jericho Vol.36 - Nyctophobia</i> (2018) []
"""
# notes = FormattedText(html=html)
# notes = FormattedText(markdown=_markdown)
notes = FormattedText(plaintext=_plaintext)
# print(notes.get_html())
# print("-"*30)
# print(notes.get_markdown())
print(notes.get_markdown())

View File

@@ -1,379 +0,0 @@
from enum import Enum
from typing import List, Dict, Tuple
import dateutil.tz
from mutagen import id3
import datetime
class Mapping(Enum):
"""
These frames belong to the id3 standart
https://web.archive.org/web/20220830091059/https://id3.org/id3v2.4.0-frames
https://id3lib.sourceforge.net/id3/id3v2com-00.html
https://mutagen-specs.readthedocs.io/en/latest/id3/id3v2.4.0-frames.html
"""
# Textframes
TITLE = "TIT2"
ISRC = "TSRC"
LENGTH = "TLEN" # in milliseconds
DATE = "TYER"
TRACKNUMBER = "TRCK"
TOTALTRACKS = "TRCK" # Stored in the same frame with TRACKNUMBER, separated by '/': e.g. '4/9'.
TITLESORTORDER = "TSOT"
ENCODING_SETTINGS = "TSSE"
SUBTITLE = "TIT3"
SET_SUBTITLE = "TSST"
RELEASE_DATE = "TDRL"
RECORDING_DATES = "TXXX"
PUBLISHER_URL = "WPUB"
PUBLISHER = "TPUB"
RATING = "POPM"
DISCNUMBER = "TPOS"
MOVEMENT_COUNT = "MVIN"
TOTALDISCS = "TPOS"
ORIGINAL_RELEASE_DATE = "TDOR"
ORIGINAL_ARTIST = "TOPE"
ORIGINAL_ALBUM = "TOAL"
MEDIA_TYPE = "TMED"
LYRICIST = "TEXT"
WRITER = "TEXT"
ARTIST = "TPE1"
LANGUAGE = "TLAN" # https://en.wikipedia.org/wiki/ISO_639-2
ITUNESCOMPILATION = "TCMP"
REMIXED_BY = "TPE4"
RADIO_STATION_OWNER = "TRSO"
RADIO_STATION = "TRSN"
INITIAL_KEY = "TKEY"
OWNER = "TOWN"
ENCODED_BY = "TENC"
COPYRIGHT = "TCOP"
GENRE = "TCON"
GROUPING = "TIT1"
CONDUCTOR = "TPE3"
COMPOSERSORTORDER = "TSOC"
COMPOSER = "TCOM"
BPM = "TBPM"
ALBUM_ARTIST = "TPE2"
BAND = "TPE2"
ARTISTSORTORDER = "TSOP"
ALBUM = "TALB"
ALBUMSORTORDER = "TSOA"
ALBUMARTISTSORTORDER = "TSO2"
TAGGING_TIME = "TDTG"
SOURCE_WEBPAGE_URL = "WOAS"
FILE_WEBPAGE_URL = "WOAF"
INTERNET_RADIO_WEBPAGE_URL = "WORS"
ARTIST_WEBPAGE_URL = "WOAR"
COPYRIGHT_URL = "WCOP"
COMMERCIAL_INFORMATION_URL = "WCOM"
PAYMEMT_URL = "WPAY"
MOVEMENT_INDEX = "MVIN"
MOVEMENT_NAME = "MVNM"
UNSYNCED_LYRICS = "USLT"
COMMENT = "COMM"
@classmethod
def get_text_instance(cls, key: str, value: str):
return id3.Frames[key](encoding=3, text=value)
@classmethod
def get_url_instance(cls, key: str, url: str):
return id3.Frames[key](encoding=3, url=url)
@classmethod
def get_mutagen_instance(cls, attribute, value):
key = attribute.value
if key[0] == 'T':
# a text fiel
return cls.get_text_instance(key, value)
if key[0] == "W":
# an url field
return cls.get_url_instance(key, value)
class ID3Timestamp:
def __init__(
self,
year: int = None,
month: int = None,
day: int = None,
hour: int = None,
minute: int = None,
second: int = None
):
self.year = year
self.month = month
self.day = day
self.hour = hour
self.minute = minute
self.second = second
self.has_year = year is not None
self.has_month = month is not None
self.has_day = day is not None
self.has_hour = hour is not None
self.has_minute = minute is not None
self.has_second = second is not None
if not self.has_year:
year = 1
if not self.has_month:
month = 1
if not self.has_day:
day = 1
if not self.has_hour:
hour = 1
if not self.has_minute:
minute = 1
if not self.has_second:
second = 1
self.date_obj = datetime.datetime(
year=year,
month=month,
day=day,
hour=hour,
minute=minute,
second=second
)
def get_time_format(self) -> str:
"""
https://mutagen-specs.readthedocs.io/en/latest/id3/id3v2.4.0-structure.html
The timestamp fields are based on a subset of ISO 8601. When being as precise as possible the format of a
time string is
- yyyy-MM-ddTHH:mm:ss
- (year[%Y], “-”, month[%m], “-”, day[%d], “T”, hour (out of 24)[%H], ”:”, minutes[%M], ”:”, seconds[%S])
- %Y-%m-%dT%H:%M:%S
but the precision may be reduced by removing as many time indicators as wanted. Hence valid timestamps are
- yyyy
- yyyy-MM
- yyyy-MM-dd
- yyyy-MM-ddTHH
- yyyy-MM-ddTHH:mm
- yyyy-MM-ddTHH:mm:ss
All time stamps are UTC. For durations, use the slash character as described in 8601,
and for multiple non-contiguous dates, use multiple strings, if allowed by the frame definition.
:return timestamp: as timestamp in the format of the id3 time as above described
"""
if self.has_year and self.has_month and self.has_day and self.has_hour and self.has_minute and self.has_second:
return "%Y-%m-%dT%H:%M:%S"
if self.has_year and self.has_month and self.has_day and self.has_hour and self.has_minute:
return "%Y-%m-%dT%H:%M"
if self.has_year and self.has_month and self.has_day and self.has_hour:
return "%Y-%m-%dT%H"
if self.has_year and self.has_month and self.has_day:
return "%Y-%m-%d"
if self.has_year and self.has_month:
return "%Y-%m"
if self.has_year:
return "%Y"
return ""
def get_timestamp(self) -> str:
time_format = self.get_time_format()
return self.date_obj.strftime(time_format)
def get_timestamp_w_format(self) -> Tuple[str, str]:
time_format = self.get_time_format()
return time_format, self.date_obj.strftime(time_format)
@classmethod
def strptime(cls, time_stamp: str, format: str):
"""
day: "%d"
month: "%b", "%B", "%m"
year: "%y", "%Y"
hour: "%H", "%I"
minute: "%M"
second: "%S"
"""
date_obj = datetime.datetime.strptime(time_stamp, format)
day = None
if "%d" in format:
day = date_obj.day
month = None
if any([i in format for i in ("%b", "%B", "%m")]):
month = date_obj.month
year = None
if any([i in format for i in ("%y", "%Y")]):
year = date_obj.year
hour = None
if any([i in format for i in ("%H", "%I")]):
hour = date_obj.hour
minute = None
if "%M" in format:
minute = date_obj.minute
second = None
if "%S" in format:
second = date_obj.second
return cls(
year=year,
month=month,
day=day,
hour=hour,
minute=minute,
second=second
)
@classmethod
def now(cls):
date_obj = datetime.datetime.now()
return cls(
year=date_obj.year,
month=date_obj.month,
day=date_obj.day,
hour=date_obj.hour,
minute=date_obj.minute,
second=date_obj.second
)
def strftime(self, format: str) -> str:
return self.date_obj.strftime(format)
def __str__(self) -> str:
return self.timestamp
def __repr__(self) -> str:
return self.timestamp
timestamp: str = property(fget=get_timestamp)
class MetadataAttribute:
"""
This class shall be added to any object, which can return data for tagging
"""
class Metadata:
# it's a null byte for the later concatenation of text frames
NULL_BYTE: str = "\x00"
# this is pretty self-explanatory
# the key is an enum from Mapping
# the value is a list with each value
# the mutagen object for each frame will be generated dynamically
id3_dict: Dict[any, list]
def __init__(self, id3_dict: Dict[any, list] = None) -> None:
self.id3_dict = dict()
if id3_dict is not None:
self.add_metadata_dict(id3_dict)
def __setitem__(self, frame, value_list: list, override_existing: bool = True):
if type(value_list) != list:
raise ValueError(f"can only set attribute to list, not {type(value_list)}")
new_val = [i for i in value_list if i not in {None, ''}]
if len(new_val) == 0:
return
if override_existing:
self.id3_dict[frame] = new_val
else:
if frame not in self.id3_dict:
self.id3_dict[frame] = new_val
return
self.id3_dict[frame].extend(new_val)
def __getitem__(self, key):
if key not in self.id3_dict:
return None
return self.id3_dict[key]
def delete_field(self, key: str):
if key in self.id3_dict:
return self.id3_dict.pop(key)
def add_metadata_dict(self, metadata_dict: dict, override_existing: bool = True):
for field_enum, value in metadata_dict.items():
self.__setitem__(field_enum, value, override_existing=override_existing)
def merge(self, other, override_existing: bool = False):
"""
adds the values of another metadata obj to this one
other is a value of the type MetadataAttribute.Metadata
"""
self.add_metadata_dict(other.id3_dict, override_existing=override_existing)
def merge_many(self, many_other):
"""
adds the values of many other metadata objects to this one
"""
for other in many_other:
self.merge(other)
def get_id3_value(self, field):
if field not in self.id3_dict:
return None
list_data = self.id3_dict[field]
# convert for example the time objects to timestamps
for i, element in enumerate(list_data):
# for performances sake I don't do other checks if it is already the right type
if type(element) == str:
continue
if type(element) in {int}:
list_data[i] = str(element)
if type(element) == ID3Timestamp:
list_data[i] = element.timestamp
continue
"""
Version 2.4 of the specification prescribes that all text fields (the fields that start with a T, except for TXXX) can contain multiple values separated by a null character.
Thus if above conditions are met, I concatenate the list,
else I take the first element
"""
if field.value[0].upper() == "T" and field.value.upper() != "TXXX":
return self.NULL_BYTE.join(list_data)
return list_data[0]
def get_mutagen_object(self, field):
return Mapping.get_mutagen_instance(field, self.get_id3_value(field))
def __str__(self) -> str:
rows = []
for key, value in self.id3_dict.items():
rows.append(f"{key} - {str(value)}")
return "\n".join(rows)
def __iter__(self):
"""
returns a generator, you can iterate through,
to directly tagg a file with id3 container.
"""
# set the tagging timestamp to the current time
self.__setitem__(Mapping.TAGGING_TIME, [ID3Timestamp.now()])
for field in self.id3_dict:
yield self.get_mutagen_object(field)
def get_metadata(self) -> Metadata:
"""
this is intendet to be overwritten by the child class
"""
return MetadataAttribute.Metadata()
metadata = property(fget=lambda self: self.get_metadata())

View File

@@ -1,96 +0,0 @@
import uuid
from ...utils.shared import (
SONG_LOGGER as logger
)
class Reference:
def __init__(self, id_: str) -> None:
self.id = id_
def __str__(self):
return f"references to an object with the id: {self.id}"
def __eq__(self, __o: object) -> bool:
if type(__o) != type(self):
return False
return self.id == __o.id
class DatabaseObject:
empty: bool
def __init__(self, id_: str = None, dynamic: bool = False, empty: bool = False, **kwargs) -> None:
"""
empty means it is an placeholder.
it makes the object perform the same, it is just the same
"""
self.id_: str | None = id_
self.dynamic = dynamic
self.empty = empty
def get_id(self) -> str:
"""
returns the id if it is set, else
it returns a randomly generated UUID
https://docs.python.org/3/library/uuid.html
if the object is empty, it returns None
if the object is dynamic, it raises an error
"""
if self.empty:
return None
if self.dynamic:
raise ValueError("Dynamic objects have no idea, because they are not in the database")
if self.id_ is None:
self.id_ = str(uuid.uuid4())
logger.info(f"id for {self.__str__()} isn't set. Setting to {self.id_}")
return self.id_
def get_reference(self) -> Reference:
return Reference(self.id)
def get_options(self) -> list:
"""
makes only sense in
- artist
- song
- album
"""
return []
def get_option_string(self) -> str:
"""
makes only sense in
- artist
- song
- album
"""
return ""
id = property(fget=get_id)
reference = property(fget=get_reference)
options = property(fget=get_options)
options_str = property(fget=get_option_string)
class SongAttribute:
def __init__(self, song=None):
# the reference to the song the lyrics belong to
self.song = song
def add_song(self, song):
self.song = song
def get_ref_song_id(self):
if self.song is None:
return None
return self.song.reference.id
def set_ref_song_id(self, song_id):
self.song_ref = Reference(song_id)
song_ref_id = property(fget=get_ref_song_id, fset=set_ref_song_id)

View File

@@ -1,484 +0,0 @@
import os
from typing import List
import pycountry
import copy
from .metadata import (
Mapping as id3Mapping,
ID3Timestamp,
MetadataAttribute
)
from ...utils.shared import (
MUSIC_DIR,
DATABASE_LOGGER as logger
)
from .parents import (
DatabaseObject,
Reference,
SongAttribute
)
from .source import (
Source,
SourceTypes,
SourcePages,
SourceAttribute
)
from .formatted_text import FormattedText
from .collection import Collection
"""
All Objects dependent
"""
class Target(DatabaseObject, SongAttribute):
"""
create somehow like that
```python
# I know path is pointless, and I will change that (don't worry about backwards compatibility there)
Target(file="~/Music/genre/artist/album/song.mp3", path="~/Music/genre/artist/album")
```
"""
def __init__(self, id_: str = None, file: str = None, path: str = None) -> None:
DatabaseObject.__init__(self, id_=id_)
SongAttribute.__init__(self)
self._file = file
self._path = path
def set_file(self, _file: str):
self._file = _file
def get_file(self) -> str | None:
if self._file is None:
return None
return os.path.join(MUSIC_DIR, self._file)
def set_path(self, _path: str):
self._path = _path
def get_path(self) -> str | None:
if self._path is None:
return None
return os.path.join(MUSIC_DIR, self._path)
def get_exists_on_disc(self) -> bool:
"""
returns True when file can be found on disc
returns False when file can't be found on disc or no filepath is set
"""
if not self.is_set():
return False
return os.path.exists(self.file)
def is_set(self) -> bool:
return not (self._file is None or self._path is None)
file = property(fget=get_file, fset=set_file)
path = property(fget=get_path, fset=set_path)
exists_on_disc = property(fget=get_exists_on_disc)
class Lyrics(DatabaseObject, SongAttribute, SourceAttribute, MetadataAttribute):
def __init__(
self,
text: str,
language: str,
id_: str = None,
source_list: List[Source] = None
) -> None:
DatabaseObject.__init__(self, id_=id_)
SongAttribute.__init__(self)
self.text = text
self.language = language
if source_list is not None:
self.source_list = source_list
def get_metadata(self) -> MetadataAttribute.Metadata:
return super().get_metadata()
class Song(DatabaseObject, SourceAttribute, MetadataAttribute):
"""
Class representing a song object, with attributes id, mb_id, title, album_name, isrc, length,
tracksort, genre, source_list, target, lyrics_list, album, main_artist_list, and feature_artist_list.
Inherits from DatabaseObject, SourceAttribute, and MetadataAttribute classes.
"""
def __init__(
self,
id_: str = None,
mb_id: str = None,
title: str = None,
isrc: str = None,
length: int = None,
tracksort: int = None,
genre: str = None,
source_list: List[Source] = None,
target: Target = None,
lyrics_list: List[Lyrics] = None,
album=None,
main_artist_list: list = None,
feature_artist_list: list = None,
**kwargs
) -> None:
"""
Initializes the Song object with the following attributes:
"""
super().__init__(id_=id_, **kwargs)
# attributes
# *private* attributes
self.title: str = title
self.isrc: str = isrc
self.length: int = length
self.mb_id: str | None = mb_id
self.tracksort: int = tracksort or 0
self.genre: str = genre
self.source_list = source_list or []
self.target = target or Target()
self.lyrics_list = lyrics_list or []
# initialize with either a passed in album, or an empty one,
# so it can at least properly generate dynamic attributes
self._album = album or Album(empty=True)
self.album = album
self.main_artist_collection = Collection(
data=main_artist_list or [],
map_attributes=["title"],
element_type=Artist
)
self.feature_artist_collection = Collection(
data=feature_artist_list or [],
map_attributes=["title"],
element_type=Artist
)
def __eq__(self, other):
if type(other) != type(self):
return False
return self.id == other.id
def get_artist_credits(self) -> str:
main_artists = ", ".join([artist.name for artist in self.main_artist_collection])
feature_artists = ", ".join([artist.name for artist in self.feature_artist_collection])
if len(feature_artists) == 0:
return main_artists
return f"{main_artists} feat. {feature_artists}"
def __str__(self) -> str:
artist_credit_str = ""
artist_credits = self.get_artist_credits()
if artist_credits != "":
artist_credit_str = f" by {artist_credits}"
return f"\"{self.title}\"{artist_credit_str}"
def __repr__(self) -> str:
return f"Song(\"{self.title}\")"
def get_tracksort_str(self):
"""
if the album tracklist is empty, it sets it length to 1, this song has to be in the Album
:returns id3_tracksort: {song_position}/{album.length_of_tracklist}
"""
return f"{self.tracksort}/{len(self.album.tracklist) or 1}"
def get_metadata(self) -> MetadataAttribute.Metadata:
metadata = MetadataAttribute.Metadata({
id3Mapping.TITLE: [self.title],
id3Mapping.ISRC: [self.isrc],
id3Mapping.LENGTH: [self.length],
id3Mapping.GENRE: [self.genre],
id3Mapping.TRACKNUMBER: [self.tracksort_str]
})
metadata.merge_many([s.get_song_metadata() for s in self.source_list])
if not self.album.empty:
metadata.merge(self.album.metadata)
metadata.merge_many([a.metadata for a in self.main_artist_list])
metadata.merge_many([a.metadata for a in self.feature_artist_list])
metadata.merge_many([l.metadata for l in self.lyrics])
return metadata
def get_options(self) -> list:
"""
Return a list of related objects including the song object, album object, main artist objects, and feature artist objects.
:return: a list of objects that are related to the Song object
"""
options = self.main_artist_list.copy()
options.extend(self.feature_artist_list.copy())
if not self.album.empty:
options.append(self.album)
options.append(self)
return options
def get_option_string(self) -> str:
return f"Song({self.title}) of Album({self.album.title}) from Artists({self.get_artist_credits()})"
tracksort_str = property(fget=get_tracksort_str)
main_artist_list: list = property(fget=lambda self: self.main_artist_collection.copy())
feature_artist_list: list = property(fget=lambda self: self.feature_artist_collection.copy())
"""
All objects dependent on Album
"""
class Album(DatabaseObject, SourceAttribute, MetadataAttribute):
def __init__(
self,
id_: str = None,
title: str = None,
label: str = None,
album_status: str = None,
language: pycountry.Languages = None,
date: ID3Timestamp = None,
country: str = None,
barcode: str = None,
is_split: bool = False,
albumsort: int = None,
dynamic: bool = False,
source_list: List[Source] = None,
artist_list: list = None,
tracklist: List[Song] = None,
album_type: str = None,
**kwargs
) -> None:
DatabaseObject.__init__(self, id_=id_, dynamic=dynamic, **kwargs)
"""
TODO
add to db
"""
self.album_type = album_type
self.title: str = title
self.album_status: str = album_status
self.label = label
self.language: pycountry.Languages = language
self.date: ID3Timestamp = date or ID3Timestamp()
self.country: str = country
"""
TODO
find out the id3 tag for barcode and implement it
maybee look at how mutagen does it with easy_id3
"""
self.barcode: str = barcode
self.is_split: bool = is_split
"""
TODO
implement a function in the Artist class,
to set albumsort with help of the release year
"""
self.albumsort: int | None = albumsort
self._tracklist = Collection(
data=tracklist or [],
map_attributes=["title"],
element_type=Song
)
self.source_list = source_list or []
self.artists = Collection(
data=artist_list or [],
map_attributes=["name"],
element_type=Artist
)
def __str__(self) -> str:
return f"-----{self.title}-----\n{self.tracklist}"
def __repr__(self):
return f"Album(\"{self.title}\")"
def __len__(self) -> int:
return len(self.tracklist)
def set_tracklist(self, tracklist):
tracklist_list = []
if type(tracklist) == Collection:
tracklist_list = tracklist_list
elif type(tracklist) == list:
tracklist_list = tracklist
self._tracklist = Collection(
data=tracklist_list,
map_attributes=["title"],
element_type=Song
)
def get_metadata(self) -> MetadataAttribute.Metadata:
return MetadataAttribute.Metadata({
id3Mapping.ALBUM: [self.title],
id3Mapping.COPYRIGHT: [self.copyright],
id3Mapping.LANGUAGE: [self.iso_639_2_language],
id3Mapping.ALBUM_ARTIST: [a.name for a in self.artists],
id3Mapping.DATE: [self.date.timestamp]
})
def get_copyright(self) -> str:
if self.date is None:
return None
if self.date.year == 1 or self.label is None:
return None
return f"{self.date.year} {self.label}"
def get_iso_639_2_lang(self) -> str:
if self.language is None:
return None
return self.language.alpha_3
def get_options(self) -> list:
options = self.artists.copy()
options.append(self)
for track in self.tracklist:
new_track: Song = copy.copy(track)
new_track.album = self
options.append(new_track)
return options
def get_option_string(self) -> str:
return f"Album: {self.title}; Artists {', '.join([i.name for i in self.artists])}"
copyright = property(fget=get_copyright)
iso_639_2_language = property(fget=get_iso_639_2_lang)
tracklist: Collection = property(fget=lambda self: self._tracklist, fset=set_tracklist)
"""
All objects dependent on Artist
"""
class Artist(DatabaseObject, SourceAttribute, MetadataAttribute):
"""
main_songs
feature_song
albums
"""
def __init__(
self,
id_: str = None,
name: str = None,
source_list: List[Source] = None,
feature_songs: List[Song] = None,
main_albums: List[Album] = None,
notes: FormattedText = None,
lyrical_themes: List[str] = None,
general_genre: str = "",
country=None,
formed_in: ID3Timestamp = None
):
DatabaseObject.__init__(self, id_=id_)
"""
TODO implement album type and notes
"""
self.country: pycountry.Country = country
self.formed_in: ID3Timestamp = formed_in
"""
notes, generall genre, lyrics themes are attributes
which are meant to only use in outputs to describe the object
i mean do as you want but there aint no strict rule about em so good luck
"""
self.notes: FormattedText = notes or FormattedText()
self.lyrical_themes: List[str] = lyrical_themes or []
self.general_genre = general_genre
self.name: str = name
self.feature_songs = Collection(
data=feature_songs,
map_attributes=["title"],
element_type=Song
)
self.main_albums = Collection(
data=main_albums,
map_attributes=["title"],
element_type=Album
)
if source_list is not None:
self.source_list = source_list
def __str__(self):
string = self.name or ""
plaintext_notes = self.notes.get_plaintext()
if plaintext_notes is not None:
string += "\n" + plaintext_notes
return string
def __repr__(self):
return self.__str__()
def __eq__(self, __o: object) -> bool:
return self.id_ == __o.id_
def get_features(self) -> Album:
feature_release = Album(
title="features",
album_status="dynamic",
is_split=True,
albumsort=666,
dynamic=True
)
for feature in self.feature_songs:
feature_release.add_song(feature)
return feature_release
def get_all_songs(self) -> List[Song]:
"""
returns a list of all Songs.
probaply not that usefull, because it is unsorted
"""
collection = []
for album in self.discography:
collection.extend(album)
return collection
def get_discography(self) -> List[Album]:
flat_copy_discography = self.main_albums.copy()
flat_copy_discography.append(self.get_features())
return flat_copy_discography
def get_metadata(self) -> MetadataAttribute.Metadata:
metadata = MetadataAttribute.Metadata({
id3Mapping.ARTIST: [self.name]
})
metadata.merge_many([s.get_artist_metadata() for s in self.source_list])
return metadata
def get_options(self) -> list:
options = [self]
options.extend(self.main_albums)
options.extend(self.feature_songs)
return options
def get_option_string(self) -> str:
return f"Artist: {self.name}"
discography: List[Album] = property(fget=get_discography)
features: Album = property(fget=get_features)
all_songs: Album = property(fget=get_all_songs)

View File

@@ -1,194 +0,0 @@
from enum import Enum
from typing import List, Dict
from .metadata import Mapping, MetadataAttribute
from .parents import (
DatabaseObject,
SongAttribute,
)
class SourceTypes(Enum):
SONG = "song"
ALBUM = "album"
ARTIST = "artist"
LYRICS = "lyrics"
class SourcePages(Enum):
YOUTUBE = "youtube"
MUSIFY = "musify"
GENIUS = "genius"
MUSICBRAINZ = "musicbrainz"
ENCYCLOPAEDIA_METALLUM = "encyclopaedia metallum"
BANDCAMP = "bandcamp"
DEEZER = "deezer"
SPOTIFY = "spotify"
# This has nothing to do with audio, but bands can be here
INSTAGRAM = "instagram"
FACEBOOK = "facebook"
TWITTER = "twitter" # I will use nitter though lol
@classmethod
def get_homepage(cls, attribute) -> str:
homepage_map = {
cls.YOUTUBE: "https://www.youtube.com/",
cls.MUSIFY: "https://musify.club/",
cls.MUSICBRAINZ: "https://musicbrainz.org/",
cls.ENCYCLOPAEDIA_METALLUM: "https://www.metal-archives.com/",
cls.GENIUS: "https://genius.com/",
cls.BANDCAMP: "https://bandcamp.com/",
cls.DEEZER: "https://www.deezer.com/",
cls.INSTAGRAM: "https://www.instagram.com/",
cls.FACEBOOK: "https://www.facebook.com/",
cls.SPOTIFY: "https://open.spotify.com/",
cls.TWITTER: "https://twitter.com/"
}
return homepage_map[attribute]
class Source(DatabaseObject, SongAttribute, MetadataAttribute):
"""
create somehow like that
```python
# url won't be a valid one due to it being just an example
Source(src="youtube", url="https://youtu.be/dfnsdajlhkjhsd")
```
"""
def __init__(self, page_enum, url: str, id_: str = None, type_enum=None) -> None:
DatabaseObject.__init__(self, id_=id_)
SongAttribute.__init__(self)
self.type_enum = type_enum
self.page_enum = page_enum
self.url = url
@classmethod
def match_url(cls, url: str):
"""
this shouldn't be used, unlesse you are not certain what the source is for
the reason is that it is more inefficient
"""
if url.startswith("https://www.youtube"):
return cls(SourcePages.YOUTUBE, url)
if url.startswith("https://www.deezer"):
return cls(SourcePages.DEEZER, url)
if url.startswith("https://open.spotify.com"):
return cls(SourcePages.SPOTIFY, url)
if "bandcamp" in url:
return cls(SourcePages.BANDCAMP, url)
if url.startswith("https://www.metal-archives.com/"):
return cls(SourcePages.ENCYCLOPAEDIA_METALLUM, url)
# the less important once
if url.startswith("https://www.facebook"):
return cls(SourcePages.FACEBOOK, url)
if url.startswith("https://www.instagram"):
return cls(SourcePages.INSTAGRAM, url)
if url.startswith("https://twitter"):
return cls(SourcePages.TWITTER, url)
def get_song_metadata(self) -> MetadataAttribute.Metadata:
return MetadataAttribute.Metadata({
Mapping.FILE_WEBPAGE_URL: [self.url],
Mapping.SOURCE_WEBPAGE_URL: [self.homepage]
})
def get_artist_metadata(self) -> MetadataAttribute.Metadata:
return MetadataAttribute.Metadata({
Mapping.ARTIST_WEBPAGE_URL: [self.url]
})
def get_metadata(self) -> MetadataAttribute.Metadata:
if self.type_enum == SourceTypes.SONG:
return self.get_song_metadata()
if self.type_enum == SourceTypes.ARTIST:
return self.get_artist_metadata()
return super().get_metadata()
def __str__(self):
return self.__repr__()
def __repr__(self) -> str:
return f"Src({self.page_enum.value}: {self.url})"
page_str = property(fget=lambda self: self.page_enum.value)
type_str = property(fget=lambda self: self.type_enum.value)
homepage = property(fget=lambda self: SourcePages.get_homepage(self.page_enum))
class SourceAttribute:
"""
This is a class that is meant to be inherited from.
it adds the source_list attribute to a class
"""
_source_dict: Dict[object, List[Source]]
source_url_map: Dict[str, Source]
def __new__(cls, **kwargs):
new = object.__new__(cls)
new._source_dict = {page_enum: list() for page_enum in SourcePages}
new.source_url_map = dict()
return new
def match_source_with_url(self, url: str) -> bool:
"""
this function returns true, if a source with this url exists,
else it returns false
:param url:
:return source_with_url_exists:
"""
return url in self.source_url_map
def match_source(self, source: Source) -> bool:
return self.match_source_with_url(source.url)
def add_source(self, source: Source):
"""
adds a new Source to the sources
"""
if self.match_source(source):
return
self.source_url_map[source.url] = source
self._source_dict[source.page_enum].append(source)
def get_sources_from_page(self, page_enum) -> List[Source]:
"""
getting the sources for a specific page like
youtube or musify
"""
return self._source_dict[page_enum]
def get_source_list(self) -> List[Source]:
"""
gets all sources
"""
return [item for _, page_list in self._source_dict.items() for item in page_list]
def set_source_list(self, source_list: List[Source]):
self._source_dict = {page_enum: list() for page_enum in SourcePages}
for source in source_list:
self.add_source(source)
def get_source_dict(self) -> Dict[object, List[Source]]:
"""
gets a dictionary of all Sources,
where the key is a page enum,
and the value is a List with all sources of according page
"""
return self._source_dict
source_list: List[Source] = property(fget=get_source_list, fset=set_source_list)
source_dict: Dict[object, List[Source]] = property(fget=get_source_dict)

View File

@@ -0,0 +1,700 @@
import sqlite3
import os
import logging
from typing import List, Tuple
from pkg_resources import resource_string
import pycountry
from src.music_kraken.objects.parents import Reference
from src.music_kraken.objects.source import Source
from src.music_kraken.objects import (
Song,
Lyrics,
Target,
Artist,
Album,
ID3Timestamp,
SourceTypes,
SourcePages
)
"""
import peewee
db = peewee.SqliteDatabase('music.db')
class BaseModel(peewee.Model):
class Meta:
database = db
class Artist(BaseModel):
name = peewee.CharField()
class Song(BaseModel):
title = peewee.CharField()
artist = peewee.ManyToManyField(Artist, backref='songs')
db.connect()
db.create_tables([Artist, Song, Song.artist.get_through_model()], safe=True)
# Adding a song and its artists
beatles = Artist.create(name='The Beatles')
rolling_stones = Artist.create(name='The Rolling Stones')
song = Song.create(title='Hey Jude')
song.artist.add(beatles, rolling_stones)
# Querying songs by artist
songs = Song.select().join(Song.artist).where(Artist.name == 'The Beatles')
for song in songs:
print(song.title)
"""
logger = logging.getLogger("database")
# Due to this not being deployed on a Server **HOPEFULLY**
# I don't need to parameterize stuff like the where and
# use complicated query builder
SONG_QUERY = """
SELECT
Song.id AS song_id, Song.name AS title, Song.isrc AS isrc, Song.length AS length, Song.album_id as album_id, Song.tracksort,
Target.id AS target_id, Target.file AS file, Target.path AS path, Song.genre AS genre
FROM Song
LEFT JOIN Target ON Song.id=Target.song_id
WHERE {where};
"""
SOURCE_QUERY = """
SELECT id, type, src, url, song_id
FROM Source
WHERE {where};
"""
LYRICS_QUERY = """
SELECT id, text, language, song_id
FROM Lyrics
WHERE {where};
"""
ALBUM_QUERY_UNJOINED = """
SELECT Album.id AS album_id, title, label, album_status, language, date, date_format, country, barcode, albumsort, is_split
FROM Album
WHERE {where};
"""
ALBUM_QUERY_JOINED = """
SELECT a.id AS album_id, a.title, a.label, a.album_status, a.language, a.date, a.date_format, a.country, a.barcode, a.albumsort, a.is_split
FROM Song
INNER JOIN Album a ON Song.album_id=a.id
WHERE {where};
"""
ARTIST_QUERY = """
SELECT id as artist_id, name as artist_name
FROM Artist
WHERE {where};
"""
class Database:
def __init__(self, database_file: str):
self.database_file: str = database_file
self.connection, self.cursor = self.reset_cursor()
self.cursor = self.connection.cursor()
def reset(self):
"""
Deletes all Data from the database if it exists
and resets the schema defined in self.structure_file
"""
logger.info(f"resetting the database")
# deleting the database
del self.connection
del self.cursor
os.remove(self.database_file)
# newly creating the database
self.reset_cursor()
query = resource_string("music_kraken", "static_files/new_db.sql").decode('utf-8')
# fill the database with the schematic
self.cursor.executescript(query)
self.connection.commit()
def reset_cursor(self) -> Tuple[sqlite3.Connection, sqlite3.Cursor]:
self.connection = sqlite3.connect(self.database_file)
# This is necessary that fetching rows returns dicts instead of tuple
self.connection.row_factory = sqlite3.Row
self.cursor = self.connection.cursor()
return self.connection, self.cursor
def push_one(self, db_object: Song | Lyrics | Target | Artist | Source | Album):
if db_object.dynamic:
return
if type(db_object) == Song:
return self.push_song(song=db_object, pushed=set())
"""
if type(db_object) == Lyrics:
return self.push_lyrics(lyrics=db_object)
if type(db_object) == Target:
return self.push_target(target=db_object)
"""
if type(db_object) == Artist:
return self.push_artist(artist=db_object, pushed=set())
"""
if type(db_object) == Source:
# needs to have the property type_enum or type_str set
return self.push_source(source=db_object)
"""
if type(db_object) == Album:
return self.push_album(album=db_object, pushed=set())
logger.warning(f"type {type(db_object)} isn't yet supported by the db")
def push(self, db_object_list: List[Song | Lyrics | Target | Artist | Source | Album]):
"""
This function is used to Write the data of any db_object to the database
It syncs a whole list of db_objects to the database and is meant
as the primary method to add to the database.
:param db_object_list:
"""
for db_object in db_object_list:
self.push_one(db_object)
def push_album(self, album: Album, pushed: set):
table = "Album"
query = f"INSERT OR REPLACE INTO {table} (id, title, label, album_status, language, date, date_format, country, barcode, albumsort, is_split) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
if album.id in pushed:
return
pushed.add(album.id)
date_format, date = album.date.get_timestamp_w_format()
values = (
album.id,
album.title,
album.label,
album.album_status,
album.iso_639_2_language,
date,
date_format,
album.country,
album.barcode,
album.albumsort,
album.is_split
)
self.cursor.execute(query, values)
self.connection.commit()
for song in album.tracklist:
self.push_song(song, pushed=pushed)
for artist in album.artists:
self.push_artist_album(artist_ref=artist.reference, album_ref=album.reference)
self.push_artist(artist, pushed=pushed)
for source in album.source_list:
source.type_enum = SourceTypes.ALBUM
source.add_song(album)
self.push_source(source=source)
def push_song(self, song: Song, pushed: set):
if song.dynamic:
return
if song.id in pushed:
return
pushed.add(song.id)
# ADDING THE DATA FOR THE SONG OBJECT
"""
db_field - object attribute
-------------------------------
id - id
name - title
"""
table = "Song"
values = (
song.id,
song.title,
song.isrc,
song.length,
song.get_album_id(),
song.tracksort,
song.genre
)
query = f"INSERT OR REPLACE INTO {table} (id, name, isrc, length, album_id, tracksort, genre) VALUES (?, ?, ?, ?, ?, ?, ?);"
self.cursor.execute(query, values)
self.connection.commit()
# add sources
for source in song.source_list:
source.add_song(song)
source.type_enum = SourceTypes.SONG
self.push_source(source=source)
# add lyrics
for single_lyrics in song.lyrics:
single_lyrics.add_song(song)
self.push_lyrics(lyrics=single_lyrics)
# add target
song.target.add_song(song)
self.push_target(target=song.target)
for main_artist in song.main_artist_list:
self.push_artist_song(artist_ref=Reference(main_artist.id), song_ref=Reference(song.id), is_feature=False)
self.push_artist(artist=main_artist, pushed=pushed)
for feature_artist in song.feature_artist_list:
self.push_artist_song(artist_ref=Reference(feature_artist.id), song_ref=Reference(song.id), is_feature=True)
self.push_artist(artist=feature_artist, pushed=pushed)
if song.album is not None:
self.push_album(song.album, pushed=pushed)
def push_lyrics(self, lyrics: Lyrics):
if lyrics.dynamic:
return
table = "Lyrics"
query = f"INSERT OR REPLACE INTO {table} (id, song_id, text, language) VALUES (?, ?, ?, ?);"
values = (
lyrics.id,
lyrics.song_ref_id,
lyrics.text,
lyrics.language
)
self.cursor.execute(query, values)
self.connection.commit()
def push_source(self, source: Source):
if source.dynamic:
return
table = "Source"
query = f"INSERT OR REPLACE INTO {table} (id, type, song_id, src, url) VALUES (?, ?, ?, ?, ?);"
values = (
source.id,
source.type_str,
source.song_ref_id,
source.page_str,
source.url
)
self.cursor.execute(query, values)
self.connection.commit()
def push_target(self, target: Target):
if target.dynamic:
return
table = "Target"
query = f"INSERT OR REPLACE INTO {table} (id, song_id, file, path) VALUES (?, ?, ?, ?);"
values = (
target.id,
target.song_ref_id,
target.file,
target.path
)
self.cursor.execute(query, values)
self.connection.commit()
def push_artist_song(self, artist_ref: Reference, song_ref: Reference, is_feature: bool):
table = "SongArtist"
# checking if already exists
query = f"SELECT * FROM {table} WHERE song_id=\"{song_ref.id}\" AND artist_id=\"{artist_ref.id}\""
self.cursor.execute(query)
if len(self.cursor.fetchall()) > 0:
# join already exists
return
query = f"INSERT OR REPLACE INTO {table} (song_id, artist_id, is_feature) VALUES (?, ?, ?);"
values = (
song_ref.id,
artist_ref.id,
is_feature
)
self.cursor.execute(query, values)
self.connection.commit()
def push_artist_album(self, artist_ref: Reference, album_ref: Reference):
table = "AlbumArtist"
# checking if already exists
query = f"SELECT * FROM {table} WHERE album_id=\"{album_ref.id}\" AND artist_id=\"{artist_ref.id}\""
self.cursor.execute(query)
if len(self.cursor.fetchall()) > 0:
# join already exists
return
query = f"INSERT OR REPLACE INTO {table} (album_id, artist_id) VALUES (?, ?);"
values = (
album_ref.id,
artist_ref.id
)
self.cursor.execute(query, values)
self.connection.commit()
def push_artist(self, artist: Artist, pushed: set):
if artist.dynamic:
return
if artist.id in pushed:
return
pushed.add(artist.id)
table = "Artist"
query = f"INSERT OR REPLACE INTO {table} (id, name) VALUES (?, ?);"
values = (
artist.id,
artist.name
)
self.cursor.execute(query, values)
self.connection.commit()
for song in artist.feature_songs:
self.push_artist_song(artist_ref=artist.reference, song_ref=song.reference, is_feature=True)
self.push_song(song=song, pushed=pushed)
for song in artist.main_songs:
self.push_artist_song(artist_ref=artist.reference, song_ref=song.reference, is_feature=False)
self.push_song(song=song, pushed=pushed)
for album in artist.main_albums:
self.push_artist_album(artist_ref=artist.reference, album_ref=album.reference)
for source in artist.source_list:
source.type_enum = SourceTypes.ARTIST
source.add_song(artist)
self.push_source(source)
def pull_lyrics(self, song_ref: Reference = None, lyrics_ref: Reference = None) -> List[Lyrics]:
"""
Gets a list of sources. if lyrics_ref is passed in the List will most likely only
contain one Element if everything goes accordingly.
**If neither song_ref nor lyrics_ref are passed in it will return ALL lyrics**
:param song_ref:
:param lyrics_ref:
:return:
"""
where = "1=1"
if song_ref is not None:
where = f"song_id=\"{song_ref.id}\""
elif lyrics_ref is not None:
where = f"id=\"{lyrics_ref.id}\""
query = LYRICS_QUERY.format(where=where)
self.cursor.execute(query)
lyrics_rows = self.cursor.fetchall()
return [Lyrics(
id_=lyrics_row['id'],
text=lyrics_row['text'],
language=lyrics_row['language']
) for lyrics_row in lyrics_rows]
def pull_sources(self, artist_ref: Reference = None, song_ref: Reference = None, source_ref: Reference = None, album_ref: Reference = None) -> List[Source]:
"""
Gets a list of sources. if source_ref is passed in the List will most likely only
contain one Element if everything goes accordingly.
**If neither song_ref nor source_ref are passed in it will return ALL sources**
:param artist_ref:
:param song_ref:
:param source_ref:
:param type_str: the thing the source belongs to like eg. "song" or "album"
:return:
"""
where = "1=1"
if song_ref is not None:
where = f"song_id=\"{song_ref.id}\""
elif source_ref is not None:
where = f"id=\"{source_ref.id}\" AND type=\"{SourceTypes.SONG.value}\""
elif artist_ref is not None:
where = f"song_id=\"{artist_ref.id}\" AND type=\"{SourceTypes.ARTIST.value}\""
elif album_ref is not None:
where = f"song_id=\"{album_ref.id}\" AND type=\"{SourceTypes.ALBUM.value}\""
query = SOURCE_QUERY.format(where=where)
self.cursor.execute(query)
source_rows = self.cursor.fetchall()
return [
Source(
page_enum=SourcePages(source_row['src']),
type_enum=SourceTypes(source_row['type']),
url=source_row['url'],
id_=source_row['id']
) for source_row in source_rows
]
def pull_artist_song(self, song_ref: Reference = None, artist_ref: Reference = None) -> List[tuple]:
table = "SongArtist"
wheres = []
if song_ref is not None:
wheres.append(f"song_id=\"{song_ref.id}\"")
if artist_ref is not None:
wheres.append(f"artist_id=\"{artist_ref.id}\"")
where_str = ""
if len(wheres) > 0:
where_str = "WHERE " + " AND ".join(wheres)
query = f"SELECT * FROM {table} {where_str};"
self.cursor.execute(query)
joins = self.cursor.fetchall()
return [(
Reference(join["song_id"]),
Reference(join["artist_id"]),
bool(join["is_feature"])
) for join in joins]
def pull_artist_album(self, album_ref: Reference = None, artist_ref: Reference = None) -> List[tuple]:
table = "AlbumArtist"
wheres = []
if album_ref is not None:
wheres.append(f"album_id=\"{album_ref.id}\"")
if artist_ref is not None:
wheres.append(f"artist_id=\"{artist_ref.id}\"")
where_str = ""
if len(wheres) > 0:
where_str = "WHERE " + " AND ".join(wheres)
query = f"SELECT * FROM {table} {where_str};"
self.cursor.execute(query)
joins = self.cursor.fetchall()
return [(
Reference(join["album_id"]),
Reference(join["artist_id"])
) for join in joins]
def get_artist_from_row(self, artist_row, exclude_relations: set = None, flat: bool = False) -> Artist:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = set(exclude_relations)
new_exclude_relations.add(Artist)
artist_id = artist_row['artist_id']
artist_obj = Artist(
id_=artist_id,
name=artist_row['artist_name'],
source_list=self.pull_sources(artist_ref=Reference(id_=artist_id))
)
if flat:
return artist_obj
# fetch songs :D
for song_ref, _, is_feature in self.pull_artist_song(artist_ref=Reference(id_=artist_id)):
new_songs = self.pull_songs(song_ref=song_ref, exclude_relations=new_exclude_relations)
if len(new_songs) < 1:
continue
new_song = new_songs[0]
if is_feature:
artist_obj.feature_songs.append(new_song)
else:
artist_obj.main_songs.append(new_song)
# fetch albums
for album_ref, _ in self.pull_artist_album(artist_ref=Reference(id_=artist_id)):
new_albums = self.pull_albums(album_ref=album_ref, exclude_relations=new_exclude_relations)
if len(new_albums) < 1:
continue
artist_obj.main_albums.append(new_albums[0])
return artist_obj
def pull_artists(self, artist_ref: Reference = None, exclude_relations: set = None, flat: bool = False) -> List[Artist]:
"""
:param artist_ref:
:param exclude_relations:
:param flat: if it is true it ONLY fetches the artist data
:return:
"""
where = "1=1"
if artist_ref is not None:
where = f"Artist.id=\"{artist_ref.id}\""
query = ARTIST_QUERY.format(where=where)
self.cursor.execute(query)
artist_rows = self.cursor.fetchall()
return [(
self.get_artist_from_row(artist_row, exclude_relations=exclude_relations, flat=flat)
) for artist_row in artist_rows]
def get_song_from_row(self, song_result, exclude_relations: set = None) -> Song:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = set(exclude_relations)
new_exclude_relations.add(Song)
song_id = song_result['song_id']
# maybee fetch album
song_obj = Song(
id_=song_id,
title=song_result['title'],
isrc=song_result['isrc'],
length=song_result['length'],
tracksort=song_result['tracksort'],
genre=song_result['genre'],
target=Target(
id_=song_result['target_id'],
file=song_result['file'],
path=song_result['path']
),
source_list=self.pull_sources(song_ref=Reference(id_=song_id)),
lyrics=self.pull_lyrics(song_ref=Reference(id_=song_id)),
)
if Album not in exclude_relations and song_result['album_id'] is not None:
album_obj = self.pull_albums(album_ref=Reference(song_result['album_id']),
exclude_relations=new_exclude_relations)
if len(album_obj) > 0:
song_obj.album = album_obj[0]
flat_artist = Artist in exclude_relations
main_artists = []
feature_artists = []
for song_ref, artist_ref, is_feature in self.pull_artist_song(song_ref=Reference(song_id)):
if is_feature:
feature_artists.extend(self.pull_artists(artist_ref=artist_ref, flat=flat_artist))
else:
main_artists.extend(self.pull_artists(artist_ref=artist_ref, flat=flat_artist))
song_obj.main_artist_list = main_artists
song_obj.feature_artist_list = feature_artists
return song_obj
def pull_songs(self, song_ref: Reference = None, album_ref: Reference = None, exclude_relations: set = set()) -> \
List[Song]:
"""
This function is used to get one song (including its children like Sources etc)
from one song id (a reference object)
:param exclude_relations:
By default all relations are pulled by this funktion. If the class object of for
example the Artists is in the set it won't get fetched.
This is done to prevent an infinite recursion.
:param song_ref:
:param album_ref:
:return requested_song:
"""
where = "1=1"
if song_ref is not None:
where = f"Song.id=\"{song_ref.id}\""
elif album_ref is not None:
where = f"Song.album_id=\"{album_ref.id}\""
query = SONG_QUERY.format(where=where)
self.cursor.execute(query)
song_rows = self.cursor.fetchall()
return [self.get_song_from_row(
song_result=song_result,
exclude_relations=exclude_relations
) for song_result in song_rows]
def get_album_from_row(self, album_result, exclude_relations=None) -> Album:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = exclude_relations.copy()
new_exclude_relations.add(Album)
album_id = album_result['album_id']
language = album_result['language']
if language is not None:
language = pycountry.languages.get(alpha_3=album_result['language'])
album_obj = Album(
id_=album_id,
title=album_result['title'],
label=album_result['label'],
album_status=album_result['album_status'],
language=language,
date=ID3Timestamp.strptime(album_result['date'], album_result['date_format']),
country=album_result['country'],
barcode=album_result['barcode'],
is_split=album_result['is_split'],
albumsort=album_result['albumsort'],
source_list=self.pull_sources(album_ref=Reference(id_=album_id))
)
if Song not in exclude_relations:
# getting the tracklist
tracklist: List[Song] = self.pull_songs(
album_ref=Reference(id_=album_id),
exclude_relations=new_exclude_relations
)
album_obj.set_tracklist(tracklist=tracklist)
flat_artist = Artist in exclude_relations
for _, artist_ref in self.pull_artist_album(album_ref=Reference(id_=album_id)):
artists = self.pull_artists(artist_ref, flat=flat_artist, exclude_relations=new_exclude_relations)
if len(artists) < 1:
continue
album_obj.artists.append(artists[0])
return album_obj
def pull_albums(self, album_ref: Reference = None, song_ref: Reference = None, exclude_relations: set = None) -> \
List[Album]:
"""
This function is used to get matching albums/releses
from one song id (a reference object)
:param exclude_relations:
By default all relations are pulled by this funktion. If the class object of for
example the Artists is in the set it won't get fetched.
This is done to prevent an infinite recursion.
:param album_ref:
:return requested_album_list:
"""
if exclude_relations is None:
exclude_relations = set()
query = ALBUM_QUERY_UNJOINED
where = "1=1"
if album_ref is not None:
query = ALBUM_QUERY_UNJOINED
where = f"Album.id=\"{album_ref.id}\""
elif song_ref is not None:
query = ALBUM_QUERY_JOINED
where = f"Song.id=\"{song_ref.id}\""
query = query.format(where=where)
self.cursor.execute(query)
album_rows = self.cursor.fetchall()
return [self.get_album_from_row(
album_result=album_row,
exclude_relations=exclude_relations
) for album_row in album_rows]
if __name__ == "__main__":
cache = Database("")

View File

@@ -1,4 +1,4 @@
from .database import Database
from .old_database import Database
from ..utils.shared import (
TEMP_DATABASE_PATH,