From c37fa68937fce82e09ab8067a028503b17f2c4d0 Mon Sep 17 00:00:00 2001 From: lars Date: Sun, 6 Nov 2022 18:10:00 +0100 Subject: [PATCH] refactor --- src/main.py | 5 +- src/metadata/database.py | 320 ++++++++-------- src/metadata/database_structure.sql | 55 --- src/metadata/download.py | 546 +++++++++++++++------------- 4 files changed, 446 insertions(+), 480 deletions(-) delete mode 100644 src/metadata/database_structure.sql diff --git a/src/main.py b/src/main.py index edc800a..6cbdc20 100644 --- a/src/main.py +++ b/src/main.py @@ -17,7 +17,8 @@ NOT_A_GENRE = ".", "..", "misc_scripts", "Music", "script", ".git", ".idea" MUSIC_DIR = os.path.expanduser('~/Music') TOR = False -logging.basicConfig(level=logging.INFO) +logger = logging.getLogger() +logger.level = logging.DEBUG def get_existing_genre(): @@ -102,4 +103,4 @@ def cli(start_at: int = 0): if __name__ == "__main__": - cli(start_at=2) + cli(start_at=0) diff --git a/src/metadata/database.py b/src/metadata/database.py index f669999..3a0ce5e 100644 --- a/src/metadata/database.py +++ b/src/metadata/database.py @@ -4,140 +4,130 @@ import logging import json -def get_temp_dir(): - import tempfile +class Database: + def __init__(self, path_to_db: str, db_structure: str, logger: logging.Logger, reset_anyways: bool = False): + self.logger = logger + self.path_to_db = path_to_db - temp_folder = "music-downloader" - temp_dir = os.path.join(tempfile.gettempdir(), temp_folder) - if not os.path.exists(temp_dir): - os.mkdir(temp_dir) - return temp_dir + self.connection = sqlite3.connect(self.path_to_db) + self.cursor = self.connection.cursor() + # init database + self.init_db(database_structure=db_structure, reset_anyways=reset_anyways) -# DATABASE_STRUCTURE_FILE = "database_structure.sql" -DATABASE_STRUCTURE_FILE = "src/metadata/database_structure.sql" -TEMP_DIR = get_temp_dir() -DATABASE_FILE = "metadata.db" -db_path = os.path.join(TEMP_DIR, DATABASE_FILE) + def init_db(self, database_structure: str, reset_anyways: bool = False): + # check if db exists + exists = True + try: + query = 'SELECT * FROM track;' + self.cursor.execute(query) + _ = self.cursor.fetchall() + except sqlite3.OperationalError: + exists = False -connection = sqlite3.connect(db_path) -# connection.row_factory = sqlite3.Row -cursor = connection.cursor() + if not exists: + self.logger.info("Database does not exist yet.") + if reset_anyways or not exists: + # reset the database if reset_anyways is true or if an error has been thrown previously. + self.logger.info("Creating/Reseting Database.") -def init_db(cursor, connection, reset_anyways: bool = False): - # check if db exists - exists = True - try: - query = 'SELECT * FROM track;' - cursor.execute(query) - _ = cursor.fetchall() - except sqlite3.OperationalError: - exists = False + # read the file + with open(database_structure, "r") as database_structure_file: + query = database_structure_file.read() + self.cursor.executescript(query) + self.connection.commit() - if not exists: - logging.info("Database does not exist yet.") + def add_artist( + self, + musicbrainz_artistid: str, + artist: str = None + ): + query = "INSERT OR REPLACE INTO artist (id, name) VALUES (?, ?);" + values = musicbrainz_artistid, artist - if reset_anyways or not exists: - # reset the database if reset_anyways is true or if an error has been thrown previously. - logging.info("Creating/Reseting Database.") + self.cursor.execute(query, values) + self.connection.commit() - # read the file - with open(DATABASE_STRUCTURE_FILE, "r") as database_structure_file: - query = database_structure_file.read() - cursor.executescript(query) - connection.commit() + def add_release_group( + self, + musicbrainz_releasegroupid: str, + artist_ids: list, + albumartist: str = None, + albumsort: int = None, + musicbrainz_albumtype: str = None, + compilation: str = None, + album_artist_id: str = None + ): + # add adjacency + adjacency_list = [] + for artist_id in artist_ids: + adjacency_list.append((artist_id, musicbrainz_releasegroupid)) + adjacency_values = tuple(adjacency_list) + adjacency_query = "INSERT OR REPLACE INTO artist_release_group (artist_id, release_group_id) VALUES (?, ?);" + self.cursor.executemany(adjacency_query, adjacency_values) + self.connection.commit() + # add release group + query = "INSERT OR REPLACE INTO release_group (id, albumartist, albumsort, musicbrainz_albumtype, compilation, album_artist_id) VALUES (?, ?, ?, ?, ?, ?);" + values = musicbrainz_releasegroupid, albumartist, albumsort, musicbrainz_albumtype, compilation, album_artist_id + self.cursor.execute(query, values) + self.connection.commit() -def add_artist( - musicbrainz_artistid: str, - artist: str = None -): - query = "INSERT OR REPLACE INTO artist (id, name) VALUES (?, ?);" - values = musicbrainz_artistid, artist + def add_release( + self, + musicbrainz_albumid: str, + release_group_id: str, + title: str = None, + copyright_: str = None, + album_status: str = None, + language: str = None, + year: str = None, + date: str = None, + country: str = None, + barcode: str = None + ): + query = "INSERT OR REPLACE INTO release_ (id, release_group_id, title, copyright, album_status, language, year, date, country, barcode) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);" + values = musicbrainz_albumid, release_group_id, title, copyright_, album_status, language, year, date, country, barcode - cursor.execute(query, values) - connection.commit() + self.cursor.execute(query, values) + self.connection.commit() + def add_track( + self, + musicbrainz_releasetrackid: str, + musicbrainz_albumid: str, + feature_aritsts: list, + track: str = None, + isrc: str = None + ): + # add adjacency + adjacency_list = [] + for artist_id in feature_aritsts: + adjacency_list.append((artist_id, musicbrainz_releasetrackid)) + adjacency_values = tuple(adjacency_list) + adjacency_query = "INSERT OR REPLACE INTO artist_track (artist_id, track_id) VALUES (?, ?);" + self.cursor.executemany(adjacency_query, adjacency_values) + self.connection.commit() -def add_release_group( - musicbrainz_releasegroupid: str, - artist_ids: list, - albumartist: str = None, - albumsort: int = None, - musicbrainz_albumtype: str = None, - compilation: str = None, - album_artist_id: str = None -): - # add adjacency - adjacency_list = [] - for artist_id in artist_ids: - adjacency_list.append((artist_id, musicbrainz_releasegroupid)) - adjacency_values = tuple(adjacency_list) - adjacency_query = "INSERT OR REPLACE INTO artist_release_group (artist_id, release_group_id) VALUES (?, ?);" - cursor.executemany(adjacency_query, adjacency_values) - connection.commit() + # add track + query = "INSERT OR REPLACE INTO track (id, release_id, track, isrc) VALUES (?, ?, ?, ?);" + values = musicbrainz_releasetrackid, musicbrainz_albumid, track, isrc + self.cursor.execute(query, values) + self.connection.commit() - # add release group - query = "INSERT OR REPLACE INTO release_group (id, albumartist, albumsort, musicbrainz_albumtype, compilation, album_artist_id) VALUES (?, ?, ?, ?, ?, ?);" - values = musicbrainz_releasegroupid, albumartist, albumsort, musicbrainz_albumtype, compilation, album_artist_id - cursor.execute(query, values) - connection.commit() + @staticmethod + def get_custom_track_query(custom_where: list) -> str: + where_args = [ + "track.release_id == release_.id", + "release_group.id == release_.release_group_id", + "artist_track.artist_id == artist.id", + "artist_track.track_id == track.id" + ] + where_args.extend(custom_where) - -def add_release( - musicbrainz_albumid: str, - release_group_id: str, - title: str = None, - copyright_: str = None, - album_status: str = None, - language: str = None, - year: str = None, - date: str = None, - country: str = None, - barcode: str = None -): - query = "INSERT OR REPLACE INTO release_ (id, release_group_id, title, copyright, album_status, language, year, date, country, barcode) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);" - values = musicbrainz_albumid, release_group_id, title, copyright_, album_status, language, year, date, country, barcode - - cursor.execute(query, values) - connection.commit() - - -def add_track( - musicbrainz_releasetrackid: str, - musicbrainz_albumid: str, - feature_aritsts: list, - track: str = None, - isrc: str = None -): - # add adjacency - adjacency_list = [] - for artist_id in feature_aritsts: - adjacency_list.append((artist_id, musicbrainz_releasetrackid)) - adjacency_values = tuple(adjacency_list) - adjacency_query = "INSERT OR REPLACE INTO artist_track (artist_id, track_id) VALUES (?, ?);" - cursor.executemany(adjacency_query, adjacency_values) - connection.commit() - - # add track - query = "INSERT OR REPLACE INTO track (id, release_id, track, isrc) VALUES (?, ?, ?, ?);" - values = musicbrainz_releasetrackid, musicbrainz_albumid, track, isrc - cursor.execute(query, values) - connection.commit() - - -def get_custom_track_querry(custom_where: list) -> str: - where_args = [ - "track.release_id == release_.id", - "release_group.id == release_.release_group_id", - "artist_track.artist_id == artist.id", - "artist_track.track_id == track.id" - ] - where_args.extend(custom_where) - - where_arg = " AND ".join(where_args) - query = f""" + where_arg = " AND ".join(where_args) + query = f""" SELECT DISTINCT json_object( 'artists', json_group_array( @@ -176,71 +166,75 @@ FROM track, release_, release_group,artist, artist_track WHERE {where_arg} GROUP BY track.id; - """ - return query + """ + return query + def get_custom_track(self, custom_where: list): + query = Database.get_custom_track_query(custom_where=custom_where) + return [json.loads(i[0]) for i in self.cursor.execute(query)] -def get_custom_track(custom_where: list): - query = get_custom_track_querry(custom_where=custom_where) - return [json.loads(i[0]) for i in cursor.execute(query)] + def get_track_metadata(self, musicbrainz_releasetrackid: str): + # this would be vulnerable if musicbrainz_releasetrackid would be user input + resulting_tracks = self.get_custom_track([f'track.id == "{musicbrainz_releasetrackid}"']) + if len(resulting_tracks) != 1: + return -1 + return resulting_tracks[0] -def get_track_metadata(musicbrainz_releasetrackid: str): - # this would be vulnerable if musicbrainz_releasetrackid would be user input - resulting_tracks = get_custom_track([f'track.id == "{musicbrainz_releasetrackid}"']) - if len(resulting_tracks) != 1: - return -1 + def get_tracks_to_download(self): + return self.get_custom_track(['track.downloaded == 0']) - return resulting_tracks[0] + def get_tracks_without_src(self): + return self.get_custom_track(["(track.url IS NULL OR track.src IS NULL)"]) + def get_tracks_without_isrc(self): + return self.get_custom_track(["track.isrc IS NULL"]) -def get_tracks_to_download(): - return get_custom_track(['track.downloaded == 0']) + def get_tracks_without_filepath(self): + return self.get_custom_track(["(track.file IS NULL OR track.path IS NULL OR track.genre IS NULL)"]) + def update_download_status(self, track_id: str): + pass -def get_tracks_without_src(): - return get_custom_track(["(track.url IS NULL OR track.src IS NULL)"]) - - -def get_tracks_without_isrc(): - return get_custom_track(["track.isrc IS NULL"]) - - -def get_tracks_without_filepath(): - return get_custom_track(["(track.file IS NULL OR track.path IS NULL OR track.genre IS NULL)"]) - - -def update_download_status(track_id: str): - pass - - -def set_download_data(track_id: str, url: str, src: str): - query = f""" + def set_download_data(self, track_id: str, url: str, src: str): + query = f""" UPDATE track SET url = ?, src = ? WHERE '{track_id}' == id; - """ - cursor.execute(query, (url, src)) - connection.commit() + """ + self.cursor.execute(query, (url, src)) + self.connection.commit() - -def set_filepath(track_id: str, file: str, path: str, genre: str): - query = f""" + def set_filepath(self, track_id: str, file: str, path: str, genre: str): + query = f""" UPDATE track SET file = ?, path = ?, genre = ? WHERE '{track_id}' == id; - """ - cursor.execute(query, (file, path, genre)) - connection.commit() + """ + self.cursor.execute(query, (file, path, genre)) + self.connection.commit() -init_db(cursor=cursor, connection=connection, reset_anyways=False) - if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) + import tempfile - for track in get_tracks_without_isrc(): - print(track['track'], [artist['name'] for artist in track['artists']]) + temp_folder = "music-downloader" + temp_dir = os.path.join(tempfile.gettempdir(), temp_folder) + if not os.path.exists(temp_dir): + os.mkdir(temp_dir) + + temp_dir = get_temp_dir() + DATABASE_FILE = "metadata.db" + DATABASE_STRUCTURE_FILE = "database_structure.sql" + db_path = os.path.join(TEMP_DIR, DATABASE_FILE) + + logging.basicConfig() + + logger = logging.getLogger("database") + logger.setLevel(logging.DEBUG) + + database = Database(os.path.join(temp_dir, "metadata.db"), os.path.join(temp_dir, "database_structure.sql"), logger, + reset_anyways=True) diff --git a/src/metadata/database_structure.sql b/src/metadata/database_structure.sql deleted file mode 100644 index d4d0e9f..0000000 --- a/src/metadata/database_structure.sql +++ /dev/null @@ -1,55 +0,0 @@ -DROP TABLE IF EXISTS artist; -CREATE TABLE artist ( - id TEXT PRIMARY KEY NOT NULL, - name TEXT -); - -DROP TABLE IF EXISTS artist_release_group; -CREATE TABLE artist_release_group ( - artist_id TEXT NOT NULL, - release_group_id TEXT NOT NULL -); - -DROP TABLE IF EXISTS artist_track; -CREATE TABLE artist_track ( - artist_id TEXT NOT NULL, - track_id TEXT NOT NULL -); - -DROP TABLE IF EXISTS release_group; -CREATE TABLE release_group ( - id TEXT PRIMARY KEY NOT NULL, - albumartist TEXT, - albumsort INT, - musicbrainz_albumtype TEXT, - compilation TEXT, - album_artist_id TEXT -); - -DROP TABLE IF EXISTS release_; -CREATE TABLE release_ ( - id TEXT PRIMARY KEY NOT NULL, - release_group_id TEXT NOT NULL, - title TEXT, - copyright TEXT, - album_status TEXT, - language TEXT, - year TEXT, - date TEXT, - country TEXT, - barcode TEXT -); - -DROP TABLE IF EXISTS track; -CREATE TABLE track ( - id TEXT PRIMARY KEY NOT NULL, - downloaded BOOLEAN NOT NULL DEFAULT 0, - release_id TEXT NOT NULL, - track TEXT, - isrc TEXT, - genre TEXT, - path TEXT, - file TEXT, - url TEXT, - src TEXT -); diff --git a/src/metadata/download.py b/src/metadata/download.py index 24b1dde..330687f 100644 --- a/src/metadata/download.py +++ b/src/metadata/download.py @@ -1,10 +1,8 @@ from typing import List import musicbrainzngs -import pandas as pd import logging -from metadata.object_handeling import get_elem_from_obj, parse_music_brainz_date -from metadata import database +from object_handeling import get_elem_from_obj, parse_music_brainz_date # I don't know if it would be feesable to set up my own mb instance # https://github.com/metabrainz/musicbrainz-docker @@ -17,287 +15,315 @@ musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeI # IMPORTANT DOCUMENTATION WHICH CONTAINS FOR EXAMPLE THE INCLUDES # https://python-musicbrainzngs.readthedocs.io/en/v0.7.1/api/#getting-data -class Artist: - def __init__( - self, - musicbrainz_artistid: str, - release_groups: List = [], - new_release_groups: bool = True - ): - """ - release_groups: list - """ - self.release_groups = release_groups - self.musicbrainz_artistid = musicbrainz_artistid +class MetadataDownloader: + def __init__(self, database, logger: logging.Logger): + self.database = database + self.logger = logger - result = musicbrainzngs.get_artist_by_id(self.musicbrainz_artistid, includes=["release-groups", "releases"]) - artist_data = get_elem_from_obj(result, ['artist'], return_if_none={}) + class Artist: + def __init__( + self, + database, + logger, + musicbrainz_artistid: str, + release_groups: List = [], + new_release_groups: bool = True + ): + self.database = database + self.logger = logger + """ + release_groups: list + """ + self.release_groups = release_groups - self.artist = get_elem_from_obj(artist_data, ['name']) + self.musicbrainz_artistid = musicbrainz_artistid - self.save() + result = musicbrainzngs.get_artist_by_id(self.musicbrainz_artistid, includes=["release-groups", "releases"]) + artist_data = get_elem_from_obj(result, ['artist'], return_if_none={}) - # STARTING TO FETCH' RELEASE GROUPS. IMPORTANT: DON'T WRITE ANYTHING BESIDES THAT HERE - if not new_release_groups: - return - # sort all release groups by date and add album sort to have them in chronological order. - release_groups = artist_data['release-group-list'] - for i, release_group in enumerate(release_groups): - release_groups[i]['first-release-date'] = parse_music_brainz_date(release_group['first-release-date']) - release_groups.sort(key=lambda x: x['first-release-date']) + self.artist = get_elem_from_obj(artist_data, ['name']) - for i, release_group in enumerate(release_groups): - self.release_groups.append(ReleaseGroup( - musicbrainz_releasegroupid=release_group['id'], - artists=[self], - albumsort=i + 1 - )) + self.save() - def __str__(self): - newline = "\n" - return f"id: {self.musicbrainz_artistid}\nname: {self.artist}\n{newline.join([str(release_group) for release_group in self.release_groups])}" + # STARTING TO FETCH' RELEASE GROUPS. IMPORTANT: DON'T WRITE ANYTHING BESIDES THAT HERE + if not new_release_groups: + return + # sort all release groups by date and add album sort to have them in chronological order. + release_groups = artist_data['release-group-list'] + for i, release_group in enumerate(release_groups): + release_groups[i]['first-release-date'] = parse_music_brainz_date(release_group['first-release-date']) + release_groups.sort(key=lambda x: x['first-release-date']) - def save(self): - logging.info(f"artist: {self}") - database.add_artist( - musicbrainz_artistid=self.musicbrainz_artistid, - artist=self.artist - ) + for i, release_group in enumerate(release_groups): + self.release_groups.append(MetadataDownloader.ReleaseGroup( + self.database, + self.logger, + musicbrainz_releasegroupid=release_group['id'], + artists=[self], + albumsort=i + 1 + )) + def __str__(self): + newline = "\n" + return f"id: {self.musicbrainz_artistid}\nname: {self.artist}\n{newline.join([str(release_group) for release_group in self.release_groups])}" -class ReleaseGroup: - def __init__( - self, - musicbrainz_releasegroupid: str, - artists: List[Artist] = [], - albumsort: int = None, - only_download_distinct_releases: bool = True - ): - """ - split_artists: list -> if len > 1: album_artist=VariousArtists - releases: list - """ + def save(self): + self.logger.info(f"artist: {self}") + self.database.add_artist( + musicbrainz_artistid=self.musicbrainz_artistid, + artist=self.artist + ) - self.musicbrainz_releasegroupid = musicbrainz_releasegroupid - self.artists = artists - self.releases = [] + class ReleaseGroup: + def __init__( + self, + database, + logger, + musicbrainz_releasegroupid: str, + artists = [], + albumsort: int = None, + only_download_distinct_releases: bool = True + ): + self.database = database + self.logger = logger + """ + split_artists: list -> if len > 1: album_artist=VariousArtists + releases: list + """ - result = musicbrainzngs.get_release_group_by_id(musicbrainz_releasegroupid, - includes=["artist-credits", "releases"]) - release_group_data = get_elem_from_obj(result, ['release-group'], return_if_none={}) - artist_datas = get_elem_from_obj(release_group_data, ['artist-credit'], return_if_none={}) - release_datas = get_elem_from_obj(release_group_data, ['release-list'], return_if_none={}) + self.musicbrainz_releasegroupid = musicbrainz_releasegroupid + self.artists = artists + self.releases = [] - for artist_data in artist_datas: - artist_id = get_elem_from_obj(artist_data, ['artist', 'id']) + result = musicbrainzngs.get_release_group_by_id(musicbrainz_releasegroupid, + includes=["artist-credits", "releases"]) + release_group_data = get_elem_from_obj(result, ['release-group'], return_if_none={}) + artist_datas = get_elem_from_obj(release_group_data, ['artist-credit'], return_if_none={}) + release_datas = get_elem_from_obj(release_group_data, ['release-list'], return_if_none={}) + + for artist_data in artist_datas: + artist_id = get_elem_from_obj(artist_data, ['artist', 'id']) + if artist_id is None: + continue + self.append_artist(artist_id) + self.albumartist = "Various Artists" if len(self.artists) > 1 else self.artists[0].artist + self.album_artist_id = None if self.albumartist == "Various Artists" else self.artists[ + 0].musicbrainz_artistid + + self.albumsort = albumsort + self.musicbrainz_albumtype = get_elem_from_obj(release_group_data, ['primary-type']) + self.compilation = "1" if self.musicbrainz_albumtype == "Compilation" else None + + self.save() + + if only_download_distinct_releases: + self.append_distinct_releases(release_datas) + else: + self.append_all_releases(release_datas) + + def __str__(self): + newline = "\n" + return f"{newline.join([str(release_group) for release_group in self.releases])}" + + def save(self): + self.logger.info(f"caching release_group {self}") + self.database.add_release_group( + musicbrainz_releasegroupid=self.musicbrainz_releasegroupid, + artist_ids=[artist.musicbrainz_artistid for artist in self.artists], + albumartist=self.albumartist, + albumsort=self.albumsort, + musicbrainz_albumtype=self.musicbrainz_albumtype, + compilation=self.compilation, + album_artist_id=self.album_artist_id + ) + + def append_artist(self, artist_id: str): + for existing_artist in self.artists: + if artist_id == existing_artist.musicbrainz_artistid: + return existing_artist + new_artist = Artist(artist_id, release_groups=[self], new_release_groups=False) + self.artists.append(new_artist) + return new_artist + + def append_release(self, release_data: dict): + musicbrainz_albumid = get_elem_from_obj(release_data, ['id']) + if musicbrainz_albumid is None: + return + self.releases.append(MetadataDownloader.Release(self.database, self.logger, musicbrainz_albumid, release_group=self)) + + def append_distinct_releases(self, release_datas: List[dict]): + titles = {} + + for release_data in release_datas: + title = get_elem_from_obj(release_data, ['title']) + if title is None: + continue + titles[title] = release_data + + for key in titles: + self.append_release(titles[key]) + + def append_all_releases(self, release_datas: List[dict]): + for release_data in release_datas: + self.append_release(release_data) + + class Release: + def __init__( + self, + database, + logger, + musicbrainz_albumid: str, + release_group = None + ): + self.database = database + self.logger = logger + """ + release_group: ReleaseGroup + tracks: list + """ + self.musicbrainz_albumid = musicbrainz_albumid + self.release_group = release_group + self.tracklist = [] + + result = musicbrainzngs.get_release_by_id(self.musicbrainz_albumid, includes=["recordings", "labels"]) + release_data = get_elem_from_obj(result, ['release'], return_if_none={}) + label_data = get_elem_from_obj(release_data, ['label-info-list'], return_if_none={}) + recording_datas = get_elem_from_obj(release_data, ['medium-list', 0, 'track-list'], return_if_none=[]) + + self.title = get_elem_from_obj(release_data, ['title']) + self.copyright = get_elem_from_obj(label_data, [0, 'label', 'name']) + + self.album_status = get_elem_from_obj(release_data, ['status']) + self.language = get_elem_from_obj(release_data, ['text-representation', 'language']) + self.year = get_elem_from_obj(release_data, ['date'], lambda x: x.split("-")[0]) + self.date = get_elem_from_obj(release_data, ['date']) + self.country = get_elem_from_obj(release_data, ['country']) + self.barcode = get_elem_from_obj(release_data, ['barcode']) + + self.save() + self.append_recordings(recording_datas) + + def __str__(self): + return f"{self.title} ©{self.copyright} {self.album_status}" + + def save(self): + self.logger.info(f"caching release {self}") + self.database.add_release( + musicbrainz_albumid=self.musicbrainz_albumid, + release_group_id=self.release_group.musicbrainz_releasegroupid, + title=self.title, + copyright_=self.copyright, + album_status=self.album_status, + language=self.language, + year=self.year, + date=self.date, + country=self.country, + barcode=self.barcode + ) + + def append_recordings(self, recording_datas: dict): + for recording_data in recording_datas: + musicbrainz_releasetrackid = get_elem_from_obj(recording_data, ['recording', 'id']) + if musicbrainz_releasetrackid is None: + continue + + self.tracklist.append(MetadataDownloader.Track(self.database, self.logger, musicbrainz_releasetrackid, self)) + + class Track: + def __init__( + self, + database, + logger, + musicbrainz_releasetrackid: str, + release = None + ): + self.database = database + self.logger = logger + """ + release: Release + feature_artists: list + """ + + self.musicbrainz_releasetrackid = musicbrainz_releasetrackid + self.release = release + self.artists = [] + + result = musicbrainzngs.get_recording_by_id(self.musicbrainz_releasetrackid, + includes=["artists", "releases", "recording-rels", "isrcs", + "work-level-rels"]) + recording_data = result['recording'] + for artist_data in get_elem_from_obj(recording_data, ['artist-credit'], return_if_none=[]): + self.append_artist(get_elem_from_obj(artist_data, ['artist', 'id'])) + + self.isrc = get_elem_from_obj(recording_data, ['isrc-list', 0]) + self.title = recording_data['title'] + + self.save() + + def __str__(self): + return f"{self.title}: {self.isrc}" + + def save(self): + self.logger.info(f"caching track {self}") + + self.database.add_track( + musicbrainz_releasetrackid=self.musicbrainz_releasetrackid, + musicbrainz_albumid=self.release.musicbrainz_albumid, + feature_aritsts=[artist.musicbrainz_artistid for artist in self.artists], + track=self.title, + isrc=self.isrc + ) + + def append_artist(self, artist_id: str): if artist_id is None: - continue - self.append_artist(artist_id) - self.albumartist = "Various Artists" if len(self.artists) > 1 else self.artists[0].artist - self.album_artist_id = None if self.albumartist == "Various Artists" else self.artists[0].musicbrainz_artistid + return - self.albumsort = albumsort - self.musicbrainz_albumtype = get_elem_from_obj(release_group_data, ['primary-type']) - self.compilation = "1" if self.musicbrainz_albumtype == "Compilation" else None + for existing_artist in self.artists: + if artist_id == existing_artist.musicbrainz_artistid: + return existing_artist + new_artist = MetadataDownloader.Artist(self.database, self.logger, artist_id, new_release_groups=False) + self.artists.append(new_artist) + return new_artist - self.save() + def download(self, option: dict): + type_ = option['type'] + mb_id = option['id'] - if only_download_distinct_releases: - self.append_distinct_releases(release_datas) - else: - self.append_all_releases(release_datas) - - def __str__(self): - newline = "\n" - return f"{newline.join([str(release_group) for release_group in self.releases])}" - - def save(self): - logging.info(f"caching release_group {self}") - database.add_release_group( - musicbrainz_releasegroupid=self.musicbrainz_releasegroupid, - artist_ids=[artist.musicbrainz_artistid for artist in self.artists], - albumartist=self.albumartist, - albumsort=self.albumsort, - musicbrainz_albumtype=self.musicbrainz_albumtype, - compilation=self.compilation, - album_artist_id=self.album_artist_id - ) - - def append_artist(self, artist_id: str) -> Artist: - for existing_artist in self.artists: - if artist_id == existing_artist.musicbrainz_artistid: - return existing_artist - new_artist = Artist(artist_id, release_groups=[self], new_release_groups=False) - self.artists.append(new_artist) - return new_artist - - def append_release(self, release_data: dict): - musicbrainz_albumid = get_elem_from_obj(release_data, ['id']) - if musicbrainz_albumid is None: - return - self.releases.append(Release(musicbrainz_albumid, release_group=self)) - - def append_distinct_releases(self, release_datas: List[dict]): - titles = {} - - for release_data in release_datas: - title = get_elem_from_obj(release_data, ['title']) - if title is None: - continue - titles[title] = release_data - - for key in titles: - self.append_release(titles[key]) - - def append_all_releases(self, release_datas: List[dict]): - for release_data in release_datas: - self.append_release(release_data) - - -class Release: - def __init__( - self, - musicbrainz_albumid: str, - release_group: ReleaseGroup = None - ): - """ - release_group: ReleaseGroup - tracks: list - """ - self.musicbrainz_albumid = musicbrainz_albumid - self.release_group = release_group - self.tracklist = [] - - result = musicbrainzngs.get_release_by_id(self.musicbrainz_albumid, includes=["recordings", "labels"]) - release_data = get_elem_from_obj(result, ['release'], return_if_none={}) - label_data = get_elem_from_obj(release_data, ['label-info-list'], return_if_none={}) - recording_datas = get_elem_from_obj(release_data, ['medium-list', 0, 'track-list'], return_if_none=[]) - - self.title = get_elem_from_obj(release_data, ['title']) - self.copyright = get_elem_from_obj(label_data, [0, 'label', 'name']) - - self.album_status = get_elem_from_obj(release_data, ['status']) - self.language = get_elem_from_obj(release_data, ['text-representation', 'language']) - self.year = get_elem_from_obj(release_data, ['date'], lambda x: x.split("-")[0]) - self.date = get_elem_from_obj(release_data, ['date']) - self.country = get_elem_from_obj(release_data, ['country']) - self.barcode = get_elem_from_obj(release_data, ['barcode']) - - self.save() - self.append_recordings(recording_datas) - - def __str__(self): - return f"{self.title} ©{self.copyright} {self.album_status}" - - def save(self): - logging.info(f"caching release {self}") - database.add_release( - musicbrainz_albumid=self.musicbrainz_albumid, - release_group_id=self.release_group.musicbrainz_releasegroupid, - title=self.title, - copyright_=self.copyright, - album_status=self.album_status, - language=self.language, - year=self.year, - date=self.date, - country=self.country, - barcode=self.barcode - ) - - def append_recordings(self, recording_datas: dict): - for recording_data in recording_datas: - musicbrainz_releasetrackid = get_elem_from_obj(recording_data, ['recording', 'id']) - if musicbrainz_releasetrackid is None: - continue - - self.tracklist.append(Track(musicbrainz_releasetrackid, self)) - - -class Track: - def __init__( - self, - musicbrainz_releasetrackid: str, - release: Release = None - ): - """ - release: Release - feature_artists: list - """ - - self.musicbrainz_releasetrackid = musicbrainz_releasetrackid - self.release = release - self.artists = [] - - result = musicbrainzngs.get_recording_by_id(self.musicbrainz_releasetrackid, includes=["artists", "releases", "recording-rels", "isrcs", "work-level-rels"]) - recording_data = result['recording'] - for artist_data in get_elem_from_obj(recording_data, ['artist-credit'], return_if_none=[]): - self.append_artist(get_elem_from_obj(artist_data, ['artist', 'id'])) - - self.isrc = get_elem_from_obj(recording_data, ['isrc-list', 0]) - self.title = recording_data['title'] - - self.save() - - def __str__(self): - return f"{self.title}: {self.isrc}" - - def save(self): - logging.info(f"caching track {self}") - - database.add_track( - musicbrainz_releasetrackid=self.musicbrainz_releasetrackid, - musicbrainz_albumid=self.release.musicbrainz_albumid, - feature_aritsts=[artist.musicbrainz_artistid for artist in self.artists], - track=self.title, - isrc=self.isrc - ) - - def append_artist(self, artist_id: str) -> Artist: - if artist_id is None: - return - - for existing_artist in self.artists: - if artist_id == existing_artist.musicbrainz_artistid: - return existing_artist - new_artist = Artist(artist_id, new_release_groups=False) - self.artists.append(new_artist) - return new_artist - - -def download(option: dict): - type_ = option['type'] - mb_id = option['id'] - - metadata_list = [] - if type_ == "artist": - artist = Artist(mb_id) - print(artist) - elif type_ == "release": - metadata_list = download_release(mb_id) - elif type_ == "track": - metadata_list = download_track(mb_id) - - print(metadata_list) - metadata_df = pd.DataFrame(metadata_list) - # metadata_df.to_csv(os.path.join(self.temp, file)) - return metadata_df + if type_ == "artist": + self.Artist(self.database, self.logger, mb_id) + elif type_ == "release_group": + self.ReleaseGroup(self.database, self.logger, mb_id) + elif type_ == "release": + self.Release(self.database, self.logger, mb_id) + elif type_ == "track": + self.Track(self.database, self.logger, mb_id) if __name__ == "__main__": - """ import tempfile import os - TEMP_FOLDER = "music-downloader" - TEMP_DIR = os.path.join(tempfile.gettempdir(), TEMP_FOLDER) - if not os.path.exists(TEMP_DIR): - os.mkdir(TEMP_DIR) - """ + temp_folder = "music-downloader" + temp_dir = os.path.join(tempfile.gettempdir(), temp_folder) + if not os.path.exists(temp_dir): + os.mkdir(temp_dir) - logger = logging.getLogger() - logger.setLevel(logging.INFO) + logging.basicConfig(level=logging.DEBUG) + db_logger = logging.getLogger("database") + db_logger.setLevel(logging.DEBUG) - download({'id': '5cfecbe4-f600-45e5-9038-ce820eedf3d1', 'type': 'artist'}) + import database + + database_ = database.Database(os.path.join(temp_dir, "metadata.db"), + os.path.join(temp_dir, "database_structure.sql"), db_logger, + reset_anyways=True) + + download_logger = logging.getLogger("metadata downloader") + download_logger.setLevel(logging.INFO) + + downloader = MetadataDownloader(database_, download_logger) + + downloader.download({'id': '5cfecbe4-f600-45e5-9038-ce820eedf3d1', 'type': 'artist'}) # download({'id': '4b9af532-ef7e-42ab-8b26-c466327cb5e0', 'type': 'release'}) # download({'id': 'c24ed9e7-6df9-44de-8570-975f1a5a75d1', 'type': 'track'})