cleaned up

This commit is contained in:
Hellow 2023-03-24 18:09:53 +01:00
parent 267bf52847
commit 34465ce46e
8 changed files with 1 additions and 1191 deletions

View File

@ -38,38 +38,7 @@ logging.getLogger("musicbrainzngs").setLevel(logging.WARNING)
musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeIIow2/music-downloader")
def get_options_from_query(query: str) -> List[objects.DatabaseObject]:
options = []
for MetadataPage in pages.MetadataPages:
options.extend(MetadataPage.search_by_query(query=query))
return options
def get_options_from_option(option: objects.DatabaseObject) -> List[objects.DatabaseObject]:
for MetadataPage in pages.MetadataPages:
option = MetadataPage.fetch_details(option, flat=False)
return option.get_options()
def print_options(options: List[objects.DatabaseObject]):
print("\n".join([f"{str(j).zfill(2)}: {i.get_option_string()}" for j, i in enumerate(options)]))
def cli():
options = []
while True:
command: str = input(">> ").strip()
if command.isdigit():
option_index = int(command)
if option_index >= len(options):
print(f"option {option_index} doesn't exist")
continue
options = get_options_from_option(options[option_index])
else:
options = get_options_from_query(command)
print_options(options)
print("HelloWorld")

View File

@ -1,106 +0,0 @@
from collections import defaultdict
from typing import Dict, List, Optional
import weakref
from src.music_kraken.objects import DatabaseObject
"""
This is a cache for the objects, that et pulled out of the database.
This is necessary, to not have duplicate objects with the same id.
Using a cache that maps the ojects to their id has multiple benefits:
- if you modify the object at any point, all objects with the same id get modified *(copy by reference)*
- less ram usage
- to further decrease ram usage I only store weak refs and not a strong reference, for the gc to still work
"""
class ObjectCache:
"""
ObjectCache is a cache for the objects retrieved from a database.
It maps each object to its id and uses weak references to manage its memory usage.
Using a cache for these objects provides several benefits:
- Modifying an object updates all objects with the same id (due to copy by reference)
- Reduced memory usage
:attr object_to_id: Dictionary that maps MusicObjects to their id.
:attr weakref_map: Dictionary that uses weak references to MusicObjects as keys and their id as values.
:method exists: Check if a MusicObject already exists in the cache.
:method append: Add a MusicObject to the cache if it does not already exist.
:method extent: Add a list of MusicObjects to the cache.
:method remove: Remove a MusicObject from the cache by its id.
:method get: Retrieve a MusicObject from the cache by its id. """
object_to_id: Dict[str, DatabaseObject]
weakref_map: Dict[weakref.ref, str]
def __init__(self) -> None:
self.object_to_id = dict()
self.weakref_map = defaultdict()
def exists(self, music_object: DatabaseObject) -> bool:
"""
Check if a MusicObject with the same id already exists in the cache.
:param music_object: The MusicObject to check for.
:return: True if the MusicObject exists, False otherwise.
"""
if music_object.dynamic:
return True
return music_object.id in self.object_to_id
def on_death(self, weakref_: weakref.ref) -> None:
"""
Callback function that gets triggered when the reference count of a MusicObject drops to 0.
This function removes the MusicObject from the cache.
:param weakref_: The weak reference of the MusicObject that has been garbage collected.
"""
data_id = self.weakref_map.pop(weakref_)
self.object_to_id.pop(data_id)
def append(self, music_object: DatabaseObject) -> bool:
"""
Add a MusicObject to the cache.
:param music_object: The MusicObject to add to the cache.
:return: True if the MusicObject already exists in the cache, False otherwise.
"""
if self.exists(music_object):
return True
self.weakref_map[weakref.ref(music_object, self.on_death)] = music_object.id
self.object_to_id[music_object.id] = music_object
return False
def extent(self, music_object_list: List[DatabaseObject]):
"""
adjacent to the extent method of list, this appends n Object
"""
for music_object in music_object_list:
self.append(music_object)
def remove(self, _id: str):
"""
Remove a MusicObject from the cache.
:param _id: The id of the MusicObject to remove from the cache.
"""
data = self.object_to_id.get(_id)
if data:
self.weakref_map.pop(weakref.ref(data))
self.object_to_id.pop(_id)
def __getitem__(self, item) -> Optional[DatabaseObject]:
"""
this returns the data obj
:param item: the id of the music object
:return:
"""
return self.object_to_id.get(item)
def get(self, _id: str) -> Optional[DatabaseObject]:
return self.__getitem__(_id)

View File

@ -1,700 +0,0 @@
import sqlite3
import os
import logging
from typing import List, Tuple
from pkg_resources import resource_string
import pycountry
from src.music_kraken.objects.parents import Reference
from src.music_kraken.objects.source import Source
from src.music_kraken.objects import (
Song,
Lyrics,
Target,
Artist,
Album,
ID3Timestamp,
SourceTypes,
SourcePages
)
"""
import peewee
db = peewee.SqliteDatabase('music.db')
class BaseModel(peewee.Model):
class Meta:
database = db
class Artist(BaseModel):
name = peewee.CharField()
class Song(BaseModel):
title = peewee.CharField()
artist = peewee.ManyToManyField(Artist, backref='songs')
db.connect()
db.create_tables([Artist, Song, Song.artist.get_through_model()], safe=True)
# Adding a song and its artists
beatles = Artist.create(name='The Beatles')
rolling_stones = Artist.create(name='The Rolling Stones')
song = Song.create(title='Hey Jude')
song.artist.add(beatles, rolling_stones)
# Querying songs by artist
songs = Song.select().join(Song.artist).where(Artist.name == 'The Beatles')
for song in songs:
print(song.title)
"""
logger = logging.getLogger("database")
# Due to this not being deployed on a Server **HOPEFULLY**
# I don't need to parameterize stuff like the where and
# use complicated query builder
SONG_QUERY = """
SELECT
Song.id AS song_id, Song.name AS title, Song.isrc AS isrc, Song.length AS length, Song.album_id as album_id, Song.tracksort,
Target.id AS target_id, Target.file AS file, Target.path AS path, Song.genre AS genre
FROM Song
LEFT JOIN Target ON Song.id=Target.song_id
WHERE {where};
"""
SOURCE_QUERY = """
SELECT id, type, src, url, song_id
FROM Source
WHERE {where};
"""
LYRICS_QUERY = """
SELECT id, text, language, song_id
FROM Lyrics
WHERE {where};
"""
ALBUM_QUERY_UNJOINED = """
SELECT Album.id AS album_id, title, label, album_status, language, date, date_format, country, barcode, albumsort, is_split
FROM Album
WHERE {where};
"""
ALBUM_QUERY_JOINED = """
SELECT a.id AS album_id, a.title, a.label, a.album_status, a.language, a.date, a.date_format, a.country, a.barcode, a.albumsort, a.is_split
FROM Song
INNER JOIN Album a ON Song.album_id=a.id
WHERE {where};
"""
ARTIST_QUERY = """
SELECT id as artist_id, name as artist_name
FROM Artist
WHERE {where};
"""
class Database:
def __init__(self, database_file: str):
self.database_file: str = database_file
self.connection, self.cursor = self.reset_cursor()
self.cursor = self.connection.cursor()
def reset(self):
"""
Deletes all Data from the database if it exists
and resets the schema defined in self.structure_file
"""
logger.info(f"resetting the database")
# deleting the database
del self.connection
del self.cursor
os.remove(self.database_file)
# newly creating the database
self.reset_cursor()
query = resource_string("music_kraken", "static_files/new_db.sql").decode('utf-8')
# fill the database with the schematic
self.cursor.executescript(query)
self.connection.commit()
def reset_cursor(self) -> Tuple[sqlite3.Connection, sqlite3.Cursor]:
self.connection = sqlite3.connect(self.database_file)
# This is necessary that fetching rows returns dicts instead of tuple
self.connection.row_factory = sqlite3.Row
self.cursor = self.connection.cursor()
return self.connection, self.cursor
def push_one(self, db_object: Song | Lyrics | Target | Artist | Source | Album):
if db_object.dynamic:
return
if type(db_object) == Song:
return self.push_song(song=db_object, pushed=set())
"""
if type(db_object) == Lyrics:
return self.push_lyrics(lyrics=db_object)
if type(db_object) == Target:
return self.push_target(target=db_object)
"""
if type(db_object) == Artist:
return self.push_artist(artist=db_object, pushed=set())
"""
if type(db_object) == Source:
# needs to have the property type_enum or type_str set
return self.push_source(source=db_object)
"""
if type(db_object) == Album:
return self.push_album(album=db_object, pushed=set())
logger.warning(f"type {type(db_object)} isn't yet supported by the db")
def push(self, db_object_list: List[Song | Lyrics | Target | Artist | Source | Album]):
"""
This function is used to Write the data of any db_object to the database
It syncs a whole list of db_objects to the database and is meant
as the primary method to add to the database.
:param db_object_list:
"""
for db_object in db_object_list:
self.push_one(db_object)
def push_album(self, album: Album, pushed: set):
table = "Album"
query = f"INSERT OR REPLACE INTO {table} (id, title, label, album_status, language, date, date_format, country, barcode, albumsort, is_split) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
if album.id in pushed:
return
pushed.add(album.id)
date_format, date = album.date.get_timestamp_w_format()
values = (
album.id,
album.title,
album.label,
album.album_status,
album.iso_639_2_language,
date,
date_format,
album.country,
album.barcode,
album.albumsort,
album.is_split
)
self.cursor.execute(query, values)
self.connection.commit()
for song in album.tracklist:
self.push_song(song, pushed=pushed)
for artist in album.artist_collection:
self.push_artist_album(artist_ref=artist.reference, album_ref=album.reference)
self.push_artist(artist, pushed=pushed)
for source in album.source_list:
source.type_enum = SourceTypes.ALBUM
source.add_song(album)
self.push_source(source=source)
def push_song(self, song: Song, pushed: set):
if song.dynamic:
return
if song.id in pushed:
return
pushed.add(song.id)
# ADDING THE DATA FOR THE SONG OBJECT
"""
db_field - object attribute
-------------------------------
id - id
name - title
"""
table = "Song"
values = (
song.id,
song.title,
song.isrc,
song.length,
song.get_album_id(),
song.tracksort,
song.genre
)
query = f"INSERT OR REPLACE INTO {table} (id, name, isrc, length, album_id, tracksort, genre) VALUES (?, ?, ?, ?, ?, ?, ?);"
self.cursor.execute(query, values)
self.connection.commit()
# add sources
for source in song.source_list:
source.add_song(song)
source.type_enum = SourceTypes.SONG
self.push_source(source=source)
# add lyrics
for single_lyrics in song.lyrics:
single_lyrics.add_song(song)
self.push_lyrics(lyrics=single_lyrics)
# add target
song.target.add_song(song)
self.push_target(target=song.target)
for main_artist in song.main_artist_list:
self.push_artist_song(artist_ref=Reference(main_artist.id), song_ref=Reference(song.id), is_feature=False)
self.push_artist(artist=main_artist, pushed=pushed)
for feature_artist in song.feature_artist_list:
self.push_artist_song(artist_ref=Reference(feature_artist.id), song_ref=Reference(song.id), is_feature=True)
self.push_artist(artist=feature_artist, pushed=pushed)
if song.album is not None:
self.push_album(song.album, pushed=pushed)
def push_lyrics(self, lyrics: Lyrics):
if lyrics.dynamic:
return
table = "Lyrics"
query = f"INSERT OR REPLACE INTO {table} (id, song_id, text, language) VALUES (?, ?, ?, ?);"
values = (
lyrics.id,
lyrics.song_ref_id,
lyrics.text,
lyrics.language
)
self.cursor.execute(query, values)
self.connection.commit()
def push_source(self, source: Source):
if source.dynamic:
return
table = "Source"
query = f"INSERT OR REPLACE INTO {table} (id, type, song_id, src, url) VALUES (?, ?, ?, ?, ?);"
values = (
source.id,
source.type_str,
source.song_ref_id,
source.page_str,
source.url
)
self.cursor.execute(query, values)
self.connection.commit()
def push_target(self, target: Target):
if target.dynamic:
return
table = "Target"
query = f"INSERT OR REPLACE INTO {table} (id, song_id, file, path) VALUES (?, ?, ?, ?);"
values = (
target.id,
target.song_ref_id,
target.file,
target.path
)
self.cursor.execute(query, values)
self.connection.commit()
def push_artist_song(self, artist_ref: Reference, song_ref: Reference, is_feature: bool):
table = "SongArtist"
# checking if already exists
query = f"SELECT * FROM {table} WHERE song_id=\"{song_ref.id}\" AND artist_id=\"{artist_ref.id}\""
self.cursor.execute(query)
if len(self.cursor.fetchall()) > 0:
# join already exists
return
query = f"INSERT OR REPLACE INTO {table} (song_id, artist_id, is_feature) VALUES (?, ?, ?);"
values = (
song_ref.id,
artist_ref.id,
is_feature
)
self.cursor.execute(query, values)
self.connection.commit()
def push_artist_album(self, artist_ref: Reference, album_ref: Reference):
table = "AlbumArtist"
# checking if already exists
query = f"SELECT * FROM {table} WHERE album_id=\"{album_ref.id}\" AND artist_id=\"{artist_ref.id}\""
self.cursor.execute(query)
if len(self.cursor.fetchall()) > 0:
# join already exists
return
query = f"INSERT OR REPLACE INTO {table} (album_id, artist_id) VALUES (?, ?);"
values = (
album_ref.id,
artist_ref.id
)
self.cursor.execute(query, values)
self.connection.commit()
def push_artist(self, artist: Artist, pushed: set):
if artist.dynamic:
return
if artist.id in pushed:
return
pushed.add(artist.id)
table = "Artist"
query = f"INSERT OR REPLACE INTO {table} (id, name) VALUES (?, ?);"
values = (
artist.id,
artist.name
)
self.cursor.execute(query, values)
self.connection.commit()
for song in artist.feature_songs:
self.push_artist_song(artist_ref=artist.reference, song_ref=song.reference, is_feature=True)
self.push_song(song=song, pushed=pushed)
for song in artist.main_songs:
self.push_artist_song(artist_ref=artist.reference, song_ref=song.reference, is_feature=False)
self.push_song(song=song, pushed=pushed)
for album in artist.main_albums:
self.push_artist_album(artist_ref=artist.reference, album_ref=album.reference)
for source in artist.source_list:
source.type_enum = SourceTypes.ARTIST
source.add_song(artist)
self.push_source(source)
def pull_lyrics(self, song_ref: Reference = None, lyrics_ref: Reference = None) -> List[Lyrics]:
"""
Gets a list of sources. if lyrics_ref is passed in the List will most likely only
contain one Element if everything goes accordingly.
**If neither song_ref nor lyrics_ref are passed in it will return ALL lyrics**
:param song_ref:
:param lyrics_ref:
:return:
"""
where = "1=1"
if song_ref is not None:
where = f"song_id=\"{song_ref.id}\""
elif lyrics_ref is not None:
where = f"id=\"{lyrics_ref.id}\""
query = LYRICS_QUERY.format(where=where)
self.cursor.execute(query)
lyrics_rows = self.cursor.fetchall()
return [Lyrics(
id_=lyrics_row['id'],
text=lyrics_row['text'],
language=lyrics_row['language']
) for lyrics_row in lyrics_rows]
def pull_sources(self, artist_ref: Reference = None, song_ref: Reference = None, source_ref: Reference = None, album_ref: Reference = None) -> List[Source]:
"""
Gets a list of sources. if source_ref is passed in the List will most likely only
contain one Element if everything goes accordingly.
**If neither song_ref nor source_ref are passed in it will return ALL sources**
:param artist_ref:
:param song_ref:
:param source_ref:
:param type_str: the thing the source belongs to like eg. "song" or "album"
:return:
"""
where = "1=1"
if song_ref is not None:
where = f"song_id=\"{song_ref.id}\""
elif source_ref is not None:
where = f"id=\"{source_ref.id}\" AND type=\"{SourceTypes.SONG.value}\""
elif artist_ref is not None:
where = f"song_id=\"{artist_ref.id}\" AND type=\"{SourceTypes.ARTIST.value}\""
elif album_ref is not None:
where = f"song_id=\"{album_ref.id}\" AND type=\"{SourceTypes.ALBUM.value}\""
query = SOURCE_QUERY.format(where=where)
self.cursor.execute(query)
source_rows = self.cursor.fetchall()
return [
Source(
page_enum=SourcePages(source_row['src']),
type_enum=SourceTypes(source_row['type']),
url=source_row['url'],
id_=source_row['id']
) for source_row in source_rows
]
def pull_artist_song(self, song_ref: Reference = None, artist_ref: Reference = None) -> List[tuple]:
table = "SongArtist"
wheres = []
if song_ref is not None:
wheres.append(f"song_id=\"{song_ref.id}\"")
if artist_ref is not None:
wheres.append(f"artist_id=\"{artist_ref.id}\"")
where_str = ""
if len(wheres) > 0:
where_str = "WHERE " + " AND ".join(wheres)
query = f"SELECT * FROM {table} {where_str};"
self.cursor.execute(query)
joins = self.cursor.fetchall()
return [(
Reference(join["song_id"]),
Reference(join["artist_id"]),
bool(join["is_feature"])
) for join in joins]
def pull_artist_album(self, album_ref: Reference = None, artist_ref: Reference = None) -> List[tuple]:
table = "AlbumArtist"
wheres = []
if album_ref is not None:
wheres.append(f"album_id=\"{album_ref.id}\"")
if artist_ref is not None:
wheres.append(f"artist_id=\"{artist_ref.id}\"")
where_str = ""
if len(wheres) > 0:
where_str = "WHERE " + " AND ".join(wheres)
query = f"SELECT * FROM {table} {where_str};"
self.cursor.execute(query)
joins = self.cursor.fetchall()
return [(
Reference(join["album_id"]),
Reference(join["artist_id"])
) for join in joins]
def get_artist_from_row(self, artist_row, exclude_relations: set = None, flat: bool = False) -> Artist:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = set(exclude_relations)
new_exclude_relations.add(Artist)
artist_id = artist_row['artist_id']
artist_obj = Artist(
id_=artist_id,
name=artist_row['artist_name'],
source_list=self.pull_sources(artist_ref=Reference(id_=artist_id))
)
if flat:
return artist_obj
# fetch songs :D
for song_ref, _, is_feature in self.pull_artist_song(artist_ref=Reference(id_=artist_id)):
new_songs = self.pull_songs(song_ref=song_ref, exclude_relations=new_exclude_relations)
if len(new_songs) < 1:
continue
new_song = new_songs[0]
if is_feature:
artist_obj.feature_songs.append(new_song)
else:
artist_obj.main_songs.append(new_song)
# fetch albums
for album_ref, _ in self.pull_artist_album(artist_ref=Reference(id_=artist_id)):
new_albums = self.pull_albums(album_ref=album_ref, exclude_relations=new_exclude_relations)
if len(new_albums) < 1:
continue
artist_obj.main_albums.append(new_albums[0])
return artist_obj
def pull_artists(self, artist_ref: Reference = None, exclude_relations: set = None, flat: bool = False) -> List[Artist]:
"""
:param artist_ref:
:param exclude_relations:
:param flat: if it is true it ONLY fetches the artist data
:return:
"""
where = "1=1"
if artist_ref is not None:
where = f"Artist.id=\"{artist_ref.id}\""
query = ARTIST_QUERY.format(where=where)
self.cursor.execute(query)
artist_rows = self.cursor.fetchall()
return [(
self.get_artist_from_row(artist_row, exclude_relations=exclude_relations, flat=flat)
) for artist_row in artist_rows]
def get_song_from_row(self, song_result, exclude_relations: set = None) -> Song:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = set(exclude_relations)
new_exclude_relations.add(Song)
song_id = song_result['song_id']
# maybee fetch album
song_obj = Song(
id_=song_id,
title=song_result['title'],
isrc=song_result['isrc'],
length=song_result['length'],
tracksort=song_result['tracksort'],
genre=song_result['genre'],
target=Target(
id_=song_result['target_id'],
file=song_result['file'],
path=song_result['path']
),
source_list=self.pull_sources(song_ref=Reference(id_=song_id)),
lyrics=self.pull_lyrics(song_ref=Reference(id_=song_id)),
)
if Album not in exclude_relations and song_result['album_id'] is not None:
album_obj = self.pull_albums(album_ref=Reference(song_result['album_id']),
exclude_relations=new_exclude_relations)
if len(album_obj) > 0:
song_obj.album = album_obj[0]
flat_artist = Artist in exclude_relations
main_artists = []
feature_artists = []
for song_ref, artist_ref, is_feature in self.pull_artist_song(song_ref=Reference(song_id)):
if is_feature:
feature_artists.extend(self.pull_artists(artist_ref=artist_ref, flat=flat_artist))
else:
main_artists.extend(self.pull_artists(artist_ref=artist_ref, flat=flat_artist))
song_obj.main_artist_list = main_artists
song_obj.feature_artist_list = feature_artists
return song_obj
def pull_songs(self, song_ref: Reference = None, album_ref: Reference = None, exclude_relations: set = set()) -> \
List[Song]:
"""
This function is used to get one song (including its children like Sources etc)
from one song id (a reference object)
:param exclude_relations:
By default all relations are pulled by this funktion. If the class object of for
example the Artists is in the set it won't get fetched.
This is done to prevent an infinite recursion.
:param song_ref:
:param album_ref:
:return requested_song:
"""
where = "1=1"
if song_ref is not None:
where = f"Song.id=\"{song_ref.id}\""
elif album_ref is not None:
where = f"Song.album_id=\"{album_ref.id}\""
query = SONG_QUERY.format(where=where)
self.cursor.execute(query)
song_rows = self.cursor.fetchall()
return [self.get_song_from_row(
song_result=song_result,
exclude_relations=exclude_relations
) for song_result in song_rows]
def get_album_from_row(self, album_result, exclude_relations=None) -> Album:
if exclude_relations is None:
exclude_relations = set()
new_exclude_relations: set = exclude_relations.copy()
new_exclude_relations.add(Album)
album_id = album_result['album_id']
language = album_result['language']
if language is not None:
language = pycountry.languages.get(alpha_3=album_result['language'])
album_obj = Album(
id_=album_id,
title=album_result['title'],
label=album_result['label'],
album_status=album_result['album_status'],
language=language,
date=ID3Timestamp.strptime(album_result['date'], album_result['date_format']),
country=album_result['country'],
barcode=album_result['barcode'],
is_split=album_result['is_split'],
albumsort=album_result['albumsort'],
source_list=self.pull_sources(album_ref=Reference(id_=album_id))
)
if Song not in exclude_relations:
# getting the tracklist
tracklist: List[Song] = self.pull_songs(
album_ref=Reference(id_=album_id),
exclude_relations=new_exclude_relations
)
album_obj.set_tracklist(tracklist=tracklist)
flat_artist = Artist in exclude_relations
for _, artist_ref in self.pull_artist_album(album_ref=Reference(id_=album_id)):
artists = self.pull_artists(artist_ref, flat=flat_artist, exclude_relations=new_exclude_relations)
if len(artists) < 1:
continue
album_obj.artist_collection.append(artists[0])
return album_obj
def pull_albums(self, album_ref: Reference = None, song_ref: Reference = None, exclude_relations: set = None) -> \
List[Album]:
"""
This function is used to get matching albums/releses
from one song id (a reference object)
:param exclude_relations:
By default all relations are pulled by this funktion. If the class object of for
example the Artists is in the set it won't get fetched.
This is done to prevent an infinite recursion.
:param album_ref:
:return requested_album_list:
"""
if exclude_relations is None:
exclude_relations = set()
query = ALBUM_QUERY_UNJOINED
where = "1=1"
if album_ref is not None:
query = ALBUM_QUERY_UNJOINED
where = f"Album.id=\"{album_ref.id}\""
elif song_ref is not None:
query = ALBUM_QUERY_JOINED
where = f"Song.id=\"{song_ref.id}\""
query = query.format(where=where)
self.cursor.execute(query)
album_rows = self.cursor.fetchall()
return [self.get_album_from_row(
album_result=album_row,
exclude_relations=exclude_relations
) for album_row in album_rows]
if __name__ == "__main__":
cache = Database("")

View File

@ -1,16 +0,0 @@
from .database import Database, DatabaseType
from ..utils.shared import (
TEMP_DATABASE_PATH,
DATABASE_LOGGER
)
logger = DATABASE_LOGGER
class TempDatabase(Database):
def __init__(self) -> None:
super().__init__(db_type=DatabaseType.SQLITE, db_name=TEMP_DATABASE_PATH)
# temp_database = TempDatabase()

View File

@ -1,335 +0,0 @@
from typing import Union, Optional, Dict, DefaultDict, Type, List
from collections import defaultdict
import json
import traceback
from peewee import (
SqliteDatabase,
MySQLDatabase,
PostgresqlDatabase,
Model
)
from .. import objects
from . import data_models
# just a Type for type hintung. You can't do anything with it.
Database = Union[SqliteDatabase, PostgresqlDatabase, MySQLDatabase]
class WritingSession:
"""
Context manager for a database session
Usage:
with Session(database) as session:
# Perform database operations using session object
Args:
database: An instance of a database connection from Peewee
Attributes:
database: An instance of a database connection from Peewee
"""
def __init__(self, database: Database) -> None:
"""
Initialize a database session
Args:
database: An instance of a database connection from Peewee
"""
self.database = database
self.added_song_ids: Dict[str] = dict()
self.added_album_ids: Dict[str] = dict()
self.added_artist_ids: Dict[str] = dict()
self.added_label_ids: Dict[str] = dict()
self.db_objects: DefaultDict[data_models.BaseModel, List[data_models.BaseModel]] = defaultdict(list)
def __enter__(self) -> Type['WritingSession']:
"""
Enter the context of the database session
Args:
database: An instance of a database connection from Peewee
Returns:
self: The instance of the session object
"""
# self.__init__(database=database)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Exit the context of the database session
Args:
exc_type: The type of the raised exception
exc_val: The value of the raised exception
exc_tb: The traceback of the raised exception
Returns:
bool: True if no exception was raised, False otherwise
"""
if exc_val is not None:
traceback.print_tb(exc_tb)
print(f"Exception of type {exc_type} occurred with message: {exc_val}")
self.commit(reset=False)
return exc_val is None
def add_source(self, source: objects.Source, connected_to: data_models.Source.ContentTypes) -> data_models.Source:
db_source = data_models.Source(
id=source.id,
page=source.page_str,
url=source.url,
content_object=connected_to
).use(self.database)
self.db_objects[data_models.Source].append(db_source)
return db_source
def add_lyrics(self, lyrics: objects.Lyrics, song: data_models.Song) -> data_models.Lyrics:
db_lyrics = data_models.Lyrics(
id=lyrics.id,
text=lyrics.text,
language=lyrics.language,
song=song
).use(self.database)
self.db_objects[data_models.Lyrics].append(db_lyrics)
for source in lyrics.source_list:
self.add_source(source=source, connected_to=db_lyrics)
return db_lyrics
def add_target(self, target: objects.Target, song: data_models.Song) -> data_models.Target:
db_target = data_models.Target(
id=target.id,
path=target.path,
file=target.file,
song=song
).use(self.database)
self.db_objects[data_models.Target].append(db_target)
return db_target
def add_song(self, song: objects.Song) -> Optional[data_models.Song]:
"""
Add a song object to the session
Args:
song: An instance of the Song object
"""
if song.dynamic:
return
if song.id in self.added_song_ids:
return self.added_song_ids[song.id]
db_song: data_models.Song = data_models.Song(
id=song.id,
title=song.title,
isrc=song.isrc,
length=song.length,
tracksort=song.tracksort,
genre=song.genre
).use(self.database)
self.db_objects[data_models.Song].append(db_song)
self.added_song_ids[song.id].append(db_song)
for source in song.source_list:
self.add_source(source=source, connected_to=db_song)
for target in [song.target]:
self.add_target(target, db_song)
for main_artist in song.main_artist_collection:
db_song_artist = data_models.SongArtist(
song=db_song,
artist=self.add_artist(main_artist),
is_feature=False
)
self.db_objects[data_models.SongArtist].append(db_song_artist)
for feature_artist in song.feature_artist_collection:
db_song_artist = data_models.SongArtist(
song=db_song,
artist=self.add_artist(feature_artist),
is_feature=True
)
self.db_objects[data_models.SongArtist].append(db_song_artist)
for album in [song.album]:
db_album_song = data_models.AlbumSong(
song=db_song,
album=self.add_album(album)
)
self.db_objects[data_models.AlbumSong] = db_album_song
return db_song
def add_album(self, album: objects.Album) -> Optional[data_models.Album]:
"""
Add an album object to the session
Args:
album: An instance of the Album object
"""
if album.dynamic:
return
if album.id in self.added_album_ids:
return self.added_album_ids[album.id]
db_album = data_models.Album(
id = album.id,
title = album.title,
album_status = album.album_status.value,
album_type = album.album_type.value,
language = album.iso_639_2_language,
date_string = album.date.timestamp,
date_format = album.date.timeformat,
barcode = album.barcode,
albumsort = album.albumsort
).use(self.database)
self.db_objects[data_models.Album].append(db_album)
self.added_album_ids.add(album.id)
for source in album.source_list:
self.add_source(source, db_album)
for song in album.tracklist:
db_song_album = data_models.AlbumSong(
id = album.id,
album = album,
song = self.add_song(song)
)
self.db_objects[data_models.AlbumSong].append(db_song_album)
for artist in album.artist_collection:
db_album_artist = data_models.ArtistAlbum(
album = album,
artist = self.add_artist(artist)
)
self.db_objects[data_models.Artist].append(db_album_artist)
for label in album.label_collection:
self.db_objects[data_models.LabelAlbum].append(
data_models.LabelAlbum(
label = self.add_label(label=label),
album = db_album
)
)
return db_album
def add_artist(self, artist: objects.Artist) -> Optional[data_models.Artist]:
"""
Add an artist object to the session
Args:
artist: An instance of the Artist object
"""
if artist.dynamic:
return
if artist.id in self.added_artist_ids:
return self.added_artist_ids[artist.id]
db_artist = data_models.Artist(
id = artist.id,
name = artist.name,
country = artist.country_string,
formed_in_date = artist.formed_in.timestamp,
formed_in_format = artist.formed_in.timestamp,
general_genre = artist.general_genre
)
self.db_objects[data_models.Artist].append(db_artist)
self.added_artist_ids[artist.id] = db_artist
for source in artist.source_list:
self.add_source(source, db_artist)
for album in artist.main_albums:
db_album_artist = data_models.ArtistAlbum(
artist = artist,
album = self.add_album(album)
)
self.db_objects[data_models.ArtistAlbum].append(db_album_artist)
for song in artist.feature_songs:
db_artist_song = data_models.SongArtist(
artist = artist,
song = self.add_song(song),
is_feature = True
)
self.db_objects[data_models.SongArtist].append(db_artist_song)
for label in artist.label_collection:
self.db_objects[data_models.LabelArtist].append(
data_models.LabelArtist(
artist = db_artist,
label = self.add_label(label=label)
)
)
return db_artist
def add_label(self, label: objects.Label) -> Optional[data_models.Label]:
if label.dynamic:
return
if label.id in self.added_label_ids:
return self.added_label_ids[label.id]
db_label = data_models.Label(
id = label.id,
name = label.name,
additional_arguments = json.dumps(label.additional_arguments)
)
self.db_objects[data_models.Label]
self.add_label[label.id] = db_label
for album in label.album_collection:
self.db_objects[data_models.LabelAlbum].append(
data_models.LabelAlbum(
album = self.add_album(album=album),
label = db_label
)
)
for artist in label.current_artist_collection:
self.db_objects[data_models.LabelArtist].append(
artist = self.add_artist(artist=artist),
label = db_label
)
return db_label
def commit(self, reset: bool = True):
"""
Commit changes to the database
"""
for model, model_instance_list in self.db_objects.items():
model.Use(self.database).insert_many(model_instance_list)
if reset:
self.__init__(self.database)
if __name__ == "__main__":
with WritingSession(SqliteDatabase(":memory:")) as session:
session.add_song(objects.Song(title="Hs"))

View File

@ -99,7 +99,6 @@ class DatabaseObject:
pass
class MainObject(DatabaseObject):
"""
This is the parent class for all "main" data objects:

View File

@ -7,7 +7,6 @@ from pathlib import Path
# Add the parent directory of the src package to the Python module search path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from music_kraken import objects
"""