started refactoring completly

This commit is contained in:
lars 2022-10-26 17:37:20 +02:00
parent 606c6c9dbe
commit 5cb2dff5c3
10 changed files with 738 additions and 479 deletions

View File

@ -99,7 +99,10 @@ For searching, as well as for downloading I use the programm `youtube-dl`, which
There are two bottlenecks with this approach though: There are two bottlenecks with this approach though:
1. `youtube-dl` is just slow. Actually it has to be, to not get blocked by youtube. 1. `youtube-dl` is just slow. Actually it has to be, to not get blocked by youtube.
2. Ofthen musicbrainz just doesn't give the isrc for some songs. 2. Often musicbrainz just doesn't give the isrc for some songs.
**TODO** **TODO**
- look at how the isrc is derived and try to generate it for the tracks without directly getting it from mb. - look at how the isrc is derived and try to generate it for the tracks without directly getting it from mb.
**Progress**
- There is a great site with a huge isrc database [https://isrc.soundexchange.com/](https://isrc.soundexchange.com/).

View File

@ -50,13 +50,11 @@ def path_stuff(path: str, file_: str):
class Download: class Download:
def __init__(self, session: requests.Session = requests.Session(), file: str = ".cache3.csv", temp: str = "temp", def __init__(self, proxies: dict = None, file: str = ".cache3.csv", temp: str = "temp",
base_path: str = ""): base_path: str = ""):
self.session = session if proxies is not None:
self.session.headers = { musify.set_proxy(proxies)
"Connection": "keep-alive",
"Referer": "https://musify.club/"
}
self.temp = temp self.temp = temp
self.file = file self.file = file

View File

@ -9,8 +9,11 @@ import youtube_music
class Download: class Download:
def __init__(self, metadata_csv: str = ".cache1.csv", session: requests.Session = requests.Session(), def __init__(self, metadata_csv: str = ".cache1.csv", proxies: dict = None,
file: str = ".cache2.csv", temp: str = "temp") -> None: file: str = ".cache2.csv", temp: str = "temp") -> None:
if proxies is not None:
musify.set_proxy(proxies)
self.temp = temp self.temp = temp
self.metadata = pd.read_csv(os.path.join(self.temp, metadata_csv), index_col=0) self.metadata = pd.read_csv(os.path.join(self.temp, metadata_csv), index_col=0)

View File

@ -1,10 +1,9 @@
import metadata import metadata.metadata
import download_links import download_links
import url_to_path import url_to_path
import download import download
import logging import logging
import requests
import os import os
@ -15,7 +14,7 @@ STEP_THREE_CACHE = ".cache3.csv"
NOT_A_GENRE = ".", "..", "misc_scripts", "Music", "script", ".git", ".idea" NOT_A_GENRE = ".", "..", "misc_scripts", "Music", "script", ".git", ".idea"
MUSIC_DIR = os.path.expanduser('~/Music') MUSIC_DIR = os.path.expanduser('~/Music')
TOR = False TOR = True
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
@ -30,7 +29,7 @@ def get_existing_genre():
def search_for_metadata(query: str): def search_for_metadata(query: str):
search = metadata.Search(query=query, temp=TEMP) search = metadata.metadata.Search(query=query, temp=TEMP)
print(search.options) print(search.options)
while True: while True:
@ -71,9 +70,9 @@ def get_genre():
def cli(start_at: int = 0): def cli(start_at: int = 0):
session = requests.Session() proxies = None
if TOR: if TOR:
session.proxies = { proxies = {
'http': 'socks5h://127.0.0.1:9150', 'http': 'socks5h://127.0.0.1:9150',
'https': 'socks5h://127.0.0.1:9150' 'https': 'socks5h://127.0.0.1:9150'
} }
@ -89,7 +88,7 @@ def cli(start_at: int = 0):
if start_at <= 1: if start_at <= 1:
logging.info("Fetching Download Links") logging.info("Fetching Download Links")
download_links.Download(file=STEP_TWO_CACHE, metadata_csv=STEP_ONE_CACHE, temp=TEMP, session=session) download_links.Download(file=STEP_TWO_CACHE, metadata_csv=STEP_ONE_CACHE, temp=TEMP, proxies=proxies)
if start_at <= 2: if start_at <= 2:
logging.info("creating Paths") logging.info("creating Paths")
@ -97,7 +96,7 @@ def cli(start_at: int = 0):
if start_at <= 3: if start_at <= 3:
logging.info("starting to download the mp3's") logging.info("starting to download the mp3's")
download.Download(session=session, file=STEP_THREE_CACHE, temp=TEMP, base_path=MUSIC_DIR) download.Download(proxies=proxies, file=STEP_THREE_CACHE, temp=TEMP, base_path=MUSIC_DIR)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,461 +0,0 @@
import os.path
import logging
import musicbrainzngs
import pandas as pd
mb_log = logging.getLogger("musicbrainzngs")
mb_log.setLevel(logging.WARNING)
musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeIIow2/music-downloader")
KNOWN_KIND_OF_OPTIONS = ["artist", "release", "track"]
def output(msg: str) -> None:
    """Print ``msg`` to standard output."""
    print(msg)
def get_elem_from_obj(current_object, keys: list, after_process=lambda x: x, return_if_none=None):
    """Walk a nested structure of dicts/lists and return the element found.

    :param current_object: the root object (typically a decoded API response).
    :param keys: path to follow; each entry is a dict key or a list index.
    :param after_process: callable applied to the element that was found.
        It is NOT applied to ``return_if_none``.
    :param return_if_none: value returned as soon as one step of the path
        cannot be resolved.
    :return: ``after_process(element)`` or ``return_if_none``.
    """
    for key in keys:
        # Just try the lookup instead of guessing with ``in``/``len``: the old
        # check raised for an int key on a dict, for a list that merely
        # *contained* the key as a value, and for intermediate ``None`` values.
        try:
            current_object = current_object[key]
        except (KeyError, IndexError, TypeError):
            return return_if_none
    return after_process(current_object)
class Search:
def __init__(self, query: str = None, artist: str = None, temp: str = "temp"):
    """Run the initial MusicBrainz search and store it as the current options.

    :param query: free-text query; searched against artists, releases and
        recordings. Takes precedence over ``artist``.
    :param artist: artist name; only used when ``query`` is None.
    :param temp: directory used later by ``download`` for the csv cache.
    :raises ValueError: if neither ``query`` nor ``artist`` is given.
    """
    if query is None and artist is None:
        raise ValueError("no query provided")

    self.options_history = []
    self.current_options = None
    self.current_chosen_option = None
    self.temp = temp

    # initial search
    if query is not None:
        result_sets = [
            musicbrainzngs.search_artists(query),
            musicbrainzngs.search_releases(query),
            musicbrainzngs.search_recordings(query),
        ]
    else:
        # artist cannot be None here, that case raised above
        result_sets = [musicbrainzngs.search_artists(artist=artist)]
    self.set_options(self.Options(result_sets))
def download(self, file: str = ".cache1.csv"):
    """Fetch the metadata for the currently chosen option and cache it as csv.

    :param file: name of the csv file written inside ``self.temp``.
    :return: the metadata as a ``pandas.DataFrame`` (empty for an unknown kind).
    """
    chosen = self.current_chosen_option
    kind = chosen['kind']
    mb_id = chosen['id']

    # kind of the option -> name of the method that knows how to fetch it
    fetcher_names = {
        "artist": "download_artist",
        "release": "download_release",
        "track": "download_track",
    }

    metadata_list = []
    if kind in fetcher_names:
        metadata_list = getattr(self, fetcher_names[kind])(mb_id)

    metadata_df = pd.DataFrame(metadata_list)
    metadata_df.to_csv(os.path.join(self.temp, file))
    return metadata_df
def download_artist(self, mb_id):
    """Collect the track metadata of every release listed for an artist.

    The position of a release in the artist's release list is passed on as
    its album sort value.

    Available includes: recordings, releases, release-groups, works, various-artists, discids, media, isrcs,
    aliases, annotation, area-rels, artist-rels, label-rels, place-rels, event-rels, recording-rels,
    release-rels, release-group-rels, series-rels, url-rels, work-rels, instrument-rels, tags, user-tags,
    ratings, user-ratings
    """
    response = musicbrainzngs.get_artist_by_id(mb_id, includes=["releases"])

    collected = []
    for position, release in enumerate(response["artist"]["release-list"]):
        collected += self.download_release(release["id"], position)
    return collected
def download_release(self, mb_id, album_sort: int = None):
    """Collect the metadata of every track on the first medium of a release.

    :param mb_id: MusicBrainz id of the release.
    :param album_sort: sort position of the release; looked up from the first
        credited artist's release list when omitted.
    :return: list with one metadata dict per track.

    Available includes: artists, labels, recordings, release-groups, media, artist-credits, discids, isrcs,
    recording-level-rels, work-level-rels, annotation, aliases, tags, user-tags, area-rels, artist-rels,
    label-rels, place-rels, event-rels, recording-rels, release-rels, release-group-rels, series-rels, url-rels,
    work-rels, instrument-rels
    """

    def find_album_position(artist_id):
        # index of this release in the artist's release list, 0 if absent
        listing = musicbrainzngs.get_artist_by_id(artist_id, includes=["releases"])
        for position, candidate in enumerate(listing["artist"]["release-list"]):
            if candidate["id"] == mb_id:
                return position
        return 0

    result = musicbrainzngs.get_release_by_id(mb_id, includes=["artists", "recordings", 'release-groups'])

    if album_sort is None:
        first_artist_id = get_elem_from_obj(result, ['release', 'artist-credit', 0, 'artist', 'id'])
        album_sort = find_album_position(first_artist_id)
    release_type = get_elem_from_obj(result, ['release', 'release-group', 'type'])

    release = result['release']
    is_various_artist = len(release['artist-credit']) > 1
    tracklist = release['medium-list'][0]['track-list']
    track_count = len(tracklist)

    tracklist_metadata = []
    for entry in tracklist:
        recording_id = entry["recording"]["id"]
        position = entry["position"]
        tracklist_metadata += self.download_track(
            recording_id,
            is_various_artist=is_various_artist,
            track=position,
            total_tracks=track_count,
            album_sort=album_sort,
            album_type=release_type,
            release_data=release,
        )
    return tracklist_metadata
def download_track(self, mb_id, is_various_artist: bool = None, track: int = None, total_tracks: int = None,
                   album_sort: int = None, album_type: str = None, release_data: dict = None):
    """
    Fetch one recording from MusicBrainz and return its tag metadata.

    The result is a list holding a single dict (see the ``return`` statement
    for the keys). Every optional argument that is left as None is looked up
    with additional MusicBrainz requests.

    :param mb_id: MusicBrainz id of the recording.
    :param is_various_artist: whether the release credits more than one artist.
    :param track: position of the track on the release.
    :param total_tracks: number of tracks on the release medium.
    :param album_sort: sort position of the release.
    :param album_type: release group type, e.g. "Compilation".
    :param release_data: release dict to take the album fields from; when None
        the last entry of the recording's release list is used.

    TODO
    bpm             its kind of possible via the AcousticBrainz API. however, the data may not be of very good
                    quality and AB is scheduled to go away in some time.

    compilation     Field that is used by iTunes to mark albums as compilation.
                    Either enter the value 1 or delete the field. https://en.wikipedia.org/wiki/Compilation_album
                    Derived from the release group type being "Compilation".

    composer, copyright, discsubtitle
    'musicbrainz_discid',
    'asin',
    'performer',
    'catalognumber',
    'musicbrainz_releasetrackid',
    'musicbrainz_releasegroupid',
    'musicbrainz_workid',
    'acoustid_fingerprint',
    'acoustid_id'

    DONE

    album
    title
    artist
    albumartist
    tracknumber
    !!!albumsort can sort albums chronologically
    titlesort is just set to the tracknumber to sort by track order
    isrc
    musicbrainz_artistid
    musicbrainz_albumid
    musicbrainz_albumartistid
    musicbrainz_albumstatus
    language
    musicbrainz_albumtype
    'releasecountry'
    'barcode'

    Album Art
    """
    """
    Available includes: artists, releases, discids, media, artist-credits, isrcs, work-level-rels, annotation,
    aliases, tags, user-tags, ratings, user-ratings, area-rels, artist-rels, label-rels, place-rels, event-rels,
    recording-rels, release-rels, release-group-rels, series-rels, url-rels, work-rels, instrument-rels
    """

    result = musicbrainzngs.get_recording_by_id(mb_id, includes=["artists", "releases", "recording-rels", "isrcs",
                                                                 "work-level-rels"])
    recording_data = result['recording']
    # first ISRC of the recording, None when MusicBrainz lists none
    isrc = get_elem_from_obj(recording_data, ['isrc-list', 0])

    if release_data is None:
        # choosing the last release, because it is the least likely one to be a single
        release_data = recording_data['release-list'][-1]
    mb_release_id = release_data['id']

    title = recording_data['title']

    # names and ids of all credited artists, kept index-aligned;
    # credit entries without an artist name are skipped
    artist = []
    mb_artist_ids = []
    for artist_ in recording_data['artist-credit']:
        name_ = get_elem_from_obj(artist_, ['artist', 'name'])
        if name_ is None:
            continue
        artist.append(name_)
        mb_artist_ids.append(get_elem_from_obj(artist_, ['artist', 'id']))

    def get_additional_artist_info(mb_id_):
        # index of the chosen release in the artist's release list (0 if it is not listed)
        r = musicbrainzngs.get_artist_by_id(mb_id_, includes=["releases"])

        album_sort = 0
        for i, release in enumerate(r["artist"]["release-list"]):
            id_ = release["id"]
            if id_ == mb_release_id:
                album_sort = i
                break

        return album_sort

    def get_additional_release_info(mb_id_):
        # re-fetch the release to derive the values the caller did not pass in;
        # only the first medium of the release is looked at
        r = musicbrainzngs.get_release_by_id(mb_id_,
                                             includes=["artists", "recordings", "recording-rels", 'release-groups'])
        is_various_artist_ = len(r['release']['artist-credit']) > 1
        tracklist = r['release']['medium-list'][0]['track-list']
        track_count_ = len(tracklist)
        this_track_ = 0
        for track in tracklist:
            if track["recording"]["id"] == mb_id:
                this_track_ = track["position"]

        release_type = get_elem_from_obj(r, ['release', 'release-group', 'type'])

        return is_various_artist_, this_track_, track_count_, release_type

    album_id = get_elem_from_obj(release_data, ['id'])
    album = get_elem_from_obj(release_data, ['title'])
    album_status = get_elem_from_obj(release_data, ['status'])
    language = get_elem_from_obj(release_data, ['text-representation', 'language'])

    # year is the part of the date before the first "-"
    year = get_elem_from_obj(release_data, ['date'], lambda x: x.split("-")[0])
    date = get_elem_from_obj(release_data, ['date'])
    # a single missing value triggers the lookup, which then overwrites all four
    if is_various_artist is None or track is None or total_tracks is None or album_type is None:
        is_various_artist, track, total_tracks, album_type = get_additional_release_info(album_id)
    if album_sort is None:
        album_sort = get_additional_artist_info(mb_artist_ids[0])
    album_artist = "Various Artists" if is_various_artist else artist[0]
    album_artist_id = None if album_artist == "Various Artists" else mb_artist_ids[0]
    compilation = "1" if album_type == "Compilation" else None
    country = get_elem_from_obj(release_data, ['country'])
    barcode = get_elem_from_obj(release_data, ['barcode'])

    return [{
        'id': mb_id,
        'album': album,
        'title': title,
        'artist': artist,
        'album_artist': album_artist,
        'tracknumber': str(track),
        'albumsort': album_sort,
        'titlesort': track,
        'isrc': isrc,
        'date': date,
        'year': year,
        'musicbrainz_artistid': mb_artist_ids[0],
        'musicbrainz_albumid': mb_release_id,
        'musicbrainz_albumartistid': album_artist_id,
        'musicbrainz_albumstatus': album_status,
        'total_tracks': total_tracks,
        'language': language,
        'musicbrainz_albumtype': album_type,
        'compilation': compilation,
        'releasecountry': country,
        'barcode': barcode
    }]
def browse_artist(self, artist: dict, limit: int = 25):
    """Make the artist plus its releases and recordings the current options.

    :param artist: artist dict as returned by MusicBrainz (needs an "id").
    :param limit: maximum number of releases / recordings requested.
    :return: the new ``Options`` instance.
    """
    own_entry = {"artist-list": [artist, ], "artist-count": 1}
    releases = musicbrainzngs.browse_releases(artist=artist["id"], limit=limit)
    recordings = musicbrainzngs.browse_recordings(artist=artist["id"], limit=limit)
    return self.set_options(self.Options([own_entry, releases, recordings]))
def browse_release(self, release: dict, limit: int = 25):
    """Make the release plus its artists and recordings the current options.

    :param release: release dict as returned by MusicBrainz (needs an "id").
    :param limit: maximum number of artists / recordings requested.
    :return: the new ``Options`` instance.
    """
    artists = musicbrainzngs.browse_artists(release=release["id"], limit=limit)
    own_entry = {"release-list": [release, ], "release-count": 1}
    recordings = musicbrainzngs.browse_recordings(release=release["id"], limit=limit)
    return self.set_options(self.Options([artists, own_entry, recordings]))
def browse_track(self, track: dict, limit: int = 25):
    """Make the recording plus its artists and releases the current options.

    :param track: recording dict as returned by MusicBrainz (needs an "id").
    :param limit: maximum number of artists / releases requested.
    :return: the new ``Options`` instance.
    """
    artists = musicbrainzngs.browse_artists(recording=track["id"], limit=limit)
    releases = musicbrainzngs.browse_releases(recording=track["id"], limit=limit)
    own_entry = {"recording-list": [track, ], "recording-count": 1}
    return self.set_options(self.Options([artists, releases, own_entry]))
def choose(self, index, limit: int = 25, ignore_limit_for_tracklist: bool = True):
    """Select the option at ``index`` and browse into it.

    :param index: position of the option in the current option list.
    :param limit: result limit passed to the browse request.
    :param ignore_limit_for_tracklist: when True, releases and tracks are
        browsed with a limit of 100 instead of ``limit``.
    :return: the options that are current after the call.
    """
    options = self.current_options
    if not options.choose(index):
        return self.current_options

    chosen = options.get_current_option()
    self.current_chosen_option = chosen
    kind = chosen['kind']

    if kind == 'artist':
        return self.browse_artist(chosen, limit=limit)

    # releases and tracks share the same limit override
    tracklist_limit = 100 if ignore_limit_for_tracklist else limit
    if kind == 'release':
        return self.browse_release(chosen, limit=tracklist_limit)
    if kind == 'track':
        return self.browse_track(chosen, limit=tracklist_limit)

    return self.current_options
def get_options(self):
    """Return the options that are currently shown."""
    return self.current_options
def set_options(self, option_instance):
    """Append ``option_instance`` to the history, make it current and return it."""
    self.options_history.append(option_instance)
    self.current_options = option_instance
    return option_instance
def get_previous_options(self):
    """Step one entry back in the option history and return it.

    The oldest entry is never removed: going back from the first screen simply
    returns that screen again instead of raising ``IndexError`` on an emptied
    history.
    """
    if len(self.options_history) > 1:
        self.options_history.pop()
    if self.options_history:
        self.current_options = self.options_history[-1]
    return self.current_options
options = property(fget=get_options)
class Options:
    """Flat, printable list of the artists, releases and tracks of a result.

    ``results`` is a list of MusicBrainz result dicts. Each one is recognised
    by its "artist-list", "release-list" or "recording-list" key; its entries
    are tagged with a "kind" of "artist", "release" or "track" and appended to
    ``result_list`` in the order they are encountered.
    """

    def __init__(self, results: list):
        self.results = results

        self.artist_count = 0
        self.release_count = 0
        self.track_count = 0
        self.result_list = []
        self.set_options_values()

        # index into result_list, None until choose() succeeded
        self.current_option_ind = None

    def get_current_option(self):
        """Return the chosen option; raises if ``choose`` was not called yet."""
        if self.current_option_ind is None:
            raise Exception("It must first be chosen, which option to get, before getting it")

        return self.result_list[self.current_option_ind]

    def choose(self, index: int) -> bool:
        """Remember ``index`` as the chosen option.

        :return: False (and nothing is changed) when ``index`` does not refer
            to an existing entry of ``result_list``, True otherwise.
        """
        # The former check ``len <= index - 1`` let ``index == len`` through,
        # which later blew up in get_current_option(). Negative indices that
        # resolve to an entry keep working as before.
        size = len(self.result_list)
        if not -size <= index < size:
            return False
        self.current_option_ind = index
        return True

    def get_string_for_artist(self, artist: dict) -> str:
        """Return the one-line description (newline terminated) of an artist."""
        string = f"'{artist['name']}'"
        if "country" in artist:
            string += f" from {artist['country']}"
        if 'disambiguation' in artist:
            string += f", '{artist['disambiguation']}'"
        return string + "\n"

    def get_string_for_release(self, release: dict) -> str:
        """Return the one-line description (newline terminated) of a release."""
        string = ""
        if "type" in release:
            string += f"the {release['type']} titled "
        string += f"'{release['title']}'"
        if "artist-credit-phrase" in release:
            string += f" by: {release['artist-credit-phrase']}"
        return string + "\n"

    def get_string_for_tracks(self, tracks: dict) -> str:
        """Describe a track; the release formatter is reused for it."""
        return self.get_string_for_release(tracks)

    def get_string_for_option(self, option: dict) -> str:
        """Dispatch to the formatter matching ``option['kind']``."""
        kind = option['kind']
        if kind == "artist":
            return self.get_string_for_artist(option)
        if kind == "release":
            return self.get_string_for_release(option)
        if kind == "track":
            return self.get_string_for_tracks(option)
        return "Error\n"

    def __str__(self) -> str:
        header = f"artists: {self.artist_count}; releases {self.release_count}; tracks {self.track_count}\n"
        lines = [
            f"{i})\t{option['kind']}:\t" + self.get_string_for_option(option)
            for i, option in enumerate(self.result_list)
        ]
        return header + "".join(lines)

    def set_options_values(self):
        """Fill the counters and ``result_list`` from ``self.results``."""
        for option_set in self.results:
            if "artist-list" in option_set:
                self.set_artist_values(option_set)
            elif "release-list" in option_set:
                self.set_release_values(option_set)
            elif "recording-list" in option_set:
                self.set_track_values(option_set)

    def set_artist_values(self, option_set: dict):
        """Add the artists of ``option_set`` (tagged in place with kind "artist")."""
        self.artist_count += option_set['artist-count']
        for artist in option_set['artist-list']:
            artist['kind'] = "artist"
            self.result_list.append(artist)

    def set_release_values(self, option_set: dict):
        """Add the releases of ``option_set`` (tagged in place with kind "release")."""
        self.release_count += option_set['release-count']
        for release in option_set['release-list']:
            release['kind'] = "release"
            self.result_list.append(release)

    def set_track_values(self, option_set: dict):
        """Add the recordings of ``option_set`` (tagged in place with kind "track")."""
        self.track_count += option_set['recording-count']
        for track in option_set['recording-list']:
            track['kind'] = "track"
            self.result_list.append(track)
def automated_demo():
    """Scripted walk through a search: choose three options in turn and download each."""
    search = Search(query="psychonaut 4")
    print(search.options)
    print(search.choose(0))
    search.download()
    print(search.choose(2))
    search.download()
    print(search.choose(4))
    print(search.download())
def interactive_demo():
    """Small command loop to browse search results on the terminal.

    Commands: ``q`` quits, ``.`` prints the current options, ``..`` goes back
    one step, a number chooses that option and ``d`` downloads the chosen
    option and leaves the loop.
    """
    search = Search(query=input("initial query: "))
    print(search.options)
    while True:
        # strip() returns a new string; its result used to be thrown away, so
        # input with surrounding whitespace never matched any command
        input_ = input(
            "d to download, q to quit, .. for previous options, . for current options, int for this element: "
        ).strip().lower()
        if input_ == "q":
            break
        if input_ == ".":
            print(search.options)
            continue
        if input_ == "..":
            print(search.get_previous_options())
            continue
        if input_.isdigit():
            print(search.choose(int(input_)))
            continue
        if input_ == "d":
            search.download()
            break
if __name__ == "__main__":
    # Manual smoke test: fetch one release and one track and print the track's
    # metadata. The commented-out calls are alternative demos / test ids.
    # interactive_demo()
    # automated_demo()
    search = Search(query="psychonaut 4")
    # search.download_release("27f00fb8-983c-4d5c-950f-51418aac55dc")
    search.download_release("1aeb676f-e556-4b17-b45e-64ab69ef0375")
    # for track_ in search.download_artist("c0c720b5-012f-4204-a472-981403f37b12"):
    #     print(track_)
    # res = search.download_track("83a30323-aee1-401a-b767-b3c1bdd026c0")
    # res = search.download_track("5e1ee2c5-502c-44d3-b1bc-22803441d8c6")
    res = search.download_track("86b43bec-eea6-40ae-8624-c1e404204ba1")
    # res = search.download_track("5cc28584-10c6-40e2-b6d4-6891e7e7c575")

    # print only the fields that actually have a value
    for key in res[0]:
        if res[0][key] is None:
            continue

        print(key, res[0][key])

424
src/metadata/download.py Normal file
View File

@ -0,0 +1,424 @@
from typing import List
import musicbrainzngs
import pandas as pd
import logging
from datetime import date
from object_handeling import get_elem_from_obj, parse_music_brainz_date
mb_log = logging.getLogger("musicbrainzngs")
mb_log.setLevel(logging.WARNING)
musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeIIow2/music-downloader")
# IMPORTANT
# https://python-musicbrainzngs.readthedocs.io/en/v0.7.1/api/#getting-data
class Artist:
    """An artist fetched from MusicBrainz together with its release groups."""

    def __init__(
            self,
            musicbrainz_artistid: str,
            release_groups: List = None,
            new_release_groups: bool = True
    ):
        """
        :param musicbrainz_artistid: MusicBrainz id of the artist.
        :param release_groups: list the release groups are stored in. The list
            passed in is used as is (not copied); a fresh list is created when
            omitted.
        :param new_release_groups: when True, every release group of the artist
            is fetched and appended to ``release_groups``.
        """
        # A ``[]`` default would be one list shared by every Artist created
        # without the argument, so all of them would accumulate each other's
        # release groups.
        self.release_groups = [] if release_groups is None else release_groups
        self.musicbrainz_artistid = musicbrainz_artistid

        result = musicbrainzngs.get_artist_by_id(self.musicbrainz_artistid, includes=["release-groups", "releases"])
        artist_data = get_elem_from_obj(result, ['artist'], return_if_none={})

        self.artist = get_elem_from_obj(artist_data, ['name'])

        if not new_release_groups:
            return
        # sort all release groups by date and add album sort to have them in chronological order.
        # artist_data may be the empty fallback from above, so do not index it directly
        release_group_datas = get_elem_from_obj(artist_data, ['release-group-list'], return_if_none=[])
        for release_group in release_group_datas:
            release_group['first-release-date'] = parse_music_brainz_date(release_group['first-release-date'])
        release_group_datas.sort(key=lambda x: x['first-release-date'])

        for position, release_group in enumerate(release_group_datas, start=1):
            self.release_groups.append(ReleaseGroup(
                musicbrainz_releasegroupid=release_group['id'],
                artists=[self],
                albumsort=position
            ))

    def __str__(self):
        newline = "\n"
        return f"id: {self.musicbrainz_artistid}\nname: {self.artist}\n{newline.join([str(release_group) for release_group in self.release_groups])}"
class ReleaseGroup:
    """A MusicBrainz release group with its credited artists and releases."""

    def __init__(
            self,
            musicbrainz_releasegroupid: str,
            artists: List["Artist"] = None,
            albumsort: int = None,
            only_download_distinct_releases: bool = True
    ):
        """
        :param musicbrainz_releasegroupid: MusicBrainz id of the release group.
        :param artists: already known artists of the group; credited artists
            that are missing are fetched and appended. With more than one
            artist the album artist becomes "Various Artists".
        :param albumsort: sort position of the group among the artist's groups.
        :param only_download_distinct_releases: when True only one release per
            distinct title is kept, otherwise every release of the group.
        """
        self.musicbrainz_releasegroupid = musicbrainz_releasegroupid
        # a ``[]`` default would be shared between all groups created without
        # the argument, and append_artist() mutates the list
        self.artists = [] if artists is None else artists
        self.releases = []

        result = musicbrainzngs.get_release_group_by_id(musicbrainz_releasegroupid,
                                                        includes=["artist-credits", "releases"])
        release_group_data = get_elem_from_obj(result, ['release-group'], return_if_none={})
        artist_datas = get_elem_from_obj(release_group_data, ['artist-credit'], return_if_none=[])
        release_datas = get_elem_from_obj(release_group_data, ['release-list'], return_if_none=[])

        for artist_data in artist_datas:
            artist_id = get_elem_from_obj(artist_data, ['artist', 'id'])
            if artist_id is None:
                continue
            self.append_artist(artist_id)
        self.albumartist = self._resolve_albumartist()

        self.albumsort = albumsort
        self.musicbrainz_albumtype = get_elem_from_obj(release_group_data, ['primary-type'])
        self.compilation = "1" if self.musicbrainz_albumtype == "Compilation" else None

        if only_download_distinct_releases:
            self.append_distinct_releases(release_datas)
        else:
            self.append_all_releases(release_datas)

    def _resolve_albumartist(self):
        """Return "Various Artists" for several artists, else the single artist's name (None without artists)."""
        # The former ``>= 1`` test labelled every single-artist group
        # "Various Artists" and indexed an empty list when there was no artist.
        if len(self.artists) > 1:
            return "Various Artists"
        if self.artists:
            return self.artists[0].artist
        return None

    def __str__(self):
        return "\n".join(str(release) for release in self.releases)

    def append_artist(self, artist_id: str) -> "Artist":
        """Return the artist with ``artist_id``, fetching and storing it if it is not known yet."""
        for existing_artist in self.artists:
            if artist_id == existing_artist.musicbrainz_artistid:
                return existing_artist
        new_artist = Artist(artist_id, release_groups=[self], new_release_groups=False)
        self.artists.append(new_artist)
        return new_artist

    def append_release(self, release_data: dict):
        """Fetch the release described by ``release_data`` and store it; entries without id are ignored."""
        musicbrainz_albumid = get_elem_from_obj(release_data, ['id'])
        if musicbrainz_albumid is None:
            return
        self.releases.append(Release(musicbrainz_albumid, release_group=self))

    def append_distinct_releases(self, release_datas: List[dict]):
        """Store one release per distinct title (the last one listed wins)."""
        titles = {}

        for release_data in release_datas:
            title = get_elem_from_obj(release_data, ['title'])
            if title is None:
                continue
            titles[title] = release_data

        for release_data in titles.values():
            self.append_release(release_data)

    def append_all_releases(self, release_datas: List[dict]):
        """Store every release of ``release_datas``."""
        for release_data in release_datas:
            self.append_release(release_data)
class Release:
    """A single MusicBrainz release: title, label name and track ids."""

    def __init__(
            self,
            musicbrainz_albumid: str,
            release_group: ReleaseGroup = None
    ):
        """
        :param musicbrainz_albumid: MusicBrainz id of the release.
        :param release_group: the release group this release belongs to.
        """
        self.musicbrainz_albumid = musicbrainz_albumid
        self.release_group = release_group
        self.tracklist = []

        response = musicbrainzngs.get_release_by_id(self.musicbrainz_albumid, includes=["recordings", "labels"])
        release_data = get_elem_from_obj(response, ['release'], return_if_none={})
        label_data = get_elem_from_obj(release_data, ['label-info-list'], return_if_none={})
        # only the track list of the first medium is read
        recording_datas = get_elem_from_obj(release_data, ['medium-list', 0, 'track-list'], return_if_none=[])

        self.title = get_elem_from_obj(release_data, ['title'])
        # name of the first label of the release
        self.copyright = get_elem_from_obj(label_data, [0, 'label', 'name'])

        self.append_recordings(recording_datas)

    def append_recordings(self, recording_datas: dict):
        """Append the id of every entry in ``recording_datas`` that has one to the tracklist."""
        for entry in recording_datas:
            track_id = get_elem_from_obj(entry, ['id'])
            if track_id is not None:
                self.tracklist.append(track_id)

    def __str__(self):
        return f"{self.title} ©{self.copyright}"
class Track:
    """Plain holder for a release track id and the release it belongs to."""

    def __init__(
            self,
            musicbrainz_releasetrackid: str,
            release: Release = None
    ):
        """
        release: Release
        feature_artists: list
        """
        # NOTE(review): feature_artists is mentioned above but not stored yet
        self.musicbrainz_releasetrackid = musicbrainz_releasetrackid
        self.release = release
def download(option: dict):
    """
    Fetch the metadata for ``option`` and return it as a DataFrame.

    ``option`` must provide the keys 'type' ("artist", "release" or "track")
    and 'id' (the MusicBrainz id). Any other type yields an empty DataFrame.
    """
    type_ = option['type']
    mb_id = option['id']

    metadata_list = []
    if type_ == "artist":
        # NOTE(review): the artist branch only builds and prints the Artist;
        # metadata_list stays empty, so the returned DataFrame is empty too
        artist = Artist(mb_id)
        print(artist)
    elif type_ == "release":
        metadata_list = download_release(mb_id)
    elif type_ == "track":
        metadata_list = download_track(mb_id)

    print(metadata_list)
    metadata_df = pd.DataFrame(metadata_list)
    # metadata_df.to_csv(os.path.join(self.temp, file))
    return metadata_df
def download_artist(mb_id):
    """
    Order the release groups of an artist and hand them on for download.

    Available includes: recordings, releases, release-groups, works, various-artists, discids, media, isrcs,
    aliases, annotation, area-rels, artist-rels, label-rels, place-rels, event-rels, recording-rels,
    release-rels, release-group-rels, series-rels, url-rels, work-rels, instrument-rels, tags, user-tags,
    ratings, user-ratings
    """
    # NOTE(review): metadata_list and following_data are never used below and
    # the function has no return statement
    metadata_list = []

    # from this dict everything will be taken
    following_data = {}

    result = musicbrainzngs.get_artist_by_id(mb_id, includes=["release-groups", "releases"])
    artist_data = result['artist']

    # sort all release groups by date and add album sort to have them in chronological order.
    release_groups = artist_data['release-group-list']
    for i, release_group in enumerate(release_groups):
        release_groups[i]['first-release-date'] = parse_music_brainz_date(release_group['first-release-date'])
    release_groups.sort(key=lambda x: x['first-release-date'])

    for i, release_group in enumerate(release_groups):
        release_groups[i]['albumsort'] = i + 1

    def numeric_release_type(release_type: str) -> int:
        # "Album" and "EP" rank before every other release type
        if release_type == "Album" or release_type == "EP":
            return 1
        return 2

    # list.sort is stable, so the chronological order is kept within each rank
    release_groups.sort(key=lambda x: numeric_release_type(x['type']))

    for release_group in release_groups:
        # NOTE(review): download_release_groups is not defined anywhere in the
        # visible source and is called without arguments - confirm what was intended
        download_release_groups()
def download_release(mb_id, album_sort: int = None):
    """
    Collect the metadata of every track on the first medium of a release.

    When ``album_sort`` is omitted it is taken from the position of the release
    in the release list of its first credited artist (0 if it is not listed).

    Available includes: artists, labels, recordings, release-groups, media, artist-credits, discids, isrcs,
    recording-level-rels, work-level-rels, annotation, aliases, tags, user-tags, area-rels, artist-rels,
    label-rels, place-rels, event-rels, recording-rels, release-rels, release-group-rels, series-rels, url-rels,
    work-rels, instrument-rels
    """

    def find_album_position(artist_id):
        listing = musicbrainzngs.get_artist_by_id(artist_id, includes=["releases"])
        for position, candidate in enumerate(listing["artist"]["release-list"]):
            if candidate["id"] == mb_id:
                return position
        return 0

    result = musicbrainzngs.get_release_by_id(mb_id, includes=["artists", "recordings", 'release-groups'])

    if album_sort is None:
        first_artist_id = get_elem_from_obj(result, ['release', 'artist-credit', 0, 'artist', 'id'])
        album_sort = find_album_position(first_artist_id)
    release_type = get_elem_from_obj(result, ['release', 'release-group', 'type'])

    release = result['release']
    is_various_artist = len(release['artist-credit']) > 1
    tracklist = release['medium-list'][0]['track-list']
    track_count = len(tracklist)

    tracklist_metadata = []
    for entry in tracklist:
        recording_id = entry["recording"]["id"]
        position = entry["position"]
        tracklist_metadata += download_track(
            recording_id,
            is_various_artist=is_various_artist,
            track=position,
            total_tracks=track_count,
            album_sort=album_sort,
            album_type=release_type,
            release_data=release,
        )
    return tracklist_metadata
def download_track(mb_id, is_various_artist: bool = None, track: int = None, total_tracks: int = None,
                   album_sort: int = None, album_type: str = None, release_data: dict = None):
    """
    Fetch one recording from MusicBrainz and return its tag metadata.

    The result is a list holding a single dict (see the ``return`` statement
    for the keys). Every optional argument that is left as None is looked up
    with additional MusicBrainz requests; when ``release_data`` is None the
    last entry of the recording's release list is used.

    TODO
    bpm             its kind of possible via the AcousticBrainz API. however, the data may not be of very good
                    quality and AB is scheduled to go away in some time.

    compilation     Field that is used by iTunes to mark albums as compilation.
                    Either enter the value 1 or delete the field. https://en.wikipedia.org/wiki/Compilation_album
                    Derived from the release group type being "Compilation".

    composer, copyright, discsubtitle
    'musicbrainz_discid',
    'asin',
    'performer',
    'catalognumber',
    'musicbrainz_releasetrackid',
    'musicbrainz_releasegroupid',
    'musicbrainz_workid',
    'acoustid_fingerprint',
    'acoustid_id'

    DONE

    album
    title
    artist
    albumartist
    tracknumber
    !!!albumsort can sort albums chronologically
    titlesort is just set to the tracknumber to sort by track order
    isrc
    musicbrainz_artistid
    musicbrainz_albumid
    musicbrainz_albumartistid
    musicbrainz_albumstatus
    language
    musicbrainz_albumtype
    'releasecountry'
    'barcode'

    Album Art
    """
    """
    Available includes: artists, releases, discids, media, artist-credits, isrcs, work-level-rels, annotation,
    aliases, tags, user-tags, ratings, user-ratings, area-rels, artist-rels, label-rels, place-rels, event-rels,
    recording-rels, release-rels, release-group-rels, series-rels, url-rels, work-rels, instrument-rels
    """

    result = musicbrainzngs.get_recording_by_id(mb_id, includes=["artists", "releases", "recording-rels", "isrcs",
                                                                 "work-level-rels"])
    recording_data = result['recording']
    # first ISRC of the recording, None when MusicBrainz lists none
    isrc = get_elem_from_obj(recording_data, ['isrc-list', 0])

    if release_data is None:
        # choosing the last release, because it is the least likely one to be a single
        release_data = recording_data['release-list'][-1]
    mb_release_id = release_data['id']

    title = recording_data['title']

    # names and ids of all credited artists, kept index-aligned;
    # credit entries without an artist name are skipped
    artist = []
    mb_artist_ids = []
    for artist_ in recording_data['artist-credit']:
        name_ = get_elem_from_obj(artist_, ['artist', 'name'])
        if name_ is None:
            continue
        artist.append(name_)
        mb_artist_ids.append(get_elem_from_obj(artist_, ['artist', 'id']))

    def get_additional_artist_info(mb_id_):
        # index of the chosen release in the artist's release list (0 if it is not listed)
        r = musicbrainzngs.get_artist_by_id(mb_id_, includes=["releases"])

        album_sort = 0
        for i, release in enumerate(r["artist"]["release-list"]):
            id_ = release["id"]
            if id_ == mb_release_id:
                album_sort = i
                break

        return album_sort

    def get_additional_release_info(mb_id_):
        # re-fetch the release to derive the values the caller did not pass in;
        # only the first medium of the release is looked at
        r = musicbrainzngs.get_release_by_id(mb_id_,
                                             includes=["artists", "recordings", "recording-rels", 'release-groups'])
        is_various_artist_ = len(r['release']['artist-credit']) > 1
        tracklist = r['release']['medium-list'][0]['track-list']
        track_count_ = len(tracklist)
        this_track_ = 0
        for track in tracklist:
            if track["recording"]["id"] == mb_id:
                this_track_ = track["position"]

        release_type = get_elem_from_obj(r, ['release', 'release-group', 'type'])

        return is_various_artist_, this_track_, track_count_, release_type

    album_id = get_elem_from_obj(release_data, ['id'])
    album = get_elem_from_obj(release_data, ['title'])
    album_status = get_elem_from_obj(release_data, ['status'])
    language = get_elem_from_obj(release_data, ['text-representation', 'language'])

    # year is the part of the date before the first "-"
    year = get_elem_from_obj(release_data, ['date'], lambda x: x.split("-")[0])
    # NOTE: inside this function the local name ``date`` hides the ``date``
    # class imported from datetime at the top of the module
    date = get_elem_from_obj(release_data, ['date'])
    # a single missing value triggers the lookup, which then overwrites all four
    if is_various_artist is None or track is None or total_tracks is None or album_type is None:
        is_various_artist, track, total_tracks, album_type = get_additional_release_info(album_id)
    if album_sort is None:
        album_sort = get_additional_artist_info(mb_artist_ids[0])
    album_artist = "Various Artists" if is_various_artist else artist[0]
    album_artist_id = None if album_artist == "Various Artists" else mb_artist_ids[0]
    compilation = "1" if album_type == "Compilation" else None
    country = get_elem_from_obj(release_data, ['country'])
    barcode = get_elem_from_obj(release_data, ['barcode'])

    return [{
        'id': mb_id,
        'album': album,
        'title': title,
        'artist': artist,
        'album_artist': album_artist,
        'tracknumber': str(track),
        'albumsort': album_sort,
        'titlesort': track,
        'isrc': isrc,
        'date': date,
        'year': year,
        'musicbrainz_artistid': mb_artist_ids[0],
        'musicbrainz_albumid': mb_release_id,
        'musicbrainz_albumartistid': album_artist_id,
        'musicbrainz_albumstatus': album_status,
        'total_tracks': total_tracks,
        'language': language,
        'musicbrainz_albumtype': album_type,
        'compilation': compilation,
        'releasecountry': country,
        'barcode': barcode
    }]
if __name__ == "__main__":
    # Manual smoke test against fixed MusicBrainz ids; the commented-out calls
    # exercise the release and track code paths instead of the artist one.
    logging.basicConfig(level=logging.DEBUG)

    download({'id': '5cfecbe4-f600-45e5-9038-ce820eedf3d1', 'type': 'artist'})
    # download({'id': '4b9af532-ef7e-42ab-8b26-c466327cb5e0', 'type': 'release'})
    # download({'id': 'c24ed9e7-6df9-44de-8570-975f1a5a75d1', 'type': 'track'})

142
src/metadata/metadata.py Normal file
View File

@ -0,0 +1,142 @@
import logging
import musicbrainzngs
import options
from object_handeling import get_elem_from_obj
# Only let WARNING and above through from the musicbrainzngs library logger.
mb_log = logging.getLogger("musicbrainzngs")
mb_log.setLevel(logging.WARNING)
# Register application name, version and contact URL with musicbrainzngs
# before any request is made.
musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeIIow2/music-downloader")

# Kinds of options known to this module; Search.choose compares the chosen
# option's 'kind' field against these same strings.
KNOWN_KIND_OF_OPTIONS = ["artist", "release", "track"]
class Search:
    """Step-by-step search over MusicBrainz artists, releases and recordings.

    Every search or browse result is wrapped in an ``options.Options``
    instance and pushed onto ``options_history`` so the caller can step back
    with :meth:`get_previous_options`.

    Attributes:
        options_history: Every option set shown so far, oldest first.
        current_options: The option set currently presented to the user.
        current_chosen_option: The entry picked by the last successful
            :meth:`choose` call, or ``None`` before the first choice.
        temp: Name of the temp directory (only stored here).
    """

    def __init__(self, query: str = None, artist: str = None, temp: str = "temp"):
        """Run the initial search.

        Args:
            query: Free-text query searched against artists, releases and
                recordings. Takes precedence over ``artist``.
            artist: Artist name; only artists are searched.
            temp: Name of the temp directory.

        Raises:
            ValueError: If neither ``query`` nor ``artist`` is given.
        """
        if query is None and artist is None:
            raise ValueError("no query provided")

        self.options_history = []
        self.current_options = None
        self.current_chosen_option = None

        self.temp = temp

        # initial search
        if query is not None:
            self.set_options(options.Options([
                musicbrainzngs.search_artists(query),
                musicbrainzngs.search_releases(query),
                musicbrainzngs.search_recordings(query),
            ]))
        else:
            # The guard above guarantees ``artist`` is set on this branch.
            self.set_options(options.Options([musicbrainzngs.search_artists(artist=artist)]))

    def browse_artist(self, artist: dict, limit: int = 25):
        """Show ``artist`` together with its releases and recordings."""
        options_sets = [
            # The chosen artist is wrapped in the same "<kind>-list" /
            # "<kind>-count" keys that are used for the other two entries.
            {"artist-list": [artist, ], "artist-count": 1},
            musicbrainzngs.browse_releases(artist=artist["id"], limit=limit),
            musicbrainzngs.browse_recordings(artist=artist["id"], limit=limit)
        ]
        return self.set_options(options.Options(options_sets))

    def browse_release(self, release: dict, limit: int = 25):
        """Show ``release`` together with its artists and recordings."""
        options_sets = [
            musicbrainzngs.browse_artists(release=release["id"], limit=limit),
            {"release-list": [release, ], "release-count": 1},
            musicbrainzngs.browse_recordings(release=release["id"], limit=limit)
        ]
        return self.set_options(options.Options(options_sets))

    def browse_track(self, track: dict, limit: int = 25):
        """Show ``track`` together with its artists and releases."""
        options_sets = [
            musicbrainzngs.browse_artists(recording=track["id"], limit=limit),
            musicbrainzngs.browse_releases(recording=track["id"], limit=limit),
            {"recording-list": [track, ], "recording-count": 1}
        ]
        return self.set_options(options.Options(options_sets))

    def choose(self, index, limit: int = 25, ignore_limit_for_tracklist: bool = True):
        """Pick entry ``index`` of the current options and browse into it.

        Args:
            index: Index of the entry inside the current option set.
            limit: Page size used when browsing from an artist.
            ignore_limit_for_tracklist: If true, browsing from a release or
                a track uses a page size of 100 instead of ``limit``.

        Returns:
            The new option set, or the unchanged current one if the index
            was rejected or the entry's kind is not handled.
        """
        if not self.current_options.choose(index):
            return self.current_options

        self.current_chosen_option = self.current_options.get_current_option()
        kind = self.current_chosen_option['kind']

        if kind == 'artist':
            return self.browse_artist(self.current_chosen_option, limit=limit)

        # Releases and tracks share the same page-size override.
        tracklist_limit = 100 if ignore_limit_for_tracklist else limit
        if kind == 'release':
            return self.browse_release(self.current_chosen_option, limit=tracklist_limit)
        if kind == 'track':
            return self.browse_track(self.current_chosen_option, limit=tracklist_limit)

        return self.current_options

    def get_options(self):
        """Return the option set currently presented."""
        return self.current_options

    def set_options(self, option_instance):
        """Make ``option_instance`` current, record it in the history and return it."""
        self.options_history.append(option_instance)
        self.current_options = option_instance

        return option_instance

    def get_previous_options(self):
        """Step back one entry in the history and return the now-current options.

        When there is nothing older to go back to, the current options are
        returned unchanged instead of raising ``IndexError`` on an emptied
        history.
        """
        if len(self.options_history) > 1:
            self.options_history.pop()
            self.current_options = self.options_history[-1]

        return self.current_options

    # Class attribute only: inside the methods above, ``options`` still
    # resolves to the imported ``options`` module.
    options = property(fget=get_options)
def automated_demo():
    """Run a scripted search / choose / download sequence for manual testing.

    NOTE(review): the ``Search`` class in this file defines no ``download``
    method, so the first ``search.download()`` call looks like it will raise
    ``AttributeError`` -- confirm against the ongoing refactoring.
    """
    search = Search(query="psychonaut 4")
    print(search.options)

    # (index to choose, whether the download result is printed afterwards)
    steps = ((0, False), (2, False), (4, True))
    for option_index, show_download_result in steps:
        print(search.choose(option_index))
        download_result = search.download()
        if show_download_result:
            print(download_result)
def interactive_demo():
    """Drive a ``Search`` from the terminal.

    Commands: ``q`` quits, ``.`` reprints the current options, ``..`` steps
    back to the previous options, a non-negative integer chooses that entry
    and ``d`` downloads the current selection and exits.

    NOTE(review): the ``Search`` class in this file defines no ``download``
    method, so the ``d`` command looks like it will raise ``AttributeError``
    -- confirm against the ongoing refactoring.
    """
    search = Search(query=input("initial query: "))
    print(search.options)
    while True:
        # str.strip() returns a new string; the result has to be kept,
        # otherwise input such as " q" or "3 " never matches a command.
        input_ = input(
            "d to download, q to quit, .. for previous options, . for current options, int for this element: "
        ).strip().lower()

        if input_ == "q":
            break
        if input_ == ".":
            print(search.options)
            continue
        if input_ == "..":
            print(search.get_previous_options())
            continue
        if input_.isdigit():
            print(search.choose(int(input_)))
            continue
        if input_ == "d":
            search.download()
            break
if __name__ == "__main__":
    # Manual test entry point; the demos and alternative ids are kept
    # commented out for quick switching.
    # NOTE(review): the Search class in this file defines neither
    # download_release nor download_track -- confirm these calls still work.
    # interactive_demo()
    # automated_demo()
    search = Search(query="psychonaut 4")
    # search.download_release("27f00fb8-983c-4d5c-950f-51418aac55dc")
    search.download_release("1aeb676f-e556-4b17-b45e-64ab69ef0375")
    # for track_ in search.download_artist("c0c720b5-012f-4204-a472-981403f37b12"):
    #     print(track_)
    # res = search.download_track("83a30323-aee1-401a-b767-b3c1bdd026c0")
    # res = search.download_track("5e1ee2c5-502c-44d3-b1bc-22803441d8c6")
    res = search.download_track("86b43bec-eea6-40ae-8624-c1e404204ba1")
    # res = search.download_track("5cc28584-10c6-40e2-b6d4-6891e7e7c575")

    # Print every field of the first result that actually carries a value.
    first_result = res[0]
    for key in first_result:
        value = first_result[key]
        if value is not None:
            print(key, value)

View File

@ -0,0 +1,22 @@
from datetime import date
def get_elem_from_obj(current_object, keys: list, after_process=lambda x: x, return_if_none=None):
    """Walk a nested structure along ``keys`` and return the element found.

    Args:
        current_object: Nested dicts/lists (e.g. a decoded API response).
        keys: Path to follow; dict keys and list indices may be mixed.
        after_process: Callable applied to the element that was found.
        return_if_none: Value returned when the path cannot be followed.

    Returns:
        ``after_process(element)`` if every key could be resolved, otherwise
        ``return_if_none``.
    """
    for key in keys:
        # Try the lookup and catch the failure instead of pre-checking with
        # ``in`` / ``len``: the pre-check raised on None values, on int keys
        # used against dicts and on lists that contain the key as a value.
        try:
            current_object = current_object[key]
        except (KeyError, IndexError, TypeError):
            return return_if_none

    # Deliberately outside the try block so errors raised by the callback
    # are not swallowed.
    return after_process(current_object)
def parse_music_brainz_date(mb_date: str) -> date:
    """Convert a (possibly partial) MusicBrainz date string into a ``date``.

    Accepts ``"YYYY"``, ``"YYYY-MM"`` and ``"YYYY-MM-DD"``; missing month or
    day default to 1.

    Args:
        mb_date: Date string as delivered by MusicBrainz.

    Returns:
        The parsed date, or ``date(1, 1, 1)`` for an empty value or a string
        with more than three dash-separated parts.

    Raises:
        ValueError: If a part is not an integer or the values do not form a
            valid calendar date.
    """
    fallback = date(1, 1, 1)
    if not mb_date:
        return fallback

    parts = mb_date.split("-")
    if len(parts) > 3:
        return fallback

    # Pad the missing month/day of partial dates with 1.
    year, month, day = [int(part) for part in parts] + [1] * (3 - len(parts))
    return date(year, month, day)

118
src/metadata/options.py Normal file

File diff suppressed because one or more lines are too long

View File

@ -11,6 +11,10 @@ session.headers = {
} }
def set_proxy(proxies):
    """Route all requests made through the module-level ``session`` via ``proxies``.

    ``proxies`` is assigned to ``session.proxies`` unchanged; the caller in
    this file passes a dict mapping 'http' / 'https' to a proxy URL.
    NOTE(review): ``session`` is presumably a ``requests.Session`` -- confirm.
    """
    session.proxies = proxies
def get_musify_url(row): def get_musify_url(row):
title = row.title title = row.title
artists = row.artist artists = row.artist
@ -75,6 +79,8 @@ def search_for_track(row):
soup = get_soup_of_search(f"{artist[0]} - {track}") soup = get_soup_of_search(f"{artist[0]} - {track}")
tracklist_container_soup = soup.find_all("div", {"class": "playlist"}) tracklist_container_soup = soup.find_all("div", {"class": "playlist"})
if len(tracklist_container_soup) == 0:
return None
if len(tracklist_container_soup) != 1: if len(tracklist_container_soup) != 1:
raise Exception("Connfusion Error. HTML Layout of https://musify.club changed.") raise Exception("Connfusion Error. HTML Layout of https://musify.club changed.")
tracklist_container_soup = tracklist_container_soup[0] tracklist_container_soup = tracklist_container_soup[0]
@ -113,12 +119,17 @@ if __name__ == "__main__":
import pandas as pd import pandas as pd
import json import json
TOR = True
if TOR:
set_proxy({
'http': 'socks5h://127.0.0.1:9150',
'https': 'socks5h://127.0.0.1:9150'
})
df = pd.read_csv("../temp/.cache1.csv") df = pd.read_csv("../temp/.cache1.csv")
for idx, row in df.iterrows(): for idx, row in df.iterrows():
row['artist'] = json.loads(row['artist'].replace("'", '"')) row['artist'] = json.loads(row['artist'].replace("'", '"'))
print("-" * 200) print("-" * 200)
print("fast")
print(get_musify_url(row))
print("slow") print("slow")
print(get_musify_url_slow(row)) print(get_musify_url_slow(row))